Browse Source

Migrate RetryableAction away from name-based executor (#99301)

Replaces the executor name with a proper executor, and also removes the
constructor overloads that dangerously default to running the retries on
the scheduler thread, so that callers have to make a conscious choice of
executor.

Relates #99027 and friends
David Turner 2 years ago
parent
commit
e2e7b8bc69

+ 5 - 25
server/src/main/java/org/elasticsearch/action/support/RetryableAction.java

@@ -18,6 +18,7 @@ import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.ArrayDeque;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.elasticsearch.core.Strings.format;
@@ -39,42 +40,21 @@ public abstract class RetryableAction<Response> {
     private final long timeoutMillis;
     private final long startMillis;
     private final ActionListener<Response> finalListener;
-    private final String executor;
+    private final Executor executor;
 
     private volatile Scheduler.ScheduledCancellable retryTask;
 
-    public RetryableAction(
-        Logger logger,
-        ThreadPool threadPool,
-        TimeValue initialDelay,
-        TimeValue timeoutValue,
-        ActionListener<Response> listener
-    ) {
-        this(logger, threadPool, initialDelay, timeoutValue, listener, ThreadPool.Names.SAME);
-    }
-
     public RetryableAction(
         Logger logger,
         ThreadPool threadPool,
         TimeValue initialDelay,
         TimeValue timeoutValue,
         ActionListener<Response> listener,
-        String executor
+        Executor executor
     ) {
         this(logger, threadPool, initialDelay, TimeValue.MAX_VALUE, timeoutValue, listener, executor);
     }
 
-    public RetryableAction(
-        Logger logger,
-        ThreadPool threadPool,
-        TimeValue initialDelay,
-        TimeValue maxDelayBound,
-        TimeValue timeoutValue,
-        ActionListener<Response> listener
-    ) {
-        this(logger, threadPool, initialDelay, maxDelayBound, timeoutValue, listener, ThreadPool.Names.SAME);
-    }
-
     public RetryableAction(
         Logger logger,
         ThreadPool threadPool,
@@ -82,7 +62,7 @@ public abstract class RetryableAction<Response> {
         TimeValue maxDelayBound,
         TimeValue timeoutValue,
         ActionListener<Response> listener,
-        String executor
+        Executor executor
     ) {
         this.logger = logger;
         this.threadPool = threadPool;
@@ -106,7 +86,7 @@ public abstract class RetryableAction<Response> {
     public void run() {
         final RetryingListener retryingListener = new RetryingListener(initialDelayMillis, null);
         final Runnable runnable = createRunnable(retryingListener);
-        threadPool.executor(executor).execute(runnable);
+        executor.execute(runnable);
     }
 
     public void cancel(Exception e) {

+ 3 - 1
server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

@@ -22,6 +22,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
@@ -293,7 +294,8 @@ public class ReplicationOperation<
             threadPool,
             initialRetryBackoffBound,
             retryTimeout,
-            replicationListener
+            replicationListener,
+            EsExecutors.DIRECT_EXECUTOR_SERVICE
         ) {
 
             @Override

+ 9 - 1
server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

@@ -24,6 +24,7 @@ import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.util.CancellableThreads;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.seqno.ReplicationTracker;
@@ -334,7 +335,14 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
         final ActionListener<T> removeListener = ActionListener.runBefore(actionListener, () -> onGoingRetryableActions.remove(key));
         final TimeValue initialDelay = TimeValue.timeValueMillis(200);
         final TimeValue timeout = recoverySettings.internalActionRetryTimeout();
-        final RetryableAction<T> retryableAction = new RetryableAction<>(logger, threadPool, initialDelay, timeout, removeListener) {
+        final RetryableAction<T> retryableAction = new RetryableAction<>(
+            logger,
+            threadPool,
+            initialDelay,
+            timeout,
+            removeListener,
+            EsExecutors.DIRECT_EXECUTOR_SERVICE
+        ) {
 
             @Override
             public void tryAction(ActionListener<T> listener) {

+ 15 - 7
server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java

@@ -11,6 +11,7 @@ package org.elasticsearch.action.support;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.test.ESTestCase;
@@ -38,7 +39,8 @@ public class RetryableActionTests extends ESTestCase {
             taskQueue.getThreadPool(),
             TimeValue.timeValueMillis(10),
             TimeValue.timeValueSeconds(30),
-            future
+            future,
+            EsExecutors.DIRECT_EXECUTOR_SERVICE
         ) {
 
             @Override
@@ -69,7 +71,8 @@ public class RetryableActionTests extends ESTestCase {
             taskQueue.getThreadPool(),
             TimeValue.timeValueMillis(10),
             TimeValue.timeValueSeconds(30),
-            future
+            future,
+            EsExecutors.DIRECT_EXECUTOR_SERVICE
         ) {
 
             @Override
@@ -116,7 +119,8 @@ public class RetryableActionTests extends ESTestCase {
             taskQueue.getThreadPool(),
             TimeValue.timeValueMillis(randomFrom(1, 10, randomIntBetween(100, 2000))),
             TimeValue.timeValueSeconds(1),
-            future
+            future,
+            EsExecutors.DIRECT_EXECUTOR_SERVICE
         ) {
 
             @Override
@@ -163,7 +167,8 @@ public class RetryableActionTests extends ESTestCase {
             taskQueue.getThreadPool(),
             TimeValue.timeValueMillis(10),
             TimeValue.timeValueSeconds(0),
-            future
+            future,
+            EsExecutors.DIRECT_EXECUTOR_SERVICE
         ) {
 
             @Override
@@ -192,7 +197,8 @@ public class RetryableActionTests extends ESTestCase {
             taskQueue.getThreadPool(),
             TimeValue.timeValueMillis(10),
             TimeValue.timeValueSeconds(30),
-            future
+            future,
+            EsExecutors.DIRECT_EXECUTOR_SERVICE
         ) {
 
             @Override
@@ -221,7 +227,8 @@ public class RetryableActionTests extends ESTestCase {
             taskQueue.getThreadPool(),
             TimeValue.timeValueMillis(10),
             TimeValue.timeValueSeconds(30),
-            future
+            future,
+            EsExecutors.DIRECT_EXECUTOR_SERVICE
         ) {
 
             @Override
@@ -259,7 +266,8 @@ public class RetryableActionTests extends ESTestCase {
             TimeValue.timeValueMillis(10),
             TimeValue.timeValueMillis(50),
             TimeValue.timeValueSeconds(1),
-            future
+            future,
+            EsExecutors.DIRECT_EXECUTOR_SERVICE
         ) {
 
             @Override

+ 9 - 1
server/src/test/java/org/elasticsearch/action/support/replication/PendingReplicationActionsTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.RetryableAction;
 import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.shard.IndexShardClosedException;
 import org.elasticsearch.index.shard.ShardId;
@@ -91,7 +92,14 @@ public class PendingReplicationActionsTests extends ESTestCase {
         }
 
         private TestAction(ActionListener<Void> listener, boolean succeed) {
-            super(logger, threadPool, TimeValue.timeValueMillis(1), TimeValue.timeValueMinutes(1), listener);
+            super(
+                logger,
+                threadPool,
+                TimeValue.timeValueMillis(1),
+                TimeValue.timeValueMinutes(1),
+                listener,
+                EsExecutors.DIRECT_EXECUTOR_SERVICE
+            );
             this.succeed = succeed;
         }
 

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java

@@ -486,7 +486,7 @@ public class OpenJobPersistentTasksExecutor extends AbstractJobPersistentTasksEx
                 // to be available so that and data deletion can succeed.
                 TimeValue.timeValueMinutes(15),
                 listener,
-                MachineLearning.UTILITY_THREAD_POOL_NAME
+                client.threadPool().executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
             );
             this.jobTask = Objects.requireNonNull(jobTask);
         }

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java

@@ -469,7 +469,7 @@ public class ResultsPersisterService {
                 TimeValue.timeValueMillis(MIN_RETRY_SLEEP_MILLIS),
                 TimeValue.MAX_VALUE,
                 listener,
-                UTILITY_THREAD_POOL_NAME
+                threadPool.executor(UTILITY_THREAD_POOL_NAME)
             );
             this.jobId = jobId;
             this.shouldRetry = shouldRetry;