|
@@ -5,6 +5,8 @@
|
|
|
*/
|
|
|
package org.elasticsearch.xpack.ml.action;
|
|
|
|
|
|
+import org.apache.logging.log4j.LogManager;
|
|
|
+import org.apache.logging.log4j.Logger;
|
|
|
import org.elasticsearch.ElasticsearchException;
|
|
|
import org.elasticsearch.ElasticsearchStatusException;
|
|
|
import org.elasticsearch.ElasticsearchTimeoutException;
|
|
@@ -50,6 +52,7 @@ import static org.elasticsearch.xpack.core.ml.MlTasks.JOB_TASK_NAME;
|
|
|
|
|
|
public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<SetUpgradeModeAction.Request, AcknowledgedResponse> {
|
|
|
|
|
|
+ private static final Logger logger = LogManager.getLogger(TransportSetUpgradeModeAction.class);
|
|
|
private final AtomicBoolean isRunning = new AtomicBoolean(false);
|
|
|
private final PersistentTasksClusterService persistentTasksClusterService;
|
|
|
private final PersistentTasksService persistentTasksService;
|
|
@@ -88,6 +91,7 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
|
|
|
String msg = "Attempted to set [upgrade_mode] to [" +
|
|
|
request.isEnabled() + "] from [" + MlMetadata.getMlMetadata(state).isUpgradeMode() +
|
|
|
"] while previous request was processing.";
|
|
|
+ logger.info(msg);
|
|
|
Exception detail = new IllegalStateException(msg);
|
|
|
listener.onFailure(new ElasticsearchStatusException(
|
|
|
"Cannot change [upgrade_mode]. Previous request is still being processed.",
|
|
@@ -98,17 +102,23 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
|
|
|
|
|
|
// Noop, nothing for us to do, simply return fast to the caller
|
|
|
if (request.isEnabled() == MlMetadata.getMlMetadata(state).isUpgradeMode()) {
|
|
|
+ logger.info("Upgrade mode noop");
|
|
|
isRunning.set(false);
|
|
|
listener.onResponse(new AcknowledgedResponse(true));
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ logger.info("Starting to set [upgrade_mode] to [" + request.isEnabled() +
|
|
|
+ "] from [" + MlMetadata.getMlMetadata(state).isUpgradeMode() + "]");
|
|
|
+
|
|
|
ActionListener<AcknowledgedResponse> wrappedListener = ActionListener.wrap(
|
|
|
r -> {
|
|
|
+ logger.info("Completed upgrade mode request");
|
|
|
isRunning.set(false);
|
|
|
listener.onResponse(r);
|
|
|
},
|
|
|
e -> {
|
|
|
+ logger.info("Completed upgrade mode request but with failure", e);
|
|
|
isRunning.set(false);
|
|
|
listener.onFailure(e);
|
|
|
}
|
|
@@ -131,9 +141,14 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
|
|
|
try {
|
|
|
// Handle potential node timeouts,
|
|
|
// these should be considered failures as tasks are still potentially executing
|
|
|
+ logger.info("Waited for tasks to be unassigned");
|
|
|
+ if (r.getNodeFailures().isEmpty() == false) {
|
|
|
+ logger.info("There were node failures waiting for tasks", r.getNodeFailures().get(0));
|
|
|
+ }
|
|
|
rethrowAndSuppress(r.getNodeFailures());
|
|
|
wrappedListener.onResponse(new AcknowledgedResponse(true));
|
|
|
} catch (ElasticsearchException ex) {
|
|
|
+ logger.info("Caught node failures waiting for tasks to be unassigned", ex);
|
|
|
wrappedListener.onFailure(ex);
|
|
|
}
|
|
|
},
|
|
@@ -144,7 +159,10 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
|
|
|
|
|
|
// <3> After isolating the datafeeds, unassign the tasks
|
|
|
ActionListener<List<IsolateDatafeedAction.Response>> isolateDatafeedListener = ActionListener.wrap(
|
|
|
- isolatedDatafeeds -> unassignPersistentTasks(tasksCustomMetaData, unassignPersistentTasksListener),
|
|
|
+ isolatedDatafeeds -> {
|
|
|
+ logger.info("Isolated the datafeeds");
|
|
|
+ unassignPersistentTasks(tasksCustomMetaData, unassignPersistentTasksListener);
|
|
|
+ },
|
|
|
wrappedListener::onFailure
|
|
|
);
|
|
|
|
|
@@ -176,20 +194,24 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
|
|
|
// State change was not acknowledged, we either timed out or ran into some exception
|
|
|
// We should not continue; instead, report the failure to the end user
|
|
|
if (acknowledgedResponse.isAcknowledged() == false) {
|
|
|
+ logger.info("Cluster state update is NOT acknowledged");
|
|
|
wrappedListener.onFailure(new ElasticsearchTimeoutException("Unknown error occurred while updating cluster state"));
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// There are no tasks to worry about starting/stopping
|
|
|
if (tasksCustomMetaData == null || tasksCustomMetaData.tasks().isEmpty()) {
|
|
|
+ logger.info("No tasks to worry about after state update");
|
|
|
wrappedListener.onResponse(new AcknowledgedResponse(true));
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// Did we change from disabled -> enabled?
|
|
|
if (request.isEnabled()) {
|
|
|
+ logger.info("Enabling upgrade mode, must isolate datafeeds");
|
|
|
isolateDatafeeds(tasksCustomMetaData, isolateDatafeedListener);
|
|
|
} else {
|
|
|
+ logger.info("Disabling upgrade mode, must wait for tasks to not have AWAITING_UPGRADE assignment");
|
|
|
persistentTasksService.waitForPersistentTasksCondition(
|
|
|
(persistentTasksCustomMetaData) ->
|
|
|
// Wait for jobs to not be "Awaiting upgrade"
|
|
@@ -202,7 +224,10 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
|
|
|
(t) -> t.getAssignment().equals(AWAITING_UPGRADE))
|
|
|
.isEmpty(),
|
|
|
request.timeout(),
|
|
|
- ActionListener.wrap(r -> wrappedListener.onResponse(new AcknowledgedResponse(true)), wrappedListener::onFailure)
|
|
|
+ ActionListener.wrap(r -> {
|
|
|
+ logger.info("Done waiting for tasks to be out of AWAITING_UPGRADE");
|
|
|
+ wrappedListener.onResponse(new AcknowledgedResponse(true));
|
|
|
+ }, wrappedListener::onFailure)
|
|
|
);
|
|
|
}
|
|
|
},
|
|
@@ -215,11 +240,13 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
|
|
|
|
|
|
// Logs the acknowledged flag at info level, then wraps it in a new AcknowledgedResponse.
@Override
|
|
|
protected AcknowledgedResponse newResponse(boolean acknowledged) {
|
|
|
+ logger.info("Cluster update response built: " + acknowledged);
|
|
|
return new AcknowledgedResponse(acknowledged);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public ClusterState execute(ClusterState currentState) throws Exception {
|
|
|
+ logger.info("Executing cluster state update");
|
|
|
MlMetadata.Builder builder = new MlMetadata.Builder(currentState.metaData().custom(MlMetadata.TYPE));
|
|
|
builder.isUpgradeMode(request.isEnabled());
|
|
|
ClusterState.Builder newState = ClusterState.builder(currentState);
|