|
@@ -33,7 +33,9 @@ import org.elasticsearch.tasks.Task;
|
|
|
import org.elasticsearch.tasks.TaskId;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
+import org.elasticsearch.xpack.core.ClientHelper;
|
|
|
import org.elasticsearch.xpack.core.ml.MlTasks;
|
|
|
+import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
|
|
|
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
|
|
|
import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
|
|
|
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
|
|
@@ -162,11 +164,20 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
|
|
|
},
|
|
|
finalListener::onFailure);
|
|
|
|
|
|
- ActionListener<Boolean> jobExistsListener = ActionListener.wrap(
|
|
|
+ ActionListener<AcknowledgedResponse> datafeedDeleteListener = ActionListener.wrap(
|
|
|
response -> {
|
|
|
auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DELETING, taskId));
|
|
|
- markJobAsDeletingIfNotUsed(request.getJobId(), taskId, markAsDeletingListener);
|
|
|
+ cancelResetTaskIfExists(request.getJobId(), ActionListener.wrap(
|
|
|
+ r -> jobConfigProvider.updateJobBlockReason(request.getJobId(), new Blocked(Blocked.Reason.DELETE, taskId),
|
|
|
+ markAsDeletingListener),
|
|
|
+ finalListener::onFailure
|
|
|
+ ));
|
|
|
},
|
|
|
+ finalListener::onFailure
|
|
|
+ );
|
|
|
+
|
|
|
+ ActionListener<Boolean> jobExistsListener = ActionListener.wrap(
|
|
|
+ response -> deleteDatafeedIfNecessary(request, datafeedDeleteListener),
|
|
|
e -> {
|
|
|
if (request.isForce()
|
|
|
&& MlTasks.getJobTask(request.getJobId(), state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE)) != null) {
|
|
@@ -223,15 +234,9 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
|
|
|
logger.debug(() -> new ParameterizedMessage("[{}] force deleting job", jobId));
|
|
|
|
|
|
// 3. Delete the job
|
|
|
- ActionListener<Boolean> removeTaskListener = new ActionListener<Boolean>() {
|
|
|
- @Override
|
|
|
- public void onResponse(Boolean response) {
|
|
|
- // use clusterService.state() here so that the updated state without the task is available
|
|
|
- normalDeleteJob(parentTaskClient, request, clusterService.state(), listener);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e) {
|
|
|
+ ActionListener<Boolean> removeTaskListener = ActionListener.wrap(
|
|
|
+ response -> normalDeleteJob(parentTaskClient, request, clusterService.state(), listener),
|
|
|
+ e -> {
|
|
|
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
|
|
|
// use clusterService.state() here so that the updated state without the task is available
|
|
|
normalDeleteJob(parentTaskClient, request, clusterService.state(), listener);
|
|
@@ -239,7 +244,7 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
|
|
|
listener.onFailure(e);
|
|
|
}
|
|
|
}
|
|
|
- };
|
|
|
+ );
|
|
|
|
|
|
// 2. Cancel the persistent task. This closes the process gracefully so
|
|
|
// the process should be killed first.
|
|
@@ -288,21 +293,42 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void markJobAsDeletingIfNotUsed(String jobId, TaskId taskId, ActionListener<PutJobAction.Response> listener) {
|
|
|
+ private void deleteDatafeedIfNecessary(DeleteJobAction.Request deleteJobRequest, ActionListener<AcknowledgedResponse> listener) {
|
|
|
|
|
|
- datafeedConfigProvider.findDatafeedIdsForJobIds(Collections.singletonList(jobId), ActionListener.wrap(
|
|
|
- datafeedIds -> {
|
|
|
- if (datafeedIds.isEmpty() == false) {
|
|
|
- listener.onFailure(ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because datafeed ["
|
|
|
- + datafeedIds.iterator().next() + "] refers to it"));
|
|
|
- return;
|
|
|
- }
|
|
|
- cancelResetTaskIfExists(jobId, ActionListener.wrap(
|
|
|
- response -> jobConfigProvider.updateJobBlockReason(jobId, new Blocked(Blocked.Reason.DELETE, taskId), listener),
|
|
|
- listener::onFailure
|
|
|
- ));
|
|
|
- },
|
|
|
- listener::onFailure
|
|
|
+ datafeedConfigProvider.findDatafeedIdsForJobIds(Collections.singletonList(deleteJobRequest.getJobId()), ActionListener.wrap(
|
|
|
+ datafeedIds -> {
|
|
|
+ // Since it's only possible to delete a single job at a time there should not be more than one datafeed
|
|
|
+ assert datafeedIds.size() <= 1 : "Expected at most 1 datafeed for a single job, got " + datafeedIds;
|
|
|
+ if (datafeedIds.isEmpty()) {
|
|
|
+ listener.onResponse(AcknowledgedResponse.TRUE);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ DeleteDatafeedAction.Request deleteDatafeedRequest = new DeleteDatafeedAction.Request(datafeedIds.iterator().next());
|
|
|
+ deleteDatafeedRequest.setForce(deleteJobRequest.isForce());
|
|
|
+ deleteDatafeedRequest.timeout(deleteJobRequest.timeout());
|
|
|
+ ClientHelper.executeAsyncWithOrigin(
|
|
|
+ client,
|
|
|
+ ClientHelper.ML_ORIGIN,
|
|
|
+ DeleteDatafeedAction.INSTANCE,
|
|
|
+ deleteDatafeedRequest,
|
|
|
+ ActionListener.wrap(
|
|
|
+ listener::onResponse,
|
|
|
+ e -> {
|
|
|
+ // It's possible that a simultaneous call to delete the datafeed has deleted it in between
|
|
|
+ // us finding the datafeed ID and trying to delete it in this method - this is OK
|
|
|
+ if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
|
|
|
+ listener.onResponse(AcknowledgedResponse.TRUE);
|
|
|
+ } else {
|
|
|
+ listener.onFailure(ExceptionsHelper.conflictStatusException(
|
|
|
+ "failed to delete job [{}] as its datafeed [{}] could not be deleted", e,
|
|
|
+ deleteJobRequest.getJobId(), deleteDatafeedRequest.getDatafeedId())
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }
|
|
|
+ )
|
|
|
+ );
|
|
|
+ },
|
|
|
+ listener::onFailure
|
|
|
));
|
|
|
}
|
|
|
|