@@ -9,15 +9,23 @@ package org.elasticsearch.index.store.cache;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefIterator;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.blobstore.cache.BlobStoreCacheService;
+import org.elasticsearch.blobstore.cache.CachedBlob;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
+import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream;
import org.elasticsearch.index.store.BaseSearchableSnapshotIndexInput;
import org.elasticsearch.index.store.IndexInputStats;
import org.elasticsearch.index.store.SearchableSnapshotDirectory;
@@ -29,12 +37,15 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Locale;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.IntStream;

+import static org.elasticsearch.index.store.checksum.ChecksumBlobContainerIndexInput.checksumToBytesArray;
+
public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput {

/**
@@ -74,6 +85,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()),
rangeSize
);
+ assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth
stats.incrementOpenCount();
}

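The next hunk replaces readInternal's read loop with a tiered lookup: the local disk cache is tried first (readIfAvailableOrPending), then the blob store cache index (directory.getCachedBlob), and only then the blob store itself (populateAndRead). Eligibility for the cache index comes down to the two booleans visible in the hunk, canBeFullyCached and isStartOfFile. Here is a minimal standalone sketch of that check, assuming a cached blob size of 4096 bytes (the real value is BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE, which this diff does not show):

    // Minimal sketch, not Elasticsearch code: the two cache-index eligibility checks from the hunk below,
    // with an assumed cached blob size of 4096 bytes.
    public class BlobCacheEligibilityExample {
        static final long CACHED_BLOB_SIZE = 4096;

        static boolean isEligibleForCacheIndex(long fileLength, long position, long length) {
            final boolean canBeFullyCached = fileLength <= CACHED_BLOB_SIZE * 2;  // small file, cached in full
            final boolean isStartOfFile = position + length <= CACHED_BLOB_SIZE;  // read lies within the first bytes
            return canBeFullyCached || isStartOfFile;
        }

        public static void main(String[] args) {
            System.out.println(isEligibleForCacheIndex(6_000, 5_000, 500));       // true: file fits in two cached blobs
            System.out.println(isEligibleForCacheIndex(1_000_000, 0, 1_024));     // true: read within the first 4096 bytes
            System.out.println(isEligibleForCacheIndex(1_000_000, 8_192, 1_024)); // false: falls through to the blob store
        }
    }
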
@@ -136,53 +148,296 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
final long position = getFilePointer() + this.offset;
final int length = b.remaining();

- int totalBytesRead = 0;
- while (totalBytesRead < length) {
- final long pos = position + totalBytesRead;
- final int len = length - totalBytesRead;
- int bytesRead = 0;
- try {
- final CacheFile cacheFile = getCacheFileSafe();
- try (Releasable ignored = cacheFile.fileLock()) {
- final Tuple<Long, Long> rangeToWrite = computeRange(pos);
- final Tuple<Long, Long> rangeToRead = Tuple.tuple(pos, Math.min(pos + len, rangeToWrite.v2()));
-
- bytesRead = cacheFile.fetchAsync(rangeToWrite, rangeToRead, (channel) -> {
- final int read;
- if ((rangeToRead.v2() - rangeToRead.v1()) < b.remaining()) {
- final ByteBuffer duplicate = b.duplicate();
- duplicate.limit(duplicate.position() + Math.toIntExact(rangeToRead.v2() - rangeToRead.v1()));
- read = readCacheFile(channel, pos, duplicate);
- assert duplicate.position() <= b.limit();
- b.position(duplicate.position());
+ // We can detect that we're going to read the last 16 bytes (which contain the footer checksum) of the file. Such reads are often
+ // executed when opening a Directory, and since we have the checksum in the snapshot metadata we can use it to fill the ByteBuffer.
+ if (length == CodecUtil.footerLength() && isClone == false && position == fileInfo.length() - length) {
+ if (readChecksumFromFileInfo(b)) {
+ logger.trace("read footer of file [{}] at position [{}], bypassing all caches", fileInfo.physicalName(), position);
+ return;
+ }
+ assert b.remaining() == length;
+ }
+
+ logger.trace("readInternal: read [{}-{}] ([{}] bytes) from [{}]", position, position + length, length, this);
+
+ try {
+ final CacheFile cacheFile = getCacheFileSafe();
+ try (Releasable ignored = cacheFile.fileLock()) {
+
+ // Can we serve the read directly from disk? If so, do so and don't worry about anything else.
+
+ final CompletableFuture<Integer> waitingForRead = cacheFile.readIfAvailableOrPending(
+ Tuple.tuple(position, position + length),
+ channel -> {
+ final int read = readCacheFile(channel, position, b);
+ assert read == length : read + " vs " + length;
+ return read;
+ }
+ );
+
+ if (waitingForRead != null) {
+ final Integer read = waitingForRead.get();
+ assert read == length;
+ readComplete(position, length);
+ return;
+ }
+
+ // Requested data is not on disk, so try the cache index next.
+
+ final Tuple<Long, Long> indexCacheMiss; // null if not a miss
+
+ // We try to use the cache index if:
+ // - the file is small enough to be fully cached
+ final boolean canBeFullyCached = fileInfo.length() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2;
+ // - we're reading the first N bytes of the file
+ final boolean isStartOfFile = (position + length <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE);
+
+ if (canBeFullyCached || isStartOfFile) {
+ final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), 0L, length);
+
+ if (cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY) {
+ // We would have liked to find a cached entry but we did not find anything: the cache on disk will be requested
+ // instead, so we compute the region of the file we would like to have cached the next time. The region is
+ // expressed as a tuple of {start, end} where positions are relative to the whole file.
+
+ if (canBeFullyCached) {
+ // if the index input is smaller than twice the size of the blob cache, it will be fully indexed
+ indexCacheMiss = Tuple.tuple(0L, fileInfo.length());
} else {
- read = readCacheFile(channel, pos, b);
+ // the index input is too large to fully cache, so just cache the initial range
+ indexCacheMiss = Tuple.tuple(0L, (long) BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE);
}
- return read;
- }, this::writeCacheFile, directory.cacheFetchAsyncExecutor()).get();
+
+ // We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put.
+ // TODO TBD use a different trigger for creating the cache index and avoid a put in the CACHE_NOT_READY case.
+ } else {
+ logger.trace(
+ "reading [{}] bytes of file [{}] at position [{}] using cache index",
+ length,
+ fileInfo.physicalName(),
+ position
+ );
+ stats.addIndexCacheBytesRead(cachedBlob.length());
+
+ final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(Math.toIntExact(position), length).iterator();
+ BytesRef bytesRef;
+ while ((bytesRef = cachedBytesIterator.next()) != null) {
+ b.put(bytesRef.bytes, bytesRef.offset, bytesRef.length);
+ }
+ assert b.position() == length : "copied " + b.position() + " but expected " + length;
+
+ try {
+ final Tuple<Long, Long> cachedRange = Tuple.tuple(cachedBlob.from(), cachedBlob.to());
+ cacheFile.populateAndRead(
+ cachedRange,
+ cachedRange,
+ channel -> cachedBlob.length(),
+ (channel, from, to, progressUpdater) -> {
+ final long startTimeNanos = stats.currentTimeNanos();
+ final BytesRefIterator iterator = cachedBlob.bytes()
+ .slice(Math.toIntExact(from - cachedBlob.from()), Math.toIntExact(to - from))
+ .iterator();
+ long writePosition = from;
+ BytesRef current;
+ while ((current = iterator.next()) != null) {
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(current.bytes, current.offset, current.length);
+ while (byteBuffer.remaining() > 0) {
+ writePosition += positionalWrite(channel, writePosition, byteBuffer);
+ progressUpdater.accept(writePosition);
+ }
+ }
+ assert writePosition == to : writePosition + " vs " + to;
+ final long endTimeNanos = stats.currentTimeNanos();
+ stats.addCachedBytesWritten(to - from, endTimeNanos - startTimeNanos);
+ logger.trace("copied bytes [{}-{}] of file [{}] from cache index to disk", from, to, fileInfo);
+ },
+ directory.cacheFetchAsyncExecutor()
+ );
+ } catch (Exception e) {
+ logger.debug(
+ new ParameterizedMessage(
+ "failed to store bytes [{}-{}] of file [{}] obtained from index cache",
+ cachedBlob.from(),
+ cachedBlob.to(),
+ fileInfo
+ ),
+ e
+ );
+ // oh well, no big deal, at least we can return them to the caller.
+ }
+
+ readComplete(position, length);
+
+ return;
+ }
+ } else {
+ // requested range is not eligible for caching
+ indexCacheMiss = null;
}
- } catch (final Exception e) {
- if (e instanceof AlreadyClosedException || (e.getCause() != null && e.getCause() instanceof AlreadyClosedException)) {
- try {
- // cache file was evicted during the range fetching, read bytes directly from source
- bytesRead = readDirectly(pos, pos + len, b);
- continue;
- } catch (Exception inner) {
- e.addSuppressed(inner);
+
+ // Requested data is also not in the cache index, so we must visit the blob store to satisfy both the target range and any
+ // miss in the cache index.
+
+ final Tuple<Long, Long> startRangeToWrite = computeRange(position);
+ final Tuple<Long, Long> endRangeToWrite = computeRange(position + length - 1);
+ assert startRangeToWrite.v2() <= endRangeToWrite.v2() : startRangeToWrite + " vs " + endRangeToWrite;
+ final Tuple<Long, Long> rangeToWrite = Tuple.tuple(
+ Math.min(startRangeToWrite.v1(), indexCacheMiss == null ? Long.MAX_VALUE : indexCacheMiss.v1()),
+ Math.max(endRangeToWrite.v2(), indexCacheMiss == null ? Long.MIN_VALUE : indexCacheMiss.v2())
+ );
+
+ assert rangeToWrite.v1() <= position && position + length <= rangeToWrite.v2() : "["
+ + position
+ + "-"
+ + (position + length)
+ + "] vs "
+ + rangeToWrite;
+ final Tuple<Long, Long> rangeToRead = Tuple.tuple(position, position + length);
+
+ final CompletableFuture<Integer> populateCacheFuture = cacheFile.populateAndRead(rangeToWrite, rangeToRead, channel -> {
+ final int read;
+ if ((rangeToRead.v2() - rangeToRead.v1()) < b.remaining()) {
+ final ByteBuffer duplicate = b.duplicate();
+ duplicate.limit(duplicate.position() + Math.toIntExact(rangeToRead.v2() - rangeToRead.v1()));
+ read = readCacheFile(channel, position, duplicate);
+ assert duplicate.position() <= b.limit();
+ b.position(duplicate.position());
+ } else {
+ read = readCacheFile(channel, position, b);
+ }
+ return read;
+ }, this::writeCacheFile, directory.cacheFetchAsyncExecutor());
+
+ if (indexCacheMiss != null) {
+ final Releasable onCacheFillComplete = stats.addIndexCacheFill();
+ final CompletableFuture<Integer> readFuture = cacheFile.readIfAvailableOrPending(indexCacheMiss, channel -> {
+ final int indexCacheMissLength = Math.toIntExact(indexCacheMiss.v2() - indexCacheMiss.v1());
+
+ // We assume that we only cache small portions of blobs so that we do not need to:
+ // - use a BigArrays for allocation
+ // - use an intermediate copy buffer to read the file in sensibly-sized chunks
+ // - release the buffer once the indexing operation is complete
+ assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss;
+
+ final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength);
+ Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.v1(), byteBuffer);
+ // NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats
+ byteBuffer.flip();
+ final BytesReference content = BytesReference.fromByteBuffer(byteBuffer);
+ directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.v1(), content, new ActionListener<>() {
+ @Override
+ public void onResponse(Void response) {
+ onCacheFillComplete.close();
+ }
+
+ @Override
+ public void onFailure(Exception e1) {
+ onCacheFillComplete.close();
+ }
+ });
+ return indexCacheMissLength;
+ });
+
+ if (readFuture == null) {
+ // Normally this doesn't happen since we are already obtaining a range covering all cache misses above, but it is
+ // theoretically possible if the real populateAndRead call above already failed to obtain this range of the file.
+ // In that case, simply move on.
+ onCacheFillComplete.close();
}
}
- throw new IOException("Fail to read data from cache", e);
- } finally {
- totalBytesRead += bytesRead;
+ final int bytesRead = populateCacheFuture.get();
+ assert bytesRead == length : bytesRead + " vs " + length;
}
+ } catch (final Exception e) {
+ // may have partially filled the buffer before the exception was thrown, so try to read the remainder directly.
+ final int alreadyRead = length - b.remaining();
+ final int bytesRead = readDirectlyIfAlreadyClosed(position + alreadyRead, b, e);
+ assert alreadyRead + bytesRead == length : alreadyRead + " + " + bytesRead + " vs " + length;
+
+ // In principle we could handle an index cache miss here too, ensuring that the direct read was large enough, but this is
+ // already a rare case caused by an overfull/undersized cache.
}
- assert totalBytesRead == length : "partial read operation, read [" + totalBytesRead + "] bytes of [" + length + "]";
- stats.incrementBytesRead(lastReadPosition, position, totalBytesRead);
- lastReadPosition = position + totalBytesRead;
+
+ readComplete(position, length);
+ }
+
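To make the range arithmetic at the end of readInternal concrete: rangeToWrite is the union of the requested range rounded out by computeRange and any pending indexCacheMiss, while rangeToRead stays exactly the bytes Lucene asked for. Below is a standalone sketch of the union, assuming computeRange aligns ranges to multiples of a 32 MB range size and ignoring the cap at the end of the file; both the 32 MB range size and the 4096-byte cache-index region are hypothetical values used only for illustration:

    // Hypothetical numbers: a 1024-byte read at 33 MB (just past the first 32 MB range),
    // with an index cache miss covering the first 4096 bytes of the file.
    public class RangeUnionExample {
        public static void main(String[] args) {
            final long rangeSize = 32L * 1024 * 1024;
            final long position = 33L * 1024 * 1024;
            final long length = 1024;
            final long[] indexCacheMiss = { 0L, 4096L }; // may be null in the real code

            // computeRange(pos) rounds pos out to the enclosing cache range
            final long startRangeFrom = (position / rangeSize) * rangeSize;                // 32 MB
            final long endRangeTo = ((position + length - 1) / rangeSize + 1) * rangeSize; // 64 MB

            final long writeFrom = Math.min(startRangeFrom, indexCacheMiss[0]); // 0: also covers the cache-index miss
            final long writeTo = Math.max(endRangeTo, indexCacheMiss[1]);       // 64 MB
            System.out.println("rangeToWrite = [" + writeFrom + "-" + writeTo + "]");
            System.out.println("rangeToRead  = [" + position + "-" + (position + length) + "]");
        }
    }
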
+ private void readComplete(long position, int length) {
+ stats.incrementBytesRead(lastReadPosition, position, length);
+ lastReadPosition = position + length;
lastSeekPosition = lastReadPosition;
}

+ private int readDirectlyIfAlreadyClosed(long position, ByteBuffer b, Exception e) throws IOException {
+ if (e instanceof AlreadyClosedException || (e.getCause() != null && e.getCause() instanceof AlreadyClosedException)) {
+ try {
+ // cache file was evicted during the range fetching, read bytes directly from blob container
+ final long length = b.remaining();
+ final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))];
+ logger.trace(
+ () -> new ParameterizedMessage(
+ "direct reading of range [{}-{}] for cache file [{}]",
+ position,
+ position + length,
+ cacheFileReference
+ )
+ );
+
+ int bytesCopied = 0;
+ final long startTimeNanos = stats.currentTimeNanos();
+ try (InputStream input = openInputStreamFromBlobStore(position, length)) {
+ long remaining = length;
+ while (remaining > 0) {
+ final int len = (remaining < copyBuffer.length) ? (int) remaining : copyBuffer.length;
+ int bytesRead = input.read(copyBuffer, 0, len);
+ if (bytesRead == -1) {
+ throw new EOFException(
+ String.format(
+ Locale.ROOT,
+ "unexpected EOF reading [%d-%d] ([%d] bytes remaining) from %s",
+ position,
+ position + length,
+ remaining,
+ cacheFileReference
+ )
+ );
+ }
+ b.put(copyBuffer, 0, bytesRead);
+ bytesCopied += bytesRead;
+ remaining -= bytesRead;
+ assert remaining == b.remaining() : remaining + " vs " + b.remaining();
+ }
+ final long endTimeNanos = stats.currentTimeNanos();
+ stats.addDirectBytesRead(bytesCopied, endTimeNanos - startTimeNanos);
+ }
+ return bytesCopied;
+ } catch (Exception inner) {
+ e.addSuppressed(inner);
+ }
+ }
+ throw new IOException("failed to read data from cache", e);
+ }
+
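One detail worth spelling out about the catch block in readInternal above: when the cache file is evicted mid-read, the ByteBuffer may already hold some bytes, so the fallback resumes the direct read at position + alreadyRead rather than at the original position. A tiny self-contained illustration of that bookkeeping, using made-up numbers and plain java.nio only:

    import java.nio.ByteBuffer;

    public class PartialFillExample {
        public static void main(String[] args) {
            final int length = 100;                          // bytes Lucene asked for
            final ByteBuffer b = ByteBuffer.allocate(length);
            b.put(new byte[30]);                             // pretend 30 bytes arrived before the cache file was evicted

            final int alreadyRead = length - b.remaining();  // 30
            final long position = 1_000;                     // original read position in the file
            final long resumeAt = position + alreadyRead;    // 1030: where the direct read picks up

            System.out.println("resume direct read at " + resumeAt + " for " + b.remaining() + " more bytes");
        }
    }
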
+ private boolean readChecksumFromFileInfo(ByteBuffer b) throws IOException {
+ assert isClone == false;
+ byte[] footer;
+ try {
+ footer = checksumToBytesArray(fileInfo.checksum());
+ } catch (NumberFormatException e) {
+ // tests disable this optimisation by passing an invalid checksum
+ footer = null;
+ }
+ if (footer == null) {
+ return false;
+ }
+
+ b.put(footer);
+ assert b.remaining() == 0L;
+ return true;
+
+ // TODO we should add this to DirectBlobContainerIndexInput too.
+ }
+
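The shortcut served by readChecksumFromFileInfo works because a Lucene codec footer has a fixed length (CodecUtil.footerLength(), 16 bytes) and can be reconstructed from the checksum stored in the snapshot metadata, so such reads never have to touch the cache or the blob store. A standalone sketch of just the detection condition, with a hypothetical file length (checksumToBytesArray itself is not reproduced here):

    public class FooterReadDetectionExample {
        static final int FOOTER_LENGTH = 16; // CodecUtil.footerLength() in Lucene

        static boolean isFooterOnlyRead(long fileLength, long position, int length, boolean isClone) {
            return length == FOOTER_LENGTH && isClone == false && position == fileLength - length;
        }

        public static void main(String[] args) {
            System.out.println(isFooterOnlyRead(1_048_576, 1_048_560, 16, false)); // true: exactly the last 16 bytes
            System.out.println(isFooterOnlyRead(1_048_576, 0, 16, false));         // false: not at the end of the file
            System.out.println(isFooterOnlyRead(1_048_576, 1_048_560, 32, false)); // false: more than just the footer
        }
    }
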
/**
* Prefetches a complete part and writes it in cache. This method is used to prewarm the cache.
*/
@@ -232,7 +487,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
final AtomicLong totalBytesWritten = new AtomicLong();
long remainingBytes = rangeEnd - rangeStart;
final long startTimeNanos = stats.currentTimeNanos();
- try (InputStream input = openInputStream(rangeStart, rangeLength)) {
+ try (InputStream input = openInputStreamFromBlobStore(rangeStart, rangeLength)) {
while (remainingBytes > 0L) {
assert totalBytesRead + remainingBytes == rangeLength;
final int bytesRead = readSafe(input, copyBuffer, rangeStart, rangeEnd, remainingBytes, cacheFileReference);
@@ -241,23 +496,33 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
final long readStart = rangeStart + totalBytesRead;
final Tuple<Long, Long> rangeToWrite = Tuple.tuple(readStart, readStart + bytesRead);

- cacheFile.fetchAsync(rangeToWrite, rangeToWrite, (channel) -> bytesRead, (channel, start, end, progressUpdater) -> {
- final ByteBuffer byteBuffer = ByteBuffer.wrap(
- copyBuffer,
- Math.toIntExact(start - readStart),
- Math.toIntExact(end - start)
- );
- final int writtenBytes = positionalWrite(channel, start, byteBuffer);
- logger.trace(
- "prefetchPart: writing range [{}-{}] of file [{}], [{}] bytes written",
- start,
- end,
- fileInfo.physicalName(),
- writtenBytes
- );
- totalBytesWritten.addAndGet(writtenBytes);
- progressUpdater.accept(start + writtenBytes);
- }, directory.cacheFetchAsyncExecutor()).get();
+ // We do not actually read anything, but we want to wait for the write to complete before proceeding.
+ // noinspection UnnecessaryLocalVariable
+ final Tuple<Long, Long> rangeToRead = rangeToWrite;
+
+ cacheFile.populateAndRead(
+ rangeToWrite,
+ rangeToRead,
+ (channel) -> bytesRead,
+ (channel, start, end, progressUpdater) -> {
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(
+ copyBuffer,
+ Math.toIntExact(start - readStart),
+ Math.toIntExact(end - start)
+ );
+ final int writtenBytes = positionalWrite(channel, start, byteBuffer);
+ logger.trace(
+ "prefetchPart: writing range [{}-{}] of file [{}], [{}] bytes written",
+ start,
+ end,
+ fileInfo.physicalName(),
+ writtenBytes
+ );
+ totalBytesWritten.addAndGet(writtenBytes);
+ progressUpdater.accept(start + writtenBytes);
+ },
+ directory.cacheFetchAsyncExecutor()
+ ).get();
totalBytesRead += bytesRead;
remainingBytes -= bytesRead;
}
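prefetchPart advances through the part sequentially: each iteration writes exactly the bytes it just read as the range [readStart, readStart + bytesRead) and blocks on get(), so only one chunk per file is in flight at a time. A small standalone walkthrough of how those ranges advance, using a hypothetical part length and copy buffer size:

    public class PrefetchRangeExample {
        public static void main(String[] args) {
            final long rangeStart = 0;
            final long rangeEnd = 10_000;     // hypothetical part length
            final int copyBufferSize = 4_096; // hypothetical COPY_BUFFER_SIZE

            long totalBytesRead = 0;
            long remainingBytes = rangeEnd - rangeStart;
            while (remainingBytes > 0) {
                final int bytesRead = (int) Math.min(copyBufferSize, remainingBytes); // what readSafe would return
                final long readStart = rangeStart + totalBytesRead;
                System.out.println("write range [" + readStart + "-" + (readStart + bytesRead) + "]");
                totalBytesRead += bytesRead;
                remainingBytes -= bytesRead;
            }
            // prints [0-4096], [4096-8192], [8192-10000]
        }
    }
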
@@ -357,7 +622,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
long bytesCopied = 0L;
long remaining = end - start;
final long startTimeNanos = stats.currentTimeNanos();
- try (InputStream input = openInputStream(start, length)) {
+ try (InputStream input = openInputStreamFromBlobStore(start, length)) {
while (remaining > 0L) {
final int bytesRead = readSafe(input, copyBuffer, start, end, remaining, cacheFileReference);
positionalWrite(fc, start + bytesCopied, ByteBuffer.wrap(copyBuffer, 0, bytesRead));
@@ -370,6 +635,86 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
}
}

+ /**
+ * Opens an {@link InputStream} for the given range of bytes which reads the data directly from the blob store. If the requested range
+ * spans multiple blobs then this stream will request them in turn.
+ *
+ * @param position The start of the range of bytes to read, relative to the start of the corresponding Lucene file.
+ * @param length The number of bytes to read
+ */
+ private InputStream openInputStreamFromBlobStore(final long position, final long length) throws IOException {
+ assert assertCurrentThreadMayAccessBlobStore();
+ if (fileInfo.numberOfParts() == 1L) {
+ assert position + length <= fileInfo.partBytes(0) : "cannot read ["
+ + position
+ + "-"
+ + (position + length)
+ + "] from ["
+ + fileInfo
+ + "]";
+ stats.addBlobStoreBytesRequested(length);
+ return blobContainer.readBlob(fileInfo.partName(0L), position, length);
+ } else {
+ final long startPart = getPartNumberForPosition(position);
+ final long endPart = getPartNumberForPosition(position + length - 1);
+
+ for (long currentPart = startPart; currentPart <= endPart; currentPart++) {
+ final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L;
+ final long endInPart = (currentPart == endPart)
+ ? getRelativePositionInPart(position + length - 1) + 1
+ : getLengthOfPart(currentPart);
+ stats.addBlobStoreBytesRequested(endInPart - startInPart);
+ }
+
+ return new SlicedInputStream(endPart - startPart + 1L) {
+ @Override
+ protected InputStream openSlice(long slice) throws IOException {
+ final long currentPart = startPart + slice;
+ final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L;
+ final long endInPart = (currentPart == endPart)
+ ? getRelativePositionInPart(position + length - 1) + 1
+ : getLengthOfPart(currentPart);
+ return blobContainer.readBlob(fileInfo.partName(currentPart), startInPart, endInPart - startInPart);
+ }
+ };
+ }
+ }
+
+ /**
+ * Compute the part number that contains the byte at the given position in the corresponding Lucene file.
+ */
+ private long getPartNumberForPosition(long position) {
+ ensureValidPosition(position);
+ final long part = position / fileInfo.partSize().getBytes();
+ assert part <= fileInfo.numberOfParts() : "part number [" + part + "] exceeds number of parts: " + fileInfo.numberOfParts();
+ assert part >= 0L : "part number [" + part + "] is negative";
+ return part;
+ }
+
+ /**
+ * Compute the position of the given byte relative to the start of its part.
+ * @param position the position of the required byte (within the corresponding Lucene file)
+ */
+ private long getRelativePositionInPart(long position) {
+ ensureValidPosition(position);
+ final long pos = position % fileInfo.partSize().getBytes();
+ assert pos < fileInfo.partBytes((int) getPartNumberForPosition(pos)) : "position in part [" + pos + "] exceeds part's length";
+ assert pos >= 0L : "position in part [" + pos + "] is negative";
+ return pos;
+ }
+
+ private long getLengthOfPart(long part) {
+ return fileInfo.partBytes(Math.toIntExact(part));
+ }
+
+ private void ensureValidPosition(long position) {
+ assert position >= 0L && position < fileInfo.length() : position + " vs " + fileInfo.length();
+ // noinspection ConstantConditions in case assertions are disabled
+ if (position < 0L || position >= fileInfo.length()) {
+ throw new IllegalArgumentException("Position [" + position + "] is invalid for a file of length [" + fileInfo.length() + "]");
+ }
+ }
+
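To illustrate the part arithmetic behind openInputStreamFromBlobStore and the helpers above: a file larger than the repository chunk size is snapshotted as consecutive parts of partSize bytes, so the byte at position p lives in part p / partSize at offset p % partSize, and a read that crosses a part boundary is split into one sub-range per part. A standalone sketch with a hypothetical 1 MB part size:

    public class PartArithmeticExample {
        public static void main(String[] args) {
            final long partSize = 1_048_576; // hypothetical; the real value comes from fileInfo.partSize()
            final long position = 1_048_000; // read starts near the end of part 0 ...
            final long length = 2_000;       // ... and spills into part 1

            final long startPart = position / partSize;              // 0
            final long endPart = (position + length - 1) / partSize; // 1

            for (long part = startPart; part <= endPart; part++) {
                final long startInPart = (part == startPart) ? position % partSize : 0L;
                final long endInPart = (part == endPart) ? (position + length - 1) % partSize + 1 : partSize;
                System.out.println("part " + part + ": read [" + startInPart + "-" + endInPart + "]");
            }
            // prints: part 0: read [1048000-1048576]   part 1: read [0-1424]
        }
    }
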
@Override
protected void seekInternal(long pos) throws IOException {
if (pos > length()) {
@@ -431,43 +776,11 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
+ getFilePointer()
+ ", rangeSize="
+ getDefaultRangeSize()
+ + ", directory="
+ + directory
+ '}';
}

- private int readDirectly(long start, long end, ByteBuffer b) throws IOException {
- final long length = end - start;
- final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))];
- logger.trace(() -> new ParameterizedMessage("direct reading of range [{}-{}] for cache file [{}]", start, end, cacheFileReference));
-
- int bytesCopied = 0;
- final long startTimeNanos = stats.currentTimeNanos();
- try (InputStream input = openInputStream(start, length)) {
- long remaining = end - start;
- while (remaining > 0) {
- final int len = (remaining < copyBuffer.length) ? (int) remaining : copyBuffer.length;
- int bytesRead = input.read(copyBuffer, 0, len);
- if (bytesRead == -1) {
- throw new EOFException(
- String.format(
- Locale.ROOT,
- "unexpected EOF reading [%d-%d] ([%d] bytes remaining) from %s",
- start,
- end,
- remaining,
- cacheFileReference
- )
- );
- }
- b.put(copyBuffer, 0, bytesRead);
- bytesCopied += bytesRead;
- remaining -= bytesRead;
- }
- final long endTimeNanos = stats.currentTimeNanos();
- stats.addDirectBytesRead(bytesCopied, endTimeNanos - startTimeNanos);
- }
- return bytesCopied;
- }
-
private static class CacheFileReference implements CacheFile.EvictionListener {

private final long fileLength;