Browse Source

Don't schedule SLM jobs when services have been stopped (#48658)

This adds a guard to the SLM lifecycle and retention services that
prevents new jobs from being scheduled once the service has been
stopped. Previously, if the node was shut down the service would be
stopped, but a cluster state update or local master election could
still cause an attempt to schedule a job. This could lead to an
uncaught `RejectedExecutionException`.

Resolves #47749
Lee Hinman 6 years ago
parent
commit
f1f1dda575

+ 9 - 1
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java

@@ -30,6 +30,7 @@ import java.time.Clock;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -48,6 +49,7 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
     private final ClusterService clusterService;
     private final SnapshotLifecycleTask snapshotTask;
     private final Map<String, SchedulerEngine.Job> scheduledTasks = ConcurrentCollections.newConcurrentMap();
+    private final AtomicBoolean running = new AtomicBoolean(true);
     private volatile boolean isMaster = false;
 
     public SnapshotLifecycleService(Settings settings,
@@ -160,6 +162,10 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
      * the same version of a policy has already been scheduled it does not overwrite the job.
      */
     public void maybeScheduleSnapshot(final SnapshotLifecyclePolicyMetadata snapshotLifecyclePolicy) {
+        if (this.running.get() == false) {
+            return;
+        }
+
         final String jobId = getJobId(snapshotLifecyclePolicy);
         final Pattern existingJobPattern = Pattern.compile(snapshotLifecyclePolicy.getPolicy().getId() + JOB_PATTERN_SUFFIX);
 
@@ -237,6 +243,8 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
 
     @Override
     public void close() {
-        this.scheduler.stop();
+        if (this.running.compareAndSet(true, false)) {
+            this.scheduler.stop();
+        }
     }
 }

+ 6 - 2
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java

@@ -20,6 +20,7 @@ import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy;
 
 import java.io.Closeable;
 import java.time.Clock;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 
 /**
@@ -38,6 +39,7 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea
     private final SchedulerEngine scheduler;
     private final SnapshotRetentionTask retentionTask;
     private final Clock clock;
+    private final AtomicBoolean running = new AtomicBoolean(true);
 
     private volatile String slmRetentionSchedule;
     private volatile boolean isMaster = false;
@@ -81,7 +83,7 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea
 
     private void rescheduleRetentionJob() {
         final String schedule = this.slmRetentionSchedule;
-        if (this.isMaster && Strings.hasText(schedule)) {
+        if (this.running.get() && this.isMaster && Strings.hasText(schedule)) {
             final SchedulerEngine.Job retentionJob = new SchedulerEngine.Job(SLM_RETENTION_JOB_ID,
                 new CronSchedule(schedule));
             logger.debug("scheduling SLM retention job for [{}]", schedule);
@@ -113,6 +115,8 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea
 
     @Override
     public void close() {
-        this.scheduler.stop();
+        if (this.running.compareAndSet(true, false)) {
+            this.scheduler.stop();
+        }
     }
 }

+ 7 - 0
x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java

@@ -133,6 +133,13 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
             // Since the service is stopped, jobs should have been cancelled
             assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
 
+            // No jobs should be scheduled when service is closed
+            state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats()));
+            sls.close();
+            sls.onMaster();
+            sls.clusterChanged(new ClusterChangedEvent("1", state, emptyState));
+            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
+
             threadPool.shutdownNow();
         }
     }

+ 6 - 0
x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java

@@ -65,6 +65,12 @@ public class SnapshotRetentionServiceTests extends ESTestCase {
 
             service.setUpdateSchedule("");
             assertThat(service.getScheduler().jobCount(), equalTo(0));
+
+            // Service should not schedule any jobs once closed
+            service.close();
+            service.onMaster();
+            assertThat(service.getScheduler().jobCount(), equalTo(0));
+
             threadPool.shutdownNow();
         }
     }