浏览代码

Migrate AbstractAsyncTask away from name-based executor (#99337)

Replaces the executor name returned from a protected method with a
proper executor passed into the constructor, and removes the default
implementation that dangerously defaults to running the task on the
scheduler thread so that callers have to make a conscious choice of
executor.

Relates #99027 and friends
David Turner 2 年之前
父节点
当前提交
6b85f740cd

+ 5 - 10
server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java

@@ -14,6 +14,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.Closeable;
 import java.util.Objects;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.elasticsearch.core.Strings.format;
@@ -25,6 +26,7 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable {
 
     private final Logger logger;
     private final ThreadPool threadPool;
+    private final Executor executor;
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final boolean autoReschedule;
     private volatile Scheduler.Cancellable cancellable;
@@ -32,9 +34,10 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable {
     private volatile Exception lastThrownException;
     private volatile TimeValue interval;
 
-    protected AbstractAsyncTask(Logger logger, ThreadPool threadPool, TimeValue interval, boolean autoReschedule) {
+    protected AbstractAsyncTask(Logger logger, ThreadPool threadPool, Executor executor, TimeValue interval, boolean autoReschedule) {
         this.logger = logger;
         this.threadPool = threadPool;
+        this.executor = executor;
         this.interval = interval;
         this.autoReschedule = autoReschedule;
     }
@@ -81,7 +84,7 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable {
             if (logger.isTraceEnabled()) {
                 logger.trace("scheduling {} every {}", toString(), interval);
             }
-            cancellable = threadPool.schedule(this, interval, getThreadPool());
+            cancellable = threadPool.schedule(this, interval, executor);
             isScheduledOrRunning = true;
         } else {
             logger.trace("scheduled {} disabled", toString());
@@ -167,12 +170,4 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable {
     }
 
     protected abstract void runInternal();
-
-    /**
-     * Use the same threadpool by default.
-     * Derived classes can change this if required.
-     */
-    protected String getThreadPool() {
-        return ThreadPool.Names.SAME;
-    }
 }

+ 24 - 31
server/src/main/java/org/elasticsearch/index/IndexService.java

@@ -98,6 +98,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BooleanSupplier;
@@ -1062,8 +1063,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
 
         protected final IndexService indexService;
 
-        BaseAsyncTask(final IndexService indexService, final TimeValue interval) {
-            super(indexService.logger, indexService.threadPool, interval, true);
+        BaseAsyncTask(final IndexService indexService, final Executor executor, final TimeValue interval) {
+            super(indexService.logger, indexService.threadPool, executor, interval, true);
             this.indexService = indexService;
             rescheduleIfNecessary();
         }
@@ -1082,12 +1083,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
     static final class AsyncTranslogFSync extends BaseAsyncTask {
 
         AsyncTranslogFSync(IndexService indexService) {
-            super(indexService, indexService.getIndexSettings().getTranslogSyncInterval());
-        }
-
-        @Override
-        protected String getThreadPool() {
-            return ThreadPool.Names.FLUSH;
+            super(
+                indexService,
+                indexService.threadPool.executor(ThreadPool.Names.FLUSH),
+                indexService.getIndexSettings().getTranslogSyncInterval()
+            );
         }
 
         @Override
@@ -1111,7 +1111,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
     static final class AsyncRefreshTask extends BaseAsyncTask {
 
         AsyncRefreshTask(IndexService indexService) {
-            super(indexService, indexService.getIndexSettings().getRefreshInterval());
+            super(
+                indexService,
+                indexService.threadPool.executor(ThreadPool.Names.REFRESH),
+                indexService.getIndexSettings().getRefreshInterval()
+            );
         }
 
         @Override
@@ -1119,11 +1123,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
             indexService.maybeRefreshEngine(false);
         }
 
-        @Override
-        protected String getThreadPool() {
-            return ThreadPool.Names.REFRESH;
-        }
-
         @Override
         public String toString() {
             return "refresh";
@@ -1135,6 +1134,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
         AsyncTrimTranslogTask(IndexService indexService) {
             super(
                 indexService,
+                threadPool.generic(),
                 indexService.getIndexSettings()
                     .getSettings()
                     .getAsTime(INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, TimeValue.timeValueMinutes(10))
@@ -1151,11 +1151,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
             indexService.maybeTrimTranslog();
         }
 
-        @Override
-        protected String getThreadPool() {
-            return ThreadPool.Names.GENERIC;
-        }
-
         @Override
         public String toString() {
             return "trim_translog";
@@ -1187,7 +1182,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
 
         AsyncGlobalCheckpointTask(final IndexService indexService) {
             // index.global_checkpoint_sync_interval is not a real setting, it is only registered in tests
-            super(indexService, GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings()));
+            super(
+                indexService,
+                indexService.getThreadPool().generic(),
+                GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings())
+            );
         }
 
         @Override
@@ -1195,11 +1194,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
             indexService.maybeSyncGlobalCheckpoints();
         }
 
-        @Override
-        protected String getThreadPool() {
-            return ThreadPool.Names.GENERIC;
-        }
-
         @Override
         public String toString() {
             return "global_checkpoint_sync";
@@ -1209,7 +1203,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
     private static final class AsyncRetentionLeaseSyncTask extends BaseAsyncTask {
 
         AsyncRetentionLeaseSyncTask(final IndexService indexService) {
-            super(indexService, RETENTION_LEASE_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings()));
+            super(
+                indexService,
+                indexService.threadPool.executor(ThreadPool.Names.MANAGEMENT),
+                RETENTION_LEASE_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings())
+            );
         }
 
         @Override
@@ -1217,11 +1215,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
             indexService.syncRetentionLeases();
         }
 
-        @Override
-        protected String getThreadPool() {
-            return ThreadPool.Names.MANAGEMENT;
-        }
-
         @Override
         public String toString() {
             return "retention_lease_sync";

+ 3 - 1
server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java

@@ -26,6 +26,7 @@ import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractAsyncTask;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.core.TimeValue;
@@ -524,7 +525,7 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos
     class PeriodicRechecker extends AbstractAsyncTask {
 
         PeriodicRechecker(TimeValue recheckInterval) {
-            super(logger, threadPool, recheckInterval, false);
+            super(logger, threadPool, EsExecutors.DIRECT_EXECUTOR_SERVICE, recheckInterval, false);
         }
 
         @Override
@@ -535,6 +536,7 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos
         @Override
         public void runInternal() {
             if (clusterService.localNode().isMasterNode()) {
+                // TODO just run on the elected master?
                 final ClusterState state = clusterService.state();
                 logger.trace("periodic persistent task assignment check running for cluster state {}", state.getVersion());
                 if (isAnyTaskUnassigned(state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE))) {

+ 24 - 13
server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java

@@ -45,7 +45,7 @@ public class AbstractAsyncTaskTests extends ESTestCase {
         final CyclicBarrier barrier1 = new CyclicBarrier(2); // 1 for runInternal plus 1 for the test sequence
         final CyclicBarrier barrier2 = new CyclicBarrier(2); // 1 for runInternal plus 1 for the test sequence
         final AtomicInteger count = new AtomicInteger();
-        AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(1), true) {
+        AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, threadPool.generic(), TimeValue.timeValueMillis(1), true) {
 
             @Override
             protected boolean mustReschedule() {
@@ -71,10 +71,6 @@ public class AbstractAsyncTaskTests extends ESTestCase {
                 }
             }
 
-            @Override
-            protected String getThreadPool() {
-                return ThreadPool.Names.GENERIC;
-            }
         };
 
         assertFalse(task.isScheduled());
@@ -101,7 +97,7 @@ public class AbstractAsyncTaskTests extends ESTestCase {
         boolean shouldRunThrowException = randomBoolean();
         final CyclicBarrier barrier = new CyclicBarrier(2); // 1 for runInternal plus 1 for the test sequence
         final AtomicInteger count = new AtomicInteger();
-        AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(1), false) {
+        AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, threadPool.generic(), TimeValue.timeValueMillis(1), false) {
 
             @Override
             protected boolean mustReschedule() {
@@ -122,10 +118,6 @@ public class AbstractAsyncTaskTests extends ESTestCase {
                 }
             }
 
-            @Override
-            protected String getThreadPool() {
-                return ThreadPool.Names.GENERIC;
-            }
         };
 
         assertFalse(task.isScheduled());
@@ -148,7 +140,13 @@ public class AbstractAsyncTaskTests extends ESTestCase {
 
     public void testCloseWithNoRun() {
 
-        AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMinutes(10), true) {
+        AbstractAsyncTask task = new AbstractAsyncTask(
+            logger,
+            threadPool,
+            EsExecutors.DIRECT_EXECUTOR_SERVICE,
+            TimeValue.timeValueMinutes(10),
+            true
+        ) {
 
             @Override
             protected boolean mustReschedule() {
@@ -171,7 +169,13 @@ public class AbstractAsyncTaskTests extends ESTestCase {
 
         final CountDownLatch latch = new CountDownLatch(2);
 
-        AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueHours(1), true) {
+        AbstractAsyncTask task = new AbstractAsyncTask(
+            logger,
+            threadPool,
+            EsExecutors.DIRECT_EXECUTOR_SERVICE,
+            TimeValue.timeValueHours(1),
+            true
+        ) {
 
             @Override
             protected boolean mustReschedule() {
@@ -202,7 +206,14 @@ public class AbstractAsyncTaskTests extends ESTestCase {
         List<AbstractAsyncTask> tasks = new ArrayList<>(numTasks);
         AtomicLong counter = new AtomicLong();
         for (int i = 0; i < numTasks; i++) {
-            AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(randomIntBetween(1, 2)), true) {
+            AbstractAsyncTask task = new AbstractAsyncTask(
+                logger,
+                threadPool,
+                EsExecutors.DIRECT_EXECUTOR_SERVICE,
+                TimeValue.timeValueMillis(randomIntBetween(1, 2)),
+                true
+            ) {
+
                 @Override
                 protected boolean mustReschedule() {
                     return counter.get() <= 1000;

+ 10 - 15
server/src/test/java/org/elasticsearch/index/IndexServiceTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.query.QueryBuilder;
@@ -24,7 +25,6 @@ import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESSingleNodeTestCase;
 import org.elasticsearch.test.InternalSettingsPlugin;
-import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentFactory;
@@ -75,7 +75,11 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
         AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
         AtomicReference<CountDownLatch> latch2 = new AtomicReference<>(new CountDownLatch(1));
         final AtomicInteger count = new AtomicInteger();
-        IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(1)) {
+        IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(
+            indexService,
+            indexService.getThreadPool().generic(),
+            TimeValue.timeValueMillis(1)
+        ) {
             @Override
             protected void runInternal() {
                 final CountDownLatch l1 = latch.get();
@@ -96,11 +100,6 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
                     }
                 }
             }
-
-            @Override
-            protected String getThreadPool() {
-                return ThreadPool.Names.GENERIC;
-            }
         };
 
         latch.get().await();
@@ -115,11 +114,9 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
         latch2.get().countDown();
         assertEquals(2, count.get());
 
-        task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(1000000)) {
+        task = new IndexService.BaseAsyncTask(indexService, EsExecutors.DIRECT_EXECUTOR_SERVICE, TimeValue.timeValueMillis(1000000)) {
             @Override
-            protected void runInternal() {
-
-            }
+            protected void runInternal() {}
         };
         assertTrue(task.mustReschedule());
 
@@ -140,11 +137,9 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
         indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
         assertNotSame(closedIndexService, indexService);
 
-        task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(100000)) {
+        task = new IndexService.BaseAsyncTask(indexService, EsExecutors.DIRECT_EXECUTOR_SERVICE, TimeValue.timeValueMillis(100000)) {
             @Override
-            protected void runInternal() {
-
-            }
+            protected void runInternal() {}
         };
         assertTrue(task.mustReschedule());
         assertFalse(task.isClosed());

+ 3 - 8
x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

@@ -336,7 +336,7 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         for (int i = 0; i < numRegions; i++) {
             freeRegions.add(sharedBytes.getFileChannel(i));
         }
-        decayTask = new CacheDecayTask(threadPool, SHARED_CACHE_DECAY_INTERVAL_SETTING.get(settings));
+        decayTask = new CacheDecayTask(threadPool, threadPool.generic(), SHARED_CACHE_DECAY_INTERVAL_SETTING.get(settings));
         decayTask.rescheduleIfNecessary();
         this.rangeSize = SHARED_CACHE_RANGE_SIZE_SETTING.get(settings);
         this.recoveryRangeSize = SHARED_CACHE_RECOVERY_RANGE_SIZE_SETTING.get(settings);
@@ -745,8 +745,8 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
 
     class CacheDecayTask extends AbstractAsyncTask {
 
-        CacheDecayTask(ThreadPool threadPool, TimeValue interval) {
-            super(logger, Objects.requireNonNull(threadPool), Objects.requireNonNull(interval), true);
+        CacheDecayTask(ThreadPool threadPool, Executor executor, TimeValue interval) {
+            super(logger, Objects.requireNonNull(threadPool), executor, Objects.requireNonNull(interval), true);
         }
 
         @Override
@@ -759,11 +759,6 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
             computeDecay();
         }
 
-        @Override
-        protected String getThreadPool() {
-            return ThreadPool.Names.GENERIC;
-        }
-
         @Override
         public String toString() {
             return "shared_cache_decay_task";

+ 2 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

@@ -22,6 +22,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractAsyncTask;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
@@ -211,7 +212,7 @@ public final class ExchangeService extends AbstractLifecycleComponent {
 
     private final class InactiveSinksReaper extends AbstractAsyncTask {
         InactiveSinksReaper(Logger logger, ThreadPool threadPool, TimeValue interval) {
-            super(logger, threadPool, interval, true);
+            super(logger, threadPool, EsExecutors.DIRECT_EXECUTOR_SERVICE, interval, true);
             rescheduleIfNecessary();
         }
 

+ 1 - 6
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/CacheService.java

@@ -636,7 +636,7 @@ public class CacheService extends AbstractLifecycleComponent {
     class CacheSynchronizationTask extends AbstractAsyncTask {
 
         CacheSynchronizationTask(ThreadPool threadPool, TimeValue interval) {
-            super(logger, Objects.requireNonNull(threadPool), Objects.requireNonNull(interval), true);
+            super(logger, Objects.requireNonNull(threadPool), threadPool.generic(), Objects.requireNonNull(interval), true);
         }
 
         @Override
@@ -649,11 +649,6 @@ public class CacheService extends AbstractLifecycleComponent {
             synchronizeCache();
         }
 
-        @Override
-        protected String getThreadPool() {
-            return ThreadPool.Names.GENERIC;
-        }
-
         @Override
         public String toString() {
             return "cache_synchronization_task";