Browse Source

HLRC: ML Flush job (#33187)

* HLRC: ML Flush job

* Fixing package, paths, and test

* Addressing comments
Benjamin Trent 7 years ago
parent
commit
6770a456b8

+ 14 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java

@@ -33,6 +33,7 @@ import org.elasticsearch.client.ml.GetRecordsRequest;
 import org.elasticsearch.client.ml.OpenJobRequest;
 import org.elasticsearch.client.ml.PutJobRequest;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.client.ml.FlushJobRequest;
 
 import java.io.IOException;
 
@@ -127,6 +128,19 @@ final class MLRequestConverters {
         return request;
     }
 
+    static Request flushJob(FlushJobRequest flushJobRequest) throws IOException {
+        String endpoint = new EndpointBuilder()
+            .addPathPartAsIs("_xpack")
+            .addPathPartAsIs("ml")
+            .addPathPartAsIs("anomaly_detectors")
+            .addPathPart(flushJobRequest.getJobId())
+            .addPathPartAsIs("_flush")
+            .build();
+        Request request = new Request(HttpPost.METHOD_NAME, endpoint);
+        request.setEntity(createEntity(flushJobRequest, REQUEST_BODY_CONTENT_TYPE));
+        return request;
+    }
+
     static Request getJobStats(GetJobStatsRequest getJobStatsRequest) {
         String endpoint = new EndpointBuilder()
             .addPathPartAsIs("_xpack")

+ 56 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java

@@ -19,6 +19,8 @@
 package org.elasticsearch.client;
 
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.client.ml.FlushJobRequest;
+import org.elasticsearch.client.ml.FlushJobResponse;
 import org.elasticsearch.client.ml.GetJobStatsRequest;
 import org.elasticsearch.client.ml.GetJobStatsResponse;
 import org.elasticsearch.client.ml.job.stats.JobStats;
@@ -292,6 +294,60 @@ public final class MachineLearningClient {
      }
 
     /**
+     * Flushes internally buffered data for the given Machine Learning Job ensuring all data sent to the has been processed.
+     * This may cause new results to be calculated depending on the contents of the buffer
+     *
+     * Both flush and close operations are similar,
+     * however the flush is more efficient if you are expecting to send more data for analysis.
+     *
+     * When flushing, the job remains open and is available to continue analyzing data.
+     * A close operation additionally prunes and persists the model state to disk and the
+     * job must be opened again before analyzing further data.
+     *
+     * <p>
+     * For additional info
+     * see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-flush-job.html">Flush ML job documentation</a>
+     *
+     * @param request  The {@link FlushJobRequest} object enclosing the `jobId` and additional request options
+     * @param options  Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     */
+     public FlushJobResponse flushJob(FlushJobRequest request, RequestOptions options) throws IOException {
+        return restHighLevelClient.performRequestAndParseEntity(request,
+            MLRequestConverters::flushJob,
+            options,
+            FlushJobResponse::fromXContent,
+            Collections.emptySet());
+     }
+
+    /**
+     * Flushes internally buffered data for the given Machine Learning Job asynchronously ensuring all data sent to the has been processed.
+     * This may cause new results to be calculated depending on the contents of the buffer
+     *
+     * Both flush and close operations are similar,
+     * however the flush is more efficient if you are expecting to send more data for analysis.
+     *
+     * When flushing, the job remains open and is available to continue analyzing data.
+     * A close operation additionally prunes and persists the model state to disk and the
+     * job must be opened again before analyzing further data.
+     *
+     * <p>
+     * For additional info
+     * see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-flush-job.html">Flush ML job documentation</a>
+     *
+     * @param request  The {@link FlushJobRequest} object enclosing the `jobId` and additional request options
+     * @param options  Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @param listener Listener to be notified upon request completion
+     */
+     public void flushJobAsync(FlushJobRequest request, RequestOptions options, ActionListener<FlushJobResponse> listener) {
+         restHighLevelClient.performRequestAsyncAndParseEntity(request,
+             MLRequestConverters::flushJob,
+             options,
+             FlushJobResponse::fromXContent,
+             listener,
+             Collections.emptySet());
+     }
+
+     /**
      * Gets usage statistics for one or more Machine Learning jobs
      *
      * <p>

+ 195 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/ml/FlushJobRequest.java

@@ -0,0 +1,195 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.ml;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.client.ml.job.config.Job;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Request object to flush a given Machine Learning job.
+ */
+public class FlushJobRequest extends ActionRequest implements ToXContentObject {
+
+    public static final ParseField CALC_INTERIM = new ParseField("calc_interim");
+    public static final ParseField START = new ParseField("start");
+    public static final ParseField END = new ParseField("end");
+    public static final ParseField ADVANCE_TIME = new ParseField("advance_time");
+    public static final ParseField SKIP_TIME = new ParseField("skip_time");
+
+    public static final ConstructingObjectParser<FlushJobRequest, Void> PARSER =
+        new ConstructingObjectParser<>("flush_job_request", (a) -> new FlushJobRequest((String) a[0]));
+
+    static {
+        PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
+        PARSER.declareBoolean(FlushJobRequest::setCalcInterim, CALC_INTERIM);
+        PARSER.declareString(FlushJobRequest::setStart, START);
+        PARSER.declareString(FlushJobRequest::setEnd, END);
+        PARSER.declareString(FlushJobRequest::setAdvanceTime, ADVANCE_TIME);
+        PARSER.declareString(FlushJobRequest::setSkipTime, SKIP_TIME);
+    }
+
+    private final String jobId;
+    private Boolean calcInterim;
+    private String start;
+    private String end;
+    private String advanceTime;
+    private String skipTime;
+
+    /**
+     * Create new Flush job request
+     *
+     * @param jobId The job ID of the job to flush
+     */
+    public FlushJobRequest(String jobId) {
+        this.jobId = jobId;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public boolean getCalcInterim() {
+        return calcInterim;
+    }
+
+    /**
+     * When {@code true} calculates the interim results for the most recent bucket or all buckets within the latency period.
+     *
+     * @param calcInterim defaults to {@code false}.
+     */
+    public void setCalcInterim(boolean calcInterim) {
+        this.calcInterim = calcInterim;
+    }
+
+    public String getStart() {
+        return start;
+    }
+
+    /**
+     * When used in conjunction with {@link FlushJobRequest#calcInterim},
+     * specifies the start of the range of buckets on which to calculate interim results.
+     *
+     * @param start the beginning of the range of buckets; may be an epoch seconds, epoch millis or an ISO string
+     */
+    public void setStart(String start) {
+        this.start = start;
+    }
+
+    public String getEnd() {
+        return end;
+    }
+
+    /**
+     * When used in conjunction with {@link FlushJobRequest#calcInterim}, specifies the end of the range
+     * of buckets on which to calculate interim results
+     *
+     * @param end the end of the range of buckets; may be an epoch seconds, epoch millis or an ISO string
+     */
+    public void setEnd(String end) {
+        this.end = end;
+    }
+
+    public String getAdvanceTime() {
+        return advanceTime;
+    }
+
+    /**
+     * Specifies to advance to a particular time value.
+     * Results are generated and the model is updated for data from the specified time interval.
+     *
+     * @param advanceTime String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO string
+     */
+    public void setAdvanceTime(String advanceTime) {
+        this.advanceTime = advanceTime;
+    }
+
+    public String getSkipTime() {
+        return skipTime;
+    }
+
+    /**
+     * Specifies to skip to a particular time value.
+     * Results are not generated and the model is not updated for data from the specified time interval.
+     *
+     * @param skipTime String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO string
+     */
+    public void setSkipTime(String skipTime) {
+        this.skipTime = skipTime;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+
+        FlushJobRequest other = (FlushJobRequest) obj;
+        return Objects.equals(jobId, other.jobId) &&
+            calcInterim == other.calcInterim &&
+            Objects.equals(start, other.start) &&
+            Objects.equals(end, other.end) &&
+            Objects.equals(advanceTime, other.advanceTime) &&
+            Objects.equals(skipTime, other.skipTime);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field(Job.ID.getPreferredName(), jobId);
+        if (calcInterim != null) {
+            builder.field(CALC_INTERIM.getPreferredName(), calcInterim);
+        }
+        if (start != null) {
+            builder.field(START.getPreferredName(), start);
+        }
+        if (end != null) {
+            builder.field(END.getPreferredName(), end);
+        }
+        if (advanceTime != null) {
+            builder.field(ADVANCE_TIME.getPreferredName(), advanceTime);
+        }
+        if (skipTime != null) {
+            builder.field(SKIP_TIME.getPreferredName(), skipTime);
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public ActionRequestValidationException validate() {
+        return null;
+    }
+}

+ 112 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/ml/FlushJobResponse.java

@@ -0,0 +1,112 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.ml;
+
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Objects;
+
+/**
+ * Response object containing flush acknowledgement and additional data
+ */
+public class FlushJobResponse extends ActionResponse implements ToXContentObject {
+
+    public static final ParseField FLUSHED = new ParseField("flushed");
+    public static final ParseField LAST_FINALIZED_BUCKET_END = new ParseField("last_finalized_bucket_end");
+
+    public static final ConstructingObjectParser<FlushJobResponse, Void> PARSER =
+        new ConstructingObjectParser<>("flush_job_response",
+            true,
+            (a) -> {
+                boolean flushed = (boolean) a[0];
+                Date date = a[1] == null ? null : new Date((long) a[1]);
+                return new FlushJobResponse(flushed, date);
+            });
+
+    static {
+        PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), FLUSHED);
+        PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_FINALIZED_BUCKET_END);
+    }
+
+    public static FlushJobResponse fromXContent(XContentParser parser) throws IOException {
+        return PARSER.parse(parser, null);
+    }
+
+    private final boolean flushed;
+    private final Date lastFinalizedBucketEnd;
+
+    public FlushJobResponse(boolean flushed, @Nullable Date lastFinalizedBucketEnd) {
+        this.flushed = flushed;
+        this.lastFinalizedBucketEnd = lastFinalizedBucketEnd;
+    }
+
+    /**
+     * Was the job successfully flushed or not
+     */
+    public boolean isFlushed() {
+        return flushed;
+    }
+
+    /**
+     * Provides the timestamp (in milliseconds-since-the-epoch) of the end of the last bucket that was processed.
+     */
+    @Nullable
+    public Date getLastFinalizedBucketEnd() {
+        return lastFinalizedBucketEnd;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(flushed, lastFinalizedBucketEnd);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+
+        FlushJobResponse that = (FlushJobResponse) other;
+        return that.flushed == flushed && Objects.equals(lastFinalizedBucketEnd, that.lastFinalizedBucketEnd);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field(FLUSHED.getPreferredName(), flushed);
+        if (lastFinalizedBucketEnd != null) {
+            builder.timeField(LAST_FINALIZED_BUCKET_END.getPreferredName(),
+                LAST_FINALIZED_BUCKET_END.getPreferredName() + "_string", lastFinalizedBucketEnd.getTime());
+        }
+        builder.endObject();
+        return builder;
+    }
+}

+ 22 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java

@@ -36,6 +36,7 @@ import org.elasticsearch.client.ml.job.util.PageParams;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.client.ml.FlushJobRequest;
 import org.elasticsearch.client.ml.GetJobStatsRequest;
 import org.elasticsearch.test.ESTestCase;
 
@@ -140,6 +141,27 @@ public class MLRequestConvertersTests extends ESTestCase {
         }
     }
 
+    public void testFlushJob() throws Exception {
+        String jobId = randomAlphaOfLength(10);
+        FlushJobRequest flushJobRequest = new FlushJobRequest(jobId);
+
+        Request request = MLRequestConverters.flushJob(flushJobRequest);
+        assertEquals(HttpPost.METHOD_NAME, request.getMethod());
+        assertEquals("/_xpack/ml/anomaly_detectors/" + jobId + "/_flush", request.getEndpoint());
+        assertEquals("{\"job_id\":\"" + jobId + "\"}", requestEntityToString(request));
+
+        flushJobRequest.setSkipTime("1000");
+        flushJobRequest.setStart("105");
+        flushJobRequest.setEnd("200");
+        flushJobRequest.setAdvanceTime("100");
+        flushJobRequest.setCalcInterim(true);
+        request = MLRequestConverters.flushJob(flushJobRequest);
+        assertEquals(
+            "{\"job_id\":\"" + jobId + "\",\"calc_interim\":true,\"start\":\"105\"," +
+                "\"end\":\"200\",\"advance_time\":\"100\",\"skip_time\":\"1000\"}",
+            requestEntityToString(request));
+    }
+
     public void testGetJobStats() {
         GetJobStatsRequest getJobStatsRequestRequest = new GetJobStatsRequest();
 

+ 16 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java

@@ -39,6 +39,9 @@ import org.elasticsearch.client.ml.job.config.AnalysisConfig;
 import org.elasticsearch.client.ml.job.config.DataDescription;
 import org.elasticsearch.client.ml.job.config.Detector;
 import org.elasticsearch.client.ml.job.config.Job;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.client.ml.FlushJobRequest;
+import org.elasticsearch.client.ml.FlushJobResponse;
 import org.junit.After;
 
 import java.io.IOException;
@@ -144,6 +147,19 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
         assertTrue(response.isClosed());
     }
 
+    public void testFlushJob() throws Exception {
+        String jobId = randomValidJobId();
+        Job job = buildJob(jobId);
+        MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
+        machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
+        machineLearningClient.openJob(new OpenJobRequest(jobId), RequestOptions.DEFAULT);
+
+        FlushJobResponse response = execute(new FlushJobRequest(jobId),
+            machineLearningClient::flushJob,
+            machineLearningClient::flushJobAsync);
+        assertTrue(response.isFlushed());
+    }
+
     public void testGetJobStats() throws Exception {
         String jobId1 = "ml-get-job-stats-test-id-1";
         String jobId2 = "ml-get-job-stats-test-id-2";

+ 65 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java

@@ -52,6 +52,8 @@ import org.elasticsearch.client.ml.job.results.Bucket;
 import org.elasticsearch.client.ml.job.util.PageParams;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.client.ml.FlushJobRequest;
+import org.elasticsearch.client.ml.FlushJobResponse;
 import org.elasticsearch.client.ml.job.stats.JobStats;
 import org.junit.After;
 
@@ -461,6 +463,69 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
         }
     }
 
+    public void testFlushJob() throws Exception {
+        RestHighLevelClient client = highLevelClient();
+
+        Job job = MachineLearningIT.buildJob("flushing-my-first-machine-learning-job");
+        client.machineLearning().putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
+        client.machineLearning().openJob(new OpenJobRequest(job.getId()), RequestOptions.DEFAULT);
+
+        Job secondJob = MachineLearningIT.buildJob("flushing-my-second-machine-learning-job");
+        client.machineLearning().putJob(new PutJobRequest(secondJob), RequestOptions.DEFAULT);
+        client.machineLearning().openJob(new OpenJobRequest(secondJob.getId()), RequestOptions.DEFAULT);
+
+        {
+            //tag::x-pack-ml-flush-job-request
+            FlushJobRequest flushJobRequest = new FlushJobRequest("flushing-my-first-machine-learning-job"); //<1>
+            //end::x-pack-ml-flush-job-request
+
+            //tag::x-pack-ml-flush-job-request-options
+            flushJobRequest.setCalcInterim(true); //<1>
+            flushJobRequest.setAdvanceTime("2018-08-31T16:35:07+00:00"); //<2>
+            flushJobRequest.setStart("2018-08-31T16:35:17+00:00"); //<3>
+            flushJobRequest.setEnd("2018-08-31T16:35:27+00:00"); //<4>
+            flushJobRequest.setSkipTime("2018-08-31T16:35:00+00:00"); //<5>
+            //end::x-pack-ml-flush-job-request-options
+
+            //tag::x-pack-ml-flush-job-execute
+            FlushJobResponse flushJobResponse = client.machineLearning().flushJob(flushJobRequest, RequestOptions.DEFAULT);
+            //end::x-pack-ml-flush-job-execute
+
+            //tag::x-pack-ml-flush-job-response
+            boolean isFlushed = flushJobResponse.isFlushed(); //<1>
+            Date lastFinalizedBucketEnd = flushJobResponse.getLastFinalizedBucketEnd(); //<2>
+            //end::x-pack-ml-flush-job-response
+
+        }
+        {
+            //tag::x-pack-ml-flush-job-listener
+            ActionListener<FlushJobResponse> listener = new ActionListener<FlushJobResponse>() {
+                @Override
+                public void onResponse(FlushJobResponse FlushJobResponse) {
+                    //<1>
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    // <2>
+                }
+            };
+            //end::x-pack-ml-flush-job-listener
+            FlushJobRequest flushJobRequest = new FlushJobRequest("flushing-my-second-machine-learning-job");
+
+            // Replace the empty listener by a blocking listener in test
+            final CountDownLatch latch = new CountDownLatch(1);
+            listener = new LatchedActionListener<>(listener, latch);
+
+            // tag::x-pack-ml-flush-job-execute-async
+            client.machineLearning().flushJobAsync(flushJobRequest, RequestOptions.DEFAULT, listener); //<1>
+            // end::x-pack-ml-flush-job-execute-async
+
+            assertTrue(latch.await(30L, TimeUnit.SECONDS));
+        }
+    }
+
+
     public void testGetJobStats() throws Exception {
         RestHighLevelClient client = highLevelClient();
 

+ 59 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/ml/FlushJobRequestTests.java

@@ -0,0 +1,59 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.ml;
+
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractXContentTestCase;
+
+import java.io.IOException;
+
+public class FlushJobRequestTests extends AbstractXContentTestCase<FlushJobRequest> {
+
+    @Override
+    protected FlushJobRequest createTestInstance() {
+        FlushJobRequest request = new FlushJobRequest(randomAlphaOfLengthBetween(1, 20));
+
+        if (randomBoolean()) {
+            request.setCalcInterim(randomBoolean());
+        }
+        if (randomBoolean()) {
+            request.setAdvanceTime(String.valueOf(randomLong()));
+        }
+        if (randomBoolean()) {
+            request.setStart(String.valueOf(randomLong()));
+        }
+        if (randomBoolean()) {
+            request.setEnd(String.valueOf(randomLong()));
+        }
+        if (randomBoolean()) {
+            request.setSkipTime(String.valueOf(randomLong()));
+        }
+        return request;
+    }
+
+    @Override
+    protected FlushJobRequest doParseInstance(XContentParser parser) throws IOException {
+        return FlushJobRequest.PARSER.apply(parser, null);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return false;
+    }
+}

+ 44 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/ml/FlushJobResponseTests.java

@@ -0,0 +1,44 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.ml;
+
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractXContentTestCase;
+
+import java.io.IOException;
+import java.util.Date;
+
+public class FlushJobResponseTests extends AbstractXContentTestCase<FlushJobResponse> {
+
+    @Override
+    protected FlushJobResponse createTestInstance() {
+        return new FlushJobResponse(randomBoolean(),
+            randomBoolean() ? null : new Date(randomNonNegativeLong()));
+    }
+
+    @Override
+    protected FlushJobResponse doParseInstance(XContentParser parser) throws IOException {
+        return FlushJobResponse.PARSER.apply(parser, null);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return true;
+    }
+}

+ 83 - 0
docs/java-rest/high-level/ml/flush-job.asciidoc

@@ -0,0 +1,83 @@
+[[java-rest-high-x-pack-ml-flush-job]]
+=== Flush Job API
+
+The Flush Job API provides the ability to flush a {ml} job's 
+datafeed in the cluster.
+It accepts a `FlushJobRequest` object and responds
+with a `FlushJobResponse` object.
+
+[[java-rest-high-x-pack-ml-flush-job-request]]
+==== Flush Job Request
+
+A `FlushJobRequest` object gets created with an existing non-null `jobId`.
+All other fields are optional for the request.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-flush-job-request]
+--------------------------------------------------
+<1> Constructing a new request referencing an existing `jobId`
+
+==== Optional Arguments
+
+The following arguments are optional.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-flush-job-request-options]
+--------------------------------------------------
+<1> Set request to calculate the interim results
+<2> Set the advanced time to flush to the particular time value
+<3> Set the start time for the range of buckets on which
+to calculate the interim results (requires `calc_interim` to be `true`)
+<4> Set the end time for the range of buckets on which
+to calculate interim results (requires `calc_interim` to be `true`)
+<5> Set the skip time to skip a particular time value
+
+[[java-rest-high-x-pack-ml-flush-job-execution]]
+==== Execution
+
+The request can be executed through the `MachineLearningClient` contained
+in the `RestHighLevelClient` object, accessed via the `machineLearningClient()` method.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-flush-job-execute]
+--------------------------------------------------
+
+[[java-rest-high-x-pack-ml-flush-job-execution-async]]
+==== Asynchronous Execution
+
+The request can also be executed asynchronously:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-flush-job-execute-async]
+--------------------------------------------------
+<1> The `FlushJobRequest` to execute and the `ActionListener` to use when
+the execution completes
+
+The method does not block and returns immediately. The passed `ActionListener` is used
+to notify the caller of completion. A typical `ActionListener` for `FlushJobResponse` may
+look like
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-flush-job-listener]
+--------------------------------------------------
+<1> `onResponse` is called back when the action is completed successfully
+<2> `onFailure` is called back when some unexpected error occurs
+
+[[java-rest-high-x-pack-ml-flush-job-response]]
+==== Flush Job Response
+
+A `FlushJobResponse` contains an acknowledgement and an optional end date for the
+last finalized bucket
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-flush-job-response]
+--------------------------------------------------
+<1> `isFlushed()` indicates if the job was successfully flushed or not.
+<2> `getLastFinalizedBucketEnd()` provides the timestamp
+(in milliseconds-since-the-epoch) of the end of the last bucket that was processed.

+ 2 - 0
docs/java-rest/high-level/supported-apis.asciidoc

@@ -211,6 +211,7 @@ The Java High Level REST Client supports the following Machine Learning APIs:
 * <<java-rest-high-x-pack-ml-delete-job>>
 * <<java-rest-high-x-pack-ml-open-job>>
 * <<java-rest-high-x-pack-ml-close-job>>
+* <<java-rest-high-x-pack-ml-flush-job>>
 * <<java-rest-high-x-pack-ml-get-job-stats>>
 * <<java-rest-high-x-pack-ml-get-buckets>>
 * <<java-rest-high-x-pack-ml-get-records>>
@@ -220,6 +221,7 @@ include::ml/get-job.asciidoc[]
 include::ml/delete-job.asciidoc[]
 include::ml/open-job.asciidoc[]
 include::ml/close-job.asciidoc[]
+include::ml/flush-job.asciidoc[]
 include::ml/get-job-stats.asciidoc[]
 include::ml/get-buckets.asciidoc[]
 include::ml/get-records.asciidoc[]