Parcourir la source

Ensure cancelled jobs do not continue to run (#63762)

This commit ensures that jobs within the SchedulerEngine do not
continue to run after they are cancelled. There was no synchronization
between the cancel method of an ActiveSchedule and the run method, so
an actively running schedule would go ahead and reschedule itself even
if the cancel method had been called.

This commit adds synchronization between cancelling and the scheduling
of the next run to ensure that the job is cancelled. In real life
scenarios this could manifest as a job running multiple times for
SLM. This could happen if a job had been triggered and was cancelled
prior to completing its run such as if the node was no longer the
master node or if SLM was stopping/stopped.

Closes #63754
Jay Modi il y a 5 ans
Parent
commit
879dd5a99c

+ 8 - 4
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java

@@ -194,8 +194,8 @@ public class SchedulerEngine {
         private final Schedule schedule;
         private final long startTime;
 
-        private volatile ScheduledFuture<?> future;
-        private volatile long scheduledTime;
+        private ScheduledFuture<?> future;
+        private long scheduledTime;
 
         ActiveSchedule(String name, Schedule schedule, long startTime) {
             this.name = name;
@@ -228,7 +228,11 @@ public class SchedulerEngine {
             if (scheduledTime != -1) {
                 long delay = Math.max(0, scheduledTime - currentTime);
                 try {
-                    future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
+                    synchronized (this) {
+                        if (future == null || future.isCancelled() == false) {
+                            future = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
+                        }
+                    }
                 } catch (RejectedExecutionException e) {
                     // ignoring rejections if the scheduler has been shut down already
                     if (scheduler.isShutdown() == false) {
@@ -238,7 +242,7 @@ public class SchedulerEngine {
             }
         }
 
-        public void cancel() {
+        public synchronized void cancel() {
             FutureUtils.cancel(future);
         }
     }

+ 34 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java

@@ -11,6 +11,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Job;
 import org.mockito.ArgumentCaptor;
 
 import java.time.Clock;
@@ -18,6 +19,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -25,6 +27,7 @@ import static org.hamcrest.Matchers.any;
 import static org.hamcrest.Matchers.arrayWithSize;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -150,6 +153,37 @@ public class SchedulerEngineTests extends ESTestCase {
         }
     }
 
+    public void testCancellingDuringRunPreventsRescheduling() throws Exception {
+        final CountDownLatch jobRunningLatch = new CountDownLatch(1);
+        final CountDownLatch listenerLatch = new CountDownLatch(1);
+        final AtomicInteger calledCount = new AtomicInteger(0);
+        final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, Clock.systemUTC());
+        final String jobId = randomAlphaOfLength(4);
+        try {
+            engine.register(event -> {
+                assertThat(event.getJobName(), is(jobId));
+                calledCount.incrementAndGet();
+                jobRunningLatch.countDown();
+                try {
+                    listenerLatch.await();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            });
+            engine.add(new Job(jobId, ((startTime, now) -> 0)));
+
+            jobRunningLatch.await();
+            final int called = calledCount.get();
+            assertEquals(1, called);
+            engine.remove(jobId);
+            listenerLatch.countDown();
+
+            assertBusy(() -> assertEquals(called, calledCount.get()), 5, TimeUnit.MILLISECONDS);
+        } finally {
+            engine.stop();
+        }
+    }
+
     private void assertFailedListenerLogMessage(Logger mockLogger, int times) {
         final ArgumentCaptor<ParameterizedMessage> messageCaptor = ArgumentCaptor.forClass(ParameterizedMessage.class);
         final ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);