|
@@ -397,12 +397,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
final Integer freeSlot = freeRegions.poll();
|
|
|
if (freeSlot != null) {
|
|
|
// no need to evict an item, just add
|
|
|
- assert regionOwners[freeSlot].compareAndSet(null, entry.chunk);
|
|
|
- synchronized (this) {
|
|
|
- pushEntryToBack(entry);
|
|
|
- // assign sharedBytesPos only when chunk is ready for use. Under lock to avoid concurrent tryEvict.
|
|
|
- entry.chunk.sharedBytesPos = freeSlot;
|
|
|
- }
|
|
|
+ assignToSlot(entry, freeSlot);
|
|
|
} else {
|
|
|
// need to evict something
|
|
|
synchronized (this) {
|
|
@@ -410,12 +405,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
}
|
|
|
final Integer freeSlotRetry = freeRegions.poll();
|
|
|
if (freeSlotRetry != null) {
|
|
|
- assert regionOwners[freeSlotRetry].compareAndSet(null, entry.chunk);
|
|
|
- synchronized (this) {
|
|
|
- pushEntryToBack(entry);
|
|
|
- // assign sharedBytesPos only when chunk is ready for use. Under lock to avoid concurrent tryEvict.
|
|
|
- entry.chunk.sharedBytesPos = freeSlotRetry;
|
|
|
- }
|
|
|
+ assignToSlot(entry, freeSlotRetry);
|
|
|
} else {
|
|
|
boolean removed = keyMapping.remove(regionKey, entry);
|
|
|
assert removed;
|
|
@@ -431,7 +421,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
|
|
|
// existing item, check if we need to promote item
|
|
|
synchronized (this) {
|
|
|
- if (now - entry.lastAccessed >= minTimeDelta && entry.freq + 1 < maxFreq) {
|
|
|
+ if (now - entry.lastAccessed >= minTimeDelta && entry.freq + 1 < maxFreq && entry.chunk.isEvicted() == false) {
|
|
|
unlink(entry);
|
|
|
entry.freq++;
|
|
|
entry.lastAccessed = now;
|
|
@@ -442,6 +432,21 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
return entry.chunk;
|
|
|
}
|
|
|
|
|
|
+ private void assignToSlot(Entry<CacheFileRegion> entry, int freeSlot) {
|
|
|
+ assert regionOwners[freeSlot].compareAndSet(null, entry.chunk);
|
|
|
+ synchronized (this) {
|
|
|
+ if (entry.chunk.isEvicted()) {
|
|
|
+ assert regionOwners[freeSlot].compareAndSet(entry.chunk, null);
|
|
|
+ freeRegions.add(freeSlot);
|
|
|
+ keyMapping.remove(entry.chunk.regionKey, entry);
|
|
|
+ throw new AlreadyClosedException("evicted during free region allocation");
|
|
|
+ }
|
|
|
+ pushEntryToBack(entry);
|
|
|
+ // assign sharedBytesPos only when chunk is ready for use. Under lock to avoid concurrent tryEvict.
|
|
|
+ entry.chunk.sharedBytesPos = freeSlot;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void assertChunkActiveOrEvicted(Entry<CacheFileRegion> entry) {
|
|
|
if (Assertions.ENABLED) {
|
|
|
synchronized (this) {
|
|
@@ -454,8 +459,11 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
}
|
|
|
|
|
|
public void onClose(CacheFileRegion chunk) {
|
|
|
- assert regionOwners[chunk.sharedBytesPos].compareAndSet(chunk, null);
|
|
|
- freeRegions.add(chunk.sharedBytesPos);
|
|
|
+ // we held the "this" lock when this was evicted, hence if sharedBytesPos is not filled in, chunk will never be registered.
|
|
|
+ if (chunk.sharedBytesPos != -1) {
|
|
|
+ assert regionOwners[chunk.sharedBytesPos].compareAndSet(chunk, null);
|
|
|
+ freeRegions.add(chunk.sharedBytesPos);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// used by tests
|
|
@@ -510,7 +518,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
for (int i = 0; i < maxFreq; i++) {
|
|
|
for (Entry<CacheFileRegion> entry = freqs[i]; entry != null; entry = entry.next) {
|
|
|
boolean evicted = entry.chunk.tryEvict();
|
|
|
- if (evicted) {
|
|
|
+ if (evicted && entry.chunk.sharedBytesPos != -1) {
|
|
|
unlink(entry);
|
|
|
keyMapping.remove(entry.chunk.regionKey, entry);
|
|
|
return;
|
|
@@ -603,7 +611,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
synchronized (this) {
|
|
|
for (Entry<CacheFileRegion> entry : matchingEntries) {
|
|
|
boolean evicted = entry.chunk.forceEvict();
|
|
|
- if (evicted) {
|
|
|
+ if (evicted && entry.chunk.sharedBytesPos != -1) {
|
|
|
unlink(entry);
|
|
|
keyMapping.remove(entry.chunk.regionKey, entry);
|
|
|
}
|
|
@@ -693,7 +701,9 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
private final AtomicBoolean evicted = new AtomicBoolean(false);
|
|
|
|
|
|
// tries to evict this chunk if noone is holding onto its resources anymore
|
|
|
- public boolean tryEvict() {
|
|
|
+ // visible for tests.
|
|
|
+ boolean tryEvict() {
|
|
|
+ assert Thread.holdsLock(SharedBlobCacheService.this) : "must hold lock when evicting";
|
|
|
if (refCount() <= 1 && evicted.compareAndSet(false, true)) {
|
|
|
logger.trace("evicted {} with channel offset {}", regionKey, physicalStartOffset());
|
|
|
evictCount.increment();
|
|
@@ -704,6 +714,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
|
|
|
}
|
|
|
|
|
|
public boolean forceEvict() {
|
|
|
+ assert Thread.holdsLock(SharedBlobCacheService.this) : "must hold lock when evicting";
|
|
|
if (evicted.compareAndSet(false, true)) {
|
|
|
logger.trace("force evicted {} with channel offset {}", regionKey, physicalStartOffset());
|
|
|
evictCount.increment();
|