|
@@ -10,6 +10,8 @@ package org.elasticsearch.index.store.cache;
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
|
|
+import org.apache.lucene.codecs.CodecUtil;
|
|
|
+import org.apache.lucene.index.IndexFileNames;
|
|
|
import org.apache.lucene.store.AlreadyClosedException;
|
|
|
import org.apache.lucene.store.IOContext;
|
|
|
import org.apache.lucene.store.IndexInput;
|
|
@@ -35,6 +37,7 @@ import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.Locale;
|
|
|
+import java.util.Objects;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
import java.util.function.Consumer;
|
|
@@ -52,12 +55,26 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
|
|
|
private final int defaultRangeSize;
|
|
|
private final int recoveryRangeSize;
|
|
|
|
|
|
+ /**
|
|
|
+ * If > 0, this index input represents a logical file within a compound (CFS) file, or a slice thereof, and this value is the
|


|


|
+ * offset of the logical file within the physical CFS file
|
|
|
+ */
|
|
|
+ private final long compoundFileOffset;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Range of bytes that should be cached in the blob cache for the current index input's footer. This footer byte range should only be
|
|
|
+ * required for slices of CFS files; regular files already have their footers extracted from the {@link FileInfo} (see method
|
|
|
+ * {@link BaseSearchableSnapshotIndexInput#maybeReadChecksumFromFileInfo}).
|
|
|
+ */
|
|
|
+ private final ByteRange footerBlobCacheByteRange;
|
|
|
+
|
|
|
// last read position is kept around in order to detect (non)contiguous reads for stats
|
|
|
private long lastReadPosition;
|
|
|
// last seek position is kept around in order to detect forward/backward seeks for stats
|
|
|
private long lastSeekPosition;
|
|
|
|
|
|
public FrozenIndexInput(
|
|
|
+ String name,
|
|
|
SearchableSnapshotDirectory directory,
|
|
|
FileInfo fileInfo,
|
|
|
IOContext context,
|
|
@@ -66,41 +83,48 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
|
|
|
int recoveryRangeSize
|
|
|
) {
|
|
|
this(
|
|
|
- "FrozenIndexInput(" + fileInfo.physicalName() + ")",
|
|
|
+ name,
|
|
|
directory,
|
|
|
fileInfo,
|
|
|
context,
|
|
|
stats,
|
|
|
0L,
|
|
|
+ 0L,
|
|
|
fileInfo.length(),
|
|
|
- directory.getFrozenCacheFile(fileInfo.physicalName(), fileInfo.length()),
|
|
|
+ directory.getFrozenCacheFile(name, fileInfo.length()),
|
|
|
rangeSize,
|
|
|
recoveryRangeSize,
|
|
|
- directory.getBlobCacheByteRange(fileInfo.physicalName(), fileInfo.length())
|
|
|
+ directory.getBlobCacheByteRange(name, fileInfo.length()),
|
|
|
+ ByteRange.EMPTY
|
|
|
);
|
|
|
assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth
|
|
|
stats.incrementOpenCount();
|
|
|
}
|
|
|
|
|
|
private FrozenIndexInput(
|
|
|
- String resourceDesc,
|
|
|
+ String name,
|
|
|
SearchableSnapshotDirectory directory,
|
|
|
FileInfo fileInfo,
|
|
|
IOContext context,
|
|
|
IndexInputStats stats,
|
|
|
long offset,
|
|
|
+ long compoundFileOffset,
|
|
|
long length,
|
|
|
FrozenCacheFile frozenCacheFile,
|
|
|
int rangeSize,
|
|
|
int recoveryRangeSize,
|
|
|
- ByteRange blobCacheByteRange
|
|
|
+ ByteRange headerBlobCacheByteRange,
|
|
|
+ ByteRange footerBlobCacheByteRange
|
|
|
) {
|
|
|
- super(logger, resourceDesc, directory, fileInfo, context, stats, offset, length, blobCacheByteRange);
|
|
|
+ super(logger, name, directory, fileInfo, context, stats, offset, length, headerBlobCacheByteRange);
|
|
|
this.frozenCacheFile = frozenCacheFile;
|
|
|
this.lastReadPosition = this.offset;
|
|
|
this.lastSeekPosition = this.offset;
|
|
|
this.defaultRangeSize = rangeSize;
|
|
|
this.recoveryRangeSize = recoveryRangeSize;
|
|
|
+ this.footerBlobCacheByteRange = Objects.requireNonNull(footerBlobCacheByteRange);
|
|
|
+ this.compoundFileOffset = compoundFileOffset;
|
|
|
+ assert offset >= compoundFileOffset;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -109,22 +133,20 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
|
|
|
}
|
|
|
|
|
|
private long getDefaultRangeSize() {
|
|
|
- return (context != CACHE_WARMING_CONTEXT)
|
|
|
- ? (directory.isRecoveryFinalized() ? defaultRangeSize : recoveryRangeSize)
|
|
|
- : fileInfo.partSize().getBytes();
|
|
|
+ return directory.isRecoveryFinalized() ? defaultRangeSize : recoveryRangeSize;
|
|
|
}
|
|
|
|
|
|
private ByteRange computeRange(long position) {
|
|
|
final long rangeSize = getDefaultRangeSize();
|
|
|
long start = (position / rangeSize) * rangeSize;
|
|
|
- long end = Math.min(start + rangeSize, fileInfo.length());
|
|
|
+ long end = Math.min(start + rangeSize, frozenCacheFile.getLength());
|
|
|
return ByteRange.of(start, end);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected void doReadInternal(ByteBuffer b) throws IOException {
|
|
|
ensureContext(ctx -> ctx != CACHE_WARMING_CONTEXT);
|
|
|
- final long position = getFilePointer() + this.offset;
|
|
|
+ final long position = getAbsolutePosition() - compoundFileOffset;
|
|
|
final int length = b.remaining();
|
|
|
|
|
|
final ReentrantReadWriteLock luceneByteBufLock = new ReentrantReadWriteLock();
|
|
@@ -141,6 +163,7 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
|
|
|
}
|
|
|
};
|
|
|
|
|
|
+ final String fileName = frozenCacheFile.getCacheKey().getFileName();
|
|
|
logger.trace("readInternal: read [{}-{}] ([{}] bytes) from [{}]", position, position + length, length, this);
|
|
|
|
|
|
try {
|
|
@@ -168,8 +191,9 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
|
|
|
// Requested data is not on disk, so try the cache index next.
|
|
|
final ByteRange indexCacheMiss; // null if not a miss
|
|
|
|
|
|
- if (blobCacheByteRange.contains(position, position + length)) {
|
|
|
- final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), blobCacheByteRange);
|
|
|
+ final ByteRange blobCacheByteRange = maybeReadFromBlobCache(position, length);
|
|
|
+ if (blobCacheByteRange != ByteRange.EMPTY) {
|
|
|
+ final CachedBlob cachedBlob = directory.getCachedBlob(fileName, blobCacheByteRange);
|
|
|
assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || cachedBlob.from() <= position;
|
|
|
assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || length <= cachedBlob.length();
|
|
|
|
|
@@ -182,12 +206,7 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
|
|
|
// We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put.
|
|
|
// TODO TBD use a different trigger for creating the cache index and avoid a put in the CACHE_NOT_READY case.
|
|
|
} else {
|
|
|
- logger.trace(
|
|
|
- "reading [{}] bytes of file [{}] at position [{}] using cache index",
|
|
|
- length,
|
|
|
- fileInfo.physicalName(),
|
|
|
- position
|
|
|
- );
|
|
|
+ logger.trace("reading [{}] bytes of file [{}] at position [{}] using cache index", length, fileName, position);
|
|
|
stats.addIndexCacheBytesRead(cachedBlob.length());
|
|
|
|
|
|
preventAsyncBufferChanges.run();
|
|
@@ -233,7 +252,7 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
|
|
|
"copied bytes [{}-{}] of file [{}] from cache index to disk",
|
|
|
relativePos,
|
|
|
relativePos + len,
|
|
|
- fileInfo
|
|
|
+ fileName
|
|
|
);
|
|
|
},
|
|
|
directory.cacheFetchAsyncExecutor()
|
|
@@ -244,7 +263,7 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
|
|
|
"failed to store bytes [{}-{}] of file [{}] obtained from index cache",
|
|
|
cachedBlob.from(),
|
|
|
cachedBlob.to(),
|
|
|
- fileInfo
|
|
|
+ fileName
|
|
|
),
|
|
|
e
|
|
|
);
|
|
@@ -302,65 +321,12 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
|
|
|
);
|
|
|
|
|
|
if (indexCacheMiss != null) {
|
|
|
- final Releasable onCacheFillComplete = stats.addIndexCacheFill();
|
|
|
- final int indexCacheMissLength = toIntBytes(indexCacheMiss.length());
|
|
|
-
|
|
|
- // We assume that we only cache small portions of blobs so that we do not need to:
|
|
|
- // - use a BigArrays for allocation
|
|
|
- // - use an intermediate copy buffer to read the file in sensibly-sized chunks
|
|
|
- // - release the buffer once the indexing operation is complete
|
|
|
- final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength);
|
|
|
-
|
|
|
- final StepListener<Integer> readListener = frozenCacheFile.readIfAvailableOrPending(
|
|
|
- indexCacheMiss,
|
|
|
- (channel, channelPos, relativePos, len) -> {
|
|
|
- assert len <= indexCacheMissLength;
|
|
|
-
|
|
|
- if (len == 0) {
|
|
|
- return 0;
|
|
|
- }
|
|
|
-
|
|
|
- // create slice that is positioned to read the given values
|
|
|
- final ByteBuffer dup = byteBuffer.duplicate();
|
|
|
- final int newPosition = dup.position() + Math.toIntExact(relativePos);
|
|
|
- assert newPosition <= dup.limit() : "newpos " + newPosition + " limit " + dup.limit();
|
|
|
- assert newPosition + len <= byteBuffer.limit();
|
|
|
- dup.position(newPosition);
|
|
|
- dup.limit(newPosition + Math.toIntExact(len));
|
|
|
-
|
|
|
- final int read = channel.read(dup, channelPos);
|
|
|
- if (read < 0) {
|
|
|
- throw new EOFException("read past EOF. pos [" + relativePos + "] length: [" + len + "]");
|
|
|
- }
|
|
|
- // NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats
|
|
|
- assert read == len;
|
|
|
- return read;
|
|
|
- }
|
|
|
- );
|
|
|
|
|
|
- if (readListener == null) {
|
|
|
- // Normally doesn't happen, we're already obtaining a range covering all cache misses above, but theoretically
|
|
|
- // possible in the case that the real populateAndRead call already failed to obtain this range of the file. In that
|
|
|
- // case, simply move on.
|
|
|
- onCacheFillComplete.close();
|
|
|
- } else {
|
|
|
- readListener.whenComplete(read -> {
|
|
|
- assert read == indexCacheMissLength;
|
|
|
- byteBuffer.position(read); // mark all bytes as accounted for
|
|
|
- byteBuffer.flip();
|
|
|
- final BytesReference content = BytesReference.fromByteBuffer(byteBuffer);
|
|
|
- directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.start(), content, new ActionListener<>() {
|
|
|
- @Override
|
|
|
- public void onResponse(Void response) {
|
|
|
- onCacheFillComplete.close();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e1) {
|
|
|
- onCacheFillComplete.close();
|
|
|
- }
|
|
|
- });
|
|
|
- }, e -> onCacheFillComplete.close());
|
|
|
+ fillIndexCache(fileName, indexCacheMiss);
|
|
|
+ if (compoundFileOffset > 0L
|
|
|
+ && indexCacheMiss.equals(headerBlobCacheByteRange)
|
|
|
+ && footerBlobCacheByteRange != ByteRange.EMPTY) {
|
|
|
+ fillIndexCache(fileName, footerBlobCacheByteRange);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -385,6 +351,69 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
|
|
|
readComplete(position, length);
|
|
|
}
|
|
|
|
|
|
+ private void fillIndexCache(String fileName, ByteRange indexCacheMiss) {
|
|
|
+ final Releasable onCacheFillComplete = stats.addIndexCacheFill();
|
|
|
+ final int indexCacheMissLength = toIntBytes(indexCacheMiss.length());
|
|
|
+
|
|
|
+ // We assume that we only cache small portions of blobs so that we do not need to:
|
|
|
+ // - use a BigArrays for allocation
|
|
|
+ // - use an intermediate copy buffer to read the file in sensibly-sized chunks
|
|
|
+ // - release the buffer once the indexing operation is complete
|
|
|
+ final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength);
|
|
|
+
|
|
|
+ final StepListener<Integer> readListener = frozenCacheFile.readIfAvailableOrPending(
|
|
|
+ indexCacheMiss,
|
|
|
+ (channel, channelPos, relativePos, len) -> {
|
|
|
+ assert len <= indexCacheMissLength;
|
|
|
+
|
|
|
+ if (len == 0) {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ // create slice that is positioned to read the given values
|
|
|
+ final ByteBuffer dup = byteBuffer.duplicate();
|
|
|
+ final int newPosition = dup.position() + Math.toIntExact(relativePos);
|
|
|
+ assert newPosition <= dup.limit() : "newpos " + newPosition + " limit " + dup.limit();
|
|
|
+ assert newPosition + len <= byteBuffer.limit();
|
|
|
+ dup.position(newPosition);
|
|
|
+ dup.limit(newPosition + Math.toIntExact(len));
|
|
|
+
|
|
|
+ final int read = channel.read(dup, channelPos);
|
|
|
+ if (read < 0) {
|
|
|
+ throw new EOFException("read past EOF. pos [" + relativePos + "] length: [" + len + "]");
|
|
|
+ }
|
|
|
+ // NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats
|
|
|
+ assert read == len;
|
|
|
+ return read;
|
|
|
+ }
|
|
|
+ );
|
|
|
+
|
|
|
+ if (readListener == null) {
|
|
|
+ // Normally doesn't happen, we're already obtaining a range covering all cache misses above, but theoretically
|
|
|
+ // possible in the case that the real populateAndRead call already failed to obtain this range of the file. In that
|
|
|
+ // case, simply move on.
|
|
|
+ onCacheFillComplete.close();
|
|
|
+ } else {
|
|
|
+ readListener.whenComplete(read -> {
|
|
|
+ assert read == indexCacheMissLength;
|
|
|
+ byteBuffer.position(read); // mark all bytes as accounted for
|
|
|
+ byteBuffer.flip();
|
|
|
+ final BytesReference content = BytesReference.fromByteBuffer(byteBuffer);
|
|
|
+ directory.putCachedBlob(fileName, indexCacheMiss.start(), content, new ActionListener<>() {
|
|
|
+ @Override
|
|
|
+ public void onResponse(Void response) {
|
|
|
+ onCacheFillComplete.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e1) {
|
|
|
+ onCacheFillComplete.close();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }, e -> onCacheFillComplete.close());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void readComplete(long position, int length) {
|
|
|
stats.incrementBytesRead(lastReadPosition, position, length);
|
|
|
lastReadPosition = position + length;
|
|
@@ -408,7 +437,7 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
|
|
|
|
|
|
int bytesCopied = 0;
|
|
|
final long startTimeNanos = stats.currentTimeNanos();
|
|
|
- try (InputStream input = openInputStreamFromBlobStore(position, length)) {
|
|
|
+ try (InputStream input = openInputStreamFromBlobStore(position + compoundFileOffset, length)) {
|
|
|
long remaining = length;
|
|
|
while (remaining > 0) {
|
|
|
final int len = (remaining < copyBuffer.length) ? (int) remaining : copyBuffer.length;
|
|
@@ -441,6 +470,17 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
|
|
|
throw new IOException("failed to read data from cache", e);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ protected ByteRange maybeReadFromBlobCache(long position, int length) {
|
|
|
+ final long end = position + length;
|
|
|
+ if (headerBlobCacheByteRange.contains(position, end)) {
|
|
|
+ return headerBlobCacheByteRange;
|
|
|
+ } else if (footerBlobCacheByteRange.contains(position, end)) {
|
|
|
+ return footerBlobCacheByteRange;
|
|
|
+ }
|
|
|
+ return ByteRange.EMPTY;
|
|
|
+ }
|
|
|
+
|
|
|
private static int positionalWrite(SharedBytes.IO fc, long start, ByteBuffer byteBuffer) throws IOException {
|
|
|
assert assertCurrentThreadMayWriteCacheFile();
|
|
|
return fc.write(byteBuffer, start);
|
|
@@ -572,7 +612,7 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
|
|
|
long bytesCopied = 0L;
|
|
|
long remaining = length;
|
|
|
final long startTimeNanos = stats.currentTimeNanos();
|
|
|
- try (InputStream input = openInputStreamFromBlobStore(logicalPos + relativePos, length)) {
|
|
|
+ try (InputStream input = openInputStreamFromBlobStore(logicalPos + relativePos + compoundFileOffset, length)) {
|
|
|
while (remaining > 0L) {
|
|
|
final int bytesRead = readSafe(input, copyBuffer, relativePos, end, remaining, frozenCacheFile);
|
|
|
positionalWrite(fc, fileChannelPos + bytesCopied, ByteBuffer.wrap(copyBuffer, 0, bytesRead));
|
|
@@ -593,7 +633,7 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
|
|
|
} else if (pos < 0L) {
|
|
|
throw new IOException("Seeking to negative position [" + pos + "] for " + toString());
|
|
|
}
|
|
|
- final long position = pos + this.offset;
|
|
|
+ final long position = pos + this.offset - compoundFileOffset;
|
|
|
stats.incrementSeeks(lastSeekPosition, position);
|
|
|
lastSeekPosition = position;
|
|
|
}
|
|
@@ -604,33 +644,63 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public IndexInput slice(String sliceDescription, long offset, long length) {
|
|
|
- if (offset < 0 || length < 0 || offset + length > length()) {
|
|
|
+ public IndexInput slice(String sliceName, long sliceOffset, long sliceLength) {
|
|
|
+ if (sliceOffset < 0 || sliceLength < 0 || sliceOffset + sliceLength > length()) {
|
|
|
throw new IllegalArgumentException(
|
|
|
"slice() "
|
|
|
- + sliceDescription
|
|
|
+ + sliceName
|
|
|
+ " out of bounds: offset="
|
|
|
- + offset
|
|
|
+ + sliceOffset
|
|
|
+ ",length="
|
|
|
- + length
|
|
|
+ + sliceLength
|
|
|
+ ",fileLength="
|
|
|
+ length()
|
|
|
+ ": "
|
|
|
+ this
|
|
|
);
|
|
|
}
|
|
|
+
|
|
|
+ // Are we creating a slice from a CFS file?
|
|
|
+ final boolean sliceCompoundFile = IndexFileNames.matchesExtension(name, "cfs")
|
|
|
+ && IndexFileNames.getExtension(sliceName) != null
|
|
|
+ && compoundFileOffset == 0L // not already a compound file
|
|
|
+ && isClone == false; // tests aggressively clone and slice
|
|
|
+
|
|
|
+ final FrozenCacheFile sliceFrozenCacheFile;
|
|
|
+ final ByteRange sliceHeaderByteRange;
|
|
|
+ final ByteRange sliceFooterByteRange;
|
|
|
+ final long sliceCompoundFileOffset;
|
|
|
+
|
|
|
+ if (sliceCompoundFile) {
|
|
|
+ sliceCompoundFileOffset = this.offset + sliceOffset;
|
|
|
+ sliceFrozenCacheFile = directory.getFrozenCacheFile(sliceName, sliceLength);
|
|
|
+ sliceHeaderByteRange = directory.getBlobCacheByteRange(sliceName, sliceLength);
|
|
|
+ if (sliceHeaderByteRange.length() < sliceLength) {
|
|
|
+ sliceFooterByteRange = ByteRange.of(sliceLength - CodecUtil.footerLength(), sliceLength);
|
|
|
+ } else {
|
|
|
+ sliceFooterByteRange = ByteRange.EMPTY;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ sliceCompoundFileOffset = this.compoundFileOffset;
|
|
|
+ sliceFrozenCacheFile = this.frozenCacheFile;
|
|
|
+ sliceHeaderByteRange = ByteRange.EMPTY;
|
|
|
+ sliceFooterByteRange = ByteRange.EMPTY;
|
|
|
+ }
|
|
|
+
|
|
|
final FrozenIndexInput slice = new FrozenIndexInput(
|
|
|
- getFullSliceDescription(sliceDescription),
|
|
|
+ sliceName,
|
|
|
directory,
|
|
|
fileInfo,
|
|
|
context,
|
|
|
stats,
|
|
|
- this.offset + offset,
|
|
|
- length,
|
|
|
- frozenCacheFile,
|
|
|
+ this.offset + sliceOffset,
|
|
|
+ sliceCompoundFileOffset,
|
|
|
+ sliceLength,
|
|
|
+ sliceFrozenCacheFile,
|
|
|
defaultRangeSize,
|
|
|
recoveryRangeSize,
|
|
|
- ByteRange.EMPTY // TODO implement blob cache for slices when it makes sense (like CFs)
|
|
|
+ sliceHeaderByteRange,
|
|
|
+ sliceFooterByteRange
|
|
|
);
|
|
|
slice.isClone = true;
|
|
|
return slice;
|