Преглед изворни кода

[ML][Transforms] remove `force` flag from _start (#46414)

* [ML][Transforms] remove `force` flag from _start

* fixing expected error message
Benjamin Trent пре 6 година
родитељ
комит
479ebd18ff
16 измењених фајлова са 58 додато и 397 уклоњено
  1. 0 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java
  2. 1 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java
  3. 8 9
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StartTransformAction.java
  4. 0 152
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StartTransformTaskAction.java
  5. 1 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StartTransformActionRequestTests.java
  6. 0 23
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StartTransformTaskActionRequestTests.java
  7. 0 23
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StartTransformTaskActionResponseTests.java
  8. 1 1
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml
  9. 5 6
      x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java
  10. 5 18
      x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java
  11. 0 3
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/DataFrame.java
  12. 13 30
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartDataFrameTransformAction.java
  13. 0 93
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartDataFrameTransformTaskAction.java
  14. 1 2
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestStartDataFrameTransformAction.java
  15. 10 7
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformPersistentTasksExecutor.java
  16. 13 26
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformTask.java

+ 0 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

@@ -198,7 +198,6 @@ import org.elasticsearch.xpack.core.transform.action.GetTransformsStatsAction;
 import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction;
 import org.elasticsearch.xpack.core.transform.action.PutTransformAction;
 import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
-import org.elasticsearch.xpack.core.transform.action.StartTransformTaskAction;
 import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
 import org.elasticsearch.xpack.core.transform.transforms.Transform;
 import org.elasticsearch.xpack.core.transform.transforms.TransformState;
@@ -389,7 +388,6 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
                 // Data Frame
                 PutTransformAction.INSTANCE,
                 StartTransformAction.INSTANCE,
-                StartTransformTaskAction.INSTANCE,
                 StopTransformAction.INSTANCE,
                 DeleteTransformAction.INSTANCE,
                 GetTransformsAction.INSTANCE,

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java

@@ -36,7 +36,7 @@ public class TransformMessages {
             " Use force stop to stop the data frame transform.";
     public static final String DATA_FRAME_CANNOT_START_FAILED_TRANSFORM =
         "Unable to start data frame transform [{0}] as it is in a failed state with failure: [{1}]. " +
-            "Use force start to restart data frame transform once error is resolved.";
+            "Use force stop and then restart the data frame transform once error is resolved.";
 
     public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]";
     public static final String FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION =

+ 8 - 9
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StartTransformAction.java

@@ -6,6 +6,7 @@
 
 package org.elasticsearch.xpack.core.transform.action;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.support.master.AcknowledgedRequest;
@@ -33,32 +34,30 @@ public class StartTransformAction extends ActionType<StartTransformAction.Respon
     public static class Request extends AcknowledgedRequest<Request> {
 
         private final String id;
-        private final boolean force;
 
-        public Request(String id, boolean force) {
+        public Request(String id) {
             this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName());
-            this.force = force;
         }
 
         public Request(StreamInput in) throws IOException {
             super(in);
             id = in.readString();
-            force = in.readBoolean();
+            if(in.getVersion().before(Version.V_8_0_0)) {
+                in.readBoolean();
+            }
         }
 
         public String getId() {
             return id;
         }
 
-        public boolean isForce() {
-            return force;
-        }
-
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
             out.writeString(id);
-            out.writeBoolean(force);
+            if(out.getVersion().before(Version.V_8_0_0)) {
+                out.writeBoolean(false);
+            }
         }
 
         @Override

+ 0 - 152
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StartTransformTaskAction.java

@@ -1,152 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-
-package org.elasticsearch.xpack.core.transform.action;
-
-import org.elasticsearch.Version;
-import org.elasticsearch.action.ActionRequestValidationException;
-import org.elasticsearch.action.ActionType;
-import org.elasticsearch.action.support.tasks.BaseTasksRequest;
-import org.elasticsearch.action.support.tasks.BaseTasksResponse;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.xcontent.ToXContentObject;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.tasks.Task;
-import org.elasticsearch.xpack.core.transform.TransformField;
-import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Objects;
-
-public class StartTransformTaskAction extends ActionType<StartTransformTaskAction.Response> {
-
-    public static final StartTransformTaskAction INSTANCE = new StartTransformTaskAction();
-    public static final String NAME = "cluster:admin/data_frame/start_task";
-
-    private StartTransformTaskAction() {
-        super(NAME, StartTransformTaskAction.Response::new);
-    }
-
-    public static class Request extends BaseTasksRequest<Request> {
-
-        private final String id;
-        private final boolean force;
-
-        public Request(String id, boolean force) {
-            this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName());
-            this.force = force;
-        }
-
-        public Request(StreamInput in) throws IOException {
-            super(in);
-            id = in.readString();
-            if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
-                force = in.readBoolean();
-            } else {
-                // The behavior before V_7_4_0 was that this flag did not exist,
-                // assuming previous checks allowed this task to be started.
-                force = true;
-            }
-        }
-
-        public String getId() {
-            return id;
-        }
-
-        public boolean isForce() {
-            return force;
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
-            out.writeString(id);
-            if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
-                out.writeBoolean(force);
-            }
-        }
-
-        @Override
-        public boolean match(Task task) {
-            return task.getDescription().equals(TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + id);
-        }
-
-        @Override
-        public ActionRequestValidationException validate() {
-            return null;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(id, force);
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (obj == null) {
-                return false;
-            }
-            if (getClass() != obj.getClass()) {
-                return false;
-            }
-            Request other = (Request) obj;
-            return Objects.equals(id, other.id) && force == other.force;
-        }
-    }
-
-    public static class Response extends BaseTasksResponse implements ToXContentObject {
-        private final boolean started;
-
-        public Response(StreamInput in) throws IOException {
-            super(in);
-            started = in.readBoolean();
-        }
-
-        public Response(boolean started) {
-            super(Collections.emptyList(), Collections.emptyList());
-            this.started = started;
-        }
-
-        public boolean isStarted() {
-            return started;
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
-            out.writeBoolean(started);
-        }
-
-        @Override
-        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-            builder.startObject();
-            toXContentCommon(builder, params);
-            builder.field("started", started);
-            builder.endObject();
-            return builder;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj) {
-                return true;
-            }
-
-            if (obj == null || getClass() != obj.getClass()) {
-                return false;
-            }
-            Response response = (Response) obj;
-            return started == response.started;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(started);
-        }
-    }
-}

+ 1 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StartTransformActionRequestTests.java

@@ -13,7 +13,7 @@ import org.elasticsearch.xpack.core.transform.action.StartTransformAction.Reques
 public class StartTransformActionRequestTests extends AbstractWireSerializingTestCase<Request> {
     @Override
     protected Request createTestInstance() {
-        return new Request(randomAlphaOfLengthBetween(1, 20), randomBoolean());
+        return new Request(randomAlphaOfLengthBetween(1, 20));
     }
 
     @Override

+ 0 - 23
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StartTransformTaskActionRequestTests.java

@@ -1,23 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-
-package org.elasticsearch.xpack.core.transform.action;
-
-import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.test.AbstractWireSerializingTestCase;
-
-public class StartTransformTaskActionRequestTests extends
-        AbstractWireSerializingTestCase<StartTransformTaskAction.Request> {
-    @Override
-    protected StartTransformTaskAction.Request createTestInstance() {
-        return new StartTransformTaskAction.Request(randomAlphaOfLength(4), randomBoolean());
-    }
-
-    @Override
-    protected Writeable.Reader<StartTransformTaskAction.Request> instanceReader() {
-        return StartTransformTaskAction.Request::new;
-    }
-}

+ 0 - 23
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StartTransformTaskActionResponseTests.java

@@ -1,23 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-
-package org.elasticsearch.xpack.core.transform.action;
-
-import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.test.AbstractWireSerializingTestCase;
-
-public class StartTransformTaskActionResponseTests extends
-        AbstractWireSerializingTestCase<StartTransformTaskAction.Response> {
-    @Override
-    protected StartTransformTaskAction.Response createTestInstance() {
-        return new StartTransformTaskAction.Response(randomBoolean());
-    }
-
-    @Override
-    protected Writeable.Reader<StartTransformTaskAction.Response> instanceReader() {
-        return StartTransformTaskAction.Response::new;
-    }
-}

+ 1 - 1
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml

@@ -62,7 +62,7 @@ teardown:
   - match: { acknowledged: true }
 
   - do:
-      catch: /Unable to start data frame transform \[airline-transform-start-stop\] as it is in state \[STARTED\]/
+      catch: /Cannot start transform \[airline-transform-start-stop\] as it is already started/
       data_frame.start_data_frame_transform:
         transform_id: "airline-transform-start-stop"
 

+ 5 - 6
x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java

@@ -224,14 +224,13 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
         assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
     }
 
-    protected void startDataframeTransform(String transformId, boolean force) throws IOException {
-        startDataframeTransform(transformId, force, null);
+    protected void startDataframeTransform(String transformId) throws IOException {
+        startDataframeTransform(transformId, null);
     }
 
-    protected void startDataframeTransform(String transformId, boolean force, String authHeader, String... warnings) throws IOException {
+    protected void startDataframeTransform(String transformId, String authHeader, String... warnings) throws IOException {
         // start the transform
         final Request startTransformRequest = createRequestWithAuth("POST", DATAFRAME_ENDPOINT + transformId + "/_start", authHeader);
-        startTransformRequest.addParameter(TransformField.FORCE.getPreferredName(), Boolean.toString(force));
         if (warnings.length > 0) {
             startTransformRequest.setOptions(expectWarnings(warnings));
         }
@@ -259,7 +258,7 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
     protected void startAndWaitForTransform(String transformId, String dataFrameIndex,
                                             String authHeader, String... warnings) throws Exception {
         // start the transform
-        startDataframeTransform(transformId, false, authHeader, warnings);
+        startDataframeTransform(transformId, authHeader, warnings);
         assertTrue(indexExists(dataFrameIndex));
         // wait until the dataframe has been created and all data is available
         waitForDataFrameCheckpoint(transformId);
@@ -279,7 +278,7 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
                                                       String authHeader,
                                                       long checkpoint) throws Exception {
         // start the transform
-        startDataframeTransform(transformId, false, authHeader, new String[0]);
+        startDataframeTransform(transformId, authHeader, new String[0]);
         assertTrue(indexExists(dataFrameIndex));
         // wait until the dataframe has been created and all data is available
         waitForDataFrameCheckpoint(transformId, checkpoint);

+ 5 - 18
x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java

@@ -28,8 +28,6 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.oneOf;
 
 public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
 
@@ -65,7 +63,7 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
         createDestinationIndexWithBadMapping(dataFrameIndex);
         createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null);
         failureTransforms.add(transformId);
-        startDataframeTransform(transformId, false);
+        startDataframeTransform(transformId);
         awaitState(transformId, TransformStats.State.FAILED);
         Map<?, ?> fullState = getDataFrameState(transformId);
         final String failureReason = "task encountered more than 0 failures; latest failure: " +
@@ -89,14 +87,14 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
         assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue()));
     }
 
-    public void testForceStartFailedTransform() throws Exception {
+    public void testStartFailedTransform() throws Exception {
         String transformId = "test-force-start-failed-transform";
         createReviewsIndex(REVIEWS_INDEX_NAME, 10);
         String dataFrameIndex = "failure_pivot_reviews";
         createDestinationIndexWithBadMapping(dataFrameIndex);
         createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null);
         failureTransforms.add(transformId);
-        startDataframeTransform(transformId, false);
+        startDataframeTransform(transformId);
         awaitState(transformId, TransformStats.State.FAILED);
         Map<?, ?> fullState = getDataFrameState(transformId);
         final String failureReason = "task encountered more than 0 failures; latest failure: " +
@@ -106,26 +104,15 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
 
         final String expectedFailure = "Unable to start data frame transform [test-force-start-failed-transform] " +
             "as it is in a failed state with failure: [" + failureReason +
-            "]. Use force start to restart data frame transform once error is resolved.";
+            "]. Use force stop and then restart the data frame transform once error is resolved.";
         // Verify that we cannot start the transform when the task is in a failed state
         assertBusy(() -> {
-            ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false));
+            ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId));
             assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
             assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
                 equalTo(expectedFailure));
         }, 60, TimeUnit.SECONDS);
 
-        // Correct the failure by deleting the destination index
-        deleteIndex(dataFrameIndex);
-        // Force start the data frame to indicate failure correction
-        startDataframeTransform(transformId, true);
-
-        // Verify that we have started and that our reason is cleared
-        fullState = getDataFrameState(transformId);
-        assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue()));
-        assertThat(XContentMapValues.extractValue("state", fullState), oneOf("started", "indexing"));
-        assertThat((Integer)XContentMapValues.extractValue("stats.index_failures", fullState), greaterThanOrEqualTo(1));
-
         stopDataFrameTransform(transformId, true);
     }
 

+ 0 - 3
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/DataFrame.java

@@ -51,7 +51,6 @@ import org.elasticsearch.xpack.core.transform.action.GetTransformsStatsAction;
 import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction;
 import org.elasticsearch.xpack.core.transform.action.PutTransformAction;
 import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
-import org.elasticsearch.xpack.core.transform.action.StartTransformTaskAction;
 import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
 import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction;
 import org.elasticsearch.xpack.transform.action.TransportDeleteDataFrameTransformAction;
@@ -60,7 +59,6 @@ import org.elasticsearch.xpack.transform.action.TransportGetDataFrameTransformsS
 import org.elasticsearch.xpack.transform.action.TransportPreviewDataFrameTransformAction;
 import org.elasticsearch.xpack.transform.action.TransportPutDataFrameTransformAction;
 import org.elasticsearch.xpack.transform.action.TransportStartDataFrameTransformAction;
-import org.elasticsearch.xpack.transform.action.TransportStartDataFrameTransformTaskAction;
 import org.elasticsearch.xpack.transform.action.TransportStopDataFrameTransformAction;
 import org.elasticsearch.xpack.transform.action.TransportUpdateDataFrameTransformAction;
 import org.elasticsearch.xpack.transform.checkpoint.DataFrameTransformsCheckpointService;
@@ -143,7 +141,6 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
         return Arrays.asList(
                 new ActionHandler<>(PutTransformAction.INSTANCE, TransportPutDataFrameTransformAction.class),
                 new ActionHandler<>(StartTransformAction.INSTANCE, TransportStartDataFrameTransformAction.class),
-                new ActionHandler<>(StartTransformTaskAction.INSTANCE, TransportStartDataFrameTransformTaskAction.class),
                 new ActionHandler<>(StopTransformAction.INSTANCE, TransportStopDataFrameTransformAction.class),
                 new ActionHandler<>(DeleteTransformAction.INSTANCE, TransportDeleteDataFrameTransformAction.class),
                 new ActionHandler<>(GetTransformsAction.INSTANCE, TransportGetDataFrameTransformsAction.class),

+ 13 - 30
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartDataFrameTransformAction.java

@@ -37,7 +37,6 @@ import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.XPackField;
 import org.elasticsearch.xpack.core.transform.TransformMessages;
 import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
-import org.elasticsearch.xpack.core.transform.action.StartTransformTaskAction;
 import org.elasticsearch.xpack.core.transform.transforms.Transform;
 import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
 import org.elasticsearch.xpack.core.transform.transforms.TransformState;
@@ -56,6 +55,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 
+import static org.elasticsearch.xpack.core.transform.TransformMessages.DATA_FRAME_CANNOT_START_FAILED_TRANSFORM;
+
 public class TransportStartDataFrameTransformAction extends
     TransportMasterNodeAction<StartTransformAction.Request, StartTransformAction.Response> {
 
@@ -133,38 +134,20 @@ public class TransportStartDataFrameTransformAction extends
                         newPersistentTaskActionListener);
                 } else {
                     TransformState transformState = (TransformState)existingTask.getState();
-                    if(transformState.getTaskState() == TransformTaskState.FAILED && request.isForce() == false) {
+                    if(transformState.getTaskState() == TransformTaskState.FAILED) {
                         listener.onFailure(new ElasticsearchStatusException(
-                            "Unable to start data frame transform [" + request.getId() +
-                                "] as it is in a failed state with failure: [" + transformState.getReason() +
-                            "]. Use force start to restart data frame transform once error is resolved.",
+                            TransformMessages.getMessage(DATA_FRAME_CANNOT_START_FAILED_TRANSFORM,
+                                request.getId(),
+                                transformState.getReason()),
                             RestStatus.CONFLICT));
-                    } else if (transformState.getTaskState() != TransformTaskState.STOPPED &&
-                               transformState.getTaskState() != TransformTaskState.FAILED) {
-                        listener.onFailure(new ElasticsearchStatusException(
-                            "Unable to start data frame transform [" + request.getId() +
-                                "] as it is in state [" + transformState.getTaskState()  + "]", RestStatus.CONFLICT));
                     } else {
-                        // If the task already exists but is not assigned to a node, something is weird
-                        // return a failure that includes the current assignment explanation (if one exists)
-                        if (existingTask.isAssigned() == false) {
-                            String assignmentExplanation = "unknown reason";
-                            if (existingTask.getAssignment() != null) {
-                                assignmentExplanation = existingTask.getAssignment().getExplanation();
-                            }
-                            listener.onFailure(new ElasticsearchStatusException("Unable to start data frame transform [" +
-                                request.getId() + "] as it is not assigned to a node, explanation: " + assignmentExplanation,
-                                RestStatus.CONFLICT));
-                            return;
-                        }
-                        // If the task already exists and is assigned to a node, simply attempt to set it to start
-                        ClientHelper.executeAsyncWithOrigin(client,
-                            ClientHelper.DATA_FRAME_ORIGIN,
-                            StartTransformTaskAction.INSTANCE,
-                            new StartTransformTaskAction.Request(request.getId(), request.isForce()),
-                            ActionListener.wrap(
-                                r -> listener.onResponse(new StartTransformAction.Response(true)),
-                                listener::onFailure));
+                        // If the task already exists that means that it is either running or failed
+                        // Since it is not failed, that means it is running, we return a conflict.
+                        listener.onFailure(new ElasticsearchStatusException(
+                            "Cannot start transform [{}] as it is already started.",
+                            RestStatus.CONFLICT,
+                            request.getId()
+                        ));
                     }
                 }
             },

+ 0 - 93
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartDataFrameTransformTaskAction.java

@@ -1,93 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-
-package org.elasticsearch.xpack.transform.action;
-
-import org.elasticsearch.ResourceNotFoundException;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.FailedNodeException;
-import org.elasticsearch.action.TaskOperationFailure;
-import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.tasks.TransportTasksAction;
-import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.license.LicenseUtils;
-import org.elasticsearch.license.XPackLicenseState;
-import org.elasticsearch.tasks.Task;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.TransportService;
-import org.elasticsearch.xpack.core.XPackField;
-import org.elasticsearch.xpack.core.transform.action.StartTransformTaskAction;
-import org.elasticsearch.xpack.transform.transforms.DataFrameTransformTask;
-
-import java.util.List;
-
-/**
- * Internal only transport class to change an allocated persistent task's state to started
- */
-public class TransportStartDataFrameTransformTaskAction extends
-    TransportTasksAction<DataFrameTransformTask, StartTransformTaskAction.Request,
-        StartTransformTaskAction.Response, StartTransformTaskAction.Response> {
-
-    private final XPackLicenseState licenseState;
-
-    @Inject
-    public TransportStartDataFrameTransformTaskAction(TransportService transportService, ActionFilters actionFilters,
-                                                  ClusterService clusterService, XPackLicenseState licenseState) {
-        super(StartTransformTaskAction.NAME, clusterService, transportService, actionFilters,
-            StartTransformTaskAction.Request::new, StartTransformTaskAction.Response::new,
-            StartTransformTaskAction.Response::new, ThreadPool.Names.SAME);
-        this.licenseState = licenseState;
-    }
-
-    @Override
-    protected void doExecute(Task task, StartTransformTaskAction.Request request,
-                             ActionListener<StartTransformTaskAction.Response> listener) {
-
-        if (!licenseState.isDataFrameAllowed()) {
-            listener.onFailure(LicenseUtils.newComplianceException(XPackField.Transform));
-            return;
-        }
-
-        super.doExecute(task, request, listener);
-    }
-
-    @Override
-    protected void taskOperation(StartTransformTaskAction.Request request, DataFrameTransformTask transformTask,
-                                 ActionListener<StartTransformTaskAction.Response> listener) {
-        if (transformTask.getTransformId().equals(request.getId())) {
-            transformTask.start(null, request.isForce(), listener);
-        } else {
-            listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()
-                + "] does not match request's ID [" + request.getId() + "]"));
-        }
-    }
-
-    @Override
-    protected StartTransformTaskAction.Response newResponse(StartTransformTaskAction.Request request,
-                                                                     List<StartTransformTaskAction.Response> tasks,
-                                                                     List<TaskOperationFailure> taskOperationFailures,
-                                                                     List<FailedNodeException> failedNodeExceptions) {
-
-        if (taskOperationFailures.isEmpty() == false) {
-            throw org.elasticsearch.ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause());
-        } else if (failedNodeExceptions.isEmpty() == false) {
-            throw org.elasticsearch.ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0));
-        }
-
-        // Either the transform doesn't exist (the user didn't create it yet) or was deleted
-        // after the StartAPI executed.
-        // In either case, let the user know
-        if (tasks.size() == 0) {
-            throw new ResourceNotFoundException("Task for data frame transform [" + request.getId() + "] not found");
-        }
-
-        assert tasks.size() == 1;
-
-        boolean allStarted = tasks.stream().allMatch(StartTransformTaskAction.Response::isStarted);
-        return new StartTransformTaskAction.Response(allStarted);
-    }
-}

+ 1 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestStartDataFrameTransformAction.java

@@ -24,8 +24,7 @@ public class RestStartDataFrameTransformAction extends BaseRestHandler {
     @Override
     protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
         String id = restRequest.param(TransformField.ID.getPreferredName());
-        boolean force = restRequest.paramAsBoolean(TransformField.FORCE.getPreferredName(), false);
-        StartTransformAction.Request request = new StartTransformAction.Request(id, force);
+        StartTransformAction.Request request = new StartTransformAction.Request(id);
         request.timeout(restRequest.paramAsTime(TransformField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT));
         return channel -> client.execute(StartTransformAction.INSTANCE, request,
                 new RestToXContentListener<>(channel));

+ 10 - 7
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformPersistentTasksExecutor.java

@@ -32,7 +32,7 @@ import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
 import org.elasticsearch.xpack.core.transform.TransformField;
 import org.elasticsearch.xpack.core.transform.TransformMessages;
-import org.elasticsearch.xpack.core.transform.action.StartTransformTaskAction;
+import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
 import org.elasticsearch.xpack.core.transform.transforms.Transform;
 import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
 import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
@@ -141,9 +141,13 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
 
         final SetOnce<TransformState> stateHolder = new SetOnce<>();
 
-        ActionListener<StartTransformTaskAction.Response> startTaskListener = ActionListener.wrap(
-            response -> logger.info("Successfully completed and scheduled task in node operation"),
-            failure -> logger.error("Failed to start task ["+ transformId +"] in node operation", failure)
+        ActionListener<StartTransformAction.Response> startTaskListener = ActionListener.wrap(
+            response -> logger.info("[{}] successfully completed and scheduled task in node operation", transformId),
+            failure -> {
+                auditor.error(transformId, "Failed to start data frame transform. " +
+                    "Please stop and attempt to start again. Failure: " + failure.getMessage());
+                logger.error("Failed to start task ["+ transformId +"] in node operation", failure);
+            }
         );
 
         // <7> load next checkpoint
@@ -315,11 +319,10 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
     private void startTask(DataFrameTransformTask buildTask,
                            ClientDataFrameIndexerBuilder indexerBuilder,
                            Long previousCheckpoint,
-                           ActionListener<StartTransformTaskAction.Response> listener) {
+                           ActionListener<StartTransformAction.Response> listener) {
         buildTask.initializeIndexer(indexerBuilder);
         // DataFrameTransformTask#start will fail if the task state is FAILED
-        // Will continue to attempt to start the indexer, even if the state is STARTED
-        buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, false, listener);
+        buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener);
     }
 
     private void setNumFailureRetries(int numFailureRetries) {

+ 13 - 26
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformTask.java

@@ -26,8 +26,7 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
 import org.elasticsearch.xpack.core.transform.TransformField;
 import org.elasticsearch.xpack.core.transform.TransformMessages;
-import org.elasticsearch.xpack.core.transform.action.StartTransformTaskAction;
-import org.elasticsearch.xpack.core.transform.action.StartTransformTaskAction.Response;
+import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
 import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
 import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
 import org.elasticsearch.xpack.core.transform.transforms.Transform;
@@ -199,10 +198,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 ));
     }
 
-    // Here `failOnConflict` is usually true, except when the initial start is called when the task is assigned to the node
-    synchronized void start(Long startingCheckpoint, boolean force, boolean failOnConflict, ActionListener<Response> listener) {
-        logger.debug("[{}] start called with force [{}] and state [{}].", getTransformId(), force, getState());
-        if (taskState.get() == TransformTaskState.FAILED && force == false) {
+    /**
+     * Starts the transform and schedules it to be triggered in the future.
+     *
+     * NOTE: This should ONLY be called via {@link DataFrameTransformPersistentTasksExecutor}
+     *
+     * @param startingCheckpoint The starting checkpoint, could null. Null indicates that there is no starting checkpoint
+     * @param listener The listener to alert once started
+     */
+    synchronized void start(Long startingCheckpoint, ActionListener<StartTransformAction.Response> listener) {
+        logger.debug("[{}] start called with state [{}].", getTransformId(), getState());
+        if (taskState.get() == TransformTaskState.FAILED) {
             listener.onFailure(new ElasticsearchStatusException(
                 TransformMessages.getMessage(DATA_FRAME_CANNOT_START_FAILED_TRANSFORM,
                     getTransformId(),
@@ -223,15 +229,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 msg));
             return;
         }
-        // If we are already in a `STARTED` state, we should not attempt to call `.start` on the indexer again.
-        if (taskState.get() == TransformTaskState.STARTED && failOnConflict) {
-            listener.onFailure(new ElasticsearchStatusException(
-                "Cannot start transform [{}] as it is already STARTED.",
-                RestStatus.CONFLICT,
-                getTransformId()
-            ));
-            return;
-        }
         final IndexerState newState = getIndexer().start();
         if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) {
             listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]",
@@ -265,7 +262,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 // kick off the indexer
                 triggered(new Event(schedulerJobName(), now, now));
                 registerWithSchedulerJob();
-                listener.onResponse(new StartTransformTaskAction.Response(true));
+                listener.onResponse(new StartTransformAction.Response(true));
             },
             exc -> {
                 auditor.warning(transform.getId(),
@@ -277,16 +274,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             }
         ));
     }
-    /**
-     * Start the background indexer and set the task's state to started
-     * @param startingCheckpoint Set the current checkpoint to this value. If null the
-     *                           current checkpoint is not set
-     * @param force Whether to force start a failed task or not
-     * @param listener Started listener
-     */
-    public synchronized void start(Long startingCheckpoint, boolean force, ActionListener<Response> listener) {
-        start(startingCheckpoint, force, true, listener);
-    }
 
     public synchronized void stop(boolean force) {
         logger.debug("[{}] stop called with force [{}] and state [{}]", getTransformId(), force, getState());