|
@@ -401,16 +401,12 @@ public class AutodetectProcessManager implements ClusterStateListener {
|
|
|
logger.debug("Aborted opening job [{}] as it has been closed", jobId);
|
|
|
return;
|
|
|
}
|
|
|
- if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) {
|
|
|
- logger.debug("Cannot open job [{}] when its state is [{}]",
|
|
|
- jobId, processContext.getState().getClass().getName());
|
|
|
- return;
|
|
|
- }
|
|
|
|
|
|
try {
|
|
|
- createProcessAndSetRunning(processContext, job, params, closeHandler);
|
|
|
- processContext.getAutodetectCommunicator().restoreState(params.modelSnapshot());
|
|
|
- setJobState(jobTask, JobState.OPENED);
|
|
|
+ if (createProcessAndSetRunning(processContext, job, params, closeHandler)) {
|
|
|
+ processContext.getAutodetectCommunicator().restoreState(params.modelSnapshot());
|
|
|
+ setJobState(jobTask, JobState.OPENED);
|
|
|
+ }
|
|
|
} catch (Exception e1) {
|
|
|
// No need to log here as the persistent task framework will log it
|
|
|
try {
|
|
@@ -447,19 +443,25 @@ public class AutodetectProcessManager implements ClusterStateListener {
|
|
|
ElasticsearchMappings::resultsMapping, client, clusterState, resultsMappingUpdateHandler);
|
|
|
}
|
|
|
|
|
|
- private void createProcessAndSetRunning(ProcessContext processContext,
|
|
|
- Job job,
|
|
|
- AutodetectParams params,
|
|
|
- BiConsumer<Exception, Boolean> handler) throws IOException {
|
|
|
+ private boolean createProcessAndSetRunning(ProcessContext processContext,
|
|
|
+ Job job,
|
|
|
+ AutodetectParams params,
|
|
|
+ BiConsumer<Exception, Boolean> handler) throws IOException {
|
|
|
// At this point we lock the process context until the process has been started.
|
|
|
// The reason behind this is to ensure closing the job does not happen before
|
|
|
// the process is started as that can result to the job getting seemingly closed
|
|
|
// but the actual process is hanging alive.
|
|
|
processContext.tryLock();
|
|
|
try {
|
|
|
+ if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) {
|
|
|
+ logger.debug("Cannot open job [{}] when its state is [{}]",
|
|
|
+ job.getId(), processContext.getState().getClass().getName());
|
|
|
+ return false;
|
|
|
+ }
|
|
|
AutodetectCommunicator communicator = create(processContext.getJobTask(), job, params, handler);
|
|
|
communicator.writeHeader();
|
|
|
processContext.setRunning(communicator);
|
|
|
+ return true;
|
|
|
} finally {
|
|
|
// Now that the process is running and we have updated its state we can unlock.
|
|
|
// It is important to unlock before we initialize the communicator (ie. load the model state)
|
|
@@ -592,6 +594,8 @@ public class AutodetectProcessManager implements ClusterStateListener {
|
|
|
try {
|
|
|
if (processContext.setDying() == false) {
|
|
|
logger.debug("Cannot close job [{}] as it has been marked as dying", jobId);
|
|
|
+ // The only way we can get here is if 2 close requests are made very close together.
|
|
|
+ // The other close has done the work so it's safe to return here without doing anything.
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -605,10 +609,10 @@ public class AutodetectProcessManager implements ClusterStateListener {
|
|
|
if (communicator == null) {
|
|
|
logger.debug("Job [{}] is being closed before its process is started", jobId);
|
|
|
jobTask.markAsCompleted();
|
|
|
- return;
|
|
|
+ } else {
|
|
|
+ communicator.close(restart, reason);
|
|
|
}
|
|
|
|
|
|
- communicator.close(restart, reason);
|
|
|
processByAllocation.remove(allocationId);
|
|
|
} catch (Exception e) {
|
|
|
// If the close failed because the process has explicitly been killed by us then just pass on that exception
|
|
@@ -628,7 +632,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
|
|
|
try {
|
|
|
nativeStorageProvider.cleanupLocalTmpStorage(jobTask.getDescription());
|
|
|
} catch (IOException e) {
|
|
|
- logger.error(new ParameterizedMessage("[{}]Failed to delete temporary files", jobId), e);
|
|
|
+ logger.error(new ParameterizedMessage("[{}] Failed to delete temporary files", jobId), e);
|
|
|
}
|
|
|
}
|
|
|
|