Browse Source

Fix race in SLM master/cluster state listeners (#59801)

This change fixes two possible race conditions in SLM related to
how local master changes and cluster state events are observed. When
implementing the `LocalNodeMasterListener` interface, it is only
recommended to execute on a separate threadpool if the operations are
heavy and would block the cluster state thread. SLM specified that the
listeners should run in the Snapshot thread pool, but the operations
in the listener were lightweight. This had the side effect of causing
master changes to be delayed if the Snapshot threads were all busy and
could also potentially cause the `onMaster` and `offMaster` calls to
race if both were queued and then executed concurrently. Additionally,
the `SnapshotLifecycleService` is also a `ClusterStateListener` and
there is currently no order of operations guarantee between
`LocalNodeMasterListeners` and `ClusterStateListeners` so this could
lead to incorrect behavior.

The resolution for these two issues is that the
SnapshotRetentionService now specifies the `SAME` executor for its
implementation of the `LocalNodeMasterListener` interface. The
`SnapshotLifecycleService` is no longer a `LocalNodeMasterListener` and
instead tracks local master changes in its `ClusterStateListener`.
Jay Modi 5 years ago
parent
commit
c41ac5f7cb

+ 1 - 2
x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java

@@ -969,8 +969,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
             assertTrue(indexExists(shrunkenIndex));
             assertTrue(aliasExists(shrunkenIndex, index));
             assertThat(getStepKeyForIndex(client(), shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey()));
-
-        });
+        }, 30, TimeUnit.SECONDS);
     }
 
     @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/53612")

+ 3 - 1
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java

@@ -193,9 +193,11 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
             clusterService));
         snapshotLifecycleService.set(new SnapshotLifecycleService(settings,
             () -> new SnapshotLifecycleTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock()));
+        snapshotLifecycleService.get().init();
         snapshotRetentionService.set(new SnapshotRetentionService(settings,
             () -> new SnapshotRetentionTask(client, clusterService, System::nanoTime, snapshotHistoryStore.get(), threadPool),
-            clusterService, getClock()));
+            getClock()));
+        snapshotRetentionService.get().init(clusterService);
         components.addAll(Arrays.asList(snapshotLifecycleService.get(), snapshotHistoryStore.get(), snapshotRetentionService.get()));
 
         return components;

+ 21 - 28
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java

@@ -11,12 +11,10 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
-import org.elasticsearch.cluster.LocalNodeMasterListener;
 import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ilm.OperationMode;
 import org.elasticsearch.xpack.core.scheduler.CronSchedule;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
@@ -40,7 +38,7 @@ import java.util.stream.Collectors;
  * {@link SnapshotLifecycleTask}. It reacts to new policies in the cluster state by scheduling a
  * task according to the policy's schedule.
  */
-public class SnapshotLifecycleService implements LocalNodeMasterListener, Closeable, ClusterStateListener {
+public class SnapshotLifecycleService implements Closeable, ClusterStateListener {
 
     private static final Logger logger = LogManager.getLogger(SnapshotLifecycleService.class);
     private static final String JOB_PATTERN_SUFFIX = "-\\d+$";
@@ -59,12 +57,31 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
         this.scheduler = new SchedulerEngine(settings, clock);
         this.clusterService = clusterService;
         this.snapshotTask = taskSupplier.get();
-        clusterService.addLocalNodeMasterListener(this); // TODO: change this not to use 'this'
+    }
+
+    /**
+     * Initializer method to avoid the publication of a self reference in the constructor.
+     */
+    public void init() {
         clusterService.addListener(this);
     }
 
     @Override
     public void clusterChanged(final ClusterChangedEvent event) {
+        // Instead of using a LocalNodeMasterListener to track master changes, this service will
+        // track them here to avoid conditions where master listener events run after other
+        // listeners that depend on what happened in the master listener
+        final boolean prevIsMaster = this.isMaster;
+        if (prevIsMaster != event.localNodeMaster()) {
+            this.isMaster = event.localNodeMaster();
+            if (this.isMaster) {
+                scheduler.register(snapshotTask);
+            } else {
+                scheduler.unregister(snapshotTask);
+                cancelSnapshotJobs();
+            }
+        }
+
         if (this.isMaster) {
             final ClusterState state = event.state();
 
@@ -83,25 +100,6 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
         }
     }
 
-    @Override
-    public void onMaster() {
-        this.isMaster = true;
-        scheduler.register(snapshotTask);
-        final ClusterState state = clusterService.state();
-        if (slmStoppedOrStopping(state)) {
-            // SLM is currently stopped, so don't schedule jobs
-            return;
-        }
-        scheduleSnapshotJobs(state);
-    }
-
-    @Override
-    public void offMaster() {
-        this.isMaster = false;
-        scheduler.unregister(snapshotTask);
-        cancelSnapshotJobs();
-    }
-
     // Only used for testing
     SchedulerEngine getScheduler() {
         return this.scheduler;
@@ -236,11 +234,6 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
             .orElseThrow(() -> new IllegalArgumentException("no such repository [" + repository + "]"));
     }
 
-    @Override
-    public String executorName() {
-        return ThreadPool.Names.SNAPSHOT;
-    }
-
     @Override
     public void close() {
         if (this.running.compareAndSet(true, false)) {

+ 7 - 2
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java

@@ -46,13 +46,18 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea
 
     public SnapshotRetentionService(Settings settings,
                                     Supplier<SnapshotRetentionTask> taskSupplier,
-                                    ClusterService clusterService,
                                     Clock clock) {
         this.clock = clock;
         this.scheduler = new SchedulerEngine(settings, clock);
         this.retentionTask = taskSupplier.get();
         this.scheduler.register(this.retentionTask);
         this.slmRetentionSchedule = LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING.get(settings);
+    }
+
+    /**
+     * Initializer method to avoid the publication of a self reference in the constructor.
+     */
+    public void init(ClusterService clusterService) {
         clusterService.addLocalNodeMasterListener(this);
         clusterService.getClusterSettings().addSettingsUpdateConsumer(LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING,
             this::setUpdateSchedule);
@@ -110,7 +115,7 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea
 
     @Override
     public String executorName() {
-        return ThreadPool.Names.SNAPSHOT;
+        return ThreadPool.Names.SAME;
     }
 
     @Override

+ 64 - 34
x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java

@@ -6,14 +6,18 @@
 
 package org.elasticsearch.xpack.slm;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.TestThreadPool;
@@ -100,8 +104,7 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
         try (ClusterService clusterService = ClusterServiceUtils.createClusterService(initialState, threadPool);
              SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
                  () -> new FakeSnapshotTask(e -> logger.info("triggered")), clusterService, clock)) {
-
-            sls.offMaster();
+            sls.init();
 
             SnapshotLifecyclePolicyMetadata newPolicy = SnapshotLifecyclePolicyMetadata.builder()
                 .setPolicy(createPolicy("foo", "*/1 * * * * ?"))
@@ -121,26 +124,30 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
             // Since the service does not think it is master, it should not be triggered or scheduled
             assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
 
-            sls.onMaster();
-            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("initial-1")));
+            ClusterState prevState = state;
+            state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats()), true);
+            sls.clusterChanged(new ClusterChangedEvent("2", state, prevState));
+            assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-2")));
 
-            state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.STOPPING, new SnapshotLifecycleStats()));
-            sls.clusterChanged(new ClusterChangedEvent("2", state, emptyState));
+            prevState = state;
+            state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.STOPPING, new SnapshotLifecycleStats()), true);
+            sls.clusterChanged(new ClusterChangedEvent("3", state, prevState));
 
             // Since the service is stopping, jobs should have been cancelled
             assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
 
-            state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.STOPPED, new SnapshotLifecycleStats()));
-            sls.clusterChanged(new ClusterChangedEvent("3", state, emptyState));
+            prevState = state;
+            state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.STOPPED, new SnapshotLifecycleStats()), true);
+            sls.clusterChanged(new ClusterChangedEvent("4", state, prevState));
 
             // Since the service is stopped, jobs should have been cancelled
             assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
 
             // No jobs should be scheduled when service is closed
-            state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats()));
+            prevState = state;
+            state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats()), true);
             sls.close();
-            sls.onMaster();
-            sls.clusterChanged(new ClusterChangedEvent("1", state, emptyState));
+            sls.clusterChanged(new ClusterChangedEvent("5", state, prevState));
             assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
         } finally {
             threadPool.shutdownNow();
@@ -160,11 +167,12 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
         try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
              SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
                  () -> new FakeSnapshotTask(e -> trigger.get().accept(e)), clusterService, clock)) {
-
-            sls.offMaster();
+            sls.init();
             SnapshotLifecycleMetadata snapMeta =
                 new SnapshotLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING, new SnapshotLifecycleStats());
-            ClusterState previousState = createState(snapMeta);
+            ClusterState state = createState(snapMeta, false);
+            sls.clusterChanged(new ClusterChangedEvent("1", state, ClusterState.EMPTY_STATE));
+
             Map<String, SnapshotLifecyclePolicyMetadata> policies = new HashMap<>();
 
             SnapshotLifecyclePolicyMetadata policy = SnapshotLifecyclePolicyMetadata.builder()
@@ -174,8 +182,9 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
                 .build();
             policies.put(policy.getPolicy().getId(), policy);
             snapMeta = new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats());
-            ClusterState state = createState(snapMeta);
-            ClusterChangedEvent event = new ClusterChangedEvent("1", state, previousState);
+            ClusterState previousState = state;
+            state = createState(snapMeta, false);
+            ClusterChangedEvent event = new ClusterChangedEvent("2", state, previousState);
             trigger.set(e -> {
                 fail("trigger should not be invoked");
             });
@@ -185,8 +194,10 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
             assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
 
             // Change the service to think it's on the master node, events should be scheduled now
-            sls.onMaster();
             trigger.set(e -> triggerCount.incrementAndGet());
+            previousState = state;
+            state = createState(snapMeta, true);
+            event = new ClusterChangedEvent("3", state, previousState);
             sls.clusterChanged(event);
             assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-1")));
 
@@ -202,8 +213,8 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
                 .setModifiedDate(2)
                 .build();
             policies.put(policy.getPolicy().getId(), newPolicy);
-            state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats()));
-            event = new ClusterChangedEvent("2", state, previousState);
+            state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats()), true);
+            event = new ClusterChangedEvent("4", state, previousState);
             sls.clusterChanged(event);
             assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-2")));
 
@@ -226,9 +237,9 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
             final int currentCount2 = triggerCount.get();
             previousState = state;
             // Create a state simulating the policy being deleted
-            state =
-                createState(new SnapshotLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING, new SnapshotLifecycleStats()));
-            event = new ClusterChangedEvent("2", state, previousState);
+            state = createState(
+                new SnapshotLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING, new SnapshotLifecycleStats()), true);
+            event = new ClusterChangedEvent("5", state, previousState);
             sls.clusterChanged(event);
             clock.fastForwardSeconds(2);
 
@@ -246,8 +257,8 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
             policies.put(policy.getPolicy().getId(), policy);
             snapMeta = new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats());
             previousState = state;
-            state = createState(snapMeta);
-            event = new ClusterChangedEvent("1", state, previousState);
+            state = createState(snapMeta, true);
+            event = new ClusterChangedEvent("6", state, previousState);
             trigger.set(e -> triggerCount.incrementAndGet());
             sls.clusterChanged(event);
             clock.fastForwardSeconds(2);
@@ -257,7 +268,10 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
             assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-3")));
 
             // Signify becoming non-master, the jobs should all be cancelled
-            sls.offMaster();
+            previousState = state;
+            state = createState(snapMeta, false);
+            event = new ClusterChangedEvent("7", state, previousState);
+            sls.clusterChanged(event);
             assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
         } finally {
             threadPool.shutdownNow();
@@ -276,11 +290,13 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
         try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
              SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
                  () -> new FakeSnapshotTask(e -> trigger.get().accept(e)), clusterService, clock)) {
-            sls.onMaster();
-
+            sls.init();
             SnapshotLifecycleMetadata snapMeta =
                 new SnapshotLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING, new SnapshotLifecycleStats());
-            ClusterState previousState = createState(snapMeta);
+            ClusterState state = createState(snapMeta, true);
+            ClusterChangedEvent event = new ClusterChangedEvent("1", state, ClusterState.EMPTY_STATE);
+            sls.clusterChanged(event);
+
             Map<String, SnapshotLifecyclePolicyMetadata> policies = new HashMap<>();
 
             SnapshotLifecyclePolicyMetadata policy = SnapshotLifecyclePolicyMetadata.builder()
@@ -291,13 +307,13 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
                 .build();
             policies.put(policy.getPolicy().getId(), policy);
             snapMeta = new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats());
-            ClusterState state = createState(snapMeta);
-            ClusterChangedEvent event = new ClusterChangedEvent("1", state, previousState);
+            ClusterState previousState = state;
+            state = createState(snapMeta, true);
+            event = new ClusterChangedEvent("2", state, previousState);
             sls.clusterChanged(event);
 
             assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-2-1")));
 
-            previousState = state;
             SnapshotLifecyclePolicyMetadata secondPolicy = SnapshotLifecyclePolicyMetadata.builder()
                 .setPolicy(createPolicy("foo-1", "45 * * * * ?"))
                 .setHeaders(Collections.emptyMap())
@@ -306,13 +322,17 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
                 .build();
             policies.put(secondPolicy.getPolicy().getId(), secondPolicy);
             snapMeta = new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats());
-            state = createState(snapMeta);
-            event = new ClusterChangedEvent("2", state, previousState);
+            previousState = state;
+            state = createState(snapMeta, true);
+            event = new ClusterChangedEvent("3", state, previousState);
             sls.clusterChanged(event);
 
             assertThat(sls.getScheduler().scheduledJobIds(), containsInAnyOrder("foo-2-1", "foo-1-2"));
 
-            sls.offMaster();
+            previousState = state;
+            state = createState(snapMeta, false);
+            event = new ClusterChangedEvent("4", state, previousState);
+            sls.clusterChanged(event);
             assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
         } finally {
             threadPool.shutdownNow();
@@ -336,10 +356,20 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
     }
 
     public ClusterState createState(SnapshotLifecycleMetadata snapMeta) {
+        return createState(snapMeta, false);
+    }
+
+    public ClusterState createState(SnapshotLifecycleMetadata snapMeta, boolean localNodeMaster) {
         Metadata metadata = Metadata.builder()
             .putCustom(SnapshotLifecycleMetadata.TYPE, snapMeta)
             .build();
+        final DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder()
+            .add(DiscoveryNode.createLocal(Settings.EMPTY, new TransportAddress(TransportAddress.META_ADDRESS, 9300), "local"))
+            .add(new DiscoveryNode("remote", new TransportAddress(TransportAddress.META_ADDRESS, 9301), Version.CURRENT))
+            .localNodeId("local")
+            .masterNodeId(localNodeMaster ? "local" : "remote");
         return ClusterState.builder(new ClusterName("cluster"))
+            .nodes(discoveryNodesBuilder)
             .metadata(metadata)
             .build();
     }

+ 7 - 7
x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java

@@ -50,8 +50,8 @@ public class SnapshotRetentionServiceTests extends ESTestCase {
 
         ThreadPool threadPool = new TestThreadPool("test");
         try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings);
-             SnapshotRetentionService service = new SnapshotRetentionService(Settings.EMPTY,
-                 FakeRetentionTask::new, clusterService, clock)) {
+             SnapshotRetentionService service = new SnapshotRetentionService(Settings.EMPTY, FakeRetentionTask::new, clock)) {
+            service.init(clusterService);
             assertThat(service.getScheduler().jobCount(), equalTo(0));
 
             service.onMaster();
@@ -81,23 +81,23 @@ public class SnapshotRetentionServiceTests extends ESTestCase {
             Collections.emptyMap(), DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT);
         ClockMock clock = new ClockMock();
         AtomicInteger invoked = new AtomicInteger(0);
-    
+
         ThreadPool threadPool = new TestThreadPool("test");
         try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings);
              SnapshotRetentionService service = new SnapshotRetentionService(Settings.EMPTY,
                  () -> new FakeRetentionTask(event -> {
                      assertThat(event.getJobName(), equalTo(SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID));
                      invoked.incrementAndGet();
-                 }), clusterService, clock)) {
-    
+                 }), clock)) {
+            service.init(clusterService);
             service.onMaster();
             service.triggerRetention();
             assertThat(invoked.get(), equalTo(1));
-    
+
             service.offMaster();
             service.triggerRetention();
             assertThat(invoked.get(), equalTo(1));
-    
+
             service.onMaster();
             service.triggerRetention();
             assertThat(invoked.get(), equalTo(2));