Răsfoiți Sursa

[ML-DataFrame] create checkpoints on every new run (#40725)

Use the checkpoint service to create a checkpoint on every new run. Expose checkpoints stats on _stats endpoint.
Hendrik Muhs 6 ani în urmă
părinte
comite
f83f72cf69
34 a modificat fișierele cu 1447 adăugiri și 146 ștergeri
  1. 93 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointStats.java
  2. 102 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointingInfo.java
  3. 9 9
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java
  4. 16 4
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateAndStats.java
  5. 51 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointStatsTests.java
  6. 61 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointingInfoTests.java
  7. 4 1
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateAndStatsTests.java
  8. 1 1
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java
  9. 11 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java
  10. 108 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointStats.java
  11. 141 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointingInfo.java
  12. 21 12
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java
  13. 24 6
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java
  14. 35 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointStatsTests.java
  15. 35 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointingInfoTests.java
  16. 2 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStatsTests.java
  17. 52 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/hlrc/DataFrameTransformCheckpointStatsHlrcTests.java
  18. 55 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/hlrc/DataFrameTransformCheckpointingInfoHlrcTests.java
  19. 2 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/hlrc/DataFrameTransformStateAndStatsHlrcTests.java
  20. 1 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/hlrc/DataFrameTransformStateHlrcTests.java
  21. 6 6
      x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java
  22. 1 1
      x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java
  23. 1 1
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java
  24. 20 8
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java
  25. 105 2
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java
  26. 2 1
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java
  27. 12 1
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java
  28. 82 26
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformCheckpoint.java
  29. 46 25
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java
  30. 2 2
      x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameSingleNodeTestCase.java
  31. 258 0
      x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformCheckpointServiceNodeTests.java
  32. 3 2
      x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManagerTests.java
  33. 83 33
      x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformCheckpointTests.java
  34. 2 2
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml

+ 93 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointStats.java

@@ -0,0 +1,93 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe.transforms;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Client-side holder for the stats of a single data frame transform checkpoint.
+ *
+ * Carries two long values, the checkpoint timestamp and the time upper bound, which are parsed from the
+ * {@code timestamp_millis} and {@code time_upper_bound_millis} fields. A field that is absent from the
+ * parsed content is represented as 0.
+ */
+public class DataFrameTransformCheckpointStats {
+    public static final ParseField TIMESTAMP_MILLIS = new ParseField("timestamp_millis");
+    public static final ParseField TIME_UPPER_BOUND_MILLIS = new ParseField("time_upper_bound_millis");
+
+    /**
+     * Shared instance with both values set to 0. Declared final so the shared constant cannot be reassigned.
+     */
+    public static final DataFrameTransformCheckpointStats EMPTY = new DataFrameTransformCheckpointStats(0L, 0L);
+
+    private final long timestampMillis;
+    private final long timeUpperBoundMillis;
+
+    // Both constructor arguments are declared optional below; a missing (null) argument is mapped to 0.
+    public static final ConstructingObjectParser<DataFrameTransformCheckpointStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
+            "data_frame_transform_checkpoint_stats", true, args -> {
+                long timestamp = args[0] == null ? 0L : (Long) args[0];
+                long timeUpperBound = args[1] == null ? 0L : (Long) args[1];
+
+                return new DataFrameTransformCheckpointStats(timestamp, timeUpperBound);
+            });
+
+    static {
+        // declaration order defines the argument indices used in the builder lambda above
+        LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), TIMESTAMP_MILLIS);
+        LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), TIME_UPPER_BOUND_MILLIS);
+    }
+
+    /**
+     * Parses an instance from the given parser using {@link #LENIENT_PARSER}.
+     *
+     * @param parser the parser to read from
+     * @return the parsed stats
+     * @throws IOException if reading from the parser fails
+     */
+    public static DataFrameTransformCheckpointStats fromXContent(XContentParser parser) throws IOException {
+        return LENIENT_PARSER.parse(parser, null);
+    }
+
+    /**
+     * @param timestampMillis the checkpoint timestamp
+     * @param timeUpperBoundMillis the time upper bound of the checkpoint
+     */
+    public DataFrameTransformCheckpointStats(final long timestampMillis, final long timeUpperBoundMillis) {
+        this.timestampMillis = timestampMillis;
+        this.timeUpperBoundMillis = timeUpperBoundMillis;
+    }
+
+    /**
+     * Reads an instance from a stream: first the timestamp, then the time upper bound, each as a long.
+     *
+     * @param in the stream to read from
+     * @throws IOException if reading from the stream fails
+     */
+    public DataFrameTransformCheckpointStats(StreamInput in) throws IOException {
+        this.timestampMillis = in.readLong();
+        this.timeUpperBoundMillis = in.readLong();
+    }
+
+    /**
+     * @return the checkpoint timestamp, 0 if it was not set
+     */
+    public long getTimestampMillis() {
+        return timestampMillis;
+    }
+
+    /**
+     * @return the time upper bound of the checkpoint, 0 if it was not set
+     */
+    public long getTimeUpperBoundMillis() {
+        return timeUpperBoundMillis;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(timestampMillis, timeUpperBoundMillis);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+
+        DataFrameTransformCheckpointStats that = (DataFrameTransformCheckpointStats) other;
+
+        return this.timestampMillis == that.timestampMillis && this.timeUpperBoundMillis == that.timeUpperBoundMillis;
+    }
+}

+ 102 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointingInfo.java

@@ -0,0 +1,102 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe.transforms;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.util.Objects;
+
+/**
+ * Client-side holder for checkpointing information of a data frame transform: the stats of the current
+ * checkpoint, the stats of the in-progress checkpoint and the number of operations behind.
+ */
+public class DataFrameTransformCheckpointingInfo {
+
+    public static final ParseField CURRENT_CHECKPOINT = new ParseField("current");
+    public static final ParseField IN_PROGRESS_CHECKPOINT = new ParseField("in_progress");
+    public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind");
+
+    private final DataFrameTransformCheckpointStats current;
+    private final DataFrameTransformCheckpointStats inProgress;
+    private final long operationsBehind;
+
+    // All three arguments are optional: missing stats fall back to the EMPTY stats instance, a missing count to 0.
+    private static final ConstructingObjectParser<DataFrameTransformCheckpointingInfo, Void> LENIENT_PARSER =
+            new ConstructingObjectParser<>(
+                    "data_frame_transform_checkpointing_info", true, parsed -> {
+                        long behind = 0L;
+                        if (parsed[2] != null) {
+                            behind = (Long) parsed[2];
+                        }
+
+                        DataFrameTransformCheckpointStats currentStats = DataFrameTransformCheckpointStats.EMPTY;
+                        if (parsed[0] != null) {
+                            currentStats = (DataFrameTransformCheckpointStats) parsed[0];
+                        }
+
+                        DataFrameTransformCheckpointStats inProgressStats = DataFrameTransformCheckpointStats.EMPTY;
+                        if (parsed[1] != null) {
+                            inProgressStats = (DataFrameTransformCheckpointStats) parsed[1];
+                        }
+
+                        return new DataFrameTransformCheckpointingInfo(currentStats, inProgressStats, behind);
+                    });
+
+    static {
+        // declaration order fixes the argument indices (0: current, 1: in_progress, 2: operations_behind)
+        LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
+                (p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), CURRENT_CHECKPOINT);
+        LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
+                (p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), IN_PROGRESS_CHECKPOINT);
+        LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND);
+    }
+
+    /**
+     * Parses an instance from the given parser.
+     *
+     * @param p the parser to read from
+     * @return the parsed checkpointing info
+     */
+    public static DataFrameTransformCheckpointingInfo fromXContent(XContentParser p) {
+        return LENIENT_PARSER.apply(p, null);
+    }
+
+    /**
+     * @param current stats of the current checkpoint, must not be null
+     * @param inProgress stats of the in-progress checkpoint, must not be null
+     * @param operationsBehind the number of operations behind
+     */
+    public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats current, DataFrameTransformCheckpointStats inProgress,
+            long operationsBehind) {
+        this.current = Objects.requireNonNull(current);
+        this.inProgress = Objects.requireNonNull(inProgress);
+        this.operationsBehind = operationsBehind;
+    }
+
+    public DataFrameTransformCheckpointStats getCurrent() {
+        return current;
+    }
+
+    public DataFrameTransformCheckpointStats getInProgress() {
+        return inProgress;
+    }
+
+    public long getOperationsBehind() {
+        return operationsBehind;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other == this) {
+            return true;
+        }
+        if (other == null) {
+            return false;
+        }
+        if (other.getClass() != getClass()) {
+            return false;
+        }
+
+        final DataFrameTransformCheckpointingInfo rhs = (DataFrameTransformCheckpointingInfo) other;
+        if (operationsBehind != rhs.operationsBehind) {
+            return false;
+        }
+        return Objects.equals(current, rhs.current) && Objects.equals(inProgress, rhs.inProgress);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(current, inProgress, operationsBehind);
+    }
+
+}

+ 9 - 9
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java

@@ -42,7 +42,7 @@ public class DataFrameTransformState {
     private static final ParseField INDEXER_STATE = new ParseField("indexer_state");
     private static final ParseField TASK_STATE = new ParseField("task_state");
     private static final ParseField CURRENT_POSITION = new ParseField("current_position");
-    private static final ParseField GENERATION = new ParseField("generation");
+    private static final ParseField CHECKPOINT = new ParseField("checkpoint");
     private static final ParseField REASON = new ParseField("reason");
 
     @SuppressWarnings("unchecked")
@@ -69,7 +69,7 @@ public class DataFrameTransformState {
             }
             throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
         }, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
-        PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), GENERATION);
+        PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT);
         PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON);
     }
 
@@ -79,19 +79,19 @@ public class DataFrameTransformState {
 
     private final DataFrameTransformTaskState taskState;
     private final IndexerState indexerState;
-    private final long generation;
+    private final long checkpoint;
     private final SortedMap<String, Object> currentPosition;
     private final String reason;
 
     public DataFrameTransformState(DataFrameTransformTaskState taskState,
                                    IndexerState indexerState,
                                    @Nullable Map<String, Object> position,
-                                   long generation,
+                                   long checkpoint,
                                    @Nullable String reason) {
         this.taskState = taskState;
         this.indexerState = indexerState;
         this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position));
-        this.generation = generation;
+        this.checkpoint = checkpoint;
         this.reason = reason;
     }
 
@@ -108,8 +108,8 @@ public class DataFrameTransformState {
         return currentPosition;
     }
 
-    public long getGeneration() {
-        return generation;
+    public long getCheckpoint() {
+        return checkpoint;
     }
 
     @Nullable
@@ -132,13 +132,13 @@ public class DataFrameTransformState {
         return Objects.equals(this.taskState, that.taskState) &&
             Objects.equals(this.indexerState, that.indexerState) &&
             Objects.equals(this.currentPosition, that.currentPosition) &&
-            this.generation == that.generation &&
+            this.checkpoint == that.checkpoint &&
             Objects.equals(this.reason, that.reason);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(taskState, indexerState, currentPosition, generation, reason);
+        return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason);
     }
 
 }

+ 16 - 4
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateAndStats.java

@@ -31,16 +31,20 @@ public class DataFrameTransformStateAndStats {
     public static final ParseField ID = new ParseField("id");
     public static final ParseField STATE_FIELD = new ParseField("state");
     public static final ParseField STATS_FIELD = new ParseField("stats");
+    public static final ParseField CHECKPOINTING_INFO_FIELD = new ParseField("checkpointing");
 
     public static final ConstructingObjectParser<DataFrameTransformStateAndStats, Void> PARSER = new ConstructingObjectParser<>(
             "data_frame_transform_state_and_stats", true,
-            a -> new DataFrameTransformStateAndStats((String) a[0], (DataFrameTransformState) a[1], (DataFrameIndexerTransformStats) a[2]));
+            a -> new DataFrameTransformStateAndStats((String) a[0], (DataFrameTransformState) a[1], (DataFrameIndexerTransformStats) a[2],
+                    (DataFrameTransformCheckpointingInfo) a[3]));
 
     static {
         PARSER.declareString(ConstructingObjectParser.constructorArg(), ID);
         PARSER.declareObject(ConstructingObjectParser.constructorArg(), DataFrameTransformState.PARSER::apply, STATE_FIELD);
         PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> DataFrameIndexerTransformStats.fromXContent(p),
                 STATS_FIELD);
+        PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
+                (p, c) -> DataFrameTransformCheckpointingInfo.fromXContent(p), CHECKPOINTING_INFO_FIELD);
     }
 
     public static DataFrameTransformStateAndStats fromXContent(XContentParser parser) throws IOException {
@@ -50,11 +54,14 @@ public class DataFrameTransformStateAndStats {
     private final String id;
     private final DataFrameTransformState transformState;
     private final DataFrameIndexerTransformStats transformStats;
+    private final DataFrameTransformCheckpointingInfo checkpointingInfo;
 
-    public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats) {
+    public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats,
+            DataFrameTransformCheckpointingInfo checkpointingInfo) {
         this.id = id;
         this.transformState = state;
         this.transformStats = stats;
+        this.checkpointingInfo = checkpointingInfo;
     }
 
     public String getId() {
@@ -69,9 +76,13 @@ public class DataFrameTransformStateAndStats {
         return transformState;
     }
 
+    public DataFrameTransformCheckpointingInfo getCheckpointingInfo() {
+        return checkpointingInfo;
+    }
+
     @Override
     public int hashCode() {
-        return Objects.hash(id, transformState, transformStats);
+        return Objects.hash(id, transformState, transformStats, checkpointingInfo);
     }
 
     @Override
@@ -87,6 +98,7 @@ public class DataFrameTransformStateAndStats {
         DataFrameTransformStateAndStats that = (DataFrameTransformStateAndStats) other;
 
         return Objects.equals(this.id, that.id) && Objects.equals(this.transformState, that.transformState)
-                && Objects.equals(this.transformStats, that.transformStats);
+                && Objects.equals(this.transformStats, that.transformStats)
+                && Objects.equals(this.checkpointingInfo, that.checkpointingInfo);
     }
 }

+ 51 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointStatsTests.java

@@ -0,0 +1,51 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe.transforms;
+
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
+
+/**
+ * XContent parsing test for the client-side {@link DataFrameTransformCheckpointStats}.
+ */
+public class DataFrameTransformCheckpointStatsTests extends ESTestCase {
+
+    /**
+     * Runs the shared xContentTester over random instances, with unknown fields enabled.
+     */
+    public void testFromXContent() throws IOException {
+        xContentTester(
+                this::createParser,
+                DataFrameTransformCheckpointStatsTests::randomDataFrameTransformCheckpointStats,
+                DataFrameTransformCheckpointStatsTests::toXContent,
+                DataFrameTransformCheckpointStats::fromXContent)
+                .supportsUnknownFields(true)
+                .test();
+    }
+
+    /**
+     * Creates stats with a random timestamp in [1, 1_000_000] and a random time upper bound in [0, 1_000_000].
+     */
+    public static DataFrameTransformCheckpointStats randomDataFrameTransformCheckpointStats() {
+        final long timestampMillis = randomLongBetween(1, 1_000_000);
+        final long timeUpperBoundMillis = randomLongBetween(0, 1_000_000);
+        return new DataFrameTransformCheckpointStats(timestampMillis, timeUpperBoundMillis);
+    }
+
+    /**
+     * Writes the given stats as an object containing both fields.
+     */
+    public static void toXContent(DataFrameTransformCheckpointStats stats, XContentBuilder builder) throws IOException {
+        builder.startObject()
+                .field("timestamp_millis", stats.getTimestampMillis())
+                .field("time_upper_bound_millis", stats.getTimeUpperBoundMillis())
+                .endObject();
+    }
+
+}

+ 61 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointingInfoTests.java

@@ -0,0 +1,61 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe.transforms;
+
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
+
+/**
+ * XContent parsing test for the client-side {@link DataFrameTransformCheckpointingInfo}.
+ */
+public class DataFrameTransformCheckpointingInfoTests extends ESTestCase {
+
+    /**
+     * Runs the shared xContentTester over random instances, with unknown fields disabled.
+     */
+    public void testFromXContent() throws IOException {
+        xContentTester(
+                this::createParser,
+                DataFrameTransformCheckpointingInfoTests::randomDataFrameTransformCheckpointingInfo,
+                DataFrameTransformCheckpointingInfoTests::toXContent,
+                DataFrameTransformCheckpointingInfo::fromXContent)
+                .supportsUnknownFields(false)
+                .test();
+    }
+
+    /**
+     * Creates checkpointing info from two random stats instances and a random operations-behind count in [0, 10000].
+     */
+    public static DataFrameTransformCheckpointingInfo randomDataFrameTransformCheckpointingInfo() {
+        final DataFrameTransformCheckpointStats current =
+                DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats();
+        final DataFrameTransformCheckpointStats inProgress =
+                DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats();
+        final long operationsBehind = randomLongBetween(0, 10000);
+        return new DataFrameTransformCheckpointingInfo(current, inProgress, operationsBehind);
+    }
+
+    /**
+     * Writes the given info as an object; a stats section is only written if its timestamp is greater than 0.
+     */
+    public static void toXContent(DataFrameTransformCheckpointingInfo info, XContentBuilder builder) throws IOException {
+        builder.startObject();
+
+        final DataFrameTransformCheckpointStats current = info.getCurrent();
+        if (current.getTimestampMillis() > 0) {
+            builder.field("current");
+            DataFrameTransformCheckpointStatsTests.toXContent(current, builder);
+        }
+
+        final DataFrameTransformCheckpointStats inProgress = info.getInProgress();
+        if (inProgress.getTimestampMillis() > 0) {
+            builder.field("in_progress");
+            DataFrameTransformCheckpointStatsTests.toXContent(inProgress, builder);
+        }
+
+        builder.field("operations_behind", info.getOperationsBehind());
+        builder.endObject();
+    }
+
+}

+ 4 - 1
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateAndStatsTests.java

@@ -41,7 +41,8 @@ public class DataFrameTransformStateAndStatsTests extends ESTestCase {
     public static DataFrameTransformStateAndStats randomInstance() {
         return new DataFrameTransformStateAndStats(randomAlphaOfLength(10),
                 DataFrameTransformStateTests.randomDataFrameTransformState(),
-                DataFrameIndexerTransformStatsTests.randomStats());
+                DataFrameIndexerTransformStatsTests.randomStats(),
+                DataFrameTransformCheckpointingInfoTests.randomDataFrameTransformCheckpointingInfo());
     }
 
     public static void toXContent(DataFrameTransformStateAndStats stateAndStats, XContentBuilder builder) throws IOException {
@@ -51,6 +52,8 @@ public class DataFrameTransformStateAndStatsTests extends ESTestCase {
         DataFrameTransformStateTests.toXContent(stateAndStats.getTransformState(), builder);
         builder.field(DataFrameTransformStateAndStats.STATS_FIELD.getPreferredName());
         DataFrameIndexerTransformStatsTests.toXContent(stateAndStats.getTransformStats(), builder);
+        builder.field(DataFrameTransformStateAndStats.CHECKPOINTING_INFO_FIELD.getPreferredName());
+        DataFrameTransformCheckpointingInfoTests.toXContent(stateAndStats.getCheckpointingInfo(), builder);
         builder.endObject();
     }
 }

+ 1 - 1
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java

@@ -56,7 +56,7 @@ public class DataFrameTransformStateTests extends ESTestCase {
         if (state.getPosition() != null) {
             builder.field("current_position", state.getPosition());
         }
-        builder.field("generation", state.getGeneration());
+        builder.field("checkpoint", state.getCheckpoint());
         if (state.getReason() != null) {
             builder.field("reason", state.getReason());
         }

+ 11 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java

@@ -28,6 +28,17 @@ public final class DataFrameField {
     public static final ParseField DESTINATION = new ParseField("dest");
     public static final ParseField FORCE = new ParseField("force");
 
+    /**
+     * Fields for checkpointing
+     */
+    // the timestamp of the checkpoint, mandatory
+    public static final ParseField TIMESTAMP_MILLIS = new ParseField("timestamp_millis");
+    public static final ParseField TIMESTAMP = new ParseField("timestamp");
+    // checkpoint for time based sync
+    // TODO: consider a lower bound for usecases where you want to transform on a window of a stream
+    public static final ParseField TIME_UPPER_BOUND_MILLIS = new ParseField("time_upper_bound_millis");
+    public static final ParseField TIME_UPPER_BOUND = new ParseField("time_upper_bound");
+
     // common strings
     public static final String TASK_NAME = "data_frame/transforms";
     public static final String REST_BASE_PATH = "/_data_frame/";

+ 108 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointStats.java

@@ -0,0 +1,108 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.dataframe.transforms;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.xpack.core.dataframe.DataFrameField;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Checkpoint stats data for 1 checkpoint
+ *
+ * This is the user-facing side of DataFrameTransformCheckpoint, containing only the stats to be exposed.
+ * It carries the checkpoint timestamp and the time upper bound as long values; a value that is missing
+ * from parsed content is represented as 0.
+ */
+public class DataFrameTransformCheckpointStats implements Writeable, ToXContentObject {
+
+    /**
+     * Shared instance with both values set to 0. Declared final so the shared constant cannot be reassigned.
+     */
+    public static final DataFrameTransformCheckpointStats EMPTY = new DataFrameTransformCheckpointStats(0L, 0L);
+
+    private final long timestampMillis;
+    private final long timeUpperBoundMillis;
+
+    // Both constructor arguments are declared optional below; a missing (null) argument is mapped to 0.
+    private static final ConstructingObjectParser<DataFrameTransformCheckpointStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
+            "data_frame_transform_checkpoint_stats", true, args -> {
+                long timestamp = args[0] == null ? 0L : (Long) args[0];
+                long timeUpperBound = args[1] == null ? 0L : (Long) args[1];
+
+                return new DataFrameTransformCheckpointStats(timestamp, timeUpperBound);
+            });
+
+    static {
+        // declaration order defines the argument indices used in the builder lambda above
+        LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), DataFrameField.TIMESTAMP_MILLIS);
+        LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), DataFrameField.TIME_UPPER_BOUND_MILLIS);
+    }
+
+    /**
+     * @param timestampMillis the checkpoint timestamp
+     * @param timeUpperBoundMillis the time upper bound of the checkpoint
+     */
+    public DataFrameTransformCheckpointStats(final long timestampMillis, final long timeUpperBoundMillis) {
+        this.timestampMillis = timestampMillis;
+        this.timeUpperBoundMillis = timeUpperBoundMillis;
+    }
+
+    /**
+     * Reads an instance from a stream, in the same order as {@link #writeTo(StreamOutput)} writes it:
+     * first the timestamp, then the time upper bound.
+     *
+     * @param in the stream to read from
+     * @throws IOException if reading from the stream fails
+     */
+    public DataFrameTransformCheckpointStats(StreamInput in) throws IOException {
+        this.timestampMillis = in.readLong();
+        this.timeUpperBoundMillis = in.readLong();
+    }
+
+    /**
+     * @return the checkpoint timestamp, 0 if it was not set
+     */
+    public long getTimestampMillis() {
+        return timestampMillis;
+    }
+
+    /**
+     * @return the time upper bound of the checkpoint, 0 if it was not set
+     */
+    public long getTimeUpperBoundMillis() {
+        return timeUpperBoundMillis;
+    }
+
+    /**
+     * Writes the stats as an object. The timestamp is always written; the time upper bound is only written
+     * if it is greater than 0.
+     */
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.timeField(DataFrameField.TIMESTAMP_MILLIS.getPreferredName(), DataFrameField.TIMESTAMP.getPreferredName(),
+                getTimestampMillis());
+        if (timeUpperBoundMillis > 0) {
+            builder.timeField(DataFrameField.TIME_UPPER_BOUND_MILLIS.getPreferredName(), DataFrameField.TIME_UPPER_BOUND.getPreferredName(),
+                    timeUpperBoundMillis);
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    /**
+     * Writes the timestamp followed by the time upper bound, each as a long.
+     */
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeLong(timestampMillis);
+        out.writeLong(timeUpperBoundMillis);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(timestampMillis, timeUpperBoundMillis);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+
+        DataFrameTransformCheckpointStats that = (DataFrameTransformCheckpointStats) other;
+
+        return this.timestampMillis == that.timestampMillis &&
+                this.timeUpperBoundMillis == that.timeUpperBoundMillis;
+    }
+
+    /**
+     * Parses an instance from the given parser.
+     *
+     * @param p the parser to read from
+     * @return the parsed stats
+     */
+    public static DataFrameTransformCheckpointStats fromXContent(XContentParser p) {
+        return LENIENT_PARSER.apply(p, null);
+    }
+
+}

+ 141 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointingInfo.java

@@ -0,0 +1,141 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.dataframe.transforms;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Holds information about checkpointing regarding
+ *  - the current checkpoint
+ *  - the in progress checkpoint
+ *  - the current state of the source
+ */
+public class DataFrameTransformCheckpointingInfo implements Writeable, ToXContentObject {
+
+    /**
+     * Shared placeholder instance: both checkpoints are {@link DataFrameTransformCheckpointStats#EMPTY} and
+     * {@code operations_behind} is 0. Declared {@code final} so the shared instance cannot be reassigned.
+     */
+    public static final DataFrameTransformCheckpointingInfo EMPTY = new DataFrameTransformCheckpointingInfo(
+            DataFrameTransformCheckpointStats.EMPTY,
+            DataFrameTransformCheckpointStats.EMPTY,
+            0L);
+
+    public static final ParseField CURRENT_CHECKPOINT = new ParseField("current");
+    public static final ParseField IN_PROGRESS_CHECKPOINT = new ParseField("in_progress");
+    public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind");
+
+    private final DataFrameTransformCheckpointStats current;
+    private final DataFrameTransformCheckpointStats inProgress;
+    private final long operationsBehind;
+
+    // Lenient parser: all three fields are optional. The indices of "a" follow the order of the declare* calls
+    // in the static initializer below; absent checkpoints become EMPTY and an absent counter becomes 0.
+    private static final ConstructingObjectParser<DataFrameTransformCheckpointingInfo, Void> LENIENT_PARSER =
+            new ConstructingObjectParser<>(
+                    "data_frame_transform_checkpointing_info", true, a -> {
+                        long behind = a[2] == null ? 0L : (Long) a[2];
+
+                        return new DataFrameTransformCheckpointingInfo(
+                                a[0] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[0],
+                                a[1] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[1], behind);
+                    });
+
+    static {
+        LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
+                (p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), CURRENT_CHECKPOINT);
+        LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
+                (p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), IN_PROGRESS_CHECKPOINT);
+        LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND);
+    }
+
+    /**
+     * Create checkpoint stats object with checkpoint information about the current and in progress checkpoint as well as the current state
+     * of source.
+     *
+     * @param current stats of the current checkpoint, must not be null
+     * @param inProgress stats of the in progress checkpoint, must not be null
+     * @param operationsBehind counter of operations the current checkpoint is behind source
+     * @throws NullPointerException if {@code current} or {@code inProgress} is null
+     */
+    public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats current, DataFrameTransformCheckpointStats inProgress,
+            long operationsBehind) {
+        this.current = Objects.requireNonNull(current);
+        this.inProgress = Objects.requireNonNull(inProgress);
+        this.operationsBehind = operationsBehind;
+    }
+
+    /**
+     * Deserializes checkpointing info from a transport stream. The read order (current, in progress, operations behind)
+     * mirrors {@link #writeTo(StreamOutput)}.
+     *
+     * @param in the stream to read from
+     * @throws IOException if reading from the stream fails
+     */
+    public DataFrameTransformCheckpointingInfo(StreamInput in) throws IOException {
+        current = new DataFrameTransformCheckpointStats(in);
+        inProgress = new DataFrameTransformCheckpointStats(in);
+        operationsBehind = in.readLong();
+    }
+
+    /**
+     * @return stats of the current checkpoint, never null
+     */
+    public DataFrameTransformCheckpointStats getCurrent() {
+        return current;
+    }
+
+    /**
+     * @return stats of the in progress checkpoint, never null
+     */
+    public DataFrameTransformCheckpointStats getInProgress() {
+        return inProgress;
+    }
+
+    /**
+     * @return counter of operations the current checkpoint is behind source
+     */
+    public long getOperationsBehind() {
+        return operationsBehind;
+    }
+
+    /**
+     * Renders the checkpointing info as an object. A checkpoint is only written if its timestamp is greater than 0;
+     * {@code operations_behind} is always written. When parsing, an absent checkpoint object is replaced by
+     * {@link DataFrameTransformCheckpointStats#EMPTY}.
+     *
+     * @param builder the builder to write to
+     * @param params  rendering parameters (not read by this method)
+     * @return the same builder, for chaining
+     * @throws IOException if writing to the builder fails
+     */
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        if (current.getTimestampMillis() > 0) {
+            builder.field(CURRENT_CHECKPOINT.getPreferredName(), current);
+        }
+        if (inProgress.getTimestampMillis() > 0) {
+            builder.field(IN_PROGRESS_CHECKPOINT.getPreferredName(), inProgress);
+        }
+
+        builder.field(OPERATIONS_BEHIND.getPreferredName(), operationsBehind);
+        builder.endObject();
+        return builder;
+    }
+
+    /**
+     * Serializes the checkpointing info to a transport stream: current checkpoint, in progress checkpoint,
+     * then the operations-behind counter. The order must stay in sync with the {@code StreamInput} constructor.
+     *
+     * @param out the stream to write to
+     * @throws IOException if writing to the stream fails
+     */
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        current.writeTo(out);
+        inProgress.writeTo(out);
+        out.writeLong(operationsBehind);
+    }
+
+    /**
+     * Parses checkpointing info from XContent using the lenient parser.
+     *
+     * @param p the parser to read from
+     * @return the parsed checkpointing info
+     */
+    public static DataFrameTransformCheckpointingInfo fromXContent(XContentParser p) {
+        return LENIENT_PARSER.apply(p, null);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(current, inProgress, operationsBehind);
+    }
+
+    /**
+     * Two instances are equal if they are of exactly the same class and all three fields match.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+
+        DataFrameTransformCheckpointingInfo that = (DataFrameTransformCheckpointingInfo) other;
+
+        return Objects.equals(this.current, that.current) &&
+                Objects.equals(this.inProgress, that.inProgress) &&
+                this.operationsBehind == that.operationsBehind;
+    }
+
+}

+ 21 - 12
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java

@@ -35,7 +35,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
 
     private final DataFrameTransformTaskState taskState;
     private final IndexerState indexerState;
-    private final long generation;
+    private final long checkpoint;
 
     @Nullable
     private final SortedMap<String, Object> currentPosition;
@@ -45,7 +45,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
     private static final ParseField TASK_STATE = new ParseField("task_state");
     private static final ParseField INDEXER_STATE = new ParseField("indexer_state");
     private static final ParseField CURRENT_POSITION = new ParseField("current_position");
-    private static final ParseField GENERATION = new ParseField("generation");
+    private static final ParseField CHECKPOINT = new ParseField("checkpoint");
     private static final ParseField REASON = new ParseField("reason");
 
     @SuppressWarnings("unchecked")
@@ -80,19 +80,19 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
             }
             throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
         }, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
-        PARSER.declareLong(constructorArg(), GENERATION);
+        PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT);
         PARSER.declareString(optionalConstructorArg(), REASON);
     }
 
     public DataFrameTransformState(DataFrameTransformTaskState taskState,
                                    IndexerState indexerState,
                                    @Nullable Map<String, Object> position,
-                                   long generation,
+                                   long checkpoint,
                                    @Nullable String reason) {
         this.taskState = taskState;
         this.indexerState = indexerState;
         this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position));
-        this.generation = generation;
+        this.checkpoint = checkpoint;
         this.reason = reason;
     }
 
@@ -100,7 +100,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
         taskState = DataFrameTransformTaskState.fromStream(in);
         indexerState = IndexerState.fromStream(in);
         currentPosition = in.readBoolean() ? Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap())) : null;
-        generation = in.readLong();
+        checkpoint = in.readLong();
         reason = in.readOptionalString();
     }
 
@@ -116,8 +116,17 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
         return currentPosition;
     }
 
-    public long getGeneration() {
-        return generation;
+    public long getCheckpoint() {
+        return checkpoint;
+    }
+
+    /**
+     * Get the in-progress checkpoint
+     *
+     * @return checkpoint in progress or 0 if task/indexer is not active
+     */
+    public long getInProgressCheckpoint() {
+        return indexerState.equals(IndexerState.INDEXING) ? checkpoint + 1L : 0;
     }
 
     public String getReason() {
@@ -140,7 +149,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
         if (currentPosition != null) {
             builder.field(CURRENT_POSITION.getPreferredName(), currentPosition);
         }
-        builder.field(GENERATION.getPreferredName(), generation);
+        builder.field(CHECKPOINT.getPreferredName(), checkpoint);
         if (reason != null) {
             builder.field(REASON.getPreferredName(), reason);
         }
@@ -161,7 +170,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
         if (currentPosition != null) {
             out.writeMap(currentPosition);
         }
-        out.writeLong(generation);
+        out.writeLong(checkpoint);
         out.writeOptionalString(reason);
     }
 
@@ -180,13 +189,13 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
         return Objects.equals(this.taskState, that.taskState) &&
             Objects.equals(this.indexerState, that.indexerState) &&
             Objects.equals(this.currentPosition, that.currentPosition) &&
-            this.generation == that.generation &&
+            this.checkpoint == that.checkpoint &&
             Objects.equals(this.reason, that.reason);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(taskState, indexerState, currentPosition, generation, reason);
+        return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason);
     }
 
     @Override

+ 24 - 6
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java

@@ -24,20 +24,27 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
 
     private static final String NAME = "data_frame_transform_state_and_stats";
     public static final ParseField STATE_FIELD = new ParseField("state");
+    public static final ParseField CHECKPOINTING_INFO_FIELD = new ParseField("checkpointing");
 
     private final String id;
     private final DataFrameTransformState transformState;
     private final DataFrameIndexerTransformStats transformStats;
+    private final DataFrameTransformCheckpointingInfo checkpointingInfo;
 
     public static final ConstructingObjectParser<DataFrameTransformStateAndStats, Void> PARSER = new ConstructingObjectParser<>(
             NAME, true,
-            a -> new DataFrameTransformStateAndStats((String) a[0], (DataFrameTransformState) a[1], (DataFrameIndexerTransformStats) a[2]));
+            a -> new DataFrameTransformStateAndStats((String) a[0],
+                    (DataFrameTransformState) a[1],
+                    (DataFrameIndexerTransformStats) a[2],
+                    // the checkpointing field is declared as an optional constructor arg while the constructor rejects
+                    // null, so fall back to EMPTY when the field is absent instead of failing with a NullPointerException
+                    a[3] == null ? DataFrameTransformCheckpointingInfo.EMPTY : (DataFrameTransformCheckpointingInfo) a[3]));
 
     static {
         PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameField.ID);
         PARSER.declareObject(ConstructingObjectParser.constructorArg(), DataFrameTransformState.PARSER::apply, STATE_FIELD);
         PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> DataFrameIndexerTransformStats.fromXContent(p),
                 DataFrameField.STATS_FIELD);
+        PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
+                                (p, c) -> DataFrameTransformCheckpointingInfo.fromXContent(p), CHECKPOINTING_INFO_FIELD);
     }
 
     public static DataFrameTransformStateAndStats initialStateAndStats(String id) {
@@ -47,27 +54,32 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
     public static DataFrameTransformStateAndStats initialStateAndStats(String id, DataFrameIndexerTransformStats indexerTransformStats) {
         return new DataFrameTransformStateAndStats(id,
             new DataFrameTransformState(DataFrameTransformTaskState.STOPPED, IndexerState.STOPPED, null, 0L, null),
-            indexerTransformStats);
+            indexerTransformStats,
+            DataFrameTransformCheckpointingInfo.EMPTY);
     }
 
-    public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats) {
+    public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats,
+            DataFrameTransformCheckpointingInfo checkpointingInfo) {
         this.id = Objects.requireNonNull(id);
         this.transformState = Objects.requireNonNull(state);
         this.transformStats = Objects.requireNonNull(stats);
+        this.checkpointingInfo = Objects.requireNonNull(checkpointingInfo);
     }
 
     public DataFrameTransformStateAndStats(StreamInput in) throws IOException {
         this.id = in.readString();
         this.transformState = new DataFrameTransformState(in);
         this.transformStats = new DataFrameIndexerTransformStats(in);
+        this.checkpointingInfo = new DataFrameTransformCheckpointingInfo(in);
     }
 
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
         builder.field(DataFrameField.ID.getPreferredName(), id);
-        builder.field(STATE_FIELD.getPreferredName(), transformState);
+        builder.field(STATE_FIELD.getPreferredName(), transformState, params);
         builder.field(DataFrameField.STATS_FIELD.getPreferredName(), transformStats, params);
+        builder.field(CHECKPOINTING_INFO_FIELD.getPreferredName(), checkpointingInfo, params);
         builder.endObject();
         return builder;
     }
@@ -77,11 +89,12 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
         out.writeString(id);
         transformState.writeTo(out);
         transformStats.writeTo(out);
+        checkpointingInfo.writeTo(out);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(id, transformState, transformStats);
+        return Objects.hash(id, transformState, transformStats, checkpointingInfo);
     }
 
     @Override
@@ -97,7 +110,8 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
         DataFrameTransformStateAndStats that = (DataFrameTransformStateAndStats) other;
 
         return Objects.equals(this.id, that.id) && Objects.equals(this.transformState, that.transformState)
-                && Objects.equals(this.transformStats, that.transformStats);
+                && Objects.equals(this.transformStats, that.transformStats)
+                && Objects.equals(this.checkpointingInfo, that.checkpointingInfo);
     }
 
     public String getId() {
@@ -112,6 +126,10 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
         return transformState;
     }
 
+    public DataFrameTransformCheckpointingInfo getCheckpointingInfo() {
+        return checkpointingInfo;
+    }
+
     @Override
     public String toString() {
         return Strings.toString(this);

+ 35 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointStatsTests.java

@@ -0,0 +1,35 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.dataframe.transforms;
+
+import org.elasticsearch.common.io.stream.Writeable.Reader;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+
+/**
+ * Serialization test wiring for {@link DataFrameTransformCheckpointStats}: supplies random instances,
+ * the XContent parse function and the stream reader to the test base class.
+ */
+public class DataFrameTransformCheckpointStatsTests extends AbstractSerializingDataFrameTestCase<DataFrameTransformCheckpointStats>
+{
+    /**
+     * Creates checkpoint stats from two random non-negative longs; also used by other tests in this package.
+     */
+    public static DataFrameTransformCheckpointStats randomDataFrameTransformCheckpointStats() {
+        return new DataFrameTransformCheckpointStats(randomNonNegativeLong(), randomNonNegativeLong());
+    }
+
+    // parses an instance back from XContent
+    @Override
+    protected DataFrameTransformCheckpointStats doParseInstance(XContentParser parser) throws IOException {
+        return DataFrameTransformCheckpointStats.fromXContent(parser);
+    }
+
+    @Override
+    protected DataFrameTransformCheckpointStats createTestInstance() {
+        return randomDataFrameTransformCheckpointStats();
+    }
+
+    // reads an instance back from a stream via the StreamInput constructor
+    @Override
+    protected Reader<DataFrameTransformCheckpointStats> instanceReader() {
+        return DataFrameTransformCheckpointStats::new;
+    }
+
+}

+ 35 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointingInfoTests.java

@@ -0,0 +1,35 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.dataframe.transforms;
+
+import org.elasticsearch.common.io.stream.Writeable.Reader;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+
+/**
+ * Serialization test wiring for {@link DataFrameTransformCheckpointingInfo}: supplies random instances,
+ * the XContent parse function and the stream reader to the test base class.
+ */
+public class DataFrameTransformCheckpointingInfoTests extends AbstractSerializingDataFrameTestCase<DataFrameTransformCheckpointingInfo> {
+
+    /**
+     * Creates checkpointing info from two random checkpoint stats and a random non-negative operations-behind
+     * counter; also used by other tests.
+     */
+    public static DataFrameTransformCheckpointingInfo randomDataFrameTransformCheckpointingInfo() {
+        return new DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(),
+                DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats(), randomNonNegativeLong());
+    }
+
+    // parses an instance back from XContent
+    @Override
+    protected DataFrameTransformCheckpointingInfo doParseInstance(XContentParser parser) throws IOException {
+        return DataFrameTransformCheckpointingInfo.fromXContent(parser);
+    }
+
+    @Override
+    protected DataFrameTransformCheckpointingInfo createTestInstance() {
+        return randomDataFrameTransformCheckpointingInfo();
+    }
+
+    // reads an instance back from a stream via the StreamInput constructor
+    @Override
+    protected Reader<DataFrameTransformCheckpointingInfo> instanceReader() {
+        return DataFrameTransformCheckpointingInfo::new;
+    }
+}

+ 2 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStatsTests.java

@@ -22,7 +22,8 @@ public class DataFrameTransformStateAndStatsTests extends AbstractSerializingDat
     public static DataFrameTransformStateAndStats randomDataFrameTransformStateAndStats(String id) {
         return new DataFrameTransformStateAndStats(id,
                 DataFrameTransformStateTests.randomDataFrameTransformState(),
-                DataFrameIndexerTransformStatsTests.randomStats(id));
+                DataFrameIndexerTransformStatsTests.randomStats(id),
+                DataFrameTransformCheckpointingInfoTests.randomDataFrameTransformCheckpointingInfo());
     }
 
     public static DataFrameTransformStateAndStats randomDataFrameTransformStateAndStats() {

+ 52 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/hlrc/DataFrameTransformCheckpointStatsHlrcTests.java

@@ -0,0 +1,52 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.dataframe.transforms.hlrc;
+
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.protocol.AbstractHlrcXContentTestCase;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStatsTests;
+
+import java.io.IOException;
+
+/**
+ * XContent test pairing the server-side {@link DataFrameTransformCheckpointStats} with its high level REST client
+ * counterpart ({@code org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats}).
+ */
+public class DataFrameTransformCheckpointStatsHlrcTests extends AbstractHlrcXContentTestCase<
+        DataFrameTransformCheckpointStats,
+        org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats> {
+
+    /**
+     * Converts a client-side instance into a server-side one by copying the timestamp and the time upper bound.
+     */
+    public static DataFrameTransformCheckpointStats fromHlrc(
+            org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats instance) {
+        return new DataFrameTransformCheckpointStats(instance.getTimestampMillis(), instance.getTimeUpperBoundMillis());
+    }
+
+    // parses with the client-side parser
+    @Override
+    public org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats doHlrcParseInstance(XContentParser parser)
+            throws IOException {
+        return org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats.fromXContent(parser);
+    }
+
+    @Override
+    public DataFrameTransformCheckpointStats convertHlrcToInternal(
+            org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats instance) {
+        return fromHlrc(instance);
+    }
+
+    @Override
+    protected DataFrameTransformCheckpointStats createTestInstance() {
+        return DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats();
+    }
+
+    // parses with the server-side parser
+    @Override
+    protected DataFrameTransformCheckpointStats doParseInstance(XContentParser parser) throws IOException {
+        return DataFrameTransformCheckpointStats.fromXContent(parser);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return true;
+    }
+
+}

+ 55 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/hlrc/DataFrameTransformCheckpointingInfoHlrcTests.java

@@ -0,0 +1,55 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.dataframe.transforms.hlrc;
+
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.protocol.AbstractHlrcXContentTestCase;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfoTests;
+
+import java.io.IOException;
+
+/**
+ * XContent test pairing the server-side {@link DataFrameTransformCheckpointingInfo} with its high level REST client
+ * counterpart ({@code org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointingInfo}).
+ */
+public class DataFrameTransformCheckpointingInfoHlrcTests  extends AbstractHlrcXContentTestCase<
+        DataFrameTransformCheckpointingInfo,
+        org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointingInfo> {
+
+    /**
+     * Converts a client-side instance into a server-side one: both checkpoints are converted via
+     * {@link DataFrameTransformCheckpointStatsHlrcTests#fromHlrc} and the operations-behind counter is copied.
+     */
+    public static DataFrameTransformCheckpointingInfo fromHlrc(
+            org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointingInfo instance) {
+        return new DataFrameTransformCheckpointingInfo(
+                DataFrameTransformCheckpointStatsHlrcTests.fromHlrc(instance.getCurrent()),
+                DataFrameTransformCheckpointStatsHlrcTests.fromHlrc(instance.getInProgress()),
+                instance.getOperationsBehind());
+    }
+
+    // parses with the client-side parser
+    @Override
+    public org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointingInfo doHlrcParseInstance(XContentParser parser)
+            throws IOException {
+        return org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointingInfo.fromXContent(parser);
+    }
+
+    @Override
+    public DataFrameTransformCheckpointingInfo convertHlrcToInternal(
+            org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointingInfo instance) {
+        return fromHlrc(instance);
+    }
+
+    @Override
+    protected DataFrameTransformCheckpointingInfo createTestInstance() {
+        return DataFrameTransformCheckpointingInfoTests.randomDataFrameTransformCheckpointingInfo();
+    }
+
+    // parses with the server-side parser
+    @Override
+    protected DataFrameTransformCheckpointingInfo doParseInstance(XContentParser parser) throws IOException {
+        return DataFrameTransformCheckpointingInfo.fromXContent(parser);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return true;
+    }
+
+}

+ 2 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/hlrc/DataFrameTransformStateAndStatsHlrcTests.java

@@ -29,7 +29,8 @@ public class DataFrameTransformStateAndStatsHlrcTests extends AbstractHlrcXConte
             org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats instance) {
         return new DataFrameTransformStateAndStats(instance.getId(),
                 DataFrameTransformStateHlrcTests.fromHlrc(instance.getTransformState()),
-                DataFrameIndexerTransformStatsHlrcTests.fromHlrc(instance.getTransformStats()));
+                DataFrameIndexerTransformStatsHlrcTests.fromHlrc(instance.getTransformStats()),
+                DataFrameTransformCheckpointingInfoHlrcTests.fromHlrc(instance.getCheckpointingInfo()));
     }
 
     @Override

+ 1 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/hlrc/DataFrameTransformStateHlrcTests.java

@@ -21,7 +21,7 @@ public class DataFrameTransformStateHlrcTests extends AbstractHlrcXContentTestCa
 
     public static DataFrameTransformState fromHlrc(org.elasticsearch.client.dataframe.transforms.DataFrameTransformState instance) {
         return new DataFrameTransformState(DataFrameTransformTaskState.fromString(instance.getTaskState().value()),
-                IndexerState.fromString(instance.getIndexerState().value()), instance.getPosition(), instance.getGeneration(),
+                IndexerState.fromString(instance.getIndexerState().value()), instance.getPosition(), instance.getCheckpoint(),
                 instance.getReason());
     }
 

+ 6 - 6
x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java

@@ -196,7 +196,7 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
         // start the transform
         startDataframeTransform(transformId, false, authHeader);
         // wait until the dataframe has been created and all data is available
-        waitForDataFrameGeneration(transformId);
+        waitForDataFrameCheckpoint(transformId);
         refreshIndex(dataFrameIndex);
     }
 
@@ -212,10 +212,10 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
         return request;
     }
 
-    void waitForDataFrameGeneration(String transformId) throws Exception {
+    void waitForDataFrameCheckpoint(String transformId) throws Exception {
         assertBusy(() -> {
-            long generation = getDataFrameGeneration(transformId);
-            assertEquals(1, generation);
+            long checkpoint = getDataFrameCheckpoint(transformId);
+            assertEquals(1, checkpoint);
         }, 30, TimeUnit.SECONDS);
     }
 
@@ -321,11 +321,11 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
         }
     }
 
-    static int getDataFrameGeneration(String transformId) throws IOException {
+    static int getDataFrameCheckpoint(String transformId) throws IOException {
         Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + transformId + "/_stats"));
 
         Map<?, ?> transformStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("transforms")).get(0);
-        return (int) XContentMapValues.extractValue("state.generation", transformStatsAsMap);
+        return (int) XContentMapValues.extractValue("state.checkpoint", transformStatsAsMap);
     }
 
     protected void setupDataAccessRole(String role, String... indices) throws IOException {

+ 1 - 1
x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java

@@ -60,7 +60,7 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
         // Force start the data frame to indicate failure correction
         startDataframeTransform(transformId, true);
         // Wait for data to be indexed appropriately and refresh for search
-        waitForDataFrameGeneration(transformId);
+        waitForDataFrameCheckpoint(transformId);
         refreshIndex(dataFrameIndex);
 
         // Verify that we have started and that our reason is cleared

+ 1 - 1
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java

@@ -190,7 +190,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
             DATA_FRAME_ORIGIN,
             DataFrameAuditMessage.builder()));
         dataFrameTransformsConfigManager.set(new DataFrameTransformsConfigManager(client, xContentRegistry));
-        dataFrameTransformsCheckpointService.set(new DataFrameTransformsCheckpointService(client));
+        dataFrameTransformsCheckpointService.set(new DataFrameTransformsCheckpointService(client, dataFrameTransformsConfigManager.get()));
 
         return Arrays.asList(dataFrameTransformsConfigManager.get(), dataFrameAuditor.get(), dataFrameTransformsCheckpointService.get());
     }

+ 20 - 8
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java

@@ -45,7 +45,9 @@ import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStats
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Request;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
+import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
@@ -71,14 +73,18 @@ public class TransportGetDataFrameTransformsStatsAction extends
 
     private final Client client;
     private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
+    private final DataFrameTransformsCheckpointService transformsCheckpointService;
+
     @Inject
     public TransportGetDataFrameTransformsStatsAction(TransportService transportService, ActionFilters actionFilters,
                                                       ClusterService clusterService, Client client,
-                                                      DataFrameTransformsConfigManager dataFrameTransformsConfigManager) {
+                                                      DataFrameTransformsConfigManager dataFrameTransformsConfigManager,
+                                                      DataFrameTransformsCheckpointService transformsCheckpointService) {
         super(GetDataFrameTransformsStatsAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new,
                 Response::new, ThreadPool.Names.SAME);
         this.client = client;
         this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
+        this.transformsCheckpointService = transformsCheckpointService;
     }
 
     @Override
@@ -93,16 +99,22 @@ public class TransportGetDataFrameTransformsStatsAction extends
 
     @Override
     protected void taskOperation(Request request, DataFrameTransformTask task, ActionListener<Response> listener) {
-        List<DataFrameTransformStateAndStats> transformsStateAndStats = Collections.emptyList();
-
         // Little extra insurance: only report transforms whose task has not been cancelled, a cancelled task yields an empty response
         if (task.isCancelled() == false) {
-            DataFrameTransformStateAndStats transformStateAndStats = new DataFrameTransformStateAndStats(task.getTransformId(),
-                    task.getState(), task.getStats());
-            transformsStateAndStats = Collections.singletonList(transformStateAndStats);
+            transformsCheckpointService.getCheckpointStats(task.getTransformId(), task.getCheckpoint(), task.getInProgressCheckpoint(),
+                    ActionListener.wrap(checkpointStats -> {
+                        listener.onResponse(new Response(Collections.singletonList(
+                        new DataFrameTransformStateAndStats(task.getTransformId(), task.getState(), task.getStats(), checkpointStats))));
+            }, e -> { // checkpoint lookup failed: still answer with state and stats, using EMPTY checkpointing info, and attach the error
+                    listener.onResponse(new Response(
+                        Collections.singletonList(new DataFrameTransformStateAndStats(task.getTransformId(), task.getState(),
+                                task.getStats(), DataFrameTransformCheckpointingInfo.EMPTY)),
+                        Collections.emptyList(),
+                        Collections.singletonList(new ElasticsearchException("Failed to retrieve checkpointing info", e))));
+            }));
+        } else { // cancelled task: respond right away with an empty list
+            listener.onResponse(new Response(Collections.emptyList()));
         }
-
-        listener.onResponse(new Response(transformsStateAndStats));
     }
 
     @Override

+ 105 - 2
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.dataframe.checkpoint;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.LatchedActionListener;
 import org.elasticsearch.action.admin.indices.get.GetIndexAction;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
@@ -16,14 +17,19 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.xpack.core.ClientHelper;
-import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
+import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
+import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformCheckpoint;
 
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 /**
  * DataFrameTransform Checkpoint Service
@@ -35,12 +41,24 @@ import java.util.TreeMap;
  */
 public class DataFrameTransformsCheckpointService {
 
+    /**
+     * Holder for the checkpoints collected by the parallel lookups of {@code getCheckpointStats}.
+     *
+     * Every field starts as {@link DataFrameTransformCheckpoint#EMPTY} and is overwritten once the corresponding lookup returned.
+     * Declared static as it does not need access to the enclosing service instance.
+     */
+    private static class Checkpoints {
+        DataFrameTransformCheckpoint currentCheckpoint = DataFrameTransformCheckpoint.EMPTY;
+        DataFrameTransformCheckpoint inProgressCheckpoint = DataFrameTransformCheckpoint.EMPTY;
+        DataFrameTransformCheckpoint sourceCheckpoint = DataFrameTransformCheckpoint.EMPTY;
+    }
+
     private static final Logger logger = LogManager.getLogger(DataFrameTransformsCheckpointService.class);
 
+    // timeout for retrieving checkpoint information
+    private static final int CHECKPOINT_STATS_TIMEOUT_SECONDS = 5;
+
     private final Client client;
+    private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
 
-    public DataFrameTransformsCheckpointService(final Client client) {
+    public DataFrameTransformsCheckpointService(final Client client,
+            final DataFrameTransformsConfigManager dataFrameTransformsConfigManager) {
         this.client = client;
+        this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
     }
 
     /**
@@ -84,6 +102,7 @@ public class DataFrameTransformsCheckpointService {
                                 Map<String, long[]> checkpointsByIndex = extractIndexCheckPoints(response.getShards(), userIndices);
                                 DataFrameTransformCheckpoint checkpointDoc = new DataFrameTransformCheckpoint(transformConfig.getId(),
                                         timestamp, checkpoint, checkpointsByIndex, timeUpperBound);
+
                                 listener.onResponse(checkpointDoc);
 
                             }, IndicesStatsRequestException -> {
@@ -96,6 +115,90 @@ public class DataFrameTransformsCheckpointService {
 
     }
 
+    /**
+     * Get checkpointing stats for a data frame transform
+     *
+     * Implementation details:
+     *  - fires up to 3 requests _in parallel_ rather than cascading them
+     *  - waits on the calling thread until all requests returned or {@code CHECKPOINT_STATS_TIMEOUT_SECONDS} elapsed
+     *  - the listener is notified exactly once, later results or failures are dropped
+     *
+     * @param transformId the id of the data frame transform
+     * @param currentCheckpoint the current checkpoint, the lookup is skipped if 0
+     * @param inProgressCheckpoint in progress checkpoint, the lookup is skipped if 0
+     * @param listener listener to retrieve the result
+     */
+    public void getCheckpointStats(
+            String transformId,
+            long currentCheckpoint,
+            long inProgressCheckpoint,
+            ActionListener<DataFrameTransformCheckpointingInfo> listener) {
+
+        // process in parallel: current checkpoint, in-progress checkpoint, current state of the source
+        CountDownLatch latch = new CountDownLatch(3);
+
+        // ensure listener is called exactly once
+        final ActionListener<DataFrameTransformCheckpointingInfo> wrappedListener = ActionListener.notifyOnce(listener);
+
+        // holder structure for writing the results of the 3 parallel tasks
+        Checkpoints checkpoints = new Checkpoints();
+
+        // get the current checkpoint
+        if (currentCheckpoint != 0) {
+            dataFrameTransformsConfigManager.getTransformCheckpoint(transformId, currentCheckpoint,
+                    new LatchedActionListener<>(ActionListener.wrap(checkpoint -> checkpoints.currentCheckpoint = checkpoint, e -> {
+                        logger.debug("Failed to retrieve checkpoint [" + currentCheckpoint + "] for data frame [" + transformId + "]", e);
+                        wrappedListener
+                                .onFailure(new CheckpointException("Failed to retrieve current checkpoint [" + currentCheckpoint + "]", e));
+                    }), latch));
+        } else {
+            // nothing to look up, release the slot reserved for this request
+            latch.countDown();
+        }
+
+        // get the in-progress checkpoint
+        if (inProgressCheckpoint != 0) {
+            dataFrameTransformsConfigManager.getTransformCheckpoint(transformId, inProgressCheckpoint,
+                    new LatchedActionListener<>(ActionListener.wrap(checkpoint -> checkpoints.inProgressCheckpoint = checkpoint, e -> {
+                        logger.debug("Failed to retrieve in progress checkpoint [" + inProgressCheckpoint + "] for data frame ["
+                                + transformId + "]", e);
+                        wrappedListener.onFailure(
+                                new CheckpointException("Failed to retrieve in progress checkpoint [" + inProgressCheckpoint + "]", e));
+                    }), latch));
+        } else {
+            // nothing to look up, release the slot reserved for this request
+            latch.countDown();
+        }
+
+        // get the current state
+        dataFrameTransformsConfigManager.getTransformConfiguration(transformId, ActionListener.wrap(transformConfig -> {
+            getCheckpoint(transformConfig,
+                    new LatchedActionListener<>(ActionListener.wrap(checkpoint -> checkpoints.sourceCheckpoint = checkpoint, e2 -> {
+                        logger.debug("Failed to retrieve actual checkpoint for data frame [" + transformId + "]", e2);
+                        wrappedListener.onFailure(new CheckpointException("Failed to retrieve actual checkpoint", e2));
+                    }), latch));
+        }, e -> {
+            logger.warn("Failed to retrieve configuration for data frame [" + transformId + "]", e);
+            wrappedListener.onFailure(new CheckpointException("Failed to retrieve configuration", e));
+            // the source checkpoint request is never started in this case, count down on its behalf
+            latch.countDown();
+        }));
+
+        try {
+            if (latch.await(CHECKPOINT_STATS_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+                logger.debug("Retrieval of checkpoint information succeeded for data frame [" + transformId + "]");
+                // if one of the requests failed the listener has been notified already and this call is a no-op
+                wrappedListener.onResponse(new DataFrameTransformCheckpointingInfo(
+                            new DataFrameTransformCheckpointStats(checkpoints.currentCheckpoint.getTimestamp(),
+                                    checkpoints.currentCheckpoint.getTimeUpperBound()),
+                            new DataFrameTransformCheckpointStats(checkpoints.inProgressCheckpoint.getTimestamp(),
+                                    checkpoints.inProgressCheckpoint.getTimeUpperBound()),
+                            DataFrameTransformCheckpoint.getBehind(checkpoints.currentCheckpoint, checkpoints.sourceCheckpoint)));
+            } else {
+                // timed out
+                logger.debug("Retrieval of checkpoint information has timed out for data frame [" + transformId + "]");
+                wrappedListener.onFailure(new CheckpointException("Retrieval of checkpoint information has timed out"));
+            }
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so that code further up the stack can still observe the interruption
+            Thread.currentThread().interrupt();
+            logger.debug("Failed to retrieve checkpoints for data frame [" + transformId + "]", e);
+            wrappedListener.onFailure(new CheckpointException("Failure during checkpoint info retrieval", e));
+        }
+    }
+
     static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<String> userIndices) {
         Map<String, TreeMap<Integer, Long>> checkpointsByIndex = new TreeMap<>();
 

+ 2 - 1
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java

@@ -45,8 +45,8 @@ import org.elasticsearch.xpack.core.action.util.PageParams;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
-import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
+import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformCheckpoint;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -148,6 +148,7 @@ public class DataFrameTransformsConfigManager {
 
             if (getResponse.isExists() == false) {
                 // do not fail if checkpoint does not exist but return an empty checkpoint
+                logger.trace("found no checkpoint for transform [" + transformId + "], returning empty checkpoint");
                 resultListener.onResponse(DataFrameTransformCheckpoint.EMPTY);
                 return;
             }

+ 12 - 1
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java

@@ -51,12 +51,23 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
 
     protected abstract Map<String, String> getFieldMappings();
 
+    /**
+     * Request a checkpoint
+     *
+     * Called from {@code onStart} if the indexer has no position yet, i.e. when it runs for the 1st time.
+     *
+     * @param listener listener handed over from {@code onStart}, to be notified about the outcome of the checkpoint creation
+     */
+    protected abstract void createCheckpoint(ActionListener<Void> listener);
+
     @Override
     protected void onStart(long now, ActionListener<Void> listener) {
         try {
             QueryBuilder queryBuilder = getConfig().getSource().getQueryConfig().getQuery();
             pivot = new Pivot(getConfig().getSource().getIndex(), queryBuilder, getConfig().getPivotConfig());
-            listener.onResponse(null);
+
+            // if run for the 1st time, create checkpoint
+            if (getPosition() == null) {
+                createCheckpoint(listener);
+            } else {
+                listener.onResponse(null);
+            }
         } catch (Exception e) {
             listener.onFailure(e);
         }

+ 82 - 26
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpoint.java → x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformCheckpoint.java

@@ -4,7 +4,7 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
-package org.elasticsearch.xpack.core.dataframe.transforms;
+package org.elasticsearch.xpack.dataframe.transforms;
 
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.common.ParseField;
@@ -35,7 +35,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
  * The fields:
  *
  *  timestamp the timestamp when this document has been created
- *  checkpoint the checkpoint number, incremented for every checkpoint
+ *  checkpoint the checkpoint number, incremented for every checkpoint, if -1 this is a non persisted checkpoint
  *  indices a map of the indices from the source including all checkpoints of all indices matching the source pattern, shard level
  *  time_upper_bound for time-based indices this holds the upper time boundary of this checkpoint
  *
@@ -44,21 +44,12 @@ public class DataFrameTransformCheckpoint implements Writeable, ToXContentObject
 
     public static DataFrameTransformCheckpoint EMPTY = new DataFrameTransformCheckpoint("empty", 0L, -1L, Collections.emptyMap(), 0L);
 
-    // the timestamp of the checkpoint, mandatory
-    public static final ParseField TIMESTAMP_MILLIS = new ParseField("timestamp_millis");
-    public static final ParseField TIMESTAMP = new ParseField("timestamp");
-
     // the own checkpoint
     public static final ParseField CHECKPOINT = new ParseField("checkpoint");
 
     // checkpoint of the indexes (sequence id's)
     public static final ParseField INDICES = new ParseField("indices");
 
-    // checkpoint for for time based sync
-    // TODO: consider a lower bound for usecases where you want to transform on a window of a stream
-    public static final ParseField TIME_UPPER_BOUND_MILLIS = new ParseField("time_upper_bound_millis");
-    public static final ParseField TIME_UPPER_BOUND = new ParseField("time_upper_bound");
-
     private static final String NAME = "data_frame_transform_checkpoint";
 
     private static final ConstructingObjectParser<DataFrameTransformCheckpoint, Void> STRICT_PARSER = createParser(false);
@@ -89,7 +80,7 @@ public class DataFrameTransformCheckpoint implements Writeable, ToXContentObject
         parser.declareString(constructorArg(), DataFrameField.ID);
 
         // note: this is never parsed from the outside where timestamp can be formatted as date time
-        parser.declareLong(constructorArg(), TIMESTAMP_MILLIS);
+        parser.declareLong(constructorArg(), DataFrameField.TIMESTAMP_MILLIS);
         parser.declareLong(constructorArg(), CHECKPOINT);
 
         parser.declareObject(constructorArg(), (p,c) -> {
@@ -111,7 +102,7 @@ public class DataFrameTransformCheckpoint implements Writeable, ToXContentObject
             }
             return checkPointsByIndexName;
         }, INDICES);
-        parser.declareLong(optionalConstructorArg(), TIME_UPPER_BOUND_MILLIS);
+        parser.declareLong(optionalConstructorArg(), DataFrameField.TIME_UPPER_BOUND_MILLIS);
         parser.declareString(optionalConstructorArg(), DataFrameField.INDEX_DOC_TYPE);
 
         return parser;
@@ -134,29 +125,47 @@ public class DataFrameTransformCheckpoint implements Writeable, ToXContentObject
         this.timeUpperBoundMillis = in.readLong();
     }
 
+    public boolean isEmpty() {
+        return indicesCheckpoints.isEmpty();
+    }
+
+    /**
+     * Whether this checkpoint is a transient (non persisted) checkpoint
+     *
+     * @return true if this is a transient checkpoint, false otherwise
+     */
+    public boolean isTransient() {
+        return checkpoint == -1;
+    }
+
+    /**
+     * Create XContent for the purpose of storing it in the internal index
+     *
+     * Note: the transform id, the checkpoint number and the document type are always part of the output
+     * @param builder the {@link XContentBuilder}
+     * @param params builder specific parameters
+     *
+     * @return builder instance
+     */
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
 
-        // the id, doc_type and checkpoint is only internally used for storage, the user-facing version gets embedded
-        if (params.paramAsBoolean(DataFrameField.FOR_INTERNAL_STORAGE, false)) {
-            builder.field(DataFrameField.ID.getPreferredName(), transformId);
-            builder.field(CHECKPOINT.getPreferredName(), checkpoint);
-            builder.field(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), NAME);
-        }
-
-        builder.timeField(TIMESTAMP_MILLIS.getPreferredName(), TIMESTAMP.getPreferredName(), timestampMillis);
-
-        if (timeUpperBoundMillis > 0) {
-            builder.timeField(TIME_UPPER_BOUND_MILLIS.getPreferredName(), TIME_UPPER_BOUND.getPreferredName(), timeUpperBoundMillis);
-        }
-
+        builder.field(DataFrameField.ID.getPreferredName(), transformId);
+        builder.field(CHECKPOINT.getPreferredName(), checkpoint);
+        builder.field(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), NAME);
         builder.startObject(INDICES.getPreferredName());
         for (Entry<String, long[]> entry : indicesCheckpoints.entrySet()) {
             builder.array(entry.getKey(), entry.getValue());
         }
         builder.endObject();
 
+        builder.field(DataFrameField.TIMESTAMP_MILLIS.getPreferredName(), timestampMillis);
+
+        if (timeUpperBoundMillis > 0) {
+            builder.field(DataFrameField.TIME_UPPER_BOUND_MILLIS.getPreferredName(), timeUpperBoundMillis);
+        }
+
         builder.endObject();
         return builder;
     }
@@ -249,6 +258,53 @@ public class DataFrameTransformCheckpoint implements Writeable, ToXContentObject
         return NAME + "-" + transformId + "-" + checkpoint;
     }
 
+    /**
+     * Estimate how far one checkpoint is behind another one
+     *
+     * The result is the difference between the summed up shard checkpoints of both arguments and is meant as an indicator only.
+     *
+     * Note: the order of the arguments matters
+     *
+     * @param oldCheckpoint the older checkpoint, if transient, newer must be transient, too
+     * @param newCheckpoint the newer checkpoint, can be a transient checkpoint
+     *
+     * @return count number of operations the checkpoint is behind or -1L if it could not calculate the difference
+     * @throws IllegalArgumentException if the old checkpoint is transient while the new one is not or
+     *         if both are non transient and the old one has the higher checkpoint number
+     */
+    public static long getBehind(DataFrameTransformCheckpoint oldCheckpoint, DataFrameTransformCheckpoint newCheckpoint) {
+        final boolean oldIsTransient = oldCheckpoint.isTransient();
+        final boolean newIsTransient = newCheckpoint.isTransient();
+
+        // a transient checkpoint can only be compared to another transient one
+        if (oldIsTransient && newIsTransient == false) {
+            throw new IllegalArgumentException("can not compare transient against a non transient checkpoint");
+        }
+
+        // checkpoint numbers are only comparable if both checkpoints are persisted ones
+        if (oldIsTransient == false && newIsTransient == false && oldCheckpoint.getCheckpoint() > newCheckpoint.getCheckpoint()) {
+            throw new IllegalArgumentException("old checkpoint is newer than new checkpoint");
+        }
+
+        // the new checkpoint may cover additional indices, but none of the old indices may be missing
+        final boolean coversAllOldIndices = newCheckpoint.indicesCheckpoints.keySet()
+                .containsAll(oldCheckpoint.indicesCheckpoints.keySet());
+        if (coversAllOldIndices == false) {
+            return -1L;
+        }
+
+        // sum up the shard checkpoints of each side
+        // note: shard checkpoints are required to strictly increase and never decrease
+        final long oldShardSum = oldCheckpoint.indicesCheckpoints.values().stream()
+                .mapToLong(shards -> Arrays.stream(shards).sum())
+                .sum();
+        final long newShardSum = newCheckpoint.indicesCheckpoints.values().stream()
+                .mapToLong(shards -> Arrays.stream(shards).sum())
+                .sum();
+
+        // a negative difference should not be possible, report it as "unknown"
+        return newShardSum < oldShardSum ? -1L : newShardSum - oldShardSum;
+    }
+
     private static Map<String, long[]> readCheckpoints(Map<String, Object> readMap) {
         Map<String, long[]> checkpoints = new TreeMap<>();
         for (Map.Entry<String, Object> e : readMap.entrySet()) {

+ 46 - 25
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -48,6 +48,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 
@@ -68,10 +69,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
 
     private final AtomicReference<DataFrameTransformTaskState> taskState;
     private final AtomicReference<String> stateReason;
-    // the generation of this data frame, for v1 there will be only
-    // 0: data frame not created or still indexing
-    // 1: data frame complete, all data has been indexed
-    private final AtomicReference<Long> generation;
+    // the checkpoint of this data frame, storing the checkpoint until data indexing from source to dest is _complete_
+    // Note: Each indexer run creates a new future checkpoint which becomes the current checkpoint only after the indexer run finished
+    private final AtomicLong currentCheckpoint;
     private final AtomicInteger failureCount;
 
     public DataFrameTransformTask(long id, String type, String action, TaskId parentTask, DataFrameTransform transform,
@@ -105,12 +105,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 initialState = existingState;
             }
             initialPosition = state.getPosition();
-            initialGeneration = state.getGeneration();
+            initialGeneration = state.getCheckpoint();
         }
 
         this.indexer = new ClientDataFrameIndexer(transform.getId(), transformsConfigManager, transformsCheckpointService,
             new AtomicReference<>(initialState), initialPosition, client, auditor);
-        this.generation = new AtomicReference<>(initialGeneration);
+        this.currentCheckpoint = new AtomicLong(initialGeneration);
         this.previousStats = new DataFrameIndexerTransformStats(transform.getId());
         this.taskState = new AtomicReference<>(initialTaskState);
         this.stateReason = new AtomicReference<>(initialReason);
@@ -130,7 +130,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
     }
 
     public DataFrameTransformState getState() {
-        return new DataFrameTransformState(taskState.get(), indexer.getState(), indexer.getPosition(), generation.get(), stateReason.get());
+        return new DataFrameTransformState(
+                taskState.get(),
+                indexer.getState(),
+                indexer.getPosition(),
+                currentCheckpoint.get(),
+                stateReason.get());
     }
 
     void initializePreviousStats(DataFrameIndexerTransformStats stats) {
@@ -141,8 +146,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         return new DataFrameIndexerTransformStats(previousStats).merge(indexer.getStats());
     }
 
-    public long getGeneration() {
-        return generation.get();
+    public long getCheckpoint() {
+        return currentCheckpoint.get();
+    }
+
+    /**
+     * Get the in-progress checkpoint
+     *
+     * @return checkpoint in progress or 0 if task/indexer is not active
+     */
+    public long getInProgressCheckpoint() {
+        return indexer.getState().equals(IndexerState.INDEXING) ? currentCheckpoint.get() + 1L : 0;
     }
 
     public boolean isStopped() {
@@ -164,7 +178,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             DataFrameTransformTaskState.STARTED,
             IndexerState.STOPPED,
             indexer.getPosition(),
-            generation.get(),
+            currentCheckpoint.get(),
             null);
 
         logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString());
@@ -203,7 +217,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 DataFrameTransformTaskState.STOPPED,
                 IndexerState.STOPPED,
                 indexer.getPosition(),
-                generation.get(),
+                currentCheckpoint.get(),
                 stateReason.get());
             persistStateToClusterState(state, ActionListener.wrap(
                 task -> {
@@ -224,7 +238,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
 
     @Override
     public synchronized void triggered(Event event) {
-        if (generation.get() == 0 && event.getJobName().equals(SCHEDULE_NAME + "_" + transform.getId())) {
+        // no re-runs for now: only trigger the indexer as long as no checkpoint has been completed (checkpoint == 0)
+        if (currentCheckpoint.get() == 0 && event.getJobName().equals(SCHEDULE_NAME + "_" + transform.getId())) {
             logger.debug("Data frame indexer [" + event.getJobName() + "] schedule has triggered, state: [" + indexer.getState() + "]");
             indexer.maybeTriggerAsyncJob(System.currentTimeMillis());
         }
@@ -298,6 +313,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
 
     protected class ClientDataFrameIndexer extends DataFrameIndexer {
         private static final int LOAD_TRANSFORM_TIMEOUT_IN_SECONDS = 30;
+        private static final int CREATE_CHECKPOINT_TIMEOUT_IN_SECONDS = 30;
+
         private final Client client;
         private final DataFrameTransformsConfigManager transformsConfigManager;
         private final DataFrameTransformsCheckpointService transformsCheckpointService;
@@ -413,21 +430,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 return;
             }
 
-            if(indexerState.equals(IndexerState.STARTED) && getStats().getNumDocuments() > 0) {
-                // if the indexer resets the state to started, it means it is done with a run through the data.
-                // But, if there were no documents, we should allow it to attempt to gather more again, as there is no risk of overwriting
-                // Some reasons for no documents are (but is not limited to):
-                // * Could have failed early on search or index
-                // * Have an empty index
-                // * Have a query that returns no documents
-                generation.compareAndSet(0L, 1L);
-            }
-
             final DataFrameTransformState state = new DataFrameTransformState(
                 taskState.get(),
                 indexerState,
                 getPosition(),
-                generation.get(),
+                currentCheckpoint.get(),
                 stateReason.get());
             logger.info("Updating persistent state of transform [" + transform.getId() + "] to [" + state.toString() + "]");
 
@@ -480,8 +487,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         @Override
         protected void onFinish(ActionListener<Void> listener) {
             try {
-                auditor.info(transform.getId(), "Finished indexing for data frame transform");
-                logger.info("Finished indexing for data frame transform [" + transform.getId() + "]");
+                long checkpoint = currentCheckpoint.incrementAndGet();
+                auditor.info(transform.getId(), "Finished indexing for data frame transform checkpoint [" + checkpoint + "]");
+                logger.info("Finished indexing for data frame transform [" + transform.getId() + "] checkpoint [" + checkpoint + "]");
                 listener.onResponse(null);
             } catch (Exception e) {
                 listener.onFailure(e);
@@ -494,6 +502,19 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             logger.info("Data frame transform [" + transform.getId() + "] received abort request, stopping indexer");
             shutdown();
         }
+
+        @Override
+        protected void createCheckpoint(ActionListener<Void> listener) {
+            transformsCheckpointService.getCheckpoint(transformConfig, currentCheckpoint.get() + 1, ActionListener.wrap(checkpoint -> {
+                transformsConfigManager.putTransformCheckpoint(checkpoint, ActionListener.wrap(putCheckPointResponse -> {
+                    listener.onResponse(null);
+                }, createCheckpointException -> {
+                    listener.onFailure(new RuntimeException("Failed to create checkpoint", createCheckpointException));
+                }));
+            }, getCheckPointException -> {
+                listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException));
+            }));
+        }
     }
 
     class DataFrameConfigurationException extends RuntimeException {

+ 2 - 2
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameSingleNodeTestCase.java → x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameSingleNodeTestCase.java

@@ -4,7 +4,7 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
-package org.elasticsearch.xpack.dataframe.persistence;
+package org.elasticsearch.xpack.dataframe;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.LatchedActionListener;
@@ -15,7 +15,7 @@ import org.elasticsearch.index.reindex.ReindexPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESSingleNodeTestCase;
 import org.elasticsearch.xpack.core.template.TemplateUtils;
-import org.elasticsearch.xpack.dataframe.LocalStateDataFrame;
+import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
 import org.junit.Before;
 
 import java.util.Collection;

+ 258 - 0
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformCheckpointServiceNodeTests.java

@@ -0,0 +1,258 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.dataframe.checkpoint;
+
+import org.elasticsearch.action.Action;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
+import org.elasticsearch.action.admin.indices.stats.CommonStats;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.cache.query.QueryCacheStats;
+import org.elasticsearch.index.cache.request.RequestCacheStats;
+import org.elasticsearch.index.engine.SegmentsStats;
+import org.elasticsearch.index.fielddata.FieldDataStats;
+import org.elasticsearch.index.flush.FlushStats;
+import org.elasticsearch.index.get.GetStats;
+import org.elasticsearch.index.merge.MergeStats;
+import org.elasticsearch.index.refresh.RefreshStats;
+import org.elasticsearch.index.search.stats.SearchStats;
+import org.elasticsearch.index.seqno.SeqNoStats;
+import org.elasticsearch.index.shard.DocsStats;
+import org.elasticsearch.index.shard.IndexingStats;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardPath;
+import org.elasticsearch.index.store.StoreStats;
+import org.elasticsearch.index.warmer.WarmerStats;
+import org.elasticsearch.search.suggest.completion.CompletionStats;
+import org.elasticsearch.test.client.NoOpClient;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
+import org.elasticsearch.xpack.dataframe.DataFrameSingleNodeTestCase;
+import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
+import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformCheckpoint;
+import org.junit.After;
+import org.junit.Before;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class DataFrameTransformCheckpointServiceNodeTests extends DataFrameSingleNodeTestCase {
+
+    private DataFrameTransformsConfigManager transformsConfigManager;
+    private MockClientForCheckpointing mockClientForCheckpointing;
+    private DataFrameTransformsCheckpointService transformsCheckpointService;
+
+    /**
+     * Client stub that answers {@link GetIndexRequest} and {@link IndicesStatsRequest} from the shard
+     * stats configured via {@link #setShardStats(ShardStats[])}; every other request is handed to
+     * {@link NoOpClient}.
+     *
+     * Declared static because it does not use any state of the enclosing test instance.
+     */
+    private static class MockClientForCheckpointing extends NoOpClient {
+
+        private ShardStats[] shardStats;
+        private String[] indices;
+
+        MockClientForCheckpointing(String testName) {
+            super(testName);
+        }
+
+        /**
+         * Sets the shard stats returned for stats requests and derives the distinct index names
+         * returned for get-index requests from the shard routings.
+         */
+        public void setShardStats(ShardStats[] shardStats) {
+            this.shardStats = shardStats;
+
+            // a set, because several shards can belong to the same index
+            Set<String> indexNames = new HashSet<>();
+            for (ShardStats stats : shardStats) {
+                indexNames.add(stats.getShardRouting().getIndexName());
+            }
+
+            this.indices = indexNames.toArray(new String[0]);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(Action<Response> action, Request request,
+                ActionListener<Response> listener) {
+
+            if (request instanceof GetIndexRequest) {
+                // for this test we only need the indices
+                final GetIndexResponse indexResponse = new GetIndexResponse(indices, null, null, null, null);
+
+                listener.onResponse((Response) indexResponse);
+                return;
+            }
+
+            if (request instanceof IndicesStatsRequest) {
+                // IndicesStatsResponse is package private, therefore using a mock
+                final IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class);
+                when(indicesStatsResponse.getShards()).thenReturn(shardStats);
+                when(indicesStatsResponse.getFailedShards()).thenReturn(0);
+
+                listener.onResponse((Response) indicesStatsResponse);
+                return;
+            }
+
+            super.doExecute(action, request, listener);
+        }
+    }
+
+    /** Builds the config manager on the node client and the checkpoint service on the mocked client. */
+    @Before
+    public void createComponents() {
+        this.transformsConfigManager = new DataFrameTransformsConfigManager(client(), xContentRegistry());
+
+        // the checkpoint service talks to a mocked client, the config manager to the real one
+        this.mockClientForCheckpointing = new MockClientForCheckpointing(getTestName());
+        this.transformsCheckpointService = new DataFrameTransformsCheckpointService(this.mockClientForCheckpointing,
+                this.transformsConfigManager);
+    }
+
+    /** Closes the mocked client after each test. */
+    @After
+    public void tearDownClient() {
+        this.mockClientForCheckpointing.close();
+    }
+
+    /**
+     * Stores two checkpoints for a transform, reads both back, and checks that only empty
+     * checkpoints are returned once the transform has been deleted.
+     */
+    public void testCreateReadDeleteCheckpoint() throws InterruptedException {
+        final String id = randomAlphaOfLengthBetween(3, 10);
+        final long baseTimestamp = 1000;
+
+        // fixtures: checkpoint 1 and checkpoint 2 of the same transform
+        final DataFrameTransformCheckpoint first = new DataFrameTransformCheckpoint(id, baseTimestamp, 1L,
+                createCheckPointMap(id, 10, 10, 10), null);
+        final DataFrameTransformCheckpoint second = new DataFrameTransformCheckpoint(id, baseTimestamp + 100L, 2L,
+                createCheckPointMap(id, 20, 20, 20), null);
+
+        // create transform
+        assertAsync(
+                listener -> transformsConfigManager.putTransformConfiguration(
+                        DataFrameTransformConfigTests.randomDataFrameTransformConfig(id), listener),
+                true, null, null);
+
+        // nothing stored yet: by design no exception is thrown but an empty checkpoint is returned
+        assertAsync(listener -> transformsConfigManager.getTransformCheckpoint(id, 1L, listener),
+                DataFrameTransformCheckpoint.EMPTY, null, null);
+
+        // store the 1st checkpoint and read it back
+        assertAsync(listener -> transformsConfigManager.putTransformCheckpoint(first, listener), true, null, null);
+        assertAsync(listener -> transformsConfigManager.getTransformCheckpoint(id, 1L, listener), first, null, null);
+
+        // store the 2nd checkpoint
+        assertAsync(listener -> transformsConfigManager.putTransformCheckpoint(second, listener), true, null, null);
+
+        // both checkpoints should be there
+        assertAsync(listener -> transformsConfigManager.getTransformCheckpoint(id, 1L, listener), first, null, null);
+        assertAsync(listener -> transformsConfigManager.getTransformCheckpoint(id, 2L, listener), second, null, null);
+
+        // delete transform
+        assertAsync(listener -> transformsConfigManager.deleteTransform(id, listener), true, null, null);
+
+        // checkpoints should be empty again
+        assertAsync(listener -> transformsConfigManager.getTransformCheckpoint(id, 1L, listener),
+                DataFrameTransformCheckpoint.EMPTY, null, null);
+        assertAsync(listener -> transformsConfigManager.getTransformCheckpoint(id, 2L, listener),
+                DataFrameTransformCheckpoint.EMPTY, null, null);
+    }
+
+    /**
+     * Stores two checkpoints, then feeds different shard stats through the mocked client and checks
+     * the checkpointing info returned by the checkpoint service for each of them.
+     */
+    public void testGetCheckpointStats() throws InterruptedException {
+        final String id = randomAlphaOfLengthBetween(3, 10);
+        final long baseTimestamp = 1000;
+
+        // create transform
+        assertAsync(
+                listener -> transformsConfigManager.putTransformConfiguration(
+                        DataFrameTransformConfigTests.randomDataFrameTransformConfig(id), listener),
+                true, null, null);
+
+        // persist checkpoint 1
+        final DataFrameTransformCheckpoint first = new DataFrameTransformCheckpoint(id, baseTimestamp, 1L,
+                createCheckPointMap(id, 10, 10, 10), null);
+        assertAsync(listener -> transformsConfigManager.putTransformCheckpoint(first, listener), true, null, null);
+
+        // persist checkpoint 2
+        final DataFrameTransformCheckpoint second = new DataFrameTransformCheckpoint(id, baseTimestamp + 100L, 2L,
+                createCheckPointMap(id, 20, 20, 20), null);
+        assertAsync(listener -> transformsConfigManager.putTransformCheckpoint(second, listener), true, null, null);
+
+        // shard stats at 20/20/20: expected operations behind is 30
+        mockClientForCheckpointing.setShardStats(createShardStats(createCheckPointMap(id, 20, 20, 20)));
+        final DataFrameTransformCheckpointingInfo expectedBehind30 = new DataFrameTransformCheckpointingInfo(
+                new DataFrameTransformCheckpointStats(baseTimestamp, 0L),
+                new DataFrameTransformCheckpointStats(baseTimestamp + 100L, 0L),
+                30L);
+        assertAsync(listener -> transformsCheckpointService.getCheckpointStats(id, 1, 2, listener), expectedBehind30, null, null);
+
+        // shard stats at 10/50/33: expected operations behind is 63
+        mockClientForCheckpointing.setShardStats(createShardStats(createCheckPointMap(id, 10, 50, 33)));
+        final DataFrameTransformCheckpointingInfo expectedBehind63 = new DataFrameTransformCheckpointingInfo(
+                new DataFrameTransformCheckpointStats(baseTimestamp, 0L),
+                new DataFrameTransformCheckpointStats(baseTimestamp + 100L, 0L),
+                63L);
+        assertAsync(listener -> transformsCheckpointService.getCheckpointStats(id, 1, 2, listener), expectedBehind63, null, null);
+
+        // same as current
+        mockClientForCheckpointing.setShardStats(createShardStats(createCheckPointMap(id, 10, 10, 10)));
+        final DataFrameTransformCheckpointingInfo expectedBehind0 = new DataFrameTransformCheckpointingInfo(
+                new DataFrameTransformCheckpointStats(baseTimestamp, 0L),
+                new DataFrameTransformCheckpointStats(baseTimestamp + 100L, 0L),
+                0L);
+        assertAsync(listener -> transformsCheckpointService.getCheckpointStats(id, 1, 2, listener), expectedBehind0, null, null);
+    }
+
+    private static Map<String, long[]> createCheckPointMap(String index, long checkpointShard1, long checkpointShard2,
+            long checkpointShard3) {
+        return Collections.singletonMap(index, new long[] { checkpointShard1, checkpointShard2, checkpointShard3 });
+    }
+
+    private static ShardStats[] createShardStats(Map<String, long[]> checkpoints) {
+        List<ShardStats> shardStats = new ArrayList<>();
+
+        for (Entry<String, long[]> entry : checkpoints.entrySet()) {
+
+            for (int i = 0; i < entry.getValue().length; ++i) {
+                long checkpoint = entry.getValue()[i];
+                CommonStats stats = new CommonStats();
+                stats.fieldData = new FieldDataStats();
+                stats.queryCache = new QueryCacheStats();
+                stats.docs = new DocsStats();
+                stats.store = new StoreStats();
+                stats.indexing = new IndexingStats();
+                stats.search = new SearchStats();
+                stats.segments = new SegmentsStats();
+                stats.merge = new MergeStats();
+                stats.refresh = new RefreshStats();
+                stats.completion = new CompletionStats();
+                stats.requestCache = new RequestCacheStats();
+                stats.get = new GetStats();
+                stats.flush = new FlushStats();
+                stats.warmer = new WarmerStats();
+
+                SeqNoStats seqNoStats = new SeqNoStats(checkpoint, checkpoint, checkpoint);
+                Index index = new Index(entry.getKey(), UUIDs.randomBase64UUID(random()));
+                ShardId shardId = new ShardId(index, i);
+                ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE,
+                        new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
+                Path path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve(String.valueOf(i));
+
+                shardStats.add(new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, seqNoStats, null));
+            }
+
+        }
+        return shardStats.toArray(new ShardStats[0]);
+    }
+
+}

+ 3 - 2
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManagerTests.java

@@ -10,10 +10,11 @@ import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.xpack.core.action.util.PageParams;
 import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
-import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
-import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointTests;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
+import org.elasticsearch.xpack.dataframe.DataFrameSingleNodeTestCase;
+import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformCheckpoint;
+import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformCheckpointTests;
 import org.junit.Before;
 
 import java.util.Arrays;

+ 83 - 33
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointTests.java → x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformCheckpointTests.java

@@ -4,18 +4,17 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
-package org.elasticsearch.xpack.core.dataframe.transforms;
+package org.elasticsearch.xpack.dataframe.transforms;
 
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
-import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.xpack.core.dataframe.transforms.AbstractSerializingDataFrameTestCase;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -44,11 +43,6 @@ public class DataFrameTransformCheckpointTests extends AbstractSerializingDataFr
         return DataFrameTransformCheckpoint::new;
     }
 
-    @Override
-    protected ToXContent.Params getToXContentParams() {
-        return TO_XCONTENT_PARAMS;
-    }
-
     public void testXContentForInternalStorage() throws IOException {
         DataFrameTransformCheckpoint dataFrameTransformCheckpoints = randomDataFrameTransformCheckpoints();
 
@@ -58,28 +52,6 @@ public class DataFrameTransformCheckpointTests extends AbstractSerializingDataFr
 
             assertThat(doc, matchesPattern(".*\"doc_type\"\\s*:\\s*\"data_frame_transform_checkpoint\".*"));
         }
-
-        try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
-            XContentBuilder content = dataFrameTransformCheckpoints.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
-            String doc = Strings.toString(content);
-
-            assertFalse(doc.contains("doc_type"));
-        }
-    }
-
-    public void testXContentForApiUsage() throws IOException {
-        DataFrameTransformCheckpoint dataFrameTransformCheckpoints = new DataFrameTransformCheckpoint(randomAlphaOfLengthBetween(1, 10),
-                1546300800000L, randomNonNegativeLong(), Collections.emptyMap(), 1545609600000L);
-
-        try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
-            xContentBuilder.humanReadable(true);
-            XContentBuilder content = dataFrameTransformCheckpoints.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
-            String doc = Strings.toString(content);
-            assertThat(doc, matchesPattern(".*\"timestamp_millis\"\\s*:\\s*1546300800000.*"));
-            assertThat(doc, matchesPattern(".*\"time_upper_bound_millis\"\\s*:\\s*1545609600000.*"));
-            assertThat(doc, matchesPattern(".*\"timestamp\"\\s*:\\s*\"2019-01-01T00:00:00.000Z\".*"));
-            assertThat(doc, matchesPattern(".*\"time_upper_bound\"\\s*:\\s*\"2018-12-24T00:00:00.000Z\".*"));
-        }
     }
 
     public void testMatches() throws IOException {
@@ -119,12 +91,90 @@ public class DataFrameTransformCheckpointTests extends AbstractSerializingDataFr
                 .matches(new DataFrameTransformCheckpoint(id, timestamp, checkpoint, checkpointsByIndex, (timeUpperBound / 2) + 1)));
     }
 
+    /**
+     * Checks {@code DataFrameTransformCheckpoint.getBehind} for an old and a new set of shard
+     * checkpoints in which every shard of the new set is exactly 10 ahead of the old one, including
+     * the transient case, the illegal argument combinations and mismatching index sets.
+     */
+    public void testGetBehind() {
+        String id = randomAlphaOfLengthBetween(1, 10);
+        long timestamp = randomNonNegativeLong();
+
+        TreeMap<String, long[]> checkpointsByIndexOld = new TreeMap<>();
+        TreeMap<String, long[]> checkpointsByIndexNew = new TreeMap<>();
+
+        int indices = randomIntBetween(3, 10);
+        int shards = randomIntBetween(1, 20);
+
+        for (int i = 0; i < indices; ++i) {
+            List<Long> checkpoints1 = new ArrayList<>();
+            List<Long> checkpoints2 = new ArrayList<>();
+
+            for (int j = 0; j < shards; ++j) {
+                long shardCheckpoint = randomLongBetween(0, 1_000_000);
+                checkpoints1.add(shardCheckpoint);
+                checkpoints2.add(shardCheckpoint + 10);
+            }
+
+            // the loop counter makes the name unique: two equal random names would collapse into a single
+            // map entry and break the "indices * shards * 10" expectations below
+            String indexName = randomAlphaOfLengthBetween(1, 10) + "-" + i;
+
+            checkpointsByIndexOld.put(indexName, checkpoints1.stream().mapToLong(l -> l).toArray());
+            checkpointsByIndexNew.put(indexName, checkpoints2.stream().mapToLong(l -> l).toArray());
+        }
+
+        long checkpoint = randomLongBetween(10, 100);
+
+        DataFrameTransformCheckpoint checkpointOld = new DataFrameTransformCheckpoint(
+                id, timestamp, checkpoint, checkpointsByIndexOld, 0L);
+        DataFrameTransformCheckpoint checkpointTransientNew = new DataFrameTransformCheckpoint(
+                id, timestamp, -1L, checkpointsByIndexNew, 0L);
+        DataFrameTransformCheckpoint checkpointNew = new DataFrameTransformCheckpoint(
+                id, timestamp, checkpoint + 1, checkpointsByIndexNew, 0L);
+        DataFrameTransformCheckpoint checkpointOlderButNewerShardsCheckpoint = new DataFrameTransformCheckpoint(
+                id, timestamp, checkpoint - 1, checkpointsByIndexNew, 0L);
+
+        // every shard is 10 operations ahead in the new checkpoints
+        assertEquals(indices * shards * 10L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));
+        assertEquals(indices * shards * 10L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointNew));
+
+        // no difference for same checkpoints, transient or not
+        assertEquals(0L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointOld));
+        assertEquals(0L, DataFrameTransformCheckpoint.getBehind(checkpointTransientNew, checkpointTransientNew));
+        assertEquals(0L, DataFrameTransformCheckpoint.getBehind(checkpointNew, checkpointNew));
+
+        // new vs transient new: ok
+        assertEquals(0L, DataFrameTransformCheckpoint.getBehind(checkpointNew, checkpointTransientNew));
+
+        // transient new vs new: illegal
+        Exception e = expectThrows(IllegalArgumentException.class,
+                () -> DataFrameTransformCheckpoint.getBehind(checkpointTransientNew, checkpointNew));
+        assertEquals("can not compare transient against a non transient checkpoint", e.getMessage());
+
+        // new vs old: illegal
+        e = expectThrows(IllegalArgumentException.class, () -> DataFrameTransformCheckpoint.getBehind(checkpointNew, checkpointOld));
+        assertEquals("old checkpoint is newer than new checkpoint", e.getMessage());
+
+        // corner case: the checkpoint appears older but the inner shard checkpoints are newer
+        assertEquals(-1L, DataFrameTransformCheckpoint.getBehind(checkpointOlderButNewerShardsCheckpoint, checkpointOld));
+
+        // test cases where indices sets do not match
+        // NOTE(review): the steps below change the maps after the checkpoints were built, so they presumably
+        // rely on the checkpoint objects reading these same map instances - confirm against the constructor
+        // remove something from old, so newer has 1 index more than old
+        checkpointsByIndexOld.remove(checkpointsByIndexOld.firstKey());
+        long behind = DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew);
+        assertTrue("Expected behind (" + behind + ") > sum of shard checkpoints (" + indices * shards * 10L + ")",
+                behind > indices * shards * 10L);
+
+        // remove same key: old and new should have equal indices again
+        checkpointsByIndexNew.remove(checkpointsByIndexNew.firstKey());
+        assertEquals((indices - 1) * shards * 10L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));
+
+        // remove 1st index from new, now old has 1 index more, behind can not be calculated
+        checkpointsByIndexNew.remove(checkpointsByIndexNew.firstKey());
+        assertEquals(-1L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));
+    }
+
     private static Map<String, long[]> randomCheckpointsByIndex() {
         Map<String, long[]> checkpointsByIndex = new TreeMap<>();
-        for (int i = 0; i < randomIntBetween(1, 10); ++i) {
+        int indices = randomIntBetween(1, 10);
+        for (int i = 0; i < indices; ++i) {
             List<Long> checkpoints = new ArrayList<>();
-            for (int j = 0; j < randomIntBetween(1, 20); ++j) {
-                checkpoints.add(randomNonNegativeLong());
+            int shards = randomIntBetween(1, 20);
+            for (int j = 0; j < shards; ++j) {
+                checkpoints.add(randomLongBetween(0, 1_000_000));
             }
             checkpointsByIndex.put(randomAlphaOfLengthBetween(1, 10), checkpoints.stream().mapToLong(l -> l).toArray());
         }

+ 2 - 2
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml

@@ -48,7 +48,7 @@ teardown:
   - match: { transforms.0.id: "airline-transform-stats" }
   - match: { transforms.0.state.indexer_state: "started" }
   - match: { transforms.0.state.task_state: "started" }
-  - match: { transforms.0.state.generation: 0 }
+  - match: { transforms.0.state.checkpoint: 0 }
   - match: { transforms.0.stats.pages_processed: 0 }
   - match: { transforms.0.stats.documents_processed: 0 }
   - match: { transforms.0.stats.documents_indexed: 0 }
@@ -196,7 +196,7 @@ teardown:
   - match: { count: 1 }
   - match: { transforms.0.id: "airline-transform-stats-dos" }
   - match: { transforms.0.state.indexer_state: "stopped" }
-  - match: { transforms.0.state.generation: 0 }
+  - match: { transforms.0.state.checkpoint: 0 }
   - match: { transforms.0.stats.pages_processed: 0 }
   - match: { transforms.0.stats.documents_processed: 0 }
   - match: { transforms.0.stats.documents_indexed: 0 }