|
@@ -5,7 +5,11 @@
|
|
|
*/
|
|
|
package org.elasticsearch.xpack.core.ilm;
|
|
|
|
|
|
+import org.apache.logging.log4j.LogManager;
|
|
|
+import org.apache.logging.log4j.Logger;
|
|
|
import org.elasticsearch.client.Client;
|
|
|
+import org.elasticsearch.cluster.ClusterState;
|
|
|
+import org.elasticsearch.cluster.metadata.IndexAbstraction;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
import org.elasticsearch.common.ParseField;
|
|
|
import org.elasticsearch.common.Strings;
|
|
@@ -15,12 +19,15 @@ import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
|
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
|
|
import org.elasticsearch.common.xcontent.XContentParser;
|
|
|
+import org.elasticsearch.index.Index;
|
|
|
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
+import java.util.Locale;
|
|
|
import java.util.Objects;
|
|
|
+import java.util.function.BiPredicate;
|
|
|
|
|
|
/**
|
|
|
* A {@link LifecycleAction} which shrinks the index.
|
|
@@ -29,6 +36,10 @@ public class ShrinkAction implements LifecycleAction {
|
|
|
public static final String NAME = "shrink";
|
|
|
public static final String SHRUNKEN_INDEX_PREFIX = "shrink-";
|
|
|
public static final ParseField NUMBER_OF_SHARDS_FIELD = new ParseField("number_of_shards");
|
|
|
+ public static final String CONDITIONAL_SKIP_SHRINK_STEP = BranchingStep.NAME + "-check-prerequisites";
|
|
|
+ public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check";
|
|
|
+
|
|
|
+ private static final Logger logger = LogManager.getLogger(ShrinkAction.class);
|
|
|
|
|
|
private static final ConstructingObjectParser<ShrinkAction, Void> PARSER =
|
|
|
new ConstructingObjectParser<>(NAME, a -> new ShrinkAction((Integer) a[0]));
|
|
@@ -85,7 +96,7 @@ public class ShrinkAction implements LifecycleAction {
|
|
|
public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey) {
|
|
|
Settings readOnlySettings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build();
|
|
|
|
|
|
- StepKey branchingKey = new StepKey(phase, NAME, BranchingStep.NAME);
|
|
|
+ StepKey preShrinkBranchingKey = new StepKey(phase, NAME, CONDITIONAL_SKIP_SHRINK_STEP);
|
|
|
StepKey waitForNoFollowerStepKey = new StepKey(phase, NAME, WaitForNoFollowersStep.NAME);
|
|
|
StepKey readOnlyKey = new StepKey(phase, NAME, ReadOnlyAction.NAME);
|
|
|
StepKey setSingleNodeKey = new StepKey(phase, NAME, SetSingleNodeAllocateStep.NAME);
|
|
@@ -93,23 +104,77 @@ public class ShrinkAction implements LifecycleAction {
|
|
|
StepKey shrinkKey = new StepKey(phase, NAME, ShrinkStep.NAME);
|
|
|
StepKey enoughShardsKey = new StepKey(phase, NAME, ShrunkShardsAllocatedStep.NAME);
|
|
|
StepKey copyMetadataKey = new StepKey(phase, NAME, CopyExecutionStateStep.NAME);
|
|
|
+ StepKey dataStreamCheckBranchingKey = new StepKey(phase, NAME, CONDITIONAL_DATASTREAM_CHECK_KEY);
|
|
|
StepKey aliasKey = new StepKey(phase, NAME, ShrinkSetAliasStep.NAME);
|
|
|
StepKey isShrunkIndexKey = new StepKey(phase, NAME, ShrunkenIndexCheckStep.NAME);
|
|
|
+ StepKey replaceDataStreamIndexKey = new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME);
|
|
|
+ StepKey deleteIndexKey = new StepKey(phase, NAME, DeleteStep.NAME);
|
|
|
|
|
|
- BranchingStep conditionalSkipShrinkStep = new BranchingStep(branchingKey, waitForNoFollowerStepKey, nextStepKey,
|
|
|
- (index, clusterState) -> clusterState.getMetadata().index(index).getNumberOfShards() == numberOfShards);
|
|
|
+ BranchingStep conditionalSkipShrinkStep = new BranchingStep(preShrinkBranchingKey, waitForNoFollowerStepKey, nextStepKey,
|
|
|
+ getSkipShrinkStepPredicate(numberOfShards));
|
|
|
WaitForNoFollowersStep waitForNoFollowersStep = new WaitForNoFollowersStep(waitForNoFollowerStepKey, readOnlyKey, client);
|
|
|
UpdateSettingsStep readOnlyStep = new UpdateSettingsStep(readOnlyKey, setSingleNodeKey, client, readOnlySettings);
|
|
|
SetSingleNodeAllocateStep setSingleNodeStep = new SetSingleNodeAllocateStep(setSingleNodeKey, allocationRoutedKey, client);
|
|
|
CheckShrinkReadyStep checkShrinkReadyStep = new CheckShrinkReadyStep(allocationRoutedKey, shrinkKey);
|
|
|
ShrinkStep shrink = new ShrinkStep(shrinkKey, enoughShardsKey, client, numberOfShards, SHRUNKEN_INDEX_PREFIX);
|
|
|
ShrunkShardsAllocatedStep allocated = new ShrunkShardsAllocatedStep(enoughShardsKey, copyMetadataKey, SHRUNKEN_INDEX_PREFIX);
|
|
|
- CopyExecutionStateStep copyMetadata = new CopyExecutionStateStep(copyMetadataKey, aliasKey, SHRUNKEN_INDEX_PREFIX,
|
|
|
- ShrunkenIndexCheckStep.NAME);
|
|
|
+ CopyExecutionStateStep copyMetadata = new CopyExecutionStateStep(copyMetadataKey, dataStreamCheckBranchingKey,
|
|
|
+ SHRUNKEN_INDEX_PREFIX, ShrunkenIndexCheckStep.NAME);
|
|
|
+ // by the time we get to this step we have 2 indices, the source and the shrunken one. we now need to choose an index
|
|
|
+ // swapping strategy such that the shrunken index takes the place of the source index (which is also deleted).
|
|
|
+ // if the source index is part of a data stream it's a matter of replacing it with the shrunken index one in the data stream and
|
|
|
+ // then deleting the source index; otherwise we'll use the alias management api to atomically transfer the aliases from source to
|
|
|
+ // the shrunken index and delete the source
|
|
|
+ BranchingStep isDataStreamBranchingStep = new BranchingStep(dataStreamCheckBranchingKey, aliasKey, replaceDataStreamIndexKey,
|
|
|
+ (index, clusterState) -> {
|
|
|
+ IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(index.getName());
|
|
|
+ assert indexAbstraction != null : "invalid cluster metadata. index [" + index.getName() + "] was not found";
|
|
|
+ return indexAbstraction.getParentDataStream() != null;
|
|
|
+ });
|
|
|
ShrinkSetAliasStep aliasSwapAndDelete = new ShrinkSetAliasStep(aliasKey, isShrunkIndexKey, client, SHRUNKEN_INDEX_PREFIX);
|
|
|
+ ReplaceDataStreamBackingIndexStep replaceDataStreamBackingIndex = new ReplaceDataStreamBackingIndexStep(replaceDataStreamIndexKey,
|
|
|
+ deleteIndexKey, SHRUNKEN_INDEX_PREFIX);
|
|
|
+ DeleteStep deleteSourceIndexStep = new DeleteStep(deleteIndexKey, isShrunkIndexKey, client);
|
|
|
ShrunkenIndexCheckStep waitOnShrinkTakeover = new ShrunkenIndexCheckStep(isShrunkIndexKey, nextStepKey, SHRUNKEN_INDEX_PREFIX);
|
|
|
return Arrays.asList(conditionalSkipShrinkStep, waitForNoFollowersStep, readOnlyStep, setSingleNodeStep, checkShrinkReadyStep,
|
|
|
- shrink, allocated, copyMetadata, aliasSwapAndDelete, waitOnShrinkTakeover);
|
|
|
+ shrink, allocated, copyMetadata, isDataStreamBranchingStep, aliasSwapAndDelete, waitOnShrinkTakeover,
|
|
|
+ replaceDataStreamBackingIndex, deleteSourceIndexStep);
|
|
|
+ }
|
|
|
+
|
|
|
+ static BiPredicate<Index, ClusterState> getSkipShrinkStepPredicate(int targetNumberOfShards) {
|
|
|
+ return (index, clusterState) -> {
|
|
|
+ IndexMetadata indexMetadata = clusterState.getMetadata().index(index);
|
|
|
+ if (indexMetadata == null) {
|
|
|
+ // Index must have been since deleted, skip the shrink action
|
|
|
+ logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", NAME, index.getName());
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (indexMetadata.getNumberOfShards() == targetNumberOfShards) {
|
|
|
+ logger.debug("skipping [{}] lifecycle action for index [{}] because the index already has the target number of shards",
|
|
|
+ NAME, index.getName());
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(index.getName());
|
|
|
+ assert indexAbstraction != null : "invalid cluster metadata. index [" + index.getName() + "] was not found";
|
|
|
+
|
|
|
+ if (indexAbstraction.getParentDataStream() != null) {
|
|
|
+ IndexAbstraction.DataStream dataStream = indexAbstraction.getParentDataStream();
|
|
|
+ assert dataStream.getWriteIndex() != null : dataStream.getName() + " has no write index";
|
|
|
+ if (dataStream.getWriteIndex().getIndex().getName().equals(index.getName())) {
|
|
|
+ String policyName = indexMetadata.getSettings().get(LifecycleSettings.LIFECYCLE_NAME);
|
|
|
+ String errorMessage = String.format(Locale.ROOT,
|
|
|
+ "index [%s] is the write index for data stream [%s]. stopping execution of lifecycle [%s] as a data stream's " +
|
|
|
+ "write index cannot be shrunk. manually rolling over the index will resume the execution of the policy " +
|
|
|
+ "as the index will not be the data stream's write index anymore",
|
|
|
+ index.getName(), dataStream.getName(), policyName);
|
|
|
+ logger.debug(errorMessage);
|
|
|
+ throw new IllegalStateException(errorMessage);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ };
|
|
|
}
|
|
|
|
|
|
@Override
|