Prevent cache file synchronization from adding evicted cache files back into the persistent cache (#67694)

This commit changes how cache file synchronization interacts with 
the persistent cache in searchable snapshots. Before this change, 
synchronization could reintroduce information about an evicted 
cache file into the persistent cache Lucene index.

This commit introduces a queue of cache file events that is 
periodically processed by the cache synchronization method. Each 
event refers to a specific cache file and a type of event (deletion or 
fsync needed) that must be processed by the cache synchronization 
method, which in turn applies the appropriate logic to the persistent 
cache Lucene index. This commit also changes how events are inserted 
into the queue, guaranteeing that no fsync/update event can come 
after a delete event.
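
For readers skimming the diff, here is a minimal, self-contained sketch of the event-queue idea described above. It is illustrative only: the class and method names below are hypothetical, and in the actual change the ordering guarantee comes from CacheFile's reference counting (markAsNeedsFSync asserts the file is not fully released, while the delete notification only fires once the last reference is gone).

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Toy model (not the Elasticsearch implementation) of the per-file event ordering:
 * fsync events may only be queued while the file is still live, and the delete
 * event is queued exactly once, so a NEEDS_FSYNC can never follow a DELETE.
 */
final class CacheFileEventsSketch {

    enum EventType { NEEDS_FSYNC, DELETE }

    private final Queue<EventType> events = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean deleted = new AtomicBoolean();

    /** Invoked while the cache file still holds references (refCount > 0 in the real code). */
    void onCacheFileNeedsFsync() {
        assert deleted.get() == false : "NEEDS_FSYNC queued after DELETE";
        events.offer(EventType.NEEDS_FSYNC);
    }

    /** Invoked once, when the last reference is released and the file is deleted from disk. */
    void onCacheFileDelete() {
        if (deleted.compareAndSet(false, true)) {
            events.offer(EventType.DELETE);
        }
    }

    /** Drained periodically by the cache synchronization task. */
    void synchronizeCache() {
        EventType event;
        while ((event = events.poll()) != null) {
            if (event == EventType.DELETE) {
                // remove the cache file from the persistent cache Lucene index
            } else {
                // fsync the cache file and (re)write its completed ranges
                // to the persistent cache Lucene index
            }
        }
    }
}

Because the delete event is always the last event queued for a given file, the synchronization task can never re-add an evicted cache file's ranges to the persistent cache.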
Tanguy Leroux 4 years ago
commit 6269af716f

+ 23 - 12
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java

@@ -40,6 +40,15 @@ public class CacheFile {
         void onEviction(CacheFile evictedCacheFile);
     }
 
+    /**
+     * {@link ModificationListener} can be used to be notified when a {@link CacheFile} needs to be fsynced or is deleted.
+     */
+    public interface ModificationListener {
+        void onCacheFileNeedsFsync(CacheFile cacheFile);
+
+        void onCacheFileDelete(CacheFile cacheFile);
+    }
+
     private static final StandardOpenOption[] OPEN_OPTIONS = new StandardOpenOption[] {
         StandardOpenOption.READ,
         StandardOpenOption.WRITE,
@@ -60,6 +69,8 @@ public class CacheFile {
                 Files.deleteIfExists(file);
             } catch (IOException e) {
                 throw new UncheckedIOException(e);
+            } finally {
+                listener.onCacheFileDelete(CacheFile.this);
             }
         }
     };
@@ -78,10 +89,9 @@ public class CacheFile {
     private final AtomicBoolean needsFsync = new AtomicBoolean();
 
     /**
-     * A runnable that is executed every time the {@link #needsFsync} flag is toggled to {@code true}, which indicates that the cache file
-     * has been updated. See {@link #markAsNeedsFSync()} method.
+     * A {@link ModificationListener} that can be used to be notified when the cache file is updated or deleted.
      */
-    private final Runnable needsFsyncRunnable;
+    private final ModificationListener listener;
 
     /**
      * A reference counted holder for the current channel to the physical file backing this cache file instance.
@@ -123,19 +133,19 @@ public class CacheFile {
     @Nullable
     private volatile FileChannelReference channelRef;
 
-    public CacheFile(CacheKey cacheKey, long length, Path file, Runnable onNeedFSync) {
-        this(cacheKey, new SparseFileTracker(file.toString(), length), file, onNeedFSync);
+    public CacheFile(CacheKey cacheKey, long length, Path file, ModificationListener listener) {
+        this(cacheKey, new SparseFileTracker(file.toString(), length), file, listener);
     }
 
-    public CacheFile(CacheKey cacheKey, long length, Path file, SortedSet<Tuple<Long, Long>> ranges, Runnable onNeedFSync) {
-        this(cacheKey, new SparseFileTracker(file.toString(), length, ranges), file, onNeedFSync);
+    public CacheFile(CacheKey cacheKey, long length, Path file, SortedSet<Tuple<Long, Long>> ranges, ModificationListener listener) {
+        this(cacheKey, new SparseFileTracker(file.toString(), length, ranges), file, listener);
     }
 
-    private CacheFile(CacheKey cacheKey, SparseFileTracker tracker, Path file, Runnable onNeedFSync) {
+    private CacheFile(CacheKey cacheKey, SparseFileTracker tracker, Path file, ModificationListener listener) {
         this.cacheKey = Objects.requireNonNull(cacheKey);
         this.tracker = Objects.requireNonNull(tracker);
         this.file = Objects.requireNonNull(file);
-        this.needsFsyncRunnable = Objects.requireNonNull(onNeedFSync);
+        this.listener = Objects.requireNonNull(listener);
         assert invariant();
     }
 
@@ -361,11 +371,11 @@ public class CacheFile {
                         try {
                             ensureOpen();
                             writer.fillCacheRange(reference.fileChannel, gap.start(), gap.end(), gap::onProgress);
+                            gap.onCompletion();
+                            markAsNeedsFSync();
                         } finally {
                             reference.decRef();
                         }
-                        gap.onCompletion();
-                        markAsNeedsFSync();
                     }
 
                     @Override
@@ -469,8 +479,9 @@ public class CacheFile {
      * Marks the current cache file as "fsync needed" and notifies the corresponding listener.
      */
     private void markAsNeedsFSync() {
+        assert refCounter.refCount() > 0 : "file should not be fully released";
         if (needsFsync.getAndSet(true) == false) {
-            needsFsyncRunnable.run();
+            listener.onCacheFileNeedsFsync(this);
         }
     }
 

+ 164 - 73
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java

@@ -9,7 +9,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.cache.Cache;
@@ -135,8 +134,9 @@ public class CacheService extends AbstractLifecycleComponent {
     private static final Logger logger = LogManager.getLogger(CacheService.class);
 
     private final ThreadPool threadPool;
-    private final ConcurrentLinkedQueue<CacheFile> cacheFilesToSync;
-    private final AtomicLong numberOfCacheFilesToSync;
+    private final ConcurrentLinkedQueue<CacheFileEvent> cacheFilesEventsQueue;
+    private final CacheFile.ModificationListener cacheFilesListener;
+    private final AtomicLong numberOfCacheFilesEvents;
     private final CacheSynchronizationTask cacheSyncTask;
     private final TimeValue cacheSyncStopTimeout;
     private final ReentrantLock cacheSyncLock;
@@ -167,12 +167,13 @@ public class CacheService extends AbstractLifecycleComponent {
             .weigher((key, entry) -> entry.getLength())
             // NORELEASE This does not immediately free space on disk, as cache file are only deleted when all index inputs
             // are done with reading/writing the cache file
-            .removalListener(notification -> onCacheFileRemoval(notification.getValue()))
+            .removalListener(notification -> onCacheFileEviction(notification.getValue()))
             .build();
         this.persistentCache = Objects.requireNonNull(persistentCache);
-        this.numberOfCacheFilesToSync = new AtomicLong();
         this.cacheSyncLock = new ReentrantLock();
-        this.cacheFilesToSync = new ConcurrentLinkedQueue<>();
+        this.numberOfCacheFilesEvents = new AtomicLong();
+        this.cacheFilesEventsQueue = new ConcurrentLinkedQueue<>();
+        this.cacheFilesListener = new CacheFileModificationListener();
         final ClusterSettings clusterSettings = clusterService.getClusterSettings();
         this.maxCacheFilesToSyncAtOnce = SNAPSHOT_CACHE_MAX_FILES_TO_SYNC_AT_ONCE_SETTING.get(settings);
         clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_CACHE_MAX_FILES_TO_SYNC_AT_ONCE_SETTING, this::setMaxCacheFilesToSyncAtOnce);
@@ -290,9 +291,7 @@ public class CacheService extends AbstractLifecycleComponent {
             final Path path = cacheDir.resolve(uuid);
             assert Files.notExists(path) : "cache file already exists " + path;
 
-            final SetOnce<CacheFile> cacheFile = new SetOnce<>();
-            cacheFile.set(new CacheFile(key, fileLength, path, () -> onCacheFileUpdate(cacheFile.get())));
-            return cacheFile.get();
+            return new CacheFile(key, fileLength, path, this.cacheFilesListener);
         });
     }
 
@@ -333,10 +332,7 @@ public class CacheService extends AbstractLifecycleComponent {
         if (Files.exists(path) == false) {
             throw new FileNotFoundException("Cache file [" + path + "] not found");
         }
-
-        final SetOnce<CacheFile> cacheFile = new SetOnce<>();
-        cacheFile.set(new CacheFile(cacheKey, fileLength, path, cacheFileRanges, () -> onCacheFileUpdate(cacheFile.get())));
-        cache.put(cacheKey, cacheFile.get());
+        cache.put(cacheKey, new CacheFile(cacheKey, fileLength, path, cacheFileRanges, this.cacheFilesListener));
     }
 
     /**
@@ -494,39 +490,22 @@ public class CacheService extends AbstractLifecycleComponent {
         this.maxCacheFilesToSyncAtOnce = maxCacheFilesToSyncAtOnce;
     }
 
-    /**
-     * This method is invoked when a {@link CacheFile} notifies the current {@link CacheService} that it needs to be fsync on disk.
-     * <p>
-     * It adds the {@link CacheFile} instance to current set of cache files to synchronize.
-     *
-     * @param cacheFile the instance that needs to be fsync
-     */
-    private void onCacheFileUpdate(CacheFile cacheFile) {
-        assert cacheFile != null;
-        cacheFilesToSync.offer(cacheFile);
-        numberOfCacheFilesToSync.incrementAndGet();
-    }
-
     /**
      * This method is invoked after a {@link CacheFile} is evicted from the cache.
      * <p>
-     * It notifies the {@link CacheFile}'s eviction listeners that the instance is evicted and removes it from the persistent cache.
+     * It notifies the {@link CacheFile}'s eviction listeners that the instance is evicted.
      *
      * @param cacheFile the evicted instance
      */
-    private void onCacheFileRemoval(CacheFile cacheFile) {
+    private void onCacheFileEviction(CacheFile cacheFile) {
         IOUtils.closeWhileHandlingException(cacheFile::startEviction);
-        try {
-            persistentCache.removeCacheFile(cacheFile);
-        } catch (Exception e) {
-            assert e instanceof IOException : e;
-            logger.warn("failed to remove cache file from persistent cache", e);
-        }
     }
 
     // used in tests
     boolean isCacheFileToSync(CacheFile cacheFile) {
-        return cacheFilesToSync.contains(cacheFile);
+        return cacheFilesEventsQueue.stream()
+            .filter(event -> event.type == CacheFileEventType.NEEDS_FSYNC)
+            .anyMatch(event -> event.value == cacheFile);
     }
 
     // used in tests
@@ -534,12 +513,26 @@ public class CacheService extends AbstractLifecycleComponent {
         return persistentCache;
     }
 
+    // used in tests
+    long getNumberOfCacheFilesEvents() {
+        return numberOfCacheFilesEvents.get();
+    }
+
+    /**
+     * @return the approximate number of events that are present in the cache files events queue. Note that this requires an O(N)
+     * computation and should be used with caution for debugging or testing purpose only.
+     */
+    // used in tests
+    long getCacheFilesEventsQueueSize() {
+        return cacheFilesEventsQueue.size();
+    }
+
     /**
      * Synchronize the cache files and their parent directories on disk.
      *
      * This method synchronizes the cache files that have been updated since the last time the method was invoked. To be able to do this,
      * the cache files must notify the {@link CacheService} when they need to be fsync. When a {@link CacheFile} notifies the service the
-     * {@link CacheFile} instance is added to the current queue of cache files to synchronize referenced by {@link #cacheFilesToSync}.
+     * {@link CacheFile} instance is added to the current queue of cache files events referenced by {@link #cacheFilesEventsQueue}.
      *
      * Cache files are serially synchronized using the {@link CacheFile#fsync()} method. When the {@link CacheFile#fsync()} call returns a
      * non empty set of completed ranges this method also fsync the shard's snapshot cache directory, which is the parent directory of the
@@ -549,57 +542,86 @@ public class CacheService extends AbstractLifecycleComponent {
     public void synchronizeCache() {
         cacheSyncLock.lock();
         try {
-            long count = 0L;
             final Set<Path> cacheDirs = new HashSet<>();
             final long startTimeNanos = threadPool.relativeTimeInNanos();
-            final long maxCacheFilesToSync = Math.min(numberOfCacheFilesToSync.get(), this.maxCacheFilesToSyncAtOnce);
+            final long maxCacheFilesToSync = Math.min(numberOfCacheFilesEvents.get(), this.maxCacheFilesToSyncAtOnce);
 
-            for (long i = 0L; i < maxCacheFilesToSync; i++) {
+            long updates = 0L;
+            long deletes = 0L;
+            long errors = 0L;
+
+            while ((updates + errors) < maxCacheFilesToSync) {
                 if (lifecycleState() != Lifecycle.State.STARTED) {
                     logger.debug("stopping cache synchronization (cache service is closing)");
-                    return;
+                    break;
+                }
+                final CacheFileEvent event = cacheFilesEventsQueue.poll();
+                if (event == null) {
+                    logger.debug("stopping cache synchronization (no more events to synchronize)");
+                    break;
                 }
 
-                final CacheFile cacheFile = cacheFilesToSync.poll();
-                assert cacheFile != null;
-
-                final long value = numberOfCacheFilesToSync.decrementAndGet();
-                assert value >= 0 : value;
+                final long numberOfEvents = numberOfCacheFilesEvents.decrementAndGet();
+                assert numberOfEvents >= 0L : numberOfEvents;
 
-                final Path cacheFilePath = cacheFile.getFile();
+                final CacheFile cacheFile = event.value;
                 try {
-                    final SortedSet<Tuple<Long, Long>> ranges = cacheFile.fsync();
-                    if (ranges.isEmpty() == false) {
-                        logger.trace(
-                            "cache file [{}] synchronized with [{}] completed range(s)",
-                            cacheFilePath.getFileName(),
-                            ranges.size()
-                        );
-                        final Path cacheDir = cacheFilePath.toAbsolutePath().getParent();
-                        boolean shouldPersist = cacheDirs.contains(cacheDir);
-                        if (shouldPersist == false) {
-                            try {
-                                IOUtils.fsync(cacheDir, true, false); // TODO evict cache file if fsync failed
-                                logger.trace("cache directory [{}] synchronized", cacheDir);
-                                cacheDirs.add(cacheDir);
-                                shouldPersist = true;
-                            } catch (Exception e) {
-                                assert e instanceof IOException : e;
-                                shouldPersist = false;
-                                logger.warn(() -> new ParameterizedMessage("failed to synchronize cache directory [{}]", cacheDir), e);
+                    switch (event.type) {
+                        case DELETE:
+                            logger.trace("deleting cache file [{}] from persistent cache", cacheFile.getFile().getFileName());
+                            persistentCache.removeCacheFile(cacheFile);
+                            deletes += 1L;
+                            break;
+
+                        case NEEDS_FSYNC:
+                            final SortedSet<Tuple<Long, Long>> ranges = cacheFile.fsync();
+                            logger.trace(
+                                "cache file [{}] synchronized with [{}] completed range(s)",
+                                cacheFile.getFile().getFileName(),
+                                ranges.size()
+                            );
+                            if (ranges.isEmpty() == false) {
+                                final Path cacheDir = cacheFile.getFile().toAbsolutePath().getParent();
+                                boolean shouldPersist = cacheDirs.contains(cacheDir);
+                                if (shouldPersist == false) {
+                                    try {
+                                        IOUtils.fsync(cacheDir, true, false); // TODO evict cache file if fsync failed
+                                        logger.trace("cache directory [{}] synchronized", cacheDir);
+                                        cacheDirs.add(cacheDir);
+                                        shouldPersist = true;
+                                    } catch (Exception e) {
+                                        logger.warn(
+                                            () -> new ParameterizedMessage("failed to synchronize cache directory [{}]", cacheDir),
+                                            e
+                                        );
+                                        assert e instanceof IOException : e;
+                                        shouldPersist = false;
+                                    }
+                                }
+                                if (shouldPersist) {
+                                    persistentCache.addCacheFile(cacheFile, ranges);
+                                    updates += 1L;
+                                }
                             }
-                        }
-                        if (shouldPersist) {
-                            persistentCache.addCacheFile(cacheFile, ranges);
-                            count += 1L;
-                        }
+                            break;
+
+                        default:
+                            throw new IllegalArgumentException("Unknown cache file event [" + event + ']');
                     }
                 } catch (Exception e) {
+                    logger.warn(
+                        () -> new ParameterizedMessage(
+                            "failed to process [{}] for cache file [{}]",
+                            event.type,
+                            cacheFile.getFile().getFileName()
+                        ),
+                        e
+                    );
                     assert e instanceof IOException : e;
-                    logger.warn(() -> new ParameterizedMessage("failed to fsync cache file [{}]", cacheFilePath.getFileName()), e);
+                    errors += 1L;
                 }
             }
-            if (count > 0 || persistentCache.hasDeletions()) {
+            if (updates > 0 || deletes > 0) {
                 try {
                     persistentCache.commit();
                 } catch (IOException e) {
@@ -610,7 +632,7 @@ public class CacheService extends AbstractLifecycleComponent {
                 final long elapsedNanos = threadPool.relativeTimeInNanos() - startTimeNanos;
                 logger.debug(
                     "cache files synchronization is done ([{}] cache files synchronized in [{}])",
-                    count,
+                    updates,
                     TimeValue.timeValueNanos(elapsedNanos)
                 );
             }
@@ -646,6 +668,31 @@ public class CacheService extends AbstractLifecycleComponent {
         }
     }
 
+    private class CacheFileModificationListener implements CacheFile.ModificationListener {
+
+        /**
+         * This method is invoked when a {@link CacheFile} notifies the current {@link CacheService} that it needs to be fsync on disk.
+         *
+         * @param cacheFile the instance that needs to be fsync
+         */
+        @Override
+        public void onCacheFileNeedsFsync(CacheFile cacheFile) {
+            cacheFilesEventsQueue.offer(new CacheFileEvent(CacheFileEventType.NEEDS_FSYNC, cacheFile));
+            numberOfCacheFilesEvents.incrementAndGet();
+        }
+
+        /**
+         * This method is invoked after a {@link CacheFile} is deleted from the disk.
+         *
+         * @param cacheFile the deleted instance
+         */
+        @Override
+        public void onCacheFileDelete(CacheFile cacheFile) {
+            cacheFilesEventsQueue.offer(new CacheFileEvent(CacheFileEventType.DELETE, cacheFile));
+            numberOfCacheFilesEvents.incrementAndGet();
+        }
+    }
+
     /**
      * Represents the searchable snapshots information of a shard that has been removed from the node. These information are kept around
      * to evict the cache files associated to that shard.
@@ -707,4 +754,48 @@ public class CacheService extends AbstractLifecycleComponent {
             : "expected generic thread pool but got " + threadName;
         return true;
     }
+
+    private enum CacheFileEventType {
+        NEEDS_FSYNC,
+        DELETE
+    }
+
+    /**
+     * Represents an event that occurred on a specified {@link CacheFile}
+     */
+    public static class CacheFileEvent {
+
+        public final CacheFileEventType type;
+        public final CacheFile value;
+
+        private CacheFileEvent(CacheFileEventType type, CacheFile value) {
+            assert type != null;
+            this.type = type;
+            assert value != null;
+            this.value = value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final CacheFileEvent event = (CacheFileEvent) o;
+            return type == event.type && value == event.value;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(type, value);
+        }
+
+        @Override
+        public String toString() {
+            return "cache file event [type=" + type + ", value=" + value + ']';
+        }
+    }
+
 }

+ 16 - 13
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java

@@ -75,11 +75,13 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.IntPredicate;
+import java.util.function.Predicate;
 
 import static java.util.Collections.synchronizedMap;
 import static java.util.Collections.unmodifiableList;
 import static java.util.Collections.unmodifiableSortedSet;
 import static org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.getShardCachePath;
+import static org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.resolveSnapshotCache;
 
 public class PersistentCache implements Closeable {
 
@@ -140,8 +142,17 @@ public class PersistentCache implements Closeable {
     }
 
     public long getCacheSize(ShardId shardId, SnapshotId snapshotId) {
+        return getCacheSize(shardId, snapshotId, Files::exists);
+    }
+
+    // pkg private for tests
+    long getCacheSize(ShardId shardId, SnapshotId snapshotId, Predicate<Path> predicate) {
         long aggregateSize = 0L;
         for (CacheIndexWriter writer : writers) {
+            final Path snapshotCacheDir = resolveSnapshotCache(writer.nodePath().resolve(shardId)).resolve(snapshotId.getUUID());
+            if (Files.exists(snapshotCacheDir) == false) {
+                continue; // searchable snapshot shard is not present on this node path, not need to run a query
+            }
             try (IndexReader indexReader = DirectoryReader.open(writer.indexWriter)) {
                 final IndexSearcher searcher = new IndexSearcher(indexReader);
                 searcher.setQueryCache(null);
@@ -165,9 +176,11 @@ public class PersistentCache implements Closeable {
                         while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
                             if (isLiveDoc.test(docIdSetIterator.docID())) {
                                 final Document document = leafReaderContext.reader().document(docIdSetIterator.docID());
-                                var ranges = buildCacheFileRanges(document);
-                                for (Tuple<Long, Long> range : ranges) {
-                                    aggregateSize += range.v2() - range.v1();
+                                final String cacheFileId = getValue(document, CACHE_ID_FIELD);
+                                if (predicate.test(snapshotCacheDir.resolve(cacheFileId))) {
+                                    long size = buildCacheFileRanges(document).stream().mapToLong(range -> range.v2() - range.v1()).sum();
+                                    logger.trace("cache file [{}] has size [{}]", getValue(document, CACHE_ID_FIELD), size);
+                                    aggregateSize += size;
                                 }
                             }
                         }
@@ -294,16 +307,6 @@ public class PersistentCache implements Closeable {
         }
     }
 
-    public boolean hasDeletions() {
-        ensureOpen();
-        for (CacheIndexWriter writer : writers) {
-            if (writer.indexWriter.hasDeletions()) {
-                return true;
-            }
-        }
-        return false;
-    }
-
     public long getNumDocs() {
         ensureOpen();
         long count = 0L;

+ 109 - 45
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java

@@ -5,13 +5,13 @@
  */
 package org.elasticsearch.index.store.cache;
 
+import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.PathUtils;
 import org.elasticsearch.common.io.PathUtilsForTesting;
-import org.elasticsearch.common.util.concurrent.RunOnce;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.cache.CacheFile.EvictionListener;
 import org.elasticsearch.index.store.cache.TestUtils.FSyncTrackingFileSystemProvider;
@@ -25,13 +25,15 @@ import java.nio.file.FileSystem;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
+import java.util.Set;
 import java.util.SortedSet;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.elasticsearch.common.settings.Settings.builder;
 import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads;
@@ -39,6 +41,7 @@ import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
@@ -46,7 +49,14 @@ import static org.hamcrest.Matchers.sameInstance;
 
 public class CacheFileTests extends ESTestCase {
 
-    private static final Runnable NOOP = () -> {};
+    private static final CacheFile.ModificationListener NOOP = new CacheFile.ModificationListener() {
+        @Override
+        public void onCacheFileNeedsFsync(CacheFile cacheFile) {}
+
+        @Override
+        public void onCacheFileDelete(CacheFile cacheFile) {}
+    };
+
     private static final CacheKey CACHE_KEY = new CacheKey("_snap_uuid", "_snap_index", new ShardId("_name", "_uuid", 0), "_filename");
 
     public void testGetCacheKey() throws Exception {
@@ -63,7 +73,9 @@ public class CacheFileTests extends ESTestCase {
 
     public void testAcquireAndRelease() throws Exception {
         final Path file = createTempDir().resolve("file.cache");
-        final CacheFile cacheFile = new CacheFile(CACHE_KEY, randomLongBetween(1, 100), file, NOOP);
+        final TestCacheFileModificationListener updatesListener = new TestCacheFileModificationListener();
+        final CacheFile cacheFile = new CacheFile(CACHE_KEY, randomLongBetween(1, 100), file, updatesListener);
+        assertFalse(updatesListener.containsDelete(cacheFile));
 
         assertThat("Cache file is not acquired: no channel exists", cacheFile.getChannel(), nullValue());
         assertThat("Cache file is not acquired: file does not exist", Files.exists(file), is(false));
@@ -94,15 +106,18 @@ public class CacheFileTests extends ESTestCase {
         assertThat("Cache file is evicted but not fully released: channel still exists", cacheFile.getChannel(), notNullValue());
         assertThat("Cache file is evicted but not fully released: channel is open", cacheFile.getChannel().isOpen(), is(true));
         assertThat("Channel didn't change after eviction", cacheFile.getChannel(), sameInstance(fileChannel));
+        assertFalse(updatesListener.containsDelete(cacheFile));
 
         cacheFile.release(listener);
         assertThat("Cache file evicted and fully released: channel does not exist", cacheFile.getChannel(), nullValue());
         assertThat("Cache file has been deleted", Files.exists(file), is(false));
+        assertTrue(updatesListener.containsDelete(cacheFile));
     }
 
     public void testCacheFileNotAcquired() throws IOException {
         final Path file = createTempDir().resolve("file.cache");
-        final CacheFile cacheFile = new CacheFile(CACHE_KEY, randomLongBetween(1, 100), file, NOOP);
+        final TestCacheFileModificationListener updatesListener = new TestCacheFileModificationListener();
+        final CacheFile cacheFile = new CacheFile(CACHE_KEY, randomLongBetween(1, 100), file, updatesListener);
 
         assertThat(Files.exists(file), is(false));
         assertThat(cacheFile.getChannel(), nullValue());
@@ -117,14 +132,20 @@ public class CacheFileTests extends ESTestCase {
             cacheFile.release(listener);
         }
 
+        assertFalse(updatesListener.containsUpdate(cacheFile));
+        assertFalse(updatesListener.containsDelete(cacheFile));
+
         cacheFile.startEviction();
+
         assertThat(cacheFile.getChannel(), nullValue());
         assertFalse(Files.exists(file));
+        assertTrue(updatesListener.containsDelete(cacheFile));
     }
 
     public void testDeleteOnCloseAfterLastRelease() throws Exception {
         final Path file = createTempDir().resolve("file.cache");
-        final CacheFile cacheFile = new CacheFile(CACHE_KEY, randomLongBetween(1, 100), file, NOOP);
+        final TestCacheFileModificationListener updatesListener = new TestCacheFileModificationListener();
+        final CacheFile cacheFile = new CacheFile(CACHE_KEY, randomLongBetween(1, 100), file, updatesListener);
 
         final List<TestEvictionListener> acquiredListeners = new ArrayList<>();
         for (int i = 0; i < randomIntBetween(1, 20); i++) {
@@ -151,12 +172,14 @@ public class CacheFileTests extends ESTestCase {
         acquiredListeners.forEach(l -> assertTrue("Released listeners after cache file eviction are called", l.isCalled()));
         acquiredListeners.forEach(cacheFile::release);
 
+        assertTrue(updatesListener.containsDelete(cacheFile));
         assertFalse(Files.exists(file));
     }
 
     public void testConcurrentAccess() throws Exception {
         final Path file = createTempDir().resolve("file.cache");
-        final CacheFile cacheFile = new CacheFile(CACHE_KEY, randomLongBetween(1, 100), file, NOOP);
+        final TestCacheFileModificationListener updatesListener = new TestCacheFileModificationListener();
+        final CacheFile cacheFile = new CacheFile(CACHE_KEY, randomLongBetween(1, 100), file, updatesListener);
 
         final TestEvictionListener evictionListener = new TestEvictionListener();
         cacheFile.acquire(evictionListener);
@@ -191,28 +214,32 @@ public class CacheFileTests extends ESTestCase {
         deterministicTaskQueue.scheduleNow(() -> cacheFile.release(evictionListener));
         deterministicTaskQueue.runAllRunnableTasks();
         if (populateAndReadFuture != null) {
-            assertTrue(populateAndReadFuture.isDone());
+            try {
+                assertTrue(populateAndReadFuture.isDone());
+                populateAndReadFuture.get();
+                assertTrue(updatesListener.containsUpdate(cacheFile));
+            } catch (ExecutionException e) {
+                assertThat(e.getCause(), instanceOf(AlreadyClosedException.class));
+                assertFalse(updatesListener.containsUpdate(cacheFile));
+                assertTrue(updatesListener.containsDelete(cacheFile));
+            }
         }
         if (readIfAvailableFuture != null) {
             assertTrue(readIfAvailableFuture.isDone());
         }
         if (evicted) {
             assertFalse(Files.exists(file));
+            assertTrue(updatesListener.containsDelete(cacheFile));
         }
     }
 
     public void testFSync() throws Exception {
         final FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem();
         try {
-            final AtomicBoolean needsFSyncCalled = new AtomicBoolean();
-            final CacheFile cacheFile = new CacheFile(
-                CACHE_KEY,
-                randomLongBetween(0L, 1000L),
-                fileSystem.resolve("test"),
-                () -> assertFalse(needsFSyncCalled.getAndSet(true))
-            );
+            final TestCacheFileModificationListener updatesListener = new TestCacheFileModificationListener();
+            final CacheFile cacheFile = new CacheFile(CACHE_KEY, randomLongBetween(0L, 1000L), fileSystem.resolve("test"), updatesListener);
             assertFalse(cacheFile.needsFsync());
-            assertFalse(needsFSyncCalled.get());
+            assertFalse(updatesListener.containsUpdate(cacheFile));
 
             final TestEvictionListener listener = new TestEvictionListener();
             cacheFile.acquire(listener);
@@ -223,23 +250,24 @@ public class CacheFileTests extends ESTestCase {
                     assertNumberOfFSyncs(cacheFile.getFile(), equalTo(0));
                     assertThat(completedRanges, hasSize(0));
                     assertFalse(cacheFile.needsFsync());
-                    assertFalse(needsFSyncCalled.get());
+                    assertFalse(updatesListener.containsUpdate(cacheFile));
                 }
 
                 final SortedSet<Tuple<Long, Long>> expectedCompletedRanges = randomPopulateAndReads(cacheFile);
                 if (expectedCompletedRanges.isEmpty() == false) {
                     assertTrue(cacheFile.needsFsync());
-                    assertTrue(needsFSyncCalled.getAndSet(false));
+                    assertTrue(updatesListener.containsUpdate(cacheFile));
+                    updatesListener.reset();
                 } else {
                     assertFalse(cacheFile.needsFsync());
-                    assertFalse(needsFSyncCalled.get());
+                    assertFalse(updatesListener.containsUpdate(cacheFile));
                 }
 
                 final SortedSet<Tuple<Long, Long>> completedRanges = cacheFile.fsync();
                 assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0 : 1));
                 assertArrayEquals(completedRanges.toArray(Tuple[]::new), expectedCompletedRanges.toArray(Tuple[]::new));
                 assertFalse(cacheFile.needsFsync());
-                assertFalse(needsFSyncCalled.get());
+                assertFalse(updatesListener.containsUpdate(cacheFile));
             } finally {
                 cacheFile.release(listener);
             }
@@ -251,49 +279,55 @@ public class CacheFileTests extends ESTestCase {
     public void testFSyncOnEvictedFile() throws Exception {
         final FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem();
         try {
-            final AtomicBoolean needsFSyncCalled = new AtomicBoolean();
-            final CacheFile cacheFile = new CacheFile(
-                CACHE_KEY,
-                randomLongBetween(0L, 1000L),
-                fileSystem.resolve("test"),
-                () -> assertFalse(needsFSyncCalled.getAndSet(true))
-            );
+            final TestCacheFileModificationListener updatesListener = new TestCacheFileModificationListener();
+            final CacheFile cacheFile = new CacheFile(CACHE_KEY, randomLongBetween(0L, 1000L), fileSystem.resolve("test"), updatesListener);
             assertFalse(cacheFile.needsFsync());
-            assertFalse(needsFSyncCalled.get());
+            assertFalse(updatesListener.containsUpdate(cacheFile));
+            assertFalse(updatesListener.containsDelete(cacheFile));
 
             final TestEvictionListener listener = new TestEvictionListener();
             cacheFile.acquire(listener);
 
-            final RunOnce releaseOnce = new RunOnce(() -> cacheFile.release(listener));
+            boolean released = false;
             try {
                 final SortedSet<Tuple<Long, Long>> expectedCompletedRanges = randomPopulateAndReads(cacheFile);
                 if (expectedCompletedRanges.isEmpty() == false) {
                     assertTrue(cacheFile.needsFsync());
-                    assertTrue(needsFSyncCalled.getAndSet(false));
+                    assertTrue(updatesListener.containsUpdate(cacheFile));
+                    updatesListener.reset();
 
                     final SortedSet<Tuple<Long, Long>> completedRanges = cacheFile.fsync();
                     assertArrayEquals(completedRanges.toArray(Tuple[]::new), expectedCompletedRanges.toArray(Tuple[]::new));
                     assertNumberOfFSyncs(cacheFile.getFile(), equalTo(1));
                 }
                 assertFalse(cacheFile.needsFsync());
-                assertFalse(needsFSyncCalled.get());
+                assertFalse(updatesListener.containsUpdate(cacheFile));
+                updatesListener.reset();
 
                 cacheFile.startEviction();
+                assertFalse(updatesListener.containsDelete(cacheFile));
 
                 if (rarely()) {
                     assertThat("New ranges should not be written after cache file eviction", randomPopulateAndReads(cacheFile), hasSize(0));
                 }
                 if (randomBoolean()) {
-                    releaseOnce.run();
+                    cacheFile.release(listener);
+                    assertTrue(updatesListener.containsDelete(cacheFile));
+                    released = true;
                 }
+                updatesListener.reset();
 
                 final SortedSet<Tuple<Long, Long>> completedRangesAfterEviction = cacheFile.fsync();
                 assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0 : 1));
                 assertThat(completedRangesAfterEviction, hasSize(0));
                 assertFalse(cacheFile.needsFsync());
-                assertFalse(needsFSyncCalled.get());
+                assertFalse(updatesListener.containsUpdate(cacheFile));
+                updatesListener.reset();
             } finally {
-                releaseOnce.run();
+                if (released == false) {
+                    cacheFile.release(listener);
+                    assertTrue(updatesListener.containsDelete(cacheFile));
+                }
             }
         } finally {
             fileSystem.tearDown();
@@ -305,15 +339,11 @@ public class CacheFileTests extends ESTestCase {
         try {
             fileSystem.failFSyncs(true);
 
-            final AtomicBoolean needsFSyncCalled = new AtomicBoolean();
-            final CacheFile cacheFile = new CacheFile(
-                CACHE_KEY,
-                randomLongBetween(0L, 1000L),
-                fileSystem.resolve("test"),
-                () -> assertFalse(needsFSyncCalled.getAndSet(true))
-            );
+            final TestCacheFileModificationListener updatesListener = new TestCacheFileModificationListener();
+            final CacheFile cacheFile = new CacheFile(CACHE_KEY, randomLongBetween(0L, 1000L), fileSystem.resolve("test"), updatesListener);
             assertFalse(cacheFile.needsFsync());
-            assertFalse(needsFSyncCalled.get());
+            assertFalse(updatesListener.containsUpdate(cacheFile));
+            assertFalse(updatesListener.containsDelete(cacheFile));
 
             final TestEvictionListener listener = new TestEvictionListener();
             cacheFile.acquire(listener);
@@ -322,11 +352,14 @@ public class CacheFileTests extends ESTestCase {
                 final SortedSet<Tuple<Long, Long>> expectedCompletedRanges = randomPopulateAndReads(cacheFile);
                 if (expectedCompletedRanges.isEmpty() == false) {
                     assertTrue(cacheFile.needsFsync());
-                    assertTrue(needsFSyncCalled.getAndSet(false));
+                    assertTrue(updatesListener.containsUpdate(cacheFile));
+                    updatesListener.reset();
+
                     IOException exception = expectThrows(IOException.class, cacheFile::fsync);
                     assertThat(exception.getMessage(), containsString("simulated"));
                     assertTrue(cacheFile.needsFsync());
-                    assertTrue(needsFSyncCalled.getAndSet(false));
+                    assertTrue(updatesListener.containsUpdate(cacheFile));
+                    updatesListener.reset();
                 } else {
                     assertFalse(cacheFile.needsFsync());
                     final SortedSet<Tuple<Long, Long>> completedRanges = cacheFile.fsync();
@@ -340,7 +373,9 @@ public class CacheFileTests extends ESTestCase {
                 assertArrayEquals(completedRanges.toArray(Tuple[]::new), expectedCompletedRanges.toArray(Tuple[]::new));
                 assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0 : 1));
                 assertFalse(cacheFile.needsFsync());
-                assertFalse(needsFSyncCalled.get());
+                assertFalse(updatesListener.containsUpdate(cacheFile));
+                assertFalse(updatesListener.containsDelete(cacheFile));
+                updatesListener.reset();
             } finally {
                 cacheFile.release(listener);
             }
@@ -367,6 +402,35 @@ public class CacheFileTests extends ESTestCase {
         }
     }
 
+    private static class TestCacheFileModificationListener implements CacheFile.ModificationListener {
+
+        private final Set<CacheFile> updates = new HashSet<>();
+        private final Set<CacheFile> deletes = new HashSet<>();
+
+        @Override
+        public synchronized void onCacheFileNeedsFsync(CacheFile cacheFile) {
+            assertTrue(updates.add(cacheFile));
+        }
+
+        synchronized boolean containsUpdate(CacheFile cacheFile) {
+            return updates.contains(cacheFile);
+        }
+
+        @Override
+        public synchronized void onCacheFileDelete(CacheFile cacheFile) {
+            assertTrue(deletes.add(cacheFile));
+        }
+
+        synchronized boolean containsDelete(CacheFile cacheFile) {
+            return deletes.contains(cacheFile);
+        }
+
+        synchronized void reset() {
+            updates.clear();
+            deletes.clear();
+        }
+    }
+
     public static void assertNumberOfFSyncs(final Path path, final Matcher<Integer> matcher) {
         final FSyncTrackingFileSystemProvider provider = (FSyncTrackingFileSystemProvider) path.getFileSystem().provider();
         final Integer fsyncCount = provider.getNumberOfFSyncs(path);

+ 4 - 0
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java

@@ -174,6 +174,10 @@ public final class TestUtils {
         assertThat(timedCounter.totalNanoseconds(), equalTo(totalNanoseconds));
     }
 
+    public static long sumOfCompletedRangesLengths(CacheFile cacheFile) {
+        return cacheFile.getCompletedRanges().stream().mapToLong(range -> range.v2() - range.v1()).sum();
+    }
+
     /**
      * A {@link BlobContainer} that can read a single in-memory blob.
      * Any attempt to read a different blob will throw a {@link FileNotFoundException}

+ 253 - 0
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java

@@ -9,30 +9,52 @@ package org.elasticsearch.xpack.searchablesnapshots.cache;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.StringField;
+import org.apache.lucene.mockfile.FilterFileChannel;
+import org.apache.lucene.mockfile.FilterFileSystemProvider;
+import org.apache.lucene.mockfile.FilterPath;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.io.PathUtils;
+import org.elasticsearch.common.io.PathUtilsForTesting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.store.cache.CacheFile;
+import org.elasticsearch.index.store.cache.CacheKey;
+import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase;
 
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.FileSystem;
 import java.nio.file.Files;
+import java.nio.file.OpenOption;
 import java.nio.file.Path;
+import java.nio.file.attribute.FileAttribute;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES;
 import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_ROLE;
 import static org.elasticsearch.index.store.cache.TestUtils.assertCacheFileEquals;
+import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads;
+import static org.elasticsearch.index.store.cache.TestUtils.sumOfCompletedRangesLengths;
 import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.createCacheIndexWriter;
 import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.resolveCacheIndexFolder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
@@ -172,4 +194,235 @@ public class PersistentCacheTests extends AbstractSearchableSnapshotsTestCase {
         PersistentCache.cleanUp(nodeSettings, nodeEnvironment);
         assertTrue(cacheFiles.stream().noneMatch(Files::exists));
     }
+
+    public void testFSyncDoesNotAddDocumentsBackInPersistentCacheWhenShardIsEvicted() throws Exception {
+        IOUtils.close(nodeEnvironment); // this test uses a specific filesystem to block fsync
+
+        final FSyncBlockingFileSystemProvider fileSystem = setupFSyncBlockingFileSystemProvider();
+        try {
+            nodeEnvironment = newNodeEnvironment(
+                Settings.builder()
+                    .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath())
+                    .putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths())
+                    .build()
+            );
+
+            try (
+                PersistentCache persistentCache = new PersistentCache(nodeEnvironment);
+                CacheService cacheService = new CacheService(Settings.EMPTY, clusterService, threadPool, persistentCache)
+            ) {
+                cacheService.setCacheSyncInterval(TimeValue.ZERO);
+                cacheService.start();
+
+                logger.debug("creating cache files");
+                final List<CacheFile> cacheFiles = randomCacheFiles(cacheService);
+                assertThat(cacheService.getCacheFilesEventsQueueSize(), equalTo((long) cacheFiles.size()));
+                assertThat(cacheService.getNumberOfCacheFilesEvents(), equalTo((long) cacheFiles.size()));
+
+                final CacheFile randomCacheFile = randomFrom(cacheFiles);
+                final CacheKey cacheKey = randomCacheFile.getCacheKey();
+
+                final SnapshotId snapshotId = new SnapshotId("_ignored_", cacheKey.getSnapshotUUID());
+                if (randomBoolean()) {
+                    final Tuple<Long, Long> absent = randomCacheFile.getAbsentRangeWithin(0L, randomCacheFile.getLength());
+                    if (absent != null) {
+                        assertThat(
+                            "Persistent cache should not contain any cached data",
+                            persistentCache.getCacheSize(cacheKey.getShardId(), snapshotId),
+                            equalTo(0L)
+                        );
+
+                        cacheService.synchronizeCache();
+
+                        assertThat(cacheService.getCacheFilesEventsQueueSize(), equalTo(0L));
+                        assertThat(cacheService.getNumberOfCacheFilesEvents(), equalTo(0L));
+
+                        final long sizeInCache = persistentCache.getCacheSize(cacheKey.getShardId(), snapshotId);
+                        final long sizeInCacheFile = sumOfCompletedRangesLengths(randomCacheFile);
+                        assertThat(
+                            "Persistent cache should contain cached data for at least 1 cache file",
+                            sizeInCache,
+                            greaterThanOrEqualTo(sizeInCacheFile)
+                        );
+
+                        final CacheFile.EvictionListener listener = evictedCacheFile -> {};
+                        randomCacheFile.acquire(listener);
+                        try {
+                            SortedSet<Tuple<Long, Long>> ranges = null;
+                            while (ranges == null || ranges.isEmpty()) {
+                                ranges = randomPopulateAndReads(randomCacheFile);
+                            }
+                            assertTrue(cacheService.isCacheFileToSync(randomCacheFile));
+                            assertThat(cacheService.getCacheFilesEventsQueueSize(), equalTo(1L));
+                            assertThat(cacheService.getNumberOfCacheFilesEvents(), equalTo(1L));
+                        } finally {
+                            randomCacheFile.release(listener);
+                        }
+
+                        assertThat(
+                            "Persistent cache should contain cached data for at least 1 cache file",
+                            persistentCache.getCacheSize(cacheKey.getShardId(), snapshotId),
+                            equalTo(sizeInCache)
+                        );
+                    }
+                }
+
+                final boolean fsyncFailure = randomBoolean();
+                logger.debug("blocking fsync for cache file [{}] with failure [{}]", randomCacheFile.getFile(), fsyncFailure);
+                fileSystem.blockFSyncForPath(randomCacheFile.getFile(), fsyncFailure);
+
+                logger.debug("starting synchronization of cache files");
+                final Thread fsyncThread = new Thread(cacheService::synchronizeCache);
+                fsyncThread.start();
+
+                logger.debug("waiting for synchronization of cache files to be blocked");
+                fileSystem.waitForBlock();
+
+                logger.debug("starting eviction of shard [{}]", cacheKey);
+                cacheService.markShardAsEvictedInCache(cacheKey.getSnapshotUUID(), cacheKey.getSnapshotIndexName(), cacheKey.getShardId());
+
+                logger.debug("waiting for shard eviction to be processed");
+                cacheService.waitForCacheFilesEvictionIfNeeded(
+                    cacheKey.getSnapshotUUID(),
+                    cacheKey.getSnapshotIndexName(),
+                    cacheKey.getShardId()
+                );
+
+                logger.debug("unblocking synchronization of cache files");
+                fileSystem.unblock();
+                fsyncThread.join();
+
+                assertThat(
+                    "Persistent cache should not report any cached data for the evicted shard",
+                    persistentCache.getCacheSize(cacheKey.getShardId(), new SnapshotId("_ignored_", cacheKey.getSnapshotUUID())),
+                    equalTo(0L)
+                );
+
+                logger.debug("triggering one more cache synchronization in case all cache files deletions were not processed before");
+                cacheService.synchronizeCache();
+
+                assertThat(
+                    "Persistent cache should not report any cached data for the evicted shard (ignoring deleted files)",
+                    persistentCache.getCacheSize(
+                        cacheKey.getShardId(),
+                        new SnapshotId("_ignored_", cacheKey.getSnapshotUUID()),
+                        path -> true
+                    ),
+                    equalTo(0L)
+                );
+
+                assertThat(cacheService.getCacheFilesEventsQueueSize(), equalTo(0L));
+                assertThat(cacheService.getNumberOfCacheFilesEvents(), equalTo(0L));
+            }
+        } finally {
+            fileSystem.tearDown();
+        }
+    }
+
+    public void testGetCacheSizeIgnoresDeletedCacheFiles() throws Exception {
+        try (CacheService cacheService = defaultCacheService()) {
+            cacheService.setCacheSyncInterval(TimeValue.ZERO);
+            cacheService.start();
+
+            final CacheFile cacheFile = randomFrom(randomCacheFiles(cacheService));
+            final long sizeOfCacheFileInCache = sumOfCompletedRangesLengths(cacheFile);
+
+            final Supplier<Long> cacheSizeSupplier = () -> cacheService.getPersistentCache()
+                .getCacheSize(cacheFile.getCacheKey().getShardId(), new SnapshotId("_ignored_", cacheFile.getCacheKey().getSnapshotUUID()));
+            assertThat(cacheSizeSupplier.get(), equalTo(0L));
+
+            cacheService.synchronizeCache();
+
+            final long sizeOfShardInCache = cacheSizeSupplier.get();
+            assertThat(sizeOfShardInCache, greaterThanOrEqualTo(sizeOfCacheFileInCache));
+
+            final CacheFile.EvictionListener listener = evictedCacheFile -> {};
+            cacheFile.acquire(listener);
+            try {
+                cacheService.removeFromCache(cacheFile.getCacheKey());
+                assertThat(cacheSizeSupplier.get(), equalTo(sizeOfShardInCache));
+            } finally {
+                cacheFile.release(listener);
+            }
+            assertThat(cacheSizeSupplier.get(), equalTo(sizeOfShardInCache - sizeOfCacheFileInCache));
+        }
+    }
+
+    private static FSyncBlockingFileSystemProvider setupFSyncBlockingFileSystemProvider() {
+        final FileSystem defaultFileSystem = PathUtils.getDefaultFileSystem();
+        final FSyncBlockingFileSystemProvider provider = new FSyncBlockingFileSystemProvider(defaultFileSystem, createTempDir());
+        PathUtilsForTesting.installMock(provider.getFileSystem(null));
+        return provider;
+    }
+
+    /**
+     * {@link FilterFileSystemProvider} that can block fsync for a specified {@link Path}.
+     */
+    public static class FSyncBlockingFileSystemProvider extends FilterFileSystemProvider {
+
+        private final AtomicReference<Path> pathToBlock = new AtomicReference<>();
+        private final AtomicBoolean failFSync = new AtomicBoolean();
+        private final CountDownLatch blockingLatch = new CountDownLatch(1);
+        private final CountDownLatch releasingLatch = new CountDownLatch(1);
+
+        private final FileSystem delegateInstance;
+        private final Path rootDir;
+
+        public FSyncBlockingFileSystemProvider(FileSystem delegate, Path rootDir) {
+            super("fsyncblocking://", delegate);
+            this.rootDir = new FilterPath(rootDir, this.fileSystem);
+            this.delegateInstance = delegate;
+        }
+
+        public Path resolve(String other) {
+            return rootDir.resolve(other);
+        }
+
+        @Override
+        public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
+            return new FilterFileChannel(super.newFileChannel(path, options, attrs)) {
+
+                @Override
+                public void force(boolean metaData) throws IOException {
+                    final Path blockedPath = pathToBlock.get();
+                    if (blockedPath == null || blockedPath.equals(path.toAbsolutePath()) == false) {
+                        super.force(metaData);
+                        return;
+                    }
+                    try {
+                        blockingLatch.countDown();
+                        releasingLatch.await();
+                        if (failFSync.get()) {
+                            throw new IOException("Simulated");
+                        } else {
+                            super.force(metaData);
+                        }
+                    } catch (InterruptedException e) {
+                        throw new AssertionError(e);
+                    }
+                }
+            };
+        }
+
+        public void blockFSyncForPath(Path path, boolean failure) {
+            pathToBlock.set(path.toAbsolutePath());
+            failFSync.set(failure);
+        }
+
+        public void waitForBlock() {
+            try {
+                blockingLatch.await();
+            } catch (InterruptedException e) {
+                throw new AssertionError(e);
+            }
+        }
+
+        public void unblock() {
+            releasingLatch.countDown();
+        }
+
+        public void tearDown() {
+            PathUtilsForTesting.installMock(delegateInstance);
+        }
+    }
 }