Ver código fonte

[ML-DataFrame] Rewrite continuous logic to prevent terms count limit (#44219)

Rewrites how continuous data frame transforms calculate and handle buckets that require an update. Instead of storing the whole set in memory, it pages through the updates using a 2nd cursor. This lowers memory consumption and prevents problems with limits at query time (max_terms_count). The list of updates can be re-retrieved in a failure case (#43662)
Hendrik Muhs 6 anos atrás
pai
commit
d4ec21bca1
19 arquivos alterados com 908 adições e 279 exclusões
  1. 2 2
      build.gradle
  2. 99 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerPosition.java
  3. 30 16
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java
  4. 76 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerPositionTests.java
  5. 6 23
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java
  6. 77 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameIndexerPositionTests.java
  7. 5 2
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateAndStatsTests.java
  8. 11 4
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateTests.java
  9. 3 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java
  10. 118 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameIndexerPosition.java
  11. 43 22
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java
  12. 36 23
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java
  13. 67 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameIndexerPositionTests.java
  14. 1 21
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java
  15. 222 100
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java
  16. 58 30
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java
  17. 30 31
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java
  18. 12 0
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java
  19. 12 4
      x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java

+ 2 - 2
build.gradle

@@ -160,8 +160,8 @@ task verifyVersions {
  * after the backport of the backcompat code is complete.
  */
 
-boolean bwc_tests_enabled = true
-final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
+boolean bwc_tests_enabled = false
+final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/44219" /* place a PR link here when committing bwc changes */
 if (bwc_tests_enabled == false) {
   if (bwc_tests_disabled_issue.isEmpty()) {
     throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")

+ 99 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerPosition.java

@@ -0,0 +1,99 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe.transforms;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
+
+/**
+ * Holds state of the cursors:
+ *
+ *  indexer_position: the position of the indexer querying the source
+ *  bucket_position: the position used for identifying changes
+ */
+public class DataFrameIndexerPosition {
+    public static final ParseField INDEXER_POSITION = new ParseField("indexer_position");
+    public static final ParseField BUCKET_POSITION = new ParseField("bucket_position");
+
+    private final Map<String, Object> indexerPosition;
+    private final Map<String, Object> bucketPosition;
+
+    @SuppressWarnings("unchecked")
+    public static final ConstructingObjectParser<DataFrameIndexerPosition, Void> PARSER = new ConstructingObjectParser<>(
+            "data_frame_indexer_position",
+            true,
+            args -> new DataFrameIndexerPosition((Map<String, Object>) args[0],(Map<String, Object>) args[1]));
+
+    static {
+        PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, INDEXER_POSITION, ValueType.OBJECT);
+        PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, BUCKET_POSITION, ValueType.OBJECT);
+    }
+
+    public DataFrameIndexerPosition(Map<String, Object> indexerPosition, Map<String, Object> bucketPosition) {
+        this.indexerPosition = indexerPosition == null ? null : Collections.unmodifiableMap(indexerPosition);
+        this.bucketPosition = bucketPosition == null ? null : Collections.unmodifiableMap(bucketPosition);
+    }
+
+    public Map<String, Object> getIndexerPosition() {
+        return indexerPosition;
+    }
+
+    public Map<String, Object> getBucketsPosition() {
+        return bucketPosition;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+
+        DataFrameIndexerPosition that = (DataFrameIndexerPosition) other;
+
+        return Objects.equals(this.indexerPosition, that.indexerPosition) &&
+            Objects.equals(this.bucketPosition, that.bucketPosition);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(indexerPosition, bucketPosition);
+    }
+
+    public static DataFrameIndexerPosition fromXContent(XContentParser parser) {
+        try {
+            return PARSER.parse(parser, null);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

+ 30 - 16
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java

@@ -27,8 +27,6 @@ import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
 import org.elasticsearch.common.xcontent.XContentParser;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
 
@@ -39,7 +37,10 @@ public class DataFrameTransformState {
 
     private static final ParseField INDEXER_STATE = new ParseField("indexer_state");
     private static final ParseField TASK_STATE = new ParseField("task_state");
+
+    // 7.3 BWC: current_position only exists in 7.2.  In 7.3+ it is replaced by position.
     private static final ParseField CURRENT_POSITION = new ParseField("current_position");
+    private static final ParseField POSITION = new ParseField("position");
     private static final ParseField CHECKPOINT = new ParseField("checkpoint");
     private static final ParseField REASON = new ParseField("reason");
     private static final ParseField PROGRESS = new ParseField("progress");
@@ -48,18 +49,31 @@ public class DataFrameTransformState {
     @SuppressWarnings("unchecked")
     public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER =
             new ConstructingObjectParser<>("data_frame_transform_state", true,
-                    args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0],
-                        (IndexerState) args[1],
-                        (Map<String, Object>) args[2],
-                        (long) args[3],
-                        (String) args[4],
-                        (DataFrameTransformProgress) args[5],
-                        (NodeAttributes) args[6]));
+                    args -> {
+                        DataFrameTransformTaskState taskState = (DataFrameTransformTaskState) args[0];
+                        IndexerState indexerState = (IndexerState) args[1];
+                        Map<String, Object> bwcCurrentPosition = (Map<String, Object>) args[2];
+                        DataFrameIndexerPosition dataFrameIndexerPosition = (DataFrameIndexerPosition) args[3];
+
+                        // BWC handling, translate current_position to position iff position isn't set
+                        if (bwcCurrentPosition != null && dataFrameIndexerPosition == null) {
+                            dataFrameIndexerPosition = new DataFrameIndexerPosition(bwcCurrentPosition, null);
+                        }
+
+                        long checkpoint = (long) args[4];
+                        String reason = (String) args[5];
+                        DataFrameTransformProgress progress = (DataFrameTransformProgress) args[6];
+                        NodeAttributes node = (NodeAttributes) args[7];
+
+                        return new DataFrameTransformState(taskState, indexerState, dataFrameIndexerPosition, checkpoint, reason, progress,
+                                node);
+                    });
 
     static {
         PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING);
         PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ValueType.STRING);
         PARSER.declareField(optionalConstructorArg(), (p, c) -> p.mapOrdered(), CURRENT_POSITION, ValueType.OBJECT);
+        PARSER.declareField(optionalConstructorArg(), DataFrameIndexerPosition::fromXContent, POSITION, ValueType.OBJECT);
         PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT);
         PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON);
         PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress::fromXContent, PROGRESS, ValueType.OBJECT);
@@ -73,21 +87,21 @@ public class DataFrameTransformState {
     private final DataFrameTransformTaskState taskState;
     private final IndexerState indexerState;
     private final long checkpoint;
-    private final Map<String, Object> currentPosition;
+    private final DataFrameIndexerPosition position;
     private final String reason;
     private final DataFrameTransformProgress progress;
     private final NodeAttributes node;
 
     public DataFrameTransformState(DataFrameTransformTaskState taskState,
                                    IndexerState indexerState,
-                                   @Nullable Map<String, Object> position,
+                                   @Nullable DataFrameIndexerPosition position,
                                    long checkpoint,
                                    @Nullable String reason,
                                    @Nullable DataFrameTransformProgress progress,
                                    @Nullable NodeAttributes node) {
         this.taskState = taskState;
         this.indexerState = indexerState;
-        this.currentPosition = position == null ? null : Collections.unmodifiableMap(new LinkedHashMap<>(position));
+        this.position = position;
         this.checkpoint = checkpoint;
         this.reason = reason;
         this.progress = progress;
@@ -103,8 +117,8 @@ public class DataFrameTransformState {
     }
 
     @Nullable
-    public Map<String, Object> getPosition() {
-        return currentPosition;
+    public DataFrameIndexerPosition getPosition() {
+        return position;
     }
 
     public long getCheckpoint() {
@@ -140,7 +154,7 @@ public class DataFrameTransformState {
 
         return Objects.equals(this.taskState, that.taskState) &&
             Objects.equals(this.indexerState, that.indexerState) &&
-            Objects.equals(this.currentPosition, that.currentPosition) &&
+            Objects.equals(this.position, that.position) &&
             Objects.equals(this.progress, that.progress) &&
             this.checkpoint == that.checkpoint &&
             Objects.equals(this.node, that.node) &&
@@ -149,7 +163,7 @@ public class DataFrameTransformState {
 
     @Override
     public int hashCode() {
-        return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress, node);
+        return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node);
     }
 
 }

+ 76 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerPositionTests.java

@@ -0,0 +1,76 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe.transforms;
+
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
+
+public class DataFrameIndexerPositionTests extends ESTestCase {
+
+    public void testFromXContent() throws IOException {
+        xContentTester(this::createParser,
+                DataFrameIndexerPositionTests::randomDataFrameIndexerPosition,
+                DataFrameIndexerPositionTests::toXContent,
+                DataFrameIndexerPosition::fromXContent)
+                .supportsUnknownFields(true)
+                .randomFieldsExcludeFilter(field -> field.equals("indexer_position") ||
+                    field.equals("bucket_position"))
+                .test();
+    }
+
+    public static DataFrameIndexerPosition randomDataFrameIndexerPosition() {
+        return new DataFrameIndexerPosition(randomPositionMap(), randomPositionMap());
+    }
+
+    public static void toXContent(DataFrameIndexerPosition position, XContentBuilder builder) throws IOException {
+        builder.startObject();
+        if (position.getIndexerPosition() != null) {
+            builder.field("indexer_position", position.getIndexerPosition());
+        }
+        if (position.getBucketsPosition() != null) {
+            builder.field("bucket_position", position.getBucketsPosition());
+        }
+        builder.endObject();
+    }
+
+    private static Map<String, Object> randomPositionMap() {
+        if (randomBoolean()) {
+            return null;
+        }
+        int numFields = randomIntBetween(1, 5);
+        Map<String, Object> position = new LinkedHashMap<>();
+        for (int i = 0; i < numFields; i++) {
+            Object value;
+            if (randomBoolean()) {
+                value = randomLong();
+            } else {
+                value = randomAlphaOfLengthBetween(1, 10);
+            }
+            position.put(randomAlphaOfLengthBetween(3, 10), value);
+        }
+        return position;
+    }
+}

+ 6 - 23
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java

@@ -25,8 +25,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.Map;
 
 import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
 
@@ -38,15 +36,16 @@ public class DataFrameTransformStateTests extends ESTestCase {
                 DataFrameTransformStateTests::toXContent,
                 DataFrameTransformState::fromXContent)
                 .supportsUnknownFields(true)
-                .randomFieldsExcludeFilter(field -> field.equals("current_position") ||
-                    field.equals("node.attributes"))
+                .randomFieldsExcludeFilter(field -> field.equals("position.indexer_position") ||
+                        field.equals("position.bucket_position") ||
+                        field.equals("node.attributes"))
                 .test();
     }
 
     public static DataFrameTransformState randomDataFrameTransformState() {
         return new DataFrameTransformState(randomFrom(DataFrameTransformTaskState.values()),
             randomFrom(IndexerState.values()),
-            randomPositionMap(),
+            randomBoolean() ? null : DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(),
             randomLongBetween(0,10),
             randomBoolean() ? null : randomAlphaOfLength(10),
             randomBoolean() ? null : DataFrameTransformProgressTests.randomInstance(),
@@ -58,7 +57,8 @@ public class DataFrameTransformStateTests extends ESTestCase {
         builder.field("task_state", state.getTaskState().value());
         builder.field("indexer_state", state.getIndexerState().value());
         if (state.getPosition() != null) {
-            builder.field("current_position", state.getPosition());
+            builder.field("position");
+            DataFrameIndexerPositionTests.toXContent(state.getPosition(), builder);
         }
         builder.field("checkpoint", state.getCheckpoint());
         if (state.getReason() != null) {
@@ -75,21 +75,4 @@ public class DataFrameTransformStateTests extends ESTestCase {
         builder.endObject();
     }
 
-    private static Map<String, Object> randomPositionMap() {
-        if (randomBoolean()) {
-            return null;
-        }
-        int numFields = randomIntBetween(1, 5);
-        Map<String, Object> position = new LinkedHashMap<>();
-        for (int i = 0; i < numFields; i++) {
-            Object value;
-            if (randomBoolean()) {
-                value = randomLong();
-            } else {
-                value = randomAlphaOfLengthBetween(1, 10);
-            }
-            position.put(randomAlphaOfLengthBetween(3, 10), value);
-        }
-        return position;
-    }
 }

+ 77 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameIndexerPositionTests.java

@@ -0,0 +1,77 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe.transforms.hlrc;
+
+import org.elasticsearch.client.AbstractResponseTestCase;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPosition;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class DataFrameIndexerPositionTests extends AbstractResponseTestCase<
+        DataFrameIndexerPosition,
+        org.elasticsearch.client.dataframe.transforms.DataFrameIndexerPosition> {
+
+    public static DataFrameIndexerPosition fromHlrc(
+            org.elasticsearch.client.dataframe.transforms.DataFrameIndexerPosition instance) {
+        if (instance == null) {
+            return null;
+        }
+        return new DataFrameIndexerPosition(instance.getIndexerPosition(), instance.getBucketsPosition());
+    }
+
+    @Override
+    protected DataFrameIndexerPosition createServerTestInstance() {
+        return new DataFrameIndexerPosition(randomPositionMap(), randomPositionMap());
+    }
+
+    @Override
+    protected org.elasticsearch.client.dataframe.transforms.DataFrameIndexerPosition doParseToClientInstance(XContentParser parser) {
+        return org.elasticsearch.client.dataframe.transforms.DataFrameIndexerPosition.fromXContent(parser);
+    }
+
+    @Override
+    protected void assertInstances(DataFrameIndexerPosition serverTestInstance,
+                                   org.elasticsearch.client.dataframe.transforms.DataFrameIndexerPosition clientInstance) {
+        assertThat(serverTestInstance.getIndexerPosition(), equalTo(clientInstance.getIndexerPosition()));
+        assertThat(serverTestInstance.getBucketsPosition(), equalTo(clientInstance.getBucketsPosition()));
+    }
+
+    private static Map<String, Object> randomPositionMap() {
+        if (randomBoolean()) {
+            return null;
+        }
+        int numFields = randomIntBetween(1, 5);
+        Map<String, Object> position = new LinkedHashMap<>();
+        for (int i = 0; i < numFields; i++) {
+            Object value;
+            if (randomBoolean()) {
+                value = randomLong();
+            } else {
+                value = randomAlphaOfLengthBetween(1, 10);
+            }
+            position.put(randomAlphaOfLengthBetween(3, 10), value);
+        }
+        return position;
+    }
+}

+ 5 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateAndStatsTests.java

@@ -19,8 +19,8 @@
 
 package org.elasticsearch.client.dataframe.transforms.hlrc;
 
-import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.client.AbstractHlrcXContentTestCase;
+import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
 
@@ -64,7 +64,10 @@ public class DataFrameTransformStateAndStatsTests extends AbstractHlrcXContentTe
 
     @Override
     protected Predicate<String> getRandomFieldsExcludeFilter() {
-        return field -> field.equals("state.current_position") || field.equals("state.node") || field.equals("state.node.attributes");
+        return field -> field.equals("state.position.indexer_position") ||
+                field.equals("state.position.bucket_position") ||
+                field.equals("state.node") ||
+                field.equals("state.node.attributes");
     }
 }
 

+ 11 - 4
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateTests.java

@@ -19,8 +19,9 @@
 
 package org.elasticsearch.client.dataframe.transforms.hlrc;
 
-import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.client.AbstractHlrcXContentTestCase;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPosition;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
@@ -42,7 +43,7 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase<D
     public static DataFrameTransformState fromHlrc(org.elasticsearch.client.dataframe.transforms.DataFrameTransformState instance) {
         return new DataFrameTransformState(DataFrameTransformTaskState.fromString(instance.getTaskState().value()),
             IndexerState.fromString(instance.getIndexerState().value()),
-            instance.getPosition(),
+            DataFrameIndexerPositionTests.fromHlrc(instance.getPosition()),
             instance.getCheckpoint(),
             instance.getReason(),
             DataFrameTransformProgressTests.fromHlrc(instance.getProgress()),
@@ -85,7 +86,9 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase<D
 
     @Override
     protected Predicate<String> getRandomFieldsExcludeFilter() {
-        return field -> field.equals("current_position") || field.equals("node.attributes");
+        return field -> field.equals("position.indexer_position") ||
+                field.equals("position.bucket_position") ||
+                field.equals("node.attributes");
     }
 
     public static DataFrameTransformStateAndStats randomDataFrameTransformStateAndStats(String id) {
@@ -95,6 +98,10 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase<D
             randomDataFrameTransformCheckpointingInfo());
     }
 
+    public static DataFrameIndexerPosition randomDataFrameIndexerPosition() {
+        return new DataFrameIndexerPosition(randomPosition(), randomPosition());
+    }
+
     public static DataFrameTransformCheckpointingInfo randomDataFrameTransformCheckpointingInfo() {
         return new DataFrameTransformCheckpointingInfo(randomDataFrameTransformCheckpointStats(),
             randomDataFrameTransformCheckpointStats(), randomNonNegativeLong());
@@ -134,7 +141,7 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase<D
     public static DataFrameTransformState randomDataFrameTransformState() {
         return new DataFrameTransformState(randomFrom(DataFrameTransformTaskState.values()),
             randomFrom(IndexerState.values()),
-            randomPosition(),
+            randomDataFrameIndexerPosition(),
             randomLongBetween(0,10),
             randomBoolean() ? null : randomAlphaOfLength(10),
             randomBoolean() ? null : randomDataFrameTransformProgress(),

+ 3 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java

@@ -43,7 +43,9 @@ public class DataFrameMessages {
     public static final String FAILED_TO_PARSE_TRANSFORM_STATISTICS_CONFIGURATION =
             "Failed to parse transform statistics for data frame transform [{0}]";
     public static final String FAILED_TO_LOAD_TRANSFORM_CHECKPOINT =
-            "Failed to load data frame transform configuration for transform [{0}]";
+            "Failed to load data frame transform checkpoint for transform [{0}]";
+    public static final String FAILED_TO_LOAD_TRANSFORM_STATE =
+            "Failed to load data frame transform state for transform [{0}]";
     public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_NO_TRANSFORM =
             "Data frame transform configuration must specify exactly 1 function";
     public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_PIVOT_NO_GROUP_BY =

+ 118 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameIndexerPosition.java

@@ -0,0 +1,118 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.dataframe.transforms;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
+
+public class DataFrameIndexerPosition implements Writeable, ToXContentObject {
+    public static final String NAME = "data_frame/indexer_position";
+
+    public static final ParseField INDEXER_POSITION = new ParseField("indexer_position");
+    public static final ParseField BUCKET_POSITION = new ParseField("bucket_position");
+
+    private final Map<String, Object> indexerPosition;
+    private final Map<String, Object> bucketPosition;
+
+    @SuppressWarnings("unchecked")
+    public static final ConstructingObjectParser<DataFrameIndexerPosition, Void> PARSER = new ConstructingObjectParser<>(NAME,
+            true,
+            args -> new DataFrameIndexerPosition((Map<String, Object>) args[0],(Map<String, Object>) args[1]));
+
+    static {
+        PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, INDEXER_POSITION, ValueType.OBJECT);
+        PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, BUCKET_POSITION, ValueType.OBJECT);
+    }
+
+    public DataFrameIndexerPosition(Map<String, Object> indexerPosition, Map<String, Object> bucketPosition) {
+        this.indexerPosition = indexerPosition == null ? null : Collections.unmodifiableMap(indexerPosition);
+        this.bucketPosition = bucketPosition == null ? null : Collections.unmodifiableMap(bucketPosition);
+    }
+
+    public DataFrameIndexerPosition(StreamInput in) throws IOException {
+        Map<String, Object> position = in.readMap();
+        indexerPosition = position == null ? null : Collections.unmodifiableMap(position);
+        position = in.readMap();
+        bucketPosition = position == null ? null : Collections.unmodifiableMap(position);
+    }
+
+    public Map<String, Object> getIndexerPosition() {
+        return indexerPosition;
+    }
+
+    public Map<String, Object> getBucketsPosition() {
+        return bucketPosition;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeMap(indexerPosition);
+        out.writeMap(bucketPosition);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        if (indexerPosition != null) {
+            builder.field(INDEXER_POSITION.getPreferredName(), indexerPosition);
+        }
+        if (bucketPosition != null) {
+            builder.field(BUCKET_POSITION.getPreferredName(), bucketPosition);
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+
+        DataFrameIndexerPosition that = (DataFrameIndexerPosition) other;
+
+        return Objects.equals(this.indexerPosition, that.indexerPosition) &&
+            Objects.equals(this.bucketPosition, that.bucketPosition);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(indexerPosition, bucketPosition);
+    }
+
+    @Override
+    public String toString() {
+        return Strings.toString(this);
+    }
+
+    public static DataFrameIndexerPosition fromXContent(XContentParser parser) {
+        try {
+            return PARSER.parse(parser, null);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

+ 43 - 22
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java

@@ -22,8 +22,6 @@ import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
 
@@ -39,7 +37,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
     private final long checkpoint;
 
     @Nullable
-    private final Map<String, Object> currentPosition;
+    private final DataFrameIndexerPosition position;
     @Nullable
     private final String reason;
     @Nullable
@@ -47,7 +45,10 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
 
     public static final ParseField TASK_STATE = new ParseField("task_state");
     public static final ParseField INDEXER_STATE = new ParseField("indexer_state");
+
+    // 7.3 BWC: current_position only exists in 7.2. In 7.3+ it is replaced by position.
     public static final ParseField CURRENT_POSITION = new ParseField("current_position");
+    public static final ParseField POSITION = new ParseField("position");
     public static final ParseField CHECKPOINT = new ParseField("checkpoint");
     public static final ParseField REASON = new ParseField("reason");
     public static final ParseField PROGRESS = new ParseField("progress");
@@ -56,18 +57,30 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
     @SuppressWarnings("unchecked")
     public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER = new ConstructingObjectParser<>(NAME,
             true,
-            args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0],
-                (IndexerState) args[1],
-                (Map<String, Object>) args[2],
-                (long) args[3],
-                (String) args[4],
-                (DataFrameTransformProgress) args[5],
-                (NodeAttributes) args[6]));
+            args -> {
+                DataFrameTransformTaskState taskState = (DataFrameTransformTaskState) args[0];
+                IndexerState indexerState = (IndexerState) args[1];
+                Map<String, Object> bwcCurrentPosition = (Map<String, Object>) args[2];
+                DataFrameIndexerPosition dataFrameIndexerPosition = (DataFrameIndexerPosition) args[3];
+
+                // BWC handling, translate current_position to position iff position isn't set
+                if (bwcCurrentPosition != null && dataFrameIndexerPosition == null) {
+                    dataFrameIndexerPosition = new DataFrameIndexerPosition(bwcCurrentPosition, null);
+                }
+
+                long checkpoint = (long) args[4];
+                String reason = (String) args[5];
+                DataFrameTransformProgress progress = (DataFrameTransformProgress) args[6];
+                NodeAttributes node = (NodeAttributes) args[7];
+
+                return new DataFrameTransformState(taskState, indexerState, dataFrameIndexerPosition, checkpoint, reason, progress, node);
+            });
 
     static {
         PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING);
         PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ValueType.STRING);
         PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, CURRENT_POSITION, ValueType.OBJECT);
+        PARSER.declareField(optionalConstructorArg(), DataFrameIndexerPosition::fromXContent, POSITION, ValueType.OBJECT);
         PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT);
         PARSER.declareString(optionalConstructorArg(), REASON);
         PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress.PARSER::apply, PROGRESS, ValueType.OBJECT);
@@ -76,14 +89,14 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
 
     public DataFrameTransformState(DataFrameTransformTaskState taskState,
                                    IndexerState indexerState,
-                                   @Nullable Map<String, Object> position,
+                                   @Nullable DataFrameIndexerPosition position,
                                    long checkpoint,
                                    @Nullable String reason,
                                    @Nullable DataFrameTransformProgress progress,
                                    @Nullable NodeAttributes node) {
         this.taskState = taskState;
         this.indexerState = indexerState;
-        this.currentPosition = position == null ? null : Collections.unmodifiableMap(new LinkedHashMap<>(position));
+        this.position = position;
         this.checkpoint = checkpoint;
         this.reason = reason;
         this.progress = progress;
@@ -92,7 +105,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
 
     public DataFrameTransformState(DataFrameTransformTaskState taskState,
                                    IndexerState indexerState,
-                                   @Nullable Map<String, Object> position,
+                                   @Nullable DataFrameIndexerPosition position,
                                    long checkpoint,
                                    @Nullable String reason,
                                    @Nullable DataFrameTransformProgress progress) {
@@ -102,8 +115,12 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
     public DataFrameTransformState(StreamInput in) throws IOException {
         taskState = DataFrameTransformTaskState.fromStream(in);
         indexerState = IndexerState.fromStream(in);
-        Map<String, Object> position = in.readMap();
-        currentPosition = position == null ? null : Collections.unmodifiableMap(position);
+        if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
+            position = in.readOptionalWriteable(DataFrameIndexerPosition::new);
+        } else {
+            Map<String, Object> pos = in.readMap();
+            position = new DataFrameIndexerPosition(pos, null);
+        }
         checkpoint = in.readLong();
         reason = in.readOptionalString();
         progress = in.readOptionalWriteable(DataFrameTransformProgress::new);
@@ -122,8 +139,8 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
         return indexerState;
     }
 
-    public Map<String, Object> getPosition() {
-        return currentPosition;
+    public DataFrameIndexerPosition getPosition() {
+        return position;
     }
 
     public long getCheckpoint() {
@@ -169,8 +186,8 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
         builder.startObject();
         builder.field(TASK_STATE.getPreferredName(), taskState.value());
         builder.field(INDEXER_STATE.getPreferredName(), indexerState.value());
-        if (currentPosition != null) {
-            builder.field(CURRENT_POSITION.getPreferredName(), currentPosition);
+        if (position != null) {
+            builder.field(POSITION.getPreferredName(), position);
         }
         builder.field(CHECKPOINT.getPreferredName(), checkpoint);
         if (reason != null) {
@@ -195,7 +212,11 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
     public void writeTo(StreamOutput out) throws IOException {
         taskState.writeTo(out);
         indexerState.writeTo(out);
-        out.writeMap(currentPosition);
+        if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
+            out.writeOptionalWriteable(position);
+        } else {
+            out.writeMap(position != null ? position.getIndexerPosition() : null);
+        }
         out.writeLong(checkpoint);
         out.writeOptionalString(reason);
         out.writeOptionalWriteable(progress);
@@ -218,7 +239,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
 
         return Objects.equals(this.taskState, that.taskState) &&
             Objects.equals(this.indexerState, that.indexerState) &&
-            Objects.equals(this.currentPosition, that.currentPosition) &&
+            Objects.equals(this.position, that.position) &&
             this.checkpoint == that.checkpoint &&
             Objects.equals(this.reason, that.reason) &&
             Objects.equals(this.progress, that.progress) &&
@@ -227,7 +248,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
 
     @Override
     public int hashCode() {
-        return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress, node);
+        return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node);
     }
 
     @Override

+ 36 - 23
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

@@ -349,31 +349,44 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
             }
 
             final List<IndexRequest> docs = iterationResult.getToIndex();
-            final BulkRequest bulkRequest = new BulkRequest();
-            docs.forEach(bulkRequest::add);
-
-            // TODO this might be a valid case, e.g. if implementation filters
-            assert bulkRequest.requests().size() > 0;
-
-            stats.markStartIndexing();
-            doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
-                // TODO we should check items in the response and move after accordingly to
-                // resume the failing buckets ?
-                if (bulkResponse.hasFailures()) {
-                    logger.warn("Error while attempting to bulk index documents: " + bulkResponse.buildFailureMessage());
-                }
-                stats.incrementNumOutputDocuments(bulkResponse.getItems().length);
-
-                // check if indexer has been asked to stop, state {@link IndexerState#STOPPING}
-                if (checkState(getState()) == false) {
-                    return;
-                }
 
-                JobPosition newPosition = iterationResult.getPosition();
-                position.set(newPosition);
+            // an iteration result might return an empty set of documents to be indexed
+            if (docs.isEmpty() == false) {
+                final BulkRequest bulkRequest = new BulkRequest();
+                docs.forEach(bulkRequest::add);
+
+                stats.markStartIndexing();
+                doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
+                    // TODO we should check items in the response and move after accordingly to
+                    // resume the failing buckets ?
+                    if (bulkResponse.hasFailures()) {
+                        logger.warn("Error while attempting to bulk index documents: " + bulkResponse.buildFailureMessage());
+                    }
+                    stats.incrementNumOutputDocuments(bulkResponse.getItems().length);
+
+                    // check if indexer has been asked to stop, state {@link IndexerState#STOPPING}
+                    if (checkState(getState()) == false) {
+                        return;
+                    }
+
+                    JobPosition newPosition = iterationResult.getPosition();
+                    position.set(newPosition);
+
+                    onBulkResponse(bulkResponse, newPosition);
+                }, this::finishWithIndexingFailure));
+            } else {
+                // no documents need to be indexed, continue with search
+                try {
+                    JobPosition newPosition = iterationResult.getPosition();
+                    position.set(newPosition);
 
-                onBulkResponse(bulkResponse, newPosition);
-            }, this::finishWithIndexingFailure));
+                    ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure);
+                    nextSearch(listener);
+                } catch (Exception e) {
+                    finishAndSetState();
+                    onFailure(e);
+                }
+            }
         } catch (Exception e) {
             finishWithSearchFailure(e);
         }

+ 67 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameIndexerPositionTests.java

@@ -0,0 +1,67 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.dataframe.transforms;
+
+import org.elasticsearch.common.io.stream.Writeable.Reader;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractSerializingTestCase;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/**
+ * Round-trip serialization tests (wire + XContent) for {@link DataFrameIndexerPosition},
+ * driven by the AbstractSerializingTestCase harness.
+ */
+public class DataFrameIndexerPositionTests extends AbstractSerializingTestCase<DataFrameIndexerPosition> {
+
+    // Builds a random instance; either cursor map may independently be null.
+    public static DataFrameIndexerPosition randomDataFrameIndexerPosition() {
+        return new DataFrameIndexerPosition(randomPosition(), randomPosition());
+    }
+
+    @Override
+    protected DataFrameIndexerPosition createTestInstance() {
+        return randomDataFrameIndexerPosition();
+    }
+
+    @Override
+    protected Reader<DataFrameIndexerPosition> instanceReader() {
+        return DataFrameIndexerPosition::new;
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return true;
+    }
+
+    // Only inject unknown fields at the top level; nested (non-empty) paths would
+    // land inside the free-form position maps and break equality checks.
+    @Override
+    protected Predicate<String> getRandomFieldsExcludeFilter() {
+        return field -> !field.isEmpty();
+    }
+
+    @Override
+    protected DataFrameIndexerPosition doParseInstance(XContentParser parser) throws IOException {
+        return DataFrameIndexerPosition.fromXContent(parser);
+    }
+
+    // Random cursor map: null half the time, otherwise 1-5 entries with long or
+    // string values (mirrors the mixed value types a composite after-key can hold).
+    private static Map<String, Object> randomPosition() {
+        if (randomBoolean()) {
+            return null;
+        }
+        int numFields = randomIntBetween(1, 5);
+        Map<String, Object> position = new HashMap<>();
+        for (int i = 0; i < numFields; i++) {
+            Object value;
+            if (randomBoolean()) {
+                value = randomLong();
+            } else {
+                value = randomAlphaOfLengthBetween(1, 10);
+            }
+            position.put(randomAlphaOfLengthBetween(3, 10), value);
+        }
+
+        return position;
+    }
+}

+ 1 - 21
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java

@@ -12,8 +12,6 @@ import org.elasticsearch.test.AbstractSerializingTestCase;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.function.Predicate;
 
 import static org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgressTests.randomDataFrameTransformProgress;
@@ -24,7 +22,7 @@ public class DataFrameTransformStateTests extends AbstractSerializingTestCase<Da
     public static DataFrameTransformState randomDataFrameTransformState() {
         return new DataFrameTransformState(randomFrom(DataFrameTransformTaskState.values()),
             randomFrom(IndexerState.values()),
-            randomPosition(),
+            DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(),
             randomLongBetween(0,10),
             randomBoolean() ? null : randomAlphaOfLength(10),
             randomBoolean() ? null : randomDataFrameTransformProgress(),
@@ -46,24 +44,6 @@ public class DataFrameTransformStateTests extends AbstractSerializingTestCase<Da
         return DataFrameTransformState::new;
     }
 
-    private static Map<String, Object> randomPosition() {
-        if (randomBoolean()) {
-            return null;
-        }
-        int numFields = randomIntBetween(1, 5);
-        Map<String, Object> position = new HashMap<>();
-        for (int i = 0; i < numFields; i++) {
-            Object value;
-            if (randomBoolean()) {
-                value = randomLong();
-            } else {
-                value = randomAlphaOfLengthBetween(1, 10);
-            }
-            position.put(randomAlphaOfLengthBetween(3, 10), value);
-        }
-        return position;
-    }
-
     @Override
     protected boolean supportsUnknownFields() {
         return true;

+ 222 - 100
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java

@@ -23,6 +23,7 @@ import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregati
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPosition;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
@@ -47,7 +48,23 @@ import java.util.stream.Stream;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 
-public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String, Object>, DataFrameIndexerTransformStats> {
+public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<DataFrameIndexerPosition, DataFrameIndexerTransformStats> {
+
+    /**
+     * RunState is an internal (non-persisted) state that controls the internal logic
+     * which query filters to run and which index requests to send
+     */
+    private enum RunState {
+        // do a complete query/index, this is used for batch data frames and for bootstrapping (1st run)
+        FULL_RUN,
+
+        // Partial run modes in 2 stages:
+        // identify buckets that have changed
+        PARTIAL_RUN_IDENTIFY_CHANGES,
+
+        // recalculate buckets based on the update list
+        PARTIAL_RUN_APPLY_CHANGES
+    }
 
     public static final int MINIMUM_PAGE_SIZE = 10;
     public static final String COMPOSITE_AGGREGATION_NAME = "_data_frame";
@@ -61,24 +78,34 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
 
     private Pivot pivot;
     private int pageSize = 0;
-    protected volatile DataFrameTransformCheckpoint inProgressOrLastCheckpoint;
+    protected volatile DataFrameTransformCheckpoint lastCheckpoint;
+    protected volatile DataFrameTransformCheckpoint nextCheckpoint;
+
+    private volatile RunState runState;
+
+    // hold information for continuous mode (partial updates)
     private volatile Map<String, Set<String>> changedBuckets;
+    private volatile Map<String, Object> changedBucketsAfterKey;
 
     public DataFrameIndexer(Executor executor,
                             DataFrameAuditor auditor,
                             DataFrameTransformConfig transformConfig,
                             Map<String, String> fieldMappings,
                             AtomicReference<IndexerState> initialState,
-                            Map<String, Object> initialPosition,
+                            DataFrameIndexerPosition initialPosition,
                             DataFrameIndexerTransformStats jobStats,
                             DataFrameTransformProgress transformProgress,
-                            DataFrameTransformCheckpoint inProgressOrLastCheckpoint) {
+                            DataFrameTransformCheckpoint lastCheckpoint,
+                            DataFrameTransformCheckpoint nextCheckpoint) {
         super(executor, initialState, initialPosition, jobStats);
         this.auditor = Objects.requireNonNull(auditor);
         this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig");
         this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings");
         this.progress = transformProgress;
-        this.inProgressOrLastCheckpoint = inProgressOrLastCheckpoint;
+        this.lastCheckpoint = lastCheckpoint;
+        this.nextCheckpoint = nextCheckpoint;
+        // give runState a default
+        this.runState = RunState.FULL_RUN;
     }
 
     protected abstract void failIndexer(String message);
@@ -117,6 +144,8 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
             if (pageSize == 0) {
                 pageSize = pivot.getInitialPageSize();
             }
+
+            runState = determineRunStateAtStart();
             listener.onResponse(null);
         } catch (Exception e) {
             listener.onFailure(e);
@@ -136,24 +165,95 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
     }
 
     @Override
-    protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchResponse) {
+    protected IterationResult<DataFrameIndexerPosition> doProcess(SearchResponse searchResponse) {
         final CompositeAggregation agg = searchResponse.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
 
+        // Dispatch on the internal run state: a full run indexes every bucket,
+        // while partial (continuous) runs alternate between identifying changed
+        // buckets and re-computing just those buckets.
+        switch (runState) {
+        case FULL_RUN:
+            return processBuckets(agg);
+        case PARTIAL_RUN_APPLY_CHANGES:
+            return processPartialBucketUpdates(agg);
+        case PARTIAL_RUN_IDENTIFY_CHANGES:
+            return processChangedBuckets(agg);
+
+        default:
+            // Any other state is a bug, should not happen
+            logger.warn("Encountered unexpected run state [" + runState + "]");
+            throw new IllegalStateException("DataFrame indexer job encountered an illegal state [" + runState + "]");
+        }
+    }
+
+    // Turns one page of composite-agg buckets into index requests and advances the
+    // main indexer cursor; the buckets cursor is carried over unchanged.
+    private IterationResult<DataFrameIndexerPosition> processBuckets(final CompositeAggregation agg) {
         // we reached the end
         if (agg.getBuckets().isEmpty()) {
             return new IterationResult<>(Collections.emptyList(), null, true);
         }
 
         long docsBeforeProcess = getStats().getNumDocuments();
-        IterationResult<Map<String, Object>> result = new IterationResult<>(processBucketsToIndexRequests(agg).collect(Collectors.toList()),
-            agg.afterKey(),
-            agg.getBuckets().isEmpty());
+
+        DataFrameIndexerPosition oldPosition = getPosition();
+        // NOTE(review): the second getPosition() call could simply be
+        // oldPosition.getBucketsPosition() — same value, one fewer volatile read.
+        DataFrameIndexerPosition newPosition = new DataFrameIndexerPosition(agg.afterKey(),
+                oldPosition != null ? getPosition().getBucketsPosition() : null);
+
+        IterationResult<DataFrameIndexerPosition> result = new IterationResult<>(
+                processBucketsToIndexRequests(agg).collect(Collectors.toList()),
+                newPosition,
+                agg.getBuckets().isEmpty());
+
         if (progress != null) {
             progress.docsProcessed(getStats().getNumDocuments() - docsBeforeProcess);
         }
+
         return result;
     }
 
+    // Continuous-mode stage 2: re-index the buckets selected in stage 1. When this
+    // page set is exhausted, flip back to stage 1 (change identification) and move
+    // only the buckets cursor forward; otherwise delegate to the normal bucket path.
+    private IterationResult<DataFrameIndexerPosition> processPartialBucketUpdates(final CompositeAggregation agg) {
+        // we reached the end
+        if (agg.getBuckets().isEmpty()) {
+            // cleanup changed Buckets
+            changedBuckets = null;
+
+            // reset the runState to fetch changed buckets
+            runState = RunState.PARTIAL_RUN_IDENTIFY_CHANGES;
+            // advance the cursor for changed bucket detection
+            return new IterationResult<>(Collections.emptyList(),
+                    new DataFrameIndexerPosition(null, changedBucketsAfterKey), false);
+        }
+
+        return processBuckets(agg);
+    }
+
+
+    // Continuous-mode stage 1: collect the keys of buckets that changed since the
+    // last checkpoint, then flip to stage 2 to re-compute them.
+    private IterationResult<DataFrameIndexerPosition> processChangedBuckets(final CompositeAggregation agg) {
+        // initialize the map of changed buckets; the map might be empty if the
+        // source does not require/implement changed bucket detection
+        changedBuckets = pivot.initialIncrementalBucketUpdateMap();
+
+        // reached the end?
+        if (agg.getBuckets().isEmpty()) {
+            // reset everything and return the end marker
+            changedBuckets = null;
+            changedBucketsAfterKey = null;
+            return new IterationResult<>(Collections.emptyList(), null, true);
+        }
+        // else
+
+        // collect all buckets that require the update
+        // NOTE(review): assumes every key k returned by the agg is present in the
+        // initial map, otherwise changedBuckets.get(k) is null — confirm the
+        // pivot's group-by sources always match initialIncrementalBucketUpdateMap()
+        agg.getBuckets().stream().forEach(bucket -> {
+            bucket.getKey().forEach((k, v) -> {
+                changedBuckets.get(k).add(v.toString());
+            });
+        });
+
+        // remember the after key, but do not store it in the state yet (in the
+        // failure case we need to retrieve it again by re-running this stage)
+        changedBucketsAfterKey = agg.afterKey();
+
+        // reset the runState to fetch the partial updates next
+        runState = RunState.PARTIAL_RUN_APPLY_CHANGES;
+
+        return new IterationResult<>(Collections.emptyList(), getPosition(), false);
+    }
+
     /*
      * Parses the result and creates a stream of indexable documents
      *
@@ -197,43 +297,128 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
     }
 
     protected QueryBuilder buildFilterQuery() {
+        assert nextCheckpoint != null;
+
         QueryBuilder pivotQueryBuilder = getConfig().getSource().getQueryConfig().getQuery();
 
         DataFrameTransformConfig config = getConfig();
-        if (config.getSyncConfig() != null) {
-            if (inProgressOrLastCheckpoint == null) {
-                throw new RuntimeException("in progress checkpoint not found");
-            }
+        if (this.isContinuous()) {
 
             BoolQueryBuilder filteredQuery = new BoolQueryBuilder()
-                .filter(pivotQueryBuilder)
-                .filter(config.getSyncConfig().getRangeQuery(inProgressOrLastCheckpoint));
+                .filter(pivotQueryBuilder);
 
-            if (changedBuckets != null && changedBuckets.isEmpty() == false) {
-                QueryBuilder pivotFilter = pivot.filterBuckets(changedBuckets);
-                if (pivotFilter != null) {
-                    filteredQuery.filter(pivotFilter);
-                }
+            if (lastCheckpoint != null) {
+                filteredQuery.filter(config.getSyncConfig().getRangeQuery(lastCheckpoint, nextCheckpoint));
+            } else {
+                filteredQuery.filter(config.getSyncConfig().getRangeQuery(nextCheckpoint));
             }
-
-            logger.trace("running filtered query: {}", filteredQuery);
             return filteredQuery;
-        } else {
-            return pivotQueryBuilder;
         }
+
+        return pivotQueryBuilder;
     }
 
     @Override
     protected SearchRequest buildSearchRequest() {
-        SearchRequest searchRequest = new SearchRequest(getConfig().getSource().getIndex());
+        assert nextCheckpoint != null;
+
+        SearchRequest searchRequest = new SearchRequest(getConfig().getSource().getIndex())
+                .allowPartialSearchResults(false);
         SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
-            .aggregation(pivot.buildAggregation(getPosition(), pageSize))
-            .size(0)
-            .query(buildFilterQuery());
+                .size(0);
+
+        switch (runState) {
+        case FULL_RUN:
+            buildFullRunQuery(sourceBuilder);
+            break;
+        case PARTIAL_RUN_IDENTIFY_CHANGES:
+            buildChangedBucketsQuery(sourceBuilder);
+            break;
+        case PARTIAL_RUN_APPLY_CHANGES:
+            buildPartialUpdateQuery(sourceBuilder);
+            break;
+        default:
+            // Any other state is a bug, should not happen
+            logger.warn("Encountered unexpected run state [" + runState + "]");
+            throw new IllegalStateException("DataFrame indexer job encountered an illegal state [" + runState + "]");
+        }
+
         searchRequest.source(sourceBuilder);
         return searchRequest;
     }
 
+    // Query for FULL_RUN: the pivot aggregation resumed at the indexer cursor,
+    // range-limited to the next checkpoint only when the transform is continuous.
+    private SearchSourceBuilder buildFullRunQuery(SearchSourceBuilder sourceBuilder) {
+        DataFrameIndexerPosition position = getPosition();
+
+        sourceBuilder.aggregation(pivot.buildAggregation(position != null ? position.getIndexerPosition() : null, pageSize));
+        DataFrameTransformConfig config = getConfig();
+
+        QueryBuilder pivotQueryBuilder = config.getSource().getQueryConfig().getQuery();
+        if (isContinuous()) {
+            BoolQueryBuilder filteredQuery = new BoolQueryBuilder()
+                    .filter(pivotQueryBuilder)
+                    .filter(config.getSyncConfig()
+                            .getRangeQuery(nextCheckpoint));
+            sourceBuilder.query(filteredQuery);
+        } else {
+            sourceBuilder.query(pivotQueryBuilder);
+        }
+
+        logger.trace("running full run query: {}", sourceBuilder);
+
+        return sourceBuilder;
+    }
+
+    // Query for PARTIAL_RUN_IDENTIFY_CHANGES: page the incremental-update
+    // aggregation via the buckets cursor, restricted to the window between the
+    // last and next checkpoint so only changed data is scanned.
+    private SearchSourceBuilder buildChangedBucketsQuery(SearchSourceBuilder sourceBuilder) {
+        assert isContinuous();
+
+        DataFrameIndexerPosition position = getPosition();
+
+        CompositeAggregationBuilder changesAgg = pivot.buildIncrementalBucketUpdateAggregation(pageSize);
+        changesAgg.aggregateAfter(position != null ? position.getBucketsPosition() : null);
+        sourceBuilder.aggregation(changesAgg);
+
+        QueryBuilder pivotQueryBuilder = getConfig().getSource().getQueryConfig().getQuery();
+
+        DataFrameTransformConfig config = getConfig();
+        BoolQueryBuilder filteredQuery = new BoolQueryBuilder().
+                filter(pivotQueryBuilder).
+                filter(config.getSyncConfig().getRangeQuery(lastCheckpoint, nextCheckpoint));
+
+        sourceBuilder.query(filteredQuery);
+
+        logger.trace("running changes query {}", sourceBuilder);
+        return sourceBuilder;
+    }
+
+    // Query for PARTIAL_RUN_APPLY_CHANGES: the pivot aggregation resumed at the
+    // indexer cursor, range-limited to the next checkpoint and — when change
+    // detection produced anything — further filtered to the changed buckets so
+    // only those get recalculated.
+    private SearchSourceBuilder buildPartialUpdateQuery(SearchSourceBuilder sourceBuilder) {
+        assert isContinuous();
+
+        DataFrameIndexerPosition position = getPosition();
+
+        sourceBuilder.aggregation(pivot.buildAggregation(position != null ? position.getIndexerPosition() : null, pageSize));
+        DataFrameTransformConfig config = getConfig();
+
+        QueryBuilder pivotQueryBuilder = config.getSource().getQueryConfig().getQuery();
+
+        BoolQueryBuilder filteredQuery = new BoolQueryBuilder()
+                .filter(pivotQueryBuilder)
+                .filter(config.getSyncConfig()
+                        .getRangeQuery(nextCheckpoint));
+
+        // an empty map means the source does not implement change detection; in
+        // that case fall back to the plain checkpoint-range query above
+        if (changedBuckets != null && changedBuckets.isEmpty() == false) {
+            QueryBuilder pivotFilter = pivot.filterBuckets(changedBuckets);
+            if (pivotFilter != null) {
+                filteredQuery.filter(pivotFilter);
+            }
+        }
+
+        sourceBuilder.query(filteredQuery);
+        logger.trace("running partial update query: {}", sourceBuilder);
+
+        return sourceBuilder;
+    }
+
     /**
      * Handle the circuit breaking case: A search consumed to much memory and got aborted.
      *
@@ -272,82 +457,19 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
         return true;
     }
 
-    protected void getChangedBuckets(DataFrameTransformCheckpoint oldCheckpoint,
-                                     DataFrameTransformCheckpoint newCheckpoint,
-                                     ActionListener<Map<String, Set<String>>> listener) {
-
-        ActionListener<Map<String, Set<String>>> wrappedListener = ActionListener.wrap(
-            r -> {
-                this.inProgressOrLastCheckpoint = newCheckpoint;
-                this.changedBuckets = r;
-                listener.onResponse(r);
-            },
-            listener::onFailure
-        );
-        // initialize the map of changed buckets, the map might be empty if source do not require/implement
-        // changed bucket detection
-        Map<String, Set<String>> keys = pivot.initialIncrementalBucketUpdateMap();
-        if (keys.isEmpty()) {
-            logger.trace("This data frame does not implement changed bucket detection, returning");
-            wrappedListener.onResponse(null);
-            return;
+    private RunState determineRunStateAtStart() {
+        // either 1st run or not a continuous data frame
+        if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) {
+            return RunState.FULL_RUN;
         }
 
-        SearchRequest searchRequest = new SearchRequest(getConfig().getSource().getIndex());
-        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
-
-        // we do not need the sub-aggs
-        CompositeAggregationBuilder changesAgg = pivot.buildIncrementalBucketUpdateAggregation(pageSize);
-        sourceBuilder.aggregation(changesAgg);
-        sourceBuilder.size(0);
-
-        QueryBuilder pivotQueryBuilder = getConfig().getSource().getQueryConfig().getQuery();
-
-        DataFrameTransformConfig config = getConfig();
-        if (config.getSyncConfig() != null) {
-            BoolQueryBuilder filteredQuery = new BoolQueryBuilder().
-                    filter(pivotQueryBuilder).
-                    filter(config.getSyncConfig().getRangeQuery(oldCheckpoint, newCheckpoint));
-
-            logger.trace("Gathering changes using query {}", filteredQuery);
-            sourceBuilder.query(filteredQuery);
-        } else {
-            logger.trace("No sync configured");
-            wrappedListener.onResponse(null);
-            return;
+        // if incremental update is not supported, do a full run
+        if (pivot.supportsIncrementalBucketUpdate() == false) {
+            return RunState.FULL_RUN;
         }
 
-        searchRequest.source(sourceBuilder);
-        searchRequest.allowPartialSearchResults(false);
-
-        collectChangedBuckets(searchRequest, changesAgg, keys, ActionListener.wrap(wrappedListener::onResponse, e -> {
-            // fall back if bucket collection failed
-            logger.error("Failed to retrieve changed buckets, fall back to complete retrieval", e);
-            wrappedListener.onResponse(null);
-        }));
-    }
-
-    void collectChangedBuckets(SearchRequest searchRequest, CompositeAggregationBuilder changesAgg, Map<String, Set<String>> keys,
-            ActionListener<Map<String, Set<String>>> finalListener) {
-
-        // re-using the existing search hook
-        doNextSearch(searchRequest, ActionListener.wrap(searchResponse -> {
-            final CompositeAggregation agg = searchResponse.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
-
-            agg.getBuckets().stream().forEach(bucket -> {
-                bucket.getKey().forEach((k, v) -> {
-                    keys.get(k).add(v.toString());
-                });
-            });
-
-            if (agg.getBuckets().isEmpty()) {
-                finalListener.onResponse(keys);
-            } else {
-                // adjust the after key
-                changesAgg.aggregateAfter(agg.afterKey());
-                collectChangedBuckets(searchRequest, changesAgg, keys, finalListener);
-            }
-        }, finalListener::onFailure));
+        // continuous mode: we need to get the changed buckets first
+        return RunState.PARTIAL_RUN_IDENTIFY_CHANGES;
     }
 
     /**

+ 58 - 30
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.dataframe.transforms;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.LatchedActionListener;
@@ -110,7 +111,6 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
     protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) {
         final String transformId = params.getId();
         final DataFrameTransformTask buildTask = (DataFrameTransformTask) task;
-        final DataFrameTransformState transformPTaskState = (DataFrameTransformState) state;
 
         final DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder =
             new DataFrameTransformTask.ClientDataFrameIndexerBuilder(transformId)
@@ -119,14 +119,53 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
                 .setTransformsCheckpointService(dataFrameTransformsCheckpointService)
                 .setTransformsConfigManager(transformsConfigManager);
 
+        final SetOnce<DataFrameTransformState> stateHolder = new SetOnce<>();
+
         ActionListener<StartDataFrameTransformTaskAction.Response> startTaskListener = ActionListener.wrap(
             response -> logger.info("Successfully completed and scheduled task in node operation"),
             failure -> logger.error("Failed to start task ["+ transformId +"] in node operation", failure)
         );
 
-        Long previousCheckpoint = transformPTaskState != null ? transformPTaskState.getCheckpoint() : null;
+        // <5> load next checkpoint
+        ActionListener<DataFrameTransformCheckpoint> getTransformNextCheckpointListener = ActionListener.wrap(
+                nextCheckpoint -> {
+                    indexerBuilder.setNextCheckpoint(nextCheckpoint);
+
+                    final long lastCheckpoint = stateHolder.get().getCheckpoint();
+
+                    logger.trace("[{}] Next checkpoint found, starting the task", transformId);
+                    startTask(buildTask, indexerBuilder, lastCheckpoint, startTaskListener);
+                },
+                error -> {
+                    // TODO: do not use the same error message as for loading the last checkpoint
+                    String msg = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CHECKPOINT, transformId);
+                    logger.error(msg, error);
+                    markAsFailed(buildTask, msg);
+                }
+        );
+
+        // <4> load last checkpoint
+        ActionListener<DataFrameTransformCheckpoint> getTransformLastCheckpointListener = ActionListener.wrap(
+                lastCheckpoint -> {
+                    indexerBuilder.setLastCheckpoint(lastCheckpoint);
+
+                    final long nextCheckpoint = stateHolder.get().getInProgressCheckpoint();
 
-        // <4> Set the previous stats (if they exist), initialize the indexer, start the task (If it is STOPPED)
+                    if (nextCheckpoint > 0) {
+                        transformsConfigManager.getTransformCheckpoint(transformId, nextCheckpoint, getTransformNextCheckpointListener);
+                    } else {
+                        logger.trace("[{}] No next checkpoint found, starting the task", transformId);
+                        startTask(buildTask, indexerBuilder, lastCheckpoint.getCheckpoint(), startTaskListener);
+                    }
+                },
+                error -> {
+                    String msg = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CHECKPOINT, transformId);
+                    logger.error(msg, error);
+                    markAsFailed(buildTask, msg);
+                }
+        );
+
+        // <3> Set the previous stats (if they exist), initialize the indexer, start the task (If it is STOPPED)
         // Since we don't create the task until `_start` is called, if we see that the task state is stopped, attempt to start
         // Schedule execution regardless
         ActionListener<DataFrameTransformStateAndStats> transformStatsActionListener = ActionListener.wrap(
@@ -141,27 +180,26 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
                     stateAndStats.getTransformState(),
                     stateAndStats.getTransformState().getPosition());
 
-                final Long checkpoint = stateAndStats.getTransformState().getCheckpoint();
-                startTask(buildTask, indexerBuilder, checkpoint, startTaskListener);
+                stateHolder.set(stateAndStats.getTransformState());
+                final long lastCheckpoint = stateHolder.get().getCheckpoint();
+
+                if (lastCheckpoint == 0) {
+                    logger.trace("[{}] No checkpoint found, starting the task", transformId);
+                    startTask(buildTask, indexerBuilder, lastCheckpoint, startTaskListener);
+                } else {
+                    logger.trace ("[{}] Restore last checkpoint: [{}]", transformId, lastCheckpoint);
+                    transformsConfigManager.getTransformCheckpoint(transformId, lastCheckpoint, getTransformLastCheckpointListener);
+                }
             },
             error -> {
                 if (error instanceof ResourceNotFoundException == false) {
-                    logger.warn("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error);
+                    String msg = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_STATE, transformId);
+                    logger.error(msg, error);
+                    markAsFailed(buildTask, msg);
                 }
-                startTask(buildTask, indexerBuilder, previousCheckpoint, startTaskListener);
-            }
-        );
 
-        // <3> set the in progress checkpoint for the indexer, get the in progress checkpoint
-        ActionListener<DataFrameTransformCheckpoint> getTransformCheckpointListener = ActionListener.wrap(
-            cp -> {
-                indexerBuilder.setInProgressOrLastCheckpoint(cp);
-                transformsConfigManager.getTransformStats(transformId, transformStatsActionListener);
-            },
-            error -> {
-                String msg = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CHECKPOINT, transformId);
-                logger.error(msg, error);
-                markAsFailed(buildTask, msg);
+                logger.trace("[{}] No stats found (new transform), starting the task", transformId);
+                startTask(buildTask, indexerBuilder, null, startTaskListener);
             }
         );
 
@@ -169,17 +207,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
         ActionListener<Map<String, String>> getFieldMappingsListener = ActionListener.wrap(
             fieldMappings -> {
                 indexerBuilder.setFieldMappings(fieldMappings);
-
-                long inProgressCheckpoint = transformPTaskState == null ? 0L :
-                    Math.max(transformPTaskState.getCheckpoint(), transformPTaskState.getInProgressCheckpoint());
-
-                logger.debug("Restore in progress or last checkpoint: {}", inProgressCheckpoint);
-
-                if (inProgressCheckpoint == 0) {
-                    getTransformCheckpointListener.onResponse(DataFrameTransformCheckpoint.EMPTY);
-                } else {
-                    transformsConfigManager.getTransformCheckpoint(transformId, inProgressCheckpoint, getTransformCheckpointListener);
-                }
+                transformsConfigManager.getTransformStats(transformId, transformStatsActionListener);
             },
             error -> {
                 String msg = DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS,

+ 30 - 31
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -30,6 +30,7 @@ import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
 import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPosition;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
@@ -50,7 +51,6 @@ import org.elasticsearch.xpack.dataframe.transforms.pivot.AggregationResultUtils
 
 import java.util.Arrays;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -72,7 +72,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
     private final SchedulerEngine schedulerEngine;
     private final ThreadPool threadPool;
     private final DataFrameAuditor auditor;
-    private final Map<String, Object> initialPosition;
+    private final DataFrameIndexerPosition initialPosition;
     private final IndexerState initialIndexerState;
 
     private final SetOnce<ClientDataFrameIndexer> indexer = new SetOnce<>();
@@ -95,7 +95,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         DataFrameTransformTaskState initialTaskState = DataFrameTransformTaskState.STOPPED;
         String initialReason = null;
         long initialGeneration = 0;
-        Map<String, Object> initialPosition = null;
+        DataFrameIndexerPosition initialPosition = null;
         if (state != null) {
             initialTaskState = state.getTaskState();
             initialReason = state.getReason();
@@ -383,9 +383,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         private DataFrameTransformConfig transformConfig;
         private DataFrameIndexerTransformStats initialStats;
         private IndexerState indexerState = IndexerState.STOPPED;
-        private Map<String, Object> initialPosition;
+        private DataFrameIndexerPosition initialPosition;
         private DataFrameTransformProgress progress;
-        private DataFrameTransformCheckpoint inProgressOrLastCheckpoint;
+        private DataFrameTransformCheckpoint lastCheckpoint;
+        private DataFrameTransformCheckpoint nextCheckpoint;
 
         ClientDataFrameIndexerBuilder(String transformId) {
             this.transformId = transformId;
@@ -404,7 +405,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 this.transformConfig,
                 this.fieldMappings,
                 this.progress,
-                this.inProgressOrLastCheckpoint,
+                this.lastCheckpoint,
+                this.nextCheckpoint,
                 parentTask);
         }
 
@@ -457,7 +459,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             return this;
         }
 
-        ClientDataFrameIndexerBuilder setInitialPosition(Map<String, Object> initialPosition) {
+        ClientDataFrameIndexerBuilder setInitialPosition(DataFrameIndexerPosition initialPosition) {
             this.initialPosition = initialPosition;
             return this;
         }
@@ -467,8 +469,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             return this;
         }
 
-        ClientDataFrameIndexerBuilder setInProgressOrLastCheckpoint(DataFrameTransformCheckpoint inProgressOrLastCheckpoint) {
-            this.inProgressOrLastCheckpoint = inProgressOrLastCheckpoint;
+        ClientDataFrameIndexerBuilder setLastCheckpoint(DataFrameTransformCheckpoint lastCheckpoint) {
+            this.lastCheckpoint = lastCheckpoint;
+            return this;
+        }
+
+        ClientDataFrameIndexerBuilder setNextCheckpoint(DataFrameTransformCheckpoint nextCheckpoint) {
+            this.nextCheckpoint = nextCheckpoint;
             return this;
         }
     }
@@ -491,14 +498,15 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                                DataFrameTransformsConfigManager transformsConfigManager,
                                DataFrameTransformsCheckpointService transformsCheckpointService,
                                AtomicReference<IndexerState> initialState,
-                               Map<String, Object> initialPosition,
+                               DataFrameIndexerPosition initialPosition,
                                Client client,
                                DataFrameAuditor auditor,
                                DataFrameIndexerTransformStats initialStats,
                                DataFrameTransformConfig transformConfig,
                                Map<String, String> fieldMappings,
                                DataFrameTransformProgress transformProgress,
-                               DataFrameTransformCheckpoint inProgressOrLastCheckpoint,
+                               DataFrameTransformCheckpoint lastCheckpoint,
+                               DataFrameTransformCheckpoint nextCheckpoint,
                                DataFrameTransformTask parentTask) {
             super(ExceptionsHelper.requireNonNull(parentTask, "parentTask")
                     .threadPool
@@ -510,7 +518,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 initialPosition,
                 initialStats == null ? new DataFrameIndexerTransformStats(transformId) : initialStats,
                 transformProgress,
-                inProgressOrLastCheckpoint);
+                lastCheckpoint,
+                nextCheckpoint);
             this.transformId = ExceptionsHelper.requireNonNull(transformId, "transformId");
             this.transformsConfigManager = ExceptionsHelper.requireNonNull(transformsConfigManager, "transformsConfigManager");
             this.transformsCheckpointService = ExceptionsHelper.requireNonNull(transformsCheckpointService,
@@ -526,9 +535,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             // Since multiple checkpoints can be executed in the task while it is running on the same node, we need to gather
             // the progress here, and not in the executor.
             if (initialRun()) {
-                ActionListener<Map<String, Set<String>>> changedBucketsListener = ActionListener.wrap(
-                    r -> {
-                        TransformProgressGatherer.getInitialProgress(this.client, buildFilterQuery(), getConfig(), ActionListener.wrap(
+                createCheckpoint(ActionListener.wrap(cp -> {
+                    nextCheckpoint = cp;
+                    TransformProgressGatherer.getInitialProgress(this.client, buildFilterQuery(), getConfig(), ActionListener.wrap(
                             newProgress -> {
                                 logger.trace("[{}] reset the progress from [{}] to [{}]", transformId, progress, newProgress);
                                 progress = newProgress;
@@ -540,20 +549,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                                 super.onStart(now, listener);
                             }
                         ));
-                    },
-                    listener::onFailure
-                );
-
-                createCheckpoint(ActionListener.wrap(cp -> {
-                    DataFrameTransformCheckpoint oldCheckpoint = inProgressOrLastCheckpoint;
-                    if (oldCheckpoint.isEmpty()) {
-                        // this is the 1st run, accept the new in progress checkpoint and go on
-                        inProgressOrLastCheckpoint = cp;
-                        changedBucketsListener.onResponse(null);
-                    } else {
-                        logger.debug ("Getting changes from {} to {}", oldCheckpoint.getTimeUpperBound(), cp.getTimeUpperBound());
-                        getChangedBuckets(oldCheckpoint, cp, changedBucketsListener);
-                    }
                 }, listener::onFailure));
             } else {
                 super.onStart(now, listener);
@@ -615,7 +610,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         }
 
         @Override
-        protected void doSaveState(IndexerState indexerState, Map<String, Object> position, Runnable next) {
+        protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition position, Runnable next) {
             if (indexerState.equals(IndexerState.ABORTING)) {
                 // If we're aborting, just invoke `next` (which is likely an onFailure handler)
                 next.run();
@@ -698,8 +693,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         @Override
         protected void onFinish(ActionListener<Void> listener) {
             try {
+                // TODO: needs cleanup super is called with a listener, but listener.onResponse is called below
+                // super.onFinish() fortunately ignores the listener
                 super.onFinish(listener);
                 long checkpoint = transformTask.currentCheckpoint.getAndIncrement();
+                lastCheckpoint = nextCheckpoint;
+                nextCheckpoint = null;
                 // Reset our failure count as we have finished and may start again with a new checkpoint
                 failureCount.set(0);
                 if (checkpoint % ON_FINISH_AUDIT_FREQUENCY == 0) {
@@ -756,7 +755,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             SetOnce<Boolean> changed = new SetOnce<>();
             transformsCheckpointService.getCheckpoint(transformConfig, new LatchedActionListener<>(ActionListener.wrap(
                     cp -> {
-                        long behind = DataFrameTransformCheckpoint.getBehind(inProgressOrLastCheckpoint, cp);
+                        long behind = DataFrameTransformCheckpoint.getBehind(lastCheckpoint, cp);
                         if (behind > 0) {
                             logger.debug("Detected changes, dest is {} operations behind the source", behind);
                             changed.set(true);

+ 12 - 0
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java

@@ -51,6 +51,7 @@ public class Pivot {
     private static final Logger logger = LogManager.getLogger(Pivot.class);
 
     private final PivotConfig config;
+    private final boolean supportsIncrementalBucketUpdate;
 
     // objects for re-using
     private final CompositeAggregationBuilder cachedCompositeAggregation;
@@ -58,6 +59,13 @@ public class Pivot {
     public Pivot(PivotConfig config) {
         this.config = config;
         this.cachedCompositeAggregation = createCompositeAggregation(config);
+
+        boolean supportsIncrementalBucketUpdate = false;
+        for(Entry<String, SingleGroupSource> entry: config.getGroupConfig().getGroups().entrySet()) {
+            supportsIncrementalBucketUpdate |= entry.getValue().supportsIncrementalBucketUpdate();
+        }
+
+        this.supportsIncrementalBucketUpdate = supportsIncrementalBucketUpdate;
     }
 
     public void validate(Client client, SourceConfig sourceConfig, final ActionListener<Boolean> listener) {
@@ -135,6 +143,10 @@ public class Pivot {
         return changedBuckets;
     }
 
+    public boolean supportsIncrementalBucketUpdate() {
+        return supportsIncrementalBucketUpdate;
+    }
+
     public Stream<Map<String, Object>> extractResults(CompositeAggregation agg,
                                                       Map<String, String> fieldTypeMap,
                                                       DataFrameIndexerTransformStats dataFrameIndexerTransformStats) {

+ 12 - 4
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java

@@ -21,6 +21,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPosition;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
@@ -32,6 +33,8 @@ import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
 import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
 import org.junit.Before;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -68,13 +71,13 @@ public class DataFrameIndexerTests extends ESTestCase {
                 Map<String, String> fieldMappings,
                 DataFrameAuditor auditor,
                 AtomicReference<IndexerState> initialState,
-                Map<String, Object> initialPosition,
+                DataFrameIndexerPosition initialPosition,
                 DataFrameIndexerTransformStats jobStats,
                 Function<SearchRequest, SearchResponse> searchFunction,
                 Function<BulkRequest, BulkResponse> bulkFunction,
                 Consumer<Exception> failureConsumer) {
             super(executor, auditor, transformConfig, fieldMappings, initialState, initialPosition, jobStats,
-                    /* DataFrameTransformProgress */ null, DataFrameTransformCheckpoint.EMPTY);
+                    /* DataFrameTransformProgress */ null, DataFrameTransformCheckpoint.EMPTY, DataFrameTransformCheckpoint.EMPTY);
             this.searchFunction = searchFunction;
             this.bulkFunction = bulkFunction;
             this.failureConsumer = failureConsumer;
@@ -129,7 +132,7 @@ public class DataFrameIndexerTests extends ESTestCase {
         }
 
         @Override
-        protected void doSaveState(IndexerState state, Map<String, Object> position, Runnable next) {
+        protected void doSaveState(IndexerState state, DataFrameIndexerPosition position, Runnable next) {
             assert state == IndexerState.STARTED || state == IndexerState.INDEXING || state == IndexerState.STOPPED;
             next.run();
         }
@@ -198,7 +201,12 @@ public class DataFrameIndexerTests extends ESTestCase {
 
         Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
 
-        Consumer<Exception> failureConsumer = e -> fail("expected circuit breaker exception to be handled");
+        Consumer<Exception> failureConsumer = e -> {
+            final StringWriter sw = new StringWriter();
+            final PrintWriter pw = new PrintWriter(sw, true);
+            e.printStackTrace(pw);
+            fail("expected circuit breaker exception to be handled, got:" + e + " Trace: " + sw.getBuffer().toString());
+        };
 
         final ExecutorService executor = Executors.newFixedThreadPool(1);
         try {