Bläddra i källkod

Minor Adjustments to make SharedBlobCacheService more generic (#93234)

Some naming adjustments, the ability to set up the cache on a non-frozen
search node and a small API adjustment that makes it so a `ThreadPool`
is not a dependency for using the cache.
Armin Braun 2 år sedan
förälder
incheckning
fa52364462

+ 11 - 12
x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

@@ -53,7 +53,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Consumer;
-import java.util.function.LongSupplier;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -130,7 +129,8 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
                 if (value.isNonZeroSize()) {
                     @SuppressWarnings("unchecked")
                     final List<DiscoveryNodeRole> roles = (List<DiscoveryNodeRole>) settings.get(NodeRoleSettings.NODE_ROLES_SETTING);
-                    if (DataTier.isFrozenNode(Set.of(roles.toArray(DiscoveryNodeRole[]::new))) == false) {
+                    final var rolesSet = Set.copyOf(roles);
+                    if (DataTier.isFrozenNode(rolesSet) == false && rolesSet.contains(DiscoveryNodeRole.SEARCH_ROLE) == false) {
                         throw new SettingsException(
                             "setting [{}] to be positive [{}] is only permitted on nodes with the data_frozen role, roles are [{}]",
                             SHARED_CACHE_SETTINGS_PREFIX + "size",
@@ -229,8 +229,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
     private static final Logger logger = LogManager.getLogger(SharedBlobCacheService.class);
 
     private final ConcurrentHashMap<RegionKey<KeyType>, Entry<CacheFileRegion>> keyMapping;
-
-    private final LongSupplier currentTimeSupplier;
+    private final ThreadPool threadPool;
 
     private final KeyedLock<KeyType> keyedLock = new KeyedLock<>();
 
@@ -260,7 +259,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
     public SharedBlobCacheService(NodeEnvironment environment, Settings settings, ThreadPool threadPool) {
-        this.currentTimeSupplier = threadPool::relativeTimeInMillis;
+        this.threadPool = threadPool;
         long totalFsSize;
         try {
             totalFsSize = FsProbe.getTotal(Environment.getFileStore(environment.nodeDataPaths()[0]));
@@ -372,7 +371,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         final long effectiveRegionSize = getRegionSize(fileLength, region);
         try (Releasable ignore = keyedLock.acquire(cacheKey)) {
             final RegionKey<KeyType> regionKey = new RegionKey<>(cacheKey, region);
-            final long now = currentTimeSupplier.getAsLong();
+            final long now = threadPool.relativeTimeInMillis();
             final Entry<CacheFileRegion> entry = keyMapping.computeIfAbsent(
                 regionKey,
                 key -> new Entry<>(new CacheFileRegion(key, effectiveRegionSize), now)
@@ -543,7 +542,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
 
     private void computeDecay() {
         synchronized (this) {
-            long now = currentTimeSupplier.getAsLong();
+            long now = threadPool.relativeTimeInMillis();
             for (int i = 0; i < maxFreq; i++) {
                 for (Entry<CacheFileRegion> entry = freqs[i]; entry != null; entry = entry.next) {
                     if (now - entry.lastAccessed >= 2 * minTimeDelta) {
@@ -616,7 +615,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
 
         @Override
         public String toString() {
-            return "frozen_cache_decay_task";
+            return "shared_cache_decay_task";
         }
     }
 
@@ -830,7 +829,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
             final ByteRange rangeToRead,
             final RangeAvailableHandler reader,
             final RangeMissingHandler writer,
-            final Executor executor
+            final String executor
         ) throws Exception {
             StepListener<Integer> stepListener = null;
             final long writeStart = rangeToWrite.start();
@@ -863,7 +862,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
                         assert channelPos >= fileRegion.physicalStartOffset() && channelPos + len <= fileRegion.physicalEndOffset();
                         writer.fillCacheRange(channel, channelPos, relativePos - writeOffset, len, progressUpdater);
                     },
-                    executor
+                    threadPool.executor(executor)
                 );
                 assert lis != null;
                 if (stepListener == null) {
@@ -878,11 +877,11 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
 
         @Override
         public String toString() {
-            return "FrozenCacheFile{" + "cacheKey=" + cacheKey + ", length=" + length + '}';
+            return "SharedCacheFile{" + "cacheKey=" + cacheKey + ", length=" + length + '}';
         }
     }
 
-    public CacheFile getFrozenCacheFile(KeyType cacheKey, long length) {
+    public CacheFile getCacheFile(KeyType cacheKey, long length) {
         return new CacheFile(cacheKey, length);
     }
 

+ 1 - 1
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java

@@ -712,7 +712,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
     }
 
     public SharedBlobCacheService<CacheKey>.CacheFile getFrozenCacheFile(String fileName, long length) {
-        return sharedBlobCacheService.getFrozenCacheFile(createCacheKey(fileName), length);
+        return sharedBlobCacheService.getCacheFile(createCacheKey(fileName), length);
     }
 
     private static Repository repositoryByUuid(Map<String, Repository> repositories, String repositoryUuid, String originalName) {

+ 1 - 1
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/FrozenIndexInput.java

@@ -160,7 +160,7 @@ public class FrozenIndexInput extends MetadataCachingIndexInput {
                         writeCacheFile(channel, input, channelPos, relativePos, len, progressUpdater, startTimeNanos);
                     }
                 },
-                directory.cacheFetchAsyncExecutor()
+                SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME
             );
             assert bytesRead == length : bytesRead + " vs " + length;
             assert luceneByteBufLock.getReadHoldCount() == 0;