1
0
Эх сурвалжийг харах

HLRC: ML start data feed API (#33898)

* HLRC: ML start data feed API
Benjamin Trent 7 жил өмнө
parent
commit
bf0a0f74da

+ 14 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java

@@ -48,6 +48,7 @@ import org.elasticsearch.client.ml.PostDataRequest;
 import org.elasticsearch.client.ml.PutCalendarRequest;
 import org.elasticsearch.client.ml.PutDatafeedRequest;
 import org.elasticsearch.client.ml.PutJobRequest;
+import org.elasticsearch.client.ml.StartDatafeedRequest;
 import org.elasticsearch.client.ml.UpdateJobRequest;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -231,6 +232,19 @@ final class MLRequestConverters {
         return request;
     }
 
+    static Request startDatafeed(StartDatafeedRequest startDatafeedRequest) throws IOException {
+        String endpoint = new EndpointBuilder()
+            .addPathPartAsIs("_xpack")
+            .addPathPartAsIs("ml")
+            .addPathPartAsIs("datafeeds")
+            .addPathPart(startDatafeedRequest.getDatafeedId())
+            .addPathPartAsIs("_start")
+            .build();
+        Request request = new Request(HttpPost.METHOD_NAME, endpoint);
+        request.setEntity(createEntity(startDatafeedRequest, REQUEST_BODY_CONTENT_TYPE));
+        return request;
+    }
+
     static Request deleteForecast(DeleteForecastRequest deleteForecastRequest) {
         String endpoint = new EndpointBuilder()
             .addPathPartAsIs("_xpack")

+ 42 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java

@@ -58,6 +58,8 @@ import org.elasticsearch.client.ml.PutDatafeedRequest;
 import org.elasticsearch.client.ml.PutDatafeedResponse;
 import org.elasticsearch.client.ml.PutJobRequest;
 import org.elasticsearch.client.ml.PutJobResponse;
+import org.elasticsearch.client.ml.StartDatafeedRequest;
+import org.elasticsearch.client.ml.StartDatafeedResponse;
 import org.elasticsearch.client.ml.UpdateJobRequest;
 import org.elasticsearch.client.ml.job.stats.JobStats;
 
@@ -565,6 +567,46 @@ public final class MachineLearningClient {
                 Collections.emptySet());
     }
 
+    /**
+     * Starts the given Machine Learning Datafeed
+     * <p>
+     * For additional info
+     * see <a href="http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-start-datafeed.html">
+     *     ML Start Datafeed documentation</a>
+     *
+     * @param request The request to start the datafeed
+     * @param options  Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @return action acknowledgement
+     * @throws IOException when there is a serialization issue sending the request or receiving the response
+     */
+    public StartDatafeedResponse startDatafeed(StartDatafeedRequest request, RequestOptions options) throws IOException {
+        return restHighLevelClient.performRequestAndParseEntity(request,
+            MLRequestConverters::startDatafeed,
+            options,
+            StartDatafeedResponse::fromXContent,
+            Collections.emptySet());
+    }
+
+    /**
+     * Starts the given Machine Learning Datafeed asynchronously and notifies the listener on completion
+     * <p>
+     * For additional info
+     * see <a href="http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-start-datafeed.html">
+     *         ML Start Datafeed documentation</a>
+     *
+     * @param request The request to start the datafeed
+     * @param options  Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @param listener Listener to be notified upon request completion
+     */
+    public void startDatafeedAsync(StartDatafeedRequest request, RequestOptions options, ActionListener<StartDatafeedResponse> listener) {
+        restHighLevelClient.performRequestAsyncAndParseEntity(request,
+            MLRequestConverters::startDatafeed,
+            options,
+            StartDatafeedResponse::fromXContent,
+            listener,
+            Collections.emptySet());
+    }
+
     /**
      * Updates a Machine Learning {@link org.elasticsearch.client.ml.job.config.Job}
      * <p>

+ 160 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StartDatafeedRequest.java

@@ -0,0 +1,160 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.ml;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Request to start a Datafeed
+ */
+public class StartDatafeedRequest extends ActionRequest implements ToXContentObject {
+
+    public static final ParseField START = new ParseField("start");
+    public static final ParseField END = new ParseField("end");
+    public static final ParseField TIMEOUT = new ParseField("timeout");
+
+    public static ConstructingObjectParser<StartDatafeedRequest, Void> PARSER =
+        new ConstructingObjectParser<>("start_datafeed_request", a -> new StartDatafeedRequest((String)a[0]));
+
+    static {
+        PARSER.declareString(ConstructingObjectParser.constructorArg(), DatafeedConfig.ID);
+        PARSER.declareString(StartDatafeedRequest::setStart, START);
+        PARSER.declareString(StartDatafeedRequest::setEnd, END);
+        PARSER.declareString((params, val) ->
+            params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
+    }
+
+    private final String datafeedId;
+    private String start;
+    private String end;
+    private TimeValue timeout;
+
+    /**
+     * Create a new StartDatafeedRequest for the given DatafeedId
+     *
+     * @param datafeedId non-null existing Datafeed ID
+     */
+    public StartDatafeedRequest(String datafeedId) {
+        this.datafeedId = Objects.requireNonNull(datafeedId, "[datafeed_id] must not be null");
+    }
+
+    public String getDatafeedId() {
+        return datafeedId;
+    }
+
+    public String getStart() {
+        return start;
+    }
+
+    /**
+     * The time that the datafeed should begin. This value is inclusive.
+     *
+     * If you specify a start value that is earlier than the timestamp of the latest processed record,
+     * the datafeed continues from 1 millisecond after the timestamp of the latest processed record.
+     *
+     * If you do not specify a start time and the datafeed is associated with a new job,
+     * the analysis starts from the earliest time for which data is available.
+     *
+     * @param start String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO 8601 string
+     */
+    public void setStart(String start) {
+        this.start = start;
+    }
+
+    public String getEnd() {
+        return end;
+    }
+
+    /**
+     * The time that the datafeed should end. This value is exclusive.
+     * If you do not specify an end time, the datafeed runs continuously.
+     *
+     * @param end String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO 8601 string
+     */
+    public void setEnd(String end) {
+        this.end = end;
+    }
+
+    public TimeValue getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * Indicates how long to wait for the cluster to respond to the request.
+     *
+     * @param timeout TimeValue for how long to wait for a response from the cluster
+     */
+    public void setTimeout(TimeValue timeout) {
+        this.timeout = timeout;
+    }
+
+    @Override
+    public ActionRequestValidationException validate() {
+       return null;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(datafeedId, start, end, timeout);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (obj == null || obj.getClass() != getClass()) {
+            return false;
+        }
+
+        StartDatafeedRequest other = (StartDatafeedRequest) obj;
+        return Objects.equals(datafeedId, other.datafeedId) &&
+            Objects.equals(start, other.start) &&
+            Objects.equals(end, other.end) &&
+            Objects.equals(timeout, other.timeout);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId);
+        if (start != null) {
+            builder.field(START.getPreferredName(), start);
+        }
+        if (end != null) {
+            builder.field(END.getPreferredName(), end);
+        }
+        if (timeout != null) {
+            builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
+        }
+        builder.endObject();
+        return builder;
+    }
+}

+ 93 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StartDatafeedResponse.java

@@ -0,0 +1,93 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.ml;
+
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Response indicating if the Machine Learning Datafeed is now started or not
+ */
+public class StartDatafeedResponse extends ActionResponse implements ToXContentObject {
+
+    private static final ParseField STARTED = new ParseField("started");
+
+    public static final ConstructingObjectParser<StartDatafeedResponse, Void> PARSER =
+        new ConstructingObjectParser<>(
+            "start_datafeed_response",
+            true,
+            (a) -> new StartDatafeedResponse((Boolean)a[0]));
+
+    static {
+        PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), STARTED);
+    }
+
+    private final boolean started;
+
+    public StartDatafeedResponse(boolean started) {
+        this.started = started;
+    }
+
+    public static StartDatafeedResponse fromXContent(XContentParser parser) throws IOException {
+        return PARSER.parse(parser, null);
+    }
+
+    /**
+     * Has the Datafeed started or not
+     *
+     * @return boolean value indicating the Datafeed started status
+     */
+    public boolean isStarted() {
+        return started;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+
+        StartDatafeedResponse that = (StartDatafeedResponse) other;
+        return isStarted() == that.isStarted();
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(isStarted());
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field(STARTED.getPreferredName(), started);
+        builder.endObject();
+        return builder;
+    }
+}

+ 15 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java

@@ -44,6 +44,8 @@ import org.elasticsearch.client.ml.PostDataRequest;
 import org.elasticsearch.client.ml.PutCalendarRequest;
 import org.elasticsearch.client.ml.PutDatafeedRequest;
 import org.elasticsearch.client.ml.PutJobRequest;
+import org.elasticsearch.client.ml.StartDatafeedRequest;
+import org.elasticsearch.client.ml.StartDatafeedRequestTests;
 import org.elasticsearch.client.ml.UpdateJobRequest;
 import org.elasticsearch.client.ml.calendars.Calendar;
 import org.elasticsearch.client.ml.calendars.CalendarTests;
@@ -261,6 +263,19 @@ public class MLRequestConvertersTests extends ESTestCase {
         assertEquals(Boolean.toString(true), request.getParameters().get("force"));
     }
 
+    public void testStartDatafeed() throws Exception {
+        String datafeedId = DatafeedConfigTests.randomValidDatafeedId();
+        StartDatafeedRequest datafeedRequest = StartDatafeedRequestTests.createRandomInstance(datafeedId);
+
+        Request request = MLRequestConverters.startDatafeed(datafeedRequest);
+        assertEquals(HttpPost.METHOD_NAME, request.getMethod());
+        assertEquals("/_xpack/ml/datafeeds/" + datafeedId + "/_start", request.getEndpoint());
+        try (XContentParser parser = createParser(JsonXContent.jsonXContent, request.getEntity().getContent())) {
+            StartDatafeedRequest parsedDatafeedRequest = StartDatafeedRequest.PARSER.apply(parser, null);
+            assertThat(parsedDatafeedRequest, equalTo(datafeedRequest));
+        }
+    }
+
     public void testDeleteForecast() {
         String jobId = randomAlphaOfLength(10);
         DeleteForecastRequest deleteForecastRequest = new DeleteForecastRequest(jobId);

+ 81 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java

@@ -20,8 +20,12 @@ package org.elasticsearch.client;
 
 import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
 import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.ml.CloseJobRequest;
 import org.elasticsearch.client.ml.CloseJobResponse;
@@ -51,6 +55,8 @@ import org.elasticsearch.client.ml.PutDatafeedRequest;
 import org.elasticsearch.client.ml.PutDatafeedResponse;
 import org.elasticsearch.client.ml.PutJobRequest;
 import org.elasticsearch.client.ml.PutJobResponse;
+import org.elasticsearch.client.ml.StartDatafeedRequest;
+import org.elasticsearch.client.ml.StartDatafeedResponse;
 import org.elasticsearch.client.ml.UpdateJobRequest;
 import org.elasticsearch.client.ml.calendars.Calendar;
 import org.elasticsearch.client.ml.calendars.CalendarTests;
@@ -63,6 +69,7 @@ import org.elasticsearch.client.ml.job.config.JobState;
 import org.elasticsearch.client.ml.job.config.JobUpdate;
 import org.elasticsearch.client.ml.job.stats.JobStats;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.rest.RestStatus;
 import org.junit.After;
 
@@ -416,6 +423,80 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
         assertTrue(response.isAcknowledged());
     }
 
+    public void testStartDatafeed() throws Exception {
+        String jobId = "test-start-datafeed";
+        String indexName = "start_data_1";
+
+        // Set up the index and docs
+        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
+        createIndexRequest.mapping("doc", "timestamp", "type=date", "total", "type=long");
+        highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT);
+        BulkRequest bulk = new BulkRequest();
+        bulk.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+        long now = System.currentTimeMillis();
+        long oneDayAgo = now - 86400000;
+        int i = 0;
+        long dayAgoCopy = oneDayAgo;
+        while(dayAgoCopy < now) {
+            IndexRequest doc = new IndexRequest();
+            doc.index(indexName);
+            doc.type("doc");
+            doc.id("id" + i);
+            doc.source("{\"total\":" +randomInt(1000) + ",\"timestamp\":"+ dayAgoCopy +"}", XContentType.JSON);
+            bulk.add(doc);
+            dayAgoCopy += 1000000;
+            i++;
+        }
+        highLevelClient().bulk(bulk, RequestOptions.DEFAULT);
+        final long totalDocCount = i;
+
+        // create the job and the datafeed
+        Job job = buildJob(jobId);
+        MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
+        machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
+        machineLearningClient.openJob(new OpenJobRequest(jobId), RequestOptions.DEFAULT);
+
+        String datafeedId = jobId + "-feed";
+        DatafeedConfig datafeed = DatafeedConfig.builder(datafeedId, jobId)
+            .setIndices(indexName)
+            .setQueryDelay(TimeValue.timeValueSeconds(1))
+            .setTypes(Arrays.asList("doc"))
+            .setFrequency(TimeValue.timeValueSeconds(1)).build();
+        machineLearningClient.putDatafeed(new PutDatafeedRequest(datafeed), RequestOptions.DEFAULT);
+
+
+        StartDatafeedRequest startDatafeedRequest = new StartDatafeedRequest(datafeedId);
+        startDatafeedRequest.setStart(String.valueOf(oneDayAgo));
+        // Should only process two documents
+        startDatafeedRequest.setEnd(String.valueOf(oneDayAgo + 2000000));
+        StartDatafeedResponse response = execute(startDatafeedRequest,
+            machineLearningClient::startDatafeed,
+            machineLearningClient::startDatafeedAsync);
+
+        assertTrue(response.isStarted());
+
+        assertBusy(() -> {
+            JobStats stats = machineLearningClient.getJobStats(new GetJobStatsRequest(jobId), RequestOptions.DEFAULT).jobStats().get(0);
+            assertEquals(2L, stats.getDataCounts().getInputRecordCount());
+            assertEquals(JobState.CLOSED, stats.getState());
+        }, 30, TimeUnit.SECONDS);
+
+        machineLearningClient.openJob(new OpenJobRequest(jobId), RequestOptions.DEFAULT);
+        StartDatafeedRequest wholeDataFeed = new StartDatafeedRequest(datafeedId);
+        // Process all documents and end the stream
+        wholeDataFeed.setEnd(String.valueOf(now));
+        StartDatafeedResponse wholeResponse = execute(wholeDataFeed,
+            machineLearningClient::startDatafeed,
+            machineLearningClient::startDatafeedAsync);
+        assertTrue(wholeResponse.isStarted());
+
+        assertBusy(() -> {
+            JobStats stats = machineLearningClient.getJobStats(new GetJobStatsRequest(jobId), RequestOptions.DEFAULT).jobStats().get(0);
+            assertEquals(totalDocCount, stats.getDataCounts().getInputRecordCount());
+            assertEquals(JobState.CLOSED, stats.getState());
+        }, 30, TimeUnit.SECONDS);
+    }
+
     public void testDeleteForecast() throws Exception {
         String jobId = "test-delete-forecast";
 

+ 66 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java

@@ -20,6 +20,7 @@ package org.elasticsearch.client.documentation;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.LatchedActionListener;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetResponse;
@@ -70,6 +71,8 @@ import org.elasticsearch.client.ml.PutDatafeedRequest;
 import org.elasticsearch.client.ml.PutDatafeedResponse;
 import org.elasticsearch.client.ml.PutJobRequest;
 import org.elasticsearch.client.ml.PutJobResponse;
+import org.elasticsearch.client.ml.StartDatafeedRequest;
+import org.elasticsearch.client.ml.StartDatafeedResponse;
 import org.elasticsearch.client.ml.UpdateJobRequest;
 import org.elasticsearch.client.ml.calendars.Calendar;
 import org.elasticsearch.client.ml.datafeed.ChunkingConfig;
@@ -703,6 +706,69 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
         }
     }
 
+    public void testStartDatafeed() throws Exception {
+        RestHighLevelClient client = highLevelClient();
+
+        Job job = MachineLearningIT.buildJob("start-datafeed-job");
+        client.machineLearning().putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
+        String datafeedId = job.getId() + "-feed";
+        String indexName = "start_data_2";
+        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
+        createIndexRequest.mapping("doc", "timestamp", "type=date", "total", "type=long");
+        highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT);
+        DatafeedConfig datafeed = DatafeedConfig.builder(datafeedId, job.getId())
+            .setTypes(Arrays.asList("doc"))
+            .setIndices(indexName)
+            .build();
+        client.machineLearning().putDatafeed(new PutDatafeedRequest(datafeed), RequestOptions.DEFAULT);
+        client.machineLearning().openJob(new OpenJobRequest(job.getId()), RequestOptions.DEFAULT);
+        {
+            //tag::x-pack-ml-start-datafeed-request
+            StartDatafeedRequest request = new StartDatafeedRequest(datafeedId); // <1>
+            //end::x-pack-ml-start-datafeed-request
+
+            //tag::x-pack-ml-start-datafeed-request-options
+            request.setEnd("2018-08-21T00:00:00Z"); // <1>
+            request.setStart("2018-08-20T00:00:00Z"); // <2>
+            request.setTimeout(TimeValue.timeValueMinutes(10)); // <3>
+            //end::x-pack-ml-start-datafeed-request-options
+
+            //tag::x-pack-ml-start-datafeed-execute
+            StartDatafeedResponse response = client.machineLearning().startDatafeed(request, RequestOptions.DEFAULT);
+            boolean started = response.isStarted(); // <1>
+            //end::x-pack-ml-start-datafeed-execute
+
+            assertTrue(started);
+        }
+        {
+            StartDatafeedRequest request = new StartDatafeedRequest(datafeedId);
+
+            // tag::x-pack-ml-start-datafeed-listener
+            ActionListener<StartDatafeedResponse> listener = new ActionListener<StartDatafeedResponse>() {
+                @Override
+                public void onResponse(StartDatafeedResponse response) {
+                    // <1>
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    // <2>
+                }
+            };
+            // end::x-pack-ml-start-datafeed-listener
+
+            // Replace the empty listener by a blocking listener in test
+            final CountDownLatch latch = new CountDownLatch(1);
+            listener = new LatchedActionListener<>(listener, latch);
+
+            // tag::x-pack-ml-start-datafeed-execute-async
+            client.machineLearning().startDatafeedAsync(request, RequestOptions.DEFAULT, listener); // <1>
+            // end::x-pack-ml-start-datafeed-execute-async
+
+            assertTrue(latch.await(30L, TimeUnit.SECONDS));
+        }
+    }
+
     public void testGetBuckets() throws IOException, InterruptedException {
         RestHighLevelClient client = highLevelClient();
 

+ 61 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/ml/StartDatafeedRequestTests.java

@@ -0,0 +1,61 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.ml;
+
+import org.elasticsearch.client.ml.datafeed.DatafeedConfigTests;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractXContentTestCase;
+
+import java.io.IOException;
+
+public class StartDatafeedRequestTests extends AbstractXContentTestCase<StartDatafeedRequest> {
+
+    public static StartDatafeedRequest createRandomInstance(String datafeedId) {
+        StartDatafeedRequest request = new StartDatafeedRequest(datafeedId);
+
+        if (randomBoolean()) {
+            request.setStart(String.valueOf(randomLongBetween(1, 1000)));
+        }
+        if (randomBoolean()) {
+            request.setEnd(String.valueOf(randomLongBetween(1, 1000)));
+        }
+        if (randomBoolean()) {
+            request.setTimeout(TimeValue.timeValueMinutes(randomLongBetween(1, 1000)));
+        }
+
+        return request;
+    }
+
+    @Override
+    protected StartDatafeedRequest createTestInstance() {
+        String datafeedId = DatafeedConfigTests.randomValidDatafeedId();
+        return createRandomInstance(datafeedId);
+    }
+
+    @Override
+    protected StartDatafeedRequest doParseInstance(XContentParser parser) throws IOException {
+        return StartDatafeedRequest.PARSER.parse(parser, null);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return false;
+    }
+}

+ 42 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/ml/StartDatafeedResponseTests.java

@@ -0,0 +1,42 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.ml;
+
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractXContentTestCase;
+
+import java.io.IOException;
+
+public class StartDatafeedResponseTests extends AbstractXContentTestCase<StartDatafeedResponse> {
+
+    @Override
+    protected StartDatafeedResponse createTestInstance() {
+        return new StartDatafeedResponse(randomBoolean());
+    }
+
+    @Override
+    protected StartDatafeedResponse doParseInstance(XContentParser parser) throws IOException {
+        return StartDatafeedResponse.fromXContent(parser);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return true;
+    }
+}

+ 71 - 0
docs/java-rest/high-level/ml/start-datafeed.asciidoc

@@ -0,0 +1,71 @@
+[[java-rest-high-x-pack-ml-start-datafeed]]
+=== Start Datafeed API
+
+The Start Datafeed API provides the ability to start a {ml} datafeed in the cluster.
+It accepts a `StartDatafeedRequest` object and responds
+with a `StartDatafeedResponse` object.
+
+[[java-rest-high-x-pack-ml-start-datafeed-request]]
+==== Start Datafeed Request
+
+A `StartDatafeedRequest` object is created referencing a non-null `datafeedId`.
+All other fields are optional for the request.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-start-datafeed-request]
+--------------------------------------------------
+<1> Constructing a new request referencing an existing `datafeedId`
+
+==== Optional Arguments
+
+The following arguments are optional.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-start-datafeed-request-options]
+--------------------------------------------------
+<1> Set when the datafeed should end, the value is exclusive.
+May be an epoch seconds, epoch millis or an ISO 8601 string.
+"now" is a special value that indicates the current time.
+If you do not specify an end time, the datafeed runs continuously.
+<2> Set when the datafeed should start, the value is inclusive.
+May be an epoch seconds, epoch millis or an ISO 8601 string.
+If you do not specify a start time and the datafeed is associated with a new job,
+the analysis starts from the earliest time for which data is available.
+<3> Set the timeout for the request
+
+[[java-rest-high-x-pack-ml-start-datafeed-execution]]
+==== Execution
+
+The request can be executed through the `MachineLearningClient` contained
+in the `RestHighLevelClient` object, accessed via the `machineLearningClient()` method.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-start-datafeed-execute]
+--------------------------------------------------
+<1> Did the datafeed successfully start?
+
+[[java-rest-high-x-pack-ml-start-datafeed-execution-async]]
+==== Asynchronous Execution
+
+The request can also be executed asynchronously:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-start-datafeed-execute-async]
+--------------------------------------------------
+<1> The `StartDatafeedRequest` to execute and the `ActionListener` to use when
+the execution completes
+
+The method does not block and returns immediately. The passed `ActionListener` is used
+to notify the caller of completion. A typical `ActionListener` for `StartDatafeedResponse` may
+look like
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-start-datafeed-listener]
+--------------------------------------------------
+<1> `onResponse` is called back when the action is completed successfully
+<2> `onFailure` is called back when some unexpected error occurs

+ 2 - 0
docs/java-rest/high-level/supported-apis.asciidoc

@@ -225,6 +225,7 @@ The Java High Level REST Client supports the following Machine Learning APIs:
 * <<java-rest-high-x-pack-ml-put-datafeed>>
 * <<java-rest-high-x-pack-ml-get-datafeed>>
 * <<java-rest-high-x-pack-ml-delete-datafeed>>
+* <<java-rest-high-x-pack-ml-start-datafeed>>
 * <<java-rest-high-x-pack-ml-forecast-job>>
 * <<java-rest-high-x-pack-ml-delete-forecast>>
 * <<java-rest-high-x-pack-ml-get-buckets>>
@@ -247,6 +248,7 @@ include::ml/flush-job.asciidoc[]
 include::ml/put-datafeed.asciidoc[]
 include::ml/get-datafeed.asciidoc[]
 include::ml/delete-datafeed.asciidoc[]
+include::ml/start-datafeed.asciidoc[]
 include::ml/get-job-stats.asciidoc[]
 include::ml/forecast-job.asciidoc[]
 include::ml/delete-forecast.asciidoc[]