
[ML] allow documents to be out of order within the same time bucket (#70468)

This commit allows documents within the same time bucket to arrive out of order.

This is already supported within the native process.

Additionally, when recording the "latest" record timestamp, we assumed that the most recently seen document was truly the latest. That assumption does not hold when latency is configured or when documents arrive out of order within the same bucket.
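In effect, a record is now rejected only if it precedes the start of the bucket containing the latest timestamp seen so far (minus any configured latency window), and that latest timestamp is tracked as a running maximum rather than as the last value seen. A minimal sketch of the rule, using illustrative names that do not appear in the diffs below:

```java
/** Illustrative sketch of the new acceptance rule, not the production code. */
class OutOfOrderCheck {
    private final long bucketSpanMs;
    private final long latencySeconds;
    private long latestEpochMs;                // running maximum, not last-seen

    OutOfOrderCheck(long bucketSpanMs, long latencySeconds) {
        this.bucketSpanMs = bucketSpanMs;
        this.latencySeconds = latencySeconds;
    }

    boolean accept(long epochMs) {
        // Align the latest timestamp to the start of its bucket (non-negative epochs assumed)
        long latestBucketFloor = (latestEpochMs / bucketSpanMs) * bucketSpanMs;
        // Out of order only if the record precedes the current bucket by more than the latency window
        if (epochMs / 1000 < latestBucketFloor / 1000 - latencySeconds) {
            return false;                      // counted in out_of_order_timestamp_count
        }
        latestEpochMs = Math.max(latestEpochMs, epochMs);
        return true;
    }
}
```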
Benjamin Trent · 4 years ago · commit 10e637d97c

+ 6 - 5
docs/reference/ml/ml-shared.asciidoc

@@ -1321,11 +1321,12 @@ For open jobs only, the elapsed time for which the job has been open.
 end::open-time[]
 
 tag::out-of-order-timestamp-count[]
-The number of input documents that are out of time sequence and outside
-of the latency window. This information is applicable only when you provide data
-to the {anomaly-job} by using the <<ml-post-data,post data API>>. These out of
-order documents are  discarded, since jobs require time series data to be in
-ascending chronological order.
+The number of input documents with a timestamp chronologically
+preceding the start of the current anomaly detection bucket, offset
+by the latency window. This information applies only when you provide
+data to the {anomaly-job} by using the <<ml-post-data,post data API>>.
+These out-of-order documents are discarded, since jobs require time
+series data to be in ascending chronological order.
 end::out-of-order-timestamp-count[]
 
 tag::outlier-fraction[]
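In terms of this definition: with a one-hour bucket span and no latency, after ingesting a record stamped 2019-07-01 00:30:00Z the current bucket starts at 00:00:00Z, so a later-arriving record stamped 00:10:00Z is still accepted, whereas one stamped 2019-06-30 23:59:59Z would increment this count.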

+ 0 - 8
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/DataCounts.java

@@ -407,14 +407,6 @@ public class DataCounts implements ToXContentObject, Writeable {
         this.latestRecordTimeStamp = latestRecordTimeStamp;
     }
 
-    public void updateLatestRecordTimeStamp(Date latestRecordTimeStamp) {
-        if (latestRecordTimeStamp != null &&
-                (this.latestRecordTimeStamp == null ||
-                latestRecordTimeStamp.after(this.latestRecordTimeStamp))) {
-            this.latestRecordTimeStamp = latestRecordTimeStamp;
-        }
-    }
-
     /**
      * The wall clock time the latest record was seen.
      *

+ 91 - 59
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java

@@ -46,6 +46,10 @@ public class MlJobIT extends ESRestTestCase {
 
     private static final String BASIC_AUTH_VALUE = UsernamePasswordToken.basicAuthHeaderValue("x_pack_rest_user",
             SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
+    private static final RequestOptions POST_DATA = RequestOptions.DEFAULT.toBuilder()
+        .setWarningsHandler(warnings -> Collections.singletonList(
+            "Posting data directly to anomaly detection jobs is deprecated, "
+                + "in a future major version it will be compulsory to use a datafeed").equals(warnings) == false).build();
 
     @Override
     protected Settings restClientSettings() {
@@ -127,8 +131,7 @@ public class MlJobIT extends ESRestTestCase {
         Map<String, Object> usage = entityAsMap(client().performRequest(new Request("GET", "_xpack/usage")));
         assertEquals(2, XContentMapValues.extractValue("ml.jobs._all.count", usage));
         assertEquals(2, XContentMapValues.extractValue("ml.jobs.closed.count", usage));
-        Response openResponse = client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/job-1/_open"));
-        assertThat(entityAsMap(openResponse), hasEntry("opened", true));
+        openJob("job-1");
         usage = entityAsMap(client().performRequest(new Request("GET", "_xpack/usage")));
         assertEquals(2, XContentMapValues.extractValue("ml.jobs._all.count", usage));
         assertEquals(1, XContentMapValues.extractValue("ml.jobs.closed.count", usage));
@@ -136,20 +139,17 @@ public class MlJobIT extends ESRestTestCase {
     }
 
     private Response createFarequoteJob(String jobId) throws IOException {
-        Request request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
-        request.setJsonEntity(
-                  "{\n"
-                + "    \"description\":\"Analysis of response time by airline\",\n"
-                + "    \"analysis_config\" : {\n"
-                + "        \"bucket_span\": \"3600s\",\n"
-                + "        \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]\n"
-                + "    },\n" + "    \"data_description\" : {\n"
-                + "        \"field_delimiter\":\",\",\n"
-                + "        \"time_field\":\"time\",\n"
-                + "        \"time_format\":\"yyyy-MM-dd HH:mm:ssX\"\n"
-                + "    }\n"
-                + "}");
-        return client().performRequest(request);
+        return putJob(jobId, "{\n"
+            + "    \"description\":\"Analysis of response time by airline\",\n"
+            + "    \"analysis_config\" : {\n"
+            + "        \"bucket_span\": \"3600s\",\n"
+            + "        \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]\n"
+            + "    },\n" + "    \"data_description\" : {\n"
+            + "        \"field_delimiter\":\",\",\n"
+            + "        \"time_field\":\"time\",\n"
+            + "        \"time_format\":\"yyyy-MM-dd HH:mm:ssX\"\n"
+            + "    }\n"
+            + "}");
     }
 
     public void testCantCreateJobWithSameID() throws Exception {
@@ -161,13 +161,10 @@ public class MlJobIT extends ESRestTestCase {
                 "  \"results_index_name\" : \"%s\"}";
 
         String jobId = "cant-create-job-with-same-id-job";
-        Request createJob1 = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
-        createJob1.setJsonEntity(String.format(Locale.ROOT, jobTemplate, "index-1"));
-        client().performRequest(createJob1);
-
-        Request createJob2 = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
-        createJob2.setJsonEntity(String.format(Locale.ROOT, jobTemplate, "index-2"));
-        ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(createJob2));
+        putJob(jobId, String.format(Locale.ROOT, jobTemplate, "index-1"));
+        ResponseException e = expectThrows(ResponseException.class,
+            () -> putJob(jobId, String.format(Locale.ROOT, jobTemplate, "index-2"))
+        );
 
         assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
         assertThat(e.getMessage(), containsString("The job cannot be created with the Id '" + jobId + "'. The Id is already used."));
@@ -183,14 +180,10 @@ public class MlJobIT extends ESRestTestCase {
 
         String jobId1 = "create-jobs-with-index-name-option-job-1";
         String indexName = "non-default-index";
-        Request createJob1 = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1);
-        createJob1.setJsonEntity(String.format(Locale.ROOT, jobTemplate, indexName));
-        client().performRequest(createJob1);
+        putJob(jobId1, String.format(Locale.ROOT, jobTemplate, indexName));
 
         String jobId2 = "create-jobs-with-index-name-option-job-2";
-        Request createJob2 = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId2);
-        createJob2.setEntity(createJob1.getEntity());
-        client().performRequest(createJob2);
+        putJob(jobId2, String.format(Locale.ROOT, jobTemplate, indexName));
 
         // With security enabled GET _aliases throws an index_not_found_exception
         // if no aliases have been created. In multi-node tests the alias may not
@@ -313,9 +306,7 @@ public class MlJobIT extends ESRestTestCase {
         String jobId2 = "create-job-in-shared-index-updates-mapping-job-2";
         String byFieldName2 = "cpu-usage";
 
-        Request createJob1Request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1);
-        createJob1Request.setJsonEntity(String.format(Locale.ROOT, jobTemplate, byFieldName1));
-        client().performRequest(createJob1Request);
+        putJob(jobId1, String.format(Locale.ROOT, jobTemplate, byFieldName1));
 
         // Check the index mapping contains the first by_field_name
         Request getResultsMappingRequest = new Request("GET",
@@ -325,10 +316,7 @@ public class MlJobIT extends ESRestTestCase {
         assertThat(resultsMappingAfterJob1, containsString(byFieldName1));
         assertThat(resultsMappingAfterJob1, not(containsString(byFieldName2)));
 
-        Request createJob2Request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId2);
-        createJob2Request.setJsonEntity(String.format(Locale.ROOT, jobTemplate, byFieldName2));
-        client().performRequest(createJob2Request);
-
+        putJob(jobId2, String.format(Locale.ROOT, jobTemplate, byFieldName2));
         // Check the index mapping now contains both fields
         String resultsMappingAfterJob2 = EntityUtils.toString(client().performRequest(getResultsMappingRequest).getEntity());
         assertThat(resultsMappingAfterJob2, containsString(byFieldName1));
@@ -348,9 +336,7 @@ public class MlJobIT extends ESRestTestCase {
         String jobId2 = "create-job-in-custom-shared-index-updates-mapping-job-2";
         String byFieldName2 = "cpu-usage";
 
-        Request createJob1Request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1);
-        createJob1Request.setJsonEntity(String.format(Locale.ROOT, jobTemplate, byFieldName1));
-        client().performRequest(createJob1Request);
+        putJob(jobId1, String.format(Locale.ROOT, jobTemplate, byFieldName1));
 
         // Check the index mapping contains the first by_field_name
         Request getResultsMappingRequest = new Request("GET",
@@ -360,9 +346,7 @@ public class MlJobIT extends ESRestTestCase {
         assertThat(resultsMappingAfterJob1, containsString(byFieldName1));
         assertThat(resultsMappingAfterJob1, not(containsString(byFieldName2)));
 
-        Request createJob2Request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId2);
-        createJob2Request.setJsonEntity(String.format(Locale.ROOT, jobTemplate, byFieldName2));
-        client().performRequest(createJob2Request);
+        putJob(jobId2, String.format(Locale.ROOT, jobTemplate, byFieldName2));
 
         // Check the index mapping now contains both fields
         String resultsMappingAfterJob2 = EntityUtils.toString(client().performRequest(getResultsMappingRequest).getEntity());
@@ -391,13 +375,10 @@ public class MlJobIT extends ESRestTestCase {
             byFieldName2 = "response";
         }
 
-        Request createJob1Request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1);
-        createJob1Request.setJsonEntity(String.format(Locale.ROOT, jobTemplate, byFieldName1));
-        client().performRequest(createJob1Request);
+        putJob(jobId1, String.format(Locale.ROOT, jobTemplate, byFieldName1));
 
-        Request createJob2Request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId2);
-        createJob2Request.setJsonEntity(String.format(Locale.ROOT, jobTemplate, byFieldName2));
-        ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(createJob2Request));
+        ResponseException e = expectThrows(ResponseException.class,
+            () -> putJob(jobId2, String.format(Locale.ROOT, jobTemplate, byFieldName2)));
         assertThat(e.getMessage(),
                 containsString("This job would cause a mapping clash with existing field [response] - " +
                         "avoid the clash by assigning a dedicated results index"));
@@ -419,8 +400,8 @@ public class MlJobIT extends ESRestTestCase {
         try {
             ResponseException exception = expectThrows(
                 ResponseException.class,
-                () -> client().performRequest(
-                new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open")));
+                () -> openJob(jobId)
+            );
             assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(429));
             assertThat(EntityUtils.toString(exception.getResponse().getEntity()),
                 containsString("Cannot open jobs because persistent task assignment is disabled by the " +
@@ -463,6 +444,41 @@ public class MlJobIT extends ESRestTestCase {
                 client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")));
     }
 
+    public void testOutOfOrderData() throws Exception {
+        String jobId = "job-with-out-of-order-docs";
+        createFarequoteJob(jobId);
+
+        openJob(jobId);
+
+        Request postDataRequest = new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data");
+        // Post data is deprecated, so expect a deprecation warning
+        postDataRequest.setOptions(POST_DATA);
+        // Bucket span is 1h (3600s), so posting data within the same hour should not result in out-of-order data
+        postDataRequest.setJsonEntity("{ \"airline\":\"LOT\", \"responsetime\":100, \"time\":\"2019-07-01 00:00:00Z\" }");
+        client().performRequest(postDataRequest);
+        postDataRequest.setJsonEntity("{ \"airline\":\"LOT\", \"responsetime\":100, \"time\":\"2019-07-01 00:30:00Z\" }");
+        client().performRequest(postDataRequest);
+        // out of order, but in the same time bucket
+        postDataRequest.setJsonEntity("{ \"airline\":\"LOT\", \"responsetime\":100, \"time\":\"2019-07-01 00:10:00Z\" }");
+        client().performRequest(postDataRequest);
+
+        Response flushResponse =
+            client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_flush"));
+        assertThat(entityAsMap(flushResponse), hasEntry("flushed", true));
+
+        closeJob(jobId);
+
+        String stats = EntityUtils.toString(
+            client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")).getEntity()
+        );
+        // Assert the latest record timestamp is 2019-07-01 00:30:00Z (1561941000000 ms since the epoch)
+        assertThat(stats, containsString("\"latest_record_timestamp\":1561941000000"));
+        assertThat(stats, containsString("\"out_of_order_timestamp_count\":0"));
+        assertThat(stats, containsString("\"processed_record_count\":3"));
+
+        client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId));
+    }
+
     public void testDeleteJob_TimingStatsDocumentIsDeleted() throws Exception {
         String jobId = "delete-job-with-timing-stats-document-job";
         String indexName = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT;
@@ -472,15 +488,11 @@ public class MlJobIT extends ESRestTestCase {
             EntityUtils.toString(client().performRequest(new Request("GET", indexName + "/_count")).getEntity()),
             containsString("\"count\":0"));  // documents related to the job do not exist yet
 
-        Response openResponse =
-            client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open"));
-        assertThat(entityAsMap(openResponse), hasEntry("opened", true));
+        openJob(jobId);
 
         Request postDataRequest = new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data");
         // Post data is deprecated, so expect a deprecation warning
-        postDataRequest.setOptions(RequestOptions.DEFAULT.toBuilder()
-            .setWarningsHandler(warnings -> Collections.singletonList("Posting data directly to anomaly detection jobs is deprecated, " +
-                "in a future major version it will be compulsory to use a datafeed").equals(warnings) == false));
+        postDataRequest.setOptions(POST_DATA);
         postDataRequest.setJsonEntity("{ \"airline\":\"LOT\", \"response_time\":100, \"time\":\"2019-07-01 00:00:00Z\" }");
         client().performRequest(postDataRequest);
         postDataRequest.setJsonEntity("{ \"airline\":\"LOT\", \"response_time\":100, \"time\":\"2019-07-01 02:00:00Z\" }");
@@ -490,9 +502,7 @@ public class MlJobIT extends ESRestTestCase {
             client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_flush"));
         assertThat(entityAsMap(flushResponse), hasEntry("flushed", true));
 
-        Response closeResponse =
-            client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_close"));
-        assertThat(entityAsMap(closeResponse), hasEntry("closed", true));
+        closeJob(jobId);
 
         String timingStatsDoc =
             EntityUtils.toString(
@@ -840,6 +850,28 @@ public class MlJobIT extends ESRestTestCase {
         return EntityUtils.toString(response.getEntity());
     }
 
+    private void openJob(String jobId) throws IOException {
+        Response openResponse = client().performRequest(new Request(
+            "POST",
+            MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open"
+        ));
+        assertThat(entityAsMap(openResponse), hasEntry("opened", true));
+    }
+
+    private void closeJob(String jobId) throws IOException {
+        Response closeResponse = client().performRequest(new Request(
+            "POST",
+            MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_close"
+        ));
+        assertThat(entityAsMap(closeResponse), hasEntry("closed", true));
+    }
+
+    private Response putJob(String jobId, String jsonBody) throws IOException {
+        Request request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
+        request.setJsonEntity(jsonBody);
+        return client().performRequest(request);
+    }
+
     @After
     public void clearMlState() throws Exception {
         new MlRestTestStateCleaner(logger, adminClient()).clearMlMetadata();

+ 7 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java

@@ -74,20 +74,23 @@ public class DataCountsReporter {
      *                        but the actual number of fields in the record
      * @param recordTimeMs    The time of the record written
      *                        in milliseconds from the epoch.
+     * @param latestRecordTimeMs The time of the chronologically latest record written, in
+     *                           milliseconds from the epoch. Greater than or equal to {@code recordTimeMs}.
      */
-    public void reportRecordWritten(long inputFieldCount, long recordTimeMs) {
-        Date recordDate = new Date(recordTimeMs);
+    public void reportRecordWritten(long inputFieldCount, long recordTimeMs, long latestRecordTimeMs) {
+        final Date latestRecordDate = new Date(latestRecordTimeMs);
 
         totalRecordStats.incrementInputFieldCount(inputFieldCount);
         totalRecordStats.incrementProcessedRecordCount(1);
-        totalRecordStats.setLatestRecordTimeStamp(recordDate);
+        totalRecordStats.setLatestRecordTimeStamp(latestRecordDate);
 
         incrementalRecordStats.incrementInputFieldCount(inputFieldCount);
         incrementalRecordStats.incrementProcessedRecordCount(1);
-        incrementalRecordStats.setLatestRecordTimeStamp(recordDate);
+        incrementalRecordStats.setLatestRecordTimeStamp(latestRecordDate);
 
         boolean isFirstReport = totalRecordStats.getEarliestRecordTimeStamp() == null;
         if (isFirstReport) {
+            final Date recordDate = new Date(recordTimeMs);
             totalRecordStats.setEarliestRecordTimeStamp(recordDate);
             incrementalRecordStats.setEarliestRecordTimeStamp(recordDate);
         }
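Callers now supply both the record's own timestamp and the running maximum seen so far. A hedged usage sketch, assuming `reporter` is an initialized `DataCountsReporter` and reusing the timestamps from the out-of-order integration test above:

```java
// Assumes 'reporter' is an initialized DataCountsReporter; the values mirror the
// integration test above, where the 00:10 record arrives after the 00:30 record.
long epochMs = 1561939800000L;                           // 2019-07-01 00:10:00Z
long latestEpochMs = Math.max(1561941000000L, epochMs);  // running max stays at 00:30:00Z
reporter.reportRecordWritten(2, epochMs, latestEpochMs); // 2 = input fields read for this record
```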

+ 11 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java

@@ -33,6 +33,8 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiConsumer;
 
+import static org.elasticsearch.xpack.core.ml.utils.Intervals.alignToFloor;
+
 public abstract class AbstractDataToProcessWriter implements DataToProcessWriter {
 
     private static final int TIME_FIELD_OUT_INDEX = 0;
@@ -48,7 +50,8 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
 
     private final Logger logger;
     private final DateTransformer dateTransformer;
-    private long latencySeconds;
+    private final long bucketSpanMs;
+    private final long latencySeconds;
 
     protected Map<String, Integer> inFieldIndexes;
     protected List<InputOutputMap> inputOutputMap;
@@ -68,6 +71,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
         this.dataCountsReporter = Objects.requireNonNull(dataCountsReporter);
         this.logger = Objects.requireNonNull(logger);
         this.latencySeconds = analysisConfig.getLatency() == null ? 0 : analysisConfig.getLatency().seconds();
+        this.bucketSpanMs = analysisConfig.getBucketSpan().getMillis();
 
         Date date = dataCountsReporter.getLatestRecordTime();
         latestEpochMsThisUpload = 0;
@@ -178,9 +182,11 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
         }
 
         record[TIME_FIELD_OUT_INDEX] = Long.toString(epochMs / MS_IN_SECOND);
+        final long latestBucketFloor = alignToFloor(latestEpochMs, bucketSpanMs);
 
-        // Records have epoch seconds timestamp so compare for out of order in seconds
-        if (epochMs / MS_IN_SECOND < latestEpochMs / MS_IN_SECOND - latencySeconds) {
+        // We care only about records older than the start of the current bucket, judged by the latest timestamp seen.
+        // The native process handles out-of-order records within the same bucket without issue.
+        if (epochMs / MS_IN_SECOND < latestBucketFloor / MS_IN_SECOND - latencySeconds) {
             // out of order
             dataCountsReporter.reportOutOfOrderRecord(numberOfFieldsRead);
 
@@ -196,7 +202,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
         latestEpochMsThisUpload = latestEpochMs;
 
         autodetectProcess.writeRecord(record);
-        dataCountsReporter.reportRecordWritten(numberOfFieldsRead, epochMs);
+        dataCountsReporter.reportRecordWritten(numberOfFieldsRead, epochMs, latestEpochMs);
 
         return true;
     }
@@ -325,7 +331,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
     /**
      * Input and output array indexes map
      */
-    protected class InputOutputMap {
+    protected static class InputOutputMap {
         int inputIndex;
         int outputIndex;
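
The `alignToFloor` helper comes from `org.elasticsearch.xpack.core.ml.utils.Intervals`. A minimal sketch of the behavior the check above relies on, assuming the utility rounds toward negative infinity (Java's integer division truncates toward zero, so negative values need an adjustment):

```java
// Sketch of the assumed Intervals.alignToFloor semantics: round 'value' down
// to the nearest multiple of 'interval'.
static long alignToFloor(long value, long interval) {
    long result = (value / interval) * interval;
    if (result != value && value < 0) {
        result -= interval;   // correct truncation-toward-zero for negative values
    }
    return result;
}
```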
 

+ 14 - 14
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java

@@ -33,7 +33,7 @@ public class DataCountsReporterTests extends ESTestCase {
 
     private Job job;
     private JobDataCountsPersister jobDataCountsPersister;
-    private TimeValue bucketSpan = TimeValue.timeValueSeconds(300);
+    private final TimeValue bucketSpan = TimeValue.timeValueSeconds(300);
 
     @Before
     public void setUpMocks() {
@@ -86,8 +86,8 @@ public class DataCountsReporterTests extends ESTestCase {
 
         dataCountsReporter.setAnalysedFieldsPerRecord(3);
 
-        dataCountsReporter.reportRecordWritten(5, 1000);
-        dataCountsReporter.reportRecordWritten(5, 1000);
+        dataCountsReporter.reportRecordWritten(5, 1000, 1000);
+        dataCountsReporter.reportRecordWritten(5, 1000, 1000);
         assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount());
         assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount());
         assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount());
@@ -104,8 +104,8 @@ public class DataCountsReporterTests extends ESTestCase {
         // write some more data
         // skip a bucket so there is a non-zero empty bucket count
         long timeStamp = bucketSpan.millis() * 2 + 2000;
-        dataCountsReporter.reportRecordWritten(5, timeStamp);
-        dataCountsReporter.reportRecordWritten(5, timeStamp);
+        dataCountsReporter.reportRecordWritten(5, timeStamp, timeStamp);
+        dataCountsReporter.reportRecordWritten(5, timeStamp, timeStamp);
         assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount());
         assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount());
         assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount());
@@ -141,14 +141,14 @@ public class DataCountsReporterTests extends ESTestCase {
         DataCountsReporter dataCountsReporter = new DataCountsReporter(job, new DataCounts(job.getId()), jobDataCountsPersister);
         dataCountsReporter.setAnalysedFieldsPerRecord(3);
 
-        dataCountsReporter.reportRecordWritten(5, 2000);
+        dataCountsReporter.reportRecordWritten(5, 2000, 2000);
         assertEquals(1, dataCountsReporter.incrementalStats().getInputRecordCount());
         assertEquals(5, dataCountsReporter.incrementalStats().getInputFieldCount());
         assertEquals(1, dataCountsReporter.incrementalStats().getProcessedRecordCount());
         assertEquals(3, dataCountsReporter.incrementalStats().getProcessedFieldCount());
         assertEquals(2000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
 
-        dataCountsReporter.reportRecordWritten(5, 3000);
+        dataCountsReporter.reportRecordWritten(5, 3000, 3000);
         dataCountsReporter.reportMissingField();
         assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount());
         assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount());
@@ -166,7 +166,7 @@ public class DataCountsReporterTests extends ESTestCase {
         dataCountsReporter.setAnalysedFieldsPerRecord(3);
 
         for (int i = 1; i <= 9999; i++) {
-            dataCountsReporter.reportRecordWritten(5, i);
+            dataCountsReporter.reportRecordWritten(5, i, i);
         }
 
         assertEquals(9999, dataCountsReporter.incrementalStats().getInputRecordCount());
@@ -183,7 +183,7 @@ public class DataCountsReporterTests extends ESTestCase {
         dataCountsReporter.setAnalysedFieldsPerRecord(3);
 
         for (int i = 1; i <= 30001; i++) {
-            dataCountsReporter.reportRecordWritten(5, i);
+            dataCountsReporter.reportRecordWritten(5, i, i);
         }
 
         assertEquals(30001, dataCountsReporter.incrementalStats().getInputRecordCount());
@@ -200,7 +200,7 @@ public class DataCountsReporterTests extends ESTestCase {
         dataCountsReporter.setAnalysedFieldsPerRecord(3);
 
         for (int i = 1; i <= 100000; i++) {
-            dataCountsReporter.reportRecordWritten(5, i);
+            dataCountsReporter.reportRecordWritten(5, i, i);
         }
 
         assertEquals(100000, dataCountsReporter.incrementalStats().getInputRecordCount());
@@ -217,7 +217,7 @@ public class DataCountsReporterTests extends ESTestCase {
         dataCountsReporter.setAnalysedFieldsPerRecord(3);
 
         for (int i = 1; i <= 1_000_000; i++) {
-            dataCountsReporter.reportRecordWritten(5, i);
+            dataCountsReporter.reportRecordWritten(5, i, i);
         }
 
         assertEquals(1_000_000, dataCountsReporter.incrementalStats().getInputRecordCount());
@@ -234,7 +234,7 @@ public class DataCountsReporterTests extends ESTestCase {
         dataCountsReporter.setAnalysedFieldsPerRecord(3);
 
         for (int i = 1; i <= 2_000_000; i++) {
-            dataCountsReporter.reportRecordWritten(5, i);
+            dataCountsReporter.reportRecordWritten(5, i, i);
         }
 
         assertEquals(2000000, dataCountsReporter.incrementalStats().getInputRecordCount());
@@ -254,8 +254,8 @@ public class DataCountsReporterTests extends ESTestCase {
         Date now = new Date();
         DataCounts dc = new DataCounts(job.getId(), 2L, 5L, 0L, 10L, 0L, 1L, 0L, 0L, 0L, 0L, new Date(2000), new Date(3000),
                 now, (Date) null, (Date) null, (Instant) null);
-        dataCountsReporter.reportRecordWritten(5, 2000);
-        dataCountsReporter.reportRecordWritten(5, 3000);
+        dataCountsReporter.reportRecordWritten(5, 2000, 2000);
+        dataCountsReporter.reportRecordWritten(5, 3000, 3000);
         dataCountsReporter.reportMissingField();
         dataCountsReporter.finishReporting();
 

+ 55 - 10
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvDataToProcessWriterTests.java

@@ -86,7 +86,9 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
         dataDescription.setTimeFormat(DataDescription.EPOCH);
 
         Detector detector = new Detector.Builder("metric", "value").build();
-        analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector)).build();
+        analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector))
+            .setBucketSpan(TimeValue.timeValueSeconds(1))
+            .build();
     }
 
     public void testWrite_GivenTimeFormatIsEpochAndDataIsValid() throws IOException {
@@ -190,15 +192,54 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
 
         verify(dataCountsReporter, times(2)).reportOutOfOrderRecord(2);
         verify(dataCountsReporter, times(2)).reportLatestTimeIncrementalStats(anyLong());
-        verify(dataCountsReporter, never()).reportRecordWritten(anyLong(), anyLong());
+        verify(dataCountsReporter, never()).reportRecordWritten(anyLong(), anyLong(), anyLong());
         verify(dataCountsReporter).finishReporting();
     }
 
     public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsWithinLatencySomeOutOfOrder() throws IOException {
-        AnalysisConfig.Builder builder =
-                new AnalysisConfig.Builder(Collections.singletonList(new Detector.Builder("metric", "value").build()));
-        builder.setLatency(TimeValue.timeValueSeconds(2));
-        analysisConfig = builder.build();
+        analysisConfig = new AnalysisConfig.Builder(
+            Collections.singletonList(
+                new Detector.Builder("metric", "value").build()
+            ))
+            .setLatency(TimeValue.timeValueSeconds(2))
+            .setBucketSpan(TimeValue.timeValueSeconds(1))
+            .build();
+
+        StringBuilder input = new StringBuilder();
+        input.append("time,metric,value\n");
+        input.append("4,foo,4.0\n");
+        input.append("5,foo,5.0\n");
+        input.append("3,foo,3.0\n");
+        input.append("4,bar,4.0\n");
+        input.append("2,bar,2.0\n");
+        input.append("\0");
+        InputStream inputStream = createInputStream(input.toString());
+        CsvDataToProcessWriter writer = createWriter();
+        writer.writeHeader();
+        writer.write(inputStream, null, null, (r, e) -> {});
+        verify(dataCountsReporter, times(1)).startNewIncrementalCount();
+
+        List<String[]> expectedRecords = new ArrayList<>();
+        // The final field is the control field
+        expectedRecords.add(new String[] { "time", "value", "." });
+        expectedRecords.add(new String[] { "4", "4.0", "" });
+        expectedRecords.add(new String[] { "5", "5.0", "" });
+        expectedRecords.add(new String[] { "3", "3.0", "" });
+        expectedRecords.add(new String[] { "4", "4.0", "" });
+        assertWrittenRecordsEqualTo(expectedRecords);
+
+        verify(dataCountsReporter, times(1)).reportOutOfOrderRecord(2);
+        verify(dataCountsReporter, never()).reportLatestTimeIncrementalStats(anyLong());
+        verify(dataCountsReporter).finishReporting();
+    }
+
+    public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsOutOfOrderWithinBucketSpan() throws Exception {
+        analysisConfig = new AnalysisConfig.Builder(
+            Collections.singletonList(
+                new Detector.Builder("metric", "value").build()
+            ))
+            .setBucketSpan(TimeValue.timeValueSeconds(10))
+            .build();
 
         StringBuilder input = new StringBuilder();
         input.append("time,metric,value\n");
@@ -207,6 +248,8 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
         input.append("3,foo,3.0\n");
         input.append("4,bar,4.0\n");
         input.append("2,bar,2.0\n");
+        input.append("12,bar,12.0\n");
+        input.append("2,bar,2.0\n");
         input.append("\0");
         InputStream inputStream = createInputStream(input.toString());
         CsvDataToProcessWriter writer = createWriter();
@@ -221,6 +264,8 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
         expectedRecords.add(new String[] { "5", "5.0", "" });
         expectedRecords.add(new String[] { "3", "3.0", "" });
         expectedRecords.add(new String[] { "4", "4.0", "" });
+        expectedRecords.add(new String[] { "2", "2.0", "" });
+        expectedRecords.add(new String[] { "12", "12.0", "" });
         assertWrittenRecordsEqualTo(expectedRecords);
 
         verify(dataCountsReporter, times(1)).reportOutOfOrderRecord(2);
@@ -258,10 +303,10 @@ public class CsvDataToProcessWriterTests extends ESTestCase {
         assertWrittenRecordsEqualTo(expectedRecords);
 
         verify(dataCountsReporter, times(2)).reportMissingField();
-        verify(dataCountsReporter, times(1)).reportRecordWritten(2, 1000);
-        verify(dataCountsReporter, times(1)).reportRecordWritten(2, 2000);
-        verify(dataCountsReporter, times(1)).reportRecordWritten(2, 3000);
-        verify(dataCountsReporter, times(1)).reportRecordWritten(2, 4000);
+        verify(dataCountsReporter, times(1)).reportRecordWritten(2, 1000, 1000);
+        verify(dataCountsReporter, times(1)).reportRecordWritten(2, 2000, 2000);
+        verify(dataCountsReporter, times(1)).reportRecordWritten(2, 3000, 3000);
+        verify(dataCountsReporter, times(1)).reportRecordWritten(2, 4000, 4000);
         verify(dataCountsReporter, times(1)).reportDateParseError(2);
         verify(dataCountsReporter).finishReporting();
     }

+ 51 - 9
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/JsonDataToProcessWriterTests.java

@@ -87,7 +87,9 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
         dataDescription.setTimeFormat(DataDescription.EPOCH);
 
         Detector detector = new Detector.Builder("metric", "value").build();
-        analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector)).build();
+        analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector))
+            .setBucketSpan(TimeValue.timeValueSeconds(1))
+            .build();
     }
 
     public void testWrite_GivenTimeFormatIsEpochAndDataIsValid() throws Exception {
@@ -168,11 +170,51 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
         verify(dataCountsReporter).finishReporting();
     }
 
+    public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsOutOfOrderWithinBucketSpan() throws Exception {
+        analysisConfig = new AnalysisConfig.Builder(
+            Collections.singletonList(
+                new Detector.Builder("metric", "value").build()
+            ))
+            .setBucketSpan(TimeValue.timeValueSeconds(10))
+            .build();
+
+        StringBuilder input = new StringBuilder();
+        input.append("{\"time\":\"4\", \"metric\":\"foo\", \"value\":\"4.0\"}");
+        input.append("{\"time\":\"5\", \"metric\":\"foo\", \"value\":\"5.0\"}");
+        input.append("{\"time\":\"3\", \"metric\":\"bar\", \"value\":\"3.0\"}");
+        input.append("{\"time\":\"4\", \"metric\":\"bar\", \"value\":\"4.0\"}");
+        input.append("{\"time\":\"2\", \"metric\":\"bar\", \"value\":\"2.0\"}");
+        input.append("{\"time\":\"12\", \"metric\":\"bar\", \"value\":\"12.0\"}");
+        input.append("{\"time\":\"2\", \"metric\":\"bar\", \"value\":\"2.0\"}");
+        InputStream inputStream = createInputStream(input.toString());
+        JsonDataToProcessWriter writer = createWriter();
+        writer.writeHeader();
+        writer.write(inputStream, null, XContentType.JSON, (r, e) -> {});
+
+        List<String[]> expectedRecords = new ArrayList<>();
+        // The final field is the control field
+        expectedRecords.add(new String[]{"time", "value", "."});
+        expectedRecords.add(new String[]{"4", "4.0", ""});
+        expectedRecords.add(new String[]{"5", "5.0", ""});
+        expectedRecords.add(new String[]{"3", "3.0", ""});
+        expectedRecords.add(new String[]{"4", "4.0", ""});
+        expectedRecords.add(new String[]{"2", "2.0", ""});
+        expectedRecords.add(new String[]{"12", "12.0", ""});
+        assertWrittenRecordsEqualTo(expectedRecords);
+
+        verify(dataCountsReporter, times(1)).reportOutOfOrderRecord(2);
+        verify(dataCountsReporter, never()).reportLatestTimeIncrementalStats(anyLong());
+        verify(dataCountsReporter).finishReporting();
+    }
+
     public void testWrite_GivenTimeFormatIsEpochAndSomeTimestampsWithinLatencySomeOutOfOrder() throws Exception {
-        AnalysisConfig.Builder builder =
-                new AnalysisConfig.Builder(Collections.singletonList(new Detector.Builder("metric", "value").build()));
-        builder.setLatency(TimeValue.timeValueSeconds(2));
-        analysisConfig = builder.build();
+        analysisConfig = new AnalysisConfig.Builder(
+            Collections.singletonList(
+                new Detector.Builder("metric", "value").build()
+            ))
+            .setLatency(TimeValue.timeValueSeconds(2))
+            .setBucketSpan(TimeValue.timeValueSeconds(1))
+            .build();
 
         StringBuilder input = new StringBuilder();
         input.append("{\"time\":\"4\", \"metric\":\"foo\", \"value\":\"4.0\"}");
@@ -326,10 +368,10 @@ public class JsonDataToProcessWriterTests extends ESTestCase {
         assertWrittenRecordsEqualTo(expectedRecords);
 
         verify(dataCountsReporter, times(1)).reportMissingFields(1L);
-        verify(dataCountsReporter, times(1)).reportRecordWritten(2, 1000);
-        verify(dataCountsReporter, times(1)).reportRecordWritten(1, 2000);
-        verify(dataCountsReporter, times(1)).reportRecordWritten(1, 3000);
-        verify(dataCountsReporter, times(1)).reportRecordWritten(1, 4000);
+        verify(dataCountsReporter, times(1)).reportRecordWritten(2, 1000, 1000);
+        verify(dataCountsReporter, times(1)).reportRecordWritten(1, 2000, 2000);
+        verify(dataCountsReporter, times(1)).reportRecordWritten(1, 3000, 3000);
+        verify(dataCountsReporter, times(1)).reportRecordWritten(1, 4000, 4000);
         verify(dataCountsReporter, times(1)).reportDateParseError(0);
         verify(dataCountsReporter).finishReporting();
     }