浏览代码

Add fetchRange and fetchRegion to SharedBlobCacheService (#131000)

This commit adds two new methods to SharedBlobCacheService,
fetchRange and fetchRegion. This new methods provide the same
functionality as their maybe version, but the main difference is
that they take a new boolean parameter (force) that allows the caller
to specify if fetching the range/region should force an eviction
if there are no free regions available.
Francisco Fernández Castaño 3 月之前
父节点
当前提交
8d83ede901

+ 64 - 61
x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

@@ -492,8 +492,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
     }
 
     /**
-     * Fetch and cache the full blob for the given cache entry from the remote repository if there
-     * are enough free pages in the cache to do so.
+     * Fetch and write in cache a region of a blob if there are enough free pages in the cache to do so.
      * <p>
      * This method returns as soon as the download tasks are instantiated, but the tasks themselves
      * are run on the bulk executor.
@@ -502,67 +501,32 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
      * and unlinked
      *
      * @param cacheKey      the key to fetch data for
-     * @param length        the length of the blob to fetch
+     * @param region        the region of the blob to fetch
+     * @param blobLength    the length of the blob from which the region is fetched (used to compute the size of the ending region)
      * @param writer        a writer that handles writing of newly downloaded data to the shared cache
      * @param fetchExecutor an executor to use for reading from the blob store
-     * @param listener      listener that is called once all downloading has finished
-     * @return {@code true} if there were enough free pages to start downloading the full entry
+     * @param listener      a listener that is completed with {@code true} if the current thread triggered the fetching of the region, in
+     *                      which case the data is available in cache. The listener is completed with {@code false} in every other cases: if
+     *                      the region to write is already available in cache, if the region is pending fetching via another thread or if
+     *                      there is not enough free pages to fetch the region.
      */
-    public boolean maybeFetchFullEntry(
-        KeyType cacheKey,
-        long length,
-        RangeMissingHandler writer,
-        Executor fetchExecutor,
-        ActionListener<Void> listener
+    public void maybeFetchRegion(
+        final KeyType cacheKey,
+        final int region,
+        final long blobLength,
+        final RangeMissingHandler writer,
+        final Executor fetchExecutor,
+        final ActionListener<Boolean> listener
     ) {
-        int finalRegion = getEndingRegion(length);
-        // TODO freeRegionCount uses freeRegions.size() which is is NOT a constant-time operation. Can we do better?
-        if (freeRegionCount() < finalRegion) {
-            // Not enough room to download a full file without evicting existing data, so abort
-            listener.onResponse(null);
-            return false;
-        }
-        long regionLength = regionSize;
-        try (RefCountingListener refCountingListener = new RefCountingListener(listener)) {
-            for (int region = 0; region <= finalRegion; region++) {
-                if (region == finalRegion) {
-                    regionLength = length - getRegionStart(region);
-                }
-                ByteRange rangeToWrite = ByteRange.of(0, regionLength);
-                if (rangeToWrite.isEmpty()) {
-                    return true;
-                }
-                final ActionListener<Integer> regionListener = refCountingListener.acquire(ignored -> {});
-                final CacheFileRegion<KeyType> entry;
-                try {
-                    entry = get(cacheKey, length, region);
-                } catch (AlreadyClosedException e) {
-                    // failed to grab a cache page because some other operation concurrently acquired some
-                    regionListener.onResponse(0);
-                    return false;
-                }
-                // set read range == write range so the listener completes only once all the bytes have been downloaded
-                entry.populateAndRead(
-                    rangeToWrite,
-                    rangeToWrite,
-                    (channel, pos, relativePos, len) -> Math.toIntExact(len),
-                    writer,
-                    fetchExecutor,
-                    regionListener.delegateResponse((l, e) -> {
-                        if (e instanceof AlreadyClosedException) {
-                            l.onResponse(0);
-                        } else {
-                            l.onFailure(e);
-                        }
-                    })
-                );
-            }
-        }
-        return true;
+        fetchRegion(cacheKey, region, blobLength, writer, fetchExecutor, false, listener);
     }
 
     /**
-     * Fetch and write in cache a region of a blob if there are enough free pages in the cache to do so.
+     * Fetch and write in cache a region of a blob.
+     * <p>
+     * If {@code force} is {@code true} and no free regions remain, an existing region will be evicted to make room.
+     * </p>
+     *
      * <p>
      * This method returns as soon as the download tasks are instantiated, but the tasks themselves
      * are run on the bulk executor.
@@ -575,20 +539,23 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
      * @param blobLength    the length of the blob from which the region is fetched (used to compute the size of the ending region)
      * @param writer        a writer that handles writing of newly downloaded data to the shared cache
      * @param fetchExecutor an executor to use for reading from the blob store
+     * @param force         flag indicating whether the cache should free an occupied region to accommodate the requested
+     *                      region when none are free.
      * @param listener      a listener that is completed with {@code true} if the current thread triggered the fetching of the region, in
      *                      which case the data is available in cache. The listener is completed with {@code false} in every other cases: if
      *                      the region to write is already available in cache, if the region is pending fetching via another thread or if
      *                      there is not enough free pages to fetch the region.
      */
-    public void maybeFetchRegion(
+    public void fetchRegion(
         final KeyType cacheKey,
         final int region,
         final long blobLength,
         final RangeMissingHandler writer,
         final Executor fetchExecutor,
+        final boolean force,
         final ActionListener<Boolean> listener
     ) {
-        if (freeRegions.isEmpty() && maybeEvictLeastUsed() == false) {
+        if (force == false && freeRegions.isEmpty() && maybeEvictLeastUsed() == false) {
             // no free page available and no old enough unused region to be evicted
             logger.info("No free regions, skipping loading region [{}]", region);
             listener.onResponse(false);
@@ -636,7 +603,45 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         final Executor fetchExecutor,
         final ActionListener<Boolean> listener
     ) {
-        if (freeRegions.isEmpty() && maybeEvictLeastUsed() == false) {
+        fetchRange(cacheKey, region, range, blobLength, writer, fetchExecutor, false, listener);
+    }
+
+    /**
+     * Fetch and write in cache a range within a blob region.
+     * <p>
+     * If {@code force} is {@code true} and no free regions remain, an existing region will be evicted to make room.
+     * </p>
+     * <p>
+     * This method returns as soon as the download tasks are instantiated, but the tasks themselves
+     * are run on the bulk executor.
+     * <p>
+     * If an exception is thrown from the writer then the cache entry being downloaded is freed
+     * and unlinked
+     *
+     * @param cacheKey      the key to fetch data for
+     * @param region        the region of the blob
+     * @param range         the range of the blob to fetch
+     * @param blobLength    the length of the blob from which the region is fetched (used to compute the size of the ending region)
+     * @param writer        a writer that handles writing of newly downloaded data to the shared cache
+     * @param fetchExecutor an executor to use for reading from the blob store
+     * @param force         flag indicating whether the cache should free an occupied region to accommodate the requested
+     *                      range when none are free.
+     * @param listener      a listener that is completed with {@code true} if the current thread triggered the fetching of the range, in
+     *                      which case the data is available in cache. The listener is completed with {@code false} in every other cases: if
+     *                      the range to write is already available in cache, if the range is pending fetching via another thread or if
+     *                      there is not enough free pages to fetch the range.
+     */
+    public void fetchRange(
+        final KeyType cacheKey,
+        final int region,
+        final ByteRange range,
+        final long blobLength,
+        final RangeMissingHandler writer,
+        final Executor fetchExecutor,
+        final boolean force,
+        final ActionListener<Boolean> listener
+    ) {
+        if (force == false && freeRegions.isEmpty() && maybeEvictLeastUsed() == false) {
             // no free page available and no old enough unused region to be evicted
             logger.info("No free regions, skipping loading region [{}]", region);
             listener.onResponse(false);
@@ -723,8 +728,6 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
 
     /**
      * NOTE: Method is package private mostly to allow checking the number of fee regions in tests.
-     * However, it is also used by {@link SharedBlobCacheService#maybeFetchFullEntry} but we should try
-     * to move away from that because calling "size" on a ConcurrentLinkedQueue is not a constant time operation.
      */
     int freeRegionCount() {
         return freeRegions.size();

+ 391 - 133
x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java

@@ -565,139 +565,6 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
         }
     }
 
-    public void testFetchFullCacheEntry() throws Exception {
-        Settings settings = Settings.builder()
-            .put(NODE_NAME_SETTING.getKey(), "node")
-            .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(500)).getStringRep())
-            .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep())
-            .put("path.home", createTempDir())
-            .build();
-
-        final var bulkTaskCount = new AtomicInteger(0);
-        final var threadPool = new TestThreadPool("test");
-        final var bulkExecutor = new StoppableExecutorServiceWrapper(threadPool.generic()) {
-            @Override
-            public void execute(Runnable command) {
-                super.execute(command);
-                bulkTaskCount.incrementAndGet();
-            }
-        };
-
-        try (
-            NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
-            var cacheService = new SharedBlobCacheService<>(
-                environment,
-                settings,
-                threadPool,
-                threadPool.executor(ThreadPool.Names.GENERIC),
-                BlobCacheMetrics.NOOP
-            )
-        ) {
-            {
-                final var cacheKey = generateCacheKey();
-                assertEquals(5, cacheService.freeRegionCount());
-                final long size = size(250);
-                AtomicLong bytesRead = new AtomicLong(size);
-                final PlainActionFuture<Void> future = new PlainActionFuture<>();
-                cacheService.maybeFetchFullEntry(
-                    cacheKey,
-                    size,
-                    (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
-                        completionListener,
-                        () -> {
-                            assert streamFactory == null : streamFactory;
-                            bytesRead.addAndGet(-length);
-                            progressUpdater.accept(length);
-                        }
-                    ),
-                    bulkExecutor,
-                    future
-                );
-
-                future.get(10, TimeUnit.SECONDS);
-                assertEquals(0L, bytesRead.get());
-                assertEquals(2, cacheService.freeRegionCount());
-                assertEquals(3, bulkTaskCount.get());
-            }
-            {
-                // a download that would use up all regions should not run
-                final var cacheKey = generateCacheKey();
-                assertEquals(2, cacheService.freeRegionCount());
-                var configured = cacheService.maybeFetchFullEntry(
-                    cacheKey,
-                    size(500),
-                    (ch, chPos, streamFactory, relPos, len, update, completionListener) -> completeWith(completionListener, () -> {
-                        throw new AssertionError("Should never reach here");
-                    }),
-                    bulkExecutor,
-                    ActionListener.noop()
-                );
-                assertFalse(configured);
-                assertEquals(2, cacheService.freeRegionCount());
-            }
-        }
-
-        threadPool.shutdown();
-    }
-
-    public void testFetchFullCacheEntryConcurrently() throws Exception {
-        Settings settings = Settings.builder()
-            .put(NODE_NAME_SETTING.getKey(), "node")
-            .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(500)).getStringRep())
-            .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep())
-            .put("path.home", createTempDir())
-            .build();
-
-        final var threadPool = new TestThreadPool("test");
-        final var bulkExecutor = new StoppableExecutorServiceWrapper(threadPool.generic());
-
-        try (
-            NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
-            var cacheService = new SharedBlobCacheService<>(
-                environment,
-                settings,
-                threadPool,
-                threadPool.executor(ThreadPool.Names.GENERIC),
-                BlobCacheMetrics.NOOP
-            )
-        ) {
-
-            final long size = size(randomIntBetween(1, 100));
-            final Thread[] threads = new Thread[10];
-            for (int i = 0; i < threads.length; i++) {
-                threads[i] = new Thread(() -> {
-                    for (int j = 0; j < 1000; j++) {
-                        final var cacheKey = generateCacheKey();
-                        safeAwait(
-                            (ActionListener<Void> listener) -> cacheService.maybeFetchFullEntry(
-                                cacheKey,
-                                size,
-                                (
-                                    channel,
-                                    channelPos,
-                                    streamFactory,
-                                    relativePos,
-                                    length,
-                                    progressUpdater,
-                                    completionListener) -> completeWith(completionListener, () -> progressUpdater.accept(length)),
-                                bulkExecutor,
-                                listener
-                            )
-                        );
-                    }
-                });
-            }
-            for (Thread thread : threads) {
-                thread.start();
-            }
-            for (Thread thread : threads) {
-                thread.join();
-            }
-        } finally {
-            assertTrue(ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS));
-        }
-    }
-
     public void testCacheSizeRejectedOnNonFrozenNodes() {
         String cacheSize = randomBoolean()
             ? ByteSizeValue.ofBytes(size(500)).getStringRep()
@@ -1130,6 +997,195 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
         threadPool.shutdown();
     }
 
+    public void testFetchRegion() throws Exception {
+        final long cacheSize = size(500L);
+        final long regionSize = size(100L);
+        Settings settings = Settings.builder()
+            .put(NODE_NAME_SETTING.getKey(), "node")
+            .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(cacheSize).getStringRep())
+            .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(regionSize).getStringRep())
+            .put("path.home", createTempDir())
+            .build();
+
+        final var bulkTaskCount = new AtomicInteger(0);
+        final var threadPool = new TestThreadPool("test");
+        final var bulkExecutor = new StoppableExecutorServiceWrapper(threadPool.generic()) {
+            @Override
+            public void execute(Runnable command) {
+                super.execute(command);
+                bulkTaskCount.incrementAndGet();
+            }
+        };
+
+        try (
+            NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
+            var cacheService = new SharedBlobCacheService<>(
+                environment,
+                settings,
+                threadPool,
+                threadPool.executor(ThreadPool.Names.GENERIC),
+                BlobCacheMetrics.NOOP
+            )
+        ) {
+            {
+                // fetch a single region
+                final var cacheKey = generateCacheKey();
+                assertEquals(5, cacheService.freeRegionCount());
+                final long blobLength = size(250); // 3 regions
+                AtomicLong bytesRead = new AtomicLong(0L);
+                final PlainActionFuture<Boolean> future = new PlainActionFuture<>();
+                cacheService.fetchRegion(
+                    cacheKey,
+                    0,
+                    blobLength,
+                    (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
+                        completionListener,
+                        () -> {
+                            assert streamFactory == null : streamFactory;
+                            bytesRead.addAndGet(length);
+                            progressUpdater.accept(length);
+                        }
+                    ),
+                    bulkExecutor,
+                    true,
+                    future
+                );
+
+                var fetched = future.get(10, TimeUnit.SECONDS);
+                assertThat("Region has been fetched", fetched, is(true));
+                assertEquals(regionSize, bytesRead.get());
+                assertEquals(4, cacheService.freeRegionCount());
+                assertEquals(1, bulkTaskCount.get());
+            }
+            {
+                // fetch multiple regions to used all the cache
+                final int remainingFreeRegions = cacheService.freeRegionCount();
+                assertEquals(4, cacheService.freeRegionCount());
+
+                final var cacheKey = generateCacheKey();
+                final long blobLength = regionSize * remainingFreeRegions;
+                AtomicLong bytesRead = new AtomicLong(0L);
+
+                final PlainActionFuture<Collection<Boolean>> future = new PlainActionFuture<>();
+                final var listener = new GroupedActionListener<>(remainingFreeRegions, future);
+                for (int region = 0; region < remainingFreeRegions; region++) {
+                    cacheService.fetchRegion(
+                        cacheKey,
+                        region,
+                        blobLength,
+                        (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
+                            completionListener,
+                            () -> {
+                                assert streamFactory == null : streamFactory;
+                                bytesRead.addAndGet(length);
+                                progressUpdater.accept(length);
+                            }
+                        ),
+                        bulkExecutor,
+                        true,
+                        listener
+                    );
+                }
+
+                var results = future.get(10, TimeUnit.SECONDS);
+                assertThat(results.stream().allMatch(result -> result), is(true));
+                assertEquals(blobLength, bytesRead.get());
+                assertEquals(0, cacheService.freeRegionCount());
+                assertEquals(1 + remainingFreeRegions, bulkTaskCount.get());
+            }
+            {
+                // cache fully used, no entry old enough to be evicted and force=false should not evict entries
+                assertEquals(0, cacheService.freeRegionCount());
+                final var cacheKey = generateCacheKey();
+                final PlainActionFuture<Boolean> future = new PlainActionFuture<>();
+                cacheService.fetchRegion(
+                    cacheKey,
+                    0,
+                    regionSize,
+                    (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
+                        completionListener,
+                        () -> {
+                            throw new AssertionError("should not be executed");
+                        }
+                    ),
+                    bulkExecutor,
+                    false,
+                    future
+                );
+                assertThat("Listener is immediately completed", future.isDone(), is(true));
+                assertThat("Region already exists in cache", future.get(), is(false));
+            }
+            {
+                // cache fully used, but force=true, so the cache should evict regions to make space for the requested regions
+                assertEquals(0, cacheService.freeRegionCount());
+                AtomicLong bytesRead = new AtomicLong(0L);
+                final var cacheKey = generateCacheKey();
+                final PlainActionFuture<Collection<Boolean>> future = new PlainActionFuture<>();
+                var regionsToFetch = randomIntBetween(1, (int) (cacheSize / regionSize));
+                final var listener = new GroupedActionListener<>(regionsToFetch, future);
+                long blobLength = regionsToFetch * regionSize;
+                for (int region = 0; region < regionsToFetch; region++) {
+                    cacheService.fetchRegion(
+                        cacheKey,
+                        region,
+                        blobLength,
+                        (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
+                            completionListener,
+                            () -> {
+                                assert streamFactory == null : streamFactory;
+                                bytesRead.addAndGet(length);
+                                progressUpdater.accept(length);
+                            }
+                        ),
+                        bulkExecutor,
+                        true,
+                        listener
+                    );
+                }
+
+                var results = future.get(10, TimeUnit.SECONDS);
+                assertThat(results.stream().allMatch(result -> result), is(true));
+                assertEquals(blobLength, bytesRead.get());
+                assertEquals(0, cacheService.freeRegionCount());
+                assertEquals(regionsToFetch + 5, bulkTaskCount.get());
+            }
+            {
+                cacheService.computeDecay();
+
+                // We explicitly called computeDecay, meaning that some regions must have been demoted to level 0,
+                // therefore there should be enough room to fetch the requested range regardless of the force flag.
+                final var cacheKey = generateCacheKey();
+                assertEquals(0, cacheService.freeRegionCount());
+                long blobLength = randomLongBetween(1L, regionSize);
+                AtomicLong bytesRead = new AtomicLong(0L);
+                final PlainActionFuture<Boolean> future = new PlainActionFuture<>();
+                cacheService.fetchRegion(
+                    cacheKey,
+                    0,
+                    blobLength,
+                    (channel, channelPos, ignore, relativePos, length, progressUpdater, completionListener) -> completeWith(
+                        completionListener,
+                        () -> {
+                            assert ignore == null : ignore;
+                            bytesRead.addAndGet(length);
+                            progressUpdater.accept(length);
+                        }
+                    ),
+                    bulkExecutor,
+                    randomBoolean(),
+                    future
+                );
+
+                var fetched = future.get(10, TimeUnit.SECONDS);
+                assertThat("Region has been fetched", fetched, is(true));
+                assertEquals(blobLength, bytesRead.get());
+                assertEquals(0, cacheService.freeRegionCount());
+            }
+        } finally {
+            TestThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
+        }
+    }
+
     public void testMaybeFetchRange() throws Exception {
         final long cacheSize = size(500L);
         final long regionSize = size(100L);
@@ -1301,6 +1357,208 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
         threadPool.shutdown();
     }
 
+    public void testFetchRange() throws Exception {
+        final long cacheSize = size(500L);
+        final long regionSize = size(100L);
+        Settings settings = Settings.builder()
+            .put(NODE_NAME_SETTING.getKey(), "node")
+            .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(cacheSize).getStringRep())
+            .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(regionSize).getStringRep())
+            .put("path.home", createTempDir())
+            .build();
+
+        final var bulkTaskCount = new AtomicInteger(0);
+        final var threadPool = new TestThreadPool("test");
+        final var bulkExecutor = new StoppableExecutorServiceWrapper(threadPool.generic()) {
+            @Override
+            public void execute(Runnable command) {
+                super.execute(command);
+                bulkTaskCount.incrementAndGet();
+            }
+        };
+
+        try (
+            NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
+            var cacheService = new SharedBlobCacheService<>(
+                environment,
+                settings,
+                threadPool,
+                threadPool.executor(ThreadPool.Names.GENERIC),
+                BlobCacheMetrics.NOOP
+            )
+        ) {
+            {
+                // fetch a random range in a random region of the blob
+                final var cacheKey = generateCacheKey();
+                assertEquals(5, cacheService.freeRegionCount());
+
+                // blobLength is 1024000 bytes and requires 3 regions
+                final long blobLength = size(250);
+                final var regions = List.of(
+                    // region 0: 0-409600
+                    ByteRange.of(cacheService.getRegionStart(0), cacheService.getRegionEnd(0)),
+                    // region 1: 409600-819200
+                    ByteRange.of(cacheService.getRegionStart(1), cacheService.getRegionEnd(1)),
+                    // region 2: 819200-1228800
+                    ByteRange.of(cacheService.getRegionStart(2), cacheService.getRegionEnd(2))
+                );
+
+                long pos = randomLongBetween(0, blobLength - 1L);
+                long len = randomLongBetween(1, blobLength - pos);
+                var range = ByteRange.of(pos, pos + len);
+                var region = between(0, regions.size() - 1);
+                var regionRange = cacheService.mapSubRangeToRegion(range, region);
+
+                var bytesCopied = new AtomicLong(0L);
+                var future = new PlainActionFuture<Boolean>();
+                cacheService.maybeFetchRange(
+                    cacheKey,
+                    region,
+                    range,
+                    blobLength,
+                    (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
+                        completionListener,
+                        () -> {
+                            assertThat(range.start() + relativePos, equalTo(cacheService.getRegionStart(region) + regionRange.start()));
+                            assertThat(channelPos, equalTo(Math.toIntExact(regionRange.start())));
+                            assertThat(length, equalTo(Math.toIntExact(regionRange.length())));
+                            bytesCopied.addAndGet(length);
+                        }
+                    ),
+                    bulkExecutor,
+                    future
+                );
+                var fetched = future.get(10, TimeUnit.SECONDS);
+
+                assertThat(regionRange.length(), equalTo(bytesCopied.get()));
+                if (regionRange.isEmpty()) {
+                    assertThat(fetched, is(false));
+                    assertEquals(5, cacheService.freeRegionCount());
+                    assertEquals(0, bulkTaskCount.get());
+                } else {
+                    assertThat(fetched, is(true));
+                    assertEquals(4, cacheService.freeRegionCount());
+                    assertEquals(1, bulkTaskCount.get());
+                }
+            }
+            {
+                // fetch multiple ranges to use all the cache
+                final int remainingFreeRegions = cacheService.freeRegionCount();
+                assertThat(remainingFreeRegions, greaterThanOrEqualTo(4));
+                bulkTaskCount.set(0);
+
+                final var cacheKey = generateCacheKey();
+                final long blobLength = regionSize * remainingFreeRegions;
+                AtomicLong bytesCopied = new AtomicLong(0L);
+
+                final PlainActionFuture<Collection<Boolean>> future = new PlainActionFuture<>();
+                final var listener = new GroupedActionListener<>(remainingFreeRegions, future);
+                for (int region = 0; region < remainingFreeRegions; region++) {
+                    cacheService.fetchRange(
+                        cacheKey,
+                        region,
+                        ByteRange.of(0L, blobLength),
+                        blobLength,
+                        (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
+                            completionListener,
+                            () -> bytesCopied.addAndGet(length)
+                        ),
+                        bulkExecutor,
+                        true,
+                        listener
+                    );
+                }
+
+                var results = future.get(10, TimeUnit.SECONDS);
+                assertThat(results.stream().allMatch(result -> result), is(true));
+                assertEquals(blobLength, bytesCopied.get());
+                assertEquals(0, cacheService.freeRegionCount());
+                assertEquals(remainingFreeRegions, bulkTaskCount.get());
+            }
+            {
+                // cache fully used, no entry old enough to be evicted and force=false
+                assertEquals(0, cacheService.freeRegionCount());
+                final var cacheKey = generateCacheKey();
+                final var blobLength = randomLongBetween(1L, regionSize);
+                final PlainActionFuture<Boolean> future = new PlainActionFuture<>();
+                cacheService.fetchRange(
+                    cacheKey,
+                    randomIntBetween(0, 10),
+                    ByteRange.of(0L, blobLength),
+                    blobLength,
+                    (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
+                        completionListener,
+                        () -> {
+                            throw new AssertionError("should not be executed");
+                        }
+                    ),
+                    bulkExecutor,
+                    false,
+                    future
+                );
+                assertThat("Listener is immediately completed", future.isDone(), is(true));
+                assertThat("Region already exists in cache", future.get(), is(false));
+            }
+            {
+                // cache fully used, since force=true the range should be populated
+                final var cacheKey = generateCacheKey();
+                assertEquals(0, cacheService.freeRegionCount());
+                long blobLength = randomLongBetween(1L, regionSize);
+                AtomicLong bytesCopied = new AtomicLong(0L);
+                final PlainActionFuture<Boolean> future = new PlainActionFuture<>();
+                cacheService.fetchRange(
+                    cacheKey,
+                    0,
+                    ByteRange.of(0L, blobLength),
+                    blobLength,
+                    (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
+                        completionListener,
+                        () -> bytesCopied.addAndGet(length)
+                    ),
+                    bulkExecutor,
+                    true,
+                    future
+                );
+
+                var fetched = future.get(10, TimeUnit.SECONDS);
+                assertThat("Region has been fetched", fetched, is(true));
+                assertEquals(blobLength, bytesCopied.get());
+                assertEquals(0, cacheService.freeRegionCount());
+            }
+            {
+                cacheService.computeDecay();
+
+                // We explicitly called computeDecay, meaning that some regions must have been demoted to level 0,
+                // therefore there should be enough room to fetch the requested range regardless of the force flag.
+                final var cacheKey = generateCacheKey();
+                assertEquals(0, cacheService.freeRegionCount());
+                long blobLength = randomLongBetween(1L, regionSize);
+                AtomicLong bytesCopied = new AtomicLong(0L);
+                final PlainActionFuture<Boolean> future = new PlainActionFuture<>();
+                cacheService.fetchRange(
+                    cacheKey,
+                    0,
+                    ByteRange.of(0L, blobLength),
+                    blobLength,
+                    (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
+                        completionListener,
+                        () -> bytesCopied.addAndGet(length)
+                    ),
+                    bulkExecutor,
+                    randomBoolean(),
+                    future
+                );
+
+                var fetched = future.get(10, TimeUnit.SECONDS);
+                assertThat("Region has been fetched", fetched, is(true));
+                assertEquals(blobLength, bytesCopied.get());
+                assertEquals(0, cacheService.freeRegionCount());
+            }
+        } finally {
+            TestThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
+        }
+    }
+
     public void testPopulate() throws Exception {
         final long regionSize = size(1L);
         Settings settings = Settings.builder()