|
@@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.ActionRunnable;
|
|
|
import org.elasticsearch.common.Randomness;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
+import org.elasticsearch.threadpool.Scheduler;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
|
|
|
import java.util.ArrayDeque;
|
|
@@ -48,6 +49,8 @@ public abstract class RetryableAction<Response> {
|
|
|
private final ActionListener<Response> finalListener;
|
|
|
private final String executor;
|
|
|
|
|
|
+ private volatile Scheduler.ScheduledCancellable retryTask;
|
|
|
+
|
|
|
public RetryableAction(Logger logger, ThreadPool threadPool, TimeValue initialDelay, TimeValue timeoutValue,
|
|
|
ActionListener<Response> listener) {
|
|
|
this(logger, threadPool, initialDelay, timeoutValue, listener, ThreadPool.Names.SAME);
|
|
@@ -75,6 +78,10 @@ public abstract class RetryableAction<Response> {
|
|
|
|
|
|
public void cancel(Exception e) {
|
|
|
if (isDone.compareAndSet(false, true)) {
|
|
|
+ Scheduler.ScheduledCancellable localRetryTask = this.retryTask;
|
|
|
+ if (localRetryTask != null) {
|
|
|
+ localRetryTask.cancel();
|
|
|
+ }
|
|
|
finalListener.onFailure(e);
|
|
|
}
|
|
|
}
|
|
@@ -84,11 +91,16 @@ public abstract class RetryableAction<Response> {
|
|
|
|
|
|
@Override
|
|
|
protected void doRun() {
|
|
|
- tryAction(listener);
|
|
|
+ retryTask = null;
|
|
|
+ // It is possible that the task was cancelled in between the retry being dispatched and now
|
|
|
+ if (isDone.get() == false) {
|
|
|
+ tryAction(listener);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void onRejection(Exception e) {
|
|
|
+ retryTask = null;
|
|
|
// TODO: The only implementations of this class use SAME which means the execution will not be
|
|
|
// rejected. Future implementations can adjust this functionality as needed.
|
|
|
onFailure(e);
|
|
@@ -140,7 +152,7 @@ public abstract class RetryableAction<Response> {
|
|
|
if (isDone.get() == false) {
|
|
|
final TimeValue delay = TimeValue.timeValueMillis(delayMillis);
|
|
|
logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e);
|
|
|
- threadPool.schedule(runnable, delay, executor);
|
|
|
+ retryTask = threadPool.schedule(runnable, delay, executor);
|
|
|
}
|
|
|
}
|
|
|
} else {
|