
Make Searchable Snapshot's CacheFile Lock-less (#63911)

Replaces the read-write-lock mechanism that guarded eviction and listener references with
a reference-counting implementation.
This fixes the bug behind test failure #63586, in which concurrently acquiring or releasing
an eviction listener while a file operation was running would sometimes throw an exception,
because the `tryLock` call on the read lock fails in that case.
This also removes the possibility of blocking cluster state updates while they wait on the
write lock, which can take a long time if a slow read operation runs concurrently.

Closes #63586
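
In outline, the change swaps a lock that can be contended or unavailable for a counter that either increments or fails fast. A minimal, self-contained sketch of that counting pattern (a hypothetical class, not Elasticsearch's AbstractRefCounted, though it follows the same contract):

    import java.util.concurrent.atomic.AtomicInteger;

    // Illustrative ref-counting primitive: acquiring either succeeds immediately or
    // fails fast once the object is closed; no thread ever blocks on a lock.
    abstract class SimpleRefCounted {
        private final AtomicInteger refs = new AtomicInteger(1); // starts open

        final boolean tryIncRef() {
            int n;
            do {
                n = refs.get();
                if (n == 0) return false;                 // already closed: fail fast
            } while (refs.compareAndSet(n, n + 1) == false);
            return true;
        }

        final void decRef() {
            if (refs.decrementAndGet() == 0) {
                closeInternal();                          // last reference runs cleanup exactly once
            }
        }

        protected abstract void closeInternal();
    }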
Armin Braun 5 years ago
parent
commit
e2e01ed317

+ 186 - 165
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java

@@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 
@@ -19,14 +20,14 @@ import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
 public class CacheFile {
@@ -42,37 +43,73 @@ public class CacheFile {
         StandardOpenOption.CREATE,
         StandardOpenOption.SPARSE };
 
+    /**
+     * Reference counter that counts the number of eviction listeners referencing this cache file plus the number of open file channels
+     * for it. Once this instance has been evicted, all listeners have been notified, and all {@link FileChannelReference}s for it
+     * have been released, the physical file backing this cache file is deleted.
+     */
     private final AbstractRefCounted refCounter = new AbstractRefCounted("CacheFile") {
         @Override
         protected void closeInternal() {
-            CacheFile.this.finishEviction();
+            assert assertNoPendingListeners();
+            try {
+                Files.deleteIfExists(file);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
         }
     };
 
-    private final ReentrantReadWriteLock.WriteLock evictionLock;
-    private final ReentrantReadWriteLock.ReadLock readLock;
-
     private final SparseFileTracker tracker;
     private final String description;
     private final Path file;
 
-    private volatile Set<EvictionListener> listeners;
-    private volatile boolean evicted;
+    private final Set<EvictionListener> listeners = new HashSet<>();
+
+    /**
+     * A reference counted holder for the current channel to the physical file backing this cache file instance.
+     * By guarding access to the file channel with ref-counting and giving the channel its own life-cycle, we remove the need for
+     * locking around file-channel opening and closing, as this file is referenced and de-referenced via {@link #acquire}
+     * and {@link #release}.
+     * Background operations running for index inputs that get closed concurrently are tied to a specific instance of this reference and
+     * will simply fail once all references to the channel have been released since they won't be able to acquire a reference to the
+     * channel again.
+     * Each instance of this class also increments the count in {@link #refCounter} by one when instantiated and decrements it by one
+     * again when it is closed. This is done to ensure that the file backing this cache file instance is only deleted after all channels
+     * to it have been closed.
+     */
+    private final class FileChannelReference extends AbstractRefCounted {
+
+        private final FileChannel fileChannel;
+
+        FileChannelReference() throws IOException {
+            super("FileChannel[" + file + "]");
+            this.fileChannel = FileChannel.open(file, OPEN_OPTIONS);
+            refCounter.incRef();
+        }
+
+        @Override
+        protected void closeInternal() {
+            try {
+                fileChannel.close();
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            } finally {
+                refCounter.decRef();
+            }
+        }
+    }
 
-    @Nullable // if evicted, or there are no listeners
-    private volatile FileChannel channel;
+    // If true, this file has been evicted from the cache and should not be used anymore
+    private final AtomicBoolean evicted = new AtomicBoolean(false);
+
+    @Nullable
+    private volatile FileChannelReference channelRef;
 
     public CacheFile(String description, long length, Path file) {
         this.tracker = new SparseFileTracker(file.toString(), length);
         this.description = Objects.requireNonNull(description);
         this.file = Objects.requireNonNull(file);
-        this.listeners = new HashSet<>();
-        this.evicted = false;
-
-        final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
-        this.evictionLock = cacheLock.writeLock();
-        this.readLock = cacheLock.readLock();
-
         assert invariant();
     }
 
@@ -84,27 +121,11 @@ public class CacheFile {
         return file;
     }
 
-    Releasable fileLock() {
-        boolean success = false;
-        readLock.lock();
-        try {
-            ensureOpen();
-            // check if we have a channel while holding the read lock
-            if (channel == null) {
-                throw new AlreadyClosedException("Cache file channel has been released and closed");
-            }
-            success = true;
-            return readLock::unlock;
-        } finally {
-            if (success == false) {
-                readLock.unlock();
-            }
-        }
-    }
-
+    // Only used in tests
     @Nullable
-    public FileChannel getChannel() {
-        return channel;
+    FileChannel getChannel() {
+        final FileChannelReference reference = channelRef;
+        return reference == null ? null : reference.fileChannel;
     }
 
     public boolean acquire(final EvictionListener listener) throws IOException {
@@ -113,22 +134,20 @@ public class CacheFile {
         ensureOpen();
         boolean success = false;
         if (refCounter.tryIncRef()) {
-            evictionLock.lock();
             try {
-                ensureOpen();
-                final Set<EvictionListener> newListeners = new HashSet<>(listeners);
-                final boolean added = newListeners.add(listener);
-                assert added : "listener already exists " + listener;
-                maybeOpenFileChannel(newListeners);
-                listeners = Collections.unmodifiableSet(newListeners);
+                synchronized (listeners) {
+                    ensureOpen();
+                    final boolean added = listeners.add(listener);
+                    assert added : "listener already exists " + listener;
+                    if (listeners.size() == 1) {
+                        assert channelRef == null;
+                        channelRef = new FileChannelReference();
+                    }
+                }
                 success = true;
             } finally {
-                try {
-                    if (success == false) {
-                        refCounter.decRef();
-                    }
-                } finally {
-                    evictionLock.unlock();
+                if (success == false) {
+                    refCounter.decRef();
                 }
             }
         }
@@ -140,122 +159,91 @@ public class CacheFile {
         assert listener != null;
 
         boolean success = false;
-        evictionLock.lock();
         try {
-            try {
-                final Set<EvictionListener> newListeners = new HashSet<>(listeners);
-                final boolean removed = newListeners.remove(Objects.requireNonNull(listener));
+            synchronized (listeners) {
+                final boolean removed = listeners.remove(Objects.requireNonNull(listener));
                 assert removed : "listener does not exist " + listener;
                 if (removed == false) {
                     throw new IllegalStateException("Cannot remove an unknown listener");
                 }
-                maybeCloseFileChannel(newListeners);
-                listeners = Collections.unmodifiableSet(newListeners);
-                success = true;
-            } finally {
-                if (success) {
-                    refCounter.decRef();
+                if (listeners.isEmpty()) {
+                    // nobody is using this file so we close the channel
+                    channelRef.decRef();
+                    channelRef = null;
                 }
             }
+            success = true;
         } finally {
-            evictionLock.unlock();
+            if (success) {
+                refCounter.decRef();
+            }
         }
         assert invariant();
         return success;
     }
 
-    private void finishEviction() {
-        assert evictionLock.isHeldByCurrentThread();
-        assert listeners.isEmpty();
-        assert channel == null;
-        try {
-            Files.deleteIfExists(file);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
+    private boolean assertNoPendingListeners() {
+        synchronized (listeners) {
+            assert listeners.isEmpty();
+            assert channelRef == null;
         }
+        return true;
     }
 
+    /**
+     * Evicts this file from the cache. Once this method has been called, subsequent use of this class will throw exceptions.
+     */
     public void startEviction() {
-        if (evicted == false) {
-            final Set<EvictionListener> evictionListeners = new HashSet<>();
-            evictionLock.lock();
-            try {
-                if (evicted == false) {
-                    evicted = true;
-                    evictionListeners.addAll(listeners);
-                    refCounter.decRef();
-                }
-            } finally {
-                evictionLock.unlock();
+        if (evicted.compareAndSet(false, true)) {
+            final Set<EvictionListener> evictionListeners;
+            synchronized (listeners) {
+                evictionListeners = new HashSet<>(listeners);
             }
+            refCounter.decRef();
             evictionListeners.forEach(listener -> listener.onEviction(this));
         }
         assert invariant();
     }
 
-    private void maybeOpenFileChannel(Set<EvictionListener> listeners) throws IOException {
-        assert evictionLock.isHeldByCurrentThread();
-        if (listeners.size() == 1) {
-            assert channel == null;
-            channel = FileChannel.open(file, OPEN_OPTIONS);
-        }
-    }
-
-    private void maybeCloseFileChannel(Set<EvictionListener> listeners) {
-        assert evictionLock.isHeldByCurrentThread();
-        if (listeners.size() == 0) {
-            assert channel != null;
-            try {
-                channel.close();
-            } catch (IOException e) {
-                throw new UncheckedIOException("Exception when closing channel", e);
-            } finally {
-                channel = null;
-            }
-        }
-    }
-
     private boolean invariant() {
-        readLock.lock();
-        try {
-            assert listeners != null;
+        synchronized (listeners) {
             if (listeners.isEmpty()) {
-                assert channel == null;
-                assert evicted == false || refCounter.refCount() != 0 || Files.notExists(file);
+                assert channelRef == null;
+                assert evicted.get() == false || refCounter.refCount() != 0 || Files.notExists(file);
             } else {
-                assert channel != null;
+                assert channelRef != null;
                 assert refCounter.refCount() > 0;
-                assert channel.isOpen();
+                assert channelRef.refCount() > 0;
                 assert Files.exists(file);
             }
-        } finally {
-            readLock.unlock();
         }
         return true;
     }
 
     @Override
     public String toString() {
-        return "CacheFile{"
-            + "desc='"
-            + description
-            + "', file="
-            + file
-            + ", length="
-            + tracker.getLength()
-            + ", channel="
-            + (channel != null ? "yes" : "no")
-            + ", listeners="
-            + listeners.size()
-            + ", evicted="
-            + evicted
-            + ", tracker="
-            + tracker
-            + '}';
+        synchronized (listeners) {
+            return "CacheFile{"
+                + "desc='"
+                + description
+                + "', file="
+                + file
+                + ", length="
+                + tracker.getLength()
+                + ", channel="
+                + (channelRef != null ? "yes" : "no")
+                + ", listeners="
+                + listeners.size()
+                + ", evicted="
+                + evicted
+                + ", tracker="
+                + tracker
+                + '}';
+        }
     }
 
     private void ensureOpen() {
-        if (evicted) {
+        if (evicted.get()) {
             throw new AlreadyClosedException("Cache file is evicted");
         }
     }
@@ -278,7 +266,7 @@ public class CacheFile {
      *
      * @return a future which returns the result of the {@link RangeAvailableHandler} once it has completed.
      */
-    CompletableFuture<Integer> populateAndRead(
+    Future<Integer> populateAndRead(
         final Tuple<Long, Long> rangeToWrite,
         final Tuple<Long, Long> rangeToRead,
         final RangeAvailableHandler reader,
@@ -286,19 +274,15 @@ public class CacheFile {
         final Executor executor
     ) {
         final CompletableFuture<Integer> future = new CompletableFuture<>();
+        Releasable decrementRef = null;
         try {
-            ensureOpen();
-            final List<SparseFileTracker.Gap> gaps = tracker.waitForRange(rangeToWrite, rangeToRead, ActionListener.wrap(success -> {
-                final int read = reader.onRangeAvailable(channel);
-                assert read == rangeToRead.v2() - rangeToRead.v1() : "partial read ["
-                    + read
-                    + "] does not match the range to read ["
-                    + rangeToRead.v2()
-                    + '-'
-                    + rangeToRead.v1()
-                    + ']';
-                future.complete(read);
-            }, future::completeExceptionally));
+            final FileChannelReference reference = acquireFileChannelReference();
+            decrementRef = Releasables.releaseOnce(reference::decRef);
+            final List<SparseFileTracker.Gap> gaps = tracker.waitForRange(
+                rangeToWrite,
+                rangeToRead,
+                rangeListener(rangeToRead, reader, future, reference, decrementRef)
+            );
 
             if (gaps.isEmpty() == false) {
                 executor.execute(new AbstractRunnable() {
@@ -307,20 +291,17 @@ public class CacheFile {
                     protected void doRun() {
                         for (SparseFileTracker.Gap gap : gaps) {
                             try {
-                                ensureOpen();
-                                if (readLock.tryLock() == false) {
-                                    throw new AlreadyClosedException("Cache file channel is being evicted, writing attempt cancelled");
+                                if (reference.tryIncRef() == false) {
+                                    assert false : "expected a non-closed channel reference";
+                                    throw new AlreadyClosedException("Cache file channel has been released and closed");
                                 }
                                 try {
                                     ensureOpen();
-                                    if (channel == null) {
-                                        throw new AlreadyClosedException("Cache file channel has been released and closed");
-                                    }
-                                    writer.fillCacheRange(channel, gap.start(), gap.end(), gap::onProgress);
-                                    gap.onCompletion();
+                                    writer.fillCacheRange(reference.fileChannel, gap.start(), gap.end(), gap::onProgress);
                                 } finally {
-                                    readLock.unlock();
+                                    reference.decRef();
                                 }
+                                gap.onCompletion();
                             } catch (Exception e) {
                                 gap.onFailure(e);
                             }
@@ -334,7 +315,7 @@ public class CacheFile {
                 });
             }
         } catch (Exception e) {
-            future.completeExceptionally(e);
+            releaseAndFail(future, decrementRef, e);
         }
         return future;
     }
@@ -349,31 +330,71 @@ public class CacheFile {
      *         target range is neither available nor pending.
      */
     @Nullable
-    CompletableFuture<Integer> readIfAvailableOrPending(final Tuple<Long, Long> rangeToRead, final RangeAvailableHandler reader) {
+    Future<Integer> readIfAvailableOrPending(final Tuple<Long, Long> rangeToRead, final RangeAvailableHandler reader) {
         final CompletableFuture<Integer> future = new CompletableFuture<>();
+        Releasable decrementRef = null;
         try {
-            ensureOpen();
-            if (tracker.waitForRangeIfPending(rangeToRead, ActionListener.wrap(success -> {
-                final int read = reader.onRangeAvailable(channel);
-                assert read == rangeToRead.v2() - rangeToRead.v1() : "partial read ["
-                    + read
-                    + "] does not match the range to read ["
-                    + rangeToRead.v2()
-                    + '-'
-                    + rangeToRead.v1()
-                    + ']';
-                future.complete(read);
-            }, future::completeExceptionally))) {
+            final FileChannelReference reference = acquireFileChannelReference();
+            decrementRef = Releasables.releaseOnce(reference::decRef);
+            if (tracker.waitForRangeIfPending(rangeToRead, rangeListener(rangeToRead, reader, future, reference, decrementRef))) {
                 return future;
             } else {
+                decrementRef.close();
                 return null;
             }
         } catch (Exception e) {
-            future.completeExceptionally(e);
+            releaseAndFail(future, decrementRef, e);
             return future;
         }
     }
 
+    private static void releaseAndFail(CompletableFuture<Integer> future, Releasable decrementRef, Exception e) {
+        try {
+            Releasables.close(decrementRef);
+        } catch (Exception ex) {
+            e.addSuppressed(ex);
+        }
+        future.completeExceptionally(e);
+    }
+
+    private static ActionListener<Void> rangeListener(
+        Tuple<Long, Long> rangeToRead,
+        RangeAvailableHandler reader,
+        CompletableFuture<Integer> future,
+        FileChannelReference reference,
+        Releasable releasable
+    ) {
+        return ActionListener.runAfter(ActionListener.wrap(success -> {
+            final int read = reader.onRangeAvailable(reference.fileChannel);
+            assert read == rangeToRead.v2() - rangeToRead.v1() : "partial read ["
+                + read
+                + "] does not match the range to read ["
+                + rangeToRead.v2()
+                + '-'
+                + rangeToRead.v1()
+                + ']';
+            future.complete(read);
+        }, future::completeExceptionally), releasable::close);
+    }
+
+    /**
+     * Get the reference to the currently open file channel for this cache file for a read operation
+     *
+     * @return file channel reference
+     */
+    private FileChannelReference acquireFileChannelReference() {
+        final FileChannelReference reference;
+        synchronized (listeners) {
+            ensureOpen();
+            reference = channelRef;
+            assert reference != null
+                && reference.refCount() > 0 : "impossible to run into a fully released channel reference under the listeners mutex";
+            assert refCounter.refCount() > 0 : "file should not be fully released";
+            reference.incRef();
+        }
+        return reference;
+    }
+
     public Tuple<Long, Long> getAbsentRangeWithin(long start, long end) {
         ensureOpen();
         return tracker.getAbsentRangeWithin(start, end);
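
The caller-facing consequence of the diff above: fileLock() is gone, so callers no longer bracket reads with a lock. A hedged sketch of the intended lifecycle, using the CacheFile API from this commit (the no-op listener, `range`, and the read lambda are illustrative placeholders):

    // Hedged caller-side sketch; acquire/release and readIfAvailableOrPending are the
    // methods from the diff above, everything else is a placeholder.
    static int readOnce(CacheFile cacheFile, Tuple<Long, Long> range) throws Exception {
        CacheFile.EvictionListener listener = evictedFile -> { /* react to eviction */ };
        if (cacheFile.acquire(listener) == false) {       // first acquire opens the channel
            throw new IllegalStateException("cache file already fully released");
        }
        try {
            // The read pins the internal FileChannelReference via tryIncRef()/decRef(),
            // so no read-write lock is taken around the file operation.
            Future<Integer> read = cacheFile.readIfAvailableOrPending(range, channel -> 0 /* read bytes here */);
            return read == null ? -1 : read.get();        // null: range neither available nor pending
        } finally {
            cacheFile.release(listener);                  // last release closes the channel
        }
    }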

+ 175 - 195
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java

@@ -38,7 +38,7 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.Locale;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -164,193 +164,188 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
 
         try {
             final CacheFile cacheFile = getCacheFileSafe();
-            try (Releasable ignored = cacheFile.fileLock()) {
 
-                // Can we serve the read directly from disk? If so, do so and don't worry about anything else.
+            // Can we serve the read directly from disk? If so, do so and don't worry about anything else.
 
-                final CompletableFuture<Integer> waitingForRead = cacheFile.readIfAvailableOrPending(
-                    Tuple.tuple(position, position + length),
-                    channel -> {
-                        final int read = readCacheFile(channel, position, b);
-                        assert read == length : read + " vs " + length;
-                        return read;
-                    }
-                );
+            final Future<Integer> waitingForRead = cacheFile.readIfAvailableOrPending(Tuple.tuple(position, position + length), channel -> {
+                final int read = readCacheFile(channel, position, b);
+                assert read == length : read + " vs " + length;
+                return read;
+            });
 
-                if (waitingForRead != null) {
-                    final Integer read = waitingForRead.get();
-                    assert read == length;
-                    readComplete(position, length);
-                    return;
-                }
-
-                // Requested data is not on disk, so try the cache index next.
+            if (waitingForRead != null) {
+                final Integer read = waitingForRead.get();
+                assert read == length;
+                readComplete(position, length);
+                return;
+            }
 
-                final Tuple<Long, Long> indexCacheMiss; // null if not a miss
+            // Requested data is not on disk, so try the cache index next.
 
-                // We try to use the cache index if:
-                // - the file is small enough to be fully cached
-                final boolean canBeFullyCached = fileInfo.length() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2;
-                // - we're reading the first N bytes of the file
-                final boolean isStartOfFile = (position + length <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE);
+            final Tuple<Long, Long> indexCacheMiss; // null if not a miss
 
-                if (canBeFullyCached || isStartOfFile) {
-                    final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), 0L, length);
+            // We try to use the cache index if:
+            // - the file is small enough to be fully cached
+            final boolean canBeFullyCached = fileInfo.length() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2;
+            // - we're reading the first N bytes of the file
+            final boolean isStartOfFile = (position + length <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE);
 
-                    if (cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY) {
-                        // We would have liked to find a cached entry but we did not find anything: the cache on the disk will be requested
-                        // so we compute the region of the file we would like to have the next time. The region is expressed as a tuple of
-                        // {start, end} where positions are relative to the whole file.
+            if (canBeFullyCached || isStartOfFile) {
+                final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), 0L, length);
 
-                        if (canBeFullyCached) {
-                            // if the index input is smaller than twice the size of the blob cache, it will be fully indexed
-                            indexCacheMiss = Tuple.tuple(0L, fileInfo.length());
-                        } else {
-                            // the index input is too large to fully cache, so just cache the initial range
-                            indexCacheMiss = Tuple.tuple(0L, (long) BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE);
-                        }
+                if (cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY) {
+                    // We would have liked to find a cached entry but we did not find anything: the cache on the disk will be requested
+                    // so we compute the region of the file we would like to have the next time. The region is expressed as a tuple of
+                    // {start, end} where positions are relative to the whole file.
 
-                        // We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put.
-                        // TODO TBD use a different trigger for creating the cache index and avoid a put in the CACHE_NOT_READY case.
+                    if (canBeFullyCached) {
+                        // if the index input is smaller than twice the size of the blob cache, it will be fully indexed
+                        indexCacheMiss = Tuple.tuple(0L, fileInfo.length());
                     } else {
-                        logger.trace(
-                            "reading [{}] bytes of file [{}] at position [{}] using cache index",
-                            length,
-                            fileInfo.physicalName(),
-                            position
-                        );
-                        stats.addIndexCacheBytesRead(cachedBlob.length());
+                        // the index input is too large to fully cache, so just cache the initial range
+                        indexCacheMiss = Tuple.tuple(0L, (long) BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE);
+                    }
 
-                        final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(toIntBytes(position), length).iterator();
-                        BytesRef bytesRef;
-                        while ((bytesRef = cachedBytesIterator.next()) != null) {
-                            b.put(bytesRef.bytes, bytesRef.offset, bytesRef.length);
-                        }
-                        assert b.position() == length : "copied " + b.position() + " but expected " + length;
-
-                        try {
-                            final Tuple<Long, Long> cachedRange = Tuple.tuple(cachedBlob.from(), cachedBlob.to());
-                            cacheFile.populateAndRead(
-                                cachedRange,
-                                cachedRange,
-                                channel -> cachedBlob.length(),
-                                (channel, from, to, progressUpdater) -> {
-                                    final long startTimeNanos = stats.currentTimeNanos();
-                                    final BytesRefIterator iterator = cachedBlob.bytes()
-                                        .slice(toIntBytes(from - cachedBlob.from()), toIntBytes(to - from))
-                                        .iterator();
-                                    long writePosition = from;
-                                    BytesRef current;
-                                    while ((current = iterator.next()) != null) {
-                                        final ByteBuffer byteBuffer = ByteBuffer.wrap(current.bytes, current.offset, current.length);
-                                        while (byteBuffer.remaining() > 0) {
-                                            writePosition += positionalWrite(channel, writePosition, byteBuffer);
-                                            progressUpdater.accept(writePosition);
-                                        }
+                    // We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put.
+                    // TODO TBD use a different trigger for creating the cache index and avoid a put in the CACHE_NOT_READY case.
+                } else {
+                    logger.trace(
+                        "reading [{}] bytes of file [{}] at position [{}] using cache index",
+                        length,
+                        fileInfo.physicalName(),
+                        position
+                    );
+                    stats.addIndexCacheBytesRead(cachedBlob.length());
+
+                    final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(toIntBytes(position), length).iterator();
+                    BytesRef bytesRef;
+                    while ((bytesRef = cachedBytesIterator.next()) != null) {
+                        b.put(bytesRef.bytes, bytesRef.offset, bytesRef.length);
+                    }
+                    assert b.position() == length : "copied " + b.position() + " but expected " + length;
+
+                    try {
+                        final Tuple<Long, Long> cachedRange = Tuple.tuple(cachedBlob.from(), cachedBlob.to());
+                        cacheFile.populateAndRead(
+                            cachedRange,
+                            cachedRange,
+                            channel -> cachedBlob.length(),
+                            (channel, from, to, progressUpdater) -> {
+                                final long startTimeNanos = stats.currentTimeNanos();
+                                final BytesRefIterator iterator = cachedBlob.bytes()
+                                    .slice(toIntBytes(from - cachedBlob.from()), toIntBytes(to - from))
+                                    .iterator();
+                                long writePosition = from;
+                                BytesRef current;
+                                while ((current = iterator.next()) != null) {
+                                    final ByteBuffer byteBuffer = ByteBuffer.wrap(current.bytes, current.offset, current.length);
+                                    while (byteBuffer.remaining() > 0) {
+                                        writePosition += positionalWrite(channel, writePosition, byteBuffer);
+                                        progressUpdater.accept(writePosition);
                                     }
-                                    assert writePosition == to : writePosition + " vs " + to;
-                                    final long endTimeNanos = stats.currentTimeNanos();
-                                    stats.addCachedBytesWritten(to - from, endTimeNanos - startTimeNanos);
-                                    logger.trace("copied bytes [{}-{}] of file [{}] from cache index to disk", from, to, fileInfo);
-                                },
-                                directory.cacheFetchAsyncExecutor()
-                            );
-                        } catch (Exception e) {
-                            logger.debug(
-                                new ParameterizedMessage(
-                                    "failed to store bytes [{}-{}] of file [{}] obtained from index cache",
-                                    cachedBlob.from(),
-                                    cachedBlob.to(),
-                                    fileInfo
-                                ),
-                                e
-                            );
-                            // oh well, no big deal, at least we can return them to the caller.
-                        }
+                                }
+                                assert writePosition == to : writePosition + " vs " + to;
+                                final long endTimeNanos = stats.currentTimeNanos();
+                                stats.addCachedBytesWritten(to - from, endTimeNanos - startTimeNanos);
+                                logger.trace("copied bytes [{}-{}] of file [{}] from cache index to disk", from, to, fileInfo);
+                            },
+                            directory.cacheFetchAsyncExecutor()
+                        );
+                    } catch (Exception e) {
+                        logger.debug(
+                            new ParameterizedMessage(
+                                "failed to store bytes [{}-{}] of file [{}] obtained from index cache",
+                                cachedBlob.from(),
+                                cachedBlob.to(),
+                                fileInfo
+                            ),
+                            e
+                        );
+                        // oh well, no big deal, at least we can return them to the caller.
+                    }
 
-                        readComplete(position, length);
+                    readComplete(position, length);
 
-                        return;
-                    }
-                } else {
-                    // requested range is not eligible for caching
-                    indexCacheMiss = null;
+                    return;
                 }
+            } else {
+                // requested range is not eligible for caching
+                indexCacheMiss = null;
+            }
 
-                // Requested data is also not in the cache index, so we must visit the blob store to satisfy both the target range and any
-                // miss in the cache index.
+            // Requested data is also not in the cache index, so we must visit the blob store to satisfy both the target range and any
+            // miss in the cache index.
 
-                final Tuple<Long, Long> startRangeToWrite = computeRange(position);
-                final Tuple<Long, Long> endRangeToWrite = computeRange(position + length - 1);
-                assert startRangeToWrite.v2() <= endRangeToWrite.v2() : startRangeToWrite + " vs " + endRangeToWrite;
-                final Tuple<Long, Long> rangeToWrite = Tuple.tuple(
-                    Math.min(startRangeToWrite.v1(), indexCacheMiss == null ? Long.MAX_VALUE : indexCacheMiss.v1()),
-                    Math.max(endRangeToWrite.v2(), indexCacheMiss == null ? Long.MIN_VALUE : indexCacheMiss.v2())
-                );
-
-                assert rangeToWrite.v1() <= position && position + length <= rangeToWrite.v2() : "["
-                    + position
-                    + "-"
-                    + (position + length)
-                    + "] vs "
-                    + rangeToWrite;
-                final Tuple<Long, Long> rangeToRead = Tuple.tuple(position, position + length);
-
-                final CompletableFuture<Integer> populateCacheFuture = cacheFile.populateAndRead(rangeToWrite, rangeToRead, channel -> {
-                    final int read;
-                    if ((rangeToRead.v2() - rangeToRead.v1()) < b.remaining()) {
-                        final ByteBuffer duplicate = b.duplicate();
-                        duplicate.limit(duplicate.position() + toIntBytes(rangeToRead.v2() - rangeToRead.v1()));
-                        read = readCacheFile(channel, position, duplicate);
-                        assert duplicate.position() <= b.limit();
-                        b.position(duplicate.position());
-                    } else {
-                        read = readCacheFile(channel, position, b);
-                    }
-                    return read;
-                }, this::writeCacheFile, directory.cacheFetchAsyncExecutor());
-
-                if (indexCacheMiss != null) {
-                    final Releasable onCacheFillComplete = stats.addIndexCacheFill();
-                    final CompletableFuture<Integer> readFuture = cacheFile.readIfAvailableOrPending(indexCacheMiss, channel -> {
-                        final int indexCacheMissLength = toIntBytes(indexCacheMiss.v2() - indexCacheMiss.v1());
-
-                        // We assume that we only cache small portions of blobs so that we do not need to:
-                        // - use a BigArrays for allocation
-                        // - use an intermediate copy buffer to read the file in sensibly-sized chunks
-                        // - release the buffer once the indexing operation is complete
-                        assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss;
-
-                        final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength);
-                        Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.v1(), byteBuffer);
-                        // NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats
-                        byteBuffer.flip();
-                        final BytesReference content = BytesReference.fromByteBuffer(byteBuffer);
-                        directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.v1(), content, new ActionListener<>() {
-                            @Override
-                            public void onResponse(Void response) {
-                                onCacheFillComplete.close();
-                            }
-
-                            @Override
-                            public void onFailure(Exception e1) {
-                                onCacheFillComplete.close();
-                            }
-                        });
-                        return indexCacheMissLength;
-                    });
+            final Tuple<Long, Long> startRangeToWrite = computeRange(position);
+            final Tuple<Long, Long> endRangeToWrite = computeRange(position + length - 1);
+            assert startRangeToWrite.v2() <= endRangeToWrite.v2() : startRangeToWrite + " vs " + endRangeToWrite;
+            final Tuple<Long, Long> rangeToWrite = Tuple.tuple(
+                Math.min(startRangeToWrite.v1(), indexCacheMiss == null ? Long.MAX_VALUE : indexCacheMiss.v1()),
+                Math.max(endRangeToWrite.v2(), indexCacheMiss == null ? Long.MIN_VALUE : indexCacheMiss.v2())
+            );
 
-                    if (readFuture == null) {
-                        // Normally doesn't happen, we're already obtaining a range covering all cache misses above, but theoretically
-                        // possible in the case that the real populateAndRead call already failed to obtain this range of the file. In that
-                        // case, simply move on.
-                        onCacheFillComplete.close();
-                    }
+            assert rangeToWrite.v1() <= position && position + length <= rangeToWrite.v2() : "["
+                + position
+                + "-"
+                + (position + length)
+                + "] vs "
+                + rangeToWrite;
+            final Tuple<Long, Long> rangeToRead = Tuple.tuple(position, position + length);
+
+            final Future<Integer> populateCacheFuture = cacheFile.populateAndRead(rangeToWrite, rangeToRead, channel -> {
+                final int read;
+                if ((rangeToRead.v2() - rangeToRead.v1()) < b.remaining()) {
+                    final ByteBuffer duplicate = b.duplicate();
+                    duplicate.limit(duplicate.position() + toIntBytes(rangeToRead.v2() - rangeToRead.v1()));
+                    read = readCacheFile(channel, position, duplicate);
+                    assert duplicate.position() <= b.limit();
+                    b.position(duplicate.position());
+                } else {
+                    read = readCacheFile(channel, position, b);
                 }
+                return read;
+            }, this::writeCacheFile, directory.cacheFetchAsyncExecutor());
+
+            if (indexCacheMiss != null) {
+                final Releasable onCacheFillComplete = stats.addIndexCacheFill();
+                final Future<Integer> readFuture = cacheFile.readIfAvailableOrPending(indexCacheMiss, channel -> {
+                    final int indexCacheMissLength = toIntBytes(indexCacheMiss.v2() - indexCacheMiss.v1());
+
+                    // We assume that we only cache small portions of blobs so that we do not need to:
+                    // - use a BigArrays for allocation
+                    // - use an intermediate copy buffer to read the file in sensibly-sized chunks
+                    // - release the buffer once the indexing operation is complete
+                    assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss;
+
+                    final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength);
+                    Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.v1(), byteBuffer);
+                    // NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats
+                    byteBuffer.flip();
+                    final BytesReference content = BytesReference.fromByteBuffer(byteBuffer);
+                    directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.v1(), content, new ActionListener<>() {
+                        @Override
+                        public void onResponse(Void response) {
+                            onCacheFillComplete.close();
+                        }
 
-                final int bytesRead = populateCacheFuture.get();
-                assert bytesRead == length : bytesRead + " vs " + length;
+                        @Override
+                        public void onFailure(Exception e1) {
+                            onCacheFillComplete.close();
+                        }
+                    });
+                    return indexCacheMissLength;
+                });
+
+                if (readFuture == null) {
+                    // Normally doesn't happen, we're already obtaining a range covering all cache misses above, but theoretically
+                    // possible in the case that the real populateAndRead call already failed to obtain this range of the file. In that
+                    // case, simply move on.
+                    onCacheFillComplete.close();
+                }
             }
+
+            final int bytesRead = populateCacheFuture.get();
+            assert bytesRead == length : bytesRead + " vs " + length;
         } catch (final Exception e) {
             // may have partially filled the buffer before the exception was thrown, so try and get the remainder directly.
             final int alreadyRead = length - b.remaining();
@@ -498,34 +493,19 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
                     // We do not actually read anything, but we want to wait for the write to complete before proceeding.
                     // noinspection UnnecessaryLocalVariable
                     final Tuple<Long, Long> rangeToRead = rangeToWrite;
-
-                    try (Releasable ignored = cacheFile.fileLock()) {
-                        assert assertFileChannelOpen(cacheFile.getChannel());
-
-                        cacheFile.populateAndRead(
-                            rangeToWrite,
-                            rangeToRead,
-                            (channel) -> bytesRead,
-                            (channel, start, end, progressUpdater) -> {
-                                final ByteBuffer byteBuffer = ByteBuffer.wrap(
-                                    copyBuffer,
-                                    toIntBytes(start - readStart),
-                                    toIntBytes(end - start)
-                                );
-                                final int writtenBytes = positionalWrite(channel, start, byteBuffer);
-                                logger.trace(
-                                    "prefetchPart: writing range [{}-{}] of file [{}], [{}] bytes written",
-                                    start,
-                                    end,
-                                    fileInfo.physicalName(),
-                                    writtenBytes
-                                );
-                                totalBytesWritten.addAndGet(writtenBytes);
-                                progressUpdater.accept(start + writtenBytes);
-                            },
-                            directory.cacheFetchAsyncExecutor()
-                        ).get();
-                    }
+                    cacheFile.populateAndRead(rangeToWrite, rangeToRead, (channel) -> bytesRead, (channel, start, end, progressUpdater) -> {
+                        final ByteBuffer byteBuffer = ByteBuffer.wrap(copyBuffer, toIntBytes(start - readStart), toIntBytes(end - start));
+                        final int writtenBytes = positionalWrite(channel, start, byteBuffer);
+                        logger.trace(
+                            "prefetchPart: writing range [{}-{}] of file [{}], [{}] bytes written",
+                            start,
+                            end,
+                            fileInfo.physicalName(),
+                            writtenBytes
+                        );
+                        totalBytesWritten.addAndGet(writtenBytes);
+                        progressUpdater.accept(start + writtenBytes);
+                    }, directory.cacheFetchAsyncExecutor()).get();
                     totalBytesRead += bytesRead;
                     remainingBytes -= bytesRead;
                 }
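
One detail worth calling out in the hunks above: when an index-cache miss must be fetched together with the requested range, the two are merged with Long.MAX_VALUE/Long.MIN_VALUE sentinels so that a null miss leaves the requested range untouched. A tiny worked example with hypothetical offsets:

    // Merging an optional range into a required one with sentinels, as in the diff above.
    Tuple<Long, Long> startRange = Tuple.tuple(4096L, 8192L);   // range required by the read
    Tuple<Long, Long> endRange = Tuple.tuple(8192L, 12288L);
    Tuple<Long, Long> miss = null;                              // no index-cache miss this time

    Tuple<Long, Long> rangeToWrite = Tuple.tuple(
        Math.min(startRange.v1(), miss == null ? Long.MAX_VALUE : miss.v1()), // MAX_VALUE never wins the min
        Math.max(endRange.v2(), miss == null ? Long.MIN_VALUE : miss.v2())    // MIN_VALUE never wins the max
    );
    // rangeToWrite == (4096, 12288); with miss = (0, 4096) it widens to (0, 12288)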

+ 55 - 2
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java

@@ -6,8 +6,11 @@
 package org.elasticsearch.index.store.cache;
 
 import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.index.store.cache.CacheFile.EvictionListener;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
 import java.nio.channels.FileChannel;
@@ -17,7 +20,10 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.Future;
 
+import static org.elasticsearch.common.settings.Settings.builder;
+import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
@@ -124,9 +130,56 @@ public class CacheFileTests extends ESTestCase {
         assertFalse(Files.exists(file));
     }
 
-    class TestEvictionListener implements EvictionListener {
+    public void testConcurrentAccess() throws Exception {
+        final Path file = createTempDir().resolve("file.cache");
+        final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file);
+
+        final TestEvictionListener evictionListener = new TestEvictionListener();
+        assertTrue(cacheFile.acquire(evictionListener));
+        final long length = cacheFile.getLength();
+        final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(
+            builder().put(NODE_NAME_SETTING.getKey(), getTestName()).build(),
+            random()
+        );
+        final ThreadPool threadPool = deterministicTaskQueue.getThreadPool();
+        final Future<Integer> populateAndReadFuture;
+        final Future<Integer> readIfAvailableFuture;
+        if (randomBoolean()) {
+            populateAndReadFuture = cacheFile.populateAndRead(
+                Tuple.tuple(0L, length),
+                Tuple.tuple(0L, length),
+                channel -> Math.toIntExact(length),
+                (channel, from, to, progressUpdater) -> progressUpdater.accept(length),
+                threadPool.generic()
+            );
+        } else {
+            populateAndReadFuture = null;
+        }
+        if (randomBoolean()) {
+            readIfAvailableFuture = cacheFile.readIfAvailableOrPending(Tuple.tuple(0L, length), channel -> Math.toIntExact(length));
+        } else {
+            readIfAvailableFuture = null;
+        }
+        final boolean evicted = randomBoolean();
+        if (evicted) {
+            deterministicTaskQueue.scheduleNow(cacheFile::startEviction);
+        }
+        deterministicTaskQueue.scheduleNow(() -> cacheFile.release(evictionListener));
+        deterministicTaskQueue.runAllRunnableTasks();
+        if (populateAndReadFuture != null) {
+            assertTrue(populateAndReadFuture.isDone());
+        }
+        if (readIfAvailableFuture != null) {
+            assertTrue(readIfAvailableFuture.isDone());
+        }
+        if (evicted) {
+            assertFalse(Files.exists(file));
+        }
+    }
+
+    static class TestEvictionListener implements EvictionListener {
 
-        private SetOnce<CacheFile> evicted = new SetOnce<>();
+        private final SetOnce<CacheFile> evicted = new SetOnce<>();
 
         CacheFile getEvictedCacheFile() {
             return evicted.get();
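
A closing note on the new test above: DeterministicTaskQueue makes the concurrency reproducible, since scheduled tasks run single-threaded in a random but seed-derived order. The same recipe in isolation (task bodies reuse names from the test; treat them as placeholders):

    // Deterministic concurrency testing as in testConcurrentAccess above: schedule the
    // racing operations, then run them in an order derived from the test seed.
    DeterministicTaskQueue queue = new DeterministicTaskQueue(
        builder().put(NODE_NAME_SETTING.getKey(), "test-node").build(),
        random()
    );
    queue.scheduleNow(() -> cacheFile.startEviction());          // racing task A
    queue.scheduleNow(() -> cacheFile.release(listener));        // racing task B
    queue.runAllRunnableTasks();                                 // failures reproduce from the seed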