浏览代码

Fix blob cache race, decay, time dependency (#104784)

This commit addresses 3 problems in the blob cache:

* Fix a race during initChunk where the result would be a fallback to direct read.
* Fix a bug in computeDecay that led to only decaying the first item per frequency.
* Remove the time dependency of the cache by moving to a logical clock (epochs)

Trigger decay whenever freq0 is empty, ensuring we decay slowly/rapidly as needed.

Divide time into epochs, switch to new one whenever we need to decay. A region
now promotes 2 freqs per access but only once per epoch

Co-authored-by: Tanguy Leroux <tlrx.dev@gmail.com>
Henning Andersen 1 年之前
父节点
当前提交
f2d96442f6

+ 5 - 0
docs/changelog/104784.yaml

@@ -0,0 +1,5 @@
+pr: 104784
+summary: "Fix blob cache race, decay, time dependency"
+area: Snapshot/Restore
+type: enhancement
+issues: []

+ 258 - 80
x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

@@ -26,7 +26,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsException;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.RelativeByteSizeValue;
-import org.elasticsearch.common.util.concurrent.AbstractAsyncTask;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.core.AbstractRefCounted;
 import org.elasticsearch.core.Assertions;
@@ -50,7 +49,6 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -58,9 +56,9 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.IntConsumer;
-import java.util.function.LongSupplier;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -262,6 +260,18 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         }
     }
 
+    // used in tests
+    void maybeScheduleDecayAndNewEpoch() {
+        if (cache instanceof LFUCache lfuCache) {
+            lfuCache.maybeScheduleDecayAndNewEpoch(lfuCache.epoch.get());
+        }
+    }
+
+    // used in tests
+    long epoch() {
+        return ((LFUCache) cache).epoch.get();
+    }
+
     private interface Cache<K, T> extends Releasable {
         CacheEntry<T> get(K cacheKey, long fileLength, int region);
 
@@ -311,7 +321,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
 
     private final BlobCacheMetrics blobCacheMetrics;
 
-    private final LongSupplier relativeTimeInMillisSupplier;
+    private final Runnable evictIncrementer;
 
     public SharedBlobCacheService(
         NodeEnvironment environment,
@@ -320,7 +330,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         String ioExecutor,
         BlobCacheMetrics blobCacheMetrics
     ) {
-        this(environment, settings, threadPool, ioExecutor, ioExecutor, blobCacheMetrics, threadPool::relativeTimeInMillis);
+        this(environment, settings, threadPool, ioExecutor, ioExecutor, blobCacheMetrics);
     }
 
     public SharedBlobCacheService(
@@ -329,8 +339,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         ThreadPool threadPool,
         String ioExecutor,
         String bulkExecutor,
-        BlobCacheMetrics blobCacheMetrics,
-        LongSupplier relativeTimeInMillisSupplier
+        BlobCacheMetrics blobCacheMetrics
     ) {
         this.threadPool = threadPool;
         this.ioExecutor = threadPool.executor(ioExecutor);
@@ -372,7 +381,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         this.recoveryRangeSize = BlobCacheUtils.toIntBytes(SHARED_CACHE_RECOVERY_RANGE_SIZE_SETTING.get(settings).getBytes());
 
         this.blobCacheMetrics = blobCacheMetrics;
-        this.relativeTimeInMillisSupplier = relativeTimeInMillisSupplier;
+        this.evictIncrementer = blobCacheMetrics.getEvictedCountNonZeroFrequency()::increment;
     }
 
     public static long calculateCacheSize(Settings settings, long totalFsSize) {
@@ -612,10 +621,6 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         return -1;
     }
 
-    private long relativeTimeInMillis() {
-        return relativeTimeInMillisSupplier.getAsLong();
-    }
-
     @Override
     public void close() {
         sharedBytes.decRef();
@@ -671,10 +676,16 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         }
     }
 
+    /**
+     * While this class has incRef and tryIncRef methods, incRefEnsureOpen and tryIncrefEnsureOpen should
+     * always be used, ensuring the right ordering between incRef/tryIncRef and ensureOpen
+     * (see {@link LFUCache#maybeEvictAndTakeForFrequency(Runnable, int)})
+     */
     class CacheFileRegion extends EvictableRefCounted {
 
         final RegionKey<KeyType> regionKey;
         final SparseFileTracker tracker;
+        // io can be null when not init'ed or after evict/take
         volatile SharedBytes.IO io = null;
 
         CacheFileRegion(RegionKey<KeyType> regionKey, int regionSize) {
@@ -688,6 +699,27 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
             return ioRef == null ? -1L : (long) regionKey.region * regionSize;
         }
 
+        public boolean tryIncRefEnsureOpen() {
+            if (tryIncRef()) {
+                ensureOpenOrDecRef();
+                return true;
+            }
+
+            return false;
+        }
+
+        public void incRefEnsureOpen() {
+            incRef();
+            ensureOpenOrDecRef();
+        }
+
+        private void ensureOpenOrDecRef() {
+            if (isEvicted()) {
+                decRef();
+                throwAlreadyEvicted();
+            }
+        }
+
         // tries to evict this chunk if noone is holding onto its resources anymore
         // visible for tests.
         boolean tryEvict() {
@@ -701,6 +733,17 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
             return false;
         }
 
+        boolean tryEvictNoDecRef() {
+            assert Thread.holdsLock(SharedBlobCacheService.this) : "must hold lock when evicting";
+            if (refCount() <= 1 && evict()) {
+                logger.trace("evicted and take {} with channel offset {}", regionKey, physicalStartOffset());
+                evictCount.increment();
+                return true;
+            }
+
+            return false;
+        }
+
         public boolean forceEvict() {
             assert Thread.holdsLock(SharedBlobCacheService.this) : "must hold lock when evicting";
             if (evict()) {
@@ -723,23 +766,27 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
             logger.trace("closed {} with channel offset {}", regionKey, physicalStartOffset());
         }
 
-        private void ensureOpen() {
-            if (isEvicted()) {
-                throwAlreadyEvicted();
-            }
-        }
-
         private static void throwAlreadyEvicted() {
             throwAlreadyClosed("File chunk is evicted");
         }
 
+        /**
+         * Optimistically try to read from the region
+         * @return true if successful, i.e., not evicted and data available, false if evicted
+         */
         boolean tryRead(ByteBuffer buf, long offset) throws IOException {
-            int readBytes = io.read(buf, getRegionRelativePosition(offset));
-            if (isEvicted()) {
-                buf.position(buf.position() - readBytes);
+            SharedBytes.IO ioRef = this.io;
+            if (ioRef != null) {
+                int readBytes = ioRef.read(buf, getRegionRelativePosition(offset));
+                if (isEvicted()) {
+                    buf.position(buf.position() - readBytes);
+                    return false;
+                }
+                return true;
+            } else {
+                // taken by someone else
                 return false;
             }
-            return true;
         }
 
         /**
@@ -761,9 +808,8 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         ) {
             Releasable resource = null;
             try {
-                incRef();
+                incRefEnsureOpen();
                 resource = Releasables.releaseOnce(this::decRef);
-                ensureOpen();
                 final List<SparseFileTracker.Gap> gaps = tracker.waitForRange(
                     rangeToWrite,
                     rangeToWrite,
@@ -796,9 +842,8 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         ) {
             Releasable resource = null;
             try {
-                incRef();
+                incRefEnsureOpen();
                 resource = Releasables.releaseOnce(this::decRef);
-                ensureOpen();
                 final List<SparseFileTracker.Gap> gaps = tracker.waitForRange(
                     rangeToWrite,
                     rangeToRead,
@@ -835,8 +880,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
             return new AbstractRunnable() {
                 @Override
                 protected void doRun() throws Exception {
-                    ensureOpen();
-                    if (cacheFileRegion.tryIncRef() == false) {
+                    if (cacheFileRegion.tryIncRefEnsureOpen() == false) {
                         throw new AlreadyClosedException("File chunk [" + cacheFileRegion.regionKey + "] has been released");
                     }
                     try {
@@ -1064,6 +1108,8 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         }
 
         private boolean assertValidRegionAndLength(CacheFileRegion fileRegion, int channelPos, int len) {
+            assert fileRegion.io != null;
+            assert fileRegion.hasReferences();
             assert regionOwners.get(fileRegion.io) == fileRegion;
             assert channelPos >= 0 && channelPos + len <= regionSize;
             return true;
@@ -1111,17 +1157,22 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
             LFUCacheEntry prev;
             LFUCacheEntry next;
             int freq;
-            volatile long lastAccessed;
+            volatile long lastAccessedEpoch;
 
             LFUCacheEntry(CacheFileRegion chunk, long lastAccessed) {
                 super(chunk);
-                this.lastAccessed = lastAccessed;
+                this.lastAccessedEpoch = lastAccessed;
+                // todo: consider whether freq=1 is still right for new entries.
+                // it could risk decaying to level 0 right after and thus potentially be evicted
+                // if the freq 1 LRU chain was short.
+                // seems ok for now, since if it were to get evicted soon, the decays done would ensure we have more level 1
+                // entries eventually and thus such an entry would (after some decays) be able to survive in the cache.
                 this.freq = 1;
             }
 
             void touch() {
-                long now = relativeTimeInMillis();
-                if (now - lastAccessed >= minTimeDelta) {
+                long now = epoch.get();
+                if (now > lastAccessedEpoch) {
                     maybePromote(now, this);
                 }
             }
@@ -1130,21 +1181,20 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         private final ConcurrentHashMap<RegionKey<KeyType>, LFUCacheEntry> keyMapping = new ConcurrentHashMap<>();
         private final LFUCacheEntry[] freqs;
         private final int maxFreq;
-        private final long minTimeDelta;
-        private final CacheDecayTask decayTask;
+        private final DecayAndNewEpochTask decayAndNewEpochTask;
+
+        private final AtomicLong epoch = new AtomicLong();
 
         @SuppressWarnings("unchecked")
         LFUCache(Settings settings) {
             this.maxFreq = SHARED_CACHE_MAX_FREQ_SETTING.get(settings);
-            this.minTimeDelta = SHARED_CACHE_MIN_TIME_DELTA_SETTING.get(settings).millis();
             freqs = (LFUCacheEntry[]) Array.newInstance(LFUCacheEntry.class, maxFreq);
-            decayTask = new CacheDecayTask(threadPool, threadPool.generic(), SHARED_CACHE_DECAY_INTERVAL_SETTING.get(settings));
-            decayTask.rescheduleIfNecessary();
+            decayAndNewEpochTask = new DecayAndNewEpochTask(threadPool.generic());
         }
 
         @Override
         public void close() {
-            decayTask.close();
+            decayAndNewEpochTask.close();
         }
 
         int getFreq(CacheFileRegion cacheFileRegion) {
@@ -1154,7 +1204,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         @Override
         public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) {
             final RegionKey<KeyType> regionKey = new RegionKey<>(cacheKey, region);
-            final long now = relativeTimeInMillis();
+            final long now = epoch.get();
             // try to just get from the map on the fast-path to save instantiating the capturing lambda needed on the slow path
             // if we did not find an entry
             var entry = keyMapping.get(regionKey);
@@ -1165,7 +1215,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
             // io is volatile, double locking is fine, as long as we assign it last.
             if (entry.chunk.io == null) {
                 synchronized (entry.chunk) {
-                    if (entry.chunk.io == null) {
+                    if (entry.chunk.io == null && entry.chunk.isEvicted() == false) {
                         return initChunk(entry);
                     }
                 }
@@ -1173,7 +1223,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
             assert assertChunkActiveOrEvicted(entry);
 
             // existing item, check if we need to promote item
-            if (now - entry.lastAccessed >= minTimeDelta) {
+            if (now > entry.lastAccessedEpoch) {
                 maybePromote(now, entry);
             }
 
@@ -1226,16 +1276,15 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
                 assignToSlot(entry, freeSlot);
             } else {
                 // need to evict something
-                int frequency;
+                SharedBytes.IO io;
                 synchronized (SharedBlobCacheService.this) {
-                    frequency = maybeEvict();
+                    io = maybeEvictAndTake(evictIncrementer);
                 }
-                if (frequency > 0) {
-                    blobCacheMetrics.getEvictedCountNonZeroFrequency().increment();
+                if (io == null) {
+                    io = freeRegions.poll();
                 }
-                final SharedBytes.IO freeSlotRetry = freeRegions.poll();
-                if (freeSlotRetry != null) {
-                    assignToSlot(entry, freeSlotRetry);
+                if (io != null) {
+                    assignToSlot(entry, io);
                 } else {
                     boolean removed = keyMapping.remove(regionKey, entry);
                     assert removed;
@@ -1322,16 +1371,19 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
                 assert entry.prev != null || entry.chunk.isEvicted();
 
             }
-            assert regionOwners.get(entry.chunk.io) == entry.chunk || entry.chunk.isEvicted();
+            SharedBytes.IO io = entry.chunk.io;
+            assert io != null || entry.chunk.isEvicted();
+            assert io == null || regionOwners.get(io) == entry.chunk || entry.chunk.isEvicted();
             return true;
         }
 
-        private void maybePromote(long now, LFUCacheEntry entry) {
+        private void maybePromote(long epoch, LFUCacheEntry entry) {
             synchronized (SharedBlobCacheService.this) {
-                if (now - entry.lastAccessed >= minTimeDelta && entry.freq + 1 < maxFreq && entry.chunk.isEvicted() == false) {
+                if (epoch > entry.lastAccessedEpoch && entry.freq < maxFreq - 1 && entry.chunk.isEvicted() == false) {
                     unlink(entry);
-                    entry.freq++;
-                    entry.lastAccessed = now;
+                    // go 2 up per epoch, allowing us to decay 1 every epoch.
+                    entry.freq = Math.min(entry.freq + 2, maxFreq - 1);
+                    entry.lastAccessedEpoch = epoch;
                     pushEntryToBack(entry);
                 }
             }
@@ -1363,25 +1415,118 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
             assert invariant(entry, false);
         }
 
+        private void appendLevel1ToLevel0() {
+            assert Thread.holdsLock(SharedBlobCacheService.this);
+            var front0 = freqs[0];
+            var front1 = freqs[1];
+            if (front0 == null) {
+                freqs[0] = front1;
+                freqs[1] = null;
+                decrementFreqList(front1);
+                assert front1 == null || invariant(front1, true);
+            } else if (front1 != null) {
+                var back0 = front0.prev;
+                var back1 = front1.prev;
+                assert invariant(front0, true);
+                assert invariant(front1, true);
+                assert invariant(back0, true);
+                assert invariant(back1, true);
+
+                decrementFreqList(front1);
+
+                front0.prev = back1;
+                back0.next = front1;
+                front1.prev = back0;
+                assert back1.next == null;
+
+                freqs[1] = null;
+
+                assert invariant(front0, true);
+                assert invariant(front1, true);
+                assert invariant(back0, true);
+                assert invariant(back1, true);
+            }
+        }
+
+        private void decrementFreqList(LFUCacheEntry entry) {
+            while (entry != null) {
+                entry.freq--;
+                entry = entry.next;
+            }
+        }
+
         /**
          * Cycles through the {@link LFUCacheEntry} from 0 to max frequency and
-         * tries to evict a chunk if no one is holding onto its resources anymore
+         * tries to evict a chunk if no one is holding onto its resources anymore.
          *
-         * @return the frequency of the evicted entry as integer or -1 if no entry was evicted from cache
+         * Also regularly polls for free regions and thus might steal one in case any become available.
+         *
+         * @return a now free IO region or null if none available.
          */
-        private int maybeEvict() {
+        private SharedBytes.IO maybeEvictAndTake(Runnable evictedNotification) {
             assert Thread.holdsLock(SharedBlobCacheService.this);
-            for (int currentFreq = 0; currentFreq < maxFreq; currentFreq++) {
-                for (LFUCacheEntry entry = freqs[currentFreq]; entry != null; entry = entry.next) {
-                    boolean evicted = entry.chunk.tryEvict();
-                    if (evicted && entry.chunk.io != null) {
-                        unlink(entry);
-                        keyMapping.remove(entry.chunk.regionKey, entry);
-                        return currentFreq;
+            long currentEpoch = epoch.get(); // must be captured before attempting to evict a freq 0
+            SharedBytes.IO freq0 = maybeEvictAndTakeForFrequency(evictedNotification, 0);
+            if (freqs[0] == null) {
+                // no frequency 0 entries, let us switch epoch and decay so we get some for next time.
+                maybeScheduleDecayAndNewEpoch(currentEpoch);
+            }
+            if (freq0 != null) {
+                return freq0;
+            }
+            for (int currentFreq = 1; currentFreq < maxFreq; currentFreq++) {
+                // recheck this per freq in case we raced an eviction with an incref'er.
+                SharedBytes.IO freeRegion = freeRegions.poll();
+                if (freeRegion != null) {
+                    return freeRegion;
+                }
+                SharedBytes.IO taken = maybeEvictAndTakeForFrequency(evictedNotification, currentFreq);
+                if (taken != null) {
+                    return taken;
+                }
+            }
+            // give up
+            return null;
+        }
+
+        private SharedBytes.IO maybeEvictAndTakeForFrequency(Runnable evictedNotification, int currentFreq) {
+            for (LFUCacheEntry entry = freqs[currentFreq]; entry != null; entry = entry.next) {
+                boolean evicted = entry.chunk.tryEvictNoDecRef();
+                if (evicted) {
+                    try {
+                        SharedBytes.IO ioRef = entry.chunk.io;
+                        if (ioRef != null) {
+                            try {
+                                if (entry.chunk.refCount() == 1) {
+                                    // we own that one refcount (since we CAS'ed evicted to 1)
+                                    // grab io, rely on incref'ers also checking evicted field.
+                                    entry.chunk.io = null;
+                                    assert regionOwners.remove(ioRef) == entry.chunk;
+                                    return ioRef;
+                                }
+                            } finally {
+                                unlink(entry);
+                                keyMapping.remove(entry.chunk.regionKey, entry);
+                            }
+                        }
+                    } finally {
+                        entry.chunk.decRef();
+                        if (currentFreq > 0) {
+                            evictedNotification.run();
+                        }
                     }
                 }
             }
-            return -1;
+            return null;
+        }
+
+        /**
+         * Check if a new epoch is needed based on the input. The input epoch should be captured
+         * before the determination that a new epoch is needed is done.
+         * @param currentEpoch the epoch to check against if a new epoch is needed
+         */
+        private void maybeScheduleDecayAndNewEpoch(long currentEpoch) {
+            decayAndNewEpochTask.spawnIfNotRunning(currentEpoch);
         }
 
         /**
@@ -1405,40 +1550,73 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         }
 
         private void computeDecay() {
+            long now = threadPool.rawRelativeTimeInMillis();
+            long afterLock;
+            long end;
             synchronized (SharedBlobCacheService.this) {
-                long now = relativeTimeInMillis();
-                for (int i = 0; i < maxFreq; i++) {
-                    for (LFUCacheEntry entry = freqs[i]; entry != null; entry = entry.next) {
-                        if (entry.freq > 0 && now - entry.lastAccessed >= 2 * minTimeDelta) {
-                            unlink(entry);
-                            entry.freq--;
-                            pushEntryToBack(entry);
-                        }
-                    }
+                afterLock = threadPool.rawRelativeTimeInMillis();
+                appendLevel1ToLevel0();
+                for (int i = 2; i < maxFreq; i++) {
+                    assert freqs[i - 1] == null;
+                    freqs[i - 1] = freqs[i];
+                    freqs[i] = null;
+                    decrementFreqList(freqs[i - 1]);
+                    assert freqs[i - 1] == null || invariant(freqs[i - 1], true);
                 }
             }
+            end = threadPool.rawRelativeTimeInMillis();
+            logger.debug("Decay took {} ms (acquire lock: {} ms)", end - now, afterLock - now);
         }
 
-        class CacheDecayTask extends AbstractAsyncTask {
+        class DecayAndNewEpochTask extends AbstractRunnable {
 
-            CacheDecayTask(ThreadPool threadPool, Executor executor, TimeValue interval) {
-                super(logger, Objects.requireNonNull(threadPool), executor, Objects.requireNonNull(interval), true);
+            private final Executor executor;
+            private final AtomicLong pendingEpoch = new AtomicLong();
+            private volatile boolean isClosed;
+
+            DecayAndNewEpochTask(Executor executor) {
+                this.executor = executor;
             }
 
             @Override
-            protected boolean mustReschedule() {
-                return true;
+            protected void doRun() throws Exception {
+                if (isClosed == false) {
+                    computeDecay();
+                }
             }
 
             @Override
-            public void runInternal() {
-                computeDecay();
+            public void onFailure(Exception e) {
+                logger.error("failed to run cache decay task", e);
+            }
+
+            @Override
+            public void onAfter() {
+                assert pendingEpoch.get() == epoch.get() + 1;
+                epoch.incrementAndGet();
+            }
+
+            @Override
+            public void onRejection(Exception e) {
+                assert false : e;
+                logger.error("unexpected rejection", e);
+                epoch.incrementAndGet();
             }
 
             @Override
             public String toString() {
                 return "shared_cache_decay_task";
             }
+
+            public void spawnIfNotRunning(long currentEpoch) {
+                if (isClosed == false && pendingEpoch.compareAndSet(currentEpoch, currentEpoch + 1)) {
+                    executor.execute(this);
+                }
+            }
+
+            public void close() {
+                this.isClosed = true;
+            }
         }
     }
 }

+ 187 - 73
x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java

@@ -34,8 +34,10 @@ import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
@@ -90,11 +92,11 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
             assertEquals(2, cacheService.freeRegionCount());
 
             synchronized (cacheService) {
-                assertTrue(region1.tryEvict());
+                assertTrue(tryEvict(region1));
             }
             assertEquals(3, cacheService.freeRegionCount());
             synchronized (cacheService) {
-                assertFalse(region1.tryEvict());
+                assertFalse(tryEvict(region1));
             }
             assertEquals(3, cacheService.freeRegionCount());
             final var bytesReadFuture = new PlainActionFuture<Integer>();
@@ -107,17 +109,17 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                 bytesReadFuture
             );
             synchronized (cacheService) {
-                assertFalse(region0.tryEvict());
+                assertFalse(tryEvict(region0));
             }
             assertEquals(3, cacheService.freeRegionCount());
             assertFalse(bytesReadFuture.isDone());
             taskQueue.runAllRunnableTasks();
             synchronized (cacheService) {
-                assertTrue(region0.tryEvict());
+                assertTrue(tryEvict(region0));
             }
             assertEquals(4, cacheService.freeRegionCount());
             synchronized (cacheService) {
-                assertTrue(region2.tryEvict());
+                assertTrue(tryEvict(region2));
             }
             assertEquals(5, cacheService.freeRegionCount());
             assertTrue(bytesReadFuture.isDone());
@@ -125,6 +127,18 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
         }
     }
 
+    private static boolean tryEvict(SharedBlobCacheService<Object>.CacheFileRegion region1) {
+        if (randomBoolean()) {
+            return region1.tryEvict();
+        } else {
+            boolean result = region1.tryEvictNoDecRef();
+            if (result) {
+                region1.decRef();
+            }
+            return result;
+        }
+    }
+
     public void testAutoEviction() throws IOException {
         Settings settings = Settings.builder()
             .put(NODE_NAME_SETTING.getKey(), "node")
@@ -163,7 +177,7 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
 
             // explicitly evict region 1
             synchronized (cacheService) {
-                assertTrue(region1.tryEvict());
+                assertTrue(tryEvict(region1));
             }
             assertEquals(1, cacheService.freeRegionCount());
         }
@@ -237,9 +251,10 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
     }
 
     public void testDecay() throws IOException {
+        // we have 8 regions
         Settings settings = Settings.builder()
             .put(NODE_NAME_SETTING.getKey(), "node")
-            .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(500)).getStringRep())
+            .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(400)).getStringRep())
             .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep())
             .put("path.home", createTempDir())
             .build();
@@ -254,51 +269,152 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                 BlobCacheMetrics.NOOP
             )
         ) {
+            assertEquals(4, cacheService.freeRegionCount());
+
             final var cacheKey1 = generateCacheKey();
             final var cacheKey2 = generateCacheKey();
-            assertEquals(5, cacheService.freeRegionCount());
+            final var cacheKey3 = generateCacheKey();
+            // add a region that we can evict when provoking first decay
+            cacheService.get("evictkey", size(250), 0);
+            assertEquals(3, cacheService.freeRegionCount());
             final var region0 = cacheService.get(cacheKey1, size(250), 0);
-            assertEquals(4, cacheService.freeRegionCount());
+            assertEquals(2, cacheService.freeRegionCount());
             final var region1 = cacheService.get(cacheKey2, size(250), 1);
-            assertEquals(3, cacheService.freeRegionCount());
+            assertEquals(1, cacheService.freeRegionCount());
+            final var region2 = cacheService.get(cacheKey3, size(250), 1);
+            assertEquals(0, cacheService.freeRegionCount());
 
             assertEquals(1, cacheService.getFreq(region0));
             assertEquals(1, cacheService.getFreq(region1));
+            assertEquals(1, cacheService.getFreq(region2));
+            AtomicLong expectedEpoch = new AtomicLong();
+            Runnable triggerDecay = () -> {
+                assertThat(taskQueue.hasRunnableTasks(), is(false));
+                cacheService.get(expectedEpoch.toString(), size(250), 0);
+                assertThat(taskQueue.hasRunnableTasks(), is(true));
+                taskQueue.runAllRunnableTasks();
+                assertThat(cacheService.epoch(), equalTo(expectedEpoch.incrementAndGet()));
+            };
+
+            triggerDecay.run();
 
-            taskQueue.advanceTime();
-            taskQueue.runAllRunnableTasks();
+            cacheService.get(cacheKey1, size(250), 0);
+            cacheService.get(cacheKey2, size(250), 1);
+            cacheService.get(cacheKey3, size(250), 1);
+
+            triggerDecay.run();
 
             final var region0Again = cacheService.get(cacheKey1, size(250), 0);
             assertSame(region0Again, region0);
-            assertEquals(2, cacheService.getFreq(region0));
+            assertEquals(3, cacheService.getFreq(region0));
             assertEquals(1, cacheService.getFreq(region1));
+            assertEquals(1, cacheService.getFreq(region2));
+
+            triggerDecay.run();
 
-            taskQueue.advanceTime();
-            taskQueue.runAllRunnableTasks();
             cacheService.get(cacheKey1, size(250), 0);
-            assertEquals(3, cacheService.getFreq(region0));
+            assertEquals(4, cacheService.getFreq(region0));
             cacheService.get(cacheKey1, size(250), 0);
+            assertEquals(4, cacheService.getFreq(region0));
+            assertEquals(0, cacheService.getFreq(region1));
+            assertEquals(0, cacheService.getFreq(region2));
+
+            // ensure no freq=0 entries
+            cacheService.get(cacheKey2, size(250), 1);
+            cacheService.get(cacheKey3, size(250), 1);
+            assertEquals(2, cacheService.getFreq(region1));
+            assertEquals(2, cacheService.getFreq(region2));
+
+            triggerDecay.run();
+
             assertEquals(3, cacheService.getFreq(region0));
+            assertEquals(1, cacheService.getFreq(region1));
+            assertEquals(1, cacheService.getFreq(region2));
 
-            // advance 2 ticks (decay only starts after 2 ticks)
-            taskQueue.advanceTime();
-            taskQueue.runAllRunnableTasks();
-            taskQueue.advanceTime();
-            taskQueue.runAllRunnableTasks();
+            triggerDecay.run();
             assertEquals(2, cacheService.getFreq(region0));
             assertEquals(0, cacheService.getFreq(region1));
+            assertEquals(0, cacheService.getFreq(region2));
 
-            // advance another tick
-            taskQueue.advanceTime();
-            taskQueue.runAllRunnableTasks();
+            // ensure no freq=0 entries
+            cacheService.get(cacheKey2, size(250), 1);
+            cacheService.get(cacheKey3, size(250), 1);
+            assertEquals(2, cacheService.getFreq(region1));
+            assertEquals(2, cacheService.getFreq(region2));
+
+            triggerDecay.run();
             assertEquals(1, cacheService.getFreq(region0));
-            assertEquals(0, cacheService.getFreq(region1));
+            assertEquals(1, cacheService.getFreq(region1));
+            assertEquals(1, cacheService.getFreq(region2));
 
-            // advance another tick
-            taskQueue.advanceTime();
-            taskQueue.runAllRunnableTasks();
+            triggerDecay.run();
             assertEquals(0, cacheService.getFreq(region0));
             assertEquals(0, cacheService.getFreq(region1));
+            assertEquals(0, cacheService.getFreq(region2));
+        }
+    }
+
+    /**
+     * Test when many objects need to decay, in particular useful to measure how long the decay task takes.
+     * For 1M objects (with no assertions) it took 26ms locally.
+     */
+    public void testMassiveDecay() throws IOException {
+        int regions = 1024; // to measure decay time, increase to 1024*1024 and disable assertions.
+        Settings settings = Settings.builder()
+            .put(NODE_NAME_SETTING.getKey(), "node")
+            .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(regions)).getStringRep())
+            .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(1)).getStringRep())
+            .put("path.home", createTempDir())
+            .build();
+        final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
+        try (
+            NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
+            var cacheService = new SharedBlobCacheService<>(
+                environment,
+                settings,
+                taskQueue.getThreadPool(),
+                ThreadPool.Names.GENERIC,
+                BlobCacheMetrics.NOOP
+            )
+        ) {
+            Runnable decay = () -> {
+                assertThat(taskQueue.hasRunnableTasks(), is(true));
+                long before = System.currentTimeMillis();
+                taskQueue.runAllRunnableTasks();
+                long after = System.currentTimeMillis();
+                logger.debug("took {} ms", (after - before));
+            };
+            long fileLength = size(regions + 100);
+            Object cacheKey = new Object();
+            for (int i = 0; i < regions; ++i) {
+                cacheService.get(cacheKey, fileLength, i);
+                if (Integer.bitCount(i) == 1) {
+                    logger.debug("did {} gets", i);
+                }
+            }
+            assertThat(taskQueue.hasRunnableTasks(), is(false));
+            cacheService.get(cacheKey, fileLength, regions);
+            decay.run();
+            int maxRounds = 5;
+            for (int round = 2; round <= maxRounds; ++round) {
+                for (int i = round; i < regions + round; ++i) {
+                    cacheService.get(cacheKey, fileLength, i);
+                    if (Integer.bitCount(i) == 1) {
+                        logger.debug("did {} gets", i);
+                    }
+                }
+                decay.run();
+            }
+
+            Map<Integer, Integer> freqs = new HashMap<>();
+            for (int i = maxRounds; i < regions + maxRounds; ++i) {
+                int freq = cacheService.getFreq(cacheService.get(cacheKey, fileLength, i)) - 2;
+                freqs.compute(freq, (k, v) -> v == null ? 1 : v + 1);
+                if (Integer.bitCount(i) == 1) {
+                    logger.debug("did {} gets", i);
+                }
+            }
+            assertThat(freqs.get(4), equalTo(regions - maxRounds + 1));
         }
     }
 
@@ -308,12 +424,12 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
      */
     public void testGetMultiThreaded() throws IOException {
         int threads = between(2, 10);
+        int regionCount = between(1, 20);
+        // if we have enough regions, a get should always have a result (except for explicit evict interference)
+        final boolean allowAlreadyClosed = regionCount < threads;
         Settings settings = Settings.builder()
             .put(NODE_NAME_SETTING.getKey(), "node")
-            .put(
-                SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(),
-                ByteSizeValue.ofBytes(size(between(1, 20) * 100L)).getStringRep()
-            )
+            .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(regionCount * 100L)).getStringRep())
             .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep())
             .put(SharedBlobCacheService.SHARED_CACHE_MIN_TIME_DELTA_SETTING.getKey(), randomFrom("0", "1ms", "10s"))
             .put("path.home", createTempDir())
@@ -343,11 +459,13 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                         ready.await();
                         for (int i = 0; i < iterations; ++i) {
                             try {
-                                SharedBlobCacheService<String>.CacheFileRegion cacheFileRegion = cacheService.get(
-                                    cacheKeys[i],
-                                    fileLength,
-                                    regions[i]
-                                );
+                                SharedBlobCacheService<String>.CacheFileRegion cacheFileRegion;
+                                try {
+                                    cacheFileRegion = cacheService.get(cacheKeys[i], fileLength, regions[i]);
+                                } catch (AlreadyClosedException e) {
+                                    assert allowAlreadyClosed || e.getMessage().equals("evicted during free region allocation") : e;
+                                    throw e;
+                                }
                                 if (cacheFileRegion.tryIncRef()) {
                                     if (yield[i] == 0) {
                                         Thread.yield();
@@ -415,8 +533,7 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                 threadPool,
                 ThreadPool.Names.GENERIC,
                 "bulk",
-                BlobCacheMetrics.NOOP,
-                threadPool::relativeTimeInMillis
+                BlobCacheMetrics.NOOP
             )
         ) {
             {
@@ -477,8 +594,7 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                 threadPool,
                 ThreadPool.Names.GENERIC,
                 "bulk",
-                BlobCacheMetrics.NOOP,
-                threadPool::relativeTimeInMillis
+                BlobCacheMetrics.NOOP
             )
         ) {
 
@@ -708,8 +824,7 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
     }
 
     public void testMaybeEvictLeastUsed() throws Exception {
-        final int numRegions = 3;
-        randomIntBetween(1, 500);
+        final int numRegions = 10;
         final long regionSize = size(1L);
         Settings settings = Settings.builder()
             .put(NODE_NAME_SETTING.getKey(), "node")
@@ -728,11 +843,10 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                 taskQueue.getThreadPool(),
                 ThreadPool.Names.GENERIC,
                 "bulk",
-                BlobCacheMetrics.NOOP,
-                relativeTimeInMillis::get
+                BlobCacheMetrics.NOOP
             )
         ) {
-            final Set<Object> cacheKeys = new HashSet<>();
+            final Map<Object, SharedBlobCacheService<Object>.CacheFileRegion> cacheEntries = new HashMap<>();
 
             assertThat("All regions are free", cacheService.freeRegionCount(), equalTo(numRegions));
             assertThat("Cache has no entries", cacheService.maybeEvictLeastUsed(), is(false));
@@ -748,8 +862,7 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                     ActionListener.noop()
                 );
                 assertThat(cacheService.getFreq(entry), equalTo(1));
-                relativeTimeInMillis.incrementAndGet();
-                cacheKeys.add(cacheKey);
+                cacheEntries.put(cacheKey, entry);
             }
 
             assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0));
@@ -760,33 +873,41 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
             assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0));
             assertThat("Cache entries are not old enough to be evicted", cacheService.maybeEvictLeastUsed(), is(false));
 
-            // simulate elapsed time
-            var minInternalMillis = SharedBlobCacheService.SHARED_CACHE_MIN_TIME_DELTA_SETTING.getDefault(Settings.EMPTY).millis();
-            relativeTimeInMillis.addAndGet(minInternalMillis);
+            cacheService.maybeScheduleDecayAndNewEpoch();
+            taskQueue.runAllRunnableTasks();
+
+            cacheEntries.keySet().forEach(key -> cacheService.get(key, regionSize, 0));
+            cacheService.maybeScheduleDecayAndNewEpoch();
+            taskQueue.runAllRunnableTasks();
 
             // touch some random cache entries
-            var unusedCacheKeys = Set.copyOf(randomSubsetOf(cacheKeys));
-            cacheKeys.forEach(key -> {
-                if (unusedCacheKeys.contains(key) == false) {
-                    var entry = cacheService.get(key, regionSize, 0);
-                    assertThat(cacheService.getFreq(entry), equalTo(2));
-                }
-            });
+            var usedCacheKeys = Set.copyOf(randomSubsetOf(cacheEntries.keySet()));
+            usedCacheKeys.forEach(key -> cacheService.get(key, regionSize, 0));
+
+            cacheEntries.forEach(
+                (key, entry) -> assertThat(cacheService.getFreq(entry), usedCacheKeys.contains(key) ? equalTo(3) : equalTo(1))
+            );
 
             assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0));
             assertThat("Cache entries are not old enough to be evicted", cacheService.maybeEvictLeastUsed(), is(false));
 
-            for (int i = 1; i <= unusedCacheKeys.size(); i++) {
-                // need to advance time and compute decay to decrease frequencies in cache and have an evictable entry
-                relativeTimeInMillis.addAndGet(minInternalMillis);
-                cacheService.computeDecay();
+            cacheService.maybeScheduleDecayAndNewEpoch();
+            taskQueue.runAllRunnableTasks();
 
-                assertThat("Cache entry is old enough to be evicted", cacheService.maybeEvictLeastUsed(), is(true));
+            assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0));
+            cacheEntries.forEach(
+                (key, entry) -> assertThat(cacheService.getFreq(entry), usedCacheKeys.contains(key) ? equalTo(2) : equalTo(0))
+            );
+
+            var zeroFrequencyCacheEntries = cacheEntries.size() - usedCacheKeys.size();
+            for (int i = 0; i < zeroFrequencyCacheEntries; i++) {
                 assertThat(cacheService.freeRegionCount(), equalTo(i));
+                assertThat("Cache entry is old enough to be evicted", cacheService.maybeEvictLeastUsed(), is(true));
+                assertThat(cacheService.freeRegionCount(), equalTo(i + 1));
             }
 
             assertThat("No more cache entries old enough to be evicted", cacheService.maybeEvictLeastUsed(), is(false));
-            assertThat(cacheService.freeRegionCount(), equalTo(unusedCacheKeys.size()));
+            assertThat(cacheService.freeRegionCount(), equalTo(zeroFrequencyCacheEntries));
         }
     }
 
@@ -817,7 +938,6 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                 return generic;
             }
         };
-        final AtomicLong relativeTimeInMillis = new AtomicLong(0L);
         try (
             NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
             var cacheService = new SharedBlobCacheService<>(
@@ -826,8 +946,7 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                 threadPool,
                 ThreadPool.Names.GENERIC,
                 "bulk",
-                BlobCacheMetrics.NOOP,
-                relativeTimeInMillis::get
+                BlobCacheMetrics.NOOP
             )
         ) {
             {
@@ -860,7 +979,6 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                 final PlainActionFuture<Collection<Boolean>> future = new PlainActionFuture<>();
                 final var listener = new GroupedActionListener<>(remainingFreeRegions, future);
                 for (int region = 0; region < remainingFreeRegions; region++) {
-                    relativeTimeInMillis.addAndGet(1_000L);
                     cacheService.maybeFetchRegion(
                         cacheKey,
                         region,
@@ -897,9 +1015,6 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                 assertThat("Region already exists in cache", future.get(), is(false));
             }
             {
-                // simulate elapsed time and compute decay
-                var minInternalMillis = SharedBlobCacheService.SHARED_CACHE_MIN_TIME_DELTA_SETTING.getDefault(Settings.EMPTY).millis();
-                relativeTimeInMillis.addAndGet(minInternalMillis * 2);
                 cacheService.computeDecay();
 
                 // fetch one more region should evict an old cache entry
@@ -942,8 +1057,7 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                 taskQueue.getThreadPool(),
                 ThreadPool.Names.GENERIC,
                 ThreadPool.Names.GENERIC,
-                BlobCacheMetrics.NOOP,
-                relativeTimeInMillis::get
+                BlobCacheMetrics.NOOP
             )
         ) {
             final var cacheKey = generateCacheKey();