浏览代码

[ML][Transforms] allow executor to call start on started task (#46347)

Benjamin Trent 6 年之前
父节点
当前提交
8cdce05c6b

+ 2 - 1
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java

@@ -305,7 +305,8 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
                            ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
         buildTask.initializeIndexer(indexerBuilder);
         // DataFrameTransformTask#start will fail if the task state is FAILED
-        buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, listener);
+        // Will continue to attempt to start the indexer, even if the state is STARTED
+        buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, false, listener);
     }
 
     private void setNumFailureRetries(int numFailureRetries) {

+ 15 - 10
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -219,13 +219,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 ));
     }
 
-    /**
-     * Start the background indexer and set the task's state to started
-     * @param startingCheckpoint Set the current checkpoint to this value. If null the
-     *                           current checkpoint is not set
-     * @param listener Started listener
-     */
-    public synchronized void start(Long startingCheckpoint, boolean force, ActionListener<Response> listener) {
+    // Here `failOnConflict` is usually true, except when the initial start is called when the task is assigned to the node
+    synchronized void start(Long startingCheckpoint, boolean force, boolean failOnConflict, ActionListener<Response> listener) {
         logger.debug("[{}] start called with force [{}] and state [{}].", getTransformId(), force, getState());
         if (taskState.get() == DataFrameTransformTaskState.FAILED && force == false) {
             listener.onFailure(new ElasticsearchStatusException(
@@ -249,7 +244,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             return;
         }
         // If we are already in a `STARTED` state, we should not attempt to call `.start` on the indexer again.
-        if (taskState.get() == DataFrameTransformTaskState.STARTED) {
+        if (taskState.get() == DataFrameTransformTaskState.STARTED && failOnConflict) {
             listener.onFailure(new ElasticsearchStatusException(
                 "Cannot start transform [{}] as it is already STARTED.",
                 RestStatus.CONFLICT,
@@ -260,7 +255,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         final IndexerState newState = getIndexer().start();
         if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) {
             listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]",
-                    transform.getId(), newState));
+                transform.getId(), newState));
             return;
         }
         stateReason.set(null);
@@ -298,10 +293,20 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 logger.error(new ParameterizedMessage("[{}] failed updating state to [{}].", getTransformId(), state), exc);
                 getIndexer().stop();
                 listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform ["
-                                    + transform.getId() + "] to [" + state.getIndexerState() + "].", exc));
+                    + transform.getId() + "] to [" + state.getIndexerState() + "].", exc));
             }
         ));
     }
+    /**
+     * Start the background indexer and set the task's state to started
+     * @param startingCheckpoint Set the current checkpoint to this value. If null the
+     *                           current checkpoint is not set
+     * @param force Whether to force start a failed task or not
+     * @param listener Started listener
+     */
+    public synchronized void start(Long startingCheckpoint, boolean force, ActionListener<Response> listener) {
+        start(startingCheckpoint, force, true, listener);
+    }
 
     public synchronized void stop(boolean force) {
         logger.debug("[{}] stop called with force [{}] and state [{}]", getTransformId(), force, getState());

+ 0 - 2
x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java

@@ -7,7 +7,6 @@ package org.elasticsearch.upgrades;
 
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
-import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.Version;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
@@ -52,7 +51,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.oneOf;
 
-@LuceneTestCase.AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/46341")
 public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
 
     private static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version"));