|
@@ -48,6 +48,7 @@ import java.util.Objects;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.Executor;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
@@ -883,13 +884,54 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
final RangeMissingHandler writer,
|
|
|
final String executor
|
|
|
) throws Exception {
|
|
|
+ if (rangeToRead.length() == 0L) {
|
|
|
+ // nothing to read, skip
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ final var exec = threadPool.executor(executor);
|
|
|
+ final int startRegion = getRegion(rangeToWrite.start());
|
|
|
+ final int endRegion = getEndingRegion(rangeToWrite.end());
|
|
|
+ if (startRegion == endRegion) {
|
|
|
+ return readSingleRegion(rangeToWrite, rangeToRead, reader, writer, exec, startRegion);
|
|
|
+ }
|
|
|
+ return readMultiRegions(rangeToWrite, rangeToRead, reader, writer, exec, startRegion, endRegion);
|
|
|
+ }
|
|
|
+
|
|
|
+ private int readSingleRegion(
|
|
|
+ ByteRange rangeToWrite,
|
|
|
+ ByteRange rangeToRead,
|
|
|
+ RangeAvailableHandler reader,
|
|
|
+ RangeMissingHandler writer,
|
|
|
+ Executor executor,
|
|
|
+ int region
|
|
|
+ ) throws InterruptedException, ExecutionException {
|
|
|
+ final PlainActionFuture<Integer> readFuture = PlainActionFuture.newFuture();
|
|
|
+ final CacheFileRegion fileRegion = get(cacheKey, length, region);
|
|
|
+ final long regionStart = getRegionStart(region);
|
|
|
+ fileRegion.populateAndRead(
|
|
|
+ mapSubRangeToRegion(rangeToWrite, region),
|
|
|
+ mapSubRangeToRegion(rangeToRead, region),
|
|
|
+ readerWithOffset(reader, fileRegion, rangeToRead.start() - regionStart),
|
|
|
+ writerWithOffset(writer, fileRegion, rangeToWrite.start() - regionStart),
|
|
|
+ executor,
|
|
|
+ readFuture
|
|
|
+ );
|
|
|
+ return readFuture.get();
|
|
|
+ }
|
|
|
+
|
|
|
+ private int readMultiRegions(
|
|
|
+ ByteRange rangeToWrite,
|
|
|
+ ByteRange rangeToRead,
|
|
|
+ RangeAvailableHandler reader,
|
|
|
+ RangeMissingHandler writer,
|
|
|
+ Executor executor,
|
|
|
+ int startRegion,
|
|
|
+ int endRegion
|
|
|
+ ) throws InterruptedException, ExecutionException {
|
|
|
final PlainActionFuture<Void> readsComplete = new PlainActionFuture<>();
|
|
|
final AtomicInteger bytesRead = new AtomicInteger();
|
|
|
try (var listeners = new RefCountingListener(1, readsComplete)) {
|
|
|
- final long writeStart = rangeToWrite.start();
|
|
|
- final long readStart = rangeToRead.start();
|
|
|
- for (int region = getRegion(rangeToWrite.start()); region <= getEndingRegion(rangeToWrite.end()); region++) {
|
|
|
- final ByteRange subRangeToWrite = mapSubRangeToRegion(rangeToWrite, region);
|
|
|
+ for (int region = startRegion; region <= endRegion; region++) {
|
|
|
final ByteRange subRangeToRead = mapSubRangeToRegion(rangeToRead, region);
|
|
|
if (subRangeToRead.length() == 0L) {
|
|
|
// nothing to read, skip
|
|
@@ -897,23 +939,65 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
}
|
|
|
final CacheFileRegion fileRegion = get(cacheKey, length, region);
|
|
|
final long regionStart = getRegionStart(region);
|
|
|
- final long writeOffset = writeStart - regionStart;
|
|
|
- final long readOffset = readStart - regionStart;
|
|
|
- fileRegion.populateAndRead(subRangeToWrite, subRangeToRead, (channel, channelPos, relativePos, len) -> {
|
|
|
- assert regionOwners[fileRegion.sharedBytesPos].get() == fileRegion;
|
|
|
- assert channelPos >= fileRegion.physicalStartOffset() && channelPos + len <= fileRegion.physicalEndOffset();
|
|
|
- return reader.onRangeAvailable(channel, channelPos, relativePos - readOffset, len);
|
|
|
- }, (channel, channelPos, relativePos, len, progressUpdater) -> {
|
|
|
- assert regionOwners[fileRegion.sharedBytesPos].get() == fileRegion;
|
|
|
- assert channelPos >= fileRegion.physicalStartOffset() && channelPos + len <= fileRegion.physicalEndOffset();
|
|
|
- writer.fillCacheRange(channel, channelPos, relativePos - writeOffset, len, progressUpdater);
|
|
|
- }, threadPool.executor(executor), listeners.acquire(i -> bytesRead.updateAndGet(j -> Math.addExact(i, j))));
|
|
|
+ fileRegion.populateAndRead(
|
|
|
+ mapSubRangeToRegion(rangeToWrite, region),
|
|
|
+ subRangeToRead,
|
|
|
+ readerWithOffset(reader, fileRegion, rangeToRead.start() - regionStart),
|
|
|
+ writerWithOffset(writer, fileRegion, rangeToWrite.start() - regionStart),
|
|
|
+ executor,
|
|
|
+ listeners.acquire(i -> bytesRead.updateAndGet(j -> Math.addExact(i, j)))
|
|
|
+ );
|
|
|
}
|
|
|
}
|
|
|
readsComplete.get();
|
|
|
return bytesRead.get();
|
|
|
}
|
|
|
|
|
|
+ private RangeMissingHandler writerWithOffset(RangeMissingHandler writer, CacheFileRegion fileRegion, long writeOffset) {
|
|
|
+ final RangeMissingHandler adjustedWriter;
|
|
|
+ if (writeOffset == 0) {
|
|
|
+ // no need to allocate a new capturing lambda if the offset isn't adjusted
|
|
|
+ adjustedWriter = writer;
|
|
|
+ } else {
|
|
|
+ adjustedWriter = (channel, channelPos, relativePos, len, progressUpdater) -> writer.fillCacheRange(
|
|
|
+ channel,
|
|
|
+ channelPos,
|
|
|
+ relativePos - writeOffset,
|
|
|
+ len,
|
|
|
+ progressUpdater
|
|
|
+ );
|
|
|
+ }
|
|
|
+ if (Assertions.ENABLED) {
|
|
|
+ return (channel, channelPos, relativePos, len, progressUpdater) -> {
|
|
|
+ assert assertValidRegionAndLength(fileRegion, channelPos, len);
|
|
|
+ adjustedWriter.fillCacheRange(channel, channelPos, relativePos, len, progressUpdater);
|
|
|
+ };
|
|
|
+ }
|
|
|
+ return adjustedWriter;
|
|
|
+ }
|
|
|
+
|
|
|
+ private RangeAvailableHandler readerWithOffset(RangeAvailableHandler reader, CacheFileRegion fileRegion, long readOffset) {
|
|
|
+ final RangeAvailableHandler adjustedReader = (channel, channelPos, relativePos, len) -> reader.onRangeAvailable(
|
|
|
+ channel,
|
|
|
+ channelPos,
|
|
|
+ relativePos - readOffset,
|
|
|
+ len
|
|
|
+ );
|
|
|
+ if (Assertions.ENABLED) {
|
|
|
+ return (channel, channelPos, relativePos, len) -> {
|
|
|
+ assert assertValidRegionAndLength(fileRegion, channelPos, len);
|
|
|
+ return adjustedReader.onRangeAvailable(channel, channelPos, relativePos, len);
|
|
|
+ };
|
|
|
+ }
|
|
|
+ return adjustedReader;
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean assertValidRegionAndLength(CacheFileRegion fileRegion, long channelPos, long len) {
|
|
|
+ assert regionOwners[fileRegion.sharedBytesPos].get() == fileRegion;
|
|
|
+ assert channelPos >= fileRegion.physicalStartOffset() && channelPos + len <= fileRegion.physicalEndOffset();
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
return "SharedCacheFile{" + "cacheKey=" + cacheKey + ", length=" + length + '}';
|