Kaynağa Gözat

[ML][Data Frame] fixing _start?force=true bug (#45660)

* [ML][Data Frame] fixing _start?force=true bug

* removing unused import

* removing old TODO
Benjamin Trent 6 yıl önce
ebeveyn
işleme
ce6106f011

+ 0 - 1
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java

@@ -59,7 +59,6 @@ public class TransportStartDataFrameTransformTaskAction extends
     protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask,
                                  ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
         if (transformTask.getTransformId().equals(request.getId())) {
-            //TODO fix bug as .start where it was failed could result in a null current checkpoint?
             transformTask.start(null, request.isForce(), listener);
         } else {
             listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()

+ 9 - 8
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java

@@ -35,7 +35,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheck
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStoredDoc;
-import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
 import org.elasticsearch.xpack.dataframe.DataFrame;
@@ -120,13 +119,14 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
     protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) {
         final String transformId = params.getId();
         final DataFrameTransformTask buildTask = (DataFrameTransformTask) task;
-        final DataFrameTransformState transformPTaskState = (DataFrameTransformState) state;
-        // If the transform is failed then the Persistent Task Service will
-        // try to restart it on a node restart. Exiting here leaves the
-        // transform in the failed state and it must be force closed.
-        if (transformPTaskState != null && transformPTaskState.getTaskState() == DataFrameTransformTaskState.FAILED) {
-            return;
-        }
+        // NOTE: DataFrameTransformPersistentTasksExecutor#createTask pulls in the stored task state from the ClusterState when the object
+        // is created. DataFrameTransformTask#ctor takes into account setting the task as failed if that is passed in with the
+        // persisted state.
+        // DataFrameTransformPersistentTasksExecutor#startTask will fail as DataFrameTransformTask#start, when force == false, will return
+        // a failure indicating that a failed task cannot be started.
+        //
+        // We want the rest of the state to be populated in the task when it is loaded on the node so that users can force start it again
+        // later if they want.
 
         final DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder =
             new DataFrameTransformTask.ClientDataFrameIndexerBuilder(transformId)
@@ -298,6 +298,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
                            Long previousCheckpoint,
                            ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
         buildTask.initializeIndexer(indexerBuilder);
+        // DataFrameTransformTask#start will fail if the task state is FAILED
         buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, listener);
     }
 

+ 19 - 17
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -235,7 +235,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         }
     }
 
-    public void setTaskStateStopped() {
+    public synchronized void setTaskStateStopped() {
         taskState.set(DataFrameTransformTaskState.STOPPED);
     }
 
@@ -256,8 +256,16 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             return;
         }
         if (getIndexer() == null) {
-            listener.onFailure(new ElasticsearchException("Task for transform [{}] not fully initialized. Try again later",
-                getTransformId()));
+            // If our state is failed AND the indexer is null, the user needs to _stop?force=true so that the indexer gets
+            // fully initialized.
+            // If we are NOT failed, then we can assume that `start` was just called early in the process.
+            String msg = taskState.get() == DataFrameTransformTaskState.FAILED ?
+                "It failed during the initialization process; force stop to allow reinitialization." :
+                "Try again later.";
+            listener.onFailure(new ElasticsearchStatusException("Task for transform [{}] not fully initialized. {}",
+                RestStatus.CONFLICT,
+                getTransformId(),
+                msg));
             return;
         }
         final IndexerState newState = getIndexer().start();
@@ -409,6 +417,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
     }
 
     synchronized void markAsFailed(String reason, ActionListener<Void> listener) {
+        // If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to
+        // flag the previously triggered indexer as failed. Exit early as we are already flagged as failed.
+        if (taskState.get() == DataFrameTransformTaskState.FAILED) {
+            logger.warn("[{}] is already failed but encountered new failure; reason [{}] ", getTransformId(), reason);
+            listener.onResponse(null);
+            return;
+        }
         // If the indexer is `STOPPING` this means that `DataFrameTransformTask#stop` was called previously, but something caused
         // the indexer to fail. Since `ClientDataFrameIndexer#doSaveState` will persist the state to the index once the indexer stops,
         // it is probably best to NOT change the internal state of the task and allow the normal stopping logic to continue.
@@ -425,26 +440,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             listener.onResponse(null);
             return;
         }
-        // If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to
-        // flag the previously triggered indexer as failed. Exit early as we are already flagged as failed.
-        if (taskState.get() == DataFrameTransformTaskState.FAILED) {
-            logger.warn("[{}] is already failed but encountered new failure; reason [{}] ", getTransformId(), reason);
-            listener.onResponse(null);
-            return;
-        }
         auditor.error(transform.getId(), reason);
         // We should not keep retrying. Either the task will be stopped, or started
         // If it is started again, it is registered again.
         deregisterSchedulerJob();
-        DataFrameTransformState newState = new DataFrameTransformState(
-            DataFrameTransformTaskState.FAILED,
-            getIndexer() == null ? initialIndexerState : getIndexer().getState(),
-            getIndexer() == null ? initialPosition : getIndexer().getPosition(),
-            currentCheckpoint.get(),
-            reason,
-            getIndexer() == null ? null : getIndexer().getProgress());
         taskState.set(DataFrameTransformTaskState.FAILED);
         stateReason.set(reason);
+        DataFrameTransformState newState = getState();
         // Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate
         // This keeps track of STARTED, FAILED, STOPPED
         // This is because a FAILED state could occur because we failed to read the config from the internal index, which would imply that