Browse Source

[ML][Data Frame] forcing that no ptask => STOPPED state (#42800)

* [ML][Data Frame] forcing that no ptask => STOPPED state

* Addressing side-effect, early exit for stop when stopped
Benjamin Trent 6 years ago
parent
commit
70c099aa21

+ 2 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

@@ -284,6 +284,8 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
         AtomicBoolean callOnStop = new AtomicBoolean(false);
         AtomicBoolean callOnAbort = new AtomicBoolean(false);
         IndexerState updatedState = state.updateAndGet(prev -> {
+            callOnAbort.set(false);
+            callOnStop.set(false);
             switch (prev) {
             case INDEXING:
                 // ready for another job

+ 18 - 1
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java

@@ -26,7 +26,10 @@ import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStats
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Request;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
+import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
@@ -136,7 +139,21 @@ public class TransportGetDataFrameTransformsStatsAction extends
         ActionListener<List<DataFrameTransformStateAndStats>> searchStatsListener = ActionListener.wrap(
             stats -> {
                 List<DataFrameTransformStateAndStats> allStateAndStats = response.getTransformsStateAndStats();
-                allStateAndStats.addAll(stats);
+                // If the persistent task does NOT exist, it is STOPPED
+                // There is a potential race condition where the saved document does not actually have a STOPPED state
+                //    as the task is cancelled before we persist state.
+                stats.forEach(stat ->
+                    allStateAndStats.add(new DataFrameTransformStateAndStats(
+                        stat.getId(),
+                        new DataFrameTransformState(DataFrameTransformTaskState.STOPPED,
+                            IndexerState.STOPPED,
+                            stat.getTransformState().getPosition(),
+                            stat.getTransformState().getCheckpoint(),
+                            stat.getTransformState().getReason(),
+                            stat.getTransformState().getProgress()),
+                        stat.getTransformStats(),
+                        stat.getCheckpointingInfo()))
+                );
                 transformsWithoutTasks.removeAll(
                         stats.stream().map(DataFrameTransformStateAndStats::getId).collect(Collectors.toSet()));
 

+ 4 - 0
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -237,6 +237,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             return;
         }
 
+        if (getIndexer().getState() == IndexerState.STOPPED) {
+            return;
+        }
+
         IndexerState state = getIndexer().stop();
         if (state == IndexerState.STOPPED) {
             getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStop());

+ 0 - 3
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml

@@ -90,9 +90,6 @@ teardown:
   - match: { airline-data-by-airline-start-stop.mappings: {} }
 ---
 "Test start/stop/start transform":
-  - skip:
-      reason: "https://github.com/elastic/elasticsearch/issues/42650"
-      version: "all"
   - do:
       data_frame.start_data_frame_transform:
         transform_id: "airline-transform-start-stop"