Browse Source

[ML] Add data frame task state object and field (#40169)

* [ML] Add data frame task state object and field

* A new state item is added so that the overall task state can be
accoutned for
* A new FAILED state and reason have been added as well so that failures
can be shown to the user for optional correction

* Addressing PR comments

* adjusting after master merge

* addressing pr comment

* Adjusting auditor usage with failure state

* Refactor, renamed state items to task_state and indexer_state

* Adding todo and removing redundant auditor call

* Address HLRC changes and PR comment

* adjusting hlrc IT test
Benjamin Trent 6 years ago
parent
commit
e79da922a3
27 changed files with 645 additions and 132 deletions
  1. 29 10
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java
  2. 34 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformTaskState.java
  3. 8 1
      client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java
  4. 6 2
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java
  5. 6 2
      client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java
  6. 3 2
      docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc
  7. 2 1
      docs/reference/data-frames/apis/get-transform-stats.asciidoc
  8. 1 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java
  9. 7 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformAction.java
  10. 15 4
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformAction.java
  11. 63 16
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java
  12. 1 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java
  13. 36 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTaskState.java
  14. 1 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformActionRequestTests.java
  15. 4 3
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformActionRequestTests.java
  16. 16 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java
  17. 81 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTaskStateTests.java
  18. 40 9
      x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java
  19. 94 0
      x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java
  20. 18 6
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java
  21. 11 0
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java
  22. 2 1
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStartDataFrameTransformAction.java
  23. 2 1
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameTransformAction.java
  24. 6 0
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java
  25. 144 60
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java
  26. 8 4
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml
  27. 7 6
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml

+ 29 - 10
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java

@@ -39,17 +39,25 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
 
 public class DataFrameTransformState {
 
-    private static final ParseField STATE = new ParseField("transform_state");
+    private static final ParseField INDEXER_STATE = new ParseField("indexer_state");
+    private static final ParseField TASK_STATE = new ParseField("task_state");
     private static final ParseField CURRENT_POSITION = new ParseField("current_position");
     private static final ParseField GENERATION = new ParseField("generation");
 
     @SuppressWarnings("unchecked")
     public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER =
             new ConstructingObjectParser<>("data_frame_transform_state",
-                    args -> new DataFrameTransformState((IndexerState) args[0], (HashMap<String, Object>) args[1], (long) args[2]));
+                    args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0],
+                        (IndexerState) args[1],
+                        (HashMap<String, Object>) args[2],
+                        (long) args[3]));
 
     static {
-        PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), STATE, ObjectParser.ValueType.STRING);
+        PARSER.declareField(constructorArg(),
+            p -> DataFrameTransformTaskState.fromString(p.text()),
+            TASK_STATE,
+            ObjectParser.ValueType.STRING);
+        PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ObjectParser.ValueType.STRING);
         PARSER.declareField(optionalConstructorArg(), p -> {
             if (p.currentToken() == XContentParser.Token.START_OBJECT) {
                 return p.map();
@@ -66,18 +74,27 @@ public class DataFrameTransformState {
         return PARSER.parse(parser, null);
     }
 
-    private final IndexerState state;
+    private final DataFrameTransformTaskState taskState;
+    private final IndexerState indexerState;
     private final long generation;
     private final SortedMap<String, Object> currentPosition;
 
-    public DataFrameTransformState(IndexerState state, @Nullable Map<String, Object> position, long generation) {
-        this.state = state;
+    public DataFrameTransformState(DataFrameTransformTaskState taskState,
+                                   IndexerState indexerState,
+                                   @Nullable Map<String, Object> position,
+                                   long generation) {
+        this.taskState = taskState;
+        this.indexerState = indexerState;
         this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position));
         this.generation = generation;
     }
 
     public IndexerState getIndexerState() {
-        return state;
+        return indexerState;
+    }
+
+    public DataFrameTransformTaskState getTaskState() {
+        return taskState;
     }
 
     @Nullable
@@ -101,12 +118,14 @@ public class DataFrameTransformState {
 
         DataFrameTransformState that = (DataFrameTransformState) other;
 
-        return Objects.equals(this.state, that.state) && Objects.equals(this.currentPosition, that.currentPosition)
-                && this.generation == that.generation;
+        return Objects.equals(this.taskState, that.taskState) &&
+            Objects.equals(this.indexerState, that.indexerState) &&
+            Objects.equals(this.currentPosition, that.currentPosition) &&
+            this.generation == that.generation;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(state, currentPosition, generation);
+        return Objects.hash(taskState, indexerState, currentPosition, generation);
     }
 }

+ 34 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformTaskState.java

@@ -0,0 +1,34 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe.transforms;
+
+import java.util.Locale;
+
+public enum DataFrameTransformTaskState {
+    STOPPED, STARTED, FAILED;
+
+    public static DataFrameTransformTaskState fromString(String name) {
+        return valueOf(name.trim().toUpperCase(Locale.ROOT));
+    }
+
+    public String value() {
+        return name().toLowerCase(Locale.ROOT);
+    }
+}

+ 8 - 1
client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java

@@ -39,6 +39,7 @@ import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;
 import org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats;
+import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState;
 import org.elasticsearch.client.dataframe.transforms.DestConfig;
 import org.elasticsearch.client.dataframe.transforms.QueryConfig;
 import org.elasticsearch.client.dataframe.transforms.SourceConfig;
@@ -67,6 +68,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
 
 public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
 
@@ -277,18 +279,23 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
 
         assertEquals(1, statsResponse.getTransformsStateAndStats().size());
         DataFrameTransformStateAndStats stats = statsResponse.getTransformsStateAndStats().get(0);
+        assertEquals(DataFrameTransformTaskState.STOPPED, stats.getTransformState().getTaskState());
         assertEquals(IndexerState.STOPPED, stats.getTransformState().getIndexerState());
 
         DataFrameIndexerTransformStats zeroIndexerStats = new DataFrameIndexerTransformStats(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
         assertEquals(zeroIndexerStats, stats.getTransformStats());
 
         // start the transform
-        execute(new StartDataFrameTransformRequest(id), client::startDataFrameTransform, client::startDataFrameTransformAsync);
+        StartDataFrameTransformResponse startTransformResponse = execute(new StartDataFrameTransformRequest(id),
+            client::startDataFrameTransform,
+            client::startDataFrameTransformAsync);
+        assertThat(startTransformResponse.isStarted(), is(true));
         assertBusy(() -> {
             GetDataFrameTransformStatsResponse response = execute(new GetDataFrameTransformStatsRequest(id),
                     client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync);
             DataFrameTransformStateAndStats stateAndStats = response.getTransformsStateAndStats().get(0);
             assertEquals(IndexerState.STARTED, stateAndStats.getTransformState().getIndexerState());
+            assertEquals(DataFrameTransformTaskState.STARTED, stateAndStats.getTransformState().getTaskState());
             assertNotEquals(zeroIndexerStats, stateAndStats.getTransformStats());
         });
     }

+ 6 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java

@@ -41,12 +41,16 @@ public class DataFrameTransformStateTests extends ESTestCase {
     }
 
     public static DataFrameTransformState randomDataFrameTransformState() {
-        return new DataFrameTransformState(randomFrom(IndexerState.values()), randomPositionMap(), randomLongBetween(0,10));
+        return new DataFrameTransformState(randomFrom(DataFrameTransformTaskState.values()),
+            randomFrom(IndexerState.values()),
+            randomPositionMap(),
+            randomLongBetween(0,10));
     }
 
     public static void toXContent(DataFrameTransformState state, XContentBuilder builder) throws IOException {
         builder.startObject();
-        builder.field("transform_state", state.getIndexerState().value());
+        builder.field("task_state", state.getTaskState().value());
+        builder.field("indexer_state", state.getIndexerState().value());
         if (state.getPosition() != null) {
             builder.field("current_position", state.getPosition());
         }

+ 6 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java

@@ -39,6 +39,7 @@ import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;
 import org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats;
+import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState;
 import org.elasticsearch.client.dataframe.transforms.DestConfig;
 import org.elasticsearch.client.dataframe.transforms.QueryConfig;
 import org.elasticsearch.client.dataframe.transforms.SourceConfig;
@@ -466,13 +467,16 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
             // tag::get-data-frame-transform-stats-response
             DataFrameTransformStateAndStats stateAndStats =
                     response.getTransformsStateAndStats().get(0);   // <1>
+            DataFrameTransformTaskState taskState =
+                stateAndStats.getTransformState().getTaskState(); // <2>
             IndexerState indexerState =
-                    stateAndStats.getTransformState().getIndexerState();  // <2>
+                    stateAndStats.getTransformState().getIndexerState();  // <3>
             DataFrameIndexerTransformStats transformStats =
-                    stateAndStats.getTransformStats();              // <3>
+                    stateAndStats.getTransformStats();              // <4>
             // end::get-data-frame-transform-stats-response
 
             assertEquals(IndexerState.STOPPED, indexerState);
+            assertEquals(DataFrameTransformTaskState.STOPPED, taskState);
             assertNotNull(transformStats);
         }
         {

+ 3 - 2
docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc

@@ -35,5 +35,6 @@ The returned +{response}+ contains the requested {dataframe-transform} statistic
 include-tagged::{doc-tests-file}[{api}-response]
 --------------------------------------------------
 <1> The response contains a list of `DataFrameTransformStateAndStats` objects
-<2> The running state of the transform e.g `started`
-<3> The transform progress statistics recording the number of documents indexed etc
+<2> The running state of the transform task e.g `started`
+<3> The running state of the transform indexer e.g `started`, `indexing`, etc.
+<4> The transform progress statistics recording the number of documents indexed etc

+ 2 - 1
docs/reference/data-frames/apis/get-transform-stats.asciidoc

@@ -63,7 +63,8 @@ The API returns the following results:
     {
       "id" : "ecommerce_transform",
       "state" : {
-        "transform_state" : "started",
+        "indexer_state" : "started",
+        "task_state": "started",
         "current_position" : {
           "customer_id" : "9"
         },

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java

@@ -26,6 +26,7 @@ public final class DataFrameField {
     public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type");
     public static final ParseField SOURCE = new ParseField("source");
     public static final ParseField DESTINATION = new ParseField("dest");
+    public static final ParseField FORCE = new ParseField("force");
 
     // common strings
     public static final String TASK_NAME = "data_frame/transforms";

+ 7 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformAction.java

@@ -42,9 +42,11 @@ public class StartDataFrameTransformAction extends Action<StartDataFrameTransfor
     public static class Request extends AcknowledgedRequest<Request> implements ToXContent {
 
         private String id;
+        private boolean force;
 
-        public Request(String id) {
+        public Request(String id, boolean force) {
             this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
+            this.force = force;
         }
 
         public Request() {
@@ -59,6 +61,10 @@ public class StartDataFrameTransformAction extends Action<StartDataFrameTransfor
             return id;
         }
 
+        public boolean isForce() {
+            return force;
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);

+ 15 - 4
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformAction.java

@@ -47,23 +47,26 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
     public static class Request extends BaseTasksRequest<Request> implements ToXContent {
         private String id;
         private final boolean waitForCompletion;
+        private final boolean force;
 
-        public Request(String id, boolean waitForCompletion, @Nullable TimeValue timeout) {
+        public Request(String id, boolean waitForCompletion, boolean force, @Nullable TimeValue timeout) {
             this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
             this.waitForCompletion = waitForCompletion;
+            this.force = force;
 
             // use the timeout value already present in BaseTasksRequest
             this.setTimeout(timeout == null ? DEFAULT_TIMEOUT : timeout);
         }
 
         private Request() {
-            this(null, false, null);
+            this(null, false, false, null);
         }
 
         public Request(StreamInput in) throws IOException {
             super(in);
             id = in.readString();
             waitForCompletion = in.readBoolean();
+            force = in.readBoolean();
         }
 
         public String getId() {
@@ -78,11 +81,16 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
             return waitForCompletion;
         }
 
+        public boolean isForce() {
+            return force;
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
             out.writeString(id);
             out.writeBoolean(waitForCompletion);
+            out.writeBoolean(force);
         }
 
         @Override
@@ -94,6 +102,7 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.field(DataFrameField.ID.getPreferredName(), id);
             builder.field(DataFrameField.WAIT_FOR_COMPLETION.getPreferredName(), waitForCompletion);
+            builder.field(DataFrameField.FORCE.getPreferredName(), force);
             if (this.getTimeout() != null) {
                 builder.field(DataFrameField.TIMEOUT.getPreferredName(), this.getTimeout());
             }
@@ -103,7 +112,7 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
         @Override
         public int hashCode() {
             // the base class does not implement hashCode, therefore we need to hash timeout ourselves
-            return Objects.hash(id, waitForCompletion, this.getTimeout());
+            return Objects.hash(id, waitForCompletion, force, this.getTimeout());
         }
 
         @Override
@@ -122,7 +131,9 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
                 return false;
             }
 
-            return Objects.equals(id, other.id) && Objects.equals(waitForCompletion, other.waitForCompletion);
+            return Objects.equals(id, other.id) &&
+                Objects.equals(waitForCompletion, other.waitForCompletion) &&
+                Objects.equals(force, other.force);
         }
 
         @Override

+ 63 - 16
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.dataframe.transforms;
 
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
@@ -21,7 +22,6 @@ import org.elasticsearch.xpack.core.indexing.IndexerState;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.SortedMap;
@@ -33,28 +33,44 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
 public class DataFrameTransformState implements Task.Status, PersistentTaskState {
     public static final String NAME = DataFrameField.TASK_NAME;
 
-    private final IndexerState state;
+    private final DataFrameTransformTaskState taskState;
+    private final IndexerState indexerState;
     private final long generation;
 
     @Nullable
     private final SortedMap<String, Object> currentPosition;
+    @Nullable
+    private final String reason;
 
-    private static final ParseField STATE = new ParseField("transform_state");
+    private static final ParseField TASK_STATE = new ParseField("task_state");
+    private static final ParseField INDEXER_STATE = new ParseField("indexer_state");
     private static final ParseField CURRENT_POSITION = new ParseField("current_position");
     private static final ParseField GENERATION = new ParseField("generation");
+    private static final ParseField REASON = new ParseField("reason");
 
     @SuppressWarnings("unchecked")
     public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER = new ConstructingObjectParser<>(NAME,
-            args -> new DataFrameTransformState((IndexerState) args[0], (HashMap<String, Object>) args[1], (long) args[2]));
+            true,
+            args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0],
+                (IndexerState) args[1],
+                (Map<String, Object>) args[2],
+                (long) args[3],
+                (String) args[4]));
 
     static {
+        PARSER.declareField(constructorArg(), p -> {
+            if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
+                return DataFrameTransformTaskState.fromString(p.text());
+            }
+            throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
+        }, TASK_STATE, ObjectParser.ValueType.STRING);
         PARSER.declareField(constructorArg(), p -> {
             if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
                 return IndexerState.fromString(p.text());
             }
             throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
 
-        }, STATE, ObjectParser.ValueType.STRING);
+        }, INDEXER_STATE, ObjectParser.ValueType.STRING);
         PARSER.declareField(optionalConstructorArg(), p -> {
             if (p.currentToken() == XContentParser.Token.START_OBJECT) {
                 return p.map();
@@ -64,23 +80,36 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
             }
             throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
         }, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
-        PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), GENERATION);
+        PARSER.declareLong(constructorArg(), GENERATION);
+        PARSER.declareString(optionalConstructorArg(), REASON);
     }
 
-    public DataFrameTransformState(IndexerState state, @Nullable Map<String, Object> position, long generation) {
-        this.state = state;
+    public DataFrameTransformState(DataFrameTransformTaskState taskState,
+                                   IndexerState indexerState,
+                                   @Nullable Map<String, Object> position,
+                                   long generation,
+                                   @Nullable String reason) {
+        this.taskState = taskState;
+        this.indexerState = indexerState;
         this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position));
         this.generation = generation;
+        this.reason = reason;
     }
 
     public DataFrameTransformState(StreamInput in) throws IOException {
-        state = IndexerState.fromStream(in);
+        taskState = DataFrameTransformTaskState.fromStream(in);
+        indexerState = IndexerState.fromStream(in);
         currentPosition = in.readBoolean() ? Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap())) : null;
         generation = in.readLong();
+        reason = in.readOptionalString();
+    }
+
+    public DataFrameTransformTaskState getTaskState() {
+        return taskState;
     }
 
     public IndexerState getIndexerState() {
-        return state;
+        return indexerState;
     }
 
     public Map<String, Object> getPosition() {
@@ -91,6 +120,10 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
         return generation;
     }
 
+    public String getReason() {
+        return reason;
+    }
+
     public static DataFrameTransformState fromXContent(XContentParser parser) {
         try {
             return PARSER.parse(parser, null);
@@ -102,11 +135,15 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
-        builder.field(STATE.getPreferredName(), state.value());
+        builder.field(TASK_STATE.getPreferredName(), taskState.value());
+        builder.field(INDEXER_STATE.getPreferredName(), indexerState.value());
         if (currentPosition != null) {
             builder.field(CURRENT_POSITION.getPreferredName(), currentPosition);
         }
         builder.field(GENERATION.getPreferredName(), generation);
+        if (reason != null) {
+            builder.field(REASON.getPreferredName(), reason);
+        }
         builder.endObject();
         return builder;
     }
@@ -118,12 +155,14 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
-        state.writeTo(out);
+        taskState.writeTo(out);
+        indexerState.writeTo(out);
         out.writeBoolean(currentPosition != null);
         if (currentPosition != null) {
             out.writeMap(currentPosition);
         }
         out.writeLong(generation);
+        out.writeOptionalString(reason);
     }
 
     @Override
@@ -138,12 +177,20 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
 
         DataFrameTransformState that = (DataFrameTransformState) other;
 
-        return Objects.equals(this.state, that.state) && Objects.equals(this.currentPosition, that.currentPosition)
-                && this.generation == that.generation;
+        return Objects.equals(this.taskState, that.taskState) &&
+            Objects.equals(this.indexerState, that.indexerState) &&
+            Objects.equals(this.currentPosition, that.currentPosition) &&
+            this.generation == that.generation &&
+            Objects.equals(this.reason, that.reason);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(state, currentPosition, generation);
+        return Objects.hash(taskState, indexerState, currentPosition, generation, reason);
+    }
+
+    @Override
+    public String toString() {
+        return Strings.toString(this);
     }
-}
+}

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java

@@ -41,7 +41,7 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
 
     public static DataFrameTransformStateAndStats initialStateAndStats(String id) {
         return new DataFrameTransformStateAndStats(id,
-            new DataFrameTransformState(IndexerState.STOPPED, null, 0),
+            new DataFrameTransformState(DataFrameTransformTaskState.STOPPED, IndexerState.STOPPED, null, 0L, null),
             new DataFrameIndexerTransformStats());
     }
 

+ 36 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTaskState.java

@@ -0,0 +1,36 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.dataframe.transforms;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+
+import java.io.IOException;
+import java.util.Locale;
+
+public enum DataFrameTransformTaskState implements Writeable {
+    STOPPED, STARTED, FAILED;
+
+    public static DataFrameTransformTaskState fromString(String name) {
+        return valueOf(name.trim().toUpperCase(Locale.ROOT));
+    }
+
+    public static DataFrameTransformTaskState fromStream(StreamInput in) throws IOException {
+        return in.readEnum(DataFrameTransformTaskState.class);
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        DataFrameTransformTaskState state = this;
+        out.writeEnum(state);
+    }
+
+    public String value() {
+        return name().toLowerCase(Locale.ROOT);
+    }
+}

+ 1 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformActionRequestTests.java

@@ -13,7 +13,7 @@ import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformActi
 public class StartDataFrameTransformActionRequestTests extends AbstractWireSerializingTestCase<Request> {
     @Override
     protected Request createTestInstance() {
-        return new Request(randomAlphaOfLengthBetween(1, 20));
+        return new Request(randomAlphaOfLengthBetween(1, 20), randomBoolean());
     }
 
     @Override

+ 4 - 3
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformActionRequestTests.java

@@ -16,7 +16,7 @@ public class StopDataFrameTransformActionRequestTests extends AbstractWireSerial
     @Override
     protected Request createTestInstance() {
         TimeValue timeout = randomBoolean() ? TimeValue.timeValueMinutes(randomIntBetween(1, 10)) : null;
-        return new Request(randomAlphaOfLengthBetween(1, 10), randomBoolean(), timeout);
+        return new Request(randomAlphaOfLengthBetween(1, 10), randomBoolean(), randomBoolean(), timeout);
     }
 
     @Override
@@ -27,9 +27,10 @@ public class StopDataFrameTransformActionRequestTests extends AbstractWireSerial
     public void testSameButDifferentTimeout() {
         String id = randomAlphaOfLengthBetween(1, 10);
         boolean waitForCompletion = randomBoolean();
+        boolean force = randomBoolean();
 
-        Request r1 = new Request(id, waitForCompletion, TimeValue.timeValueSeconds(10));
-        Request r2 = new Request(id, waitForCompletion, TimeValue.timeValueSeconds(20));
+        Request r1 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(10));
+        Request r2 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(20));
 
         assertNotEquals(r1,r2);
         assertNotEquals(r1.hashCode(),r2.hashCode());

+ 16 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java

@@ -14,11 +14,16 @@ import org.elasticsearch.xpack.core.indexing.IndexerState;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Predicate;
 
 public class DataFrameTransformStateTests extends AbstractSerializingTestCase<DataFrameTransformState> {
 
     public static DataFrameTransformState randomDataFrameTransformState() {
-        return new DataFrameTransformState(randomFrom(IndexerState.values()), randomPosition(), randomLongBetween(0,10));
+        return new DataFrameTransformState(randomFrom(DataFrameTransformTaskState.values()),
+            randomFrom(IndexerState.values()),
+            randomPosition(),
+            randomLongBetween(0,10),
+            randomBoolean() ? null : randomAlphaOfLength(10));
     }
 
     @Override
@@ -53,4 +58,14 @@ public class DataFrameTransformStateTests extends AbstractSerializingTestCase<Da
         }
         return position;
     }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return true;
+    }
+
+    @Override
+    protected Predicate<String> getRandomFieldsExcludeFilter() {
+        return field -> !field.isEmpty();
+    }
 }

+ 81 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTaskStateTests.java

@@ -0,0 +1,81 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.dataframe.transforms;
+
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+
+public class DataFrameTransformTaskStateTests extends ESTestCase {
+
+    public void testValidOrdinals() {
+        assertThat(DataFrameTransformTaskState.STOPPED.ordinal(), equalTo(0));
+        assertThat(DataFrameTransformTaskState.STARTED.ordinal(), equalTo(1));
+        assertThat(DataFrameTransformTaskState.FAILED.ordinal(), equalTo(2));
+    }
+
+    public void testwriteTo() throws Exception {
+        try (BytesStreamOutput out = new BytesStreamOutput()) {
+            DataFrameTransformTaskState.STOPPED.writeTo(out);
+            try (StreamInput in = out.bytes().streamInput()) {
+                assertThat(in.readVInt(), equalTo(0));
+            }
+        }
+
+        try (BytesStreamOutput out = new BytesStreamOutput()) {
+            DataFrameTransformTaskState.STARTED.writeTo(out);
+            try (StreamInput in = out.bytes().streamInput()) {
+                assertThat(in.readVInt(), equalTo(1));
+            }
+        }
+
+        try (BytesStreamOutput out = new BytesStreamOutput()) {
+            DataFrameTransformTaskState.FAILED.writeTo(out);
+            try (StreamInput in = out.bytes().streamInput()) {
+                assertThat(in.readVInt(), equalTo(2));
+            }
+        }
+    }
+
+    public void testReadFrom() throws Exception {
+        try (BytesStreamOutput out = new BytesStreamOutput()) {
+            out.writeVInt(0);
+            try (StreamInput in = out.bytes().streamInput()) {
+                assertThat(DataFrameTransformTaskState.fromStream(in), equalTo(DataFrameTransformTaskState.STOPPED));
+            }
+        }
+        try (BytesStreamOutput out = new BytesStreamOutput()) {
+            out.writeVInt(1);
+            try (StreamInput in = out.bytes().streamInput()) {
+                assertThat(DataFrameTransformTaskState.fromStream(in), equalTo(DataFrameTransformTaskState.STARTED));
+            }
+        }
+        try (BytesStreamOutput out = new BytesStreamOutput()) {
+            out.writeVInt(2);
+            try (StreamInput in = out.bytes().streamInput()) {
+                assertThat(DataFrameTransformTaskState.fromStream(in), equalTo(DataFrameTransformTaskState.FAILED));
+            }
+        }
+    }
+
+    public void testInvalidReadFrom() throws Exception {
+        try (BytesStreamOutput out = new BytesStreamOutput()) {
+            out.writeVInt(randomIntBetween(3, Integer.MAX_VALUE));
+            try (StreamInput in = out.bytes().streamInput()) {
+                DataFrameTransformTaskState.fromStream(in);
+                fail("Expected IOException");
+            } catch(IOException e) {
+                assertThat(e.getMessage(), containsString("Unknown DataFrameTransformTaskState ordinal ["));
+            }
+
+        }
+    }
+}

+ 40 - 9
x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java

@@ -167,17 +167,34 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
         assertTrue(indexExists(dataFrameIndex));
     }
 
-    protected void startAndWaitForTransform(String transformId, String dataFrameIndex) throws Exception {
-        startAndWaitForTransform(transformId, dataFrameIndex, null);
+    protected void startDataframeTransform(String transformId, boolean force) throws IOException {
+        startDataframeTransform(transformId, force, null);
     }
 
-    protected void startAndWaitForTransform(String transformId, String dataFrameIndex, String authHeader) throws Exception {
+    protected void startDataframeTransform(String transformId, boolean force, String authHeader) throws IOException {
         // start the transform
         final Request startTransformRequest = createRequestWithAuth("POST", DATAFRAME_ENDPOINT + transformId + "/_start", authHeader);
-
+        startTransformRequest.addParameter(DataFrameField.FORCE.getPreferredName(), Boolean.toString(force));
         Map<String, Object> startTransformResponse = entityAsMap(client().performRequest(startTransformRequest));
         assertThat(startTransformResponse.get("started"), equalTo(Boolean.TRUE));
+    }
 
+    protected void stopDataFrameTransform(String transformId, boolean force) throws Exception {
+        // start the transform
+        final Request stopTransformRequest = createRequestWithAuth("POST", DATAFRAME_ENDPOINT + transformId + "/_stop", null);
+        stopTransformRequest.addParameter(DataFrameField.FORCE.getPreferredName(), Boolean.toString(force));
+        stopTransformRequest.addParameter(DataFrameField.WAIT_FOR_COMPLETION.getPreferredName(), Boolean.toString(true));
+        Map<String, Object> stopTransformResponse = entityAsMap(client().performRequest(stopTransformRequest));
+        assertThat(stopTransformResponse.get("stopped"), equalTo(Boolean.TRUE));
+    }
+
+    protected void startAndWaitForTransform(String transformId, String dataFrameIndex) throws Exception {
+        startAndWaitForTransform(transformId, dataFrameIndex, null);
+    }
+
+    protected void startAndWaitForTransform(String transformId, String dataFrameIndex, String authHeader) throws Exception {
+        // start the transform
+        startDataframeTransform(transformId, false, authHeader);
         // wait until the dataframe has been created and all data is available
         waitForDataFrameGeneration(transformId);
         refreshIndex(dataFrameIndex);
@@ -216,13 +233,29 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
     }
 
     protected static String getDataFrameIndexerState(String transformId) throws IOException {
+        Map<?, ?> transformStatsAsMap = getDataFrameState(transformId);
+        return transformStatsAsMap == null ? null :
+            (String) XContentMapValues.extractValue("state.indexer_state", transformStatsAsMap);
+    }
+
+    protected static String getDataFrameTaskState(String transformId) throws IOException {
+        Map<?, ?> transformStatsAsMap = getDataFrameState(transformId);
+        return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("state.task_state", transformStatsAsMap);
+    }
+
+    protected static Map<?, ?> getDataFrameState(String transformId) throws IOException {
         Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + transformId + "/_stats"));
         List<?> transforms = ((List<?>) entityAsMap(statsResponse).get("transforms"));
         if (transforms.isEmpty()) {
             return null;
         }
-        Map<?, ?> transformStatsAsMap = (Map<?, ?>) transforms.get(0);
-        return (String) XContentMapValues.extractValue("state.transform_state", transformStatsAsMap);
+        return (Map<?, ?>) transforms.get(0);
+    }
+
+    protected static void deleteDataFrameTransform(String transformId) throws IOException {
+        Request request = new Request("DELETE", DATAFRAME_ENDPOINT + transformId);
+        request.addParameter("ignore", "404"); // Ignore 404s because they imply someone was racing us to delete this
+        adminClient().performRequest(request);
     }
 
     @AfterClass
@@ -251,9 +284,7 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
 
         for (Map<String, Object> transformConfig : transformConfigs) {
             String transformId = (String) transformConfig.get("id");
-            Request request = new Request("DELETE", DATAFRAME_ENDPOINT + transformId);
-            request.addParameter("ignore", "404"); // Ignore 404s because they imply someone was racing us to delete this
-            adminClient().performRequest(request);
+            deleteDataFrameTransform(transformId);
         }
 
         // transforms should be all gone

+ 94 - 0
x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java

@@ -0,0 +1,94 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.dataframe.integration;
+
+import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.equalTo;
+
+public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
+
+    public void testFailureStateInteraction() throws Exception {
+        createReviewsIndex();
+        String transformId = "failure_pivot_1";
+        String dataFrameIndex = "failure_pivot_reviews";
+        createPivotReviewsTransform(transformId, dataFrameIndex, null);
+        deleteIndex(REVIEWS_INDEX_NAME); // trigger start failure due to index missing
+        startDataframeTransform(transformId, false);
+        awaitState(transformId, DataFrameTransformTaskState.FAILED);
+        Map<?, ?> fullState = getDataFrameState(transformId);
+
+        // Verify we have failed for the expected reason
+        assertThat(XContentMapValues.extractValue("state.reason", fullState),
+            equalTo("task encountered irrecoverable failure: no such index [reviews]"));
+        assertThat(XContentMapValues.extractValue("state.indexer_state", fullState), equalTo("started"));
+
+        // Verify that we cannot stop or start the transform when the task is in a failed state
+        ResponseException ex = expectThrows(ResponseException.class, () -> stopDataFrameTransform(transformId, false));
+        assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
+        assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
+            equalTo("Unable to stop data frame transform [failure_pivot_1] as it is in a failed state with reason: [" +
+                "task encountered irrecoverable failure: no such index [reviews]]. Use force stop to stop the data frame transform."));
+
+        ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false));
+        assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
+        assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
+            equalTo("Unable to start data frame transform [failure_pivot_1] as it is in a failed state with failure: [" +
+                "task encountered irrecoverable failure: no such index [reviews]]. " +
+                "Use force start to restart data frame transform once error is resolved."));
+
+        // Correct the failure by creating the reviews index again
+        createReviewsIndex();
+        // Force start the data frame to indicate failure correction
+        startDataframeTransform(transformId, true);
+        // Wait for data to be indexed appropriately and refresh for search
+        waitForDataFrameGeneration(transformId);
+        refreshIndex(dataFrameIndex);
+
+        // Verify that we have started and that our reason is cleared
+        fullState = getDataFrameState(transformId);
+        assertThat(XContentMapValues.extractValue("state.reason", fullState), is(nullValue()));
+        assertThat(XContentMapValues.extractValue("state.task_state", fullState), equalTo("started"));
+        assertThat(XContentMapValues.extractValue("state.indexer_state", fullState), equalTo("started"));
+        assertThat(XContentMapValues.extractValue("stats.search_failures", fullState), equalTo(1));
+
+        // get and check some users to verify we restarted
+        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417);
+        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_5", 3.72);
+        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_11", 3.846153846);
+        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_20", 3.769230769);
+        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918);
+
+
+        stopDataFrameTransform(transformId, true);
+        deleteDataFrameTransform(transformId);
+    }
+
+    private void awaitState(String transformId, DataFrameTransformTaskState state) throws Exception {
+        assertBusy(() -> {
+            String currentState = getDataFrameTaskState(transformId);
+            assertThat(state.value(), equalTo(currentState));
+        });
+    }
+
+    private void assertOnePivotValue(String query, double expected) throws IOException {
+        Map<String, Object> searchResult = getAsMap(query);
+
+        assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
+        double actual = (Double) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0);
+        assertEquals(expected, actual, 0.000001);
+    }
+}

+ 18 - 6
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java

@@ -33,6 +33,8 @@ import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformActi
 import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 
 import java.util.Collection;
@@ -94,11 +96,7 @@ public class TransportStartDataFrameTransformAction extends
                             new StartDataFrameTransformTaskAction.Request(request.getId()),
                             ActionListener.wrap(
                                 r -> listener.onResponse(new StartDataFrameTransformAction.Response(true)),
-                                startingFailure -> cancelDataFrameTask(task.getId(),
-                                    transformTask.getId(),
-                                    startingFailure,
-                                    listener::onFailure)
-                                )),
+                                listener::onFailure)),
                         listener::onFailure));
             },
             listener::onFailure
@@ -122,7 +120,21 @@ public class TransportStartDataFrameTransformAction extends
                         transformTask,
                         persistentTaskActionListener);
                 } else {
-                    persistentTaskActionListener.onResponse(existingTask);
+                    DataFrameTransformState transformState = (DataFrameTransformState)existingTask.getState();
+                    if(transformState.getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) {
+                        listener.onFailure(new ElasticsearchStatusException(
+                            "Unable to start data frame transform [" + config.getId() +
+                                "] as it is in a failed state with failure: [" + transformState.getReason() +
+                            "]. Use force start to restart data frame transform once error is resolved.",
+                            RestStatus.CONFLICT));
+                    } else if (transformState.getTaskState() != DataFrameTransformTaskState.STOPPED &&
+                               transformState.getTaskState() != DataFrameTransformTaskState.FAILED) {
+                        listener.onFailure(new ElasticsearchStatusException(
+                            "Unable to start data frame transform [" + config.getId() +
+                                "] as it is in state [" + transformState.getTaskState()  + "]", RestStatus.CONFLICT));
+                    } else {
+                        persistentTaskActionListener.onResponse(existingTask);
+                    }
                 }
             },
             listener::onFailure

+ 11 - 0
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java

@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.dataframe.action;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.FailedNodeException;
@@ -15,11 +16,13 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
 import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
 
@@ -60,6 +63,14 @@ public class TransportStopDataFrameTransformAction extends
     protected void taskOperation(StopDataFrameTransformAction.Request request, DataFrameTransformTask transformTask,
             ActionListener<StopDataFrameTransformAction.Response> listener) {
         if (transformTask.getTransformId().equals(request.getId())) {
+            if (transformTask.getState().getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) {
+                listener.onFailure(
+                    new ElasticsearchStatusException("Unable to stop data frame transform [" + request.getId()
+                        + "] as it is in a failed state with reason: [" + transformTask.getState().getReason() +
+                        "]. Use force stop to stop the data frame transform.",
+                        RestStatus.CONFLICT));
+                return;
+            }
             if (request.waitForCompletion() == false) {
                 transformTask.stop(listener);
             } else {

+ 2 - 1
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStartDataFrameTransformAction.java

@@ -28,7 +28,8 @@ public class RestStartDataFrameTransformAction extends BaseRestHandler {
     @Override
     protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
         String id = restRequest.param(DataFrameField.ID.getPreferredName());
-        StartDataFrameTransformAction.Request request = new StartDataFrameTransformAction.Request(id);
+        boolean force = restRequest.paramAsBoolean(DataFrameField.FORCE.getPreferredName(), false);
+        StartDataFrameTransformAction.Request request = new StartDataFrameTransformAction.Request(id, force);
         request.timeout(restRequest.paramAsTime(DataFrameField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT));
         return channel -> client.execute(StartDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel));
     }

+ 2 - 1
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameTransformAction.java

@@ -30,8 +30,9 @@ public class RestStopDataFrameTransformAction extends BaseRestHandler {
         TimeValue timeout = restRequest.paramAsTime(DataFrameField.TIMEOUT.getPreferredName(),
                 StopDataFrameTransformAction.DEFAULT_TIMEOUT);
         boolean waitForCompletion = restRequest.paramAsBoolean(DataFrameField.WAIT_FOR_COMPLETION.getPreferredName(), false);
+        boolean force = restRequest.paramAsBoolean(DataFrameField.FORCE.getPreferredName(), false);
 
-        StopDataFrameTransformAction.Request request = new StopDataFrameTransformAction.Request(id, waitForCompletion, timeout);
+        StopDataFrameTransformAction.Request request = new StopDataFrameTransformAction.Request(id, waitForCompletion, force, timeout);
 
         return channel -> client.execute(StopDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel));
     }

+ 6 - 0
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java

@@ -21,6 +21,7 @@ import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
 import org.elasticsearch.xpack.dataframe.DataFrame;
 import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
@@ -60,6 +61,11 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
         SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(
                 DataFrameTransformTask.SCHEDULE_NAME + "_" + params.getId(), next());
 
+        DataFrameTransformState transformState = (DataFrameTransformState) state;
+        if (transformState != null && transformState.getTaskState() == DataFrameTransformTaskState.FAILED) {
+            logger.warn("Tried to start failed transform [" + params.getId() + "] failure reason: " + transformState.getReason());
+            return;
+        }
         // Note that while the task is added to the scheduler here, the internal state will prevent
         // it from doing any work until the task is "started" via the StartTransform api
         schedulerEngine.register(buildTask);

+ 144 - 60
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -18,7 +18,9 @@ import org.elasticsearch.action.search.SearchAction;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.persistent.AllocatedPersistentTask;
+import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ClientHelper;
@@ -33,6 +35,7 @@ import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformActio
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
@@ -40,14 +43,20 @@ import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpoin
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 import org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil;
 
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+
 public class DataFrameTransformTask extends AllocatedPersistentTask implements SchedulerEngine.Listener {
 
     private static final Logger logger = LogManager.getLogger(DataFrameTransformTask.class);
+    // TODO consider moving to dynamic cluster setting
+    private static final int MAX_CONTINUOUS_FAILURES = 10;
+    private static final IndexerState[] RUNNING_STATES = new IndexerState[]{IndexerState.STARTED, IndexerState.INDEXING};
     public static final String SCHEDULE_NAME = DataFrameField.TASK_NAME + "/schedule";
 
     private final DataFrameTransform transform;
@@ -56,10 +65,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
     private final DataFrameIndexer indexer;
     private final Auditor<DataFrameAuditMessage> auditor;
 
+    private final AtomicReference<DataFrameTransformTaskState> taskState;
+    private final AtomicReference<String> stateReason;
     // the generation of this data frame, for v1 there will be only
     // 0: data frame not created or still indexing
     // 1: data frame complete, all data has been indexed
     private final AtomicReference<Long> generation;
+    private final AtomicInteger failureCount;
 
     public DataFrameTransformTask(long id, String type, String action, TaskId parentTask, DataFrameTransform transform,
                                   DataFrameTransformState state, Client client, DataFrameTransformsConfigManager transformsConfigManager,
@@ -72,10 +84,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         this.threadPool = threadPool;
         this.auditor = auditor;
         IndexerState initialState = IndexerState.STOPPED;
+        DataFrameTransformTaskState initialTaskState = DataFrameTransformTaskState.STOPPED;
+        String initialReason = null;
         long initialGeneration = 0;
         Map<String, Object> initialPosition = null;
         logger.info("[{}] init, got state: [{}]", transform.getId(), state != null);
         if (state != null) {
+            initialTaskState = state.getTaskState();
+            initialReason = state.getReason();
             final IndexerState existingState = state.getIndexerState();
             logger.info("[{}] Loading existing state: [{}], position [{}]", transform.getId(), existingState, state.getPosition());
             if (existingState.equals(IndexerState.INDEXING)) {
@@ -93,7 +109,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
 
         this.indexer = new ClientDataFrameIndexer(transform.getId(), transformsConfigManager, transformsCheckpointService,
             new AtomicReference<>(initialState), initialPosition, client, auditor);
-        this.generation = new AtomicReference<Long>(initialGeneration);
+        this.generation = new AtomicReference<>(initialGeneration);
+        this.taskState = new AtomicReference<>(initialTaskState);
+        this.stateReason = new AtomicReference<>(initialReason);
+        this.failureCount = new AtomicInteger(0);
     }
 
     public String getTransformId() {
@@ -109,7 +128,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
     }
 
     public DataFrameTransformState getState() {
-        return new DataFrameTransformState(indexer.getState(), indexer.getPosition(), generation.get());
+        return new DataFrameTransformState(taskState.get(), indexer.getState(), indexer.getPosition(), generation.get(), stateReason.get());
     }
 
     public DataFrameIndexerTransformStats getStats() {
@@ -125,66 +144,71 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
     }
 
     public synchronized void start(ActionListener<Response> listener) {
-        final IndexerState prevState = indexer.getState();
-        if (prevState != IndexerState.STOPPED) {
-            // fails if the task is not STOPPED
-            listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]",
-                    transform.getId(), prevState));
-            return;
-        }
-
         final IndexerState newState = indexer.start();
-        if (newState != IndexerState.STARTED) {
+        if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) {
             listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]",
                     transform.getId(), newState));
             return;
         }
-
-        final DataFrameTransformState state = new DataFrameTransformState(IndexerState.STOPPED, indexer.getPosition(), generation.get());
-
-        logger.debug("Updating state for data frame transform [{}] to [{}][{}]", transform.getId(), state.getIndexerState(),
-                state.getPosition());
-        updatePersistentTaskState(state,
-                ActionListener.wrap(
-                        (task) -> {
-                            auditor.info(transform.getId(), "Updated state to [" + state.getIndexerState() + "]");
-                            logger.debug("Successfully updated state for data frame transform [" + transform.getId() + "] to ["
-                                    + state.getIndexerState() + "][" + state.getPosition() + "]");
-                            listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
-                        }, (exc) -> {
-                            // We were unable to update the persistent status, so we need to shutdown the indexer too.
-                            indexer.stop();
-                            listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform ["
+        stateReason.set(null);
+        taskState.set(DataFrameTransformTaskState.STARTED);
+        failureCount.set(0);
+
+        final DataFrameTransformState state = new DataFrameTransformState(
+            DataFrameTransformTaskState.STARTED,
+            IndexerState.STOPPED,
+            indexer.getPosition(),
+            generation.get(),
+            null);
+
+        logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString());
+        persistStateToClusterState(state, ActionListener.wrap(
+            task -> {
+                auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]");
+                listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
+            },
+            exc -> {
+                indexer.stop();
+                listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform ["
                                     + transform.getId() + "] to [" + state.getIndexerState() + "].", exc));
-                        })
-        );
+            }
+        ));
     }
 
     public synchronized void stop(ActionListener<StopDataFrameTransformAction.Response> listener) {
+        // taskState is initialized as STOPPED and is updated in tandem with the indexerState
+        // Consequently, if it is STOPPED, we consider the whole task STOPPED.
+        if (taskState.get() == DataFrameTransformTaskState.STOPPED) {
+            listener.onResponse(new StopDataFrameTransformAction.Response(true));
+            return;
+        }
         final IndexerState newState = indexer.stop();
         switch (newState) {
         case STOPPED:
-            listener.onResponse(new StopDataFrameTransformAction.Response(true));
-            break;
-
+            // Fall through to `STOPPING` as the behavior is the same for both, we should persist for both
         case STOPPING:
             // update the persistent state to STOPPED. There are two scenarios and both are safe:
             // 1. we persist STOPPED now, indexer continues a bit then sees the flag and checkpoints another STOPPED with the more recent
             // position.
             // 2. we persist STOPPED now, indexer continues a bit but then dies. When/if we resume we'll pick up at last checkpoint,
             // overwrite some docs and eventually checkpoint.
-            DataFrameTransformState state = new DataFrameTransformState(IndexerState.STOPPED, indexer.getPosition(), generation.get());
-            updatePersistentTaskState(state, ActionListener.wrap((task) -> {
-                auditor.info(transform.getId(), "Updated state to [" + state.getIndexerState() + "]");
-                logger.debug("Successfully updated state for data frame transform [{}] to [{}]", transform.getId(),
-                        state.getIndexerState());
-                listener.onResponse(new StopDataFrameTransformAction.Response(true));
-            }, (exc) -> {
-                listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform [{}] to [{}]", exc,
-                        transform.getId(), state.getIndexerState()));
-            }));
+            taskState.set(DataFrameTransformTaskState.STOPPED);
+            DataFrameTransformState state = new DataFrameTransformState(
+                DataFrameTransformTaskState.STOPPED,
+                IndexerState.STOPPED,
+                indexer.getPosition(),
+                generation.get(),
+                stateReason.get());
+            persistStateToClusterState(state, ActionListener.wrap(
+                task -> {
+                    auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]");
+                    listener.onResponse(new StopDataFrameTransformAction.Response(true));
+                },
+                exc -> listener.onFailure(new ElasticsearchException(
+                    "Error while updating state for data frame transform [{}] to [{}]", exc,
+                    transform.getId(),
+                    state.getIndexerState()))));
             break;
-
         default:
             listener.onFailure(new ElasticsearchException("Cannot stop task for data frame transform [{}], because state was [{}]",
                     transform.getId(), newState));
@@ -217,6 +241,40 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         markAsCompleted();
     }
 
+    void persistStateToClusterState(DataFrameTransformState state,
+                                    ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
+        updatePersistentTaskState(state, ActionListener.wrap(
+            success -> {
+                logger.debug("Successfully updated state for data frame transform [{}] to [{}]", transform.getId(), state.toString());
+                listener.onResponse(success);
+            },
+            failure -> {
+                auditor.warning(transform.getId(), "Failed to persist to state to cluster state: " + failure.getMessage());
+                logger.error("Failed to update state for data frame transform [" + transform.getId() + "]", failure);
+                listener.onFailure(failure);
+            }
+        ));
+    }
+
+    private boolean isIrrecoverableFailure(Exception e) {
+        return e instanceof IndexNotFoundException || e instanceof DataFrameConfigurationException;
+    }
+
+    synchronized void handleFailure(Exception e) {
+        if (isIrrecoverableFailure(e) || failureCount.incrementAndGet() > MAX_CONTINUOUS_FAILURES) {
+            String failureMessage = isIrrecoverableFailure(e) ?
+                "task encountered irrecoverable failure: " + e.getMessage() :
+                "task encountered more than " + MAX_CONTINUOUS_FAILURES + " failures; latest failure: " + e.getMessage();
+            auditor.error(transform.getId(), failureMessage);
+            stateReason.set(failureMessage);
+            taskState.set(DataFrameTransformTaskState.FAILED);
+            persistStateToClusterState(getState(), ActionListener.wrap(
+                r -> failureCount.set(0), // Successfully marked as failed, reset counter so that task can be restarted
+                exception -> {} // Noop, internal method logs the failure to update the state
+            ));
+        }
+    }
+
     /**
      * This is called when the persistent task signals that the allocated task should be terminated.
      * Termination in the task framework is essentially voluntary, as the allocated task can only be
@@ -239,6 +297,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         private final DataFrameTransformsCheckpointService transformsCheckpointService;
         private final String transformId;
         private final Auditor<DataFrameAuditMessage> auditor;
+        // Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
+        private volatile String lastAuditedExceptionMessage = null;
         private Map<String, String> fieldMappings = null;
 
         private DataFrameTransformConfig transformConfig = null;
@@ -272,12 +332,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
 
         @Override
         public synchronized boolean maybeTriggerAsyncJob(long now) {
+            if (taskState.get() == DataFrameTransformTaskState.FAILED) {
+                logger.debug("Schedule was triggered for transform [" + getJobId() + "] but task is failed.  Ignoring trigger.");
+                return false;
+            }
+
             if (transformConfig == null) {
                 CountDownLatch latch = new CountDownLatch(1);
 
-                transformsConfigManager.getTransformConfiguration(transformId, new LatchedActionListener<>(ActionListener.wrap(config -> {
-                    transformConfig = config;
-                }, e -> {
+                transformsConfigManager.getTransformConfiguration(transformId, new LatchedActionListener<>(ActionListener.wrap(
+                    config -> transformConfig = config,
+                    e -> {
                     throw new RuntimeException(
                             DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CONFIGURATION, transformId), e);
                 }), latch));
@@ -290,11 +355,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 }
             }
 
-            // todo: set job into failed state
             if (transformConfig.isValid() == false) {
-                auditor.error(transformId, "Cannot execute data frame transform as configuration is invalid");
-                throw new RuntimeException(
-                        DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID, transformId));
+                DataFrameConfigurationException exception = new DataFrameConfigurationException(transformId);
+                handleFailure(exception);
+                throw exception;
             }
 
             if (fieldMappings == null) {
@@ -341,24 +405,36 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 return;
             }
 
-            if(indexerState.equals(IndexerState.STARTED)) {
-                // if the indexer resets the state to started, it means it is done, so increment the generation
+            if(indexerState.equals(IndexerState.STARTED) && getStats().getNumDocuments() > 0) {
+                // if the indexer resets the state to started, it means it is done with a run through the data.
+                // But, if there were no documents, we should allow it to attempt to gather more again, as there is no risk of overwriting
+                // Some reasons for no documents are (but is not limited to):
+                // * Could have failed early on search or index
+                // * Have an empty index
+                // * Have a query that returns no documents
                 generation.compareAndSet(0L, 1L);
             }
 
-            final DataFrameTransformState state = new DataFrameTransformState(indexerState, getPosition(), generation.get());
+            final DataFrameTransformState state = new DataFrameTransformState(
+                taskState.get(),
+                indexerState,
+                getPosition(),
+                generation.get(),
+                stateReason.get());
             logger.info("Updating persistent state of transform [" + transform.getId() + "] to [" + state.toString() + "]");
-
-            updatePersistentTaskState(state, ActionListener.wrap(task -> next.run(), exc -> {
-                logger.error("Updating persistent state of transform [" + transform.getId() + "] failed", exc);
-                next.run();
-            }));
+            persistStateToClusterState(state, ActionListener.wrap(t -> next.run(), e -> next.run()));
         }
 
         @Override
         protected void onFailure(Exception exc) {
-            auditor.error(transform.getId(), "Data frame transform failed with an exception: " + exc.getMessage());
-            logger.warn("Data frame transform [" + transform.getId() + "] failed with an exception: ", exc);
+            // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
+            // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
+            if (exc.getMessage().equals(lastAuditedExceptionMessage) == false) {
+                auditor.warning(transform.getId(), "Data frame transform encountered an exception: " + exc.getMessage());
+                lastAuditedExceptionMessage = exc.getMessage();
+            }
+            logger.warn("Data frame transform [" + transform.getId() + "] encountered an exception: ", exc);
+            handleFailure(exc);
         }
 
         @Override
@@ -374,4 +450,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             shutdown();
         }
     }
+
+    class DataFrameConfigurationException extends RuntimeException {
+
+        DataFrameConfigurationException(String transformId) {
+            super(DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID, transformId));
+        }
+
+    }
 }

+ 8 - 4
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml

@@ -59,7 +59,7 @@ teardown:
   - match: { started: true }
 
   - do:
-      catch: /Cannot start task for data frame transform \[airline-transform-start-stop\], because state was \[STARTED\]/
+      catch: /Unable to start data frame transform \[airline-transform-start-stop\] as it is in state \[STARTED\]/
       data_frame.start_data_frame_transform:
         transform_id: "airline-transform-start-stop"
 
@@ -75,7 +75,8 @@ teardown:
         transform_id: "airline-transform-start-stop"
   - match: { count: 1 }
   - match: { transforms.0.id: "airline-transform-start-stop" }
-  - match: { transforms.0.state.transform_state: "started" }
+  - match: { transforms.0.state.indexer_state: "started" }
+  - match: { transforms.0.state.task_state: "started" }
 
   - do:
       data_frame.stop_data_frame_transform:
@@ -87,7 +88,8 @@ teardown:
         transform_id: "airline-transform-start-stop"
   - match: { count: 1 }
   - match: { transforms.0.id: "airline-transform-start-stop" }
-  - match: { transforms.0.state.transform_state: "stopped" }
+  - match: { transforms.0.state.indexer_state: "stopped" }
+  - match: { transforms.0.state.task_state: "stopped" }
 
   - do:
       data_frame.start_data_frame_transform:
@@ -99,7 +101,8 @@ teardown:
         transform_id: "airline-transform-start-stop"
   - match: { count: 1 }
   - match: { transforms.0.id: "airline-transform-start-stop" }
-  - match: { transforms.0.state.transform_state: "started" }
+  - match: { transforms.0.state.indexer_state: "started" }
+  - match: { transforms.0.state.task_state: "started" }
 
 ---
 "Test stop missing transform":
@@ -114,3 +117,4 @@ teardown:
       data_frame.stop_data_frame_transform:
         transform_id: "airline-transform-start-stop"
   - match: { stopped: true }
+

+ 7 - 6
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml

@@ -46,7 +46,8 @@ teardown:
         transform_id: "airline-transform-stats"
   - match: { count: 1 }
   - match: { transforms.0.id: "airline-transform-stats" }
-  - match: { transforms.0.state.transform_state: "started" }
+  - match: { transforms.0.state.indexer_state: "started" }
+  - match: { transforms.0.state.task_state: "started" }
   - match: { transforms.0.state.generation: 0 }
   - match: { transforms.0.stats.pages_processed: 0 }
   - match: { transforms.0.stats.documents_processed: 0 }
@@ -124,18 +125,18 @@ teardown:
         transform_id: "*"
   - match: { count: 2 }
   - match: { transforms.0.id: "airline-transform-stats" }
-  - match: { transforms.0.state.transform_state: "started" }
+  - match: { transforms.0.state.indexer_state: "started" }
   - match: { transforms.1.id: "airline-transform-stats-dos" }
-  - match: { transforms.1.state.transform_state: "stopped" }
+  - match: { transforms.1.state.indexer_state: "stopped" }
 
   - do:
       data_frame.get_data_frame_transform_stats:
         transform_id: "_all"
   - match: { count: 2 }
   - match: { transforms.0.id: "airline-transform-stats" }
-  - match: { transforms.0.state.transform_state: "started" }
+  - match: { transforms.0.state.indexer_state: "started" }
   - match: { transforms.1.id: "airline-transform-stats-dos" }
-  - match: { transforms.1.state.transform_state: "stopped" }
+  - match: { transforms.1.state.indexer_state: "stopped" }
 
 ---
 "Test get single transform stats when it does not have a task":
@@ -157,7 +158,7 @@ teardown:
         transform_id: "airline-transform-stats-dos"
   - match: { count: 1 }
   - match: { transforms.0.id: "airline-transform-stats-dos" }
-  - match: { transforms.0.state.transform_state: "stopped" }
+  - match: { transforms.0.state.indexer_state: "stopped" }
   - match: { transforms.0.state.generation: 0 }
   - match: { transforms.0.stats.pages_processed: 0 }
   - match: { transforms.0.stats.documents_processed: 0 }