|
@@ -93,131 +93,121 @@ public class ExecuteStepsUpdateTask extends IndexLifecycleClusterStateUpdateTask
|
|
|
return currentState;
|
|
|
}
|
|
|
Step registeredCurrentStep = IndexLifecycleRunner.getCurrentStep(policyStepsRegistry, policy, indexMetadata);
|
|
|
- if (currentStep.equals(registeredCurrentStep)) {
|
|
|
- ClusterState state = currentState;
|
|
|
- // We can do cluster state steps all together until we
|
|
|
- // either get to a step that isn't a cluster state step or a
|
|
|
- // cluster state wait step returns not completed
|
|
|
- while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) {
|
|
|
+ if (currentStep.equals(registeredCurrentStep) == false) {
|
|
|
+ // either we are no longer the master or the step is now
|
|
|
+ // not the same as when we submitted the update task. In
|
|
|
+ // either case we don't want to do anything now
|
|
|
+ return currentState;
|
|
|
+ }
|
|
|
+ ClusterState state = currentState;
|
|
|
+ // We can do cluster state steps all together until we
|
|
|
+ // either get to a step that isn't a cluster state step or a
|
|
|
+ // cluster state wait step returns not completed
|
|
|
+ while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) {
|
|
|
+ try {
|
|
|
if (currentStep instanceof ClusterStateActionStep) {
|
|
|
- // cluster state action step so do the action and
|
|
|
- // move the cluster state to the next step
|
|
|
- logger.trace(
|
|
|
- "[{}] performing cluster state action ({}) [{}]",
|
|
|
- index.getName(),
|
|
|
- currentStep.getClass().getSimpleName(),
|
|
|
- currentStep.getKey()
|
|
|
- );
|
|
|
- try {
|
|
|
- ClusterStateActionStep actionStep = (ClusterStateActionStep) currentStep;
|
|
|
- state = actionStep.performAction(index, state);
|
|
|
- // If this step (usually a CopyExecutionStateStep step) has brought the
|
|
|
- // index to where it needs to have async actions invoked, then add that
|
|
|
- // index to the list so that when the new cluster state has been
|
|
|
- // processed, the new indices will have their async actions invoked.
|
|
|
- Optional.ofNullable(actionStep.indexForAsyncInvocation())
|
|
|
- .ifPresent(tuple -> indexToStepKeysForAsyncActions.put(tuple.v1(), tuple.v2()));
|
|
|
- } catch (Exception exception) {
|
|
|
- return moveToErrorStep(state, currentStep.getKey(), exception);
|
|
|
- }
|
|
|
- // set here to make sure that the clusterProcessed knows to execute the
|
|
|
- // correct step if it an async action
|
|
|
- nextStepKey = currentStep.getNextStepKey();
|
|
|
- if (nextStepKey == null) {
|
|
|
- return state;
|
|
|
- } else {
|
|
|
- logger.trace("[{}] moving cluster state to next step [{}]", index.getName(), nextStepKey);
|
|
|
- state = IndexLifecycleTransition.moveClusterStateToStep(
|
|
|
- index,
|
|
|
- state,
|
|
|
- nextStepKey,
|
|
|
- nowSupplier,
|
|
|
- policyStepsRegistry,
|
|
|
- false
|
|
|
- );
|
|
|
- }
|
|
|
+ state = executeActionStep(state, currentStep);
|
|
|
} else {
|
|
|
- // cluster state wait step so evaluate the
|
|
|
- // condition, if the condition is met move to the
|
|
|
- // next step, if its not met return the current
|
|
|
- // cluster state so it can be applied and we will
|
|
|
- // wait for the next trigger to evaluate the
|
|
|
- // condition again
|
|
|
- logger.trace(
|
|
|
- "[{}] waiting for cluster state step condition ({}) [{}]",
|
|
|
- index.getName(),
|
|
|
- currentStep.getClass().getSimpleName(),
|
|
|
- currentStep.getKey()
|
|
|
- );
|
|
|
- ClusterStateWaitStep.Result result;
|
|
|
- try {
|
|
|
- result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state);
|
|
|
- } catch (Exception exception) {
|
|
|
- return moveToErrorStep(state, currentStep.getKey(), exception);
|
|
|
- }
|
|
|
- // some steps can decide to change the next step to execute after waiting for some time for the condition
|
|
|
- // to be met (eg. {@link LifecycleSettings#LIFECYCLE_STEP_WAIT_TIME_THRESHOLD_SETTING}, so it's important we
|
|
|
- // re-evaluate what the next step is after we evaluate the condition
|
|
|
- nextStepKey = currentStep.getNextStepKey();
|
|
|
- if (result.complete()) {
|
|
|
- logger.trace(
|
|
|
- "[{}] cluster state step condition met successfully ({}) [{}], moving to next step {}",
|
|
|
- index.getName(),
|
|
|
- currentStep.getClass().getSimpleName(),
|
|
|
- currentStep.getKey(),
|
|
|
- nextStepKey
|
|
|
- );
|
|
|
- if (nextStepKey == null) {
|
|
|
- return state;
|
|
|
- } else {
|
|
|
- state = IndexLifecycleTransition.moveClusterStateToStep(
|
|
|
- index,
|
|
|
- state,
|
|
|
- nextStepKey,
|
|
|
- nowSupplier,
|
|
|
- policyStepsRegistry,
|
|
|
- false
|
|
|
- );
|
|
|
- }
|
|
|
- } else {
|
|
|
- final ToXContentObject stepInfo = result.informationContext();
|
|
|
- if (logger.isTraceEnabled()) {
|
|
|
- logger.trace(
|
|
|
- "[{}] condition not met ({}) [{}], returning existing state (info: {})",
|
|
|
- index.getName(),
|
|
|
- currentStep.getClass().getSimpleName(),
|
|
|
- currentStep.getKey(),
|
|
|
- stepInfo == null ? "null" : Strings.toString(stepInfo)
|
|
|
- );
|
|
|
- }
|
|
|
- // We may have executed a step and set "nextStepKey" to
|
|
|
- // a value, but in this case, since the condition was
|
|
|
- // not met, we can't advance any way, so don't attempt
|
|
|
- // to run the current step
|
|
|
- nextStepKey = null;
|
|
|
- if (stepInfo == null) {
|
|
|
- return state;
|
|
|
- } else {
|
|
|
- return IndexLifecycleTransition.addStepInfoToClusterState(index, state, stepInfo);
|
|
|
- }
|
|
|
- }
|
|
|
+ state = executeWaitStep(state, currentStep);
|
|
|
}
|
|
|
- // There are actions we need to take in the event a phase
|
|
|
- // transition happens, so even if we would continue in the while
|
|
|
- // loop, if we are about to go into a new phase, return so that
|
|
|
- // other processing can occur
|
|
|
- if (currentStep.getKey().phase().equals(currentStep.getNextStepKey().phase()) == false) {
|
|
|
- return state;
|
|
|
- }
|
|
|
- currentStep = policyStepsRegistry.getStep(indexMetadata, currentStep.getNextStepKey());
|
|
|
+ } catch (Exception exception) {
|
|
|
+ return moveToErrorStep(state, currentStep.getKey(), exception);
|
|
|
}
|
|
|
+ if (nextStepKey == null) {
|
|
|
+ return state;
|
|
|
+ } else {
|
|
|
+ state = moveToNextStep(state);
|
|
|
+ }
|
|
|
+ // There are actions we need to take in the event a phase
|
|
|
+ // transition happens, so even if we would continue in the while
|
|
|
+ // loop, if we are about to go into a new phase, return so that
|
|
|
+ // other processing can occur
|
|
|
+ if (currentStep.getKey().phase().equals(currentStep.getNextStepKey().phase()) == false) {
|
|
|
+ return state;
|
|
|
+ }
|
|
|
+ currentStep = policyStepsRegistry.getStep(indexMetadata, currentStep.getNextStepKey());
|
|
|
+ }
|
|
|
+ return state;
|
|
|
+ }
|
|
|
+
|
|
|
+ private ClusterState executeActionStep(ClusterState state, Step currentStep) {
|
|
|
+ // cluster state action step so do the action and
|
|
|
+ // move the cluster state to the next step
|
|
|
+ logger.trace(
|
|
|
+ "[{}] performing cluster state action ({}) [{}]",
|
|
|
+ index.getName(),
|
|
|
+ currentStep.getClass().getSimpleName(),
|
|
|
+ currentStep.getKey()
|
|
|
+ );
|
|
|
+ ClusterStateActionStep actionStep = (ClusterStateActionStep) currentStep;
|
|
|
+ state = actionStep.performAction(index, state);
|
|
|
+ // If this step (usually a CopyExecutionStateStep step) has brought the
|
|
|
+ // index to where it needs to have async actions invoked, then add that
|
|
|
+ // index to the list so that when the new cluster state has been
|
|
|
+ // processed, the new indices will have their async actions invoked.
|
|
|
+ Optional.ofNullable(actionStep.indexForAsyncInvocation())
|
|
|
+ .ifPresent(tuple -> indexToStepKeysForAsyncActions.put(tuple.v1(), tuple.v2()));
|
|
|
+ // set here to make sure that the clusterProcessed knows to execute the
|
|
|
+ // correct step if it an async action
|
|
|
+ nextStepKey = currentStep.getNextStepKey();
|
|
|
+ return state;
|
|
|
+ }
|
|
|
+
|
|
|
+ private ClusterState executeWaitStep(ClusterState state, Step currentStep) {
|
|
|
+ // cluster state wait step so evaluate the
|
|
|
+ // condition, if the condition is met move to the
|
|
|
+ // next step, if its not met return the current
|
|
|
+ // cluster state so it can be applied and we will
|
|
|
+ // wait for the next trigger to evaluate the
|
|
|
+ // condition again
|
|
|
+ logger.trace(
|
|
|
+ "[{}] waiting for cluster state step condition ({}) [{}]",
|
|
|
+ index.getName(),
|
|
|
+ currentStep.getClass().getSimpleName(),
|
|
|
+ currentStep.getKey()
|
|
|
+ );
|
|
|
+ ClusterStateWaitStep.Result result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state);
|
|
|
+ // some steps can decide to change the next step to execute after waiting for some time for the condition
|
|
|
+ // to be met (eg. {@link LifecycleSettings#LIFECYCLE_STEP_WAIT_TIME_THRESHOLD_SETTING}, so it's important we
|
|
|
+ // re-evaluate what the next step is after we evaluate the condition
|
|
|
+ nextStepKey = currentStep.getNextStepKey();
|
|
|
+ if (result.complete()) {
|
|
|
+ logger.trace(
|
|
|
+ "[{}] cluster state step condition met successfully ({}) [{}]",
|
|
|
+ index.getName(),
|
|
|
+ currentStep.getClass().getSimpleName(),
|
|
|
+ currentStep.getKey()
|
|
|
+ );
|
|
|
return state;
|
|
|
} else {
|
|
|
- // either we are no longer the master or the step is now
|
|
|
- // not the same as when we submitted the update task. In
|
|
|
- // either case we don't want to do anything now
|
|
|
- return currentState;
|
|
|
+ final ToXContentObject stepInfo = result.informationContext();
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace(
|
|
|
+ "[{}] condition not met ({}) [{}], returning existing state (info: {})",
|
|
|
+ index.getName(),
|
|
|
+ currentStep.getClass().getSimpleName(),
|
|
|
+ currentStep.getKey(),
|
|
|
+ stepInfo == null ? "null" : Strings.toString(stepInfo)
|
|
|
+ );
|
|
|
+ }
|
|
|
+ // We may have executed a step and set "nextStepKey" to
|
|
|
+ // a value, but in this case, since the condition was
|
|
|
+ // not met, we can't advance any way, so don't attempt
|
|
|
+ // to run the current step
|
|
|
+ nextStepKey = null;
|
|
|
+ if (stepInfo == null) {
|
|
|
+ return state;
|
|
|
+ }
|
|
|
+ return IndexLifecycleTransition.addStepInfoToClusterState(index, state, stepInfo);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private ClusterState moveToNextStep(ClusterState state) {
|
|
|
+ if (nextStepKey == null) {
|
|
|
+ return state;
|
|
|
}
|
|
|
+ logger.trace("[{}] moving cluster state to next step [{}]", index.getName(), nextStepKey);
|
|
|
+ return IndexLifecycleTransition.moveClusterStateToStep(index, state, nextStepKey, nowSupplier, policyStepsRegistry, false);
|
|
|
}
|
|
|
|
|
|
@Override
|