Bläddra i källkod

[ML][Transforms] add wait_for_checkpoint flag to stop (#47935)

Adds `wait_for_checkpoint` for `_stop` API.
Benjamin Trent 6 år sedan
förälder
incheckning
451a5c0621
33 ändrade filer med 698 tillägg och 121 borttagningar
  1. 4 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/TransformRequestConverters.java
  2. 16 5
      client/rest-high-level/src/main/java/org/elasticsearch/client/transform/StopTransformRequest.java
  3. 2 2
      client/rest-high-level/src/test/java/org/elasticsearch/client/TransformIT.java
  4. 12 1
      client/rest-high-level/src/test/java/org/elasticsearch/client/TransformRequestConvertersTests.java
  5. 1 1
      client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/TransformDocumentationIT.java
  6. 4 0
      docs/reference/transform/apis/stop-transform.asciidoc
  7. 1 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java
  8. 32 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StopTransformAction.java
  9. 70 31
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java
  10. 2 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java
  11. 1 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java
  12. 15 9
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StopTransformActionRequestTests.java
  13. 25 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStateTests.java
  14. 23 13
      x-pack/plugin/src/test/resources/rest-api-spec/api/transform.stop_transform.json
  15. 21 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_start_stop.yml
  16. 2 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_stats.yml
  17. 43 0
      x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformIT.java
  18. 10 2
      x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java
  19. 56 0
      x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java
  20. 5 1
      x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java
  21. 1 1
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteTransformAction.java
  22. 23 13
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java
  23. 18 7
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java
  24. 6 0
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java
  25. 3 1
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestStopTransformAction.java
  26. 3 1
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/compat/RestStopTransformActionDeprecated.java
  27. 89 6
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java
  28. 9 2
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java
  29. 4 2
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java
  30. 58 13
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java
  31. 134 0
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java
  32. 2 1
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java
  33. 3 3
      x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml

+ 4 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/TransformRequestConverters.java

@@ -41,6 +41,7 @@ import static org.elasticsearch.client.RequestConverters.createEntity;
 import static org.elasticsearch.client.transform.DeleteTransformRequest.FORCE;
 import static org.elasticsearch.client.transform.GetTransformRequest.ALLOW_NO_MATCH;
 import static org.elasticsearch.client.transform.PutTransformRequest.DEFER_VALIDATION;
+import static org.elasticsearch.client.transform.StopTransformRequest.WAIT_FOR_CHECKPOINT;
 
 final class TransformRequestConverters {
 
@@ -135,6 +136,9 @@ final class TransformRequestConverters {
         if (stopRequest.getAllowNoMatch() != null) {
             request.addParameter(ALLOW_NO_MATCH, stopRequest.getAllowNoMatch().toString());
         }
+        if (stopRequest.getWaitForCheckpoint() != null) {
+            request.addParameter(WAIT_FOR_CHECKPOINT, stopRequest.getWaitForCheckpoint().toString());
+        }
         request.addParameters(params.asMap());
         return request;
     }

+ 16 - 5
client/rest-high-level/src/main/java/org/elasticsearch/client/transform/StopTransformRequest.java

@@ -28,21 +28,23 @@ import java.util.Optional;
 
 public class StopTransformRequest implements Validatable {
 
+    public static final String WAIT_FOR_CHECKPOINT = "wait_for_checkpoint";
+
     private final String id;
     private Boolean waitForCompletion;
+    private Boolean waitForCheckpoint;
     private TimeValue timeout;
     private Boolean allowNoMatch;
 
     public StopTransformRequest(String id) {
-        this.id = id;
-        waitForCompletion = null;
-        timeout = null;
+        this(id, null, null, null);
     }
 
-    public StopTransformRequest(String id, Boolean waitForCompletion, TimeValue timeout) {
+    public StopTransformRequest(String id, Boolean waitForCompletion, TimeValue timeout, Boolean waitForCheckpoint) {
         this.id = id;
         this.waitForCompletion = waitForCompletion;
         this.timeout = timeout;
+        this.waitForCheckpoint = waitForCheckpoint;
     }
 
     public String getId() {
@@ -73,6 +75,14 @@ public class StopTransformRequest implements Validatable {
         this.allowNoMatch = allowNoMatch;
     }
 
+    public Boolean getWaitForCheckpoint() {
+        return waitForCheckpoint;
+    }
+
+    public void setWaitForCheckpoint(Boolean waitForCheckpoint) {
+        this.waitForCheckpoint = waitForCheckpoint;
+    }
+
     @Override
     public Optional<ValidationException> validate() {
         if (id == null) {
@@ -86,7 +96,7 @@ public class StopTransformRequest implements Validatable {
 
     @Override
     public int hashCode() {
-        return Objects.hash(id, waitForCompletion, timeout, allowNoMatch);
+        return Objects.hash(id, waitForCompletion, timeout, allowNoMatch, waitForCheckpoint);
     }
 
     @Override
@@ -102,6 +112,7 @@ public class StopTransformRequest implements Validatable {
         return Objects.equals(this.id, other.id)
                 && Objects.equals(this.waitForCompletion, other.waitForCompletion)
                 && Objects.equals(this.timeout, other.timeout)
+                && Objects.equals(this.waitForCheckpoint, other.waitForCheckpoint)
                 && Objects.equals(this.allowNoMatch, other.allowNoMatch);
     }
 

+ 2 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/TransformIT.java

@@ -147,7 +147,7 @@ public class TransformIT extends ESRestHighLevelClientTestCase {
     public void cleanUpTransforms() throws Exception {
         for (String transformId : transformsToClean) {
             highLevelClient().transform().stopTransform(
-                    new StopTransformRequest(transformId, Boolean.TRUE, null), RequestOptions.DEFAULT);
+                    new StopTransformRequest(transformId, Boolean.TRUE, null, false), RequestOptions.DEFAULT);
         }
 
         for (String transformId : transformsToClean) {
@@ -310,7 +310,7 @@ public class TransformIT extends ESRestHighLevelClientTestCase {
         assertThat(taskState, oneOf(TransformStats.State.STARTED, TransformStats.State.INDEXING,
             TransformStats.State.STOPPING, TransformStats.State.STOPPED));
 
-        StopTransformRequest stopRequest = new StopTransformRequest(id, Boolean.TRUE, null);
+        StopTransformRequest stopRequest = new StopTransformRequest(id, Boolean.TRUE, null, false);
         StopTransformResponse stopResponse =
                 execute(stopRequest, client::stopTransform, client::stopTransformAsync);
         assertTrue(stopResponse.isAcknowledged());

+ 12 - 1
client/rest-high-level/src/test/java/org/elasticsearch/client/TransformRequestConvertersTests.java

@@ -149,7 +149,12 @@ public class TransformRequestConvertersTests extends ESTestCase {
         if (randomBoolean()) {
             timeValue = TimeValue.parseTimeValue(randomTimeValue(), "timeout");
         }
-        StopTransformRequest stopRequest = new StopTransformRequest(id, waitForCompletion, timeValue);
+        Boolean waitForCheckpoint = null;
+        if (randomBoolean()) {
+            waitForCheckpoint = randomBoolean();
+        }
+
+        StopTransformRequest stopRequest = new StopTransformRequest(id, waitForCompletion, timeValue, waitForCheckpoint);
 
         Request request = TransformRequestConverters.stopTransform(stopRequest);
         assertEquals(HttpPost.METHOD_NAME, request.getMethod());
@@ -168,6 +173,12 @@ public class TransformRequestConvertersTests extends ESTestCase {
         } else {
             assertFalse(request.getParameters().containsKey("timeout"));
         }
+        if (waitForCheckpoint != null) {
+            assertTrue(request.getParameters().containsKey("wait_for_checkpoint"));
+            assertEquals(stopRequest.getWaitForCheckpoint(), Boolean.parseBoolean(request.getParameters().get("wait_for_checkpoint")));
+        } else {
+            assertFalse(request.getParameters().containsKey("wait_for_checkpoint"));
+        }
 
         assertFalse(request.getParameters().containsKey(ALLOW_NO_MATCH));
         stopRequest.setAllowNoMatch(randomBoolean());

+ 1 - 1
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/TransformDocumentationIT.java

@@ -81,7 +81,7 @@ public class TransformDocumentationIT extends ESRestHighLevelClientTestCase {
     public void cleanUpTransforms() throws Exception {
         for (String transformId : transformsToClean) {
             highLevelClient().transform().stopTransform(
-                    new StopTransformRequest(transformId, Boolean.TRUE, TimeValue.timeValueSeconds(20)), RequestOptions.DEFAULT);
+                    new StopTransformRequest(transformId, true, TimeValue.timeValueSeconds(20), false), RequestOptions.DEFAULT);
         }
 
         for (String transformId : transformsToClean) {

+ 4 - 0
docs/reference/transform/apis/stop-transform.asciidoc

@@ -90,6 +90,10 @@ are no matches or only partial matches.
   state completely stops. If set to `false`, the API returns immediately and the
   indexer will be stopped asynchronously in the background. Defaults to `false`.
 
+`wait_for_checkpoint`::
+  (Optional, boolean) If set to `true`, the transform will not completely stop
+  until the current checkpoint is completed. If set to `false`, the transform
+  stops as soon as possible. Defaults to `false`.
 
 [[stop-transform-response-codes]]
 ==== {api-response-codes-title}

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java

@@ -22,6 +22,7 @@ public final class TransformField {
     public static final ParseField GROUP_BY = new ParseField("group_by");
     public static final ParseField TIMEOUT = new ParseField("timeout");
     public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion");
+    public static final ParseField WAIT_FOR_CHECKPOINT = new ParseField("wait_for_checkpoint");
     public static final ParseField STATS_FIELD = new ParseField("stats");
     public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type");
     public static final ParseField SOURCE = new ParseField("source");

+ 32 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StopTransformAction.java

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.core.transform.action;
 
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequestValidationException;
@@ -32,6 +33,8 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import static org.elasticsearch.action.ValidateActions.addValidationError;
+
 public class StopTransformAction extends ActionType<StopTransformAction.Response> {
 
     public static final StopTransformAction INSTANCE = new StopTransformAction();
@@ -48,9 +51,15 @@ public class StopTransformAction extends ActionType<StopTransformAction.Response
         private final boolean waitForCompletion;
         private final boolean force;
         private final boolean allowNoMatch;
+        private final boolean waitForCheckpoint;
         private Set<String> expandedIds;
 
-        public Request(String id, boolean waitForCompletion, boolean force, @Nullable TimeValue timeout, boolean allowNoMatch) {
+        public Request(String id,
+                       boolean waitForCompletion,
+                       boolean force,
+                       @Nullable TimeValue timeout,
+                       boolean allowNoMatch,
+                       boolean waitForCheckpoint) {
             this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName());
             this.waitForCompletion = waitForCompletion;
             this.force = force;
@@ -58,6 +67,7 @@ public class StopTransformAction extends ActionType<StopTransformAction.Response
             // use the timeout value already present in BaseTasksRequest
             this.setTimeout(timeout == null ? DEFAULT_TIMEOUT : timeout);
             this.allowNoMatch = allowNoMatch;
+            this.waitForCheckpoint = waitForCheckpoint;
         }
 
         public Request(StreamInput in) throws IOException {
@@ -73,6 +83,11 @@ public class StopTransformAction extends ActionType<StopTransformAction.Response
             } else {
                 this.allowNoMatch = true;
             }
+            if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+                this.waitForCheckpoint = in.readBoolean();
+            } else {
+                this.waitForCheckpoint = false;
+            }
         }
 
         public String getId() {
@@ -99,6 +114,10 @@ public class StopTransformAction extends ActionType<StopTransformAction.Response
             return allowNoMatch;
         }
 
+        public boolean isWaitForCheckpoint() {
+            return waitForCheckpoint;
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
@@ -113,17 +132,27 @@ public class StopTransformAction extends ActionType<StopTransformAction.Response
             if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
                 out.writeBoolean(allowNoMatch);
             }
+            if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+                out.writeBoolean(waitForCheckpoint);
+            }
         }
 
         @Override
         public ActionRequestValidationException validate() {
+            if (force && waitForCheckpoint) {
+                return addValidationError(new ParameterizedMessage(
+                    "cannot set both [{}] and [{}] to true",
+                        TransformField.FORCE,
+                        TransformField.WAIT_FOR_CHECKPOINT).getFormattedMessage(),
+                    null);
+            }
             return null;
         }
 
         @Override
         public int hashCode() {
             // the base class does not implement hashCode, therefore we need to hash timeout ourselves
-            return Objects.hash(id, waitForCompletion, force, expandedIds, this.getTimeout(), allowNoMatch);
+            return Objects.hash(id, waitForCompletion, force, expandedIds, this.getTimeout(), allowNoMatch, waitForCheckpoint);
         }
 
         @Override
@@ -146,6 +175,7 @@ public class StopTransformAction extends ActionType<StopTransformAction.Response
                     Objects.equals(waitForCompletion, other.waitForCompletion) &&
                     Objects.equals(force, other.force) &&
                     Objects.equals(expandedIds, other.expandedIds) &&
+                    Objects.equals(waitForCheckpoint, other.waitForCheckpoint) &&
                     allowNoMatch == other.allowNoMatch;
         }
 

+ 70 - 31
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java

@@ -43,6 +43,8 @@ public class TransformState implements Task.Status, PersistentTaskState {
     @Nullable
     private NodeAttributes node;
 
+    private final boolean shouldStopAtNextCheckpoint;
+
     public static final ParseField TASK_STATE = new ParseField("task_state");
     public static final ParseField INDEXER_STATE = new ParseField("indexer_state");
 
@@ -53,28 +55,38 @@ public class TransformState implements Task.Status, PersistentTaskState {
     public static final ParseField REASON = new ParseField("reason");
     public static final ParseField PROGRESS = new ParseField("progress");
     public static final ParseField NODE = new ParseField("node");
+    public static final ParseField SHOULD_STOP_AT_NEXT_CHECKPOINT = new ParseField("should_stop_at_checkpoint");
+
 
     @SuppressWarnings("unchecked")
     public static final ConstructingObjectParser<TransformState, Void> PARSER = new ConstructingObjectParser<>(NAME,
-            true,
-            args -> {
-                TransformTaskState taskState = (TransformTaskState) args[0];
-                IndexerState indexerState = (IndexerState) args[1];
-                Map<String, Object> bwcCurrentPosition = (Map<String, Object>) args[2];
-                TransformIndexerPosition transformIndexerPosition = (TransformIndexerPosition) args[3];
-
-                // BWC handling, translate current_position to position iff position isn't set
-                if (bwcCurrentPosition != null && transformIndexerPosition == null) {
-                    transformIndexerPosition = new TransformIndexerPosition(bwcCurrentPosition, null);
-                }
-
-                long checkpoint = (long) args[4];
-                String reason = (String) args[5];
-                TransformProgress progress = (TransformProgress) args[6];
-                NodeAttributes node = (NodeAttributes) args[7];
-
-                return new TransformState(taskState, indexerState, transformIndexerPosition, checkpoint, reason, progress, node);
-            });
+        true,
+        args -> {
+            TransformTaskState taskState = (TransformTaskState) args[0];
+            IndexerState indexerState = (IndexerState) args[1];
+            Map<String, Object> bwcCurrentPosition = (Map<String, Object>) args[2];
+            TransformIndexerPosition transformIndexerPosition = (TransformIndexerPosition) args[3];
+
+            // BWC handling, translate current_position to position iff position isn't set
+            if (bwcCurrentPosition != null && transformIndexerPosition == null) {
+                transformIndexerPosition = new TransformIndexerPosition(bwcCurrentPosition, null);
+            }
+
+            long checkpoint = (long) args[4];
+            String reason = (String) args[5];
+            TransformProgress progress = (TransformProgress) args[6];
+            NodeAttributes node = (NodeAttributes) args[7];
+            boolean shouldStopAtNextCheckpoint = args[8] == null ? false : (boolean)args[8];
+
+            return new TransformState(taskState,
+                indexerState,
+                transformIndexerPosition,
+                checkpoint,
+                reason,
+                progress,
+                node,
+                shouldStopAtNextCheckpoint);
+        });
 
     static {
         PARSER.declareField(constructorArg(), p -> TransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING);
@@ -85,15 +97,17 @@ public class TransformState implements Task.Status, PersistentTaskState {
         PARSER.declareString(optionalConstructorArg(), REASON);
         PARSER.declareField(optionalConstructorArg(), TransformProgress.PARSER::apply, PROGRESS, ValueType.OBJECT);
         PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE, ValueType.OBJECT);
+        PARSER.declareBoolean(optionalConstructorArg(), SHOULD_STOP_AT_NEXT_CHECKPOINT);
     }
 
     public TransformState(TransformTaskState taskState,
-                                   IndexerState indexerState,
-                                   @Nullable TransformIndexerPosition position,
-                                   long checkpoint,
-                                   @Nullable String reason,
-                                   @Nullable TransformProgress progress,
-                                   @Nullable NodeAttributes node) {
+                          IndexerState indexerState,
+                          @Nullable TransformIndexerPosition position,
+                          long checkpoint,
+                          @Nullable String reason,
+                          @Nullable TransformProgress progress,
+                          @Nullable NodeAttributes node,
+                          boolean shouldStopAtNextCheckpoint) {
         this.taskState = taskState;
         this.indexerState = indexerState;
         this.position = position;
@@ -101,14 +115,25 @@ public class TransformState implements Task.Status, PersistentTaskState {
         this.reason = reason;
         this.progress = progress;
         this.node = node;
+        this.shouldStopAtNextCheckpoint = shouldStopAtNextCheckpoint;
+    }
+
+    public TransformState(TransformTaskState taskState,
+                          IndexerState indexerState,
+                          @Nullable TransformIndexerPosition position,
+                          long checkpoint,
+                          @Nullable String reason,
+                          @Nullable TransformProgress progress,
+                          @Nullable NodeAttributes node) {
+        this(taskState, indexerState, position, checkpoint, reason, progress, node, false);
     }
 
     public TransformState(TransformTaskState taskState,
-                                   IndexerState indexerState,
-                                   @Nullable TransformIndexerPosition position,
-                                   long checkpoint,
-                                   @Nullable String reason,
-                                   @Nullable TransformProgress progress) {
+                          IndexerState indexerState,
+                          @Nullable TransformIndexerPosition position,
+                          long checkpoint,
+                          @Nullable String reason,
+                          @Nullable TransformProgress progress) {
         this(taskState, indexerState, position, checkpoint, reason, progress, null);
     }
 
@@ -129,6 +154,11 @@ public class TransformState implements Task.Status, PersistentTaskState {
         } else {
             node = null;
         }
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            shouldStopAtNextCheckpoint = in.readBoolean();
+        } else {
+            shouldStopAtNextCheckpoint = false;
+        }
     }
 
     public TransformTaskState getTaskState() {
@@ -164,6 +194,10 @@ public class TransformState implements Task.Status, PersistentTaskState {
         return this;
     }
 
+    public boolean shouldStopAtNextCheckpoint() {
+        return shouldStopAtNextCheckpoint;
+    }
+
     public static TransformState fromXContent(XContentParser parser) {
         try {
             return PARSER.parse(parser, null);
@@ -190,6 +224,7 @@ public class TransformState implements Task.Status, PersistentTaskState {
         if (node != null) {
             builder.field(NODE.getPreferredName(), node);
         }
+        builder.field(SHOULD_STOP_AT_NEXT_CHECKPOINT.getPreferredName(), shouldStopAtNextCheckpoint);
         builder.endObject();
         return builder;
     }
@@ -214,6 +249,9 @@ public class TransformState implements Task.Status, PersistentTaskState {
         if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
             out.writeOptionalWriteable(node);
         }
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeBoolean(shouldStopAtNextCheckpoint);
+        }
     }
 
     @Override
@@ -234,12 +272,13 @@ public class TransformState implements Task.Status, PersistentTaskState {
             this.checkpoint == that.checkpoint &&
             Objects.equals(this.reason, that.reason) &&
             Objects.equals(this.progress, that.progress) &&
+            Objects.equals(this.shouldStopAtNextCheckpoint, that.shouldStopAtNextCheckpoint) &&
             Objects.equals(this.node, that.node);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node);
+        return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node, shouldStopAtNextCheckpoint);
     }
 
     @Override

+ 2 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java

@@ -92,8 +92,8 @@ public class TransformStats implements Writeable, ToXContentObject {
 
 
     public TransformStats(String id, State state, @Nullable String reason,
-                                   @Nullable NodeAttributes node, TransformIndexerStats stats,
-                                   TransformCheckpointingInfo checkpointingInfo) {
+                          @Nullable NodeAttributes node, TransformIndexerStats stats,
+                          TransformCheckpointingInfo checkpointingInfo) {
         this.id = Objects.requireNonNull(id);
         this.state = Objects.requireNonNull(state);
         this.reason = reason;

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java

@@ -24,7 +24,7 @@ public final class TransformInternalIndexConstants {
     // internal index
 
     // version is not a rollover pattern, however padded because sort is string based
-    public static final String INDEX_VERSION = "003";
+    public static final String INDEX_VERSION = "004";
     public static final String INDEX_PATTERN = ".transform-internal-";
     public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION;
     public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME;

+ 15 - 9
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StopTransformActionRequestTests.java

@@ -24,7 +24,12 @@ public class StopTransformActionRequestTests extends AbstractWireSerializingTest
     @Override
     protected Request createTestInstance() {
         TimeValue timeout = randomBoolean() ? TimeValue.timeValueMinutes(randomIntBetween(1, 10)) : null;
-        Request request = new Request(randomAlphaOfLengthBetween(1, 10), randomBoolean(), randomBoolean(), timeout, randomBoolean());
+        Request request = new Request(randomAlphaOfLengthBetween(1, 10),
+            randomBoolean(),
+            randomBoolean(),
+            timeout,
+            randomBoolean(),
+            randomBoolean());
         if (randomBoolean()) {
             request.setExpandedIds(new HashSet<>(Arrays.asList(generateRandomStringArray(5, 6, false))));
         }
@@ -41,9 +46,10 @@ public class StopTransformActionRequestTests extends AbstractWireSerializingTest
         boolean waitForCompletion = randomBoolean();
         boolean force = randomBoolean();
         boolean allowNoMatch = randomBoolean();
+        boolean waitForCheckpoint = randomBoolean();
 
-        Request r1 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(10), allowNoMatch);
-        Request r2 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(20), allowNoMatch);
+        Request r1 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(10), allowNoMatch, waitForCheckpoint);
+        Request r2 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(20), allowNoMatch, waitForCheckpoint);
 
         assertNotEquals(r1,r2);
         assertNotEquals(r1.hashCode(),r2.hashCode());
@@ -53,20 +59,20 @@ public class StopTransformActionRequestTests extends AbstractWireSerializingTest
         String dataFrameId = "dataframe-id";
 
         Task dataFrameTask = new Task(1L, "persistent", "action",
-                TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + dataFrameId,
-                TaskId.EMPTY_TASK_ID, Collections.emptyMap());
+            TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + dataFrameId,
+            TaskId.EMPTY_TASK_ID, Collections.emptyMap());
 
-        Request request = new Request("unrelated", false, false, null, false);
+        Request request = new Request("unrelated", false, false, null, false, false);
         request.setExpandedIds(Set.of("foo", "bar"));
         assertFalse(request.match(dataFrameTask));
 
-        Request matchingRequest = new Request(dataFrameId, false, false, null, false);
+        Request matchingRequest = new Request(dataFrameId, false, false, null, false, false);
         matchingRequest.setExpandedIds(Set.of(dataFrameId));
         assertTrue(matchingRequest.match(dataFrameTask));
 
         Task notADataFrameTask = new Task(1L, "persistent", "action",
-                "some other task, say monitoring",
-                TaskId.EMPTY_TASK_ID, Collections.emptyMap());
+            "some other task, say monitoring",
+            TaskId.EMPTY_TASK_ID, Collections.emptyMap());
         assertFalse(matchingRequest.match(notADataFrameTask));
     }
 }

+ 25 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStateTests.java

@@ -6,6 +6,9 @@
 
 package org.elasticsearch.xpack.core.transform.transforms;
 
+import org.elasticsearch.Version;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.test.AbstractSerializingTestCase;
@@ -26,7 +29,8 @@ public class TransformStateTests extends AbstractSerializingTestCase<TransformSt
             randomLongBetween(0,10),
             randomBoolean() ? null : randomAlphaOfLength(10),
             randomBoolean() ? null : randomTransformProgress(),
-            randomBoolean() ? null : randomNodeAttributes());
+            randomBoolean() ? null : randomNodeAttributes(),
+            randomBoolean());
     }
 
     @Override
@@ -53,4 +57,24 @@ public class TransformStateTests extends AbstractSerializingTestCase<TransformSt
     protected Predicate<String> getRandomFieldsExcludeFilter() {
         return field -> !field.isEmpty();
     }
+
+    public void testBackwardsSerialization() throws IOException {
+        TransformState state = new TransformState(randomFrom(TransformTaskState.values()),
+            randomFrom(IndexerState.values()),
+            TransformIndexerPositionTests.randomTransformIndexerPosition(),
+            randomLongBetween(0,10),
+            randomBoolean() ? null : randomAlphaOfLength(10),
+            randomBoolean() ? null : randomTransformProgress(),
+            randomBoolean() ? null : randomNodeAttributes(),
+            false); // Will be false after BWC deserialization
+        try (BytesStreamOutput output = new BytesStreamOutput()) {
+            output.setVersion(Version.V_7_5_0);
+            state.writeTo(output);
+            try (StreamInput in = output.bytes().streamInput()) {
+                in.setVersion(Version.V_7_5_0);
+                TransformState streamedState = new TransformState(in);
+                assertEquals(state, streamedState);
+            }
+        }
+    }
 }

+ 23 - 13
x-pack/plugin/src/test/resources/rest-api-spec/api/transform.stop_transform.json

@@ -20,21 +20,31 @@
         }
       ]
     },
-    "params":{
-      "wait_for_completion":{
-        "type":"boolean",
-        "required":false,
-        "description":"Whether to wait for the transform to fully stop before returning or not. Default to false"
+    "params": {
+      "force": {
+        "type": "boolean",
+        "required": false,
+        "description": "Whether to force stop a failed transform or not. Default to false"
       },
-      "timeout":{
-        "type":"time",
-        "required":false,
-        "description":"Controls the time to wait until the transform has stopped. Default to 30 seconds"
+      "wait_for_completion": {
+        "type": "boolean",
+        "required": false,
+        "description": "Whether to wait for the transform to fully stop before returning or not. Default to false"
       },
-      "allow_no_match":{
-        "type":"boolean",
-        "required":false,
-        "description":"Whether to ignore if a wildcard expression matches no transforms. (This includes `_all` string or when no transforms have been specified)"
+      "timeout": {
+        "type": "time",
+        "required": false,
+        "description": "Controls the time to wait until the transform has stopped. Default to 30 seconds"
+      },
+      "allow_no_match": {
+        "type": "boolean",
+        "required": false,
+        "description": "Whether to ignore if a wildcard expression matches no transforms. (This includes `_all` string or when no transforms have been specified)"
+      },
+      "wait_for_checkpoint": {
+        "type": "boolean",
+        "required": false,
+        "description": "Whether to wait for the transform to reach a checkpoint before stopping. Default to false"
       }
     }
   }

+ 21 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_start_stop.yml

@@ -51,6 +51,7 @@ setup:
 teardown:
   - do:
       transform.stop_transform:
+        wait_for_checkpoint: false
         transform_id: "airline-transform-start-stop"
         timeout: "10m"
         wait_for_completion: true
@@ -59,6 +60,7 @@ teardown:
         transform_id: "airline-transform-start-stop"
   - do:
       transform.stop_transform:
+        wait_for_checkpoint: false
         transform_id: "airline-transform-start-stop-continuous"
         timeout: "10m"
         wait_for_completion: true
@@ -131,6 +133,7 @@ teardown:
 
   - do:
       transform.stop_transform:
+        wait_for_checkpoint: false
         transform_id: "airline-transform-start-stop"
         wait_for_completion: true
   - match: { acknowledged: true }
@@ -168,6 +171,7 @@ teardown:
 
   - do:
       transform.stop_transform:
+        wait_for_checkpoint: false
         transform_id: "airline-transform-start-stop-continuous"
         wait_for_completion: true
   - match: { acknowledged: true }
@@ -193,6 +197,7 @@ teardown:
 
   - do:
       transform.stop_transform:
+        wait_for_checkpoint: false
         transform_id: "airline-transform-start-stop-continuous"
         wait_for_completion: true
   - match: { acknowledged: true }
@@ -201,25 +206,38 @@ teardown:
   - do:
       catch: missing
       transform.stop_transform:
+        wait_for_checkpoint: false
         transform_id: "missing-transform"
 
 ---
 "Test stop missing transform by expression":
   - do:
       transform.stop_transform:
+        wait_for_checkpoint: false
         allow_no_match: true
         transform_id: "missing-transform*"
 
   - do:
       catch: missing
       transform.stop_transform:
+        wait_for_checkpoint: false
         allow_no_match: false
         transform_id: "missing-transform*"
 
+---
+"Test stop transform with force and wait_for_checkpoint true ":
+  - do:
+      catch: /cannot set both \[force\] and \[wait_for_checkpoint\] to true/
+      transform.stop_transform:
+        wait_for_checkpoint: true
+        force: true
+        transform_id: "airline-transform-start-stop-continuous"
+
 ---
 "Test stop already stopped transform":
   - do:
       transform.stop_transform:
+        wait_for_checkpoint: false
         transform_id: "airline-transform-start-stop"
   - match: { acknowledged: true }
 
@@ -269,6 +287,7 @@ teardown:
 
   - do:
       transform.stop_transform:
+        wait_for_checkpoint: false
         transform_id: "airline-transform-start-stop-continuous"
         wait_for_completion: true
   - match: { acknowledged: true }
@@ -283,6 +302,7 @@ teardown:
 
   - do:
       transform.stop_transform:
+        wait_for_checkpoint: false
         transform_id: "airline-transform-start-later"
         wait_for_completion: true
   - match: { acknowledged: true }
@@ -317,6 +337,7 @@ teardown:
 
   - do:
       transform.stop_transform:
+        wait_for_checkpoint: false
         transform_id: "_all"
         wait_for_completion: true
   - match: { acknowledged: true }

+ 2 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_stats.yml

@@ -33,6 +33,7 @@ setup:
 teardown:
   - do:
       transform.stop_transform:
+        wait_for_checkpoint: false
         transform_id: "airline-transform-stats"
         wait_for_completion: true
 
@@ -252,6 +253,7 @@ teardown:
 
   - do:
       transform.stop_transform:
+        wait_for_checkpoint: false
         transform_id: "airline-transform-stats-continuous"
         wait_for_completion: true
 

+ 43 - 0
x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformIT.java

@@ -235,6 +235,49 @@ public class TransformIT extends TransformIntegTestCase {
         deleteTransform(config.getId());
     }
 
+    public void testStopWaitForCheckpoint() throws Exception {
+        String indexName = "wait-for-checkpoint-reviews";
+        String transformId = "data-frame-transform-wait-for-checkpoint";
+        createReviewsIndex(indexName, 1000);
+
+        Map<String, SingleGroupSource> groups = new HashMap<>();
+        groups.put("by-day", createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null));
+        groups.put("by-user", TermsGroupSource.builder().setField("user_id").build());
+        groups.put("by-business", TermsGroupSource.builder().setField("business_id").build());
+
+        AggregatorFactories.Builder aggs = AggregatorFactories.builder()
+            .addAggregator(AggregationBuilders.avg("review_score").field("stars"))
+            .addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));
+
+        TransformConfig config = createTransformConfigBuilder(transformId,
+            groups,
+            aggs,
+            "reviews-by-user-business-day",
+            QueryBuilders.matchAllQuery(),
+            indexName)
+            .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)))
+            .build();
+
+        assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged());
+        assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged());
+
+        // waitForCheckpoint: true should make the transform continue until we hit the first checkpoint, then it will stop
+        stopTransform(transformId, false, null, true);
+
+        // Wait until the first checkpoint
+        waitUntilCheckpoint(config.getId(), 1L);
+
+        // Even though we are continuous, we should be stopped now as we needed to stop at the first checkpoint
+        assertBusy(() -> {
+            TransformStats stateAndStats = getTransformStats(config.getId()).getTransformsStats().get(0);
+            assertThat(stateAndStats.getState(), equalTo(TransformStats.State.STOPPED));
+            assertThat(stateAndStats.getIndexerStats().getNumDocuments(), equalTo(1000L));
+        });
+
+        stopTransform(config.getId());
+        deleteTransform(config.getId());
+    }
+
     private void indexMoreDocs(long timestamp, long userId, String index) throws Exception {
         BulkRequest bulk = new BulkRequest(index);
         for (int i = 0; i < 25; i++) {

+ 10 - 2
x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java

@@ -86,8 +86,16 @@ abstract class TransformIntegTestCase extends ESRestTestCase {
     }
 
     protected StopTransformResponse stopTransform(String id) throws IOException {
+        return stopTransform(id, true, null, false);
+    }
+
+    protected StopTransformResponse stopTransform(String id,
+                                                  boolean waitForCompletion,
+                                                  TimeValue timeout,
+                                                  boolean waitForCheckpoint) throws IOException {
         RestHighLevelClient restClient = new TestRestHighLevelClient();
-        return restClient.transform().stopTransform(new StopTransformRequest(id, true, null), RequestOptions.DEFAULT);
+        return restClient.transform()
+            .stopTransform(new StopTransformRequest(id, waitForCompletion, timeout, waitForCheckpoint), RequestOptions.DEFAULT);
     }
 
     protected StartTransformResponse startTransform(String id, RequestOptions options) throws IOException {
@@ -298,7 +306,7 @@ abstract class TransformIntegTestCase extends ESRestTestCase {
                 .append("\"}");
             bulk.add(new IndexRequest().source(sourceBuilder.toString(), XContentType.JSON));
 
-            if (i % 50 == 0) {
+            if (i % 100 == 0) {
                 BulkResponse response = restClient.bulk(bulk, RequestOptions.DEFAULT);
                 assertThat(response.buildFailureMessage(), response.hasFailures(), is(false));
                 bulk = new BulkRequest(indexName);

+ 56 - 0
x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java

@@ -7,6 +7,7 @@
 package org.elasticsearch.xpack.transform.integration;
 
 import org.elasticsearch.client.Request;
+import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.junit.Before;
 
@@ -863,6 +864,61 @@ public class TransformPivotRestIT extends TransformRestTestCase {
         assertEquals(101, ((List<?>)XContentMapValues.extractValue("transforms.stats.pages_processed", stats)).get(0));
     }
 
+    public void testContinuousStopWaitForCheckpoint() throws Exception {
+        Request updateLoggingLevels = new Request("PUT", "/_cluster/settings");
+        updateLoggingLevels.setJsonEntity(
+            "{\"transient\": {" +
+                "\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," +
+                "\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}");
+        client().performRequest(updateLoggingLevels);
+        String indexName = "continuous_reviews_wait_for_checkpoint";
+        createReviewsIndex(indexName);
+        String transformId = "simple_continuous_pivot_wait_for_checkpoint";
+        String dataFrameIndex = "pivot_reviews_continuous_wait_for_checkpoint";
+        setupDataAccessRole(DATA_ACCESS_ROLE, indexName, dataFrameIndex);
+        final Request createDataframeTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId,
+            BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
+        String config = "{"
+            + " \"source\": {\"index\":\"" + indexName + "\"},"
+            + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"
+            + " \"frequency\": \"1s\","
+            + " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}},"
+            + " \"pivot\": {"
+            + "   \"group_by\": {"
+            + "     \"reviewer\": {"
+            + "       \"terms\": {"
+            + "         \"field\": \"user_id\""
+            + " } } },"
+            + "   \"aggregations\": {"
+            + "     \"avg_rating\": {"
+            + "       \"avg\": {"
+            + "         \"field\": \"stars\""
+            + " } } } }"
+            + "}";
+        createDataframeTransformRequest.setJsonEntity(config);
+        Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
+        assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
+
+        startAndWaitForContinuousTransform(transformId, dataFrameIndex, null);
+        assertTrue(indexExists(dataFrameIndex));
+        assertBusy(() -> {
+            try {
+                stopTransform(transformId,false, true);
+            } catch (ResponseException e) {
+                // We get a conflict sometimes depending on WHEN we try to write the state, should eventually pass though
+                assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(200));
+            }
+        });
+
+        // get and check some users
+        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417);
+        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_5", 3.72);
+        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_11", 3.846153846);
+        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_20", 3.769230769);
+        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918);
+        deleteIndex(indexName);
+    }
+
     private void assertOnePivotValue(String query, double expected) throws IOException {
         Map<String, Object> searchResult = getAsMap(query);
 

+ 5 - 1
x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

@@ -260,10 +260,14 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
     }
 
     protected void stopTransform(String transformId, boolean force) throws Exception {
-        // start the transform
+        stopTransform(transformId, force, false);
+    }
+
+    protected void stopTransform(String transformId, boolean force, boolean waitForCheckpoint) throws Exception {
         final Request stopTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_stop", null);
         stopTransformRequest.addParameter(TransformField.FORCE.getPreferredName(), Boolean.toString(force));
         stopTransformRequest.addParameter(TransformField.WAIT_FOR_COMPLETION.getPreferredName(), Boolean.toString(true));
+        stopTransformRequest.addParameter(TransformField.WAIT_FOR_CHECKPOINT.getPreferredName(), Boolean.toString(waitForCheckpoint));
         Map<String, Object> stopTransformResponse = entityAsMap(client().performRequest(stopTransformRequest));
         assertThat(stopTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
     }

+ 1 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteTransformAction.java

@@ -92,7 +92,7 @@ public class TransportDeleteTransformAction extends TransportMasterNodeAction<Re
                 executeAsyncWithOrigin(client,
                     TRANSFORM_ORIGIN,
                     StopTransformAction.INSTANCE,
-                    new StopTransformAction.Request(request.getId(), true, true, null, true),
+                    new StopTransformAction.Request(request.getId(), true, true, null, true, false),
                     ActionListener.wrap(
                         r -> stopTransformActionListener.onResponse(null),
                         stopTransformActionListener::onFailure));

+ 23 - 13
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java

@@ -17,6 +17,7 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.tasks.TransportTasksAction;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@@ -93,25 +94,14 @@ public class TransportGetTransformStatsAction extends
         ClusterState state = clusterService.state();
         String nodeId = state.nodes().getLocalNode().getId();
         if (task.isCancelled() == false) {
-            TransformState transformState = task.getState();
             task.getCheckpointingInfo(transformCheckpointService, ActionListener.wrap(
                 checkpointingInfo -> listener.onResponse(new Response(
-                    Collections.singletonList(new TransformStats(task.getTransformId(),
-                        TransformStats.State.fromComponents(transformState.getTaskState(), transformState.getIndexerState()),
-                        transformState.getReason(),
-                        null,
-                        task.getStats(),
-                        checkpointingInfo)),
+                    Collections.singletonList(deriveStats(task, checkpointingInfo)),
                     1L)),
                 e -> {
                     logger.warn("Failed to retrieve checkpointing info for transform [" + task.getTransformId() + "]", e);
                     listener.onResponse(new Response(
-                    Collections.singletonList(new TransformStats(task.getTransformId(),
-                        TransformStats.State.fromComponents(transformState.getTaskState(), transformState.getIndexerState()),
-                        transformState.getReason(),
-                        null,
-                        task.getStats(),
-                        TransformCheckpointingInfo.EMPTY)),
+                    Collections.singletonList(deriveStats(task, null)),
                     1L,
                     Collections.emptyList(),
                     Collections.singletonList(new FailedNodeException(nodeId, "Failed to retrieve checkpointing info", e))));
@@ -168,6 +158,26 @@ public class TransportGetTransformStatsAction extends
         }
     }
 
+    static TransformStats deriveStats(TransformTask task, @Nullable TransformCheckpointingInfo checkpointingInfo) {
+        TransformState transformState = task.getState();
+        TransformStats.State derivedState = TransformStats.State.fromComponents(transformState.getTaskState(),
+            transformState.getIndexerState());
+        String reason = transformState.getReason();
+        if (transformState.shouldStopAtNextCheckpoint() &&
+            derivedState.equals(TransformStats.State.STOPPED) == false &&
+            derivedState.equals(TransformStats.State.FAILED) == false) {
+            derivedState = TransformStats.State.STOPPING;
+            reason = reason.isEmpty() ? "transform is set to stop at the next checkpoint" : reason;
+        }
+        return new TransformStats(
+            task.getTransformId(),
+            derivedState,
+            reason,
+            null,
+            task.getStats(),
+            checkpointingInfo == null ? TransformCheckpointingInfo.EMPTY : checkpointingInfo);
+    }
+
     private void collectStatsForTransformsWithoutTasks(Request request,
                                                        Response response,
                                                        ActionListener<Response> listener) {

+ 18 - 7
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java

@@ -153,13 +153,24 @@ public class TransportStopTransformAction extends TransportTasksAction<Transform
         }
 
         if (ids.contains(transformTask.getTransformId())) {
-            try {
-                transformTask.stop(request.isForce());
-            } catch (ElasticsearchException ex) {
-                listener.onFailure(ex);
-                return;
-            }
-            listener.onResponse(new Response(Boolean.TRUE));
+            transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), ActionListener.wrap(
+                r -> {
+                    try {
+                        transformTask.stop(request.isForce(), request.isWaitForCheckpoint());
+                        listener.onResponse(new Response(true));
+                    } catch (ElasticsearchException ex) {
+                        listener.onFailure(ex);
+                    }
+                },
+                e -> listener.onFailure(
+                    new ElasticsearchStatusException(
+                        "Failed to update transform task [{}] state value should_stop_at_checkpoint from [{}] to [{}]",
+                        RestStatus.CONFLICT,
+                        transformTask.getTransformId(),
+                        transformTask.getState().shouldStopAtNextCheckpoint(),
+                        request.isWaitForCheckpoint()))
+                )
+            );
         } else {
             listener.onFailure(new RuntimeException("ID of transform task [" + transformTask.getTransformId()
                     + "] does not match request's ID [" + request.getId() + "]"));

+ 6 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java

@@ -54,6 +54,8 @@ public final class TransformInternalIndex {
      *                  progress::docs_processed, progress::docs_indexed,
      *                  stats::exponential_avg_checkpoint_duration_ms, stats::exponential_avg_documents_indexed,
      *                  stats::exponential_avg_documents_processed
+     *
+     * version 4 (7.6): state::should_stop_at_checkpoint
      */
 
     // constants for mappings
@@ -71,6 +73,7 @@ public final class TransformInternalIndex {
     public static final String DOUBLE = "double";
     public static final String LONG = "long";
     public static final String KEYWORD = "keyword";
+    public static final String BOOLEAN = "boolean";
 
     public static IndexTemplateMetaData getIndexTemplateMetaData() throws IOException {
         IndexTemplateMetaData transformTemplate = IndexTemplateMetaData.builder(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)
@@ -175,6 +178,9 @@ public final class TransformInternalIndex {
                     .startObject(TransformState.INDEXER_STATE.getPreferredName())
                         .field(TYPE, KEYWORD)
                     .endObject()
+                    .startObject(TransformState.SHOULD_STOP_AT_NEXT_CHECKPOINT.getPreferredName())
+                        .field(TYPE, BOOLEAN)
+                    .endObject()
                     .startObject(TransformState.CURRENT_POSITION.getPreferredName())
                         .field(ENABLED, false)
                     .endObject()

+ 3 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestStopTransformAction.java

@@ -28,13 +28,15 @@ public class RestStopTransformAction extends BaseRestHandler {
         boolean waitForCompletion = restRequest.paramAsBoolean(TransformField.WAIT_FOR_COMPLETION.getPreferredName(), false);
         boolean force = restRequest.paramAsBoolean(TransformField.FORCE.getPreferredName(), false);
         boolean allowNoMatch = restRequest.paramAsBoolean(TransformField.ALLOW_NO_MATCH.getPreferredName(), false);
+        boolean waitForCheckpoint = restRequest.paramAsBoolean(TransformField.WAIT_FOR_CHECKPOINT.getPreferredName(), false);
 
 
         StopTransformAction.Request request = new StopTransformAction.Request(id,
             waitForCompletion,
             force,
             timeout,
-            allowNoMatch);
+            allowNoMatch,
+            waitForCheckpoint);
 
         return channel -> client.execute(StopTransformAction.INSTANCE, request,
                 new RestToXContentListener<>(channel));

+ 3 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/compat/RestStopTransformActionDeprecated.java

@@ -36,13 +36,15 @@ public class RestStopTransformActionDeprecated extends BaseRestHandler {
         boolean waitForCompletion = restRequest.paramAsBoolean(TransformField.WAIT_FOR_COMPLETION.getPreferredName(), false);
         boolean force = restRequest.paramAsBoolean(TransformField.FORCE.getPreferredName(), false);
         boolean allowNoMatch = restRequest.paramAsBoolean(TransformField.ALLOW_NO_MATCH.getPreferredName(), false);
+        boolean waitForCheckpoint = restRequest.paramAsBoolean(TransformField.WAIT_FOR_CHECKPOINT.getPreferredName(), false);
 
 
         StopTransformAction.Request request = new StopTransformAction.Request(id,
             waitForCompletion,
             force,
             timeout,
-            allowNoMatch);
+            allowNoMatch,
+            waitForCheckpoint);
 
         return channel -> client.execute(StopTransformActionDeprecated.INSTANCE, request,
                 new RestToXContentListener<>(channel));

+ 89 - 6
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

@@ -64,6 +64,7 @@ class ClientTransformIndexer extends TransformIndexer {
     // Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
     private volatile String lastAuditedExceptionMessage = null;
     private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false);
+    private volatile boolean shouldStopAtCheckpoint = false;
     private volatile Instant changesLastDetectedAt;
 
     ClientTransformIndexer(TransformConfigManager transformsConfigManager,
@@ -78,7 +79,8 @@ class ClientTransformIndexer extends TransformIndexer {
                            TransformProgress transformProgress,
                            TransformCheckpoint lastCheckpoint,
                            TransformCheckpoint nextCheckpoint,
-                           TransformTask parentTask) {
+                           TransformTask parentTask,
+                           boolean shouldStopAtCheckpoint) {
         super(ExceptionsHelper.requireNonNull(parentTask, "parentTask")
                 .getThreadPool()
                 .executor(ThreadPool.Names.GENERIC),
@@ -97,6 +99,50 @@ class ClientTransformIndexer extends TransformIndexer {
         this.client = ExceptionsHelper.requireNonNull(client, "client");
         this.transformTask = parentTask;
         this.failureCount = new AtomicInteger(0);
+        this.shouldStopAtCheckpoint = shouldStopAtCheckpoint;
+    }
+
+    boolean shouldStopAtCheckpoint() {
+        return shouldStopAtCheckpoint;
+    }
+
+    void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint) {
+        this.shouldStopAtCheckpoint = shouldStopAtCheckpoint;
+    }
+
+    void persistShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint, ActionListener<Void> shouldStopAtCheckpointListener) {
+        if (this.shouldStopAtCheckpoint == shouldStopAtCheckpoint ||
+            getState() == IndexerState.STOPPED ||
+            getState() == IndexerState.STOPPING) {
+            shouldStopAtCheckpointListener.onResponse(null);
+            return;
+        }
+        TransformState state = new TransformState(
+            transformTask.getTaskState(),
+            getState(),
+            getPosition(),
+            transformTask.getCheckpoint(),
+            transformTask.getStateReason(),
+            getProgress(),
+            null, //Node attributes
+            shouldStopAtCheckpoint);
+        doSaveState(state,
+            ActionListener.wrap(
+                r -> {
+                    // We only want to update this internal value if it is persisted as such
+                    this.shouldStopAtCheckpoint = shouldStopAtCheckpoint;
+                    logger.debug("[{}] successfully persisted should_stop_at_checkpoint update [{}]",
+                        getJobId(),
+                        shouldStopAtCheckpoint);
+                    shouldStopAtCheckpointListener.onResponse(null);
+                },
+                statsExc -> {
+                    logger.warn("[{}] failed to persist should_stop_at_checkpoint update [{}]",
+                        getJobId(),
+                        shouldStopAtCheckpoint);
+                    shouldStopAtCheckpointListener.onFailure(statsExc);
+                }
+            ));
     }
 
     @Override
@@ -297,6 +343,21 @@ class ClientTransformIndexer extends TransformIndexer {
             return;
         }
 
+        boolean shouldStopAtCheckpoint = shouldStopAtCheckpoint();
+
+        // If we should stop at the next checkpoint, are STARTED, and with `initialRun()` we are in one of two states
+        // 1. We have just called `onFinish` completing our request, but `shouldStopAtCheckpoint` was set to `true` before our check
+        //    there and now
+        // 2. We are on the very first run of a NEW checkpoint and got here either through a failure, or the very first save state call.
+        //
+        // In either case, we should stop so that we guarantee a consistent state and that there are no partially completed checkpoints
+        if (shouldStopAtCheckpoint && initialRun() && indexerState.equals(IndexerState.STARTED)) {
+            indexerState = IndexerState.STOPPED;
+            auditor.info(transformConfig.getId(), "Transform is no longer in the middle of a checkpoint, initiating stop.");
+            logger.info("[{}] transform is no longer in the middle of a checkpoint, initiating stop.",
+                transformConfig.getId());
+        }
+
         // This means that the indexer was triggered to discover changes, found none, and exited early.
         // If the state is `STOPPED` this means that TransformTask#stop was called while we were checking for changes.
         // Allow the stop call path to continue
@@ -321,6 +382,12 @@ class ClientTransformIndexer extends TransformIndexer {
         // OR we called `doSaveState` manually as the indexer was not actively running.
         // Since we save the state to an index, we should make sure that our task state is in parity with the indexer state
         if (indexerState.equals(IndexerState.STOPPED)) {
+            // If we are going to stop after the state is saved, we should NOT persist `shouldStopAtCheckpoint: true` as this may
+            // cause problems if the task starts up again.
+            // Additionally, we don't have to worry about inconsistency with the ClusterState (if it is persisted there) as the
+            // when we stop, we mark the task as complete and that state goes away.
+            shouldStopAtCheckpoint = false;
+
             // We don't want adjust the stored taskState because as soon as it is `STOPPED` a user could call
             // .start again.
             taskState = TransformTaskState.STOPPED;
@@ -332,9 +399,19 @@ class ClientTransformIndexer extends TransformIndexer {
             position,
             transformTask.getCheckpoint(),
             transformTask.getStateReason(),
-            getProgress());
+            getProgress(),
+            null,
+            shouldStopAtCheckpoint);
         logger.debug("[{}] updating persistent state of transform to [{}].", transformConfig.getId(), state.toString());
 
+        doSaveState(state, ActionListener.wrap(
+            r -> next.run(),
+            e -> next.run()
+        ));
+    }
+
+    private void doSaveState(TransformState state, ActionListener<Void> listener) {
+
         // This could be `null` but the putOrUpdateTransformStoredDoc handles that case just fine
         SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex = transformTask.getSeqNoPrimaryTermAndIndex();
 
@@ -356,7 +433,7 @@ class ClientTransformIndexer extends TransformIndexer {
                                 transformsConfigManager.deleteOldTransformStoredDocuments(getJobId(), ActionListener.wrap(
                                     nil -> {
                                         logger.trace("[{}] deleted old transform stats and state document", getJobId());
-                                        next.run();
+                                        listener.onResponse(null);
                                     },
                                     e -> {
                                         String msg = LoggerMessageFormat.format("[{}] failed deleting old transform configurations.",
@@ -364,11 +441,11 @@ class ClientTransformIndexer extends TransformIndexer {
                                         logger.warn(msg, e);
                                         // If we have failed, we should attempt the clean up again later
                                         oldStatsCleanedUp.set(false);
-                                        next.run();
+                                        listener.onResponse(null);
                                     }
                                 ));
                             } else {
-                                next.run();
+                                listener.onResponse(null);
                             }
                         },
                         statsExc -> {
@@ -381,7 +458,7 @@ class ClientTransformIndexer extends TransformIndexer {
                             if (state.getTaskState().equals(TransformTaskState.STOPPED)) {
                                 transformTask.shutdown();
                             }
-                            next.run();
+                            listener.onFailure(statsExc);
                         }
                 ));
     }
@@ -404,6 +481,9 @@ class ClientTransformIndexer extends TransformIndexer {
             // This indicates an early exit since no changes were found.
             // So, don't treat this like a checkpoint being completed, as no work was done.
             if (hasSourceChanged == false) {
+                if (shouldStopAtCheckpoint) {
+                    stop();
+                }
                 listener.onResponse(null);
                 return;
             }
@@ -447,6 +527,9 @@ class ClientTransformIndexer extends TransformIndexer {
             logger.debug(
                 "[{}] finished indexing for transform checkpoint [{}].", getJobId(), checkpoint);
             auditBulkFailures = true;
+            if (shouldStopAtCheckpoint) {
+                stop();
+            }
             listener.onResponse(null);
         } catch (Exception e) {
             listener.onFailure(e);

+ 9 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java

@@ -34,6 +34,7 @@ class ClientTransformIndexerBuilder {
     private TransformProgress progress;
     private TransformCheckpoint lastCheckpoint;
     private TransformCheckpoint nextCheckpoint;
+    private boolean shouldStopAtCheckpoint;
 
     ClientTransformIndexerBuilder() {
         this.initialStats = new TransformIndexerStats();
@@ -54,7 +55,13 @@ class ClientTransformIndexerBuilder {
             this.progress,
             this.lastCheckpoint,
             this.nextCheckpoint,
-            parentTask);
+            parentTask,
+            this.shouldStopAtCheckpoint);
+    }
+
+    ClientTransformIndexerBuilder setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint) {
+        this.shouldStopAtCheckpoint = shouldStopAtCheckpoint;
+        return this;
     }
 
     ClientTransformIndexerBuilder setClient(Client client) {
@@ -120,4 +127,4 @@ class ClientTransformIndexerBuilder {
         this.nextCheckpoint = nextCheckpoint;
         return this;
     }
-}
+}

+ 4 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java

@@ -206,16 +206,18 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
                 // Since we have not set the value for this yet, it SHOULD be null
                 buildTask.updateSeqNoPrimaryTermAndIndex(null, seqNoPrimaryTermAndIndex);
                 logger.trace("[{}] initializing state and stats: [{}]", transformId, stateAndStats.toString());
+                TransformState transformState = stateAndStats.getTransformState();
                 indexerBuilder.setInitialStats(stateAndStats.getTransformStats())
                     .setInitialPosition(stateAndStats.getTransformState().getPosition())
                     .setProgress(stateAndStats.getTransformState().getProgress())
-                    .setIndexerState(currentIndexerState(stateAndStats.getTransformState()));
+                    .setIndexerState(currentIndexerState(transformState))
+                    .setShouldStopAtCheckpoint(transformState.shouldStopAtNextCheckpoint());
                 logger.debug("[{}] Loading existing state: [{}], position [{}]",
                     transformId,
                     stateAndStats.getTransformState(),
                     stateAndStats.getTransformState().getPosition());
 
-                stateHolder.set(stateAndStats.getTransformState());
+                stateHolder.set(transformState);
                 final long lastCheckpoint = stateHolder.get().getCheckpoint();
 
                 if (lastCheckpoint == 0) {

+ 58 - 13
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java

@@ -141,7 +141,9 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
                 initialPosition,
                 currentCheckpoint.get(),
                 stateReason.get(),
-                null);
+                null,
+                null,
+            false);
         } else {
            return new TransformState(
                taskState.get(),
@@ -149,7 +151,9 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
                indexer.get().getPosition(),
                currentCheckpoint.get(),
                stateReason.get(),
-               getIndexer().getProgress());
+               getIndexer().getProgress(),
+               null,
+               getIndexer().shouldStopAtCheckpoint());
         }
     }
 
@@ -247,7 +251,9 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
             getIndexer().getPosition(),
             currentCheckpoint.get(),
             null,
-            getIndexer().getProgress());
+            getIndexer().getProgress(),
+            null,
+            getIndexer().shouldStopAtCheckpoint());
 
         logger.info("[{}] updating state for transform to [{}].", transform.getId(), state.toString());
         // Even though the indexer information is persisted to an index, we still need TransformTaskState in the clusterstate
@@ -275,8 +281,34 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
         ));
     }
 
-    public synchronized void stop(boolean force) {
-        logger.debug("[{}] stop called with force [{}] and state [{}]", getTransformId(), force, getState());
+    /**
+     * This sets the flag for the task to stop at the next checkpoint.
+     *
+     * If first persists the flag to cluster state, and then mutates the local variable.
+     *
+     * It only persists to cluster state if the value is different than what is currently held in memory.
+     * @param shouldStopAtCheckpoint whether or not we should stop at the next checkpoint or not
+     * @param shouldStopAtCheckpointListener the listener to return to when we have persisted the updated value to the state index.
+     */
+    public synchronized void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint,
+                                                       ActionListener<Void> shouldStopAtCheckpointListener) {
+        logger.debug("[{}] attempted to set task to stop at checkpoint [{}] with state [{}]",
+            getTransformId(),
+            shouldStopAtCheckpoint,
+            getState());
+        if (taskState.get() != TransformTaskState.STARTED || getIndexer() == null) {
+            shouldStopAtCheckpointListener.onResponse(null);
+            return;
+        }
+        getIndexer().persistShouldStopAtCheckpoint(shouldStopAtCheckpoint, shouldStopAtCheckpointListener);
+    }
+
+    public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) {
+        logger.debug("[{}] stop called with force [{}], shouldStopAtCheckpoint [{}], state [{}]",
+            getTransformId(),
+            force,
+            shouldStopAtCheckpoint,
+            getState());
         if (getIndexer() == null) {
             // If there is no indexer the task has not been triggered
             // but it still needs to be stopped and removed
@@ -296,16 +328,23 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
                 RestStatus.CONFLICT);
         }
 
-        IndexerState state = getIndexer().stop();
         stateReason.set(null);
         // No reason to keep it in the potentially failed state.
-        // Since we have called `stop` against the indexer, we have no more fear of triggering again.
-        // But, since `doSaveState` is asynchronous, it is best to set the state as STARTED so that another `start` call cannot be
-        // executed while we are wrapping up.
-        taskState.compareAndSet(TransformTaskState.FAILED, TransformTaskState.STARTED);
-        if (state == IndexerState.STOPPED) {
-            getIndexer().onStop();
-            getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {});
+        boolean wasFailed = taskState.compareAndSet(TransformTaskState.FAILED, TransformTaskState.STARTED);
+        // shouldStopAtCheckpoint only comes into play when onFinish is called (or doSaveState right after).
+        // if it is false, stop immediately
+        if (shouldStopAtCheckpoint == false ||
+            // If state was in a failed state, we should stop immediately as we will never reach the next checkpoint
+            wasFailed ||
+            // If the indexerState is STARTED and it is on an initialRun, that means that the indexer has previously finished a checkpoint,
+            // or has yet to even start one.
+            // Either way, this means that we won't get to have onFinish called down stream (or at least won't for some time).
+            (getIndexer().getState() == IndexerState.STARTED && getIndexer().initialRun())) {
+            IndexerState state = getIndexer().stop();
+            if (state == IndexerState.STOPPED) {
+                getIndexer().onStop();
+                getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {});
+            }
         }
     }
 
@@ -400,6 +439,12 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
         // We should not keep retrying. Either the task will be stopped, or started
         // If it is started again, it is registered again.
         deregisterSchedulerJob();
+        // The idea of stopping at the next checkpoint is no longer valid. Since a failed task could potentially START again,
+        // we should set this flag to false.
+        if (getIndexer() != null) {
+            getIndexer().setShouldStopAtCheckpoint(false);
+        }
+        // The end user should see that the task is in a failed state, and attempt to stop it again but with force=true
         taskState.set(TransformTaskState.FAILED);
         stateReason.set(reason);
         TransformState newState = getState();

+ 134 - 0
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java

@@ -0,0 +1,134 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.transform.action;
+
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.indexing.IndexerState;
+import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats;
+import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
+import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
+import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStatsTests;
+import org.elasticsearch.xpack.core.transform.transforms.TransformState;
+import org.elasticsearch.xpack.core.transform.transforms.TransformStats;
+import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
+import org.elasticsearch.xpack.transform.transforms.TransformTask;
+
+import java.time.Instant;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TransportGetTransformStatsActionTests extends ESTestCase {
+
+    private TransformTask task = mock(TransformTask.class);
+
+    public void testDeriveStatsStopped() {
+        String transformId = "transform-with-stats";
+        String reason = "";
+        TransformIndexerStats stats = TransformIndexerStatsTests.randomStats();
+        TransformState stoppedState =
+            new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0, reason, null, null, true);
+        withIdStateAndStats(transformId, stoppedState, stats);
+        TransformCheckpointingInfo info = new TransformCheckpointingInfo(
+            new TransformCheckpointStats(1, null, null, 1, 1),
+            new TransformCheckpointStats(2, null, null, 2, 5),
+            2,
+            Instant.now());
+
+        assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
+            equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, "", null, stats, TransformCheckpointingInfo.EMPTY)));
+        assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
+            equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, "", null, stats, info)));
+
+
+        reason = "foo";
+        stoppedState = new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0, reason, null, null, true);
+        withIdStateAndStats(transformId, stoppedState, stats);
+
+        assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
+            equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, TransformCheckpointingInfo.EMPTY)));
+        assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
+            equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, info)));
+    }
+
+    public void testDeriveStatsFailed() {
+        String transformId = "transform-with-stats";
+        String reason = "";
+        TransformIndexerStats stats = TransformIndexerStatsTests.randomStats();
+        TransformState failedState =
+            new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0, reason, null, null, true);
+        withIdStateAndStats(transformId, failedState, stats);
+        TransformCheckpointingInfo info = new TransformCheckpointingInfo(
+            new TransformCheckpointStats(1, null, null, 1, 1),
+            new TransformCheckpointStats(2, null, null, 2, 5),
+            2,
+            Instant.now());
+
+        assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
+            equalTo(new TransformStats(transformId, TransformStats.State.FAILED, "", null, stats, TransformCheckpointingInfo.EMPTY)));
+        assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
+            equalTo(new TransformStats(transformId, TransformStats.State.FAILED, "", null, stats, info)));
+
+
+        reason = "the task is failed";
+        failedState = new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0, reason, null, null, true);
+        withIdStateAndStats(transformId, failedState, stats);
+
+        assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
+            equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, TransformCheckpointingInfo.EMPTY)));
+        assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
+            equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, info)));
+    }
+
+
+    public void testDeriveStats() {
+        String transformId = "transform-with-stats";
+        String reason = "";
+        TransformIndexerStats stats = TransformIndexerStatsTests.randomStats();
+        TransformState runningState =
+            new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, true);
+        withIdStateAndStats(transformId, runningState, stats);
+        TransformCheckpointingInfo info = new TransformCheckpointingInfo(
+            new TransformCheckpointStats(1, null, null, 1, 1),
+            new TransformCheckpointStats(2, null, null, 2, 5),
+            2,
+            Instant.now());
+
+        assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
+            equalTo(new TransformStats(transformId, TransformStats.State.STOPPING,
+                "transform is set to stop at the next checkpoint", null, stats, TransformCheckpointingInfo.EMPTY)));
+        assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
+            equalTo(new TransformStats(transformId, TransformStats.State.STOPPING,
+                "transform is set to stop at the next checkpoint", null, stats, info)));
+
+
+        reason = "foo";
+        runningState = new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, true);
+        withIdStateAndStats(transformId, runningState, stats);
+
+        assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
+            equalTo(new TransformStats(transformId, TransformStats.State.STOPPING, reason, null, stats, TransformCheckpointingInfo.EMPTY)));
+        assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
+            equalTo(new TransformStats(transformId, TransformStats.State.STOPPING, reason, null, stats, info)));
+
+        // Stop at next checkpoint is false.
+        runningState = new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, false);
+        withIdStateAndStats(transformId, runningState, stats);
+
+        assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
+            equalTo(new TransformStats(transformId, TransformStats.State.INDEXING, reason, null, stats, TransformCheckpointingInfo.EMPTY)));
+        assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
+            equalTo(new TransformStats(transformId, TransformStats.State.INDEXING, reason, null, stats, info)));
+    }
+
+    private void withIdStateAndStats(String transformId, TransformState state, TransformIndexerStats stats) {
+        when(task.getTransformId()).thenReturn(transformId);
+        when(task.getState()).thenReturn(state);
+        when(task.getStats()).thenReturn(stats);
+    }
+
+}

+ 2 - 1
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java

@@ -67,7 +67,8 @@ public class ClientTransformIndexerTests extends ESTestCase {
                 2L,
                 Collections.emptyMap(),
                 Instant.now().toEpochMilli()),
-            parentTask);
+            parentTask,
+            false);
 
         List<Boolean> shouldAudit = IntStream.range(0, 100_000).boxed().map(indexer::shouldAuditOnFinish).collect(Collectors.toList());
 

+ 3 - 3
x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml

@@ -279,6 +279,6 @@ setup:
 
   - do:
       indices.get_mapping:
-        index: .transform-internal-003
-  - match: { \.transform-internal-003.mappings.dynamic: "false" }
-  - match: { \.transform-internal-003.mappings.properties.id.type: "keyword" }
+        index: .transform-internal-004
+  - match: { \.transform-internal-004.mappings.dynamic: "false" }
+  - match: { \.transform-internal-004.mappings.properties.id.type: "keyword" }