1
0
Эх сурвалжийг харах

Don't run async actions when ILM is stopped (#133683)

Today ILM will run `AsyncActionStep`s even if ILM is stopped. These
actions are started either as callbacks after previous actions complete
or when the move-to-step API is used. By checking the ILM operation mode
before running the action in `IndexLifecycleRunner#maybeRunAsyncAction`,
we prevent these actions from being executed while ILM is stopped.

`AsyncActionStep`s are currently only automatically started as callbacks
after previous actions complete or after a master failover. To ensure
that these steps will be executed when ILM is restarted after a stop, we
loop over all the managed indices and start all async action steps.

Fixes #81234
Fixes #85097
Fixes #99859
Niels Bauman 1 сар өмнө
parent
commit
2b73a6ca16

+ 8 - 0
docs/changelog/133683.yaml

@@ -0,0 +1,8 @@
+pr: 133683
+summary: Avoid running asynchronous ILM actions while ILM is stopped
+area: ILM+SLM
+type: bug
+issues:
+ - 99859
+ - 81234
+ - 85097

+ 66 - 0
x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeseriesMoveToStepIT.java

@@ -10,6 +10,7 @@ package org.elasticsearch.xpack.ilm;
 import org.apache.http.util.EntityUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -19,18 +20,22 @@ import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xpack.core.ilm.DeleteAction;
 import org.elasticsearch.xpack.core.ilm.ForceMergeAction;
 import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
+import org.elasticsearch.xpack.core.ilm.Phase;
 import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
+import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
 import org.elasticsearch.xpack.core.ilm.RolloverAction;
 import org.elasticsearch.xpack.core.ilm.ShrinkAction;
 import org.elasticsearch.xpack.core.ilm.Step.StepKey;
 import org.junit.Before;
 
 import java.util.Locale;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.xpack.TimeSeriesRestDriver.createFullPolicy;
 import static org.elasticsearch.xpack.TimeSeriesRestDriver.createIndexWithSettings;
 import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy;
+import static org.elasticsearch.xpack.TimeSeriesRestDriver.createPolicy;
 import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex;
 import static org.elasticsearch.xpack.TimeSeriesRestDriver.index;
 import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument;
@@ -51,6 +56,7 @@ public class TimeseriesMoveToStepIT extends ESRestTestCase {
         index = "index-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
         policy = "policy-" + randomAlphaOfLength(5);
         alias = "alias-" + randomAlphaOfLength(5);
+        logger.info("--> running [{}] with index [{}], alias [{}] and policy [{}]", getTestName(), index, alias, policy);
     }
 
     public void testMoveToAllocateStep() throws Exception {
@@ -245,6 +251,66 @@ public class TimeseriesMoveToStepIT extends ESRestTestCase {
         assertBusy(() -> { indexExists("test-000002"); });
     }
 
+    /**
+     * Test that an async action does not execute when the Move To Step API is used while ILM is stopped.
+     * Unfortunately, this test cannot prove that the async action never executes, because it is hard to prove that an asynchronous
+     * process never happens: waiting for a certain period would only increase our confidence without actually proving it, and it would
+     * increase the runtime of the test significantly. We also assert that the remainder of the policy executes after ILM is started
+     * again, to ensure that the index is not stuck in the async action step.
+     */
+    public void testAsyncActionDoesNotExecuteAfterILMStop() throws Exception {
+        String originalIndex = index + "-000001";
+        // Create a simple policy; the important part is the readonly action, which contains the ReadOnlyStep AsyncActionStep.
+        var actions = Map.of(
+            "rollover",
+            new RolloverAction(RolloverConditions.newBuilder().addMaxIndexAgeCondition(TimeValue.timeValueHours(1)).build()),
+            "readonly",
+            new ReadOnlyAction()
+        );
+        Phase phase = new Phase("hot", TimeValue.ZERO, actions);
+        createPolicy(client(), policy, phase, null, null, null, null);
+
+        createIndexWithSettings(
+            client(),
+            originalIndex,
+            alias,
+            Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)
+        );
+
+        // Wait for ILM to do everything it can for this index, i.e. until it is parked on the rollover condition check.
+        assertBusy(() -> assertEquals(new StepKey("hot", "rollover", "check-rollover-ready"), getStepKeyForIndex(client(), originalIndex)));
+
+        // Stop ILM. NOTE(review): we do not wait for the mode to reach STOPPED; presumably any non-RUNNING mode suffices - confirm.
+        client().performRequest(new Request("POST", "/_ilm/stop"));
+
+        // Move the index to the readonly step, which is an async action step.
+        Request moveToStepRequest = new Request("POST", "_ilm/move/" + originalIndex);
+        moveToStepRequest.setJsonEntity("""
+            {
+              "current_step": {
+                "phase": "hot",
+                "action": "rollover",
+                "name": "check-rollover-ready"
+              },
+              "next_step": {
+                "phase": "hot",
+                "action": "readonly",
+                "name": "readonly"
+              }
+            }""");
+        client().performRequest(moveToStepRequest);
+
+        // Since ILM is stopped, the async action should not execute and the index should remain in the readonly step.
+        // This is the tricky part of the test, as we can't really verify that the async action will never happen.
+        assertEquals(new StepKey("hot", "readonly", "readonly"), getStepKeyForIndex(client(), originalIndex));
+
+        // Restart ILM
+        client().performRequest(new Request("POST", "/_ilm/start"));
+
+        // Make sure we actually complete the remainder of the policy after ILM is started again.
+        assertBusy(() -> assertEquals(new StepKey("hot", "complete", "complete"), getStepKeyForIndex(client(), originalIndex)));
+    }
+
     public void testMoveToStepWithInvalidNextStep() throws Exception {
         createNewSingletonPolicy(client(), policy, "delete", DeleteAction.WITH_SNAPSHOT_DELETE, TimeValue.timeValueDays(100));
         createIndexWithSettings(

+ 8 - 0
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java

@@ -33,6 +33,7 @@ import org.elasticsearch.xpack.core.ilm.ClusterStateActionStep;
 import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep;
 import org.elasticsearch.xpack.core.ilm.ErrorStep;
 import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
+import org.elasticsearch.xpack.core.ilm.OperationMode;
 import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
 import org.elasticsearch.xpack.core.ilm.Step;
 import org.elasticsearch.xpack.core.ilm.Step.StepKey;
@@ -48,6 +49,7 @@ import java.util.function.LongSupplier;
 
 import static org.elasticsearch.core.Strings.format;
 import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
+import static org.elasticsearch.xpack.core.ilm.LifecycleOperationMetadata.currentILMMode;
 
 class IndexLifecycleRunner {
     private static final Logger logger = LogManager.getLogger(IndexLifecycleRunner.class);
@@ -308,6 +310,12 @@ class IndexLifecycleRunner {
     void maybeRunAsyncAction(ProjectState state, IndexMetadata indexMetadata, String policy, StepKey expectedStepKey) {
         final var projectId = state.projectId();
         String index = indexMetadata.getIndex().getName();
+        OperationMode currentMode = currentILMMode(state.metadata());
+        if (OperationMode.RUNNING.equals(currentMode) == false) {
+            logger.info("[{}] not running async action in policy [{}] because ILM is [{}]", index, policy, currentMode);
+            return;
+        }
+
         if (LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(indexMetadata.getSettings())) {
             logger.info("[{}] skipping policy [{}] because [{}] is true", index, policy, LifecycleSettings.LIFECYCLE_SKIP);
             return;

+ 76 - 72
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java

@@ -184,88 +184,78 @@ public class IndexLifecycleService
         maybeScheduleJob();
 
         for (var projectId : clusterState.metadata().projects().keySet()) {
-            onMaster(clusterState.projectState(projectId));
+            maybeRunAsyncActions(clusterState.projectState(projectId));
         }
     }
 
-    void onMaster(ProjectState state) {
+    /**
+     * Kicks off any async actions that may not have been run due to either master failover or ILM being manually stopped.
+     */
+    private void maybeRunAsyncActions(ProjectState state) {
         final ProjectMetadata projectMetadata = state.metadata();
         final IndexLifecycleMetadata currentMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE);
-        if (currentMetadata != null) {
-            OperationMode currentMode = currentILMMode(projectMetadata);
-            if (OperationMode.STOPPED.equals(currentMode)) {
-                return;
-            }
-
-            boolean safeToStop = true; // true until proven false by a run policy
-
-            // If we just became master, we need to kick off any async actions that
-            // may have not been run due to master rollover
-            for (IndexMetadata idxMeta : projectMetadata.indices().values()) {
-                if (projectMetadata.isIndexManagedByILM(idxMeta)) {
-                    String policyName = idxMeta.getLifecyclePolicyName();
-                    final LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
-                    StepKey stepKey = Step.getCurrentStepKey(lifecycleState);
-
-                    try {
-                        if (OperationMode.STOPPING == currentMode) {
-                            if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.name())) {
-                                logger.info(
-                                    "waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]",
-                                    idxMeta.getIndex().getName(),
-                                    policyName,
-                                    stepKey.name()
-                                );
-                                lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
-                                // ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
-                                safeToStop = false;
-                            } else {
-                                logger.info(
-                                    "skipping policy execution of step [{}] for index [{}] with policy [{}]" + " because ILM is stopping",
-                                    stepKey == null ? "n/a" : stepKey.name(),
-                                    idxMeta.getIndex().getName(),
-                                    policyName
-                                );
-                            }
-                        } else {
-                            lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
-                        }
-                    } catch (Exception e) {
-                        if (logger.isTraceEnabled()) {
-                            logger.warn(
-                                () -> format(
-                                    "async action execution failed during master election trigger"
-                                        + " for index [%s] with policy [%s] in step [%s], lifecycle state: [%s]",
-                                    idxMeta.getIndex().getName(),
-                                    policyName,
-                                    stepKey,
-                                    lifecycleState.asMap()
-                                ),
-                                e
-                            );
-                        } else {
-                            logger.warn(
-                                () -> format(
-                                    "async action execution failed during master election trigger"
-                                        + " for index [%s] with policy [%s] in step [%s]",
-                                    idxMeta.getIndex().getName(),
-                                    policyName,
-                                    stepKey
-                                ),
-                                e
-                            );
+        if (currentMetadata == null) {
+            return;
+        }
+        OperationMode currentMode = currentILMMode(projectMetadata);
+        if (OperationMode.STOPPED.equals(currentMode)) {
+            return;
+        }
 
-                        }
-                        // Don't rethrow the exception, we don't want a failure for one index to be
-                        // called to cause actions not to be triggered for further indices
-                    }
-                }
+        boolean safeToStop = true; // true until proven false by a run policy
+        for (IndexMetadata idxMeta : projectMetadata.indices().values()) {
+            if (projectMetadata.isIndexManagedByILM(idxMeta) == false) {
+                continue;
             }
+            String policyName = idxMeta.getLifecyclePolicyName();
+            final LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
+            StepKey stepKey = Step.getCurrentStepKey(lifecycleState);
+
+            try {
+                if (currentMode == OperationMode.RUNNING) {
+                    lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
+                    continue;
+                }
+                // We only get here if ILM is in STOPPING mode. In that case, we need to check if there is any index that is in a step
+                // that we can't stop ILM in. If there is, we don't stop ILM yet.
+                if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.name())) {
+                    logger.info(
+                        "waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]",
+                        idxMeta.getIndex().getName(),
+                        policyName,
+                        stepKey.name()
+                    );
+                    lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
+                    // ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
+                    safeToStop = false;
+                } else {
+                    logger.info(
+                        "skipping policy execution of step [{}] for index [{}] with policy [{}]" + " because ILM is stopping",
+                        stepKey == null ? "n/a" : stepKey.name(),
+                        idxMeta.getIndex().getName(),
+                        policyName
+                    );
+                }
+            } catch (Exception e) {
+                String logMessage = format(
+                    "async action execution failed during master election trigger for index [%s] with policy [%s] in step [%s]",
+                    idxMeta.getIndex().getName(),
+                    policyName,
+                    stepKey
+                );
+                if (logger.isTraceEnabled()) {
+                    logMessage += format(", lifecycle state: [%s]", lifecycleState.asMap());
+                }
+                logger.warn(logMessage, e);
 
-            if (safeToStop && OperationMode.STOPPING == currentMode) {
-                stopILM(state.projectId());
+                // Don't rethrow the exception: a failure for one index must not
+                // prevent actions from being triggered for the remaining indices
             }
         }
+
+        if (safeToStop && OperationMode.STOPPING == currentMode) {
+            stopILM(state.projectId());
+        }
     }
 
     private void stopILM(ProjectId projectId) {
@@ -333,6 +323,20 @@ public class IndexLifecycleService
                 cancelJob();
                 policyRegistry.clear();
             }
+        } else if (this.isMaster) {
+            // If we are the master and we were before, check if any projects changed their ILM mode from non-RUNNING to RUNNING.
+            // If so, kick off any async actions that may not have run while not in RUNNING mode.
+            for (ProjectMetadata project : event.state().metadata().projects().values()) {
+                final var previousProject = event.previousState().metadata().projects().get(project.id());
+                if (previousProject == null || project == previousProject) {
+                    continue;
+                }
+                final OperationMode currentMode = currentILMMode(project);
+                final OperationMode previousMode = currentILMMode(previousProject);
+                if (currentMode == OperationMode.RUNNING && previousMode != OperationMode.RUNNING) {
+                    maybeRunAsyncActions(event.state().projectState(project.id()));
+                }
+            }
         }
 
         // if we're the master, then process deleted indices and trigger policies

+ 28 - 1
x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java

@@ -265,6 +265,34 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
         Mockito.verifyNoMoreInteractions(clusterService);
     }
 
+    /**
+     * Test that an async action step is not executed when ILM is stopped.
+     * The project metadata is built with an {@link IndexLifecycleMetadata} whose operation mode is {@link OperationMode#STOPPED},
+     * and {@code maybeRunAsyncAction} is then invoked directly for the mock async step. The test passes when the step's execute
+     * count is still zero afterwards.
+     */
+    public void testNotRunningAsyncActionWhenILMIsStopped() {
+        String policyName = "stopped_policy";
+        Step.StepKey stepKey = new Step.StepKey("phase", "action", "async_action_step");
+
+        MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null);
+
+        PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
+        ClusterService clusterService = mock(ClusterService.class);
+        newMockTaskQueue(clusterService); // ensure constructor call to createTaskQueue is satisfied
+        IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);
+
+        IndexMetadata indexMetadata = IndexMetadata.builder("test")
+            .settings(randomIndexSettings().put(LifecycleSettings.LIFECYCLE_NAME, policyName))
+            .build();
+
+        IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Map.of(), OperationMode.STOPPED);
+        final var project = ProjectMetadata.builder(randomProjectIdOrDefault())
+            .put(indexMetadata, true)
+            .putCustom(IndexLifecycleMetadata.TYPE, ilm)
+            .build();
+        runner.maybeRunAsyncAction(projectStateFromProject(project), indexMetadata, policyName, stepKey);
+
+        assertThat(step.getExecuteCount(), equalTo(0L));
+    }
+
     public void testRunPolicyErrorStepOnRetryableFailedStep() {
         String policyName = "rollover_policy";
         String phaseName = "hot";
@@ -586,7 +614,6 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
             .putProjectMetadata(project)
             .nodes(DiscoveryNodes.builder().add(node).masterNodeId(node.getId()).localNodeId(node.getId()))
             .build();
-        logger.info("--> state: {}", state);
         ClusterServiceUtils.setState(clusterService, state);
         IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);