|
@@ -34,6 +34,7 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
import static org.elasticsearch.common.settings.Setting.positiveTimeSetting;
|
|
|
|
|
@@ -50,6 +51,7 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
|
|
|
private final TimeValue tickInterval;
|
|
|
private final Map<String, ActiveSchedule> schedules = new ConcurrentHashMap<>();
|
|
|
private final Ticker ticker;
|
|
|
+ private final AtomicBoolean isRunning = new AtomicBoolean(false);
|
|
|
|
|
|
public TickerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) {
|
|
|
super(scheduleRegistry, clock);
|
|
@@ -60,7 +62,8 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
|
|
|
@Override
|
|
|
public synchronized void start(Collection<Watch> jobs) {
|
|
|
long startTime = clock.millis();
|
|
|
- logger.info("Watcher starting watches at {}", WatcherDateTimeUtils.dateTimeFormatter.formatMillis(startTime));
|
|
|
+ isRunning.set(true);
|
|
|
+ logger.info("Starting watcher engine at {}", WatcherDateTimeUtils.dateTimeFormatter.formatMillis(startTime));
|
|
|
Map<String, ActiveSchedule> startingSchedules = Maps.newMapWithExpectedSize(jobs.size());
|
|
|
for (Watch job : jobs) {
|
|
|
if (job.trigger() instanceof ScheduleTrigger trigger) {
|
|
@@ -81,17 +84,22 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
|
|
|
|
|
|
@Override
|
|
|
public void stop() {
|
|
|
+ logger.info("Stopping watcher engine");
|
|
|
+ isRunning.set(false);
|
|
|
schedules.clear();
|
|
|
ticker.close();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public synchronized void pauseExecution() {
|
|
|
+ public void pauseExecution() {
|
|
|
+ logger.info("Pausing watcher engine");
|
|
|
+ isRunning.set(false);
|
|
|
schedules.clear();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void add(Watch watch) {
|
|
|
+ logger.trace("Adding watch [{}] to engine (engine is running: {})", watch.id(), isRunning.get());
|
|
|
assert watch.trigger() instanceof ScheduleTrigger;
|
|
|
ScheduleTrigger trigger = (ScheduleTrigger) watch.trigger();
|
|
|
ActiveSchedule currentSchedule = schedules.get(watch.id());
|
|
@@ -106,13 +114,25 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
|
|
|
|
|
|
@Override
|
|
|
public boolean remove(String jobId) {
|
|
|
+ logger.debug("Removing watch [{}] from engine (engine is running: {})", jobId, isRunning.get());
|
|
|
return schedules.remove(jobId) != null;
|
|
|
}
|
|
|
|
|
|
void checkJobs() {
|
|
|
+ if (isRunning.get() == false) {
|
|
|
+ logger.debug(
|
|
|
+ "Watcher not running because the engine is paused. Currently scheduled watches being skipped: {}",
|
|
|
+ schedules.size()
|
|
|
+ );
|
|
|
+ return;
|
|
|
+ }
|
|
|
long triggeredTime = clock.millis();
|
|
|
List<TriggerEvent> events = new ArrayList<>();
|
|
|
for (ActiveSchedule schedule : schedules.values()) {
|
|
|
+ if (isRunning.get() == false) {
|
|
|
+ logger.debug("Watcher paused while running [{}]", schedule.name);
|
|
|
+ break;
|
|
|
+ }
|
|
|
long scheduledTime = schedule.check(triggeredTime);
|
|
|
if (scheduledTime > 0) {
|
|
|
ZonedDateTime triggeredDateTime = utcDateTimeAtEpochMillis(triggeredTime);
|