Browse Source

[ML] Abort starting process if kill request is received (#74415)

While the job is opening it is possible that the kill process action is called.
If the kill process action is received before the job process has started,
we currently start the process anyway. The process will eventually timeout
to connect to anything and will exit. However, it may cause an unexpected
failure if the job is opened again as it won't be able to launch a process as
one would already exist.

This commit ensures the JobTask.isClosing() reports true when
the kill process action has been called in order to abort opening the
process.

Closes #74141
Dimitris Athanasiou 4 years ago
parent
commit
9326f7b515

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportKillProcessAction.java

@@ -74,7 +74,7 @@ public class TransportKillProcessAction extends TransportTasksAction<JobTask,
         logger.info("[{}] Killing job", jobTask.getJobId());
         auditor.info(jobTask.getJobId(), Messages.JOB_AUDIT_KILLING);
         try {
-            processManager.killProcess(jobTask, true, null);
+            jobTask.killJob("kill process (api)");
             listener.onResponse(new KillProcessAction.Response(true));
         } catch (Exception e) {
             listener.onFailure(e);

+ 4 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

@@ -187,13 +187,14 @@ public class AutodetectProcessManager implements ClusterStateListener {
                 .kill();
         } else {
             // If the process is missing but the task exists this is most likely
-            // due to 2 reasons. The first is because the job went into the failed
+            // due to 3 reasons. The first is because the job went into the failed
             // state then the node restarted causing the task to be recreated
             // but the failed process wasn't. The second is that the job went into
             // the failed state and the user tries to remove it force-deleting it.
             // Force-delete issues a kill but the process will not be present
-            // as it is cleaned up already. In both cases, we still need to remove
-            // the task from the TaskManager (which is what the kill would do)
+            // as it is cleaned up already. The third is that the kill has been
+            // received before the process has even started. In all cases, we still
+            // need to remove the task from the TaskManager (which is what the kill would do)
             logger.trace(() -> new ParameterizedMessage("[{}] Marking job task as completed", jobTask.getJobId()));
             jobTask.markAsCompleted();
         }

+ 6 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/JobTask.java

@@ -38,10 +38,7 @@ public class JobTask extends AllocatedPersistentTask implements OpenJobAction.Jo
     protected void onCancelled() {
         String reason = getReasonCancelled();
         LOGGER.trace(() -> new ParameterizedMessage("[{}] Cancelling job task because: {}", jobId, reason));
-        killJob(reason);
-    }
-
-    void killJob(String reason) {
+        isClosing = true;
         autodetectProcessManager.killProcess(this, false, reason);
     }
 
@@ -54,6 +51,11 @@ public class JobTask extends AllocatedPersistentTask implements OpenJobAction.Jo
         autodetectProcessManager.closeJob(this, reason);
     }
 
+    public void killJob(String reason) {
+        isClosing = true;
+        autodetectProcessManager.killProcess(this, true, reason);
+    }
+
     void setAutodetectProcessManager(AutodetectProcessManager autodetectProcessManager) {
         this.autodetectProcessManager = autodetectProcessManager;
     }

+ 12 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/JobTaskTests.java

@@ -10,9 +10,11 @@ package org.elasticsearch.xpack.ml.job.task;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
+import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
 
 import static org.hamcrest.Matchers.is;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 public class JobTaskTests extends ESTestCase {
 
@@ -32,4 +34,14 @@ public class JobTaskTests extends ESTestCase {
         assertThat(OpenJobAction.JobTaskMatcher.match(jobTask2, "ml-2"), is(true));
     }
 
+    public void testKillJob() {
+        JobTask jobTask = new JobTask("job-to-kill", 0, "persistent", "", null, null);
+        AutodetectProcessManager processManager = mock(AutodetectProcessManager.class);
+        jobTask.setAutodetectProcessManager(processManager);
+
+        jobTask.killJob("test");
+
+        assertThat(jobTask.isClosing(), is(true));
+        verify(processManager).killProcess(jobTask, true, "test");
+    }
 }

+ 0 - 3
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/set_upgrade_mode.yml

@@ -158,9 +158,6 @@ teardown:
 
 ---
 "Setting upgrade mode to disabled from enabled":
-  - skip:
-      version: "all"
-      reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/74141"
   - do:
       ml.start_datafeed:
         datafeed_id: set-upgrade-mode-job-datafeed