|
@@ -38,6 +38,7 @@ import org.elasticsearch.common.xcontent.XContentType;
|
|
|
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
|
|
import org.elasticsearch.index.engine.DocumentMissingException;
|
|
|
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
|
|
+import org.elasticsearch.xpack.core.watcher.WatcherState;
|
|
|
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
|
|
|
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult;
|
|
|
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
|
|
@@ -135,23 +136,31 @@ public class ExecutionService {
|
|
|
* Pausing means, that no new watch executions will be done unless this pausing is explicitly unset.
|
|
|
* This is important when watcher is stopped, so that scheduled watches do not accidentally get executed.
|
|
|
* This should not be used when we need to reload watcher based on some cluster state changes, then just calling
|
|
|
- * {@link #clearExecutionsAndQueue()} is the way to go
|
|
|
+ * {@link #clearExecutionsAndQueue(Runnable)} is the way to go
|
|
|
+ *
|
|
|
+ * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
|
|
|
+ * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
|
|
|
*
|
|
|
* @return the number of tasks that have been removed
|
|
|
*/
|
|
|
- public int pause() {
|
|
|
+ public int pause(Runnable stoppedListener) {
|
|
|
+ assert stoppedListener != null;
|
|
|
paused.set(true);
|
|
|
- return clearExecutionsAndQueue();
|
|
|
+ return clearExecutionsAndQueue(stoppedListener);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Empty the currently queued tasks and wait for current executions to finish.
|
|
|
*
|
|
|
+ * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
|
|
|
+ * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
|
|
|
+ *
|
|
|
* @return the number of tasks that have been removed
|
|
|
*/
|
|
|
- public int clearExecutionsAndQueue() {
|
|
|
+ public int clearExecutionsAndQueue(Runnable stoppedListener) {
|
|
|
+ assert stoppedListener != null;
|
|
|
int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>());
|
|
|
- this.clearExecutions();
|
|
|
+ this.clearExecutions(stoppedListener);
|
|
|
return cancelledTaskCount;
|
|
|
}
|
|
|
|
|
@@ -278,8 +287,10 @@ public class ExecutionService {
|
|
|
ctx.setNodeId(clusterService.localNode().getId());
|
|
|
WatchRecord record = null;
|
|
|
final String watchId = ctx.id().watchId();
|
|
|
+ //pull this to a local reference since the class reference can be swapped, and need to ensure same object is used for put/remove
|
|
|
+ final CurrentExecutions currentExecutions = this.currentExecutions.get();
|
|
|
try {
|
|
|
- boolean executionAlreadyExists = currentExecutions.get().put(watchId, new WatchExecution(ctx, Thread.currentThread()));
|
|
|
+ boolean executionAlreadyExists = currentExecutions.put(watchId, new WatchExecution(ctx, Thread.currentThread()));
|
|
|
if (executionAlreadyExists) {
|
|
|
logger.trace("not executing watch [{}] because it is already queued", watchId);
|
|
|
record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool");
|
|
@@ -334,7 +345,7 @@ public class ExecutionService {
|
|
|
|
|
|
triggeredWatchStore.delete(ctx.id());
|
|
|
}
|
|
|
- currentExecutions.get().remove(watchId);
|
|
|
+ currentExecutions.remove(watchId);
|
|
|
logger.debug("finished [{}]/[{}]", watchId, ctx.id());
|
|
|
}
|
|
|
return record;
|
|
@@ -577,11 +588,15 @@ public class ExecutionService {
|
|
|
/**
|
|
|
* This clears out the current executions and sets new empty current executions
|
|
|
* This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea
|
|
|
+ *
|
|
|
+ * @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
|
|
|
+ * {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
|
|
|
*/
|
|
|
- private void clearExecutions() {
|
|
|
+ private void clearExecutions(Runnable stoppedListener) {
|
|
|
+ assert stoppedListener != null;
|
|
|
final CurrentExecutions currentExecutionsBeforeSetting = currentExecutions.getAndSet(new CurrentExecutions());
|
|
|
// clear old executions in background, no need to wait
|
|
|
- genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout));
|
|
|
+ genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout, stoppedListener));
|
|
|
}
|
|
|
|
|
|
// the watch execution task takes another runnable as parameter
|