Browse Source

Reduce overhead in blob cache service get (#96399)

Avoid the use of KeyedLock, which has a high overhead for uncontended locks.
Narrow the lock taken during #get from the whole cache key to the individual region.

Relates #96372
Henning Andersen 2 years ago
parent
commit
856a244286

+ 5 - 0
docs/changelog/96399.yaml

@@ -0,0 +1,5 @@
+pr: 96399
+summary: Reduce overhead in blob cache service get
+area: Snapshot/Restore
+type: enhancement
+issues: []

+ 65 - 45
x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

@@ -26,7 +26,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.RelativeByteSizeValue;
 import org.elasticsearch.common.util.concurrent.AbstractAsyncTask;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
-import org.elasticsearch.common.util.concurrent.KeyedLock;
 import org.elasticsearch.core.AbstractRefCounted;
 import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.Releasable;
@@ -242,8 +241,6 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
     private final ConcurrentHashMap<RegionKey<KeyType>, Entry<CacheFileRegion>> keyMapping;
     private final ThreadPool threadPool;
 
-    private final KeyedLock<KeyType> keyedLock = new KeyedLock<>();
-
     private final SharedBytes sharedBytes;
     private final long cacheSize;
     private final long regionSize;
@@ -380,57 +377,80 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
 
     public CacheFileRegion get(KeyType cacheKey, long fileLength, int region) {
         final long effectiveRegionSize = getRegionSize(fileLength, region);
-        try (Releasable ignore = keyedLock.acquire(cacheKey)) {
-            final RegionKey<KeyType> regionKey = new RegionKey<>(cacheKey, region);
-            final long now = threadPool.relativeTimeInMillis();
-            final Entry<CacheFileRegion> entry = keyMapping.computeIfAbsent(
-                regionKey,
-                key -> new Entry<>(new CacheFileRegion(key, effectiveRegionSize), now)
-            );
-            if (entry.chunk.sharedBytesPos == -1) {
-                // new item
-                assert entry.freq == 0;
-                assert entry.prev == null;
-                assert entry.next == null;
-                final Integer freeSlot = freeRegions.poll();
-                if (freeSlot != null) {
-                    // no need to evict an item, just add
-                    entry.chunk.sharedBytesPos = freeSlot;
-                    assert regionOwners[freeSlot].compareAndSet(null, entry.chunk);
-                    synchronized (this) {
-                        pushEntryToBack(entry);
-                    }
-                } else {
-                    // need to evict something
-                    synchronized (this) {
-                        maybeEvict();
+        final RegionKey<KeyType> regionKey = new RegionKey<>(cacheKey, region);
+        final long now = threadPool.relativeTimeInMillis();
+        final Entry<CacheFileRegion> entry = keyMapping.computeIfAbsent(
+            regionKey,
+            key -> new Entry<>(new CacheFileRegion(key, effectiveRegionSize), now)
+        );
+        // sharedBytesPos is volatile, so double-checked locking is fine, as long as we assign it last.
+        if (entry.chunk.sharedBytesPos == -1) {
+            synchronized (entry.chunk) {
+                if (entry.chunk.sharedBytesPos == -1) {
+                    if (keyMapping.get(regionKey) != entry) { // no longer mapped, e.g. a contender found no free region and removed it
+                        throw new AlreadyClosedException("no free region found (contender)");
                     }
-                    final Integer freeSlotRetry = freeRegions.poll();
-                    if (freeSlotRetry != null) {
-                        entry.chunk.sharedBytesPos = freeSlotRetry;
-                        assert regionOwners[freeSlotRetry].compareAndSet(null, entry.chunk);
+                    // new item: it must not have been linked or promoted yet
+                    assert entry.freq == 0;
+                    assert entry.prev == null;
+                    assert entry.next == null;
+                    final Integer freeSlot = freeRegions.poll();
+                    if (freeSlot != null) {
+                        // no need to evict an item, just add
+                        assert regionOwners[freeSlot].compareAndSet(null, entry.chunk);
                         synchronized (this) {
                             pushEntryToBack(entry);
+                            // assign sharedBytesPos only when chunk is ready for use. Under lock to avoid concurrent tryEvict.
+                            entry.chunk.sharedBytesPos = freeSlot;
                         }
                     } else {
-                        boolean removed = keyMapping.remove(regionKey, entry);
-                        assert removed;
-                        throw new AlreadyClosedException("no free region found");
-                    }
-                }
-            } else {
-                // check if we need to promote item
-                synchronized (this) {
-                    if (now - entry.lastAccessed >= minTimeDelta && entry.freq + 1 < maxFreq) {
-                        unlink(entry);
-                        entry.freq++;
-                        entry.lastAccessed = now;
-                        pushEntryToBack(entry);
+                        // no free slot: try to evict something, then retry the free list once
+                        synchronized (this) {
+                            maybeEvict();
+                        }
+                        final Integer freeSlotRetry = freeRegions.poll();
+                        if (freeSlotRetry != null) {
+                            assert regionOwners[freeSlotRetry].compareAndSet(null, entry.chunk);
+                            synchronized (this) {
+                                pushEntryToBack(entry);
+                                // assign sharedBytesPos only when chunk is ready for use. Under lock to avoid concurrent tryEvict.
+                                entry.chunk.sharedBytesPos = freeSlotRetry;
+                            }
+                        } else {
+                            boolean removed = keyMapping.remove(regionKey, entry);
+                            assert removed;
+                            throw new AlreadyClosedException("no free region found");
+                        }
                     }
+
+                    return entry.chunk;
                 }
             }
-            return entry.chunk;
         }
+        assertChunkActiveOrEvicted(entry);
+
+        // existing item, check if we need to promote item
+        synchronized (this) {
+            if (now - entry.lastAccessed >= minTimeDelta && entry.freq + 1 < maxFreq) {
+                unlink(entry);
+                entry.freq++;
+                entry.lastAccessed = now;
+                pushEntryToBack(entry);
+            }
+        }
+
+        return entry.chunk;
+    }
+
+    private void assertChunkActiveOrEvicted(Entry<CacheFileRegion> entry) {
+        if (Assertions.ENABLED) { // only take the service lock when the assertion below can actually run
+            synchronized (this) {
+                // assert linked (or evicted); checked under the same lock that get() holds while linking/unlinking entries
+                assert entry.prev != null || entry.chunk.isEvicted();
+
+            }
+        }
+        assert regionOwners[entry.chunk.sharedBytesPos].get() == entry.chunk || entry.chunk.isEvicted();
     }
 
     public void onClose(CacheFileRegion chunk) {

+ 73 - 0
x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.blobcache.shared;
 
+import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.blobcache.common.ByteRange;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@@ -24,13 +25,17 @@ import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.TestEnvironment;
 import org.elasticsearch.node.NodeRoleSettings;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
 import static org.hamcrest.Matchers.equalTo;
@@ -212,6 +217,74 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
         }
     }
 
+    /**
+     * Exercise SharedBlobCacheService#get from multiple threads concurrently to trigger any assertion errors.
+     * @throws IOException if setting up or closing the node environment or the cache service fails
+     */
+    public void testGetMultiThreaded() throws IOException {
+        int threads = between(2, 10);
+        Settings settings = Settings.builder()
+            .put(NODE_NAME_SETTING.getKey(), "node")
+            .put(
+                SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(),
+                ByteSizeValue.ofBytes(size(between(1, 20) * 100L)).getStringRep()
+            )
+            .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep())
+            .put("path.home", createTempDir())
+            .build();
+        long fileLength = size(500);
+        ThreadPool threadPool = new TestThreadPool("testGetMultiThreaded");
+        Set<String> files = randomSet(1, 10, () -> randomAlphaOfLength(5));
+        try (
+            NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
+            var cacheService = new SharedBlobCacheService<String>(environment, settings, threadPool)
+        ) {
+            CyclicBarrier ready = new CyclicBarrier(threads);
+            List<Thread> threadList = IntStream.range(0, threads).mapToObj(no -> {
+                int iterations = between(100, 500);
+                String[] cacheKeys = IntStream.range(0, iterations).mapToObj(ignore -> randomFrom(files)).toArray(String[]::new);
+                int[] regions = IntStream.range(0, iterations).map(ignore -> between(0, 4)).toArray();
+                int[] yield = IntStream.range(0, iterations).map(ignore -> between(0, 9)).toArray();
+                return new Thread(() -> {
+                    try {
+                        ready.await();
+                        for (int i = 0; i < iterations; ++i) {
+                            try {
+                                SharedBlobCacheService<String>.CacheFileRegion cacheFileRegion = cacheService.get(
+                                    cacheKeys[i],
+                                    fileLength,
+                                    regions[i]
+                                );
+                                if (cacheFileRegion.tryIncRef()) {
+                                    if (yield[i] == 0) {
+                                        Thread.yield();
+                                    }
+                                    cacheFileRegion.decRef();
+                                }
+                            } catch (AlreadyClosedException e) {
+                                // ignore: get() throws this when no free region can be found, which is expected under contention
+                            }
+                        }
+                    } catch (InterruptedException | BrokenBarrierException e) {
+                        assert false;
+                        throw new RuntimeException(e);
+                    }
+                });
+            }).toList();
+            threadList.forEach(Thread::start);
+            threadList.forEach(thread -> {
+                try {
+                    thread.join();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+            });
+        } finally {
+            threadPool.shutdownNow();
+        }
+    }
+
     public void testCacheSizeRejectedOnNonFrozenNodes() {
         String cacheSize = randomBoolean()
             ? ByteSizeValue.ofBytes(size(500)).getStringRep()