Răsfoiți Sursa

[ML Data Frame] Set DF task state when stopping (#42516)

Set the state to stopped prior to persisting
David Kyle 6 ani în urmă
părinte
comite
0db0e1330c

+ 1 - 12
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java

@@ -30,7 +30,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
-import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
 import org.elasticsearch.xpack.dataframe.DataFrame;
@@ -223,18 +222,8 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
                            DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder,
                            Long previousCheckpoint,
                            ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
-        // If we are stopped, and it is an initial run, this means we have never been started,
-        // attempt to start the task
-
         buildTask.initializeIndexer(indexerBuilder);
-        // TODO isInitialRun is false after relocation??
-        if (buildTask.getState().getTaskState().equals(DataFrameTransformTaskState.STOPPED) && buildTask.isInitialRun()) {
-            logger.info("Data frame transform [{}] created.", buildTask.getTransformId());
-            buildTask.start(previousCheckpoint, listener);
-        } else {
-            logger.debug("No need to start task. Its current state is: {}", buildTask.getState().getIndexerState());
-            listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
-        }
+        buildTask.start(previousCheckpoint, listener);
     }
 
     @Override

+ 7 - 12
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -174,13 +174,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         }
     }
 
-    public boolean isStopped() {
-        IndexerState currentState = getIndexer() == null ? initialIndexerState : getIndexer().getState();
-        return currentState.equals(IndexerState.STOPPED);
-    }
-
-    boolean isInitialRun() {
-        return getIndexer() != null && getIndexer().initialRun();
+    public void setTaskStateStopped() {
+        taskState.set(DataFrameTransformTaskState.STOPPED);
     }
 
     /**
@@ -235,11 +230,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
 
     public synchronized void stop() {
         if (getIndexer() == null) {
-            return;
-        }
-        // taskState is initialized as STOPPED and is updated in tandem with the indexerState
-        // Consequently, if it is STOPPED, we consider the whole task STOPPED.
-        if (taskState.get() == DataFrameTransformTaskState.STOPPED) {
+            // If there is no indexer the task has not been triggered
+            // but it still needs to be stopped and removed
+            shutdown();
             return;
         }
 
@@ -609,6 +602,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         protected void onStop() {
             auditor.info(transformConfig.getId(), "Indexer has stopped");
             logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId());
+
+            transformTask.setTaskStateStopped();
             transformsConfigManager.putOrUpdateTransformStats(
                     new DataFrameTransformStateAndStats(transformId, transformTask.getState(), getStats(),
                             DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null

+ 2 - 2
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml

@@ -114,8 +114,8 @@ teardown:
         transform_id: "airline-transform-start-stop"
   - match: { count: 1 }
   - match: { transforms.0.id: "airline-transform-start-stop" }
-#  - match: { transforms.0.state.indexer_state: "stopped" }
-#  - match: { transforms.0.state.task_state: "stopped" }
+  - match: { transforms.0.state.indexer_state: "stopped" }
+  - match: { transforms.0.state.task_state: "stopped" }
 
   - do:
       data_frame.start_data_frame_transform: