Quellcode durchsuchen

#111433 Watch Next Run Interval Resets On Shard Move or Node Restart (#115102) (#115329)

* Switch Watcher scheduler to use last exec time

when restarting, moving shards or resuming from stopped.

* Add tests for last runtime calculation

* Update docs/changelog/115102.yaml

* Add counter to watcher job executions to check no additional executions happen during test
Luke Whiting vor 1 Jahr
Ursprung
Commit
86a9b7d835

+ 6 - 0
docs/changelog/115102.yaml

@@ -0,0 +1,6 @@
+pr: 115102
+summary: Watch Next Run Interval Resets On Shard Move or Node Restart
+area: Watcher
+type: bug
+issues:
+ - 111433

+ 38 - 2
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java

@@ -17,6 +17,8 @@ import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.xpack.core.watcher.support.WatcherDateTimeUtils;
 import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent;
 import org.elasticsearch.xpack.core.watcher.watch.Watch;
+import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
+import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
 import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule;
 import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
 import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
@@ -32,6 +34,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -67,7 +70,11 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
         Map<String, ActiveSchedule> startingSchedules = Maps.newMapWithExpectedSize(jobs.size());
         for (Watch job : jobs) {
             if (job.trigger() instanceof ScheduleTrigger trigger) {
-                startingSchedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), startTime));
+                if (trigger.getSchedule() instanceof IntervalSchedule) {
+                    startingSchedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), calculateLastStartTime(job)));
+                } else {
+                    startingSchedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), startTime));
+                }
             }
         }
         // why are we calling putAll() here instead of assigning a brand
@@ -108,10 +115,39 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
         // watcher indexing listener
         // this also means that updating an existing watch would not retrigger the schedule time, if it remains the same schedule
         if (currentSchedule == null || currentSchedule.schedule.equals(trigger.getSchedule()) == false) {
-            schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), clock.millis()));
+            if (trigger.getSchedule() instanceof IntervalSchedule) {
+                schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), calculateLastStartTime(watch)));
+            } else {
+                schedules.put(watch.id(), new ActiveSchedule(watch.id(), trigger.getSchedule(), clock.millis()));
+            }
+
         }
     }
 
+    /**
+     * Attempts to calculate the epoch millis of the last time the watch was checked. If the watch has never been checked, the timestamp of
+     * the last state change is used. If neither a last-checked time nor a state timestamp is available, the current time is used.
+     * @param job the watch to calculate the last start time for; may be null, in which case the current time is returned
+     * @return the epoch millis of the last check, else of the last state change, else the current clock time
+     */
+    private long calculateLastStartTime(Watch job) {
+        var lastChecked = Optional.ofNullable(job)
+            .map(Watch::status)
+            .map(WatchStatus::lastChecked)
+            .map(ZonedDateTime::toInstant)
+            .map(Instant::toEpochMilli);
+
+        return lastChecked.orElseGet(
+            () -> Optional.ofNullable(job)
+                .map(Watch::status)
+                .map(WatchStatus::state)
+                .map(WatchStatus.State::getTimestamp)
+                .map(ZonedDateTime::toInstant)
+                .map(Instant::toEpochMilli)
+                .orElse(clock.millis())
+        );
+    }
+
     @Override
     public boolean remove(String jobId) {
         logger.debug("Removing watch [{}] from engine (engine is running: {})", jobId, isRunning.get());

+ 239 - 0
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent;
 import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
 import org.elasticsearch.xpack.core.watcher.watch.Watch;
+import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
 import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
 import org.elasticsearch.xpack.watcher.input.none.ExecutableNoneInput;
 import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule;
@@ -283,6 +284,244 @@ public class TickerScheduleEngineTests extends ESTestCase {
         assertThat(engine.getSchedules().get("_id"), not(is(activeSchedule)));
     }
 
+    /**
+     * Verifies that a 1s-interval watch whose lastChecked time is 500ms in the past fires after the clock advances only 510ms, so the job
+     * resumes waiting from the point it left off before the reallocation / restart; a second fire after a further 1100ms is also expected
+     */
+    public void testWatchWithLastCheckedTimeExecutesBeforeInitialInterval() throws Exception {
+        final var firstLatch = new CountDownLatch(1);
+        final var secondLatch = new CountDownLatch(1);
+
+        Watch watch = new Watch(
+            "watch",
+            new ScheduleTrigger(interval("1s")),
+            new ExecutableNoneInput(),
+            InternalAlwaysCondition.INSTANCE,
+            null,
+            null,
+            Collections.emptyList(),
+            null,
+            new WatchStatus(-1L, null, null, clock.instant().minusMillis(500).atZone(ZoneOffset.UTC), null, null, null),
+            SequenceNumbers.UNASSIGNED_SEQ_NO,
+            SequenceNumbers.UNASSIGNED_PRIMARY_TERM
+        );
+
+        var watches = Collections.singletonList(watch);
+
+        var runCount = new AtomicInteger(0);
+
+        engine.register(events -> {
+            for (TriggerEvent ignored : events) {
+                if (runCount.get() == 0) {
+                    logger.info("job first fire");
+                    firstLatch.countDown();
+                } else {
+                    logger.info("job second fire");
+                    secondLatch.countDown();
+                }
+                runCount.incrementAndGet();
+            }
+        });
+
+        engine.start(watches);
+        advanceClockIfNeeded(clock.instant().plusMillis(510).atZone(ZoneOffset.UTC));
+        if (firstLatch.await(3, TimeUnit.SECONDS) == false) {
+            fail("waiting too long for all watches to be triggered");
+        }
+
+        advanceClockIfNeeded(clock.instant().plusMillis(1100).atZone(ZoneOffset.UTC));
+        if (secondLatch.await(3, TimeUnit.SECONDS) == false) {
+            fail("waiting too long for all watches to be triggered");
+        }
+
+        assertThat(runCount.get(), is(2));
+
+        engine.stop();
+    }
+
+    /**
+     * Verifies that a 1s-interval watch with no lastChecked time but a state timestamp 500ms in the past fires after the clock advances
+     * 510ms, so the job resumes waiting from where it left off before the reallocation / restart; a second fire is expected 1100ms later
+     */
+    public void testWatchWithNoLastCheckedTimeButHasActivationTimeExecutesBeforeInitialInterval() throws Exception {
+        final var firstLatch = new CountDownLatch(1);
+        final var secondLatch = new CountDownLatch(1);
+
+        Watch watch = new Watch(
+            "watch",
+            new ScheduleTrigger(interval("1s")),
+            new ExecutableNoneInput(),
+            InternalAlwaysCondition.INSTANCE,
+            null,
+            null,
+            Collections.emptyList(),
+            null,
+            new WatchStatus(
+                -1L,
+                new WatchStatus.State(true, clock.instant().minusMillis(500).atZone(ZoneOffset.UTC)),
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            SequenceNumbers.UNASSIGNED_SEQ_NO,
+            SequenceNumbers.UNASSIGNED_PRIMARY_TERM
+        );
+
+        var watches = Collections.singletonList(watch);
+
+        var runCount = new AtomicInteger(0);
+
+        engine.register(events -> {
+            for (TriggerEvent ignored : events) {
+                if (runCount.get() == 0) {
+                    logger.info("job first fire");
+                    firstLatch.countDown();
+                } else {
+                    logger.info("job second fire");
+                    secondLatch.countDown();
+                }
+                runCount.incrementAndGet();
+            }
+        });
+
+        engine.start(watches);
+        advanceClockIfNeeded(clock.instant().plusMillis(510).atZone(ZoneOffset.UTC));
+        if (firstLatch.await(3, TimeUnit.SECONDS) == false) {
+            fail("waiting too long for all watches to be triggered");
+        }
+
+        advanceClockIfNeeded(clock.instant().plusMillis(1100).atZone(ZoneOffset.UTC));
+        if (secondLatch.await(3, TimeUnit.SECONDS) == false) {
+            fail("waiting too long for all watches to be triggered");
+        }
+
+        assertThat(runCount.get(), is(2));
+
+        engine.stop();
+    }
+
+    /**
+     * Verifies that a 1s-interval watch with a past lastChecked time, added via add() after the engine has started, fires after the clock
+     * advances 510ms, so the job resumes waiting from where it left off before the reallocation / restart; a second fire is also expected
+     */
+    public void testAddWithLastCheckedTimeExecutesBeforeInitialInterval() throws Exception {
+        final var firstLatch = new CountDownLatch(1);
+        final var secondLatch = new CountDownLatch(1);
+
+        Watch watch = new Watch(
+            "watch",
+            new ScheduleTrigger(interval("1s")),
+            new ExecutableNoneInput(),
+            InternalAlwaysCondition.INSTANCE,
+            null,
+            null,
+            Collections.emptyList(),
+            null,
+            new WatchStatus(-1L, null, null, clock.instant().minusMillis(500).atZone(ZoneOffset.UTC), null, null, null),
+            SequenceNumbers.UNASSIGNED_SEQ_NO,
+            SequenceNumbers.UNASSIGNED_PRIMARY_TERM
+        );
+
+        var runCount = new AtomicInteger(0);
+
+        engine.register(events -> {
+            for (TriggerEvent ignored : events) {
+                if (runCount.get() == 0) {
+                    logger.info("job first fire");
+                    firstLatch.countDown();
+                } else {
+                    logger.info("job second fire");
+                    secondLatch.countDown();
+                }
+                runCount.incrementAndGet();
+            }
+        });
+
+        engine.start(Collections.emptyList());
+        advanceClockIfNeeded(clock.instant().plusMillis(1100).atZone(ZoneOffset.UTC));
+        engine.add(watch);
+
+        advanceClockIfNeeded(clock.instant().plusMillis(510).atZone(ZoneOffset.UTC));
+        if (firstLatch.await(3, TimeUnit.SECONDS) == false) {
+            fail("waiting too long for all watches to be triggered");
+        }
+
+        advanceClockIfNeeded(clock.instant().plusMillis(1100).atZone(ZoneOffset.UTC));
+        if (secondLatch.await(3, TimeUnit.SECONDS) == false) {
+            fail("waiting too long for all watches to be triggered");
+        }
+
+        assertThat(runCount.get(), is(2));
+
+        engine.stop();
+    }
+
+    /**
+     * Verifies that a 1s-interval watch with no lastChecked time but a past state timestamp, added via add() after the engine has started,
+     * fires after the clock advances 510ms, so the job resumes waiting from where it left off before the reallocation / restart
+     */
+    public void testAddWithNoLastCheckedTimeButHasActivationTimeExecutesBeforeInitialInterval() throws Exception {
+        final var firstLatch = new CountDownLatch(1);
+        final var secondLatch = new CountDownLatch(1);
+
+        Watch watch = new Watch(
+            "watch",
+            new ScheduleTrigger(interval("1s")),
+            new ExecutableNoneInput(),
+            InternalAlwaysCondition.INSTANCE,
+            null,
+            null,
+            Collections.emptyList(),
+            null,
+            new WatchStatus(
+                -1L,
+                new WatchStatus.State(true, clock.instant().minusMillis(500).atZone(ZoneOffset.UTC)),
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            SequenceNumbers.UNASSIGNED_SEQ_NO,
+            SequenceNumbers.UNASSIGNED_PRIMARY_TERM
+        );
+
+        var runCount = new AtomicInteger(0);
+
+        engine.register(events -> {
+            for (TriggerEvent ignored : events) {
+                if (runCount.get() == 0) {
+                    logger.info("job first fire");
+                    firstLatch.countDown();
+                } else {
+                    logger.info("job second fire");
+                    secondLatch.countDown();
+                }
+                runCount.incrementAndGet();
+            }
+        });
+
+        engine.start(Collections.emptyList());
+        advanceClockIfNeeded(clock.instant().plusMillis(1100).atZone(ZoneOffset.UTC));
+        engine.add(watch);
+
+        advanceClockIfNeeded(clock.instant().plusMillis(510).atZone(ZoneOffset.UTC));
+        if (firstLatch.await(3, TimeUnit.SECONDS) == false) {
+            fail("waiting too long for all watches to be triggered");
+        }
+
+        advanceClockIfNeeded(clock.instant().plusMillis(1100).atZone(ZoneOffset.UTC));
+        if (secondLatch.await(3, TimeUnit.SECONDS) == false) {
+            fail("waiting too long for all watches to be triggered");
+        }
+
+        assertThat(runCount.get(), is(2));
+
+        engine.stop();
+    }
+
     private Watch createWatch(String name, Schedule schedule) {
         return new Watch(
             name,