소스 검색

Add delayed datacheck to the datafeed job runner (#35387)

* ML: Adding missing datacheck to datafeedjob

* Adding client side and docs

* Making adjustments to validations

* Making values default to on, having more sensible limits

* Intermittent commit, still need to figure out interval

* Adjusting delayed data check interval

* updating docs

* Making parameter Boolean, so it is nullable

* bumping bwc to 7 before backport

* changing to version current

* moving delayed data check config its own object

* Separation of duties for delayed data detection

* fixing checkstyles

* fixing checkstyles

* Adjusting default behavior so that null windows are allowed

* Mentioning the default value

* Fixing comments, syncing up validations
Benjamin Trent 7 년 전
부모
커밋
f7ada9b29b
31개의 변경된 파일1137개의 추가작업 그리고 166개의 파일을 삭제
  1. 33 4
      client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java
  2. 23 3
      client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java
  3. 130 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DelayedDataCheckConfig.java
  4. 9 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java
  5. 3 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java
  6. 3 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdateTests.java
  7. 65 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DelayedDataCheckConfigTests.java
  8. 11 0
      docs/java-rest/high-level/ml/put-datafeed.asciidoc
  9. 26 0
      docs/reference/ml/apis/datafeedresource.asciidoc
  10. 4 0
      docs/reference/ml/apis/put-datafeed.asciidoc
  11. 34 5
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java
  12. 24 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedJobValidator.java
  13. 33 3
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java
  14. 127 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DelayedDataCheckConfig.java
  15. 10 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java
  16. 17 7
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/time/TimeUtils.java
  17. 3 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java
  18. 6 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java
  19. 95 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DelayedDataCheckConfigTests.java
  20. 1 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java
  21. 35 13
      x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java
  22. 58 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java
  23. 4 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java
  24. 38 48
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java
  25. 14 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetector.java
  26. 125 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java
  27. 35 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/NullDelayedDataDetector.java
  28. 43 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java
  29. 25 0
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidatorTests.java
  30. 0 76
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetectorTests.java
  31. 103 0
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactoryTests.java

+ 33 - 4
client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java

@@ -62,6 +62,7 @@ public class DatafeedConfig implements ToXContentObject {
     public static final ParseField AGGREGATIONS = new ParseField("aggregations");
     public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields");
     public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config");
+    public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config");
 
     public static final ConstructingObjectParser<Builder, Void> PARSER = new ConstructingObjectParser<>(
         "datafeed_config", true, a -> new Builder((String)a[0], (String)a[1]));
@@ -88,6 +89,7 @@ public class DatafeedConfig implements ToXContentObject {
         }, SCRIPT_FIELDS);
         PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE);
         PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG);
+        PARSER.declareObject(Builder::setDelayedDataCheckConfig, DelayedDataCheckConfig.PARSER, DELAYED_DATA_CHECK_CONFIG);
     }
 
     private static BytesReference parseBytes(XContentParser parser) throws IOException {
@@ -107,10 +109,12 @@ public class DatafeedConfig implements ToXContentObject {
     private final List<SearchSourceBuilder.ScriptField> scriptFields;
     private final Integer scrollSize;
     private final ChunkingConfig chunkingConfig;
+    private final DelayedDataCheckConfig delayedDataCheckConfig;
+
 
     private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
                            BytesReference query, BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
-                           Integer scrollSize, ChunkingConfig chunkingConfig) {
+                           Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
         this.id = id;
         this.jobId = jobId;
         this.queryDelay = queryDelay;
@@ -122,6 +126,7 @@ public class DatafeedConfig implements ToXContentObject {
         this.scriptFields = scriptFields == null ? null : Collections.unmodifiableList(scriptFields);
         this.scrollSize = scrollSize;
         this.chunkingConfig = chunkingConfig;
+        this.delayedDataCheckConfig = delayedDataCheckConfig;
     }
 
     public String getId() {
@@ -168,6 +173,10 @@ public class DatafeedConfig implements ToXContentObject {
         return chunkingConfig;
     }
 
+    public DelayedDataCheckConfig getDelayedDataCheckConfig() {
+        return delayedDataCheckConfig;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
@@ -204,6 +213,9 @@ public class DatafeedConfig implements ToXContentObject {
         if (chunkingConfig != null) {
             builder.field(CHUNKING_CONFIG.getPreferredName(), chunkingConfig);
         }
+        if (delayedDataCheckConfig != null) {
+            builder.field(DELAYED_DATA_CHECK_CONFIG.getPreferredName(), delayedDataCheckConfig);
+        }
 
         builder.endObject();
         return builder;
@@ -244,7 +256,8 @@ public class DatafeedConfig implements ToXContentObject {
             && Objects.equals(this.scrollSize, that.scrollSize)
             && Objects.equals(asMap(this.aggregations), asMap(that.aggregations))
             && Objects.equals(this.scriptFields, that.scriptFields)
-            && Objects.equals(this.chunkingConfig, that.chunkingConfig);
+            && Objects.equals(this.chunkingConfig, that.chunkingConfig)
+            && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig);
     }
 
     /**
@@ -255,7 +268,7 @@ public class DatafeedConfig implements ToXContentObject {
     @Override
     public int hashCode() {
         return Objects.hash(id, jobId, frequency, queryDelay, indices, types, asMap(query), scrollSize, asMap(aggregations), scriptFields,
-            chunkingConfig);
+            chunkingConfig, delayedDataCheckConfig);
     }
 
     public static Builder builder(String id, String jobId) {
@@ -275,6 +288,7 @@ public class DatafeedConfig implements ToXContentObject {
         private List<SearchSourceBuilder.ScriptField> scriptFields;
         private Integer scrollSize;
         private ChunkingConfig chunkingConfig;
+        private DelayedDataCheckConfig delayedDataCheckConfig;
 
         public Builder(String id, String jobId) {
             this.id = Objects.requireNonNull(id, ID.getPreferredName());
@@ -293,6 +307,7 @@ public class DatafeedConfig implements ToXContentObject {
             this.scriptFields = config.scriptFields;
             this.scrollSize = config.scrollSize;
             this.chunkingConfig = config.chunkingConfig;
+            this.delayedDataCheckConfig = config.getDelayedDataCheckConfig();
         }
 
         public Builder setIndices(List<String> indices) {
@@ -366,9 +381,23 @@ public class DatafeedConfig implements ToXContentObject {
             return this;
         }
 
+        /**
+         * This sets the {@link DelayedDataCheckConfig} settings.
+         *
+         * See {@link DelayedDataCheckConfig} for more information.
+         *
+         * @param delayedDataCheckConfig the delayed data check configuration
+         *                               Default value is enabled, with `check_window` being null. This means the true window is
+         *                               calculated when the real-time Datafeed runs.
+         */
+        public Builder setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheckConfig) {
+            this.delayedDataCheckConfig = delayedDataCheckConfig;
+            return this;
+        }
+
         public DatafeedConfig build() {
             return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
-                chunkingConfig);
+                chunkingConfig, delayedDataCheckConfig);
         }
 
         private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {

+ 23 - 3
client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java

@@ -77,6 +77,9 @@ public class DatafeedUpdate implements ToXContentObject {
         }, DatafeedConfig.SCRIPT_FIELDS);
         PARSER.declareInt(Builder::setScrollSize, DatafeedConfig.SCROLL_SIZE);
         PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, DatafeedConfig.CHUNKING_CONFIG);
+        PARSER.declareObject(Builder::setDelayedDataCheckConfig,
+            DelayedDataCheckConfig.PARSER,
+            DatafeedConfig.DELAYED_DATA_CHECK_CONFIG);
     }
 
     private static BytesReference parseBytes(XContentParser parser) throws IOException {
@@ -96,10 +99,11 @@ public class DatafeedUpdate implements ToXContentObject {
     private final List<SearchSourceBuilder.ScriptField> scriptFields;
     private final Integer scrollSize;
     private final ChunkingConfig chunkingConfig;
+    private final DelayedDataCheckConfig delayedDataCheckConfig;
 
     private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
                            BytesReference query, BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
-                           Integer scrollSize, ChunkingConfig chunkingConfig) {
+                           Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
         this.id = id;
         this.jobId = jobId;
         this.queryDelay = queryDelay;
@@ -111,6 +115,7 @@ public class DatafeedUpdate implements ToXContentObject {
         this.scriptFields = scriptFields;
         this.scrollSize = scrollSize;
         this.chunkingConfig = chunkingConfig;
+        this.delayedDataCheckConfig = delayedDataCheckConfig;
     }
 
     /**
@@ -146,6 +151,9 @@ public class DatafeedUpdate implements ToXContentObject {
             }
             builder.endObject();
         }
+        if (delayedDataCheckConfig != null) {
+            builder.field(DatafeedConfig.DELAYED_DATA_CHECK_CONFIG.getPreferredName(), delayedDataCheckConfig);
+        }
         addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize);
         addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig);
         builder.endObject();
@@ -198,6 +206,10 @@ public class DatafeedUpdate implements ToXContentObject {
         return chunkingConfig;
     }
 
+    public DelayedDataCheckConfig getDelayedDataCheckConfig() {
+        return delayedDataCheckConfig;
+    }
+
     private static Map<String, Object> asMap(BytesReference bytesReference) {
         return bytesReference == null ? null : XContentHelper.convertToMap(bytesReference, true, XContentType.JSON).v2();
     }
@@ -232,6 +244,7 @@ public class DatafeedUpdate implements ToXContentObject {
             && Objects.equals(asMap(this.query), asMap(that.query))
             && Objects.equals(this.scrollSize, that.scrollSize)
             && Objects.equals(asMap(this.aggregations), asMap(that.aggregations))
+            && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
             && Objects.equals(this.scriptFields, that.scriptFields)
             && Objects.equals(this.chunkingConfig, that.chunkingConfig);
     }
@@ -244,7 +257,7 @@ public class DatafeedUpdate implements ToXContentObject {
     @Override
     public int hashCode() {
         return Objects.hash(id, jobId, frequency, queryDelay, indices, types, asMap(query), scrollSize, asMap(aggregations), scriptFields,
-            chunkingConfig);
+            chunkingConfig, delayedDataCheckConfig);
     }
 
     public static Builder builder(String id) {
@@ -264,6 +277,7 @@ public class DatafeedUpdate implements ToXContentObject {
         private List<SearchSourceBuilder.ScriptField> scriptFields;
         private Integer scrollSize;
         private ChunkingConfig chunkingConfig;
+        private DelayedDataCheckConfig delayedDataCheckConfig;
 
         public Builder(String id) {
             this.id = Objects.requireNonNull(id, DatafeedConfig.ID.getPreferredName());
@@ -281,6 +295,7 @@ public class DatafeedUpdate implements ToXContentObject {
             this.scriptFields = config.scriptFields;
             this.scrollSize = config.scrollSize;
             this.chunkingConfig = config.chunkingConfig;
+            this.delayedDataCheckConfig = config.delayedDataCheckConfig;
         }
 
         public Builder setJobId(String jobId) {
@@ -359,9 +374,14 @@ public class DatafeedUpdate implements ToXContentObject {
             return this;
         }
 
+        public Builder setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheckConfig) {
+            this.delayedDataCheckConfig = delayedDataCheckConfig;
+            return this;
+        }
+
         public DatafeedUpdate build() {
             return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
-                chunkingConfig);
+                chunkingConfig, delayedDataCheckConfig);
         }
 
         private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {

+ 130 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DelayedDataCheckConfig.java

@@ -0,0 +1,130 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.ml.datafeed;
+
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * The configuration object containing the delayed data check settings.
+ *
+ * See {@link DelayedDataCheckConfig#enabledDelayedDataCheckConfig(TimeValue)} for creating a new
+ * enabled datacheck with the given check_window
+ *
+ * See {@link DelayedDataCheckConfig#disabledDelayedDataCheckConfig()} for creating a config for disabling
+ * delayed data checking.
+ */
+public class DelayedDataCheckConfig implements ToXContentObject {
+
+    public static final ParseField ENABLED = new ParseField("enabled");
+    public static final ParseField CHECK_WINDOW = new ParseField("check_window");
+
+    // These parsers follow the pattern that metadata is parsed leniently (to allow for enhancements), whilst config is parsed strictly
+    public static final ConstructingObjectParser<DelayedDataCheckConfig, Void> PARSER = new ConstructingObjectParser<>(
+        "delayed_data_check_config", true, a -> new DelayedDataCheckConfig((Boolean) a[0], (TimeValue) a[1]));
+    static {
+        PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED);
+        PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> {
+            if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
+                return TimeValue.parseTimeValue(p.text(), CHECK_WINDOW.getPreferredName());
+            }
+            throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
+        }, CHECK_WINDOW, ObjectParser.ValueType.STRING);
+    }
+
+   /**
+    * This creates a new DelayedDataCheckConfig that has a check_window of the passed `timeValue`
+    *
+    * We query the index to the latest finalized bucket from this TimeValue in the past looking to see if any data has been indexed
+    * since the data was read with the Datafeed.
+    *
+    * The window must be larger than the {@link org.elasticsearch.client.ml.job.config.AnalysisConfig#bucketSpan}, less than
+    * 24 hours, and span less than 10,000x buckets.
+    *
+    *
+    * @param timeValue The time length in the past from the latest finalized bucket to look for latent data.
+    *                  If `null` is provided, the appropriate window is calculated when it is used
+    **/
+    public static DelayedDataCheckConfig enabledDelayedDataCheckConfig(TimeValue timeValue) {
+        return new DelayedDataCheckConfig(true, timeValue);
+    }
+
+    /**
+     * This creates a new DelayedDataCheckConfig that disables the data check.
+     */
+    public static DelayedDataCheckConfig disabledDelayedDataCheckConfig() {
+        return new DelayedDataCheckConfig(false, null);
+    }
+
+    private final boolean enabled;
+    private final TimeValue checkWindow;
+
+    DelayedDataCheckConfig(Boolean enabled, TimeValue checkWindow) {
+        this.enabled = enabled;
+        this.checkWindow = checkWindow;
+    }
+
+    public boolean isEnabled() {
+        return enabled;
+    }
+
+    @Nullable
+    public TimeValue getCheckWindow() {
+        return checkWindow;
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field(ENABLED.getPreferredName(), enabled);
+        if (checkWindow != null) {
+            builder.field(CHECK_WINDOW.getPreferredName(), checkWindow.getStringRep());
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(enabled, checkWindow);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+
+        DelayedDataCheckConfig other = (DelayedDataCheckConfig) obj;
+        return Objects.equals(this.enabled, other.enabled) && Objects.equals(this.checkWindow, other.checkWindow);
+    }
+
+}

+ 9 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java

@@ -95,6 +95,7 @@ import org.elasticsearch.client.ml.datafeed.ChunkingConfig;
 import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.client.ml.datafeed.DatafeedStats;
 import org.elasticsearch.client.ml.datafeed.DatafeedUpdate;
+import org.elasticsearch.client.ml.datafeed.DelayedDataCheckConfig;
 import org.elasticsearch.client.ml.job.config.AnalysisConfig;
 import org.elasticsearch.client.ml.job.config.AnalysisLimits;
 import org.elasticsearch.client.ml.job.config.DataDescription;
@@ -583,6 +584,14 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
             datafeedBuilder.setQueryDelay(TimeValue.timeValueMinutes(1)); // <1>
             // end::put-datafeed-config-set-query-delay
 
+            // tag::put-datafeed-config-set-delayed-data-check-config
+            datafeedBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig
+                .enabledDelayedDataCheckConfig(TimeValue.timeValueHours(1))); // <1>
+            // end::put-datafeed-config-set-delayed-data-check-config
+
+            // no need to accidentally trip internal validations due to job bucket size
+            datafeedBuilder.setDelayedDataCheckConfig(null);
+
             List<SearchSourceBuilder.ScriptField> scriptFields = Collections.emptyList();
             // tag::put-datafeed-config-set-script-fields
             datafeedBuilder.setScriptFields(scriptFields); // <1>

+ 3 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java

@@ -103,6 +103,9 @@ public class DatafeedConfigTests extends AbstractXContentTestCase<DatafeedConfig
         if (randomBoolean()) {
             builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
         }
+        if (randomBoolean()) {
+            builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig());
+        }
         return builder;
     }
 

+ 3 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdateTests.java

@@ -83,6 +83,9 @@ public class DatafeedUpdateTests extends AbstractXContentTestCase<DatafeedUpdate
         if (randomBoolean()) {
             builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
         }
+        if (randomBoolean()) {
+            builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig());
+        }
         return builder.build();
     }
 

+ 65 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DelayedDataCheckConfigTests.java

@@ -0,0 +1,65 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.ml.datafeed;
+
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractXContentTestCase;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class DelayedDataCheckConfigTests extends AbstractXContentTestCase<DelayedDataCheckConfig> {
+
+    @Override
+    protected DelayedDataCheckConfig createTestInstance() {
+        return createRandomizedConfig();
+    }
+
+    @Override
+    protected DelayedDataCheckConfig doParseInstance(XContentParser parser) {
+        return DelayedDataCheckConfig.PARSER.apply(parser, null);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return true;
+    }
+
+    public void testEnabledDelayedDataCheckConfig() {
+        DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(5));
+        assertThat(delayedDataCheckConfig.isEnabled(), equalTo(true));
+        assertThat(delayedDataCheckConfig.getCheckWindow(), equalTo(TimeValue.timeValueHours(5)));
+    }
+
+    public void testDisabledDelayedDataCheckConfig() {
+        DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.disabledDelayedDataCheckConfig();
+        assertThat(delayedDataCheckConfig.isEnabled(), equalTo(false));
+        assertThat(delayedDataCheckConfig.getCheckWindow(), equalTo(null));
+    }
+
+    public static DelayedDataCheckConfig createRandomizedConfig() {
+        boolean enabled = randomBoolean();
+        TimeValue timeWindow = null;
+        if (enabled || randomBoolean()) {
+            timeWindow = TimeValue.timeValueMillis(randomLongBetween(1, 1_000));
+        }
+        return new DelayedDataCheckConfig(enabled, timeWindow);
+    }
+}
+

+ 11 - 0
docs/java-rest/high-level/ml/put-datafeed.asciidoc

@@ -63,6 +63,17 @@ include-tagged::{doc-tests-file}[{api}-config-set-query-delay]
 --------------------------------------------------
 <1> The time interval behind real time that data is queried.
 
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests-file}[{api}-config-set-delayed-data-check-config]
+--------------------------------------------------
+<1> Sets the delayed data check configuration.
+The window must be larger than the Job's bucket size, but smaller than 24 hours,
+and span less than 10,000 buckets.
+Defaults to `null`, which causes an appropriate window span to be calculated when
+the datafeed runs.
+To explicitly disable, pass `DelayedDataCheckConfig.disabledDelayedDataCheckConfig()`.
+
 ["source","java",subs="attributes,callouts,macros"]
 --------------------------------------------------
 include-tagged::{doc-tests-file}[{api}-config-set-script-fields]

+ 26 - 0
docs/reference/ml/apis/datafeedresource.asciidoc

@@ -64,6 +64,11 @@ A {dfeed} resource has the following properties:
   example: `[]`. This property is provided for backwards compatibility with
   releases earlier than 6.0.0. For more information, see <<removal-of-types>>.  
 
+`delayed_data_check_config`::
+  (object) Specifies if and with how large a window should the data feed check
+  for missing data. See <<ml-datafeed-delayed-data-check-config>>.
+  For example: `{"enabled": true, "check_window": "1h"}`
+
 [[ml-datafeed-chunking-config]]
 ==== Chunking Configuration Objects
 
@@ -86,6 +91,27 @@ A chunking configuration object has the following properties:
   This setting is only applicable when the mode is set to `manual`.
   For example: `3h`.
 
+[[ml-datafeed-delayed-data-check-config]]
+==== Delayed Data Check Configuration Objects
+
+The {dfeed} can optionally search over indices that have already been read in
+an effort to find if any data has since been added to the index. If missing data
+is found, it is a good indication that the `query_delay` option is set too low and
+the data is being indexed after the {dfeed} has passed that moment in time.
+
+This check only runs on real-time {dfeeds}
+
+The configuration object has the following properties:
+
+`enabled`::
+  (boolean) Should the {dfeed} periodically check for data being indexed after reading.
+  Defaults to `true`
+
+`check_window`::
+  (time units) The window of time before the latest finalized bucket that should be searched
+  for late data. Defaults to `null` which causes an appropriate `check_window` to be calculated
+  when the real-time {dfeed} runs.
+
 [float]
 [[ml-datafeed-counts]]
 ==== {dfeed-cap} Counts

+ 4 - 0
docs/reference/ml/apis/put-datafeed.asciidoc

@@ -78,6 +78,10 @@ You must create a job before you create a {dfeed}.  You can associate only one
   For example: `[]`. This property is provided for backwards compatibility with
   releases earlier than 6.0.0. For more information, see <<removal-of-types>>.
 
+`delayed_data_check_config`::
+  (object) Specifies if and with how large a window should the data feed check
+  for missing data. See <<ml-datafeed-delayed-data-check-config>>.
+
 For more information about these properties,
 see <<ml-datafeed-resource>>.
 

+ 34 - 5
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java

@@ -84,6 +84,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
     public static final ParseField SOURCE = new ParseField("_source");
     public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config");
     public static final ParseField HEADERS = new ParseField("headers");
+    public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config");
 
     // These parsers follow the pattern that metadata is parsed leniently (to allow for enhancements), whilst config is parsed strictly
     public static final ObjectParser<Builder, Void> LENIENT_PARSER = createParser(true);
@@ -124,7 +125,9 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
             // (For config, headers are explicitly transferred from the auth headers by code in the put/update datafeed actions.)
             parser.declareObject(Builder::setHeaders, (p, c) -> p.mapStrings(), HEADERS);
         }
-
+        parser.declareObject(Builder::setDelayedDataCheckConfig,
+            ignoreUnknownFields ? DelayedDataCheckConfig.LENIENT_PARSER : DelayedDataCheckConfig.STRICT_PARSER,
+            DELAYED_DATA_CHECK_CONFIG);
         return parser;
     }
 
@@ -149,10 +152,12 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
     private final Integer scrollSize;
     private final ChunkingConfig chunkingConfig;
     private final Map<String, String> headers;
+    private final DelayedDataCheckConfig delayedDataCheckConfig;
 
     private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
                            QueryBuilder query, AggregatorFactories.Builder aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
-                           Integer scrollSize, ChunkingConfig chunkingConfig, Map<String, String> headers) {
+                           Integer scrollSize, ChunkingConfig chunkingConfig, Map<String, String> headers,
+                           DelayedDataCheckConfig delayedDataCheckConfig) {
         this.id = id;
         this.jobId = jobId;
         this.queryDelay = queryDelay;
@@ -165,6 +170,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
         this.scrollSize = scrollSize;
         this.chunkingConfig = chunkingConfig;
         this.headers = Collections.unmodifiableMap(headers);
+        this.delayedDataCheckConfig = delayedDataCheckConfig;
     }
 
     public DatafeedConfig(StreamInput in) throws IOException {
@@ -196,6 +202,11 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
         } else {
             this.headers = Collections.emptyMap();
         }
+        if (in.getVersion().onOrAfter(Version.CURRENT)) {
+            delayedDataCheckConfig = in.readOptionalWriteable(DelayedDataCheckConfig::new);
+        } else {
+            delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig();
+        }
     }
 
     public String getId() {
@@ -260,6 +271,10 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
         return headers;
     }
 
+    public DelayedDataCheckConfig getDelayedDataCheckConfig() {
+        return delayedDataCheckConfig;
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(id);
@@ -291,6 +306,9 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
         if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
             out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
         }
+        if (out.getVersion().onOrAfter(Version.CURRENT)) {
+            out.writeOptionalWriteable(delayedDataCheckConfig);
+        }
     }
 
     @Override
@@ -328,6 +346,9 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
         if (headers.isEmpty() == false && params.paramAsBoolean(ToXContentParams.FOR_CLUSTER_STATE, false) == true) {
             builder.field(HEADERS.getPreferredName(), headers);
         }
+        if (delayedDataCheckConfig != null) {
+            builder.field(DELAYED_DATA_CHECK_CONFIG.getPreferredName(), delayedDataCheckConfig);
+        }
         return builder;
     }
 
@@ -359,13 +380,14 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
                 && Objects.equals(this.aggregations, that.aggregations)
                 && Objects.equals(this.scriptFields, that.scriptFields)
                 && Objects.equals(this.chunkingConfig, that.chunkingConfig)
-                && Objects.equals(this.headers, that.headers);
+                && Objects.equals(this.headers, that.headers)
+                && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(id, jobId, frequency, queryDelay, indices, types, query, scrollSize, aggregations, scriptFields,
-                chunkingConfig, headers);
+                chunkingConfig, headers, delayedDataCheckConfig);
     }
 
     @Override
@@ -438,6 +460,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
         private Integer scrollSize = DEFAULT_SCROLL_SIZE;
         private ChunkingConfig chunkingConfig;
         private Map<String, String> headers = Collections.emptyMap();
+        private DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig();
 
         public Builder() {
         }
@@ -461,6 +484,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
             this.scrollSize = config.scrollSize;
             this.chunkingConfig = config.chunkingConfig;
             this.headers = config.headers;
+            this.delayedDataCheckConfig = config.getDelayedDataCheckConfig();
         }
 
         public void setId(String datafeedId) {
@@ -523,6 +547,10 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
             this.chunkingConfig = chunkingConfig;
         }
 
+        public void setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheckConfig) {
+            this.delayedDataCheckConfig = delayedDataCheckConfig;
+        }
+
         public DatafeedConfig build() {
             ExceptionsHelper.requireNonNull(id, ID.getPreferredName());
             ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
@@ -535,11 +563,12 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
             if (types == null || types.contains(null) || types.contains("")) {
                 throw invalidOptionValue(TYPES.getPreferredName(), types);
             }
+
             validateAggregations();
             setDefaultChunkingConfig();
             setDefaultQueryDelay();
             return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
-                    chunkingConfig, headers);
+                    chunkingConfig, headers, delayedDataCheckConfig);
         }
 
         void validateAggregations() {

+ 24 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedJobValidator.java

@@ -31,6 +31,30 @@ public final class DatafeedJobValidator {
             checkValidHistogramInterval(datafeedConfig, analysisConfig);
             checkFrequencyIsMultipleOfHistogramInterval(datafeedConfig);
         }
+
+        DelayedDataCheckConfig delayedDataCheckConfig = datafeedConfig.getDelayedDataCheckConfig();
+        TimeValue bucketSpan = analysisConfig.getBucketSpan();
+        if (delayedDataCheckConfig.isEnabled()) {
+            checkValidDelayedDataCheckConfig(bucketSpan, delayedDataCheckConfig);
+        }
+    }
+
+    private static void checkValidDelayedDataCheckConfig(TimeValue bucketSpan, DelayedDataCheckConfig delayedDataCheckConfig) {
+        TimeValue delayedDataCheckWindow =  delayedDataCheckConfig.getCheckWindow();
+        if (delayedDataCheckWindow != null) { // NULL implies we calculate on use and thus is always valid
+            if (delayedDataCheckWindow.compareTo(bucketSpan) < 0) {
+                throw ExceptionsHelper.badRequestException(
+                    Messages.getMessage(Messages.DATAFEED_CONFIG_DELAYED_DATA_CHECK_TOO_SMALL,
+                        delayedDataCheckWindow,
+                        bucketSpan));
+            }
+            if (delayedDataCheckWindow.millis() > bucketSpan.millis() * DelayedDataCheckConfig.MAX_NUMBER_SPANABLE_BUCKETS) {
+                throw ExceptionsHelper.badRequestException(
+                    Messages.getMessage(Messages.DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS,
+                        delayedDataCheckWindow,
+                        bucketSpan));
+            }
+        }
     }
 
     private static void checkSummaryCountFieldNameIsSet(AnalysisConfig analysisConfig) {

+ 33 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.core.ml.datafeed;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -68,6 +69,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
         }, DatafeedConfig.SCRIPT_FIELDS);
         PARSER.declareInt(Builder::setScrollSize, DatafeedConfig.SCROLL_SIZE);
         PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.STRICT_PARSER, DatafeedConfig.CHUNKING_CONFIG);
+        PARSER.declareObject(Builder::setDelayedDataCheckConfig,
+            DelayedDataCheckConfig.STRICT_PARSER,
+            DatafeedConfig.DELAYED_DATA_CHECK_CONFIG);
     }
 
     private final String id;
@@ -81,10 +85,11 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
     private final List<SearchSourceBuilder.ScriptField> scriptFields;
     private final Integer scrollSize;
     private final ChunkingConfig chunkingConfig;
+    private final DelayedDataCheckConfig delayedDataCheckConfig;
 
     private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
                            QueryBuilder query, AggregatorFactories.Builder aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
-                           Integer scrollSize, ChunkingConfig chunkingConfig) {
+                           Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
         this.id = id;
         this.jobId = jobId;
         this.queryDelay = queryDelay;
@@ -96,6 +101,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
         this.scriptFields = scriptFields;
         this.scrollSize = scrollSize;
         this.chunkingConfig = chunkingConfig;
+        this.delayedDataCheckConfig = delayedDataCheckConfig;
     }
 
     public DatafeedUpdate(StreamInput in) throws IOException {
@@ -122,6 +128,11 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
         }
         this.scrollSize = in.readOptionalVInt();
         this.chunkingConfig = in.readOptionalWriteable(ChunkingConfig::new);
+        if (in.getVersion().onOrAfter(Version.CURRENT)) {
+            delayedDataCheckConfig = in.readOptionalWriteable(DelayedDataCheckConfig::new);
+        } else {
+            delayedDataCheckConfig = null;
+        }
     }
 
     /**
@@ -159,6 +170,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
         }
         out.writeOptionalVInt(scrollSize);
         out.writeOptionalWriteable(chunkingConfig);
+        if (out.getVersion().onOrAfter(Version.CURRENT)) {
+            out.writeOptionalWriteable(delayedDataCheckConfig);
+        }
     }
 
     @Override
@@ -185,6 +199,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
         }
         addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize);
         addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig);
+        addOptionalField(builder, DatafeedConfig.DELAYED_DATA_CHECK_CONFIG, delayedDataCheckConfig);
         builder.endObject();
         return builder;
     }
@@ -250,6 +265,10 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
         return chunkingConfig;
     }
 
+    public DelayedDataCheckConfig getDelayedDataCheckConfig() {
+        return delayedDataCheckConfig;
+    }
+
     /**
      * Applies the update to the given {@link DatafeedConfig}
      * @return a new {@link DatafeedConfig} that contains the update
@@ -290,6 +309,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
         if (chunkingConfig != null) {
             builder.setChunkingConfig(chunkingConfig);
         }
+        if (delayedDataCheckConfig != null) {
+            builder.setDelayedDataCheckConfig(delayedDataCheckConfig);
+        }
 
         if (headers.isEmpty() == false) {
             // Adjust the request, adding security headers from the current thread context
@@ -328,6 +350,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
                 && Objects.equals(this.query, that.query)
                 && Objects.equals(this.scrollSize, that.scrollSize)
                 && Objects.equals(this.aggregations, that.aggregations)
+                && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
                 && Objects.equals(this.scriptFields, that.scriptFields)
                 && Objects.equals(this.chunkingConfig, that.chunkingConfig);
     }
@@ -335,7 +358,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
     @Override
     public int hashCode() {
         return Objects.hash(id, jobId, frequency, queryDelay, indices, types, query, scrollSize, aggregations, scriptFields,
-                chunkingConfig);
+                chunkingConfig, delayedDataCheckConfig);
     }
 
     @Override
@@ -352,6 +375,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
                 && (scrollSize == null || Objects.equals(scrollSize, datafeed.getQueryDelay()))
                 && (aggregations == null || Objects.equals(aggregations, datafeed.getAggregations()))
                 && (scriptFields == null || Objects.equals(scriptFields, datafeed.getScriptFields()))
+                && (delayedDataCheckConfig == null || Objects.equals(delayedDataCheckConfig, datafeed.getDelayedDataCheckConfig()))
                 && (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig()));
     }
 
@@ -368,6 +392,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
         private List<SearchSourceBuilder.ScriptField> scriptFields;
         private Integer scrollSize;
         private ChunkingConfig chunkingConfig;
+        private DelayedDataCheckConfig delayedDataCheckConfig;
 
         public Builder() {
         }
@@ -388,6 +413,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
             this.scriptFields = config.scriptFields;
             this.scrollSize = config.scrollSize;
             this.chunkingConfig = config.chunkingConfig;
+            this.delayedDataCheckConfig = config.delayedDataCheckConfig;
         }
 
         public void setId(String datafeedId) {
@@ -428,6 +454,10 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
             this.scriptFields = sorted;
         }
 
+        public void setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheckConfig) {
+            this.delayedDataCheckConfig = delayedDataCheckConfig;
+        }
+
         public void setScrollSize(int scrollSize) {
             this.scrollSize = scrollSize;
         }
@@ -438,7 +468,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
 
         public DatafeedUpdate build() {
             return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
-                    chunkingConfig);
+                    chunkingConfig, delayedDataCheckConfig);
         }
     }
 }

+ 127 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DelayedDataCheckConfig.java

@@ -0,0 +1,127 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.ml.datafeed;
+
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public class DelayedDataCheckConfig implements ToXContentObject, Writeable {
+
+    public static final TimeValue MAX_DELAYED_DATA_WINDOW = TimeValue.timeValueHours(24);
+    public static final int MAX_NUMBER_SPANABLE_BUCKETS = 10_000;
+
+    public static final ParseField ENABLED = new ParseField("enabled");
+    public static final ParseField CHECK_WINDOW = new ParseField("check_window");
+
+    // These parsers follow the pattern that metadata is parsed leniently (to allow for enhancements), whilst config is parsed strictly
+    public static final ConstructingObjectParser<DelayedDataCheckConfig, Void> LENIENT_PARSER = createParser(true);
+    public static final ConstructingObjectParser<DelayedDataCheckConfig, Void> STRICT_PARSER = createParser(false);
+
+    private static ConstructingObjectParser<DelayedDataCheckConfig, Void> createParser(boolean ignoreUnknownFields) {
+        ConstructingObjectParser<DelayedDataCheckConfig, Void> parser = new ConstructingObjectParser<>(
+            "delayed_data_check_config", ignoreUnknownFields, a -> new DelayedDataCheckConfig((Boolean) a[0], (TimeValue) a[1]));
+
+        parser.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED);
+        parser.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> {
+            if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
+                return TimeValue.parseTimeValue(p.text(), CHECK_WINDOW.getPreferredName());
+            }
+            throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
+        }, CHECK_WINDOW, ObjectParser.ValueType.STRING);
+
+        return parser;
+    }
+
+    public static DelayedDataCheckConfig defaultDelayedDataCheckConfig() {
+        return new DelayedDataCheckConfig(true, null);
+    }
+
+    public static DelayedDataCheckConfig enabledDelayedDataCheckConfig(TimeValue timeValue) {
+        return new DelayedDataCheckConfig(true, timeValue);
+    }
+
+    public static DelayedDataCheckConfig disabledDelayedDataCheckConfig() {
+        return new DelayedDataCheckConfig(false, null);
+    }
+
+    private final boolean enabled;
+    private final TimeValue checkWindow;
+
+    DelayedDataCheckConfig(Boolean enabled, TimeValue checkWindow) {
+        this.enabled = enabled;
+        if (enabled && checkWindow != null) {
+            TimeUtils.checkPositive(checkWindow, CHECK_WINDOW);
+            if (checkWindow.compareTo(MAX_DELAYED_DATA_WINDOW) > 0) {
+                throw new IllegalArgumentException("check_window [" + checkWindow.getStringRep() + "] must be less than or equal to [24h]");
+            }
+        }
+        this.checkWindow = checkWindow;
+    }
+
+    public DelayedDataCheckConfig(StreamInput in) throws IOException {
+        enabled = in.readBoolean();
+        checkWindow = in.readOptionalTimeValue();
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeBoolean(enabled);
+        out.writeOptionalTimeValue(checkWindow);
+    }
+
+    public boolean isEnabled() {
+        return enabled;
+    }
+
+    @Nullable
+    public TimeValue getCheckWindow() {
+        return checkWindow;
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
+        builder.startObject();
+        builder.field(ENABLED.getPreferredName(), enabled);
+        if (checkWindow != null) {
+            builder.field(CHECK_WINDOW.getPreferredName(), checkWindow.getStringRep());
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(enabled, checkWindow);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+
+        DelayedDataCheckConfig other = (DelayedDataCheckConfig) obj;
+        return Objects.equals(this.enabled, other.enabled) && Objects.equals(this.checkWindow, other.checkWindow);
+    }
+
+}

+ 10 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

@@ -22,6 +22,13 @@ public final class Messages {
     public static final String DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS =
             "script_fields cannot be used in combination with aggregations";
     public static final String DATAFEED_CONFIG_INVALID_OPTION_VALUE = "Invalid {0} value ''{1}'' in datafeed configuration";
+    public static final String DATAFEED_CONFIG_DELAYED_DATA_CHECK_TOO_SMALL =
+        "delayed_data_check_window [{0}] must be greater than the bucket_span [{1}]";
+    public static final String DATAFEED_CONFIG_DELAYED_DATA_CHECK_TOO_LARGE =
+        "delayed_data_check_window [{0}] must be less than or equal to [24h]";
+    public static final String DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS =
+        "delayed_data_check_window [{0}] must be less than 10,000x the bucket_span [{1}]";
+
     public static final String DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "A job configured with datafeed cannot support latency";
     public static final String DATAFEED_NOT_FOUND = "No datafeed with id [{0}] exists";
     public static final String DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM =
@@ -63,6 +70,9 @@ public final class Messages {
     public static final String JOB_AUDIT_DATAFEED_LOOKBACK_COMPLETED = "Datafeed lookback completed";
     public static final String JOB_AUDIT_DATAFEED_LOOKBACK_NO_DATA = "Datafeed lookback retrieved no data";
     public static final String JOB_AUDIT_DATAFEED_NO_DATA = "Datafeed has been retrieving no data for a while";
+    public static final String JOB_AUDIT_DATAFEED_MISSING_DATA =
+        "Datafeed has missed {0} documents due to ingest latency, latest bucket with missing data is [{1}]." +
+            " Consider increasing query_delay";
     public static final String JOB_AUDIT_DATAFEED_RECOVERED = "Datafeed has recovered data extraction and analysis";
     public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1}) with frequency [{2}]";
     public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time";

+ 17 - 7
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/time/TimeUtils.java

@@ -87,21 +87,31 @@ public final class TimeUtils {
         checkMultiple(timeValue, baseUnit, field);
     }
 
-    private static void checkNonNegative(TimeValue timeValue, ParseField field) {
+    /**
+     * Checks that the given {@code timeValue} is positive.
+     *
+     * <ul>
+     *   <li>1s is valid</li>
+     *   <li>-1s is invalid</li>
+     * </ul>
+     */
+    public static void checkPositive(TimeValue timeValue, ParseField field) {
         long nanos = timeValue.getNanos();
-        if (nanos < 0) {
-            throw new IllegalArgumentException(field.getPreferredName() + " cannot be less than 0. Value = " + timeValue.toString());
+        if (nanos <= 0) {
+            throw new IllegalArgumentException(field.getPreferredName() + " cannot be less or equal than 0. Value = "
+                    + timeValue.toString());
         }
     }
 
-    private static void checkPositive(TimeValue timeValue, ParseField field) {
+    private static void checkNonNegative(TimeValue timeValue, ParseField field) {
         long nanos = timeValue.getNanos();
-        if (nanos <= 0) {
-            throw new IllegalArgumentException(field.getPreferredName() + " cannot be less or equal than 0. Value = "
-                    + timeValue.toString());
+        if (nanos < 0) {
+            throw new IllegalArgumentException(field.getPreferredName() + " cannot be less than 0. Value = " + timeValue.toString());
         }
     }
 
+
+
     /**
      * Check the given {@code timeValue} is a multiple of the {@code baseUnit}
      */

+ 3 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java

@@ -109,6 +109,9 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
         if (randomBoolean()) {
             builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
         }
+        if (randomBoolean()) {
+            builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig(bucketSpanMillis));
+        }
         return builder.build();
     }
 

+ 6 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java

@@ -89,6 +89,9 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
         if (randomBoolean()) {
             builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
         }
+        if (randomBoolean()) {
+            builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig(randomLongBetween(300_001, 400_000)));
+        }
         return builder.build();
     }
 
@@ -155,6 +158,7 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
         update.setScriptFields(Collections.singletonList(new SearchSourceBuilder.ScriptField("a", mockScript("b"), false)));
         update.setScrollSize(8000);
         update.setChunkingConfig(ChunkingConfig.newManual(TimeValue.timeValueHours(1)));
+        update.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(1)));
 
         DatafeedConfig updatedDatafeed = update.build().apply(datafeed, Collections.emptyMap());
 
@@ -169,6 +173,8 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
                 equalTo(Collections.singletonList(new SearchSourceBuilder.ScriptField("a", mockScript("b"), false))));
         assertThat(updatedDatafeed.getScrollSize(), equalTo(8000));
         assertThat(updatedDatafeed.getChunkingConfig(), equalTo(ChunkingConfig.newManual(TimeValue.timeValueHours(1))));
+        assertThat(updatedDatafeed.getDelayedDataCheckConfig().isEnabled(), equalTo(true));
+        assertThat(updatedDatafeed.getDelayedDataCheckConfig().getCheckWindow(), equalTo(TimeValue.timeValueHours(1)));
     }
 
     public void testApply_givenAggregations() {

+ 95 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DelayedDataCheckConfigTests.java

@@ -0,0 +1,95 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.ml.datafeed;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractSerializingTestCase;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.core.Is.is;
+
+public class DelayedDataCheckConfigTests extends AbstractSerializingTestCase<DelayedDataCheckConfig> {
+
+    @Override
+    protected DelayedDataCheckConfig createTestInstance(){
+        return createRandomizedConfig(100);
+    }
+
+    @Override
+    protected Writeable.Reader<DelayedDataCheckConfig> instanceReader() {
+        return DelayedDataCheckConfig::new;
+    }
+
+    @Override
+    protected DelayedDataCheckConfig doParseInstance(XContentParser parser) {
+        return DelayedDataCheckConfig.STRICT_PARSER.apply(parser, null);
+    }
+
+    public void testConstructor() {
+        expectThrows(IllegalArgumentException.class, () -> new DelayedDataCheckConfig(true, TimeValue.MINUS_ONE));
+        expectThrows(IllegalArgumentException.class, () -> new DelayedDataCheckConfig(true, TimeValue.timeValueHours(25)));
+    }
+
+    public void testEnabledDelayedDataCheckConfig() {
+        DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(5));
+        assertThat(delayedDataCheckConfig.isEnabled(), equalTo(true));
+        assertThat(delayedDataCheckConfig.getCheckWindow(), equalTo(TimeValue.timeValueHours(5)));
+    }
+
+    public void testDisabledDelayedDataCheckConfig() {
+        DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.disabledDelayedDataCheckConfig();
+        assertThat(delayedDataCheckConfig.isEnabled(), equalTo(false));
+        assertThat(delayedDataCheckConfig.getCheckWindow(), equalTo(null));
+    }
+
+    public void testDefaultDelayedDataCheckConfig() {
+        DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig();
+        assertThat(delayedDataCheckConfig.isEnabled(), equalTo(true));
+        assertThat(delayedDataCheckConfig.getCheckWindow(), is(nullValue()));
+    }
+
+    public static DelayedDataCheckConfig createRandomizedConfig(long bucketSpanMillis) {
+        boolean enabled = randomBoolean();
+        TimeValue timeWindow = null;
+        if (enabled || randomBoolean()) {
+            // time span is required to be at least 1 millis, so we use a custom method to generate a time value here
+            timeWindow = new TimeValue(randomLongBetween(bucketSpanMillis,bucketSpanMillis*2));
+        }
+        return new DelayedDataCheckConfig(enabled, timeWindow);
+    }
+
+    @Override
+    protected DelayedDataCheckConfig mutateInstance(DelayedDataCheckConfig instance) throws IOException {
+        boolean enabled = instance.isEnabled();
+        TimeValue timeWindow = instance.getCheckWindow();
+        switch (between(0, 1)) {
+        case 0:
+            enabled = !enabled;
+            if (randomBoolean()) {
+                timeWindow = TimeValue.timeValueMillis(randomLongBetween(1, 1000));
+            } else {
+                timeWindow = null;
+            }
+            break;
+        case 1:
+            if (timeWindow == null) {
+                timeWindow = TimeValue.timeValueMillis(randomLongBetween(1, 1000));
+            } else {
+                timeWindow = new TimeValue(timeWindow.getMillis() + between(10, 100));
+            }
+            enabled = true;
+            break;
+        default:
+            throw new AssertionError("Illegal randomisation branch");
+        }
+        return new DelayedDataCheckConfig(enabled, timeWindow);
+    }
+}

+ 1 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java

@@ -46,7 +46,7 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
 
         TimeValue bucketSpan = AnalysisConfig.Builder.DEFAULT_BUCKET_SPAN;
         if (randomBoolean()) {
-            bucketSpan = TimeValue.timeValueSeconds(randomIntBetween(1, 1_000_000));
+            bucketSpan = TimeValue.timeValueSeconds(randomIntBetween(1, 1_000));
             builder.setBucketSpan(bucketSpan);
         }
         if (isCategorization) {

+ 35 - 13
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java

@@ -20,22 +20,24 @@ import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
 import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
 import org.elasticsearch.xpack.core.ml.action.util.PageParams;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.config.Detector;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.results.Bucket;
 import org.elasticsearch.xpack.core.ml.job.results.Result;
-import org.elasticsearch.xpack.ml.datafeed.DelayedDataDetector;
-import org.elasticsearch.xpack.ml.datafeed.DelayedDataDetector.BucketWithMissingData;
+import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
+import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory;
+import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
 import org.junit.After;
 import org.junit.Before;
 
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
+import java.util.stream.Collectors;
 
-import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
 import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
 import static org.hamcrest.Matchers.equalTo;
 
@@ -64,7 +66,10 @@ public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase {
         final String jobId = "delayed-data-detection-job";
         Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);
 
-        DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
+        DatafeedConfig.Builder datafeedConfigBuilder =
+            createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
+        datafeedConfigBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(12)));
+        DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
         registerJob(job);
         putJob(job);
         openJob(job.getId());
@@ -77,26 +82,32 @@ public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase {
         // Get the latest finalized bucket
         Bucket lastBucket = getLatestFinalizedBucket(jobId);
 
-        DelayedDataDetector delayedDataDetector =
-            new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client());
+        DelayedDataDetector delayedDataDetector = newDetector(job.build(new Date()), datafeedConfig);
 
         List<BucketWithMissingData> response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
         assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(0L));
 
         long missingDocs = randomIntBetween(32, 128);
         // Simply adding data within the current delayed data detection, the choice of 43100000 is arbitrary and within the window
-        // for the DelayedDataDetector
+        // for the DatafeedDelayedDataDetector
         writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000);
 
         response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
         assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(missingDocs));
+        // Assert that the are returned in order
+        List<Long> timeStamps = response.stream().map(BucketWithMissingData::getTimeStamp).collect(Collectors.toList());
+        assertEquals(timeStamps.stream().sorted().collect(Collectors.toList()), timeStamps);
     }
 
     public void testMissingDataDetectionInSpecificBucket() throws Exception {
         final String jobId = "delayed-data-detection-job-missing-test-specific-bucket";
         Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);
 
-        DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
+        DatafeedConfig.Builder datafeedConfigBuilder =
+            createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
+        datafeedConfigBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(12)));
+        DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
+
         registerJob(job);
         putJob(job);
         openJob(job.getId());
@@ -110,8 +121,7 @@ public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase {
         // Get the latest finalized bucket
         Bucket lastBucket = getLatestFinalizedBucket(jobId);
 
-        DelayedDataDetector delayedDataDetector =
-            new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client());
+        DelayedDataDetector delayedDataDetector = newDetector(job.build(new Date()), datafeedConfig);
 
         long missingDocs = randomIntBetween(1, 10);
 
@@ -127,6 +137,10 @@ public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase {
             }
         }
         assertThat(hasBucketWithMissing, equalTo(true));
+
+        // Assert that the are returned in order
+        List<Long> timeStamps = response.stream().map(BucketWithMissingData::getTimeStamp).collect(Collectors.toList());
+        assertEquals(timeStamps.stream().sorted().collect(Collectors.toList()), timeStamps);
     }
 
     public void testMissingDataDetectionWithAggregationsAndQuery() throws Exception {
@@ -147,6 +161,8 @@ public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase {
                     .interval(TimeValue.timeValueMinutes(5).millis())));
         datafeedConfigBuilder.setQuery(new RangeQueryBuilder("value").gte(numDocs/2));
         datafeedConfigBuilder.setFrequency(TimeValue.timeValueMinutes(5));
+        datafeedConfigBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(12)));
+
         DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
         registerJob(job);
         putJob(job);
@@ -160,19 +176,21 @@ public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase {
         // Get the latest finalized bucket
         Bucket lastBucket = getLatestFinalizedBucket(jobId);
 
-        DelayedDataDetector delayedDataDetector =
-            new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client());
+        DelayedDataDetector delayedDataDetector = newDetector(job.build(new Date()), datafeedConfig);
 
         List<BucketWithMissingData> response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
         assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(0L));
 
         long missingDocs = numDocs;
         // Simply adding data within the current delayed data detection, the choice of 43100000 is arbitrary and within the window
-        // for the DelayedDataDetector
+        // for the DatafeedDelayedDataDetector
         writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000);
 
         response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
         assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo((missingDocs+1)/2));
+        // Assert that the are returned in order
+        List<Long> timeStamps = response.stream().map(BucketWithMissingData::getTimeStamp).collect(Collectors.toList());
+        assertEquals(timeStamps.stream().sorted().collect(Collectors.toList()), timeStamps);
     }
 
     private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) {
@@ -231,4 +249,8 @@ public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase {
         getBucketsRequest.setPageParams(new PageParams(0, 1));
         return getBuckets(getBucketsRequest).get(0);
     }
+
+    private DelayedDataDetector newDetector(Job job, DatafeedConfig datafeedConfig) {
+        return DelayedDataDetectorFactory.buildDetector(job, datafeedConfig, client());
+    }
 }

+ 58 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

@@ -12,6 +12,7 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.core.internal.io.Streams;
 import org.elasticsearch.index.mapper.DateFieldMapper;
@@ -23,12 +24,16 @@ import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
+import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
+import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
 import org.elasticsearch.xpack.ml.notifications.Auditor;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Date;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -41,6 +46,7 @@ class DatafeedJob {
 
     private static final Logger LOGGER = LogManager.getLogger(DatafeedJob.class);
     private static final int NEXT_TASK_DELAY_MS = 100;
+    static final long MISSING_DATA_CHECK_INTERVAL_MS = 900_000; //15 minutes in ms
 
     private final Auditor auditor;
     private final String jobId;
@@ -50,15 +56,19 @@ class DatafeedJob {
     private final Client client;
     private final DataExtractorFactory dataExtractorFactory;
     private final Supplier<Long> currentTimeSupplier;
+    private final DelayedDataDetector delayedDataDetector;
 
     private volatile long lookbackStartTimeMs;
+    private volatile long latestFinalBucketEndTimeMs;
+    private volatile long lastDataCheckTimeMs;
+    private volatile int lastDataCheckAudit;
     private volatile Long lastEndTimeMs;
     private AtomicBoolean running = new AtomicBoolean(true);
     private volatile boolean isIsolated;
 
     DatafeedJob(String jobId, DataDescription dataDescription, long frequencyMs, long queryDelayMs,
-                 DataExtractorFactory dataExtractorFactory, Client client, Auditor auditor, Supplier<Long> currentTimeSupplier,
-                 long latestFinalBucketEndTimeMs, long latestRecordTimeMs) {
+                DataExtractorFactory dataExtractorFactory, Client client, Auditor auditor, Supplier<Long> currentTimeSupplier,
+                DelayedDataDetector delayedDataDetector, long latestFinalBucketEndTimeMs, long latestRecordTimeMs) {
         this.jobId = jobId;
         this.dataDescription = Objects.requireNonNull(dataDescription);
         this.frequencyMs = frequencyMs;
@@ -67,7 +77,8 @@ class DatafeedJob {
         this.client = client;
         this.auditor = auditor;
         this.currentTimeSupplier = currentTimeSupplier;
-
+        this.delayedDataDetector = delayedDataDetector;
+        this.latestFinalBucketEndTimeMs = latestFinalBucketEndTimeMs;
         long lastEndTime = Math.max(latestFinalBucketEndTimeMs, latestRecordTimeMs);
         if (lastEndTime > 0) {
             lastEndTimeMs = lastEndTime;
@@ -151,9 +162,49 @@ class DatafeedJob {
         request.setCalcInterim(true);
         request.setAdvanceTime(String.valueOf(end));
         run(start, end, request);
+        checkForMissingDataIfNecessary();
         return nextRealtimeTimestamp();
     }
 
+    private void checkForMissingDataIfNecessary() {
+        if (isRunning() && !isIsolated && checkForMissingDataTriggered()) {
+
+            // Keep track of the last bucket time for which we did a missing data check
+            this.lastDataCheckTimeMs = this.currentTimeSupplier.get();
+            List<BucketWithMissingData> missingDataBuckets = delayedDataDetector.detectMissingData(latestFinalBucketEndTimeMs);
+            if (missingDataBuckets.isEmpty() == false) {
+
+                long totalRecordsMissing = missingDataBuckets.stream()
+                    .mapToLong(BucketWithMissingData::getMissingDocumentCount)
+                    .sum();
+                // The response is sorted by asc timestamp, so the last entry is the last bucket
+                Date lastBucketDate = missingDataBuckets.get(missingDataBuckets.size() - 1).getBucket().getTimestamp();
+                int newAudit = Objects.hash(totalRecordsMissing, lastBucketDate);
+                if (newAudit != lastDataCheckAudit) {
+                    auditor.warning(jobId,
+                        Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, totalRecordsMissing,
+                            XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(lastBucketDate.getTime())));
+                    lastDataCheckAudit = newAudit;
+                }
+            }
+        }
+    }
+
+    /**
+     * We wait a static interval of 15 minutes till the next missing data check.
+     *
+     * However, if our delayed data window is smaller than that, we will probably want to check at every available window (if freq. allows).
+     * This is to help to miss as few buckets in the delayed data check as possible.
+     *
+     * If our frequency/query delay are longer then our default interval or window size, we will end up looking for missing data on
+     * every real-time trigger. This should be OK as the we are pulling from the Index as such a slow pace, another query will
+     * probably not even be noticeable at such a large timescale.
+     */
+    private boolean checkForMissingDataTriggered() {
+        return this.currentTimeSupplier.get() > this.lastDataCheckTimeMs
+            + Math.min(MISSING_DATA_CHECK_INTERVAL_MS, delayedDataDetector.getWindow());
+    }
+
     /**
      * Stops the datafeed job
      *
@@ -260,7 +311,10 @@ class DatafeedJob {
         // we call flush the job is closed. Thus, we don't flush unless the
         // datafeed is still running.
         if (isRunning() && !isIsolated) {
-            flushJob(flushRequest);
+            Date lastFinalizedBucketEnd = flushJob(flushRequest).getLastFinalizedBucketEnd();
+            if (lastFinalizedBucketEnd != null) {
+                this.latestFinalBucketEndTimeMs = lastFinalizedBucketEnd.getTime();
+            }
         }
 
         if (recordCount == 0) {

+ 4 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java

@@ -13,6 +13,8 @@ import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
+import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory;
 import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.core.ml.job.results.Bucket;
@@ -46,8 +48,9 @@ public class DatafeedJobBuilder {
         Consumer<Context> contextHanlder = context -> {
             TimeValue frequency = getFrequencyOrDefault(datafeed, job);
             TimeValue queryDelay = datafeed.getQueryDelay();
+            DelayedDataDetector delayedDataDetector = DelayedDataDetectorFactory.buildDetector(job, datafeed, client);
             DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.millis(), queryDelay.millis(),
-                    context.dataExtractorFactory, client, auditor, currentTimeSupplier,
+                    context.dataExtractorFactory, client, auditor, currentTimeSupplier, delayedDataDetector,
                     context.latestFinalBucketEndMs, context.latestRecordTimeMs);
             listener.onResponse(datafeedJob);
         };

+ 38 - 48
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java → x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java

@@ -3,26 +3,26 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.ml.datafeed;
+package org.elasticsearch.xpack.ml.datafeed.delayeddatacheck;
 
 import org.elasticsearch.action.search.SearchAction;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
 import org.elasticsearch.xpack.core.ml.action.util.PageParams;
-import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
-import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
 import org.elasticsearch.xpack.core.ml.job.results.Bucket;
 import org.elasticsearch.xpack.core.ml.utils.Intervals;
 import org.joda.time.DateTime;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,32 +35,33 @@ import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
 /**
  * This class will search the buckets and indices over a given window to determine if any data is missing
  */
-public class DelayedDataDetector {
+public class DatafeedDelayedDataDetector implements DelayedDataDetector {
 
     private static final String DATE_BUCKETS = "date_buckets";
+
     private final long bucketSpan;
     private final long window;
-    private final DatafeedConfig datafeedConfig;
     private final Client client;
-    private final Job job;
-
-    public DelayedDataDetector(Job job, DatafeedConfig datafeedConfig, TimeValue window, Client client) {
-        this.job = job;
-        this.bucketSpan = job.getAnalysisConfig().getBucketSpan().millis();
-        this.datafeedConfig = datafeedConfig;
-        long windowMillis = window.millis();
-        if (windowMillis < bucketSpan) {
-            throw new IllegalArgumentException("[window] must be greater or equal to the [bucket_span]");
-        }
-        if (Intervals.alignToFloor(windowMillis/bucketSpan, bucketSpan) >= 10000) {
-            throw new IllegalArgumentException("[window] must contain less than 10000 buckets at the current [bucket_span]");
-        }
-        this.window = windowMillis;
+    private final String timeField;
+    private final String jobId;
+    private final QueryBuilder datafeedQuery;
+    private final String[] datafeedIndices;
+
+    DatafeedDelayedDataDetector(long bucketSpan, long window, String jobId, String timeField, QueryBuilder datafeedQuery,
+                                String[] datafeedIndices, Client client) {
+        this.bucketSpan = bucketSpan;
+        this.window = window;
+        this.jobId = jobId;
+        this.timeField = timeField;
+        this.datafeedQuery = datafeedQuery;
+        this.datafeedIndices = datafeedIndices;
         this.client = client;
     }
 
     /**
-     * This method looks at the {@link DatafeedConfig} from {@code latestFinalizedBucket - window} to {@code latestFinalizedBucket}.
+     * This method looks at the {@link DatafeedDelayedDataDetector#datafeedIndices}
+     * from {@code latestFinalizedBucket - window} to {@code latestFinalizedBucket} and compares the document counts with the
+     * {@link DatafeedDelayedDataDetector#jobId}'s finalized buckets' event counts.
      *
      * It is done synchronously, and can block for a considerable amount of time, it should only be executed within the appropriate
      * thread pool.
@@ -68,9 +69,15 @@ public class DelayedDataDetector {
      * @param latestFinalizedBucketMs The latest finalized bucket timestamp in milliseconds, signifies the end of the time window check
      * @return A List of {@link BucketWithMissingData} objects that contain each bucket with the current number of missing docs
      */
+    @Override
     public List<BucketWithMissingData> detectMissingData(long latestFinalizedBucketMs) {
         final long end = Intervals.alignToFloor(latestFinalizedBucketMs, bucketSpan);
         final long start = Intervals.alignToFloor(latestFinalizedBucketMs - window, bucketSpan);
+
+        if (end <= start) {
+            return Collections.emptyList();
+        }
+
         List<Bucket> finalizedBuckets = checkBucketEvents(start, end);
         Map<Long, Long> indexedData = checkCurrentBucketEventCount(start, end);
         return finalizedBuckets.stream()
@@ -81,10 +88,17 @@ public class DelayedDataDetector {
             .collect(Collectors.toList());
     }
 
+    @Override
+    public long getWindow() {
+        return window;
+    }
+
     private List<Bucket> checkBucketEvents(long start, long end) {
-        GetBucketsAction.Request request = new GetBucketsAction.Request(job.getId());
+        GetBucketsAction.Request request = new GetBucketsAction.Request(jobId);
         request.setStart(Long.toString(start));
         request.setEnd(Long.toString(end));
+        request.setSort("timestamp");
+        request.setDescending(false);
         request.setExcludeInterim(true);
         request.setPageParams(new PageParams(0, (int)((end - start)/bucketSpan)));
 
@@ -95,13 +109,12 @@ public class DelayedDataDetector {
     }
 
     private Map<Long, Long> checkCurrentBucketEventCount(long start, long end) {
-        String timeField = job.getDataDescription().getTimeField();
         SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
             .size(0)
             .aggregation(new DateHistogramAggregationBuilder(DATE_BUCKETS).interval(bucketSpan).field(timeField))
-            .query(ExtractorUtils.wrapInTimeRangeQuery(datafeedConfig.getQuery(), timeField, start, end));
+            .query(ExtractorUtils.wrapInTimeRangeQuery(datafeedQuery, timeField, start, end));
 
-        SearchRequest searchRequest = new SearchRequest(datafeedConfig.getIndices().toArray(new String[0])).source(searchSourceBuilder);
+        SearchRequest searchRequest = new SearchRequest(datafeedIndices).source(searchSourceBuilder);
         try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
             SearchResponse response = client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
             List<? extends Histogram.Bucket> buckets = ((Histogram)response.getAggregations().get(DATE_BUCKETS)).getBuckets();
@@ -132,27 +145,4 @@ public class DelayedDataDetector {
     private static long calculateMissing(Map<Long, Long> indexedData, Bucket bucket) {
         return indexedData.getOrDefault(bucket.getEpoch() * 1000, 0L) - bucket.getEventCount();
     }
-
-    public static class BucketWithMissingData {
-
-        private final long missingDocumentCount;
-        private final Bucket bucket;
-
-        static BucketWithMissingData fromMissingAndBucket(long missingDocumentCount, Bucket bucket) {
-            return new BucketWithMissingData(missingDocumentCount, bucket);
-        }
-
-        private BucketWithMissingData(long missingDocumentCount, Bucket bucket) {
-           this.missingDocumentCount = missingDocumentCount;
-           this.bucket = bucket;
-        }
-
-        public Bucket getBucket() {
-            return bucket;
-        }
-
-        public long getMissingDocumentCount() {
-            return missingDocumentCount;
-        }
-    }
 }

+ 14 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetector.java

@@ -0,0 +1,14 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.datafeed.delayeddatacheck;
+
+import java.util.List;
+
+public interface DelayedDataDetector {
+    List<DelayedDataDetectorFactory.BucketWithMissingData> detectMissingData(long endingTimeStamp);
+
+    long getWindow();
+}

+ 125 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java

@@ -0,0 +1,125 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.datafeed.delayeddatacheck;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.messages.Messages;
+import org.elasticsearch.xpack.core.ml.job.results.Bucket;
+
+import java.util.Objects;
+
+/**
+ * Builds the appropriate {@link DelayedDataDetector} implementation, with the appropriate settings, given the parameters.
+ */
+public class DelayedDataDetectorFactory {
+
+    // There are eight 15min buckets in a two hour span, so matching that number as the fallback for very long buckets
+    private static final int FALLBACK_NUMBER_OF_BUCKETS_TO_SPAN = 8;
+    private static final TimeValue DEFAULT_CHECK_WINDOW = TimeValue.timeValueHours(2);
+
+    /**
+     * This will build the appropriate detector given the parameters.
+     *
+     * If {@link DatafeedConfig#getDelayedDataCheckConfig()} is not `isEnabled()`, then a {@link NullDelayedDataDetector} is returned, which
+     * does not do any checks, and only supplies an empty collection.
+     *
+     * @param job The {@link Job} object for the given `datafeedConfig`
+     * @param datafeedConfig The {@link DatafeedConfig} for which to create the {@link DelayedDataDetector}
+     * @param client The {@link Client} capable of taking action against the ES Cluster.
+     * @return A new {@link DelayedDataDetector}
+     */
+    public static DelayedDataDetector buildDetector(Job job, DatafeedConfig datafeedConfig, Client client) {
+        if (datafeedConfig.getDelayedDataCheckConfig().isEnabled()) {
+            long window = validateAndCalculateWindowLength(job.getAnalysisConfig().getBucketSpan(),
+                datafeedConfig.getDelayedDataCheckConfig().getCheckWindow());
+            long bucketSpan = job.getAnalysisConfig().getBucketSpan() == null ? 0 : job.getAnalysisConfig().getBucketSpan().millis();
+            return new DatafeedDelayedDataDetector(bucketSpan,
+                window,
+                job.getId(),
+                job.getDataDescription().getTimeField(),
+                datafeedConfig.getQuery(),
+                datafeedConfig.getIndices().toArray(new String[0]),
+                client);
+        } else {
+            return new NullDelayedDataDetector();
+        }
+    }
+
+    private static long validateAndCalculateWindowLength(TimeValue bucketSpan, TimeValue currentWindow) {
+        if (bucketSpan == null) {
+            return 0;
+        }
+        if (currentWindow == null) { // we should provide a good default as the user did not specify a window
+            if(bucketSpan.compareTo(DEFAULT_CHECK_WINDOW) >= 0) {
+                return FALLBACK_NUMBER_OF_BUCKETS_TO_SPAN * bucketSpan.millis();
+            } else {
+                return DEFAULT_CHECK_WINDOW.millis();
+            }
+        }
+        if (currentWindow.compareTo(bucketSpan) < 0) {
+            throw new IllegalArgumentException(
+                Messages.getMessage(Messages.DATAFEED_CONFIG_DELAYED_DATA_CHECK_TOO_SMALL, currentWindow.getStringRep(),
+                    bucketSpan.getStringRep()));
+        } else if (currentWindow.millis() > bucketSpan.millis() * DelayedDataCheckConfig.MAX_NUMBER_SPANABLE_BUCKETS) {
+            throw new IllegalArgumentException(
+                Messages.getMessage(Messages.DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS, currentWindow.getStringRep(),
+                    bucketSpan.getStringRep()));
+        }
+        return currentWindow.millis();
+    }
+
+    public static class BucketWithMissingData {
+
+        private final long missingDocumentCount;
+        private final Bucket bucket;
+
+        public static BucketWithMissingData fromMissingAndBucket(long missingDocumentCount, Bucket bucket) {
+            return new BucketWithMissingData(missingDocumentCount, bucket);
+        }
+
+        private BucketWithMissingData(long missingDocumentCount, Bucket bucket) {
+            this.missingDocumentCount = missingDocumentCount;
+            this.bucket = bucket;
+        }
+
+        public long getTimeStamp() {
+            return bucket.getEpoch();
+        }
+
+        public Bucket getBucket() {
+            return bucket;
+        }
+
+        public long getMissingDocumentCount() {
+            return missingDocumentCount;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (other == this) {
+                return true;
+            }
+
+            if (other == null || getClass() != other.getClass()) {
+                return false;
+            }
+
+            BucketWithMissingData that = (BucketWithMissingData) other;
+
+            return Objects.equals(that.bucket, bucket) && Objects.equals(that.missingDocumentCount, missingDocumentCount);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(bucket, missingDocumentCount);
+        }
+    }
+
+}

+ 35 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/NullDelayedDataDetector.java

@@ -0,0 +1,35 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.datafeed.delayeddatacheck;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This class will always return an {@link Collections#emptyList()}.
+ */
+public class NullDelayedDataDetector implements DelayedDataDetector {
+
+    /**
+     * Always returns an empty collection
+     * @param unusedTimeStamp unused Parameter
+     * @return {@link Collections#emptyList()}
+     */
+    @Override
+    public List<DelayedDataDetectorFactory.BucketWithMissingData> detectMissingData(long unusedTimeStamp) {
+        return Collections.emptyList();
+    }
+
+    /**
+     * Always returns 0
+     * @return a 0
+     */
+    @Override
+    public long getWindow() {
+        return 0L;
+    }
+
+}

+ 43 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java

@@ -10,6 +10,7 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.mock.orig.Mockito;
 import org.elasticsearch.test.ESTestCase;
@@ -18,6 +19,10 @@ import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
 import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
 import org.elasticsearch.xpack.core.ml.action.PostDataAction;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
+import org.elasticsearch.xpack.core.ml.job.messages.Messages;
+import org.elasticsearch.xpack.core.ml.job.results.Bucket;
+import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
+import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
@@ -30,6 +35,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Optional;
@@ -56,10 +62,12 @@ public class DatafeedJobTests extends ESTestCase {
     private DataExtractorFactory dataExtractorFactory;
     private DataExtractor dataExtractor;
     private Client client;
+    private DelayedDataDetector delayedDataDetector;
     private DataDescription.Builder dataDescription;
     ActionFuture<PostDataAction.Response> postDataFuture;
     private ActionFuture<FlushJobAction.Response> flushJobFuture;
     private ArgumentCaptor<FlushJobAction.Request> flushJobRequests;
+    private FlushJobAction.Response flushJobResponse;
 
     private long currentTime;
     private XContentType xContentType;
@@ -79,6 +87,9 @@ public class DatafeedJobTests extends ESTestCase {
         dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
         postDataFuture = mock(ActionFuture.class);
         flushJobFuture = mock(ActionFuture.class);
+        flushJobResponse = new FlushJobAction.Response();
+        delayedDataDetector = mock(DelayedDataDetector.class);
+        when(delayedDataDetector.getWindow()).thenReturn(DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS);
         currentTime = 0;
         xContentType = XContentType.JSON;
 
@@ -96,6 +107,7 @@ public class DatafeedJobTests extends ESTestCase {
         when(postDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts));
 
         flushJobRequests = ArgumentCaptor.forClass(FlushJobAction.Request.class);
+        when(flushJobFuture.actionGet()).thenReturn(flushJobResponse);
         when(client.execute(same(FlushJobAction.INSTANCE), flushJobRequests.capture())).thenReturn(flushJobFuture);
     }
 
@@ -193,6 +205,13 @@ public class DatafeedJobTests extends ESTestCase {
     }
 
     public void testRealtimeRun() throws Exception {
+        flushJobResponse = new FlushJobAction.Response(true, new Date(2000));
+        Bucket bucket = mock(Bucket.class);
+        when(bucket.getTimestamp()).thenReturn(new Date(2000));
+        when(flushJobFuture.actionGet()).thenReturn(flushJobResponse);
+        when(client.execute(same(FlushJobAction.INSTANCE), flushJobRequests.capture())).thenReturn(flushJobFuture);
+        when(delayedDataDetector.detectMissingData(2000))
+            .thenReturn(Collections.singletonList(BucketWithMissingData.fromMissingAndBucket(10, bucket)));
         currentTime = 60000L;
         long frequencyMs = 100;
         long queryDelayMs = 1000;
@@ -206,6 +225,29 @@ public class DatafeedJobTests extends ESTestCase {
         flushRequest.setAdvanceTime("59000");
         verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
         verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());
+
+        // Execute a second valid time, but do so in a smaller window than the interval
+        currentTime = 62000L;
+        byte[] contentBytes = "content".getBytes(StandardCharsets.UTF_8);
+        InputStream inputStream = new ByteArrayInputStream(contentBytes);
+        when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
+        when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
+        when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
+        datafeedJob.runRealtime();
+
+        // Execute a third time, but this time make sure we exceed the data check interval, but keep the delayedDataDetector response
+        // the same
+        currentTime = 62000L + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1;
+        inputStream = new ByteArrayInputStream(contentBytes);
+        when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
+        when(dataExtractor.next()).thenReturn(Optional.of(inputStream));
+        when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
+        datafeedJob.runRealtime();
+
+        verify(auditor, times(1)).warning(jobId,
+            Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA,
+                10,
+                XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(2000)));
     }
 
     public void testEmptyDataCountGivenlookback() throws Exception {
@@ -321,6 +363,6 @@ public class DatafeedJobTests extends ESTestCase {
                                             long latestRecordTimeMs) {
         Supplier<Long> currentTimeSupplier = () -> currentTime;
         return new DatafeedJob(jobId, dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, client, auditor,
-                currentTimeSupplier, latestFinalBucketEndTimeMs, latestRecordTimeMs);
+                currentTimeSupplier, delayedDataDetector, latestFinalBucketEndTimeMs, latestRecordTimeMs);
     }
 }

+ 25 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidatorTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
+import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.config.Detector;
@@ -176,6 +177,30 @@ public class DatafeedJobValidatorTests extends ESTestCase {
         assertEquals("Datafeed frequency [1.5m] must be a multiple of the aggregation interval [60000ms]", e.getMessage());
     }
 
+    public void testVerify_BucketIntervalAndDataCheckWindowAreValid() {
+        Job.Builder builder = buildJobBuilder("foo");
+        AnalysisConfig.Builder ac = createAnalysisConfig();
+        ac.setSummaryCountFieldName("some_count");
+        ac.setBucketSpan(TimeValue.timeValueSeconds(2));
+        builder.setAnalysisConfig(ac);
+        Job job = builder.build(new Date());
+        DatafeedConfig.Builder datafeedBuilder = createValidDatafeedConfig();
+        datafeedBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueMinutes(10)));
+
+        DatafeedJobValidator.validate(datafeedBuilder.build(), job);
+
+        datafeedBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueSeconds(1)));
+        ElasticsearchStatusException e = ESTestCase.expectThrows(ElasticsearchStatusException.class,
+            () -> DatafeedJobValidator.validate(datafeedBuilder.build(), job));
+        assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_DELAYED_DATA_CHECK_TOO_SMALL, "1s", "2s"), e.getMessage());
+
+        datafeedBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(24)));
+        e = ESTestCase.expectThrows(ElasticsearchStatusException.class,
+            () -> DatafeedJobValidator.validate(datafeedBuilder.build(), job));
+        assertEquals(Messages.getMessage(
+            Messages.DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS, "1d", "2s"), e.getMessage());
+    }
+
     private static Job.Builder buildJobBuilder(String id) {
         Job.Builder builder = new Job.Builder(id);
         AnalysisConfig.Builder ac = createAnalysisConfig();

+ 0 - 76
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetectorTests.java

@@ -1,76 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-package org.elasticsearch.xpack.ml.datafeed;
-
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
-import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
-import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
-import org.elasticsearch.xpack.core.ml.job.config.Detector;
-import org.elasticsearch.xpack.core.ml.job.config.Job;
-
-import java.util.Collections;
-import java.util.Date;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.mockito.Mockito.mock;
-
-
-public class DelayedDataDetectorTests extends ESTestCase {
-
-
-    public void testConstructorWithValueValues() {
-        TimeValue window = TimeValue.timeValueSeconds(10);
-        Job job = createJob(TimeValue.timeValueSeconds(1));
-        DelayedDataDetector delayedDataDetector = new DelayedDataDetector(job, createDatafeed(), window, mock(Client.class));
-        assertNotNull(delayedDataDetector);
-    }
-
-    public void testConstructorWithInvalidValues() {
-        TimeValue shortWindow = TimeValue.timeValueMillis(500);
-        Job job = createJob(TimeValue.timeValueSeconds(1));
-
-        Exception exception = expectThrows(IllegalArgumentException.class,
-            ()-> new DelayedDataDetector(job, createDatafeed(), shortWindow, mock(Client.class)));
-        assertThat(exception.getMessage(), equalTo("[window] must be greater or equal to the [bucket_span]"));
-
-        TimeValue longWindow = TimeValue.timeValueSeconds(20000);
-
-        exception = expectThrows(IllegalArgumentException.class,
-            ()-> new DelayedDataDetector(job, createDatafeed(), longWindow, mock(Client.class)));
-        assertThat(exception.getMessage(), equalTo("[window] must contain less than 10000 buckets at the current [bucket_span]"));
-    }
-
-
-    private Job createJob(TimeValue bucketSpan) {
-        DataDescription.Builder dataDescription = new DataDescription.Builder();
-        dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
-        dataDescription.setTimeField("time");
-        dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
-
-        Detector.Builder d = new Detector.Builder("count", null);
-        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
-        analysisConfig.setBucketSpan(bucketSpan);
-
-        Job.Builder builder = new Job.Builder();
-        builder.setId("test-job");
-        builder.setAnalysisConfig(analysisConfig);
-        builder.setDataDescription(dataDescription);
-        return builder.build(new Date());
-    }
-
-    private DatafeedConfig createDatafeed() {
-        DatafeedConfig.Builder builder = new DatafeedConfig.Builder("id", "jobId");
-        builder.setIndices(Collections.singletonList("index1"));
-        builder.setTypes(Collections.singletonList("doc"));
-        return builder.build();
-    }
-
-
-
-}

+ 103 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactoryTests.java

@@ -0,0 +1,103 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.datafeed.delayeddatacheck;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig;
+import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
+import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
+import org.elasticsearch.xpack.core.ml.job.config.Detector;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.messages.Messages;
+
+import java.util.Collections;
+import java.util.Date;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.mockito.Mockito.mock;
+
+
+public class DelayedDataDetectorFactoryTests extends ESTestCase {
+
+    public void testBuilder() {
+        Job job = createJob(TimeValue.timeValueSeconds(2));
+
+        DatafeedConfig datafeedConfig = createDatafeed(false, null);
+
+        // Should not throw
+        assertThat(DelayedDataDetectorFactory.buildDetector(job, datafeedConfig, mock(Client.class)),
+            instanceOf(NullDelayedDataDetector.class));
+
+        datafeedConfig = createDatafeed(true, TimeValue.timeValueMinutes(10));
+
+        // Should not throw
+        assertThat(DelayedDataDetectorFactory.buildDetector(job, datafeedConfig, mock(Client.class)),
+            instanceOf(DatafeedDelayedDataDetector.class));
+
+        DatafeedConfig tooSmallDatafeedConfig = createDatafeed(true, TimeValue.timeValueSeconds(1));
+        IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
+            () -> DelayedDataDetectorFactory.buildDetector(job, tooSmallDatafeedConfig, mock(Client.class)));
+        assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_DELAYED_DATA_CHECK_TOO_SMALL, "1s", "2s"), e.getMessage());
+
+        DatafeedConfig tooBigDatafeedConfig = createDatafeed(true, TimeValue.timeValueHours(12));
+        e = ESTestCase.expectThrows(IllegalArgumentException.class,
+            () -> DelayedDataDetectorFactory.buildDetector(job, tooBigDatafeedConfig, mock(Client.class)));
+        assertEquals(Messages.getMessage(
+            Messages.DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS, "12h", "2s"), e.getMessage());
+
+        Job withBigBucketSpan = createJob(TimeValue.timeValueHours(3));
+        datafeedConfig = createDatafeed(true, null);
+
+        // Should not throw
+        DelayedDataDetector delayedDataDetector =
+            DelayedDataDetectorFactory.buildDetector(withBigBucketSpan, datafeedConfig, mock(Client.class));
+        assertThat(delayedDataDetector.getWindow(), equalTo(TimeValue.timeValueHours(3).millis() * 8));
+
+        datafeedConfig = createDatafeed(true, null);
+
+        // Should not throw
+        delayedDataDetector =
+            DelayedDataDetectorFactory.buildDetector(job, datafeedConfig, mock(Client.class));
+        assertThat(delayedDataDetector.getWindow(), equalTo(TimeValue.timeValueHours(2).millis()));
+
+    }
+
+    private Job createJob(TimeValue bucketSpan) {
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+        dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
+        dataDescription.setTimeField("time");
+        dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
+
+        Detector.Builder d = new Detector.Builder("count", null);
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
+        analysisConfig.setBucketSpan(bucketSpan);
+
+        Job.Builder builder = new Job.Builder();
+        builder.setId("test-job");
+        builder.setAnalysisConfig(analysisConfig);
+        builder.setDataDescription(dataDescription);
+        return builder.build(new Date());
+    }
+
+    private DatafeedConfig createDatafeed(boolean shouldDetectDelayedData, TimeValue delayedDatacheckWindow) {
+        DatafeedConfig.Builder builder = new DatafeedConfig.Builder("id", "jobId");
+        builder.setIndices(Collections.singletonList("index1"));
+        builder.setTypes(Collections.singletonList("doc"));
+
+        if (shouldDetectDelayedData) {
+            builder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(delayedDatacheckWindow));
+        } else {
+            builder.setDelayedDataCheckConfig(DelayedDataCheckConfig.disabledDelayedDataCheckConfig());
+        }
+        return builder.build();
+    }
+
+
+}