|
@@ -21,6 +21,7 @@ import java.io.EOFException;
|
|
|
import java.io.FilterInputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
import java.util.Objects;
|
|
|
import java.util.concurrent.atomic.LongAdder;
|
|
|
|
|
@@ -39,7 +40,7 @@ import java.util.concurrent.atomic.LongAdder;
|
|
|
*
|
|
|
* {@link DirectBlobContainerIndexInput} maintains a global position that indicates the current position in the Lucene file where the
|
|
|
* next read will occur. In the case of a Lucene file snapshotted into multiple parts, this position is used to identify which part must
|
|
|
- * be read at which position (see {@link #readInternal(byte[], int, int)}. This position is also passed over to cloned and sliced input
|
|
|
+ * be read at which position (see {@link #readInternal(ByteBuffer)}. This position is also passed over to cloned and sliced input
|
|
|
* along with the {@link FileInfo} so that they can also track their reading position.
|
|
|
*
|
|
|
* The {@code sequentialReadSize} constructor parameter configures the {@link DirectBlobContainerIndexInput} to perform a larger read on the
|
|
@@ -56,6 +57,7 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
|
|
|
private StreamForSequentialReads streamForSequentialReads;
|
|
|
private long sequentialReadSize;
|
|
|
private static final long NO_SEQUENTIAL_READ_OPTIMIZATION = 0L;
|
|
|
+ private static final int COPY_BUFFER_SIZE = 8192;
|
|
|
|
|
|
public DirectBlobContainerIndexInput(
|
|
|
BlobContainer blobContainer,
|
|
@@ -99,14 +101,12 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- protected void readInternal(byte[] b, int offset, int length) throws IOException {
|
|
|
+ protected void readInternal(ByteBuffer b) throws IOException {
|
|
|
ensureOpen();
|
|
|
if (fileInfo.numberOfParts() == 1L) {
|
|
|
- readInternalBytes(0, position, b, offset, length);
|
|
|
+ readInternalBytes(0, position, b, b.remaining());
|
|
|
} else {
|
|
|
- int len = length;
|
|
|
- int off = offset;
|
|
|
- while (len > 0) {
|
|
|
+ while (b.hasRemaining()) {
|
|
|
int currentPart = Math.toIntExact(position / fileInfo.partSize().getBytes());
|
|
|
int remainingBytesInPart;
|
|
|
if (currentPart < (fileInfo.numberOfParts() - 1)) {
|
|
@@ -114,16 +114,14 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
|
|
|
} else {
|
|
|
remainingBytesInPart = Math.toIntExact(fileInfo.length() - position);
|
|
|
}
|
|
|
- final int read = Math.min(len, remainingBytesInPart);
|
|
|
- readInternalBytes(currentPart, position % fileInfo.partSize().getBytes(), b, off, read);
|
|
|
- len -= read;
|
|
|
- off += read;
|
|
|
+ final int read = Math.min(b.remaining(), remainingBytesInPart);
|
|
|
+ readInternalBytes(currentPart, position % fileInfo.partSize().getBytes(), b, read);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void readInternalBytes(final int part, long pos, final byte[] b, int offset, int length) throws IOException {
|
|
|
- int optimizedReadSize = readOptimized(part, pos, b, offset, length);
|
|
|
+ private void readInternalBytes(final int part, long pos, final ByteBuffer b, int length) throws IOException {
|
|
|
+ int optimizedReadSize = readOptimized(part, pos, b, length);
|
|
|
assert optimizedReadSize <= length;
|
|
|
position += optimizedReadSize;
|
|
|
|
|
@@ -134,7 +132,6 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
|
|
|
final int directReadSize = readFully(
|
|
|
inputStream,
|
|
|
b,
|
|
|
- offset + optimizedReadSize,
|
|
|
length - optimizedReadSize,
|
|
|
() -> { throw new EOFException("Read past EOF at [" + position + "] with length [" + fileInfo.partBytes(part) + "]"); }
|
|
|
);
|
|
@@ -150,7 +147,7 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
|
|
|
* Attempt to satisfy this read in an optimized fashion using {@code streamForSequentialReadsRef}.
|
|
|
* @return the number of bytes read
|
|
|
*/
|
|
|
- private int readOptimized(int part, long pos, byte[] b, int offset, int length) throws IOException {
|
|
|
+ private int readOptimized(int part, long pos, ByteBuffer b, int length) throws IOException {
|
|
|
if (sequentialReadSize == NO_SEQUENTIAL_READ_OPTIMIZATION) {
|
|
|
return 0;
|
|
|
}
|
|
@@ -158,10 +155,10 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
|
|
|
int read = 0;
|
|
|
if (streamForSequentialReads == null) {
|
|
|
// starting a new sequential read
|
|
|
- read = readFromNewSequentialStream(part, pos, b, offset, length);
|
|
|
+ read = readFromNewSequentialStream(part, pos, b, length);
|
|
|
} else if (streamForSequentialReads.canContinueSequentialRead(part, pos)) {
|
|
|
// continuing a sequential read that we started previously
|
|
|
- read = streamForSequentialReads.read(b, offset, length);
|
|
|
+ read = streamForSequentialReads.read(b, length);
|
|
|
if (streamForSequentialReads.isFullyRead()) {
|
|
|
// the current stream was exhausted by this read, so it should be closed
|
|
|
streamForSequentialReads.close();
|
|
@@ -173,7 +170,7 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
|
|
|
|
|
|
if (read < length) {
|
|
|
// the current stream didn't contain enough data for this read, so we must read more
|
|
|
- read += readFromNewSequentialStream(part, pos + read, b, offset + read, length - read);
|
|
|
+ read += readFromNewSequentialStream(part, pos + read, b, length - read);
|
|
|
}
|
|
|
} else {
|
|
|
// not a sequential read, so stop optimizing for this usage pattern and fall through to the unoptimized behaviour
|
|
@@ -196,7 +193,7 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
|
|
|
* If appropriate, open a new stream for sequential reading and satisfy the given read using it.
|
|
|
* @return the number of bytes read; if a new stream wasn't opened then nothing was read so the caller should perform the read directly.
|
|
|
*/
|
|
|
- private int readFromNewSequentialStream(int part, long pos, byte[] b, int offset, int length) throws IOException {
|
|
|
+ private int readFromNewSequentialStream(int part, long pos, ByteBuffer b, int length) throws IOException {
|
|
|
|
|
|
assert streamForSequentialReads == null : "should only be called when a new stream is needed";
|
|
|
assert sequentialReadSize > 0L : "should only be called if optimizing sequential reads";
|
|
@@ -243,7 +240,7 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
|
|
|
}
|
|
|
}, part, pos, streamLength);
|
|
|
|
|
|
- final int read = streamForSequentialReads.read(b, offset, length);
|
|
|
+ final int read = streamForSequentialReads.read(b, length);
|
|
|
assert read == length : read + " vs " + length;
|
|
|
assert streamForSequentialReads.isFullyRead() == false;
|
|
|
return read;
|
|
@@ -347,15 +344,18 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
|
|
|
/**
|
|
|
* Fully read up to {@code length} bytes from the given {@link InputStream}
|
|
|
*/
|
|
|
- private static int readFully(InputStream inputStream, byte[] b, int offset, int length, CheckedRunnable<IOException> onEOF)
|
|
|
+ private static int readFully(InputStream inputStream, final ByteBuffer b, int length, CheckedRunnable<IOException> onEOF)
|
|
|
throws IOException {
|
|
|
int totalRead = 0;
|
|
|
+ final byte[] buffer = new byte[Math.min(length, COPY_BUFFER_SIZE)];
|
|
|
while (totalRead < length) {
|
|
|
- final int read = inputStream.read(b, offset + totalRead, length - totalRead);
|
|
|
+ final int len = Math.min(length - totalRead, COPY_BUFFER_SIZE);
|
|
|
+ final int read = inputStream.read(buffer, 0, len);
|
|
|
if (read == -1) {
|
|
|
onEOF.run();
|
|
|
break;
|
|
|
}
|
|
|
+ b.put(buffer, 0, read);
|
|
|
totalRead += read;
|
|
|
}
|
|
|
return totalRead > 0 ? totalRead : -1;
|
|
@@ -378,9 +378,9 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
|
|
|
return this.part == part && this.pos == pos;
|
|
|
}
|
|
|
|
|
|
- int read(byte[] b, int offset, int length) throws IOException {
|
|
|
+ int read(ByteBuffer b, int length) throws IOException {
|
|
|
assert this.pos < maxPos : "should not try and read from a fully-read stream";
|
|
|
- final int read = readFully(inputStream, b, offset, length, () -> {});
|
|
|
+ final int read = readFully(inputStream, b, length, () -> {});
|
|
|
assert read <= length : read + " vs " + length;
|
|
|
pos += read;
|
|
|
return read;
|