Browse Source

Refresh cached phase policy definition if possible on new poli… (#50820)

* Refresh cached phase policy definition if possible on new policy

There are some cases when updating a policy does not change the
structure in a significant way. In these cases, we can reread the
policy definition for any indices using the updated policy.

This commit adds this refreshing to the `TransportPutLifecycleAction`
to allow this. It allows us to do things like change the configuration
values for a particular step, even when on that step (for example,
changing the rollover criteria while on the `check-rollover-ready` step).

There are more cases where the phase definition can be reread that just
the ones checked here (for example, removing an action that has already
been passed), and those will be added in subsequent work.

Relates to #48431
Lee Hinman 5 years ago
parent
commit
f53c9680ed

+ 9 - 5
docs/reference/ilm/policy-definitions.asciidoc

@@ -55,7 +55,7 @@ PUT _ilm/policy/my_policy
 }
 --------------------------------------------------
 
-The Above example configures a policy that moves the index into the warm
+The above example configures a policy that moves the index into the warm
 phase after one day. Until then, the index is in a waiting state. After
 moving into the warm phase, it will wait until 30 days have elapsed before
 moving to the delete phase and deleting the index.
@@ -76,10 +76,14 @@ check occurs.
 === Phase Execution
 
 The current phase definition, of an index's policy being executed, is stored
-in the index's metadata. The phase and its actions are compiled into a series
-of discrete steps that are executed sequentially. Since some {ilm-init} actions
-are more complex and involve multiple operations against an index, each of these
-operations are done in isolation in a unit called a "step". The
+in the index's metadata. This phase definition is cached to prevent changes to
+the policy from putting the index in a state where it cannot proceed from its
+current step. When the policy is updated we check to see if this phase
+definition can be safely updated, and if so, update the cached definition in
+indices using the updated policy. The phase and its actions are compiled into a
+series of discrete steps that are executed sequentially. Since some {ilm-init}
+actions are more complex and involve multiple operations against an index, each
+of these operations are done in isolation in a unit called a "step". The
 <<ilm-explain-lifecycle,Explain Lifecycle API>> exposes this information to us
 to see which step our index is either to execute next, or is currently
 executing.

+ 35 - 0
x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java

@@ -1342,6 +1342,41 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
         });
     }
 
+    public void testRefreshablePhaseJson() throws Exception {
+        String index = "refresh-index";
+
+        createNewSingletonPolicy("hot", new RolloverAction(null, null, 100L));
+        Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes");
+        createIndexTemplate.setJsonEntity("{" +
+            "\"index_patterns\": [\""+ index + "-*\"], \n" +
+            "  \"settings\": {\n" +
+            "    \"number_of_shards\": 1,\n" +
+            "    \"number_of_replicas\": 0,\n" +
+            "    \"index.lifecycle.name\": \"" + policy+ "\",\n" +
+            "    \"index.lifecycle.rollover_alias\": \"alias\"\n" +
+            "  }\n" +
+            "}");
+        client().performRequest(createIndexTemplate);
+
+        createIndexWithSettings(index + "-1",
+            Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0),
+            true);
+
+        // Index a document
+        index(client(), index + "-1", "1", "foo", "bar");
+
+        // Wait for the index to enter the check-rollover-ready step
+        assertBusy(() -> assertThat(getStepKeyForIndex(index + "-1").getName(), equalTo(WaitForRolloverReadyStep.NAME)));
+
+        // Update the policy to allow rollover at 1 document instead of 100
+        createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L));
+
+        // Index should now have been able to roll over, creating the new index and proceeding to the "complete" step
+        assertBusy(() -> assertThat(indexExists(index + "-000002"), is(true)));
+        assertBusy(() -> assertThat(getStepKeyForIndex(index + "-1").getName(), equalTo(TerminalPolicyStep.KEY.getName())));
+    }
+
     // This method should be called inside an assertBusy, it has no retry logic of its own
     private void assertHistoryIsPresent(String policyName, String indexName, boolean success, String stepName) throws IOException {
         assertHistoryIsPresent(policyName, indexName, success, null, null, stepName);

+ 2 - 2
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleTransition.java

@@ -251,8 +251,8 @@ public final class IndexLifecycleTransition {
     /**
      * Given a cluster state and lifecycle state, return a new state using the new lifecycle state for the given index.
      */
-    private static ClusterState.Builder newClusterStateWithLifecycleState(Index index, ClusterState clusterState,
-                                                                          LifecycleExecutionState lifecycleState) {
+    public static ClusterState.Builder newClusterStateWithLifecycleState(Index index, ClusterState clusterState,
+                                                                         LifecycleExecutionState lifecycleState) {
         ClusterState.Builder newClusterStateBuilder = ClusterState.builder(clusterState);
         newClusterStateBuilder.metaData(MetaData.builder(clusterState.getMetaData())
             .put(IndexMetaData.builder(clusterState.getMetaData().index(index))

+ 211 - 4
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java

@@ -8,35 +8,55 @@ package org.elasticsearch.xpack.ilm.action;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.xcontent.DeprecationHandler;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.ilm.ErrorStep;
 import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
+import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState;
 import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
 import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata;
+import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
+import org.elasticsearch.xpack.core.ilm.PhaseExecutionInfo;
+import org.elasticsearch.xpack.core.ilm.Step;
 import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction;
 import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction.Request;
 import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction.Response;
+import org.elasticsearch.xpack.ilm.IndexLifecycleTransition;
 
 import java.io.IOException;
 import java.time.Instant;
+import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
+import java.util.Spliterators;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 /**
  * This class is responsible for bootstrapping {@link IndexLifecycleMetadata} into the cluster-state, as well
@@ -45,12 +65,17 @@ import java.util.stream.Collectors;
 public class TransportPutLifecycleAction extends TransportMasterNodeAction<Request, Response> {
 
     private static final Logger logger = LogManager.getLogger(TransportPutLifecycleAction.class);
+    private final NamedXContentRegistry xContentRegistry;
+    private final Client client;
 
     @Inject
     public TransportPutLifecycleAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
-                                       ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
+                                       ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
+                                       NamedXContentRegistry namedXContentRegistry, Client client) {
         super(PutLifecycleAction.NAME, transportService, clusterService, threadPool, actionFilters, Request::new,
             indexNameExpressionResolver);
+        this.xContentRegistry = namedXContentRegistry;
+        this.client = client;
     }
 
     @Override
@@ -82,7 +107,7 @@ public class TransportPutLifecycleAction extends TransportMasterNodeAction<Reque
 
                     @Override
                     public ClusterState execute(ClusterState currentState) throws Exception {
-                        ClusterState.Builder newState = ClusterState.builder(currentState);
+                        ClusterState.Builder stateBuilder = ClusterState.builder(currentState);
                         IndexLifecycleMetadata currentMetadata = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
                         if (currentMetadata == null) { // first time using index-lifecycle feature, bootstrap metadata
                             currentMetadata = IndexLifecycleMetadata.EMPTY;
@@ -100,13 +125,195 @@ public class TransportPutLifecycleAction extends TransportMasterNodeAction<Reque
                             logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName());
                         }
                         IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode());
-                        newState.metaData(MetaData.builder(currentState.getMetaData())
+                        stateBuilder.metaData(MetaData.builder(currentState.getMetaData())
                                 .putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
-                        return newState.build();
+                        ClusterState nonRefreshedState = stateBuilder.build();
+                        if (oldPolicy == null) {
+                            return nonRefreshedState;
+                        } else {
+                            try {
+                                return updateIndicesForPolicy(nonRefreshedState, xContentRegistry, client,
+                                    oldPolicy.getPolicy(), lifecyclePolicyMetadata);
+                            } catch (Exception e) {
+                                logger.warn(new ParameterizedMessage("unable to refresh indices phase JSON for updated policy [{}]",
+                                    oldPolicy.getName()), e);
+                                // Revert to the non-refreshed state
+                                return nonRefreshedState;
+                            }
+                        }
                     }
                 });
     }
 
+    /**
+     * Ensure that we have the minimum amount of metadata necessary to check for cache phase
+     * refresh. This includes:
+     * - An execution state
+     * - Existing phase definition JSON
+     * - A current step key
+     * - A current phase in the step key
+     * - Not currently in the ERROR step
+     */
+    static boolean eligibleToCheckForRefresh(final IndexMetaData metaData) {
+        LifecycleExecutionState executionState = LifecycleExecutionState.fromIndexMetadata(metaData);
+        if (executionState == null || executionState.getPhaseDefinition() == null) {
+            return false;
+        }
+
+        Step.StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(executionState);
+        if (currentStepKey == null || currentStepKey.getPhase() == null) {
+            return false;
+        }
+
+        return ErrorStep.NAME.equals(currentStepKey.getName()) == false;
+    }
+
+    /**
+     * Parse the {@code phaseDef} phase definition to get the stepkeys for the given phase.
+     * If there is an error parsing or if the phase definition is missing the required
+     * information, returns null.
+     */
+    @Nullable
+    static Set<Step.StepKey> readStepKeys(final NamedXContentRegistry xContentRegistry, final Client client,
+                                          final String phaseDef, final String currentPhase) {
+        final PhaseExecutionInfo phaseExecutionInfo;
+        try (XContentParser parser = JsonXContent.jsonXContent.createParser(xContentRegistry,
+            DeprecationHandler.THROW_UNSUPPORTED_OPERATION, phaseDef)) {
+            phaseExecutionInfo = PhaseExecutionInfo.parse(parser, currentPhase);
+        } catch (Exception e) {
+            logger.trace(new ParameterizedMessage("exception reading step keys checking for refreshability, phase definition: {}",
+                phaseDef), e);
+            return null;
+        }
+
+        if (phaseExecutionInfo == null || phaseExecutionInfo.getPhase() == null) {
+            return null;
+        }
+
+        return phaseExecutionInfo.getPhase().getActions().values().stream()
+            .flatMap(a -> a.toSteps(client, phaseExecutionInfo.getPhase().getName(), null).stream())
+            .map(Step::getKey)
+            .collect(Collectors.toCollection(LinkedHashSet::new));
+    }
+
+    /**
+     * Returns 'true' if the index's cached phase JSON can be safely reread, 'false' otherwise.
+     */
+    static boolean isIndexPhaseDefinitionUpdatable(final NamedXContentRegistry xContentRegistry, final Client client,
+                                                   final IndexMetaData metaData, final LifecyclePolicy newPolicy) {
+        final String index = metaData.getIndex().getName();
+        if (eligibleToCheckForRefresh(metaData) == false) {
+            logger.debug("[{}] does not contain enough information to check for eligibility of refreshing phase", index);
+            return false;
+        }
+        final String policyId = newPolicy.getName();
+
+        final LifecycleExecutionState executionState = LifecycleExecutionState.fromIndexMetadata(metaData);
+        final Step.StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(executionState);
+        final String currentPhase = currentStepKey.getPhase();
+
+        final Set<Step.StepKey> newStepKeys = newPolicy.toSteps(client).stream()
+            .map(Step::getKey)
+            .collect(Collectors.toCollection(LinkedHashSet::new));
+
+        if (newStepKeys.contains(currentStepKey) == false) {
+            // The index is on a step that doesn't exist in the new policy, we
+            // can't safely re-read the JSON
+            logger.debug("[{}] updated policy [{}] does not contain the current step key [{}], so the policy phase will not be refreshed",
+                index, policyId, currentStepKey);
+            return false;
+        }
+
+        final String phaseDef = executionState.getPhaseDefinition();
+        final Set<Step.StepKey> oldStepKeys = readStepKeys(xContentRegistry, client, phaseDef, currentPhase);
+        if (oldStepKeys == null) {
+            logger.debug("[{}] unable to parse phase definition for cached policy [{}], policy phase will not be refreshed",
+                index, policyId);
+            return false;
+        }
+
+        final Set<Step.StepKey> oldPhaseStepKeys = oldStepKeys.stream()
+            .filter(sk -> currentPhase.equals(sk.getPhase()))
+            .collect(Collectors.toCollection(LinkedHashSet::new));
+
+        final PhaseExecutionInfo phaseExecutionInfo = new PhaseExecutionInfo(policyId, newPolicy.getPhases().get(currentPhase), 1L, 1L);
+        final String peiJson = Strings.toString(phaseExecutionInfo);
+
+        final Set<Step.StepKey> newPhaseStepKeys = readStepKeys(xContentRegistry, client, peiJson, currentPhase);
+        if (newPhaseStepKeys == null) {
+            logger.debug(new ParameterizedMessage("[{}] unable to parse phase definition for policy [{}] " +
+                "to determine if it could be refreshed", index, policyId));
+            return false;
+        }
+
+        if (newPhaseStepKeys.equals(oldPhaseStepKeys)) {
+            // The new and old phase have the same stepkeys for this current phase, so we can
+            // refresh the definition because we know it won't change the execution flow.
+            logger.debug("[{}] updated policy [{}] contains the same phase step keys and can be refreshed", index, policyId);
+            return true;
+        } else {
+            logger.debug("[{}] updated policy [{}] has different phase step keys and will NOT refresh phase " +
+                    "definition as it differs too greatly. old: {}, new: {}",
+                index, policyId, oldPhaseStepKeys, newPhaseStepKeys);
+            return false;
+        }
+    }
+
+    /**
+     * Rereads the phase JSON for the given index, returning a new cluster state.
+     */
+    static ClusterState refreshPhaseDefinition(final ClusterState state, final String index, final LifecyclePolicyMetadata updatedPolicy) {
+        final IndexMetaData idxMeta = state.metaData().index(index);
+        assert eligibleToCheckForRefresh(idxMeta) : "index " + index + " is missing crucial information needed to refresh phase definition";
+
+        logger.trace("[{}] updating cached phase definition for policy [{}]", index, updatedPolicy.getName());
+        LifecycleExecutionState currentExState = LifecycleExecutionState.fromIndexMetadata(idxMeta);
+
+        String currentPhase = currentExState.getPhase();
+        PhaseExecutionInfo pei = new PhaseExecutionInfo(updatedPolicy.getName(),
+            updatedPolicy.getPolicy().getPhases().get(currentPhase), updatedPolicy.getVersion(), updatedPolicy.getModifiedDate());
+
+        LifecycleExecutionState newExState = LifecycleExecutionState.builder(currentExState)
+            .setPhaseDefinition(Strings.toString(pei, false, false))
+            .build();
+
+        return IndexLifecycleTransition.newClusterStateWithLifecycleState(idxMeta.getIndex(), state, newExState).build();
+    }
+
+    /**
+     * For the given new policy, returns a new cluster with all updateable indices' phase JSON refreshed.
+     */
+    static ClusterState updateIndicesForPolicy(final ClusterState state, final NamedXContentRegistry xContentRegistry, final Client client,
+                                               final LifecyclePolicy oldPolicy, final LifecyclePolicyMetadata newPolicy) {
+        assert oldPolicy.getName().equals(newPolicy.getName()) : "expected both policies to have the same id but they were: [" +
+            oldPolicy.getName() + "] vs. [" + newPolicy.getName() + "]";
+
+        // No need to update anything if the policies are identical in contents
+        if (oldPolicy.equals(newPolicy.getPolicy())) {
+            logger.debug("policy [{}] is unchanged and no phase definition refresh is needed", oldPolicy.getName());
+            return state;
+        }
+
+        final List<String> indicesThatCanBeUpdated =
+            StreamSupport.stream(Spliterators.spliteratorUnknownSize(state.metaData().indices().valuesIt(), 0), false)
+                .filter(meta -> newPolicy.getName().equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(meta.getSettings())))
+                .filter(meta -> isIndexPhaseDefinitionUpdatable(xContentRegistry, client, meta, newPolicy.getPolicy()))
+                .map(meta -> meta.getIndex().getName())
+                .collect(Collectors.toList());
+
+        ClusterState updatedState = state;
+        for (String index : indicesThatCanBeUpdated) {
+            try {
+                updatedState = refreshPhaseDefinition(updatedState, index, newPolicy);
+            } catch (Exception e) {
+                logger.warn(new ParameterizedMessage("[{}] unable to refresh phase definition for updated policy [{}]",
+                    index, newPolicy.getName()), e);
+            }
+        }
+
+        return updatedState;
+    }
+
     @Override
     protected ClusterBlockException checkBlock(Request request, ClusterState state) {
         return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);

+ 499 - 0
x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleActionTests.java

@@ -0,0 +1,499 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.ilm.action;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.ilm.AllocateAction;
+import org.elasticsearch.xpack.core.ilm.AllocationRoutedStep;
+import org.elasticsearch.xpack.core.ilm.ErrorStep;
+import org.elasticsearch.xpack.core.ilm.ForceMergeAction;
+import org.elasticsearch.xpack.core.ilm.FreezeAction;
+import org.elasticsearch.xpack.core.ilm.LifecycleAction;
+import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState;
+import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
+import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata;
+import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
+import org.elasticsearch.xpack.core.ilm.Phase;
+import org.elasticsearch.xpack.core.ilm.PhaseExecutionInfo;
+import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
+import org.elasticsearch.xpack.core.ilm.RolloverAction;
+import org.elasticsearch.xpack.core.ilm.RolloverStep;
+import org.elasticsearch.xpack.core.ilm.SegmentCountStep;
+import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
+import org.elasticsearch.xpack.core.ilm.Step;
+import org.elasticsearch.xpack.core.ilm.UpdateRolloverLifecycleDateStep;
+import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep;
+import org.elasticsearch.xpack.ilm.IndexLifecycle;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.xpack.core.ilm.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+
+public class TransportPutLifecycleActionTests extends ESTestCase {
+    private static final NamedXContentRegistry REGISTRY;
+    private static final Client client = mock(Client.class);
+    private static final String index = "eggplant";
+
+    static {
+        try (IndexLifecycle indexLifecycle = new IndexLifecycle(Settings.EMPTY)) {
+            List<NamedXContentRegistry.Entry> entries = new ArrayList<>(indexLifecycle.getNamedXContent());
+            REGISTRY = new NamedXContentRegistry(entries);
+        }
+    }
+
+    public void testEligibleForRefresh() {
+        IndexMetaData meta = mkMeta().build();
+        assertFalse(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta));
+
+        LifecycleExecutionState state = LifecycleExecutionState.builder().build();
+        meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, state.asMap()).build();
+        assertFalse(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta));
+
+        state = LifecycleExecutionState.builder()
+            .setPhase("phase")
+            .setAction("action")
+            .setStep("step")
+            .build();
+        meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, state.asMap()).build();
+        assertFalse(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta));
+
+        state = LifecycleExecutionState.builder()
+            .setPhaseDefinition("{}")
+            .build();
+        meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, state.asMap()).build();
+        assertFalse(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta));
+
+        state = LifecycleExecutionState.builder()
+            .setPhase("phase")
+            .setAction("action")
+            .setStep(ErrorStep.NAME)
+            .setPhaseDefinition("{}")
+            .build();
+        meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, state.asMap()).build();
+        assertFalse(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta));
+
+        state = LifecycleExecutionState.builder()
+            .setPhase("phase")
+            .setAction("action")
+            .setStep("step")
+            .setPhaseDefinition("{}")
+            .build();
+        meta = mkMeta().putCustom(ILM_CUSTOM_METADATA_KEY, state.asMap()).build();
+        assertTrue(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta));
+    }
+
+    public void testReadStepKeys() {
+        assertNull(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, "{}", "phase"));
+        assertNull(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, "aoeu", "phase"));
+        assertNull(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, "", "phase"));
+
+        assertThat(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, "{\n" +
+            "        \"policy\": \"my_lifecycle3\",\n" +
+            "        \"phase_definition\": { \n" +
+            "          \"min_age\": \"0ms\",\n" +
+            "          \"actions\": {\n" +
+            "            \"rollover\": {\n" +
+            "              \"max_age\": \"30s\"\n" +
+            "            }\n" +
+            "          }\n" +
+            "        },\n" +
+            "        \"version\": 3, \n" +
+            "        \"modified_date_in_millis\": 1539609701576 \n" +
+            "      }", "phase"),
+            contains(new Step.StepKey("phase", "rollover", WaitForRolloverReadyStep.NAME),
+                new Step.StepKey("phase", "rollover", RolloverStep.NAME),
+                new Step.StepKey("phase", "rollover", UpdateRolloverLifecycleDateStep.NAME),
+                new Step.StepKey("phase", "rollover", RolloverAction.INDEXING_COMPLETE_STEP_NAME)));
+
+        assertThat(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, "{\n" +
+                "        \"policy\" : \"my_lifecycle3\",\n" +
+                "        \"phase_definition\" : {\n" +
+                "          \"min_age\" : \"20m\",\n" +
+                "          \"actions\" : {\n" +
+                "            \"rollover\" : {\n" +
+                "              \"max_age\" : \"5s\"\n" +
+                "            },\n" +
+                "            \"set_priority\" : {\n" +
+                "              \"priority\" : 150\n" +
+                "            }\n" +
+                "          }\n" +
+                "        },\n" +
+                "        \"version\" : 1,\n" +
+                "        \"modified_date_in_millis\" : 1578521007076\n" +
+                "      }", "phase"),
+            contains(new Step.StepKey("phase", "rollover", WaitForRolloverReadyStep.NAME),
+                new Step.StepKey("phase", "rollover", RolloverStep.NAME),
+                new Step.StepKey("phase", "rollover", UpdateRolloverLifecycleDateStep.NAME),
+                new Step.StepKey("phase", "rollover", RolloverAction.INDEXING_COMPLETE_STEP_NAME),
+                new Step.StepKey("phase", "set_priority", SetPriorityAction.NAME)));
+
+        Map<String, LifecycleAction> actions = new HashMap<>();
+        actions.put("forcemerge", new ForceMergeAction(5));
+        actions.put("freeze", new FreezeAction());
+        actions.put("allocate", new AllocateAction(1, null, null, null));
+        PhaseExecutionInfo pei = new PhaseExecutionInfo("policy", new Phase("wonky", TimeValue.ZERO, actions), 1, 1);
+        String phaseDef = Strings.toString(pei);
+        logger.info("--> phaseDef: {}", phaseDef);
+
+        assertThat(TransportPutLifecycleAction.readStepKeys(REGISTRY, client, phaseDef, "phase"),
+            contains(new Step.StepKey("phase", "freeze", FreezeAction.NAME),
+                new Step.StepKey("phase", "allocate", AllocateAction.NAME),
+                new Step.StepKey("phase", "allocate", AllocationRoutedStep.NAME),
+                new Step.StepKey("phase", "forcemerge", ReadOnlyAction.NAME),
+                new Step.StepKey("phase", "forcemerge", ForceMergeAction.NAME),
+                new Step.StepKey("phase", "forcemerge", SegmentCountStep.NAME)));
+    }
+
+    public void testIndexCanBeSafelyUpdated() {
+
+        // Success case, it can be updated even though the configuration for the
+        // rollover and set_priority actions has changed
+        {
+            LifecycleExecutionState exState = LifecycleExecutionState.builder()
+                .setPhase("hot")
+                .setAction("rollover")
+                .setStep("check-rollover-ready")
+                .setPhaseDefinition("{\n" +
+                    "        \"policy\" : \"my-policy\",\n" +
+                    "        \"phase_definition\" : {\n" +
+                    "          \"min_age\" : \"20m\",\n" +
+                    "          \"actions\" : {\n" +
+                    "            \"rollover\" : {\n" +
+                    "              \"max_age\" : \"5s\"\n" +
+                    "            },\n" +
+                    "            \"set_priority\" : {\n" +
+                    "              \"priority\" : 150\n" +
+                    "            }\n" +
+                    "          }\n" +
+                    "        },\n" +
+                    "        \"version\" : 1,\n" +
+                    "        \"modified_date_in_millis\" : 1578521007076\n" +
+                    "      }")
+                .build();
+
+            IndexMetaData meta = mkMeta()
+                .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap())
+                .build();
+
+            Map<String, LifecycleAction> actions = new HashMap<>();
+            actions.put("rollover", new RolloverAction(null, null, 1L));
+            actions.put("set_priority", new SetPriorityAction(100));
+            Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions);
+            Map<String, Phase> phases = Collections.singletonMap("hot", hotPhase);
+            LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases);
+
+            assertTrue(TransportPutLifecycleAction.isIndexPhaseDefinitionUpdatable(REGISTRY, client, meta, newPolicy));
+        }
+
+        // Failure case, can't update because the step we're currently on has been removed in the new policy
+        {
+            LifecycleExecutionState exState = LifecycleExecutionState.builder()
+                .setPhase("hot")
+                .setAction("rollover")
+                .setStep("check-rollover-ready")
+                .setPhaseDefinition("{\n" +
+                    "        \"policy\" : \"my-policy\",\n" +
+                    "        \"phase_definition\" : {\n" +
+                    "          \"min_age\" : \"20m\",\n" +
+                    "          \"actions\" : {\n" +
+                    "            \"rollover\" : {\n" +
+                    "              \"max_age\" : \"5s\"\n" +
+                    "            },\n" +
+                    "            \"set_priority\" : {\n" +
+                    "              \"priority\" : 150\n" +
+                    "            }\n" +
+                    "          }\n" +
+                    "        },\n" +
+                    "        \"version\" : 1,\n" +
+                    "        \"modified_date_in_millis\" : 1578521007076\n" +
+                    "      }")
+                .build();
+
+            IndexMetaData meta = mkMeta()
+                .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap())
+                .build();
+
+            Map<String, LifecycleAction> actions = new HashMap<>();
+            actions.put("set_priority", new SetPriorityAction(150));
+            Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions);
+            Map<String, Phase> phases = Collections.singletonMap("hot", hotPhase);
+            LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases);
+
+            assertFalse(TransportPutLifecycleAction.isIndexPhaseDefinitionUpdatable(REGISTRY, client, meta, newPolicy));
+        }
+
+        // Failure case, can't update because the future step has been deleted
+        {
+            LifecycleExecutionState exState = LifecycleExecutionState.builder()
+                .setPhase("hot")
+                .setAction("rollover")
+                .setStep("check-rollover-ready")
+                .setPhaseDefinition("{\n" +
+                    "        \"policy\" : \"my-policy\",\n" +
+                    "        \"phase_definition\" : {\n" +
+                    "          \"min_age\" : \"20m\",\n" +
+                    "          \"actions\" : {\n" +
+                    "            \"rollover\" : {\n" +
+                    "              \"max_age\" : \"5s\"\n" +
+                    "            },\n" +
+                    "            \"set_priority\" : {\n" +
+                    "              \"priority\" : 150\n" +
+                    "            }\n" +
+                    "          }\n" +
+                    "        },\n" +
+                    "        \"version\" : 1,\n" +
+                    "        \"modified_date_in_millis\" : 1578521007076\n" +
+                    "      }")
+                .build();
+
+            IndexMetaData meta = mkMeta()
+                .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap())
+                .build();
+
+            Map<String, LifecycleAction> actions = new HashMap<>();
+            actions.put("rollover", new RolloverAction(null, TimeValue.timeValueSeconds(5), null));
+            Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions);
+            Map<String, Phase> phases = Collections.singletonMap("hot", hotPhase);
+            LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases);
+
+            assertFalse(TransportPutLifecycleAction.isIndexPhaseDefinitionUpdatable(REGISTRY, client, meta, newPolicy));
+        }
+
+        // Failure case, index doesn't have enough info to check
+        {
+            LifecycleExecutionState exState = LifecycleExecutionState.builder()
+                .setPhaseDefinition("{\n" +
+                    "        \"policy\" : \"my-policy\",\n" +
+                    "        \"phase_definition\" : {\n" +
+                    "          \"min_age\" : \"20m\",\n" +
+                    "          \"actions\" : {\n" +
+                    "            \"rollover\" : {\n" +
+                    "              \"max_age\" : \"5s\"\n" +
+                    "            },\n" +
+                    "            \"set_priority\" : {\n" +
+                    "              \"priority\" : 150\n" +
+                    "            }\n" +
+                    "          }\n" +
+                    "        },\n" +
+                    "        \"version\" : 1,\n" +
+                    "        \"modified_date_in_millis\" : 1578521007076\n" +
+                    "      }")
+                .build();
+
+            IndexMetaData meta = mkMeta()
+                .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap())
+                .build();
+
+            Map<String, LifecycleAction> actions = new HashMap<>();
+            actions.put("rollover", new RolloverAction(null, null, 1L));
+            actions.put("set_priority", new SetPriorityAction(100));
+            Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions);
+            Map<String, Phase> phases = Collections.singletonMap("hot", hotPhase);
+            LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases);
+
+            assertFalse(TransportPutLifecycleAction.isIndexPhaseDefinitionUpdatable(REGISTRY, client, meta, newPolicy));
+        }
+
+        // Failure case, the phase JSON is unparseable
+        {
+            LifecycleExecutionState exState = LifecycleExecutionState.builder()
+                .setPhase("hot")
+                .setAction("rollover")
+                .setStep("check-rollover-ready")
+                .setPhaseDefinition("potato")
+                .build();
+
+            IndexMetaData meta = mkMeta()
+                .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap())
+                .build();
+
+            Map<String, LifecycleAction> actions = new HashMap<>();
+            actions.put("rollover", new RolloverAction(null, null, 1L));
+            actions.put("set_priority", new SetPriorityAction(100));
+            Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions);
+            Map<String, Phase> phases = Collections.singletonMap("hot", hotPhase);
+            LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases);
+
+            assertFalse(TransportPutLifecycleAction.isIndexPhaseDefinitionUpdatable(REGISTRY, client, meta, newPolicy));
+        }
+    }
+
+    public void testRefreshPhaseJson() {
+        LifecycleExecutionState exState = LifecycleExecutionState.builder()
+            .setPhase("hot")
+            .setAction("rollover")
+            .setStep("check-rollover-ready")
+            .setPhaseDefinition("{\n" +
+                "        \"policy\" : \"my-policy\",\n" +
+                "        \"phase_definition\" : {\n" +
+                "          \"min_age\" : \"20m\",\n" +
+                "          \"actions\" : {\n" +
+                "            \"rollover\" : {\n" +
+                "              \"max_age\" : \"5s\"\n" +
+                "            },\n" +
+                "            \"set_priority\" : {\n" +
+                "              \"priority\" : 150\n" +
+                "            }\n" +
+                "          }\n" +
+                "        },\n" +
+                "        \"version\" : 1,\n" +
+                "        \"modified_date_in_millis\" : 1578521007076\n" +
+                "      }")
+            .build();
+
+        IndexMetaData meta = mkMeta()
+            .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap())
+            .build();
+
+        Map<String, LifecycleAction> actions = new HashMap<>();
+        actions.put("rollover", new RolloverAction(null, null, 1L));
+        actions.put("set_priority", new SetPriorityAction(100));
+        Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions);
+        Map<String, Phase> phases = Collections.singletonMap("hot", hotPhase);
+        LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases);
+        LifecyclePolicyMetadata policyMetadata = new LifecyclePolicyMetadata(newPolicy, Collections.emptyMap(), 2L, 2L);
+
+        ClusterState existingState = ClusterState.builder(ClusterState.EMPTY_STATE)
+            .metaData(MetaData.builder(MetaData.EMPTY_META_DATA)
+                .put(meta, false)
+                .build())
+            .build();
+
+        ClusterState changedState = TransportPutLifecycleAction.refreshPhaseDefinition(existingState, index, policyMetadata);
+
+        IndexMetaData newIdxMeta = changedState.metaData().index(index);
+        LifecycleExecutionState afterExState = LifecycleExecutionState.fromIndexMetadata(newIdxMeta);
+        Map<String, String> beforeState = new HashMap<>(exState.asMap());
+        beforeState.remove("phase_definition");
+        Map<String, String> afterState = new HashMap<>(afterExState.asMap());
+        afterState.remove("phase_definition");
+        // Check that no other execution state changes have been made
+        assertThat(beforeState, equalTo(afterState));
+
+        // Check that the phase definition has been refreshed
+        assertThat(afterExState.getPhaseDefinition(),
+            equalTo("{\"policy\":\"my-policy\",\"phase_definition\":{\"min_age\":\"0ms\",\"actions\":{\"rollover\":{\"max_docs\":1}," +
+                "\"set_priority\":{\"priority\":100}}},\"version\":2,\"modified_date_in_millis\":2}"));
+    }
+
+    public void testUpdateIndicesForPolicy() {
+        LifecycleExecutionState exState = LifecycleExecutionState.builder()
+            .setPhase("hot")
+            .setAction("rollover")
+            .setStep("check-rollover-ready")
+            .setPhaseDefinition("{\"policy\":\"my-policy\",\"phase_definition\":{\"min_age\":\"0ms\",\"actions\":{\"rollover\":" +
+                "{\"max_docs\":1},\"set_priority\":{\"priority\":100}}},\"version\":1,\"modified_date_in_millis\":1578521007076}")
+            .build();
+
+        IndexMetaData meta = mkMeta()
+            .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap())
+            .build();
+
+        assertTrue(TransportPutLifecycleAction.eligibleToCheckForRefresh(meta));
+
+        Map<String, LifecycleAction> oldActions = new HashMap<>();
+        oldActions.put("rollover", new RolloverAction(null, null, 1L));
+        oldActions.put("set_priority", new SetPriorityAction(100));
+        Phase oldHotPhase = new Phase("hot", TimeValue.ZERO, oldActions);
+        Map<String, Phase> oldPhases = Collections.singletonMap("hot", oldHotPhase);
+        LifecyclePolicy oldPolicy = new LifecyclePolicy("my-policy", oldPhases);
+
+        Map<String, LifecycleAction> actions = new HashMap<>();
+        actions.put("rollover", new RolloverAction(null, null, 1L));
+        actions.put("set_priority", new SetPriorityAction(100));
+        Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions);
+        Map<String, Phase> phases = Collections.singletonMap("hot", hotPhase);
+        LifecyclePolicy newPolicy = new LifecyclePolicy("my-policy", phases);
+        LifecyclePolicyMetadata policyMetadata = new LifecyclePolicyMetadata(newPolicy, Collections.emptyMap(), 2L, 2L);
+
+        assertTrue(TransportPutLifecycleAction.isIndexPhaseDefinitionUpdatable(REGISTRY, client, meta, newPolicy));
+
+        ClusterState existingState = ClusterState.builder(ClusterState.EMPTY_STATE)
+            .metaData(MetaData.builder(MetaData.EMPTY_META_DATA)
+                .put(meta, false)
+                .build())
+            .build();
+
+        logger.info("--> update for unchanged policy");
+        ClusterState updatedState = TransportPutLifecycleAction.updateIndicesForPolicy(existingState, REGISTRY,
+            client, oldPolicy, policyMetadata);
+
+        // No change, because the policies were identical
+        assertThat(updatedState, equalTo(existingState));
+
+        actions = new HashMap<>();
+        actions.put("rollover", new RolloverAction(null, null, 2L));
+        actions.put("set_priority", new SetPriorityAction(150));
+        hotPhase = new Phase("hot", TimeValue.ZERO, actions);
+        phases = Collections.singletonMap("hot", hotPhase);
+        newPolicy = new LifecyclePolicy("my-policy", phases);
+        policyMetadata = new LifecyclePolicyMetadata(newPolicy, Collections.emptyMap(), 2L, 2L);
+
+        logger.info("--> update with changed policy, but not configured in settings");
+        updatedState = TransportPutLifecycleAction.updateIndicesForPolicy(existingState, REGISTRY, client, oldPolicy, policyMetadata);
+
+        // No change, because the index doesn't have a lifecycle.name setting for this policy
+        assertThat(updatedState, equalTo(existingState));
+
+        meta = IndexMetaData.builder(index)
+            .settings(Settings.builder()
+                .put(LifecycleSettings.LIFECYCLE_NAME, "my-policy")
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 10))
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 5))
+                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+                .put(IndexMetaData.SETTING_INDEX_UUID, randomAlphaOfLength(5)))
+            .putCustom(ILM_CUSTOM_METADATA_KEY, exState.asMap())
+            .build();
+        existingState = ClusterState.builder(ClusterState.EMPTY_STATE)
+            .metaData(MetaData.builder(MetaData.EMPTY_META_DATA)
+                .put(meta, false)
+                .build())
+            .build();
+
+        logger.info("--> update with changed policy and this index has the policy");
+        updatedState = TransportPutLifecycleAction.updateIndicesForPolicy(existingState, REGISTRY, client, oldPolicy, policyMetadata);
+
+        IndexMetaData newIdxMeta = updatedState.metaData().index(index);
+        LifecycleExecutionState afterExState = LifecycleExecutionState.fromIndexMetadata(newIdxMeta);
+        Map<String, String> beforeState = new HashMap<>(exState.asMap());
+        beforeState.remove("phase_definition");
+        Map<String, String> afterState = new HashMap<>(afterExState.asMap());
+        afterState.remove("phase_definition");
+        // Check that no other execution state changes have been made
+        assertThat(beforeState, equalTo(afterState));
+
+        // Check that the phase definition has been refreshed
+        assertThat(afterExState.getPhaseDefinition(),
+            equalTo("{\"policy\":\"my-policy\",\"phase_definition\":{\"min_age\":\"0ms\",\"actions\":{\"rollover\":{\"max_docs\":2}," +
+                "\"set_priority\":{\"priority\":150}}},\"version\":2,\"modified_date_in_millis\":2}"));
+    }
+
+    private static IndexMetaData.Builder mkMeta() {
+        return IndexMetaData.builder(index)
+            .settings(Settings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 10))
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 5))
+                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+                .put(IndexMetaData.SETTING_INDEX_UUID, randomAlphaOfLength(5)));
+    }
+}