Browse Source

[Transform] Add `from` parameter to Transform Start API (#91116)

Przemysław Witek 2 years ago
parent
commit
40d32205db
25 changed files with 348 additions and 32 deletions
  1. 6 0
      docs/changelog/91116.yaml
  2. 5 0
      docs/reference/transform/apis/start-transform.asciidoc
  3. 5 0
      rest-api-spec/src/main/resources/rest-api-spec/api/transform.start_transform.json
  4. 1 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java
  5. 2 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java
  6. 19 3
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StartTransformAction.java
  7. 7 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java
  8. 29 3
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskParams.java
  9. 12 4
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StartTransformActionRequestTests.java
  10. 11 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointTests.java
  11. 2 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskParamsTests.java
  12. 2 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformTests.java
  13. 25 0
      x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_start_stop.yml
  14. 76 0
      x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java
  15. 15 12
      x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java
  16. 1 0
      x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformOldTransformsIT.java
  17. 10 2
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java
  18. 20 1
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestStartTransformAction.java
  19. 4 1
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java
  20. 11 0
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java
  21. 5 1
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java
  22. 1 1
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java
  23. 10 2
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollector.java
  24. 61 0
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/rest/action/RestStartTransformActionTests.java
  25. 8 0
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java

+ 6 - 0
docs/changelog/91116.yaml

@@ -0,0 +1,6 @@
+pr: 91116
+summary: Add from parameter to Transform Start API
+area: Transform
+type: enhancement
+issues:
+ - 88646

+ 5 - 0
docs/reference/transform/apis/start-transform.asciidoc

@@ -61,6 +61,11 @@ Identifier for the {transform}.
 [[start-transform-query-parms]]
 == {api-query-parms-title}
 
+`from`::
+(Optional, string) Restricts the set of transformed entities to those changed
+ after this time. Relative times like now-30d are supported.
+Only applicable for continuous transforms.
+
 `timeout`::
 (Optional, time)
 Period to wait for a response. If no response is received before the timeout

+ 5 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/transform.start_transform.json

@@ -26,6 +26,11 @@
       ]
     },
     "params":{
+      "from":{
+        "type":"string",
+        "required":false,
+        "description":"Restricts the set of transformed entities to those changed after this time"
+      },
       "timeout":{
         "type":"time",
         "required":false,

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java

@@ -22,6 +22,7 @@ public final class TransformField {
     public static final ParseField COUNT = new ParseField("count");
     public static final ParseField GROUP_BY = new ParseField("group_by");
     public static final ParseField TIMEOUT = new ParseField("timeout");
+    public static final ParseField FROM = new ParseField("from");
     public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion");
     public static final ParseField WAIT_FOR_CHECKPOINT = new ParseField("wait_for_checkpoint");
     public static final ParseField STATS_FIELD = new ParseField("stats");

+ 2 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java

@@ -75,6 +75,8 @@ public class TransformMessages {
 
     public static final String FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS = "Failed to parse transform checkpoints for [{0}]";
 
+    public static final String FAILED_TO_PARSE_DATE = "Failed to parse date for [{0}]";
+
     public static final String ID_TOO_LONG = "The id cannot contain more than {0} characters.";
     public static final String INVALID_ID = "Invalid {0}; ''{1}'' can contain lowercase alphanumeric (a-z and 0-9), hyphens or "
         + "underscores; must start and end with alphanumeric";

+ 19 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StartTransformAction.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.core.transform.action;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.support.master.AcknowledgedRequest;
@@ -20,6 +21,7 @@ import org.elasticsearch.xpack.core.transform.TransformField;
 import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.Objects;
 
@@ -35,25 +37,39 @@ public class StartTransformAction extends ActionType<StartTransformAction.Respon
     public static class Request extends AcknowledgedRequest<Request> {
 
         private final String id;
+        private final Instant from;
 
-        public Request(String id, TimeValue timeout) {
+        public Request(String id, Instant from, TimeValue timeout) {
             super(timeout);
             this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName());
+            this.from = from;
         }
 
         public Request(StreamInput in) throws IOException {
             super(in);
             id = in.readString();
+            if (in.getVersion().onOrAfter(Version.V_8_7_0)) {
+                from = in.readOptionalInstant();
+            } else {
+                from = null;
+            }
         }
 
         public String getId() {
             return id;
         }
 
+        public Instant from() {
+            return from;
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
             out.writeString(id);
+            if (out.getVersion().onOrAfter(Version.V_8_7_0)) {
+                out.writeOptionalInstant(from);
+            }
         }
 
         @Override
@@ -64,7 +80,7 @@ public class StartTransformAction extends ActionType<StartTransformAction.Respon
         @Override
         public int hashCode() {
             // the base class does not implement hashCode, therefore we need to hash timeout ourselves
-            return Objects.hash(timeout(), id);
+            return Objects.hash(timeout(), id, from);
         }
 
         @Override
@@ -77,7 +93,7 @@ public class StartTransformAction extends ActionType<StartTransformAction.Respon
             }
             Request other = (Request) obj;
             // the base class does not implement equals, therefore we need to check timeout ourselves
-            return Objects.equals(id, other.id) && timeout().equals(other.timeout());
+            return Objects.equals(id, other.id) && Objects.equals(from, other.from) && timeout().equals(other.timeout());
         }
     }
 

+ 7 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java

@@ -46,7 +46,12 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
  */
 public class TransformCheckpoint implements Writeable, ToXContentObject {
 
-    public static TransformCheckpoint EMPTY = new TransformCheckpoint("empty", 0L, -1L, Collections.emptyMap(), 0L);
+    public static String EMPTY_NAME = "_empty";
+    public static TransformCheckpoint EMPTY = createEmpty(0);
+
+    public static TransformCheckpoint createEmpty(long timestampMillis) {
+        return new TransformCheckpoint(EMPTY_NAME, timestampMillis, -1L, Collections.emptyMap(), timestampMillis);
+    }
 
     // the own checkpoint
     public static final ParseField CHECKPOINT = new ParseField("checkpoint");
@@ -128,7 +133,7 @@ public class TransformCheckpoint implements Writeable, ToXContentObject {
     }
 
     public boolean isEmpty() {
-        return this.equals(EMPTY);
+        return EMPTY_NAME.equals(transformId) && checkpoint == -1;
     }
 
     /**

+ 29 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskParams.java

@@ -20,44 +20,54 @@ import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xpack.core.transform.TransformField;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.Objects;
 
 public class TransformTaskParams implements SimpleDiffable<TransformTaskParams>, PersistentTaskParams {
 
     public static final String NAME = TransformField.TASK_NAME;
+    public static final ParseField FROM = TransformField.FROM;
     public static final ParseField FREQUENCY = TransformField.FREQUENCY;
     public static final ParseField REQUIRES_REMOTE = new ParseField("requires_remote");
 
     private final String transformId;
     private final Version version;
+    private final Instant from;
     private final TimeValue frequency;
     private final Boolean requiresRemote;
 
     public static final ConstructingObjectParser<TransformTaskParams, Void> PARSER = new ConstructingObjectParser<>(
         NAME,
         true,
-        a -> new TransformTaskParams((String) a[0], (String) a[1], (String) a[2], (Boolean) a[3])
+        a -> new TransformTaskParams((String) a[0], (String) a[1], (Long) a[2], (String) a[3], (Boolean) a[4])
     );
 
     static {
         PARSER.declareString(ConstructingObjectParser.constructorArg(), TransformField.ID);
         PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), TransformField.VERSION);
+        PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), FROM);
         PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FREQUENCY);
         PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REQUIRES_REMOTE);
     }
 
-    private TransformTaskParams(String transformId, String version, String frequency, Boolean remote) {
+    private TransformTaskParams(String transformId, String version, Long from, String frequency, Boolean remote) {
         this(
             transformId,
             version == null ? null : Version.fromString(version),
+            from == null ? null : Instant.ofEpochMilli(from),
             frequency == null ? null : TimeValue.parseTimeValue(frequency, FREQUENCY.getPreferredName()),
             remote == null ? false : remote.booleanValue()
         );
     }
 
     public TransformTaskParams(String transformId, Version version, TimeValue frequency, boolean remote) {
+        this(transformId, version, null, frequency, remote);
+    }
+
+    public TransformTaskParams(String transformId, Version version, Instant from, TimeValue frequency, boolean remote) {
         this.transformId = transformId;
         this.version = version == null ? Version.V_7_2_0 : version;
+        this.from = from;
         this.frequency = frequency;
         this.requiresRemote = remote;
     }
@@ -65,6 +75,11 @@ public class TransformTaskParams implements SimpleDiffable<TransformTaskParams>,
     public TransformTaskParams(StreamInput in) throws IOException {
         this.transformId = in.readString();
         this.version = Version.readVersion(in);
+        if (in.getVersion().onOrAfter(Version.V_8_7_0)) {
+            this.from = in.readOptionalInstant();
+        } else {
+            this.from = null;
+        }
         this.frequency = in.readOptionalTimeValue();
         this.requiresRemote = in.readBoolean();
     }
@@ -83,6 +98,9 @@ public class TransformTaskParams implements SimpleDiffable<TransformTaskParams>,
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(transformId);
         Version.writeVersion(version, out);
+        if (out.getVersion().onOrAfter(Version.V_8_7_0)) {
+            out.writeOptionalInstant(from);
+        }
         out.writeOptionalTimeValue(frequency);
         out.writeBoolean(requiresRemote);
     }
@@ -92,6 +110,9 @@ public class TransformTaskParams implements SimpleDiffable<TransformTaskParams>,
         builder.startObject();
         builder.field(TransformField.ID.getPreferredName(), transformId);
         builder.field(TransformField.VERSION.getPreferredName(), version);
+        if (from != null) {
+            builder.field(FROM.getPreferredName(), from.toEpochMilli());
+        }
         if (frequency != null) {
             builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
         }
@@ -108,6 +129,10 @@ public class TransformTaskParams implements SimpleDiffable<TransformTaskParams>,
         return version;
     }
 
+    public Instant from() {
+        return from;
+    }
+
     public TimeValue getFrequency() {
         return frequency;
     }
@@ -134,12 +159,13 @@ public class TransformTaskParams implements SimpleDiffable<TransformTaskParams>,
 
         return Objects.equals(this.transformId, that.transformId)
             && Objects.equals(this.version, that.version)
+            && Objects.equals(this.from, that.from)
             && Objects.equals(this.frequency, that.frequency)
             && this.requiresRemote == that.requiresRemote;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(transformId, version, frequency, requiresRemote);
+        return Objects.hash(transformId, version, from, frequency, requiresRemote);
     }
 }

+ 12 - 4
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StartTransformActionRequestTests.java

@@ -13,11 +13,17 @@ import org.elasticsearch.test.AbstractWireSerializingTestCase;
 import org.elasticsearch.xpack.core.transform.action.StartTransformAction.Request;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
 
 public class StartTransformActionRequestTests extends AbstractWireSerializingTestCase<Request> {
     @Override
     protected Request createTestInstance() {
-        return new Request(randomAlphaOfLengthBetween(1, 20), TimeValue.parseTimeValue(randomTimeValue(), "timeout"));
+        return new Request(
+            randomAlphaOfLengthBetween(1, 20),
+            randomBoolean() ? Instant.ofEpochMilli(randomNonNegativeLong()) : null,
+            TimeValue.parseTimeValue(randomTimeValue(), "timeout")
+        );
     }
 
     @Override
@@ -28,14 +34,16 @@ public class StartTransformActionRequestTests extends AbstractWireSerializingTes
     @Override
     protected Request mutateInstance(Request instance) throws IOException {
         String id = instance.getId();
+        Instant from = instance.from();
         TimeValue timeout = instance.timeout();
 
-        switch (between(0, 1)) {
+        switch (between(0, 2)) {
             case 0 -> id += randomAlphaOfLengthBetween(1, 5);
-            case 1 -> timeout = new TimeValue(timeout.duration() + randomLongBetween(1, 5), timeout.timeUnit());
+            case 1 -> from = from != null ? from.plus(Duration.ofDays(1)) : Instant.ofEpochMilli(randomNonNegativeLong());
+            case 2 -> timeout = new TimeValue(timeout.duration() + randomLongBetween(1, 5), timeout.timeUnit());
             default -> throw new AssertionError("Illegal randomization branch");
         }
 
-        return new Request(id, timeout);
+        return new Request(id, from, timeout);
     }
 }

+ 11 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointTests.java

@@ -112,7 +112,18 @@ public class TransformCheckpointTests extends AbstractSerializingTransformTestCa
 
     public void testEmpty() {
         assertTrue(TransformCheckpoint.EMPTY.isEmpty());
+        assertTrue(new TransformCheckpoint("_empty", 123L, -1, Collections.emptyMap(), 456L).isEmpty());
         assertFalse(new TransformCheckpoint("some_id", 0L, -1, Collections.emptyMap(), 0L).isEmpty());
+        assertFalse(new TransformCheckpoint("some_id", 0L, 0, Collections.emptyMap(), 0L).isEmpty());
+        assertFalse(new TransformCheckpoint("some_id", 0L, 1, Collections.emptyMap(), 0L).isEmpty());
+    }
+
+    public void testTransient() {
+        assertTrue(TransformCheckpoint.EMPTY.isTransient());
+        assertTrue(new TransformCheckpoint("_empty", 123L, -1, Collections.emptyMap(), 456L).isTransient());
+        assertTrue(new TransformCheckpoint("some_id", 0L, -1, Collections.emptyMap(), 0L).isTransient());
+        assertFalse(new TransformCheckpoint("some_id", 0L, 0, Collections.emptyMap(), 0L).isTransient());
+        assertFalse(new TransformCheckpoint("some_id", 0L, 1, Collections.emptyMap(), 0L).isTransient());
     }
 
     public void testGetBehind() {

+ 2 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskParamsTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xpack.core.transform.AbstractSerializingTransformTestCase;
 
 import java.io.IOException;
+import java.time.Instant;
 
 public class TransformTaskParamsTests extends AbstractSerializingTransformTestCase<TransformTaskParams> {
 
@@ -21,6 +22,7 @@ public class TransformTaskParamsTests extends AbstractSerializingTransformTestCa
         return new TransformTaskParams(
             randomAlphaOfLengthBetween(1, 10),
             randomBoolean() ? VersionUtils.randomVersion(random()) : null,
+            randomBoolean() ? Instant.ofEpochMilli(randomLongBetween(0, 1_000_000_000_000L)) : null,
             randomBoolean() ? TimeValue.timeValueSeconds(randomLongBetween(1, 24 * 60 * 60)) : null,
             randomBoolean()
         );

+ 2 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xpack.core.transform.AbstractSerializingTransformTestCase;
 
 import java.io.IOException;
+import java.time.Instant;
 
 public class TransformTests extends AbstractSerializingTransformTestCase<TransformTaskParams> {
 
@@ -27,6 +28,7 @@ public class TransformTests extends AbstractSerializingTransformTestCase<Transfo
         return new TransformTaskParams(
             randomAlphaOfLength(10),
             randomBoolean() ? null : Version.CURRENT,
+            randomBoolean() ? Instant.ofEpochMilli(randomLongBetween(0, 1_000_000_000_000L)) : null,
             randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
             randomBoolean()
         );

+ 25 - 0
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_start_stop.yml

@@ -256,6 +256,31 @@ teardown:
         transform_id: "airline-transform-start-stop-continuous"
         wait_for_completion: true
   - match: { acknowledged: true }
+
+---
+"Test start transform with empty value of from parameter":
+  - do:
+      catch: /Failed to parse date for \[from\]/
+      transform.start_transform:
+        from: ""
+        transform_id: "airline-transform-start-stop-continuous"
+
+---
+"Test start transform with invalid value of from parameter":
+  - do:
+      catch: /Failed to parse date for \[from\]/
+      transform.start_transform:
+        from: "not-a-valid-timestamp"
+        transform_id: "airline-transform-start-stop-continuous"
+
+---
+"Test start batch transform with from parameter":
+  - do:
+      catch: /\[from\] parameter is currently not supported for batch \(non-continuous\) transforms/
+      transform.start_transform:
+        from: "2023-01-11T12:00:00"
+        transform_id: "airline-transform-start-stop"
+
 ---
 "Test stop missing transform":
   - do:

+ 76 - 0
x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java

@@ -499,6 +499,82 @@ public class TransformPivotRestIT extends TransformRestTestCase {
         assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_42", 2.0);
     }
 
+    public void testContinuousPivotFrom() throws Exception {
+        String indexName = "continuous_reviews_from";
+        createReviewsIndex(indexName);
+        String transformId = "continuous_pivot_from";
+        String transformIndex = "pivot_reviews_continuous_from";
+        setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex);
+        final Request createTransformRequest = createRequestWithAuth(
+            "PUT",
+            getTransformEndpoint() + transformId,
+            BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
+        );
+        String config = Strings.format("""
+            {
+              "source": {
+                "index": "%s"
+              },
+              "dest": {
+                "index": "%s"
+              },
+              "frequency": "1s",
+              "sync": {
+                "time": {
+                  "field": "timestamp",
+                  "delay": "1s"
+                }
+              },
+              "pivot": {
+                "group_by": {
+                  "reviewer": {
+                    "terms": {
+                      "field": "user_id",
+                      "missing_bucket": true
+                    }
+                  }
+                },
+                "aggregations": {
+                  "avg_rating": {
+                    "avg": {
+                      "field": "stars"
+                    }
+                  }
+                }
+              }
+            }""", indexName, transformIndex);
+        createTransformRequest.setJsonEntity(config);
+        Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
+        assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
+
+        final StringBuilder bulk = new StringBuilder();
+        bulk.append(Strings.format("""
+            {"index":{"_index":"%s"}}
+            {"user_id":"user_%s","business_id":"business_%s","stars":%s,"location":"%s","timestamp":%s}
+            """, indexName, 666, 777, 7, 888, "\"2017-01-20\""));
+        bulk.append("\r\n");
+
+        final Request bulkRequest = new Request("POST", "/_bulk");
+        bulkRequest.addParameter("refresh", "true");
+        bulkRequest.setJsonEntity(bulk.toString());
+        Map<String, Object> bulkResponse = entityAsMap(client().performRequest(bulkRequest));
+        assertThat(bulkResponse.get("errors"), equalTo(Boolean.FALSE));
+
+        startAndWaitForContinuousTransform(transformId, transformIndex, null, "2017-01-23", 1L);
+        assertTrue(indexExists(transformIndex));
+
+        assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", getAsMap(transformIndex + "/_stats")));
+
+        // get and check some users
+        assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_0", 3.776978417);
+        assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_11", 3.846153846);
+        assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_20", 3.769230769);
+        assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 3.918918918);
+
+        stopTransform(transformId, false);
+        deleteIndex(indexName);
+    }
+
     public void testHistogramPivot() throws Exception {
         String transformId = "simple_histogram_pivot";
         String transformIndex = "pivot_reviews_via_histogram";

+ 15 - 12
x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

@@ -400,15 +400,10 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
     }
 
     protected void startTransform(String transformId) throws IOException {
-        startTransform(transformId, null);
+        startTransform(transformId, null, null, null);
     }
 
-    protected void startTransform(String transformId, String authHeader, String... warnings) throws IOException {
-        // start the transform
-        startTransform(transformId, authHeader, null, warnings);
-    }
-
-    protected void startTransform(String transformId, String authHeader, String secondaryAuthHeader, String... warnings)
+    protected void startTransform(String transformId, String authHeader, String secondaryAuthHeader, String from, String... warnings)
         throws IOException {
         // start the transform
         final Request startTransformRequest = createRequestWithSecondaryAuth(
@@ -417,6 +412,9 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
             authHeader,
             secondaryAuthHeader
         );
+        if (from != null) {
+            startTransformRequest.addParameter(TransformField.FROM.getPreferredName(), from);
+        }
         if (warnings.length > 0) {
             startTransformRequest.setOptions(expectWarnings(warnings));
         }
@@ -462,7 +460,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
         String... warnings
     ) throws Exception {
         // start the transform
-        startTransform(transformId, authHeader, secondaryAuthHeader, warnings);
+        startTransform(transformId, authHeader, secondaryAuthHeader, null, warnings);
         assertTrue(indexExists(transformIndex));
         // wait until the transform has been created and all data is available
         waitForTransformCheckpoint(transformId);
@@ -472,13 +470,18 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
     }
 
     protected void startAndWaitForContinuousTransform(String transformId, String transformIndex, String authHeader) throws Exception {
-        startAndWaitForContinuousTransform(transformId, transformIndex, authHeader, 1L);
+        startAndWaitForContinuousTransform(transformId, transformIndex, authHeader, null, 1L);
     }
 
-    protected void startAndWaitForContinuousTransform(String transformId, String transformIndex, String authHeader, long checkpoint)
-        throws Exception {
+    protected void startAndWaitForContinuousTransform(
+        String transformId,
+        String transformIndex,
+        String authHeader,
+        String from,
+        long checkpoint
+    ) throws Exception {
         // start the transform
-        startTransform(transformId, authHeader, new String[0]);
+        startTransform(transformId, authHeader, null, from, new String[0]);
         assertTrue(indexExists(transformIndex));
         // wait until the transform has been created and all data is available
         waitForTransformCheckpoint(transformId, checkpoint);

+ 1 - 0
x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformOldTransformsIT.java

@@ -123,6 +123,7 @@ public class TransformOldTransformsIT extends TransformSingleNodeTestCase {
 
         StartTransformAction.Request startTransformRequest = new StartTransformAction.Request(
             transformId,
+            null,
             AcknowledgedRequest.DEFAULT_ACK_TIMEOUT
         );
 

+ 10 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java

@@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.IndicesOptions;
@@ -21,7 +22,6 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.ValidationException;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
@@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 
+import static org.elasticsearch.action.ValidateActions.addValidationError;
 import static org.elasticsearch.core.Strings.format;
 import static org.elasticsearch.xpack.core.transform.TransformMessages.CANNOT_START_FAILED_TRANSFORM;
 
@@ -203,7 +204,13 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
 
         // <2> run transform validations
         ActionListener<TransformConfig> getTransformListener = ActionListener.wrap(config -> {
-            ValidationException validationException = config.validate(null);
+            ActionRequestValidationException validationException = config.validate(null);
+            if (request.from() != null && config.getSyncConfig() == null) {
+                validationException = addValidationError(
+                    "[from] parameter is currently not supported for batch (non-continuous) transforms",
+                    validationException
+                );
+            }
             if (validationException != null) {
                 listener.onFailure(
                     new ElasticsearchStatusException(
@@ -221,6 +228,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
                 new TransformTaskParams(
                     config.getId(),
                     config.getVersion(),
+                    request.from(),
                     config.getFrequency(),
                     config.getSource().requiresRemoteCluster()
                 )

+ 20 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestStartTransformAction.java

@@ -7,16 +7,23 @@
 
 package org.elasticsearch.xpack.transform.rest.action;
 
+import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.action.support.master.AcknowledgedRequest;
 import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.common.time.DateMathParser;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xpack.core.transform.TransformField;
+import org.elasticsearch.xpack.core.transform.TransformMessages;
 import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
 
+import java.time.Instant;
 import java.util.List;
+import java.util.function.LongSupplier;
 
 import static org.elasticsearch.rest.RestRequest.Method.POST;
 
@@ -30,12 +37,24 @@ public class RestStartTransformAction extends BaseRestHandler {
     @Override
     protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
         String id = restRequest.param(TransformField.ID.getPreferredName());
+        String fromAsString = restRequest.param(TransformField.FROM.getPreferredName());
+        Instant from = fromAsString != null ? parseDateOrThrow(fromAsString, TransformField.FROM, System::currentTimeMillis) : null;
         TimeValue timeout = restRequest.paramAsTime(TransformField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT);
 
-        StartTransformAction.Request request = new StartTransformAction.Request(id, timeout);
+        StartTransformAction.Request request = new StartTransformAction.Request(id, from, timeout);
         return channel -> client.execute(StartTransformAction.INSTANCE, request, new RestToXContentListener<>(channel));
     }
 
+    private static Instant parseDateOrThrow(String date, ParseField paramName, LongSupplier now) {
+        DateMathParser dateMathParser = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.toDateMathParser();
+        try {
+            return dateMathParser.parse(date, now);
+        } catch (Exception e) {
+            String msg = TransformMessages.getMessage(TransformMessages.FAILED_TO_PARSE_DATE, paramName.getPreferredName(), date);
+            throw new ElasticsearchParseException(msg, e);
+        }
+    }
+
     @Override
     public String getName() {
         return "transform_start_transform_action";

+ 4 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java

@@ -52,7 +52,10 @@ class ClientTransformIndexerBuilder {
             initialStats,
             transformConfig,
             progress,
-            TransformCheckpoint.isNullOrEmpty(lastCheckpoint) ? TransformCheckpoint.EMPTY : lastCheckpoint,
+            // If there already exists at least one checkpoint, the "from" setting is effectively ignored.
+            TransformCheckpoint.isNullOrEmpty(lastCheckpoint)
+                ? (context.from() != null ? TransformCheckpoint.createEmpty(context.from().toEpochMilli()) : TransformCheckpoint.EMPTY)
+                : lastCheckpoint,
             TransformCheckpoint.isNullOrEmpty(nextCheckpoint) ? TransformCheckpoint.EMPTY : nextCheckpoint,
             seqNoPrimaryTermAndIndex,
             context,

+ 11 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java

@@ -47,10 +47,17 @@ public class TransformContext {
     // Note: Each indexer run creates a new future checkpoint which becomes the current checkpoint only after the indexer run finished
     private final AtomicLong currentCheckpoint;
 
+    private final Instant from;
+
     public TransformContext(TransformTaskState taskState, String stateReason, long currentCheckpoint, Listener taskListener) {
+        this(taskState, stateReason, currentCheckpoint, null, taskListener);
+    }
+
+    public TransformContext(TransformTaskState taskState, String stateReason, long currentCheckpoint, Instant from, Listener taskListener) {
         this.taskState = new AtomicReference<>(taskState);
         this.stateReason = new AtomicReference<>(stateReason);
         this.currentCheckpoint = new AtomicLong(currentCheckpoint);
+        this.from = from;
         this.taskListener = taskListener;
         this.failureCount = new AtomicInteger(0);
     }
@@ -99,6 +106,10 @@ public class TransformContext {
         return currentCheckpoint.get();
     }
 
+    Instant from() {
+        return from;
+    }
+
     long incrementAndGetCheckpoint() {
         return currentCheckpoint.incrementAndGet();
     }

+ 5 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

@@ -1097,7 +1097,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
                 .filter(config.getSyncConfig().getRangeQuery(nextCheckpoint));
 
             // Only apply extra filter if it is the subsequent run of the continuous transform
-            if (nextCheckpoint.getCheckpoint() > 1 && changeCollector != null) {
+            if (changeCollector != null) {
                 QueryBuilder filter = changeCollector.buildFilterQuery(lastCheckpoint, nextCheckpoint);
                 if (filter != null) {
                     filteredQuery.filter(filter);
@@ -1154,6 +1154,10 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
     }
 
     private RunState determineRunStateAtStart() {
+        if (context.from() != null) {
+            return RunState.IDENTIFY_CHANGES;
+        }
+
         // either 1st run or not a continuous transform
         if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) {
             return RunState.APPLY_RESULTS;

+ 1 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java

@@ -114,7 +114,7 @@ public class TransformTask extends AllocatedPersistentTask implements TransformS
         this.initialIndexerState = initialState;
         this.initialPosition = initialPosition;
 
-        this.context = new TransformContext(initialTaskState, initialReason, initialCheckpoint, this);
+        this.context = new TransformContext(initialTaskState, initialReason, initialCheckpoint, transform.from(), this);
     }
 
     public ParentTaskAssigningClient getParentTaskClient() {

+ 10 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollector.java

@@ -384,6 +384,10 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
             if (missingBucket) {
                 return null;
             }
+            // filterByChanges has been called before collectChangesFromAggregations
+            if (lowerBound == 0 && upperBound == 0) {
+                return null;
+            }
             return new RangeQueryBuilder(sourceFieldName).gte(lowerBound).lte(upperBound).format("epoch_millis");
         }
 
@@ -480,11 +484,15 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
 
         @Override
         public QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextcheckpointTimestamp) {
+
             if (missingBucket) {
                 return null;
             }
-
-            // (upperBound - lowerBound) >= interval, so never 0
+            // filterByChanges has been called before collectChangesFromAggregations
+            if (lowerBound == 0 && upperBound == 0) {
+                return null;
+            }
+            // (upperBound - lowerBound) >= interval, so never 0.
             if ((maxUpperBound - minLowerBound) / (upperBound - lowerBound) < MIN_CUT_OFF) {
                 return null;
             }

+ 61 - 0
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/rest/action/RestStartTransformActionTests.java

@@ -0,0 +1,61 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.transform.rest.action;
+
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.rest.FakeRestRequest;
+import org.elasticsearch.xcontent.NamedXContentRegistry;
+
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+
+public class RestStartTransformActionTests extends ESTestCase {
+
+    private static final String ID = "id";
+    private static final String FROM = "from";
+
+    public void testFromValid() throws Exception {
+        testFromValid(null);
+        testFromValid("12345678");
+        testFromValid("2022-10-25");
+        testFromValid("now-1d");
+    }
+
+    private void testFromValid(String from) {
+        RestStartTransformAction handler = new RestStartTransformAction();
+        FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY);
+        if (from == null) {
+            requestBuilder.withParams(Map.of(ID, "my-id"));
+        } else {
+            requestBuilder.withParams(Map.of(ID, "my-id", FROM, from));
+        }
+        FakeRestRequest request = requestBuilder.build();
+        handler.prepareRequest(request, mock(NodeClient.class));
+    }
+
+    public void testFromInvalid() {
+        testFromInvalid("");
+        testFromInvalid("not-a-valid-timestamp");
+        testFromInvalid("2023-17-42");
+    }
+
+    private void testFromInvalid(String from) {
+        final RestStartTransformAction handler = new RestStartTransformAction();
+        final FakeRestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams(Map.of(ID, "my-id", FROM, from))
+            .build();
+        ElasticsearchParseException e = expectThrows(
+            ElasticsearchParseException.class,
+            () -> handler.prepareRequest(request, mock(NodeClient.class))
+        );
+        assertThat(e.getMessage(), equalTo("Failed to parse date for [from]"));
+    }
+}

+ 8 - 0
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java

@@ -12,6 +12,8 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
 import org.junit.After;
 import org.junit.Before;
 
+import java.time.Instant;
+
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
@@ -88,4 +90,10 @@ public class TransformContextTests extends ESTestCase {
 
         verify(listener).failureCountChanged();
     }
+
+    public void testFrom() {
+        Instant from = Instant.ofEpochMilli(randomLongBetween(0, 1_000_000_000_000L));
+        TransformContext context = new TransformContext(TransformTaskState.STARTED, null, 0, from, listener);
+        assertThat(context.from(), is(equalTo(from)));
+    }
 }