Browse Source

Allow shrink in the hot phase for ILM policies (#64008)

Joe Gallo 5 years ago
parent
commit
9986cb80ab

+ 5 - 2
docs/reference/ilm/actions/ilm-shrink.asciidoc

@@ -2,7 +2,7 @@
 [[ilm-shrink]]
 === Shrink
 
-Phases allowed: warm
+Phases allowed: hot, warm.
 
 Sets an index to <<dynamic-index-settings, read-only>> 
 and shrinks it into a new index with fewer primary shards. 
@@ -11,9 +11,12 @@ For example, if the name of the source index is _logs_,
 the name of the shrunken index is _shrink-logs_.
 
 The shrink action allocates all primary shards of the index to one node so it 
-can call  the <<indices-shrink-index,Shrink API>> to shrink the index.
+can call the <<indices-shrink-index,Shrink API>> to shrink the index.
 After shrinking, it swaps aliases that point to the original index to the new shrunken index. 
 
+To use the `shrink` action in the `hot` phase, the `rollover` action *must* be present.
+If no rollover action is configured, {ilm-init} will reject the policy.
+
 [IMPORTANT]
 If the shrink action is used on a <<ccr-put-follow,follower index>>, 
 policy execution waits until the leader index rolls over (or is

+ 4 - 2
docs/reference/ilm/ilm-index-lifecycle.asciidoc

@@ -77,22 +77,24 @@ the rollover criteria, it could be 20 minutes before the rollover is complete.
 * Hot
   - <<ilm-set-priority,Set Priority>>
   - <<ilm-unfollow,Unfollow>>
-  - <<ilm-forcemerge,Force Merge>>
   - <<ilm-rollover,Rollover>>
+  - <<ilm-shrink,Shrink>>
+  - <<ilm-forcemerge,Force Merge>>
 * Warm
   - <<ilm-set-priority,Set Priority>>
   - <<ilm-unfollow,Unfollow>>
   - <<ilm-readonly,Read-Only>>
   - <<ilm-allocate,Allocate>>
+  - <<ilm-migrate,Migrate>>
   - <<ilm-shrink,Shrink>>
   - <<ilm-forcemerge,Force Merge>>
 * Cold
   - <<ilm-set-priority-action,Set Priority>>
   - <<ilm-unfollow-action,Unfollow>>
   - <<ilm-allocate,Allocate>>
+  - <<ilm-migrate,Migrate>>
   - <<ilm-freeze,Freeze>>
   - <<ilm-searchable-snapshot, Searchable Snapshot>>
 * Delete
   - <<ilm-wait-for-snapshot-action,Wait For Snapshot>>
   - <<ilm-delete,Delete>>
-  

+ 35 - 35
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java

@@ -40,7 +40,7 @@ public class TimeseriesLifecycleType implements LifecycleType {
     static final String DELETE_PHASE = "delete";
     static final List<String> VALID_PHASES = Arrays.asList(HOT_PHASE, WARM_PHASE, COLD_PHASE, DELETE_PHASE);
     static final List<String> ORDERED_VALID_HOT_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, RolloverAction.NAME,
-        ForceMergeAction.NAME);
+        ShrinkAction.NAME, ForceMergeAction.NAME);
     static final List<String> ORDERED_VALID_WARM_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, ReadOnlyAction.NAME,
         AllocateAction.NAME, MigrateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME);
     static final List<String> ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME,
@@ -50,14 +50,13 @@ public class TimeseriesLifecycleType implements LifecycleType {
     static final Set<String> VALID_WARM_ACTIONS = Sets.newHashSet(ORDERED_VALID_WARM_ACTIONS);
     static final Set<String> VALID_COLD_ACTIONS = Sets.newHashSet(ORDERED_VALID_COLD_ACTIONS);
     static final Set<String> VALID_DELETE_ACTIONS = Sets.newHashSet(ORDERED_VALID_DELETE_ACTIONS);
-    private static final Map<String, Set<String>> ALLOWED_ACTIONS = new HashMap<>();
+    private static final Map<String, Set<String>> ALLOWED_ACTIONS = Map.of(
+        HOT_PHASE, VALID_HOT_ACTIONS,
+        WARM_PHASE, VALID_WARM_ACTIONS,
+        COLD_PHASE, VALID_COLD_ACTIONS,
+        DELETE_PHASE, VALID_DELETE_ACTIONS);
 
-    static {
-        ALLOWED_ACTIONS.put(HOT_PHASE, VALID_HOT_ACTIONS);
-        ALLOWED_ACTIONS.put(WARM_PHASE, VALID_WARM_ACTIONS);
-        ALLOWED_ACTIONS.put(COLD_PHASE, VALID_COLD_ACTIONS);
-        ALLOWED_ACTIONS.put(DELETE_PHASE, VALID_DELETE_ACTIONS);
-    }
+    static final Set<String> HOT_ACTIONS_THAT_REQUIRE_ROLLOVER = Sets.newHashSet(ShrinkAction.NAME, ForceMergeAction.NAME);
 
     private TimeseriesLifecycleType() {
     }
@@ -157,16 +156,16 @@ public class TimeseriesLifecycleType implements LifecycleType {
         Map<String, LifecycleAction> actions = phase.getActions();
         switch (phase.getName()) {
             case HOT_PHASE:
-                return ORDERED_VALID_HOT_ACTIONS.stream().map(a -> actions.getOrDefault(a, null))
+                return ORDERED_VALID_HOT_ACTIONS.stream().map(actions::get)
                     .filter(Objects::nonNull).collect(toList());
             case WARM_PHASE:
-                return ORDERED_VALID_WARM_ACTIONS.stream().map(a -> actions.getOrDefault(a, null))
+                return ORDERED_VALID_WARM_ACTIONS.stream().map(actions::get)
                     .filter(Objects::nonNull).collect(toList());
             case COLD_PHASE:
-                return ORDERED_VALID_COLD_ACTIONS.stream().map(a -> actions.getOrDefault(a, null))
+                return ORDERED_VALID_COLD_ACTIONS.stream().map(actions::get)
                     .filter(Objects::nonNull).collect(toList());
             case DELETE_PHASE:
-                return ORDERED_VALID_DELETE_ACTIONS.stream().map(a -> actions.getOrDefault(a, null))
+                return ORDERED_VALID_DELETE_ACTIONS.stream().map(actions::get)
                     .filter(Objects::nonNull).collect(toList());
             default:
                 throw new IllegalArgumentException("lifecycle type[" + TYPE + "] does not support phase[" + phase.getName() + "]");
@@ -177,20 +176,20 @@ public class TimeseriesLifecycleType implements LifecycleType {
     public String getNextActionName(String currentActionName, Phase phase) {
         List<String> orderedActionNames;
         switch (phase.getName()) {
-        case HOT_PHASE:
-            orderedActionNames = ORDERED_VALID_HOT_ACTIONS;
-            break;
-        case WARM_PHASE:
-            orderedActionNames = ORDERED_VALID_WARM_ACTIONS;
-            break;
-        case COLD_PHASE:
-            orderedActionNames = ORDERED_VALID_COLD_ACTIONS;
-            break;
-        case DELETE_PHASE:
-            orderedActionNames = ORDERED_VALID_DELETE_ACTIONS;
-            break;
-        default:
-            throw new IllegalArgumentException("lifecycle type [" + TYPE + "] does not support phase [" + phase.getName() + "]");
+            case HOT_PHASE:
+                orderedActionNames = ORDERED_VALID_HOT_ACTIONS;
+                break;
+            case WARM_PHASE:
+                orderedActionNames = ORDERED_VALID_WARM_ACTIONS;
+                break;
+            case COLD_PHASE:
+                orderedActionNames = ORDERED_VALID_COLD_ACTIONS;
+                break;
+            case DELETE_PHASE:
+                orderedActionNames = ORDERED_VALID_DELETE_ACTIONS;
+                break;
+            default:
+                throw new IllegalArgumentException("lifecycle type [" + TYPE + "] does not support phase [" + phase.getName() + "]");
         }
 
         int index = orderedActionNames.indexOf(currentActionName);
@@ -226,17 +225,18 @@ public class TimeseriesLifecycleType implements LifecycleType {
             });
         });
 
-        // Check for forcemerge in 'hot' without a rollover action
-        if (phases.stream()
+        // Check for actions in the hot phase that require a rollover
+        String invalidHotPhaseActions = phases.stream()
             // Is there a hot phase
             .filter(phase -> HOT_PHASE.equals(phase.getName()))
-            // That contains the 'forcemerge' action
-            .filter(phase -> phase.getActions().containsKey(ForceMergeAction.NAME))
-            // But does *not* contain the 'rollover' action?
-            .anyMatch(phase -> phase.getActions().containsKey(RolloverAction.NAME) == false)) {
-            // If there is, throw an exception
-            throw new IllegalArgumentException("the [" + ForceMergeAction.NAME +
-                "] action may not be used in the [" + HOT_PHASE +
+            // that does *not* contain the 'rollover' action
+            .filter(phase -> phase.getActions().containsKey(RolloverAction.NAME) == false)
+            // but that does have actions that require a rollover action?
+            .flatMap(phase -> Sets.intersection(phase.getActions().keySet(), HOT_ACTIONS_THAT_REQUIRE_ROLLOVER).stream())
+            .collect(Collectors.joining(", "));
+        if (Strings.hasText(invalidHotPhaseActions)) {
+            throw new IllegalArgumentException("the [" + invalidHotPhaseActions +
+                "] action(s) may not be used in the [" + HOT_PHASE +
                 "] phase without an accompanying [" + RolloverAction.NAME + "] action");
         }
 

+ 30 - 29
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyTests.java

@@ -99,7 +99,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
     public static LifecyclePolicy randomTimeseriesLifecyclePolicyWithAllPhases(@Nullable String lifecycleName) {
         List<String> phaseNames = TimeseriesLifecycleType.VALID_PHASES;
         Map<String, Phase> phases = new HashMap<>(phaseNames.size());
-        Function<String, Set<String>> validActions = (phase) ->  {
+        Function<String, Set<String>> validActions = (phase) -> {
             switch (phase) {
                 case "hot":
                     return TimeseriesLifecycleType.VALID_HOT_ACTIONS;
@@ -112,14 +112,14 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
                 default:
                     throw new IllegalArgumentException("invalid phase [" + phase + "]");
             }};
-        Function<String, LifecycleAction> randomAction = (action) ->  {
+        Function<String, LifecycleAction> randomAction = (action) -> {
             switch (action) {
                 case AllocateAction.NAME:
                     return AllocateActionTests.randomInstance();
                 case DeleteAction.NAME:
                     return new DeleteAction();
                 case WaitForSnapshotAction.NAME:
-                    return  WaitForSnapshotActionTests.randomInstance();
+                    return WaitForSnapshotActionTests.randomInstance();
                 case ForceMergeAction.NAME:
                     return ForceMergeActionTests.randomInstance();
                 case ReadOnlyAction.NAME:
@@ -157,7 +157,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
         List<String> phaseNames = randomSubsetOf(
             between(0, TimeseriesLifecycleType.VALID_PHASES.size() - 1), TimeseriesLifecycleType.VALID_PHASES);
         Map<String, Phase> phases = new HashMap<>(phaseNames.size());
-        Function<String, Set<String>> validActions = (phase) ->  {
+        Function<String, Set<String>> validActions = (phase) -> {
             switch (phase) {
                 case "hot":
                     return TimeseriesLifecycleType.VALID_HOT_ACTIONS;
@@ -170,7 +170,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
                 default:
                     throw new IllegalArgumentException("invalid phase [" + phase + "]");
             }};
-        Function<String, LifecycleAction> randomAction = (action) ->  {
+        Function<String, LifecycleAction> randomAction = (action) -> {
             switch (action) {
                 case AllocateAction.NAME:
                     return AllocateActionTests.randomInstance();
@@ -204,8 +204,9 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
             Map<String, LifecycleAction> actions = new HashMap<>();
             List<String> actionNames = randomSubsetOf(validActions.apply(phase));
 
-            // If the hot phase contains a forcemerge, also make sure to add a rollover, or else the policy will not validate
-            if (phase.equals(TimeseriesLifecycleType.HOT_PHASE) && actionNames.contains(ForceMergeAction.NAME)) {
+            // If the hot phase has any actions that require a rollover, then ensure there is one so that the policy will validate
+            if (phase.equals(TimeseriesLifecycleType.HOT_PHASE)
+                && actionNames.stream().anyMatch(TimeseriesLifecycleType.HOT_ACTIONS_THAT_REQUIRE_ROLLOVER::contains)) {
                 actionNames.add(RolloverAction.NAME);
             }
 
@@ -238,16 +239,16 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
         String name = instance.getName();
         Map<String, Phase> phases = instance.getPhases();
         switch (between(0, 1)) {
-        case 0:
-            name = name + randomAlphaOfLengthBetween(1, 5);
-            break;
-        case 1:
-            String phaseName = randomValueOtherThanMany(phases::containsKey, () -> randomFrom(TimeseriesLifecycleType.VALID_PHASES));
-            phases = new LinkedHashMap<>(phases);
-            phases.put(phaseName, new Phase(phaseName, TimeValue.timeValueSeconds(randomIntBetween(1, 1000)), Collections.emptyMap()));
-            break;
-        default:
-            throw new AssertionError("Illegal randomisation branch");
+            case 0:
+                name = name + randomAlphaOfLengthBetween(1, 5);
+                break;
+            case 1:
+                String phaseName = randomValueOtherThanMany(phases::containsKey, () -> randomFrom(TimeseriesLifecycleType.VALID_PHASES));
+                phases = new LinkedHashMap<>(phases);
+                phases.put(phaseName, new Phase(phaseName, TimeValue.timeValueSeconds(randomIntBetween(1, 1000)), Collections.emptyMap()));
+                break;
+            default:
+                throw new AssertionError("Illegal randomisation branch");
         }
         return new LifecyclePolicy(TimeseriesLifecycleType.INSTANCE, name, phases);
     }
@@ -300,7 +301,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
         MockStep secondActionStep = new MockStep(new StepKey("second_phase", "test2", "test"),
             PhaseCompleteStep.finalStep("second_phase").getKey());
         MockStep secondAfter = new MockStep(new StepKey("first_phase", PhaseCompleteStep.NAME, PhaseCompleteStep.NAME),
-                secondActionStep.getKey());
+            secondActionStep.getKey());
         MockStep firstActionAnotherStep = new MockStep(new StepKey("first_phase", "test", "bar"), secondAfter.getKey());
         MockStep firstActionStep = new MockStep(new StepKey("first_phase", "test", "foo"), firstActionAnotherStep.getKey());
         MockStep firstAfter = new MockStep(new StepKey("new", PhaseCompleteStep.NAME, PhaseCompleteStep.NAME), firstActionStep.getKey());
@@ -352,30 +353,30 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
         assertFalse(policy.isActionSafe(new StepKey("second_phase", MockAction.NAME, randomAlphaOfLength(10))));
 
         IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
-                () -> policy.isActionSafe(new StepKey("non_existant_phase", MockAction.NAME, randomAlphaOfLength(10))));
+            () -> policy.isActionSafe(new StepKey("non_existant_phase", MockAction.NAME, randomAlphaOfLength(10))));
         assertEquals("Phase [non_existant_phase]  does not exist in policy [" + policy.getName() + "]", exception.getMessage());
 
         exception = expectThrows(IllegalArgumentException.class,
-                () -> policy.isActionSafe(new StepKey("first_phase", "non_existant_action", randomAlphaOfLength(10))));
+            () -> policy.isActionSafe(new StepKey("first_phase", "non_existant_action", randomAlphaOfLength(10))));
         assertEquals("Action [non_existant_action] in phase [first_phase]  does not exist in policy [" + policy.getName() + "]",
-                exception.getMessage());
+            exception.getMessage());
 
         assertTrue(policy.isActionSafe(new StepKey("new", randomAlphaOfLength(10), randomAlphaOfLength(10))));
     }
 
     public void testValidatePolicyName() {
-        expectThrows(IllegalArgumentException.class, () -> LifecyclePolicy.validatePolicyName(randomAlphaOfLengthBetween(0,10) +
-            "," + randomAlphaOfLengthBetween(0,10)));
-        expectThrows(IllegalArgumentException.class, () -> LifecyclePolicy.validatePolicyName(randomAlphaOfLengthBetween(0,10) +
-            " " + randomAlphaOfLengthBetween(0,10)));
+        expectThrows(IllegalArgumentException.class, () -> LifecyclePolicy.validatePolicyName(randomAlphaOfLengthBetween(0, 10) +
+            "," + randomAlphaOfLengthBetween(0, 10)));
+        expectThrows(IllegalArgumentException.class, () -> LifecyclePolicy.validatePolicyName(randomAlphaOfLengthBetween(0, 10) +
+            " " + randomAlphaOfLengthBetween(0, 10)));
         expectThrows(IllegalArgumentException.class, () -> LifecyclePolicy.validatePolicyName("_" + randomAlphaOfLengthBetween(1, 20)));
         expectThrows(IllegalArgumentException.class, () -> LifecyclePolicy.validatePolicyName(randomAlphaOfLengthBetween(256, 1000)));
 
-        LifecyclePolicy.validatePolicyName(randomAlphaOfLengthBetween(1,10) + "_" + randomAlphaOfLengthBetween(0,10));
+        LifecyclePolicy.validatePolicyName(randomAlphaOfLengthBetween(1, 10) + "_" + randomAlphaOfLengthBetween(0, 10));
 
-        LifecyclePolicy.validatePolicyName(randomAlphaOfLengthBetween(0,10) + "-" + randomAlphaOfLengthBetween(0,10));
-        LifecyclePolicy.validatePolicyName(randomAlphaOfLengthBetween(0,10) + "+" + randomAlphaOfLengthBetween(0,10));
+        LifecyclePolicy.validatePolicyName(randomAlphaOfLengthBetween(0, 10) + "-" + randomAlphaOfLengthBetween(0, 10));
+        LifecyclePolicy.validatePolicyName(randomAlphaOfLengthBetween(0, 10) + "+" + randomAlphaOfLengthBetween(0, 10));
 
-        LifecyclePolicy.validatePolicyName(randomAlphaOfLengthBetween(1,255));
+        LifecyclePolicy.validatePolicyName(randomAlphaOfLengthBetween(1, 255));
     }
 }

+ 1 - 2
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java

@@ -104,7 +104,7 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
             IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
                 () -> validateHotActions.accept(Arrays.asList(ForceMergeAction.NAME)));
             assertThat(e.getMessage(),
-                containsString("the [forcemerge] action may not be used in the [hot] phase without an accompanying [rollover] action"));
+                containsString("the [forcemerge] action(s) may not be used in the [hot] phase without an accompanying [rollover] action"));
         }
     }
 
@@ -407,7 +407,6 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
         assertInvalidAction("hot", AllocateAction.NAME, new String[] { RolloverAction.NAME });
         assertInvalidAction("hot", DeleteAction.NAME, new String[] { RolloverAction.NAME });
         assertInvalidAction("hot", ReadOnlyAction.NAME, new String[] { RolloverAction.NAME });
-        assertInvalidAction("hot", ShrinkAction.NAME, new String[] { RolloverAction.NAME });
 
         // Warm Phase
         assertNextActionName("warm", SetPriorityAction.NAME, UnfollowAction.NAME,

+ 42 - 0
x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java

@@ -559,6 +559,48 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
         assertOK(client().performRequest(new Request("DELETE", "/_snapshot/repo/snapshot")));
     }
 
+    public void testShrinkActionInTheHotPhase() throws Exception {
+        int numShards = 2;
+        int expectedFinalShards = 1;
+        String originalIndex = index + "-000001";
+        String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + originalIndex;
+
+        // add a policy
+        Map<String, LifecycleAction> hotActions = Map.of(
+            RolloverAction.NAME, new RolloverAction(null, null, 1L),
+            ShrinkAction.NAME, new ShrinkAction(expectedFinalShards));
+        Map<String, Phase> phases = Map.of(
+            "hot", new Phase("hot", TimeValue.ZERO, hotActions));
+        LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policy, phases);
+        Request createPolicyRequest = new Request("PUT", "_ilm/policy/" + policy);
+        createPolicyRequest.setJsonEntity("{ \"policy\":" + Strings.toString(lifecyclePolicy) + "}");
+        client().performRequest(createPolicyRequest);
+
+        // and a template
+        Request createTemplateRequest = new Request("PUT", "_template/" + index);
+        createTemplateRequest.setJsonEntity("{" +
+            "\"index_patterns\": [\"" + index + "-*\"], \n" +
+            "  \"settings\": {\n" +
+            "    \"number_of_shards\": " + numShards + ",\n" +
+            "    \"number_of_replicas\": 0,\n" +
+            "    \"index.lifecycle.name\": \"" + policy + "\", \n" +
+            "    \"index.lifecycle.rollover_alias\": \"" + alias + "\"\n" +
+            "  }\n" +
+            "}");
+        client().performRequest(createTemplateRequest);
+
+        // then create the index and index a document to trigger rollover
+        createIndexWithSettings(client(), originalIndex, alias, Settings.builder(), true);
+        index(client(), originalIndex, "_id", "foo", "bar");
+
+        assertBusy(() -> assertTrue(indexExists(shrunkenIndex)), 30, TimeUnit.SECONDS);
+        assertBusy(() -> assertThat(getStepKeyForIndex(client(), shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey())));
+        assertBusy(() -> {
+            Map<String, Object> settings = getOnlyIndexSettings(client(), shrunkenIndex);
+            assertThat(settings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS), equalTo(String.valueOf(expectedFinalShards)));
+        });
+    }
+
     public void testSetSingleNodeAllocationRetriesUntilItSucceeds() throws Exception {
         int numShards = 2;
         int expectedFinalShards = 1;

+ 21 - 1
x-pack/plugin/ilm/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/ilm/10_basic.yml

@@ -107,7 +107,7 @@ setup:
         body:
           settings:
             index.lifecycle.name: "my_moveable_timeseries_lifecycle"
-  
+
   - do:
       ilm.put_lifecycle:
         policy: "my_timeseries_lifecycle"
@@ -238,3 +238,23 @@ setup:
                }
              }
            }
+
+  - do:
+      catch: bad_request
+      ilm.put_lifecycle:
+        policy: "my_invalid_lifecycle"
+        body: |
+           {
+             "policy": {
+               "phases": {
+                 "hot": {
+                   "min_age": "0s",
+                   "actions": {
+                     "shrink": {
+                       "number_of_shards": 1
+                     }
+                   }
+                 }
+               }
+             }
+           }