Browse Source

Remove the now-unused bulk executor from SharedBlobCacheService (#107713)

Relates #107626
David Turner 1 year ago
parent
commit
8947b83e29

+ 0 - 67
x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

@@ -298,10 +298,6 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
     // executor to run reading from the blobstore on
     private final Executor ioExecutor;
 
-    // executor to run bulk reading from the blobstore on
-    @Deprecated(forRemoval = true)
-    private final Executor bulkIOExecutor;
-
     private final SharedBytes sharedBytes;
     private final long cacheSize;
     private final int regionSize;
@@ -333,21 +329,9 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         ThreadPool threadPool,
         String ioExecutor,
         BlobCacheMetrics blobCacheMetrics
-    ) {
-        this(environment, settings, threadPool, ioExecutor, ioExecutor, blobCacheMetrics);
-    }
-
-    public SharedBlobCacheService(
-        NodeEnvironment environment,
-        Settings settings,
-        ThreadPool threadPool,
-        String ioExecutor,
-        String bulkExecutor,
-        BlobCacheMetrics blobCacheMetrics
     ) {
         this.threadPool = threadPool;
         this.ioExecutor = threadPool.executor(ioExecutor);
-        this.bulkIOExecutor = threadPool.executor(bulkExecutor);
         long totalFsSize;
         try {
             totalFsSize = FsProbe.getTotal(Environment.getFileStore(environment.nodeDataPaths()[0]));
@@ -469,28 +453,6 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         return cache.get(cacheKey, fileLength, region).chunk;
     }
 
-    /**
-     * Fetch and cache the full blob for the given cache entry from the remote repository if there
-     * are enough free pages in the cache to do so.
-     *
-     * This method returns as soon as the download tasks are instantiated, but the tasks themselves
-     * are run on the bulk executor.
-     *
-     * If an exception is thrown from the writer then the cache entry being downloaded is freed
-     * and unlinked
-     *
-     * @param cacheKey  the key to fetch data for
-     * @param length    the length of the blob to fetch
-     * @param writer    a writer that handles writing of newly downloaded data to the shared cache
-     * @param listener  listener that is called once all downloading has finished
-     *
-     * @return {@code true} if there were enough free pages to start downloading the full entry
-     */
-    @Deprecated(forRemoval = true)
-    public boolean maybeFetchFullEntry(KeyType cacheKey, long length, RangeMissingHandler writer, ActionListener<Void> listener) {
-        return maybeFetchFullEntry(cacheKey, length, writer, bulkIOExecutor, listener);
-    }
-
     /**
      * Fetch and cache the full blob for the given cache entry from the remote repository if there
      * are enough free pages in the cache to do so.
@@ -560,35 +522,6 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         return true;
     }
 
-    /**
-     * Fetch and write in cache a region of a blob if there are enough free pages in the cache to do so.
-     *
-     * This method returns as soon as the download tasks are instantiated, but the tasks themselves
-     * are run on the bulk executor.
-     *
-     * If an exception is thrown from the writer then the cache entry being downloaded is freed
-     * and unlinked
-     *
-     * @param cacheKey  the key to fetch data for
-     * @param region    the region of the blob to fetch
-     * @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region)
-     * @param writer    a writer that handles writing of newly downloaded data to the shared cache
-     * @param listener  a listener that is completed with {@code true} if the current thread triggered the fetching of the region, in which
-     *                  case the data is available in cache. The listener is completed with {@code false} in every other cases: if the
-     *                  region to write is already available in cache, if the region is pending fetching via another thread or if there is
-     *                  not enough free pages to fetch the region.
-     */
-    @Deprecated(forRemoval = true)
-    public void maybeFetchRegion(
-        final KeyType cacheKey,
-        final int region,
-        final long blobLength,
-        final RangeMissingHandler writer,
-        final ActionListener<Boolean> listener
-    ) {
-        maybeFetchRegion(cacheKey, region, blobLength, writer, bulkIOExecutor, listener);
-    }
-
     /**
      * Fetch and write in cache a region of a blob if there are enough free pages in the cache to do so.
      * <p>

+ 15 - 50
x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java

@@ -39,11 +39,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -513,24 +511,15 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
             .put("path.home", createTempDir())
             .build();
 
-        AtomicInteger bulkTaskCount = new AtomicInteger(0);
-        ThreadPool threadPool = new TestThreadPool("test") {
+        final var bulkTaskCount = new AtomicInteger(0);
+        final var threadPool = new TestThreadPool("test");
+        final var bulkExecutor = new StoppableExecutorServiceWrapper(threadPool.generic()) {
             @Override
-            public ExecutorService executor(String name) {
-                ExecutorService generic = super.executor(Names.GENERIC);
-                if (Objects.equals(name, "bulk")) {
-                    return new StoppableExecutorServiceWrapper(generic) {
-                        @Override
-                        public void execute(Runnable command) {
-                            super.execute(command);
-                            bulkTaskCount.incrementAndGet();
-                        }
-                    };
-                }
-                return generic;
+            public void execute(Runnable command) {
+                super.execute(command);
+                bulkTaskCount.incrementAndGet();
             }
         };
-        final var bulkExecutor = threadPool.executor("bulk");
 
         try (
             NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
@@ -539,7 +528,6 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                 settings,
                 threadPool,
                 ThreadPool.Names.GENERIC,
-                "bulk",
                 BlobCacheMetrics.NOOP
             )
         ) {
@@ -582,17 +570,8 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
             .put("path.home", createTempDir())
             .build();
 
-        ThreadPool threadPool = new TestThreadPool("test") {
-            @Override
-            public ExecutorService executor(String name) {
-                ExecutorService generic = super.executor(Names.GENERIC);
-                if (Objects.equals(name, "bulk")) {
-                    return new StoppableExecutorServiceWrapper(generic);
-                }
-                return generic;
-            }
-        };
-        final var bulkExecutor = threadPool.executor("bulk");
+        final var threadPool = new TestThreadPool("test");
+        final var bulkExecutor = new StoppableExecutorServiceWrapper(threadPool.generic());
 
         try (
             NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
@@ -601,7 +580,6 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                 settings,
                 threadPool,
                 ThreadPool.Names.GENERIC,
-                "bulk",
                 BlobCacheMetrics.NOOP
             )
         ) {
@@ -842,7 +820,6 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
             .put("path.home", createTempDir())
             .build();
 
-        final AtomicLong relativeTimeInMillis = new AtomicLong(0L);
         final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
         try (
             NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
@@ -851,7 +828,6 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                 settings,
                 taskQueue.getThreadPool(),
                 ThreadPool.Names.GENERIC,
-                "bulk",
                 BlobCacheMetrics.NOOP
             )
         ) {
@@ -930,24 +906,16 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
             .put("path.home", createTempDir())
             .build();
 
-        AtomicInteger bulkTaskCount = new AtomicInteger(0);
-        ThreadPool threadPool = new TestThreadPool("test") {
+        final var bulkTaskCount = new AtomicInteger(0);
+        final var threadPool = new TestThreadPool("test");
+        final var bulkExecutor = new StoppableExecutorServiceWrapper(threadPool.generic()) {
             @Override
-            public ExecutorService executor(String name) {
-                ExecutorService generic = super.executor(Names.GENERIC);
-                if (Objects.equals(name, "bulk")) {
-                    return new StoppableExecutorServiceWrapper(generic) {
-                        @Override
-                        public void execute(Runnable command) {
-                            super.execute(command);
-                            bulkTaskCount.incrementAndGet();
-                        }
-                    };
-                }
-                return generic;
+            public void execute(Runnable command) {
+                super.execute(command);
+                bulkTaskCount.incrementAndGet();
             }
         };
-        final var bulkExecutor = threadPool.executor("bulk");
+
         try (
             NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
             var cacheService = new SharedBlobCacheService<>(
@@ -955,7 +923,6 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                 settings,
                 threadPool,
                 ThreadPool.Names.GENERIC,
-                "bulk",
                 BlobCacheMetrics.NOOP
             )
         ) {
@@ -1067,7 +1034,6 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                 settings,
                 taskQueue.getThreadPool(),
                 ThreadPool.Names.GENERIC,
-                ThreadPool.Names.GENERIC,
                 BlobCacheMetrics.NOOP
             )
         ) {
@@ -1160,7 +1126,6 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                 settings,
                 taskQueue.getThreadPool(),
                 ThreadPool.Names.GENERIC,
-                ThreadPool.Names.GENERIC,
                 BlobCacheMetrics.NOOP
             ) {
                 @Override