
[ML] adds start and end params to _preview and excludes cold/frozen tiers from unbounded previews (#86989)

In larger clusters with complicated datafeed requirements, being able to preview only a specific window of time is important. Previously, datafeed previews would always start at 0 (that is, from the beginning of the data). This causes issues when the index pattern contains indices on slower hardware, even though, once the datafeed is actually started, its "start" time is set to more recent data (and thus to faster hardware).

Additionally, when _preview is unbounded (as before), it now attempts to preview only indices that are NOT frozen or cold. This is done through a query against the _tier field, meaning it only affects newer indices that actually have that field set.
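
Concretely, the query against the _tier field is a must_not terms filter that gets combined with the datafeed query only when no start or end is supplied; a minimal sketch mirroring the change to TransportPreviewDatafeedAction below:

import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.cluster.routing.allocation.mapper.DataTierFieldMapper;

class TierFilterSketch {
    // Filter that unbounded previews combine with the datafeed query. Documents in
    // indices that never set _tier still match, which is why only newer indices
    // that actually have the field are affected.
    static QueryBuilder excludeColdAndFrozen() {
        return QueryBuilders.boolQuery()
            .mustNot(QueryBuilders.termsQuery(DataTierFieldMapper.NAME, "data_frozen", "data_cold"));
    }
}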
Benjamin Trent 3 years ago
Parent
Current commit
115f19ff6d
15 changed files with 291 additions and 34 deletions
  1. docs/changelog/86989.yaml (+6 -0)
  2. docs/reference/ml/anomaly-detection/apis/preview-datafeed.asciidoc (+33 -4)
  3. rest-api-spec/src/main/resources/rest-api-spec/api/ml.preview_datafeed.json (+12 -0)
  4. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PreviewDatafeedAction.java (+89 -7)
  5. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java (+1 -1)
  6. x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PreviewDatafeedActionRequestTests.java (+10 -3)
  7. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java (+24 -12)
  8. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java (+10 -0)
  9. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java (+16 -1)
  10. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorFactory.java (+11 -1)
  11. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractorFactory.java (+16 -1)
  12. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java (+16 -1)
  13. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java (+16 -1)
  14. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPreviewDatafeedAction.java (+5 -2)
  15. x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/preview_datafeed.yml (+26 -0)

+ 6 - 0
docs/changelog/86989.yaml

@@ -0,0 +1,6 @@
+pr: 86989
+summary: Adds start and end params to `_preview` and excludes cold/frozen tiers from
+  unbounded previews
+area: Machine Learning
+type: enhancement
+issues: []

+ 33 - 4
docs/reference/ml/anomaly-detection/apis/preview-datafeed.asciidoc

@@ -25,16 +25,16 @@ Previews a {dfeed}.
 
 Requires the following privileges:
 
-* cluster: `manage_ml` (the `machine_learning_admin` built-in role grants this 
+* cluster: `manage_ml` (the `machine_learning_admin` built-in role grants this
    privilege)
 * source index configured in the {dfeed}: `read`.
 
 [[ml-preview-datafeed-desc]]
 == {api-description-title}
 
-The preview {dfeeds} API returns the first "page" of search results from a 
+The preview {dfeeds} API returns the first "page" of search results from a
 {dfeed}. You can preview an existing {dfeed} or provide configuration details
-for the {dfeed} and {anomaly-job} in the API. The preview shows the structure of 
+for the {dfeed} and {anomaly-job} in the API. The preview shows the structure of
 the data that will be passed to the anomaly detection engine.
 
 IMPORTANT: When {es} {security-features} are enabled, the {dfeed} query is
@@ -57,6 +57,35 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=datafeed-id]
 NOTE: If you provide the `<datafeed_id>` as a path parameter, you cannot
 provide {dfeed} or {anomaly-job} configuration details in the request body.
 
+[[ml-preview-datafeed-query-parms]]
+== {api-query-parms-title}
+
+`end`::
+(Optional, string) The time that the {dfeed} preview should end. The preview may not go to the end of the provided value
+as only the first page of results is returned. The time can be specified by using one of the following formats:
++
+--
+* ISO 8601 format with milliseconds, for example `2017-01-22T06:00:00.000Z`
+* ISO 8601 format without milliseconds, for example `2017-01-22T06:00:00+00:00`
+* Milliseconds since the epoch, for example `1485061200000`
+
+Date-time arguments using either of the ISO 8601 formats must have a time zone
+designator, where `Z` is accepted as an abbreviation for UTC time.
+
+NOTE: When a URL is expected (for example, in browsers), the `+` used in time
+zone designators must be encoded as `%2B`.
+
+This value is exclusive.
+--
+
+`start`::
+(Optional, string) The time that the {dfeed} preview should begin, which can be
+specified by using the same formats as the `end` parameter. This value is
+inclusive.
+
+NOTE: If you don't provide either the `start` or `end` parameter, the {dfeed} preview will search over the entire
+time of data but exclude data within `cold` or `frozen` <<data-tiers, data tiers>>.
+
 [[ml-preview-datafeed-request-body]]
 == {api-request-body-title}
 
@@ -115,7 +144,7 @@ The data that is returned for this example is as follows:
 ]
 ----
 
-The following example provides {dfeed} and {anomaly-job} configuration 
+The following example provides {dfeed} and {anomaly-job} configuration
 details in the API:
 
 [source,console]

+ 12 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/ml.preview_datafeed.json

@@ -34,6 +34,18 @@
         }
       ]
     },
+    "params":{
+      "start":{
+        "type":"string",
+        "required":false,
+        "description":"The start time from where the datafeed preview should begin"
+      },
+      "end":{
+        "type":"string",
+        "required":false,
+        "description":"The end time when the datafeed preview should stop"
+      }
+    },
     "body":{
       "description":"The datafeed config and job config with which to execute the preview",
       "required":false

+ 89 - 7
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PreviewDatafeedAction.java

@@ -6,10 +6,12 @@
  */
 package org.elasticsearch.xpack.core.ml.action;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.ValidateActions;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -28,6 +30,11 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Objects;
+import java.util.OptionalLong;
+
+import static org.elasticsearch.xpack.core.ml.action.StartDatafeedAction.DatafeedParams.parseDateOrThrow;
+import static org.elasticsearch.xpack.core.ml.action.StartDatafeedAction.END_TIME;
+import static org.elasticsearch.xpack.core.ml.action.StartDatafeedAction.START_TIME;
 
 public class PreviewDatafeedAction extends ActionType<PreviewDatafeedAction.Response> {
 
@@ -49,38 +56,61 @@ public class PreviewDatafeedAction extends ActionType<PreviewDatafeedAction.Resp
         static {
             PARSER.declareObject(Builder::setDatafeedBuilder, DatafeedConfig.STRICT_PARSER, DATAFEED_CONFIG);
             PARSER.declareObject(Builder::setJobBuilder, Job.STRICT_PARSER, JOB_CONFIG);
+            PARSER.declareString(Builder::setStart, START_TIME);
+            PARSER.declareString(Builder::setEnd, END_TIME);
         }
 
-        public static Request fromXContent(XContentParser parser, @Nullable String datafeedId) {
+        public static Request.Builder fromXContent(XContentParser parser, @Nullable String datafeedId) {
             Builder builder = PARSER.apply(parser, null);
             // We don't need to check for "inconsistent ids" as we don't parse an ID from the body
             if (datafeedId != null) {
                 builder.setDatafeedId(datafeedId);
             }
-            return builder.build();
+            return builder;
         }
 
         private final String datafeedId;
         private final DatafeedConfig datafeedConfig;
         private final Job.Builder jobConfig;
+        private final Long startTime;
+        private final Long endTime;
 
         public Request(StreamInput in) throws IOException {
             super(in);
             datafeedId = in.readString();
             datafeedConfig = in.readOptionalWriteable(DatafeedConfig::new);
             jobConfig = in.readOptionalWriteable(Job.Builder::new);
+            if (in.getVersion().onOrAfter(Version.V_8_3_0)) {
+                this.startTime = in.readOptionalLong();
+                this.endTime = in.readOptionalLong();
+            } else {
+                this.startTime = null;
+                this.endTime = null;
+            }
         }
 
-        public Request(String datafeedId) {
+        public Request(String datafeedId, String start, String end) {
             this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID);
             this.datafeedConfig = null;
             this.jobConfig = null;
+            this.startTime = start == null ? null : parseDateOrThrow(start, START_TIME, System::currentTimeMillis);
+            this.endTime = end == null ? null : parseDateOrThrow(end, END_TIME, System::currentTimeMillis);
         }
 
-        public Request(DatafeedConfig datafeedConfig, Job.Builder jobConfig) {
+        Request(String datafeedId, Long start, Long end) {
+            this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID);
+            this.datafeedConfig = null;
+            this.jobConfig = null;
+            this.startTime = start;
+            this.endTime = end;
+        }
+
+        public Request(DatafeedConfig datafeedConfig, Job.Builder jobConfig, Long start, Long end) {
             this.datafeedId = BLANK_ID;
             this.datafeedConfig = ExceptionsHelper.requireNonNull(datafeedConfig, DATAFEED_CONFIG.getPreferredName());
             this.jobConfig = jobConfig;
+            this.startTime = start;
+            this.endTime = end;
         }
 
         public String getDatafeedId() {
@@ -95,9 +125,31 @@ public class PreviewDatafeedAction extends ActionType<PreviewDatafeedAction.Resp
             return jobConfig;
         }
 
+        public OptionalLong getStartTime() {
+            return startTime == null ? OptionalLong.empty() : OptionalLong.of(startTime);
+        }
+
+        public OptionalLong getEndTime() {
+            return endTime == null ? OptionalLong.empty() : OptionalLong.of(endTime);
+        }
+
         @Override
         public ActionRequestValidationException validate() {
-            return null;
+            ActionRequestValidationException e = null;
+            if (endTime != null && startTime != null && endTime <= startTime) {
+                e = ValidateActions.addValidationError(
+                    START_TIME.getPreferredName()
+                        + " ["
+                        + startTime
+                        + "] must be earlier than "
+                        + END_TIME.getPreferredName()
+                        + " ["
+                        + endTime
+                        + "]",
+                    e
+                );
+            }
+            return e;
         }
 
         @Override
@@ -106,6 +158,10 @@ public class PreviewDatafeedAction extends ActionType<PreviewDatafeedAction.Resp
             out.writeString(datafeedId);
             out.writeOptionalWriteable(datafeedConfig);
             out.writeOptionalWriteable(jobConfig);
+            if (out.getVersion().onOrAfter(Version.V_8_3_0)) {
+                out.writeOptionalLong(startTime);
+                out.writeOptionalLong(endTime);
+            }
         }
 
         @Override
@@ -147,6 +203,8 @@ public class PreviewDatafeedAction extends ActionType<PreviewDatafeedAction.Resp
             private String datafeedId;
             private DatafeedConfig.Builder datafeedBuilder;
             private Job.Builder jobBuilder;
+            private Long startTime;
+            private Long endTime;
 
             public Builder setDatafeedId(String datafeedId) {
                 this.datafeedId = datafeedId;
@@ -163,6 +221,30 @@ public class PreviewDatafeedAction extends ActionType<PreviewDatafeedAction.Resp
                 return this;
             }
 
+            public Builder setStart(String startTime) {
+                if (startTime == null) {
+                    return this;
+                }
+                return setStart(parseDateOrThrow(startTime, START_TIME, System::currentTimeMillis));
+            }
+
+            public Builder setStart(long start) {
+                this.startTime = start;
+                return this;
+            }
+
+            public Builder setEnd(String endTime) {
+                if (endTime == null) {
+                    return this;
+                }
+                return setEnd(parseDateOrThrow(endTime, END_TIME, System::currentTimeMillis));
+            }
+
+            public Builder setEnd(long end) {
+                this.endTime = end;
+                return this;
+            }
+
             public Request build() {
                 if (datafeedBuilder != null) {
                     datafeedBuilder.setId("preview_id");
@@ -196,8 +278,8 @@ public class PreviewDatafeedAction extends ActionType<PreviewDatafeedAction.Resp
                     );
                 }
                 return datafeedId != null
-                    ? new Request(datafeedId)
-                    : new Request(datafeedBuilder == null ? null : datafeedBuilder.build(), jobBuilder);
+                    ? new Request(datafeedId, startTime, endTime)
+                    : new Request(datafeedBuilder == null ? null : datafeedBuilder.build(), jobBuilder, startTime, endTime);
             }
         }
     }
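
For orientation, a small sketch of how the new Request.Builder surface fits together; the datafeed ID and the time values here are illustrative, not taken from the PR:

import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction.Request;

class PreviewRequestSketch {
    // Bounded preview of an existing datafeed. The String setters run through
    // parseDateOrThrow, so ISO 8601 strings and epoch-millisecond strings both work.
    static Request boundedPreview() {
        return new Request.Builder()
            .setDatafeedId("my-datafeed")          // hypothetical datafeed ID
            .setStart("2017-02-18T00:30:00Z")
            .setEnd("2017-02-18T01:00:00Z")
            .build();
    }
}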

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java

@@ -164,7 +164,7 @@ public class StartDatafeedAction extends ActionType<NodeAcknowledgedResponse> {
             );
         }
 
-        static long parseDateOrThrow(String date, ParseField paramName, LongSupplier now) {
+        public static long parseDateOrThrow(String date, ParseField paramName, LongSupplier now) {
             DateMathParser dateMathParser = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.toDateMathParser();
 
             try {
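
The only change here is visibility: parseDateOrThrow becomes public so the preview request can reuse it. A rough usage sketch, with the ISO 8601 example value from the docs above:

import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;

class TimeParsingSketch {
    // Resolves a start value to epoch milliseconds the same way the preview request does;
    // the parser is a DateMathParser, so epoch-millisecond strings are accepted as well.
    static long startMillis() {
        return StartDatafeedAction.DatafeedParams.parseDateOrThrow(
            "2017-01-22T06:00:00.000Z", StartDatafeedAction.START_TIME, System::currentTimeMillis);
    }
}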

+ 10 - 3
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PreviewDatafeedActionRequestTests.java

@@ -28,11 +28,18 @@ public class PreviewDatafeedActionRequestTests extends AbstractWireSerializingTe
     @Override
     protected Request createTestInstance() {
         String jobId = randomAlphaOfLength(10);
+        long start = randomLongBetween(0, Long.MAX_VALUE / 4);
         return switch (randomInt(2)) {
-            case 0 -> new Request(randomAlphaOfLength(10));
+            case 0 -> new Request(
+                randomAlphaOfLength(10),
+                randomBoolean() ? null : start,
+                randomBoolean() ? null : randomLongBetween(start + 1, Long.MAX_VALUE)
+            );
             case 1 -> new Request(
                 DatafeedConfigTests.createRandomizedDatafeedConfig(jobId),
-                randomBoolean() ? JobTests.buildJobBuilder(jobId) : null
+                randomBoolean() ? JobTests.buildJobBuilder(jobId) : null,
+                randomBoolean() ? null : start,
+                randomBoolean() ? null : randomLongBetween(start + 1, Long.MAX_VALUE)
             );
             case 2 -> new Request.Builder().setJobBuilder(
                 JobTests.buildJobBuilder(jobId)
@@ -48,7 +55,7 @@ public class PreviewDatafeedActionRequestTests extends AbstractWireSerializingTe
     }
 
     public void testCtor() {
-        IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new Request((String) null));
+        IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new Request(null, randomLong(), null));
         assertThat(ex.getMessage(), equalTo("[datafeed_id] must not be null."));
     }
 

+ 24 - 12
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java

@@ -20,10 +20,13 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.time.DateUtils;
 import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
+import org.elasticsearch.xpack.cluster.routing.allocation.mapper.DataTierFieldMapper;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction;
@@ -49,6 +52,7 @@ import java.util.stream.Collectors;
 
 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeWithHeadersAsync;
+import static org.elasticsearch.xpack.ml.MachineLearning.UTILITY_THREAD_POOL_NAME;
 import static org.elasticsearch.xpack.ml.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;
 
 public class TransportPreviewDatafeedAction extends HandledTransportAction<PreviewDatafeedAction.Request, PreviewDatafeedAction.Response> {
@@ -89,12 +93,15 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
     protected void doExecute(Task task, PreviewDatafeedAction.Request request, ActionListener<PreviewDatafeedAction.Response> listener) {
         ActionListener<DatafeedConfig> datafeedConfigActionListener = ActionListener.wrap(datafeedConfig -> {
             if (request.getJobConfig() != null) {
-                previewDatafeed(task, datafeedConfig, request.getJobConfig().build(new Date()), listener);
+                previewDatafeed(task, datafeedConfig, request.getJobConfig().build(new Date()), request, listener);
                 return;
             }
             jobConfigProvider.getJob(
                 datafeedConfig.getJobId(),
-                ActionListener.wrap(jobBuilder -> previewDatafeed(task, datafeedConfig, jobBuilder.build(), listener), listener::onFailure)
+                ActionListener.wrap(
+                    jobBuilder -> previewDatafeed(task, datafeedConfig, jobBuilder.build(), request, listener),
+                    listener::onFailure
+                )
             );
         }, listener::onFailure);
         if (request.getDatafeedConfig() != null) {
@@ -111,6 +118,7 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
         Task task,
         DatafeedConfig datafeedConfig,
         Job job,
+        PreviewDatafeedAction.Request request,
         ActionListener<PreviewDatafeedAction.Response> listener
     ) {
         DatafeedConfig.Builder previewDatafeedBuilder = buildPreviewDatafeed(datafeedConfig);
@@ -129,19 +137,23 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
                 xContentRegistry,
                 // Fake DatafeedTimingStatsReporter that does not have access to results index
                 new DatafeedTimingStatsReporter(new DatafeedTimingStats(datafeedConfig.getJobId()), (ts, refreshPolicy) -> {}),
-                listener.delegateFailure((l, dataExtractorFactory) -> {
-                    isDateNanos(
+                listener.delegateFailure(
+                    (l, dataExtractorFactory) -> isDateNanos(
                         previewDatafeedConfig,
                         job.getDataDescription().getTimeField(),
                         listener.delegateFailure((l2, isDateNanos) -> {
-                            DataExtractor dataExtractor = dataExtractorFactory.newExtractor(
-                                0,
-                                isDateNanos ? DateUtils.MAX_NANOSECOND_INSTANT.toEpochMilli() : Long.MAX_VALUE
-                            );
-                            threadPool.generic().execute(() -> previewDatafeed(dataExtractor, l));
+                            final QueryBuilder hotOnly = QueryBuilders.boolQuery()
+                                .mustNot(QueryBuilders.termsQuery(DataTierFieldMapper.NAME, "data_frozen", "data_cold"));
+                            final long start = request.getStartTime().orElse(0);
+                            final long end = request.getEndTime()
+                                .orElse(isDateNanos ? DateUtils.MAX_NANOSECOND_INSTANT.toEpochMilli() : Long.MAX_VALUE);
+                            DataExtractor dataExtractor = request.getStartTime().isPresent() || request.getEndTime().isPresent()
+                                ? dataExtractorFactory.newExtractor(start, end)
+                                : dataExtractorFactory.newExtractor(start, end, hotOnly);
+                            threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> previewDatafeed(dataExtractor, l));
                         })
-                    );
-                })
+                    )
+                )
             );
         });
     }
@@ -175,7 +187,7 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
             fieldCapabilitiesRequest,
             ActionListener.wrap(fieldCapsResponse -> {
                 Map<String, FieldCapabilities> timeFieldCaps = fieldCapsResponse.getField(timeField);
-                listener.onResponse(timeFieldCaps.keySet().contains(DateFieldMapper.DATE_NANOS_CONTENT_TYPE));
+                listener.onResponse(timeFieldCaps.containsKey(DateFieldMapper.DATE_NANOS_CONTENT_TYPE));
             }, listener::onFailure)
         );
     }

+ 10 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java

@@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.license.RemoteClusterLicenseChecker;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xpack.core.ClientHelper;
@@ -30,6 +31,15 @@ import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorF
 public interface DataExtractorFactory {
     DataExtractor newExtractor(long start, long end);
 
+    /**
+     * Creates a new extractor with the additional filter
+     * @param start start time of the extractor
+     * @param end end time of the extractor
+     * @param queryBuilder An additional query filter to apply to the supplied datafeed query
+     * @return new extractor
+     */
+    DataExtractor newExtractor(long start, long end, QueryBuilder queryBuilder);
+
     /**
      * Creates a {@code DataExtractorFactory} for the given datafeed-job combination.
      */
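
Each of the five extractor factories below implements the new overload the same way, so the contract in essence is the following (combineQueries is an illustrative name, not a method from the PR):

import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;

class ExtractorFilterSketch {
    // The additional filter is AND-ed (bool filter) with the datafeed's parsed query,
    // then handed to the same code path as the existing two-argument newExtractor.
    static QueryBuilder combineQueries(QueryBuilder datafeedQuery, QueryBuilder extraFilter) {
        return QueryBuilders.boolQuery().filter(datafeedQuery).filter(extraFilter);
    }
}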

+ 16 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java

@@ -10,6 +10,8 @@ import org.elasticsearch.action.search.SearchAction;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
@@ -35,13 +37,26 @@ public record AggregationDataExtractorFactory(
 
     @Override
     public DataExtractor newExtractor(long start, long end) {
+        return buildExtractor(start, end, datafeedConfig.getParsedQuery(xContentRegistry));
+    }
+
+    @Override
+    public DataExtractor newExtractor(long start, long end, QueryBuilder queryBuilder) {
+        return buildExtractor(
+            start,
+            end,
+            QueryBuilders.boolQuery().filter(datafeedConfig.getParsedQuery(xContentRegistry)).filter(queryBuilder)
+        );
+    }
+
+    private DataExtractor buildExtractor(long start, long end, QueryBuilder queryBuilder) {
         long histogramInterval = datafeedConfig.getHistogramIntervalMillis(xContentRegistry);
         AggregationDataExtractorContext dataExtractorContext = new AggregationDataExtractorContext(
             job.getId(),
             job.getDataDescription().getTimeField(),
             job.getAnalysisConfig().analysisFields(),
             datafeedConfig.getIndices(),
-            datafeedConfig.getParsedQuery(xContentRegistry),
+            queryBuilder,
             datafeedConfig.getParsedAggregations(xContentRegistry),
             Intervals.alignToCeil(start, histogramInterval),
             Intervals.alignToFloor(end, histogramInterval),

+ 11 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorFactory.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
 
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
@@ -91,6 +92,15 @@ public class CompositeAggregationDataExtractorFactory implements DataExtractorFa
 
     @Override
     public DataExtractor newExtractor(long start, long end) {
+        return buildNewExtractor(start, end, parsedQuery);
+    }
+
+    @Override
+    public DataExtractor newExtractor(long start, long end, QueryBuilder queryBuilder) {
+        return buildNewExtractor(start, end, QueryBuilders.boolQuery().filter(parsedQuery).filter(queryBuilder));
+    }
+
+    private DataExtractor buildNewExtractor(long start, long end, QueryBuilder queryBuilder) {
         CompositeAggregationBuilder compositeAggregationBuilder = new CompositeAggregationBuilder(
             compositeAggName,
             compositeValuesSourceBuilders
@@ -104,7 +114,7 @@ public class CompositeAggregationDataExtractorFactory implements DataExtractorFa
             job.getDataDescription().getTimeField(),
             job.getAnalysisConfig().analysisFields(),
             datafeedConfig.getIndices(),
-            parsedQuery,
+            queryBuilder,
             compositeAggregationBuilder,
             this.dateHistogramGroupSourceName,
             Intervals.alignToCeil(start, histogramInterval),

+ 16 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractorFactory.java

@@ -11,6 +11,8 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
@@ -78,13 +80,26 @@ public class RollupDataExtractorFactory implements DataExtractorFactory {
 
     @Override
     public DataExtractor newExtractor(long start, long end) {
+        return buildExtractor(start, end, datafeedConfig.getParsedQuery(xContentRegistry));
+    }
+
+    @Override
+    public DataExtractor newExtractor(long start, long end, QueryBuilder queryBuilder) {
+        return buildExtractor(
+            start,
+            end,
+            QueryBuilders.boolQuery().filter(datafeedConfig.getParsedQuery(xContentRegistry)).filter(queryBuilder)
+        );
+    }
+
+    private DataExtractor buildExtractor(long start, long end, QueryBuilder queryBuilder) {
         long histogramInterval = datafeedConfig.getHistogramIntervalMillis(xContentRegistry);
         AggregationDataExtractorContext dataExtractorContext = new AggregationDataExtractorContext(
             job.getId(),
             job.getDataDescription().getTimeField(),
             job.getAnalysisConfig().analysisFields(),
             datafeedConfig.getIndices(),
-            datafeedConfig.getParsedQuery(xContentRegistry),
+            queryBuilder,
             datafeedConfig.getParsedAggregations(xContentRegistry),
             Intervals.alignToCeil(start, histogramInterval),
             Intervals.alignToFloor(end, histogramInterval),

+ 16 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java

@@ -7,6 +7,8 @@
 package org.elasticsearch.xpack.ml.datafeed.extractor.chunked;
 
 import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
@@ -44,12 +46,25 @@ public class ChunkedDataExtractorFactory implements DataExtractorFactory {
 
     @Override
     public DataExtractor newExtractor(long start, long end) {
+        return buildExtractor(start, end, datafeedConfig.getParsedQuery(xContentRegistry));
+    }
+
+    @Override
+    public DataExtractor newExtractor(long start, long end, QueryBuilder queryBuilder) {
+        return buildExtractor(
+            start,
+            end,
+            QueryBuilders.boolQuery().filter(datafeedConfig.getParsedQuery(xContentRegistry)).filter(queryBuilder)
+        );
+    }
+
+    private DataExtractor buildExtractor(long start, long end, QueryBuilder queryBuilder) {
         ChunkedDataExtractorContext.TimeAligner timeAligner = newTimeAligner();
         ChunkedDataExtractorContext dataExtractorContext = new ChunkedDataExtractorContext(
             job.getId(),
             job.getDataDescription().getTimeField(),
             datafeedConfig.getIndices(),
-            datafeedConfig.getParsedQuery(xContentRegistry),
+            queryBuilder,
             datafeedConfig.getScrollSize(),
             timeAligner.alignToCeil(start),
             timeAligner.alignToFloor(end),

+ 16 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java

@@ -13,6 +13,8 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
@@ -52,11 +54,24 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
 
     @Override
     public DataExtractor newExtractor(long start, long end) {
+        return buildExtractor(start, end, datafeedConfig.getParsedQuery(xContentRegistry));
+    }
+
+    @Override
+    public DataExtractor newExtractor(long start, long end, QueryBuilder queryBuilder) {
+        return buildExtractor(
+            start,
+            end,
+            QueryBuilders.boolQuery().filter(datafeedConfig.getParsedQuery(xContentRegistry)).filter(queryBuilder)
+        );
+    }
+
+    private DataExtractor buildExtractor(long start, long end, QueryBuilder queryBuilder) {
         ScrollDataExtractorContext dataExtractorContext = new ScrollDataExtractorContext(
             job.getId(),
             extractedFields,
             datafeedConfig.getIndices(),
-            datafeedConfig.getParsedQuery(xContentRegistry),
+            queryBuilder,
             datafeedConfig.getScriptFields(),
             datafeedConfig.getScrollSize(),
             start,

+ 5 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPreviewDatafeedAction.java

@@ -12,6 +12,7 @@ import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction;
+import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 
 import java.io.IOException;
@@ -49,12 +50,14 @@ public class RestPreviewDatafeedAction extends BaseRestHandler {
 
     @Override
     protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
+        String startTime = restRequest.param(StartDatafeedAction.START_TIME.getPreferredName(), null);
+        String endTime = restRequest.param(StartDatafeedAction.END_TIME.getPreferredName(), null);
         PreviewDatafeedAction.Request request = restRequest.hasContentOrSourceParam()
             ? PreviewDatafeedAction.Request.fromXContent(
                 restRequest.contentOrSourceParamParser(),
                 restRequest.param(DatafeedConfig.ID.getPreferredName(), null)
-            )
-            : new PreviewDatafeedAction.Request(restRequest.param(DatafeedConfig.ID.getPreferredName()));
+            ).setStart(startTime).setEnd(endTime).build()
+            : new PreviewDatafeedAction.Request(restRequest.param(DatafeedConfig.ID.getPreferredName()), startTime, endTime);
         return channel -> client.execute(PreviewDatafeedAction.INSTANCE, request, new RestToXContentListener<>(channel));
     }
 }

+ 26 - 0
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/preview_datafeed.yml

@@ -853,3 +853,29 @@ setup:
   - match: { 3.time: 1487379660000 }
   - match: { 3.airline: foo }
   - match: { 3.responsetime: 42.0 }
+---
+"Test preview datafeed with start and end":
+  - do:
+      ml.preview_datafeed:
+        start: "2017-02-18T00:30:00Z"
+        end: "2017-02-18T01:00:00Z"
+        body: >
+          {
+            "datafeed_config": {
+              "job_id":"preview-datafeed-job",
+              "indexes":"airline-data"
+            },
+            "job_config": {
+              "analysis_config": {
+                "bucket_span": "1h",
+                "detectors": [{"function":"sum","field_name":"responsetime","by_field_name":"airline"}]
+              },
+              "data_description": {
+                "time_field":"time"
+              }
+            }
+          }
+  - length: { $body: 1 }
+  - match: { 0.time: 1487377800000 }
+  - match: { 0.airline: foo }
+  - match: { 0.responsetime: 1.0 }