|
@@ -54,7 +54,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.Executor;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
-import java.util.concurrent.atomic.AtomicReferenceArray;
|
|
|
import java.util.concurrent.atomic.LongAdder;
|
|
|
import java.util.function.LongConsumer;
|
|
|
import java.util.function.Predicate;
|
|
@@ -266,12 +265,12 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
private final ByteSizeValue recoveryRangeSize;
|
|
|
|
|
|
private final int numRegions;
|
|
|
- private final ConcurrentLinkedQueue<Integer> freeRegions = new ConcurrentLinkedQueue<>();
|
|
|
+ private final ConcurrentLinkedQueue<SharedBytes.IO> freeRegions = new ConcurrentLinkedQueue<>();
|
|
|
private final Entry<CacheFileRegion>[] freqs;
|
|
|
private final int maxFreq;
|
|
|
private final long minTimeDelta;
|
|
|
|
|
|
- private final AtomicReferenceArray<CacheFileRegion> regionOwners; // to assert exclusive access of regions
|
|
|
+ private final ConcurrentHashMap<SharedBytes.IO, CacheFileRegion> regionOwners; // to assert exclusive access of regions
|
|
|
|
|
|
private final CacheDecayTask decayTask;
|
|
|
|
|
@@ -298,13 +297,10 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
this.numRegions = Math.toIntExact(cacheSize / regionSize);
|
|
|
keyMapping = new ConcurrentHashMap<>();
|
|
|
if (Assertions.ENABLED) {
|
|
|
- regionOwners = new AtomicReferenceArray<>(numRegions);
|
|
|
+ regionOwners = new ConcurrentHashMap<>();
|
|
|
} else {
|
|
|
regionOwners = null;
|
|
|
}
|
|
|
- for (int i = 0; i < numRegions; i++) {
|
|
|
- freeRegions.add(i);
|
|
|
- }
|
|
|
this.regionSize = regionSize;
|
|
|
assert regionSize > 0L;
|
|
|
this.maxFreq = SHARED_CACHE_MAX_FREQ_SETTING.get(settings);
|
|
@@ -322,6 +318,9 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
} catch (IOException e) {
|
|
|
throw new UncheckedIOException(e);
|
|
|
}
|
|
|
+ for (int i = 0; i < numRegions; i++) {
|
|
|
+ freeRegions.add(sharedBytes.getFileChannel(i));
|
|
|
+ }
|
|
|
decayTask = new CacheDecayTask(threadPool, SHARED_CACHE_DECAY_INTERVAL_SETTING.get(settings));
|
|
|
decayTask.rescheduleIfNecessary();
|
|
|
this.rangeSize = SHARED_CACHE_RANGE_SIZE_SETTING.get(settings);
|
|
@@ -404,10 +403,10 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
final long effectiveRegionSize = getRegionSize(fileLength, region);
|
|
|
entry = keyMapping.computeIfAbsent(regionKey, key -> new Entry<>(new CacheFileRegion(key, effectiveRegionSize), now));
|
|
|
}
|
|
|
- // sharedBytesPos is volatile, double locking is fine, as long as we assign it last.
|
|
|
- if (entry.chunk.sharedBytesPos == -1) {
|
|
|
+ // io is volatile, double locking is fine, as long as we assign it last.
|
|
|
+ if (entry.chunk.io == null) {
|
|
|
synchronized (entry.chunk) {
|
|
|
- if (entry.chunk.sharedBytesPos == -1) {
|
|
|
+ if (entry.chunk.io == null) {
|
|
|
return initChunk(entry);
|
|
|
}
|
|
|
}
|
|
@@ -432,7 +431,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
assert entry.freq == 0;
|
|
|
assert entry.prev == null;
|
|
|
assert entry.next == null;
|
|
|
- final Integer freeSlot = freeRegions.poll();
|
|
|
+ final SharedBytes.IO freeSlot = freeRegions.poll();
|
|
|
if (freeSlot != null) {
|
|
|
// no need to evict an item, just add
|
|
|
assignToSlot(entry, freeSlot);
|
|
@@ -441,7 +440,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
synchronized (this) {
|
|
|
maybeEvict();
|
|
|
}
|
|
|
- final Integer freeSlotRetry = freeRegions.poll();
|
|
|
+ final SharedBytes.IO freeSlotRetry = freeRegions.poll();
|
|
|
if (freeSlotRetry != null) {
|
|
|
assignToSlot(entry, freeSlotRetry);
|
|
|
} else {
|
|
@@ -465,18 +464,18 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void assignToSlot(Entry<CacheFileRegion> entry, int freeSlot) {
|
|
|
- assert regionOwners.compareAndSet(freeSlot, null, entry.chunk);
|
|
|
+ private void assignToSlot(Entry<CacheFileRegion> entry, SharedBytes.IO freeSlot) {
|
|
|
+ assert regionOwners.put(freeSlot, entry.chunk) == null;
|
|
|
synchronized (this) {
|
|
|
if (entry.chunk.isEvicted()) {
|
|
|
- assert regionOwners.compareAndSet(freeSlot, entry.chunk, null);
|
|
|
+ assert regionOwners.remove(freeSlot) == entry.chunk;
|
|
|
freeRegions.add(freeSlot);
|
|
|
keyMapping.remove(entry.chunk.regionKey, entry);
|
|
|
throwAlreadyClosed("evicted during free region allocation");
|
|
|
}
|
|
|
pushEntryToBack(entry);
|
|
|
- // assign sharedBytesPos only when chunk is ready for use. Under lock to avoid concurrent tryEvict.
|
|
|
- entry.chunk.sharedBytesPos = freeSlot;
|
|
|
+ // assign io only when chunk is ready for use. Under lock to avoid concurrent tryEvict.
|
|
|
+ entry.chunk.io = freeSlot;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -490,7 +489,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
assert entry.prev != null || entry.chunk.isEvicted();
|
|
|
|
|
|
}
|
|
|
- assert regionOwners.get(entry.chunk.sharedBytesPos) == entry.chunk || entry.chunk.isEvicted();
|
|
|
+ assert regionOwners.get(entry.chunk.io) == entry.chunk || entry.chunk.isEvicted();
|
|
|
return true;
|
|
|
}
|
|
|
|
|
@@ -546,7 +545,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
for (int i = 0; i < maxFreq; i++) {
|
|
|
for (Entry<CacheFileRegion> entry = freqs[i]; entry != null; entry = entry.next) {
|
|
|
boolean evicted = entry.chunk.tryEvict();
|
|
|
- if (evicted && entry.chunk.sharedBytesPos != -1) {
|
|
|
+ if (evicted && entry.chunk.io != null) {
|
|
|
unlink(entry);
|
|
|
keyMapping.remove(entry.chunk.regionKey, entry);
|
|
|
return;
|
|
@@ -644,7 +643,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
synchronized (this) {
|
|
|
for (Entry<CacheFileRegion> entry : matchingEntries) {
|
|
|
boolean evicted = entry.chunk.forceEvict();
|
|
|
- if (evicted && entry.chunk.sharedBytesPos != -1) {
|
|
|
+ if (evicted && entry.chunk.io != null) {
|
|
|
unlink(entry);
|
|
|
keyMapping.remove(entry.chunk.regionKey, entry);
|
|
|
evictedCount++;
|
|
@@ -760,7 +759,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
|
|
|
final RegionKey<KeyType> regionKey;
|
|
|
final SparseFileTracker tracker;
|
|
|
- volatile int sharedBytesPos = -1;
|
|
|
+ volatile SharedBytes.IO io = null;
|
|
|
|
|
|
CacheFileRegion(RegionKey<KeyType> regionKey, long regionSize) {
|
|
|
this.regionKey = regionKey;
|
|
@@ -769,11 +768,12 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
}
|
|
|
|
|
|
public long physicalStartOffset() {
|
|
|
- return sharedBytes.getPhysicalOffset(sharedBytesPos);
|
|
|
+ var ioRef = io;
|
|
|
+ return ioRef == null ? -1L : ioRef.pageStart();
|
|
|
}
|
|
|
|
|
|
public long physicalEndOffset() {
|
|
|
- return sharedBytes.getPhysicalOffset(sharedBytesPos + 1);
|
|
|
+ return physicalStartOffset() + sharedBytes.regionSize;
|
|
|
}
|
|
|
|
|
|
// tries to evict this chunk if noone is holding onto its resources anymore
|
|
@@ -803,10 +803,10 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
@Override
|
|
|
protected void closeInternal() {
|
|
|
// now actually free the region associated with this chunk
|
|
|
- // we held the "this" lock when this was evicted, hence if sharedBytesPos is not filled in, chunk will never be registered.
|
|
|
- if (sharedBytesPos != -1) {
|
|
|
- assert regionOwners.compareAndSet(sharedBytesPos, this, null);
|
|
|
- freeRegions.add(sharedBytesPos);
|
|
|
+ // we held the "this" lock when this was evicted, hence if io is not filled in, chunk will never be registered.
|
|
|
+ if (io != null) {
|
|
|
+ assert regionOwners.remove(io) == this;
|
|
|
+ freeRegions.add(io);
|
|
|
}
|
|
|
logger.trace("closed {} with channel offset {}", regionKey, physicalStartOffset());
|
|
|
}
|
|
@@ -823,7 +823,8 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
|
|
|
boolean tryRead(ByteBuffer buf, long offset) throws IOException {
|
|
|
int startingPos = buf.position();
|
|
|
- sharedBytes.getFileChannel(sharedBytesPos).read(buf, physicalStartOffset() + getRegionRelativePosition(offset));
|
|
|
+ var ioRef = io;
|
|
|
+ ioRef.read(buf, ioRef.pageStart() + getRegionRelativePosition(offset));
|
|
|
if (isEvicted()) {
|
|
|
buf.position(startingPos);
|
|
|
return false;
|
|
@@ -848,10 +849,11 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
rangeToWrite,
|
|
|
rangeToRead,
|
|
|
ActionListener.runBefore(listener, resource::close).delegateFailureAndWrap((l, success) -> {
|
|
|
- final long physicalStartOffset = physicalStartOffset();
|
|
|
- assert regionOwners.get(sharedBytesPos) == this;
|
|
|
+ var ioRef = io;
|
|
|
+ final long physicalStartOffset = ioRef.pageStart();
|
|
|
+ assert regionOwners.get(ioRef) == this;
|
|
|
final int read = reader.onRangeAvailable(
|
|
|
- sharedBytes.getFileChannel(sharedBytesPos),
|
|
|
+ ioRef,
|
|
|
physicalStartOffset + rangeToRead.start(),
|
|
|
rangeToRead.start(),
|
|
|
rangeToRead.length()
|
|
@@ -878,7 +880,6 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
}
|
|
|
|
|
|
private void fillGaps(RangeMissingHandler writer, List<SparseFileTracker.Gap> gaps) {
|
|
|
- SharedBytes.IO fileChannel = sharedBytes.getFileChannel(sharedBytesPos);
|
|
|
for (SparseFileTracker.Gap gap : gaps) {
|
|
|
ioExecutor.execute(new AbstractRunnable() {
|
|
|
|
|
@@ -887,10 +888,11 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
assert CacheFileRegion.this.hasReferences();
|
|
|
ensureOpen();
|
|
|
final long start = gap.start();
|
|
|
- assert regionOwners.get(sharedBytesPos) == CacheFileRegion.this;
|
|
|
+ var ioRef = io;
|
|
|
+ assert regionOwners.get(ioRef) == CacheFileRegion.this;
|
|
|
writer.fillCacheRange(
|
|
|
- fileChannel,
|
|
|
- physicalStartOffset() + start,
|
|
|
+ ioRef,
|
|
|
+ ioRef.pageStart() + start,
|
|
|
start,
|
|
|
gap.end() - start,
|
|
|
progress -> gap.onProgress(start + progress)
|
|
@@ -1083,7 +1085,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
}
|
|
|
|
|
|
private boolean assertValidRegionAndLength(CacheFileRegion fileRegion, long channelPos, long len) {
|
|
|
- assert regionOwners.get(fileRegion.sharedBytesPos) == fileRegion;
|
|
|
+ assert regionOwners.get(fileRegion.io) == fileRegion;
|
|
|
assert channelPos >= fileRegion.physicalStartOffset() && channelPos + len <= fileRegion.physicalEndOffset();
|
|
|
return true;
|
|
|
}
|