瀏覽代碼

Handle rejected execution exception on reschedule

A self-rescheduling runnable can hit a rejected execution exception but
this exception goes uncaught. Instead, this exception should be caught
and passed to the onRejected handler. Not catching handling this
rejected execution exception can lead to test failures. Namely, a race
condition can arise between the shutting down of the thread pool and
cancelling of the rescheduling of the task. If another reschedule fires
right as the thread pool is being terminated, the rescheduled task will
be rejected leading to an uncaught exception which will cause a test
failure. This commit addresses these issues.

Relates #19505
Jason Tedor 9 年之前
父節點
當前提交
720b53b018

+ 7 - 1
core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

@@ -50,6 +50,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -326,6 +327,7 @@ public class ThreadPool extends AbstractComponent implements Closeable {
      * @return a ScheduledFuture who's get will return when the task is has been added to its target thread pool and throw an exception if
      *         the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool
      *         the ScheduledFuture will cannot interact with it.
+     * @throws java.util.concurrent.RejectedExecutionException {@inheritDoc}
      */
     public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable command) {
         if (!Names.SAME.equals(executor)) {
@@ -792,7 +794,11 @@ public class ThreadPool extends AbstractComponent implements Closeable {
         public void onAfter() {
             // if this has not been cancelled reschedule it to run again
             if (run) {
-                threadPool.schedule(interval, executor, this);
+                try {
+                    threadPool.schedule(interval, executor, this);
+                } catch (final RejectedExecutionException e) {
+                    onRejection(e);
+                }
             }
         }
     }

+ 0 - 1
core/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java

@@ -35,7 +35,6 @@ import org.junit.Before;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;