Browse Source

[ML] Data Frame HLRC start & stop APIs (#40154)

David Kyle 6 years ago
parent
commit
1f62a7aba8
18 changed files with 1067 additions and 4 deletions
  1. 85 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameClient.java
  2. 34 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java
  3. 99 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/core/AcknowledgedTasksResponse.java
  4. 84 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/StartDataFrameTransformRequest.java
  5. 51 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/StartDataFrameTransformResponse.java
  6. 98 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/StopDataFrameTransformRequest.java
  7. 51 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/StopDataFrameTransformResponse.java
  8. 56 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java
  9. 41 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java
  10. 151 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/core/AcknowledgedTasksResponseTests.java
  11. 42 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/StartDataFrameTransformRequestTests.java
  12. 42 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/StopDataFrameTransformRequestTests.java
  13. 141 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java
  14. 37 0
      docs/java-rest/high-level/dataframe/start_data_frame.asciidoc
  15. 38 0
      docs/java-rest/high-level/dataframe/stop_data_frame.asciidoc
  16. 5 1
      docs/java-rest/high-level/supported-apis.asciidoc
  17. 5 3
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStartDataFrameTransformAction.java
  18. 7 0
      x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.start_data_frame_transform.json

+ 85 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameClient.java

@@ -23,6 +23,10 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.core.AcknowledgedResponse;
 import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse;
+import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -115,4 +119,85 @@ public final class DataFrameClient {
                 listener,
                 Collections.emptySet());
     }
+
+
+    /**
+     * Start a data frame transform
+     * <p>
+     * For additional info
+     * see <a href="https://www.TODO.com">Start Data Frame transform documentation</a>
+     *
+     * @param request The start data frame transform request
+     * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @return A response object indicating request success
+     * @throws IOException when there is a serialization issue sending the request or receiving the response
+     */
+    public StartDataFrameTransformResponse startDataFrameTransform(StartDataFrameTransformRequest request, RequestOptions options)
+            throws IOException {
+        return restHighLevelClient.performRequestAndParseEntity(request,
+                DataFrameRequestConverters::startDataFrameTransform,
+                options,
+                StartDataFrameTransformResponse::fromXContent,
+                Collections.emptySet());
+    }
+
+    /**
+     * Start a data frame transform asynchronously and notifies listener on completion
+     * <p>
+     * For additional info
+     * see <a href="https://www.TODO.com">Start Data Frame transform documentation</a>
+     *
+     * @param request The start data frame transform request
+     * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @param listener Listener to be notified upon request completion
+     */
+    public void startDataFrameTransformAsync(StartDataFrameTransformRequest request, RequestOptions options,
+                                            ActionListener<StartDataFrameTransformResponse> listener) {
+        restHighLevelClient.performRequestAsyncAndParseEntity(request,
+                DataFrameRequestConverters::startDataFrameTransform,
+                options,
+                StartDataFrameTransformResponse::fromXContent,
+                listener,
+                Collections.emptySet());
+    }
+
+    /**
+     * Stop a data frame transform
+     * <p>
+     * For additional info
+     * see <a href="https://www.TODO.com">Stop Data Frame transform documentation</a>
+     *
+     * @param request The stop data frame transform request
+     * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @return A response object indicating request success
+     * @throws IOException when there is a serialization issue sending the request or receiving the response
+     */
+    public StopDataFrameTransformResponse stopDataFrameTransform(StopDataFrameTransformRequest request, RequestOptions options)
+            throws IOException {
+        return restHighLevelClient.performRequestAndParseEntity(request,
+                DataFrameRequestConverters::stopDataFrameTransform,
+                options,
+                StopDataFrameTransformResponse::fromXContent,
+                Collections.emptySet());
+    }
+
+    /**
+     * Stop a data frame transform asynchronously and notifies listener on completion
+     * <p>
+     * For additional info
+     * see <a href="https://www.TODO.com">Stop Data Frame transform documentation</a>
+     *
+     * @param request The stop data frame transform request
+     * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @param listener Listener to be notified upon request completion
+     */
+    public void stopDataFrameTransformAsync(StopDataFrameTransformRequest request, RequestOptions options,
+                                              ActionListener<StopDataFrameTransformResponse> listener) {
+        restHighLevelClient.performRequestAsyncAndParseEntity(request,
+                DataFrameRequestConverters::stopDataFrameTransform,
+                options,
+                StopDataFrameTransformResponse::fromXContent,
+                listener,
+                Collections.emptySet());
+    }
 }

+ 34 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java

@@ -20,9 +20,12 @@
 package org.elasticsearch.client;
 
 import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
 import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
 
 import java.io.IOException;
 
@@ -50,4 +53,35 @@ final class DataFrameRequestConverters {
                 .build();
         return new Request(HttpDelete.METHOD_NAME, endpoint);
     }
+
+    static Request startDataFrameTransform(StartDataFrameTransformRequest startRequest) {
+        String endpoint = new RequestConverters.EndpointBuilder()
+                .addPathPartAsIs("_data_frame", "transforms")
+                .addPathPart(startRequest.getId())
+                .addPathPartAsIs("_start")
+                .build();
+        Request request = new Request(HttpPost.METHOD_NAME, endpoint);
+        RequestConverters.Params params = new RequestConverters.Params(request);
+        if (startRequest.getTimeout() != null) {
+            params.withTimeout(startRequest.getTimeout());
+        }
+        return request;
+    }
+
+    static Request stopDataFrameTransform(StopDataFrameTransformRequest stopRequest) {
+            String endpoint = new RequestConverters.EndpointBuilder()
+                    .addPathPartAsIs("_data_frame", "transforms")
+                    .addPathPart(stopRequest.getId())
+                    .addPathPartAsIs("_stop")
+                    .build();
+            Request request = new Request(HttpPost.METHOD_NAME, endpoint);
+            RequestConverters.Params params = new RequestConverters.Params(request);
+            if (stopRequest.getWaitForCompletion() != null) {
+                params.withWaitForCompletion(stopRequest.getWaitForCompletion());
+            }
+            if (stopRequest.getTimeout() != null) {
+                params.withTimeout(stopRequest.getTimeout());
+            }
+            return request;
+    }
 }

+ 99 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/core/AcknowledgedTasksResponse.java

@@ -0,0 +1,99 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.core;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.TaskOperationFailure;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.TriFunction;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
+
+public class AcknowledgedTasksResponse {
+
+    protected static final ParseField TASK_FAILURES = new ParseField("task_failures");
+    protected static final ParseField NODE_FAILURES = new ParseField("node_failures");
+
+    @SuppressWarnings("unchecked")
+    protected static <T extends AcknowledgedTasksResponse> ConstructingObjectParser<T, Void> generateParser(
+            String name,
+            TriFunction<Boolean, List<TaskOperationFailure>, List<? extends ElasticsearchException>, T> ctor,
+            String ackFieldName) {
+
+        ConstructingObjectParser<T, Void> parser = new ConstructingObjectParser<>(name, true,
+                args -> ctor.apply((boolean) args[0], (List<TaskOperationFailure>) args[1], (List<ElasticsearchException>) args[2]));
+        parser.declareBoolean(constructorArg(), new ParseField(ackFieldName));
+        parser.declareObjectArray(optionalConstructorArg(), (p, c) -> TaskOperationFailure.fromXContent(p), TASK_FAILURES);
+        parser.declareObjectArray(optionalConstructorArg(), (p, c) -> ElasticsearchException.fromXContent(p), NODE_FAILURES);
+        return parser;
+    }
+
+    private boolean acknowledged;
+    private List<TaskOperationFailure> taskFailures;
+    private List<ElasticsearchException> nodeFailures;
+
+    public AcknowledgedTasksResponse(boolean acknowledged, @Nullable List<TaskOperationFailure> taskFailures,
+                                     @Nullable List<? extends ElasticsearchException> nodeFailures) {
+        this.acknowledged = acknowledged;
+        this.taskFailures = taskFailures == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(taskFailures));
+        this.nodeFailures = nodeFailures == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(nodeFailures));
+    }
+
+    public boolean isAcknowledged() {
+        return acknowledged;
+    }
+
+    public List<TaskOperationFailure> getTaskFailures() {
+        return taskFailures;
+    }
+
+    public List<ElasticsearchException> getNodeFailures() {
+        return nodeFailures;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+
+        AcknowledgedTasksResponse other = (AcknowledgedTasksResponse) obj;
+        return acknowledged == other.acknowledged
+                && taskFailures.equals(other.taskFailures)
+                && nodeFailures.equals(other.nodeFailures);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(acknowledged, taskFailures, nodeFailures);
+    }
+}

+ 84 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/StartDataFrameTransformRequest.java

@@ -0,0 +1,84 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe;
+
+import org.elasticsearch.client.Validatable;
+import org.elasticsearch.client.ValidationException;
+import org.elasticsearch.common.unit.TimeValue;
+
+import java.util.Objects;
+import java.util.Optional;
+
+public class StartDataFrameTransformRequest implements Validatable {
+
+    private final String id;
+    private TimeValue timeout;
+
+    public StartDataFrameTransformRequest(String id) {
+        this.id = id;
+    }
+
+    public StartDataFrameTransformRequest(String id, TimeValue timeout) {
+        this.id = id;
+        this.timeout = timeout;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public TimeValue getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(TimeValue timeout) {
+        this.timeout = timeout;
+    }
+
+    @Override
+    public Optional<ValidationException> validate() {
+        if (id == null) {
+            ValidationException validationException = new ValidationException();
+            validationException.addValidationError("data frame transform id must not be null");
+            return Optional.of(validationException);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, timeout);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        StartDataFrameTransformRequest other = (StartDataFrameTransformRequest) obj;
+        return Objects.equals(this.id, other.id)
+                && Objects.equals(this.timeout, other.timeout);
+    }
+}

+ 51 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/StartDataFrameTransformResponse.java

@@ -0,0 +1,51 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.TaskOperationFailure;
+import org.elasticsearch.client.core.AcknowledgedTasksResponse;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.List;
+
+public class StartDataFrameTransformResponse extends AcknowledgedTasksResponse {
+
+    private static final String STARTED = "started";
+
+    private static final ConstructingObjectParser<StartDataFrameTransformResponse, Void> PARSER =
+            AcknowledgedTasksResponse.generateParser("start_data_frame_transform_response", StartDataFrameTransformResponse::new, STARTED);
+
+    public static StartDataFrameTransformResponse fromXContent(final XContentParser parser) throws IOException {
+        return PARSER.parse(parser, null);
+    }
+
+    public StartDataFrameTransformResponse(boolean started, @Nullable List<TaskOperationFailure> taskFailures,
+                                          @Nullable List<? extends ElasticsearchException> nodeFailures) {
+        super(started, taskFailures, nodeFailures);
+    }
+
+    public boolean isStarted() {
+        return isAcknowledged();
+    }
+}

+ 98 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/StopDataFrameTransformRequest.java

@@ -0,0 +1,98 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe;
+
+import org.elasticsearch.client.Validatable;
+import org.elasticsearch.client.ValidationException;
+import org.elasticsearch.common.unit.TimeValue;
+
+import java.util.Objects;
+import java.util.Optional;
+
+public class StopDataFrameTransformRequest implements Validatable {
+
+    private final String id;
+    private Boolean waitForCompletion;
+    private TimeValue timeout;
+
+    public StopDataFrameTransformRequest(String id) {
+        this.id = id;
+        waitForCompletion = null;
+        timeout = null;
+    }
+
+    public StopDataFrameTransformRequest(String id, Boolean waitForCompletion, TimeValue timeout) {
+        this.id = id;
+        this.waitForCompletion = waitForCompletion;
+        this.timeout = timeout;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setWaitForCompletion(Boolean waitForCompletion) {
+        this.waitForCompletion = waitForCompletion;
+    }
+
+    public Boolean getWaitForCompletion() {
+        return waitForCompletion;
+    }
+
+    public void setTimeout(TimeValue timeout) {
+        this.timeout = timeout;
+    }
+
+    public TimeValue getTimeout() {
+        return timeout;
+    }
+
+    @Override
+    public Optional<ValidationException> validate() {
+        if (id == null) {
+            ValidationException validationException = new ValidationException();
+            validationException.addValidationError("data frame transform id must not be null");
+            return Optional.of(validationException);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, waitForCompletion, timeout);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        StopDataFrameTransformRequest other = (StopDataFrameTransformRequest) obj;
+        return Objects.equals(this.id, other.id)
+                && Objects.equals(this.waitForCompletion, other.waitForCompletion)
+                && Objects.equals(this.timeout, other.timeout);
+    }
+
+}

+ 51 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/StopDataFrameTransformResponse.java

@@ -0,0 +1,51 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.TaskOperationFailure;
+import org.elasticsearch.client.core.AcknowledgedTasksResponse;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.List;
+
+public class StopDataFrameTransformResponse extends AcknowledgedTasksResponse {
+
+    private static final String STOPPED = "stopped";
+
+    private static final ConstructingObjectParser<StopDataFrameTransformResponse, Void> PARSER =
+            AcknowledgedTasksResponse.generateParser("stop_data_frame_transform_response", StopDataFrameTransformResponse::new, STOPPED);
+
+    public static StopDataFrameTransformResponse fromXContent(final XContentParser parser) throws IOException {
+        return PARSER.parse(parser, null);
+    }
+
+    public StopDataFrameTransformResponse(boolean stopped, @Nullable List<TaskOperationFailure> taskFailures,
+                                          @Nullable List<? extends ElasticsearchException> nodeFailures) {
+        super(stopped, taskFailures, nodeFailures);
+    }
+
+    public boolean isStopped() {
+        return isAcknowledged();
+    }
+}

+ 56 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java

@@ -20,12 +20,16 @@
 package org.elasticsearch.client;
 
 import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
 import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigTests;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
@@ -66,4 +70,56 @@ public class DataFrameRequestConvertersTests extends ESTestCase {
         assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
         assertThat(request.getEndpoint(), equalTo("/_data_frame/transforms/foo"));
     }
+
+    public void testStartDataFrameTransform() {
+        String id = randomAlphaOfLength(10);
+        TimeValue timeValue = null;
+        if (randomBoolean()) {
+            timeValue = TimeValue.parseTimeValue(randomTimeValue(), "timeout");
+        }
+        StartDataFrameTransformRequest startRequest = new StartDataFrameTransformRequest(id, timeValue);
+
+        Request request = DataFrameRequestConverters.startDataFrameTransform(startRequest);
+        assertEquals(HttpPost.METHOD_NAME, request.getMethod());
+        assertThat(request.getEndpoint(), equalTo("/_data_frame/transforms/" + startRequest.getId() + "/_start"));
+
+        if (timeValue != null) {
+            assertTrue(request.getParameters().containsKey("timeout"));
+            assertEquals(startRequest.getTimeout(), TimeValue.parseTimeValue(request.getParameters().get("timeout"), "timeout"));
+        } else {
+            assertFalse(request.getParameters().containsKey("timeout"));
+        }
+    }
+
+    public void testStopDataFrameTransform() {
+        String id = randomAlphaOfLength(10);
+        Boolean waitForCompletion = null;
+        if (randomBoolean()) {
+            waitForCompletion = randomBoolean();
+        }
+        TimeValue timeValue = null;
+        if (randomBoolean()) {
+            timeValue = TimeValue.parseTimeValue(randomTimeValue(), "timeout");
+        }
+        StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, waitForCompletion, timeValue);
+
+
+        Request request = DataFrameRequestConverters.stopDataFrameTransform(stopRequest);
+        assertEquals(HttpPost.METHOD_NAME, request.getMethod());
+        assertThat(request.getEndpoint(), equalTo("/_data_frame/transforms/" + stopRequest.getId() + "/_stop"));
+
+        if (waitForCompletion != null) {
+            assertTrue(request.getParameters().containsKey("wait_for_completion"));
+            assertEquals(stopRequest.getWaitForCompletion(), Boolean.parseBoolean(request.getParameters().get("wait_for_completion")));
+        } else {
+            assertFalse(request.getParameters().containsKey("wait_for_completion"));
+        }
+
+        if (timeValue != null) {
+            assertTrue(request.getParameters().containsKey("timeout"));
+            assertEquals(stopRequest.getTimeout(), TimeValue.parseTimeValue(request.getParameters().get("timeout"), "timeout"));
+        } else {
+            assertFalse(request.getParameters().containsKey("timeout"));
+        }
+    }
 }

+ 41 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java

@@ -23,6 +23,10 @@ import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.client.core.AcknowledgedResponse;
 import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse;
+import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.client.dataframe.transforms.QueryConfig;
 import org.elasticsearch.client.dataframe.transforms.pivot.AggregationConfig;
@@ -41,6 +45,7 @@ import java.util.Collections;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
 
 public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
 
@@ -96,5 +101,41 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
                         client::deleteDataFrameTransformAsync));
         assertThat(deleteError.getMessage(), containsString("Transform with id [test-crud] could not be found"));
     }
+
+    public void testStartStop() throws IOException {
+        String sourceIndex = "transform-source";
+        createIndex(sourceIndex);
+
+        QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
+        GroupConfig groupConfig = new GroupConfig(Collections.singletonMap("reviewer", new TermsGroupSource("user_id")));
+        AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
+        aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
+        AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
+        PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
+
+        String id = "test-stop-start";
+        DataFrameTransformConfig transform = new DataFrameTransformConfig(id, sourceIndex, "pivot-dest", queryConfig, pivotConfig);
+
+        DataFrameClient client = highLevelClient().dataFrame();
+        AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform,
+                client::putDataFrameTransformAsync);
+        assertTrue(ack.isAcknowledged());
+
+        StartDataFrameTransformRequest startRequest = new StartDataFrameTransformRequest(id);
+        StartDataFrameTransformResponse startResponse =
+                execute(startRequest, client::startDataFrameTransform, client::startDataFrameTransformAsync);
+        assertTrue(startResponse.isStarted());
+        assertThat(startResponse.getNodeFailures(), empty());
+        assertThat(startResponse.getTaskFailures(), empty());
+
+        // TODO once get df stats is implemented assert the df has started
+
+        StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id);
+        StopDataFrameTransformResponse stopResponse =
+                execute(stopRequest, client::stopDataFrameTransform, client::stopDataFrameTransformAsync);
+        assertTrue(stopResponse.isStopped());
+        assertThat(stopResponse.getNodeFailures(), empty());
+        assertThat(stopResponse.getTaskFailures(), empty());
+    }
 }
 

+ 151 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/core/AcknowledgedTasksResponseTests.java

@@ -0,0 +1,151 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.core;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.TaskOperationFailure;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.BiPredicate;
+
+import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
+import static org.hamcrest.Matchers.containsString;
+
+public class AcknowledgedTasksResponseTests extends ESTestCase {
+
+    public void testFromXContent() throws IOException {
+        xContentTester(this::createParser,
+                this::createTestInstance,
+                AcknowledgedTasksResponseTests::toXContent,
+                AcknowledgedTasksResponseTests::fromXContent)
+                .assertEqualsConsumer(this::assertEqualInstances)
+                .assertToXContentEquivalence(false)
+                .supportsUnknownFields(false)
+                .test();
+    }
+
+    // Serialisation of TaskOperationFailure and ElasticsearchException changes
+    // the object so use a custom compare method rather than Object.equals
+    private void assertEqualInstances(AcknowledgedTasksResponse expected, AcknowledgedTasksResponse actual) {
+        assertNotSame(expected, actual);
+        assertEquals(expected.isAcknowledged(), actual.isAcknowledged());
+
+        List<TaskOperationFailure> expectedTaskFailures = expected.getTaskFailures();
+        List<TaskOperationFailure> actualTaskFailures = actual.getTaskFailures();
+
+        assertListEquals(expectedTaskFailures, actualTaskFailures, (a, b) ->
+                Objects.equals(a.getNodeId(), b.getNodeId())
+                        && Objects.equals(a.getTaskId(), b.getTaskId())
+                        && Objects.equals(a.getStatus(), b.getStatus())
+        );
+
+        List<ElasticsearchException> expectedExceptions = expected.getNodeFailures();
+        List<ElasticsearchException> actualExceptions = actual.getNodeFailures();
+
+        // actualException is a wrapped copy of expectedException so the
+        // error messages won't be the same but actualException should contain
+        // the error message from expectedException
+        assertListEquals(expectedExceptions, actualExceptions, (expectedException, actualException) -> {
+            assertThat(actualException.getDetailedMessage(), containsString(expectedException.getMessage()));
+            return true;
+        });
+    }
+
+    private <T> void assertListEquals(List<T> expected, List<T> actual, BiPredicate<T, T> comparator) {
+        if (expected == null) {
+            assertNull(actual);
+            return;
+        } else {
+            assertNotNull(actual);
+        }
+
+        assertEquals(expected.size(), actual.size());
+        for (int i=0; i<expected.size(); i++) {
+            assertTrue(comparator.test(expected.get(i), actual.get(i)));
+        }
+    }
+
+    private static AcknowledgedTasksResponse fromXContent(XContentParser parser) {
+        return AcknowledgedTasksResponse.generateParser("ack_tasks_response",
+                AcknowledgedTasksResponse::new, "acknowleged")
+                .apply(parser, null);
+    }
+
+    private AcknowledgedTasksResponse createTestInstance() {
+        List<TaskOperationFailure> taskFailures = null;
+        if (randomBoolean()) {
+            taskFailures = new ArrayList<>();
+            int numTaskFailures = randomIntBetween(1, 4);
+            for (int i=0; i<numTaskFailures; i++) {
+                taskFailures.add(new TaskOperationFailure(randomAlphaOfLength(4), randomNonNegativeLong(), new IllegalStateException()));
+            }
+        }
+        List<ElasticsearchException> nodeFailures = null;
+        if (randomBoolean()) {
+            nodeFailures = new ArrayList<>();
+            int numNodeFailures = randomIntBetween(1, 4);
+            for (int i=0; i<numNodeFailures; i++) {
+                nodeFailures.add(new ElasticsearchException("AcknowledgedTasksResponseTest"));
+            }
+        }
+
+        return new AcknowledgedTasksResponse(randomBoolean(), taskFailures, nodeFailures);
+    }
+
+    public static void toXContent(AcknowledgedTasksResponse response, XContentBuilder builder) throws IOException {
+        builder.startObject();
+        {
+            builder.field("acknowleged", response.isAcknowledged());
+
+            List<TaskOperationFailure> taskFailures = response.getTaskFailures();
+            if (taskFailures != null && taskFailures.isEmpty() == false) {
+                builder.startArray(AcknowledgedTasksResponse.TASK_FAILURES.getPreferredName());
+                for (TaskOperationFailure failure : taskFailures) {
+                    builder.startObject();
+                    failure.toXContent(builder, ToXContent.EMPTY_PARAMS);
+                    builder.endObject();
+                }
+                builder.endArray();
+            }
+
+            List<ElasticsearchException> nodeFailures = response.getNodeFailures();
+            if (nodeFailures != null && nodeFailures.isEmpty() == false) {
+                builder.startArray(AcknowledgedTasksResponse.NODE_FAILURES.getPreferredName());
+                for (ElasticsearchException failure : nodeFailures) {
+                    builder.startObject();
+                    failure.toXContent(builder, ToXContent.EMPTY_PARAMS);
+                    builder.endObject();
+                }
+                builder.endArray();
+            }
+        }
+        builder.endObject();
+    }
+
+}
+
+

+ 42 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/StartDataFrameTransformRequestTests.java

@@ -0,0 +1,42 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe;
+
+import org.elasticsearch.client.ValidationException;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Optional;
+
+import static org.hamcrest.Matchers.containsString;
+
+public class StartDataFrameTransformRequestTests extends ESTestCase {
+    public void testValidate_givenNullId() {
+        StartDataFrameTransformRequest request = new StartDataFrameTransformRequest(null, null);
+        Optional<ValidationException> validate = request.validate();
+        assertTrue(validate.isPresent());
+        assertThat(validate.get().getMessage(), containsString("data frame transform id must not be null"));
+    }
+
+    public void testValidate_givenValid() {
+        StartDataFrameTransformRequest request = new StartDataFrameTransformRequest("foo", null);
+        Optional<ValidationException> validate = request.validate();
+        assertFalse(validate.isPresent());
+    }
+}

+ 42 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/StopDataFrameTransformRequestTests.java

@@ -0,0 +1,42 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe;
+
+import org.elasticsearch.client.ValidationException;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Optional;
+
+import static org.hamcrest.Matchers.containsString;
+
+public class StopDataFrameTransformRequestTests extends ESTestCase {
+    public void testValidate_givenNullId() {
+        StopDataFrameTransformRequest request = new StopDataFrameTransformRequest(null);
+        Optional<ValidationException> validate = request.validate();
+        assertTrue(validate.isPresent());
+        assertThat(validate.get().getMessage(), containsString("data frame transform id must not be null"));
+    }
+
+    public void testValidate_givenValid() {
+        StopDataFrameTransformRequest request = new StopDataFrameTransformRequest("foo");
+        Optional<ValidationException> validate = request.validate();
+        assertFalse(validate.isPresent());
+    }
+}

+ 141 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java

@@ -27,6 +27,10 @@ import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.core.AcknowledgedResponse;
 import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse;
+import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.client.dataframe.transforms.QueryConfig;
 import org.elasticsearch.client.dataframe.transforms.pivot.AggregationConfig;
@@ -35,13 +39,17 @@ import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfig;
 import org.elasticsearch.client.dataframe.transforms.pivot.TermsGroupSource;
 import org.elasticsearch.client.indices.CreateIndexRequest;
 import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.query.MatchAllQueryBuilder;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.junit.After;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -49,6 +57,22 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 
 public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTestCase {
 
+    private List<String> transformsToClean = new ArrayList<>();
+
+    @After
+    public void cleanUpTransforms() throws IOException {
+        for (String transformId : transformsToClean) {
+            highLevelClient().dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
+        }
+
+        for (String transformId : transformsToClean) {
+            highLevelClient().dataFrame().deleteDataFrameTransform(
+                    new DeleteDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
+        }
+
+        transformsToClean = new ArrayList<>();
+    }
+
     private void createIndex(String indexName) throws IOException {
 
         XContentBuilder builder = jsonBuilder();
@@ -152,6 +176,123 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
         }
     }
 
+    public void testStartStop() throws IOException, InterruptedException {
+        createIndex("source-data");
+
+        RestHighLevelClient client = highLevelClient();
+
+        QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
+        GroupConfig groupConfig = new GroupConfig(Collections.singletonMap("reviewer", new TermsGroupSource("user_id")));
+        AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
+        aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
+        AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
+        PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
+
+        DataFrameTransformConfig transformConfig = new DataFrameTransformConfig("mega-transform",
+                "source-data", "pivot-dest", queryConfig, pivotConfig);
+
+        client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig), RequestOptions.DEFAULT);
+        transformsToClean.add(transformConfig.getId());
+
+        {
+            // tag::start-data-frame-transform-request
+            StartDataFrameTransformRequest request =
+                    new StartDataFrameTransformRequest("mega-transform");  // <1>
+            // end::start-data-frame-transform-request
+
+            // tag::start-data-frame-transform-request-options
+            request.setTimeout(TimeValue.timeValueSeconds(20));  // <1>
+            // end::start-data-frame-transform-request-options
+
+            // tag::start-data-frame-transform-execute
+            StartDataFrameTransformResponse response =
+                    client.dataFrame().startDataFrameTransform(
+                            request, RequestOptions.DEFAULT);
+            // end::start-data-frame-transform-execute
+
+            assertTrue(response.isStarted());
+        }
+        {
+            // tag::stop-data-frame-transform-request
+            StopDataFrameTransformRequest request =
+                    new StopDataFrameTransformRequest("mega-transform"); // <1>
+            // end::stop-data-frame-transform-request
+
+            // tag::stop-data-frame-transform-request-options
+            request.setWaitForCompletion(Boolean.TRUE);  // <1>
+            request.setTimeout(TimeValue.timeValueSeconds(30));  // <2>
+            // end::stop-data-frame-transform-request-options
+
+            // tag::stop-data-frame-transform-execute
+            StopDataFrameTransformResponse response =
+                    client.dataFrame().stopDataFrameTransform(
+                            request, RequestOptions.DEFAULT);
+            // end::stop-data-frame-transform-execute
+
+            assertTrue(response.isStopped());
+        }
+        {
+            // tag::start-data-frame-transform-execute-listener
+            ActionListener<StartDataFrameTransformResponse> listener =
+                    new ActionListener<StartDataFrameTransformResponse>() {
+                        @Override
+                        public void onResponse(
+                                StartDataFrameTransformResponse response) {
+                            // <1>
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            // <2>
+                        }
+                    };
+            // end::start-data-frame-transform-execute-listener
+
+            // Replace the empty listener by a blocking listener in test
+            final CountDownLatch latch = new CountDownLatch(1);
+            ActionListener<StartDataFrameTransformResponse> ackListener = listener;
+            listener = new LatchedActionListener<>(listener, latch);
+
+            StartDataFrameTransformRequest request = new StartDataFrameTransformRequest("mega-transform");
+            // tag::start-data-frame-transform-execute-async
+            client.dataFrame().startDataFrameTransformAsync(
+                    request, RequestOptions.DEFAULT, listener); // <1>
+            // end::start-data-frame-transform-execute-async
+
+            assertTrue(latch.await(30L, TimeUnit.SECONDS));
+        }
+        {
+            // tag::stop-data-frame-transform-execute-listener
+            ActionListener<StopDataFrameTransformResponse> listener =
+                    new ActionListener<StopDataFrameTransformResponse>() {
+                        @Override
+                        public void onResponse(
+                                StopDataFrameTransformResponse response) {
+                            // <1>
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            // <2>
+                        }
+                    };
+            // end::stop-data-frame-transform-execute-listener
+
+            // Replace the empty listener by a blocking listener in test
+            final CountDownLatch latch = new CountDownLatch(1);
+            ActionListener<StopDataFrameTransformResponse> ackListener = listener;
+            listener = new LatchedActionListener<>(listener, latch);
+
+            StopDataFrameTransformRequest request = new StopDataFrameTransformRequest("mega-transform");
+            // tag::stop-data-frame-transform-execute-async
+            client.dataFrame().stopDataFrameTransformAsync(
+                    request, RequestOptions.DEFAULT, listener); // <1>
+            // end::stop-data-frame-transform-execute-async
+
+            assertTrue(latch.await(30L, TimeUnit.SECONDS));
+        }
+    }
+
     public void testDeleteDataFrameTransform() throws IOException, InterruptedException {
         createIndex("source-data");
 

+ 37 - 0
docs/java-rest/high-level/dataframe/start_data_frame.asciidoc

@@ -0,0 +1,37 @@
+--
+:api: start-data-frame-transform
+:request: StartDataFrameTransformRequest
+:response: StartDataFrameTransformResponse
+--
+[id="{upid}-{api}"]
+=== Start Data Frame Transform API
+
+Start a {dataframe-job}.
+It accepts a +{request}+ object and responds with a +{response}+ object.
+
+[id="{upid}-{api}-request"]
+==== Start Data Frame Request
+
+A +{request}+ object requires a non-null `id`.
+
+["source","java",subs="attributes,callouts,macros"]
+---------------------------------------------------
+include-tagged::{doc-tests-file}[{api}-request]
+---------------------------------------------------
+<1> Constructing a new start request referencing an existing {dataframe-job}
+
+==== Optional Arguments
+
+The following arguments are optional.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests-file}[{api}-request-options]
+--------------------------------------------------
+<1> Controls the amount of time to wait until the {dataframe-job} starts.
+
+include::../execution.asciidoc[]
+
+==== Response
+
+The returned +{response}+ object acknowledges the {dataframe-job} has started.

+ 38 - 0
docs/java-rest/high-level/dataframe/stop_data_frame.asciidoc

@@ -0,0 +1,38 @@
+--
+:api: stop-data-frame-transform
+:request: StopDataFrameTransformRequest
+:response: StopDataFrameTransformResponse
+--
+[id="{upid}-{api}"]
+=== Stop Data Frame Transform API
+
+Stop a started {dataframe-job}.
+It accepts a +{request}+ object and responds with a +{response}+ object.
+
+[id="{upid}-{api}-request"]
+==== Stop Data Frame Request
+
+A +{request}+ object requires a non-null `id`.
+
+["source","java",subs="attributes,callouts,macros"]
+---------------------------------------------------
+include-tagged::{doc-tests-file}[{api}-request]
+---------------------------------------------------
+<1> Constructing a new stop request referencing an existing {dataframe-job}
+
+==== Optional Arguments
+
+The following arguments are optional.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests-file}[{api}-request-options]
+--------------------------------------------------
+<1> If true wait for the data frame task to stop before responding
+<2> Controls the amount of time to wait until the {dataframe-job} stops.
+
+include::../execution.asciidoc[]
+
+==== Response
+
+The returned +{response}+ object acknowledges the {dataframe-job} has stopped.

+ 5 - 1
docs/java-rest/high-level/supported-apis.asciidoc

@@ -556,6 +556,10 @@ The Java High Level REST Client supports the following Data Frame APIs:
 
 * <<{upid}-put-data-frame-transform>>
 * <<{upid}-delete-data-frame-transform>>
+* <<{upid}-start-data-frame-transform>>
+* <<{upid}-stop-data-frame-transform>>
 
 include::dataframe/put_data_frame.asciidoc[]
-include::dataframe/delete_data_frame.asciidoc[]
+include::dataframe/delete_data_frame.asciidoc[]
+include::dataframe/start_data_frame.asciidoc[]
+include::dataframe/stop_data_frame.asciidoc[]

+ 5 - 3
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStartDataFrameTransformAction.java

@@ -6,6 +6,7 @@
 
 package org.elasticsearch.xpack.dataframe.rest.action;
 
+import org.elasticsearch.action.support.master.AcknowledgedRequest;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.rest.BaseRestHandler;
@@ -13,7 +14,6 @@ import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
-import org.elasticsearch.xpack.core.rollup.RollupField;
 import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
 
 import java.io.IOException;
@@ -27,9 +27,11 @@ public class RestStartDataFrameTransformAction extends BaseRestHandler {
 
     @Override
     protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
-        String id = restRequest.param(RollupField.ID.getPreferredName());
+        String id = restRequest.param(DataFrameField.ID.getPreferredName());
         StartDataFrameTransformAction.Request request = new StartDataFrameTransformAction.Request(id);
-
+        if (restRequest.hasParam(DataFrameField.TIMEOUT.getPreferredName())) {
+            request.timeout(restRequest.paramAsTime(DataFrameField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT));
+        }
         return channel -> client.execute(StartDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel));
     }
 

+ 7 - 0
x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.start_data_frame_transform.json

@@ -11,6 +11,13 @@
           "required": true,
           "description": "The id of the transform to start"
         }
+      },
+      "params": {
+        "timeout": {
+          "type": "time",
+          "required": false,
+          "description": "Controls the time to wait for the transform to start"
+        }
       }
     },
     "body": null