|
|
@@ -72,6 +72,7 @@ import java.util.SortedMap;
|
|
|
import java.util.TreeMap;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.function.BiFunction;
|
|
|
import java.util.function.Function;
|
|
|
@@ -356,6 +357,84 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|
|
threadPool.shutdownNow();
|
|
|
}
|
|
|
|
|
|
+ public void testRunPeriodicPolicyWithFailureToReadPolicy() throws Exception {
|
|
|
+ doTestRunPolicyWithFailureToReadPolicy(false, true);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testRunStateChangePolicyWithFailureToReadPolicy() throws Exception {
|
|
|
+ doTestRunPolicyWithFailureToReadPolicy(false, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testRunAsyncActionPolicyWithFailureToReadPolicy() throws Exception {
|
|
|
+ doTestRunPolicyWithFailureToReadPolicy(true, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void doTestRunPolicyWithFailureToReadPolicy(boolean asyncAction, boolean periodicAction) throws Exception {
|
|
|
+ String policyName = "foo";
|
|
|
+ StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step");
|
|
|
+ StepKey nextStepKey = new StepKey("phase", "action", "next_cluster_state_action_step");
|
|
|
+ MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, nextStepKey);
|
|
|
+ MockClusterStateActionStep nextStep = new MockClusterStateActionStep(nextStepKey, null);
|
|
|
+ MockPolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
|
|
+ AtomicBoolean resolved = new AtomicBoolean(false);
|
|
|
+ stepRegistry.setResolver((i, k) -> {
|
|
|
+ resolved.set(true);
|
|
|
+ throw new IllegalArgumentException("fake failure retrieving step");
|
|
|
+ });
|
|
|
+ ThreadPool threadPool = new TestThreadPool("name");
|
|
|
+ LifecycleExecutionState les = LifecycleExecutionState.builder()
|
|
|
+ .setPhase("phase")
|
|
|
+ .setAction("action")
|
|
|
+ .setStep("cluster_state_action_step")
|
|
|
+ .build();
|
|
|
+ IndexMetaData indexMetaData = IndexMetaData.builder("test")
|
|
|
+ .settings(Settings.builder()
|
|
|
+ .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
|
|
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
|
|
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
|
|
|
+ .put(LifecycleSettings.LIFECYCLE_NAME, policyName))
|
|
|
+ .putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, les.asMap())
|
|
|
+ .build();
|
|
|
+ ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
|
|
+ DiscoveryNode node = clusterService.localNode();
|
|
|
+ IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING);
|
|
|
+ ClusterState state = ClusterState.builder(new ClusterName("cluster"))
|
|
|
+ .metaData(MetaData.builder()
|
|
|
+ .put(indexMetaData, true)
|
|
|
+ .putCustom(IndexLifecycleMetadata.TYPE, ilm))
|
|
|
+ .nodes(DiscoveryNodes.builder()
|
|
|
+ .add(node)
|
|
|
+ .masterNodeId(node.getId())
|
|
|
+ .localNodeId(node.getId()))
|
|
|
+ .build();
|
|
|
+ ClusterServiceUtils.setState(clusterService, state);
|
|
|
+ long stepTime = randomLong();
|
|
|
+ IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> stepTime);
|
|
|
+
|
|
|
+ ClusterState before = clusterService.state();
|
|
|
+ if (asyncAction) {
|
|
|
+ runner.maybeRunAsyncAction(before, indexMetaData, policyName, stepKey);
|
|
|
+ } else if (periodicAction) {
|
|
|
+ runner.runPeriodicStep(policyName, indexMetaData);
|
|
|
+ } else {
|
|
|
+ runner.runPolicyAfterStateChange(policyName, indexMetaData);
|
|
|
+ }
|
|
|
+
|
|
|
+ // The cluster state can take a few extra milliseconds to update after the steps are executed
|
|
|
+ assertBusy(() -> assertNotEquals(before, clusterService.state()));
|
|
|
+ LifecycleExecutionState newExecutionState = LifecycleExecutionState
|
|
|
+ .fromIndexMetadata(clusterService.state().metaData().index(indexMetaData.getIndex()));
|
|
|
+ assertThat(newExecutionState.getPhase(), equalTo("phase"));
|
|
|
+ assertThat(newExecutionState.getAction(), equalTo("action"));
|
|
|
+ assertThat(newExecutionState.getStep(), equalTo("cluster_state_action_step"));
|
|
|
+ assertThat(step.getExecuteCount(), equalTo(0L));
|
|
|
+ assertThat(nextStep.getExecuteCount(), equalTo(0L));
|
|
|
+ assertThat(newExecutionState.getStepInfo(),
|
|
|
+ containsString("{\"type\":\"illegal_argument_exception\",\"reason\":\"fake failure retrieving step\"}"));
|
|
|
+ clusterService.close();
|
|
|
+ threadPool.shutdownNow();
|
|
|
+ }
|
|
|
+
|
|
|
public void testRunAsyncActionDoesNotRun() {
|
|
|
String policyName = "foo";
|
|
|
StepKey stepKey = new StepKey("phase", "action", "async_action_step");
|