Selaa lähdekoodia

[ML] do not start stopping tasks on reassignment (#55315)

When a anomaly jobs, datafeeds, and analytics tasks are stopped, they enter an ephemeral state called `STOPPING`. 

If the node executing the task fails while this is occurring, they could be stuck in the limbo state of `STOPPING`. It is best to mark the tasks as completed if they get reassigned to a node.
Benjamin Trent 5 vuotta sitten
vanhempi
commit
d5340ccaa9

+ 9 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

@@ -439,10 +439,18 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
             JobTask jobTask = (JobTask) task;
             jobTask.autodetectProcessManager = autodetectProcessManager;
             JobTaskState jobTaskState = (JobTaskState) state;
+            JobState jobState = jobTaskState == null ? null : jobTaskState.getState();
+            // If the job is closing, simply stop and return
+            if (JobState.CLOSING.equals(jobState)) {
+                // Mark as completed instead of using `stop` as stop assumes native processes have started
+                logger.info("[{}] job got reassigned while stopping. Marking as completed", params.getJobId());
+                jobTask.markAsCompleted();
+                return;
+            }
             // If the job is failed then the Persistent Task Service will
             // try to restart it on a node restart. Exiting here leaves the
             // job in the failed state and it must be force closed.
-            if (jobTaskState != null && jobTaskState.getState().isAnyOf(JobState.FAILED, JobState.CLOSING)) {
+            if (JobState.FAILED.equals(jobState)) {
                 return;
             }
 

+ 8 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

@@ -645,11 +645,16 @@ public class TransportStartDataFrameAnalyticsAction
                                      PersistentTaskState state) {
             logger.info("[{}] Starting data frame analytics", params.getId());
             DataFrameAnalyticsTaskState analyticsTaskState = (DataFrameAnalyticsTaskState) state;
+            DataFrameAnalyticsState analyticsState = analyticsTaskState == null ? null : analyticsTaskState.getState();
 
-            // If we are "stopping" there is nothing to do
+            // If we are "stopping" there is nothing to do and we should stop
+            if (DataFrameAnalyticsState.STOPPING.equals(analyticsState)) {
+                logger.info("[{}] data frame analytics got reassigned while stopping. Marking as completed", params.getId());
+                task.markAsCompleted();
+                return;
+            }
             // If we are "failed" then we should leave the task as is; for recovery it must be force stopped.
-            if (analyticsTaskState != null && analyticsTaskState.getState().isAnyOf(
-                    DataFrameAnalyticsState.STOPPING, DataFrameAnalyticsState.FAILED)) {
+            if (DataFrameAnalyticsState.FAILED.equals(analyticsState)) {
                 return;
             }
 

+ 8 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java

@@ -398,6 +398,14 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
                                      final StartDatafeedAction.DatafeedParams params,
                                      final PersistentTaskState state) {
             DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask;
+            DatafeedState datafeedState = (DatafeedState) state;
+
+            // If we are "stopping" there is nothing to do
+            if (DatafeedState.STOPPING.equals(datafeedState)) {
+                logger.info("[{}] datafeed got reassigned while stopping. Marking as completed", params.getDatafeedId());
+                datafeedTask.markAsCompleted();
+                return;
+            }
             datafeedTask.datafeedManager = datafeedManager;
             datafeedManager.run(datafeedTask,
                     (error) -> {