浏览代码

[ML] adding new _preview endpoint for data frame analytics (#69453)

This commit adds a new `_preview` endpoint for data frame analytics. 

This allows users to see the data on which their model will be trained. This is especially useful 
in the arrival of custom feature processors.

The API design is a similar to datafeed `_preview` and data frame analytics `_explain`.
Benjamin Trent 4 年之前
父节点
当前提交
2279cafb4e
共有 21 个文件被更改,包括 1002 次插入17 次删除
  1. 2 0
      docs/reference/ml/df-analytics/apis/index.asciidoc
  2. 1 0
      docs/reference/ml/df-analytics/apis/ml-df-analytics-apis.asciidoc
  3. 107 0
      docs/reference/ml/df-analytics/apis/preview-dfanalytics.asciidoc
  4. 175 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PreviewDataFrameAnalyticsAction.java
  5. 5 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java
  6. 45 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PreviewDataFrameAnalyticsActionRequestTests.java
  7. 39 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PreviewDataFrameAnalyticsActionResponseTests.java
  8. 2 2
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java
  9. 4 0
      x-pack/plugin/ml/qa/ml-with-security/build.gradle
  10. 63 1
      x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java
  11. 10 0
      x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java
  12. 41 0
      x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java
  13. 37 1
      x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java
  14. 5 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
  15. 117 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDataFrameAnalyticsAction.java
  16. 66 7
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java
  17. 100 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestPreviewDataFrameAnalyticsAction.java
  18. 1 0
      x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java
  19. 43 0
      x-pack/plugin/src/test/resources/rest-api-spec/api/ml.preview_data_frame_analytics.json
  20. 0 4
      x-pack/plugin/src/test/resources/rest-api-spec/test/ml/explain_data_frame_analytics.yml
  21. 139 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/ml/preview_data_frame_analytics.yml

+ 2 - 0
docs/reference/ml/df-analytics/apis/index.asciidoc

@@ -18,6 +18,8 @@ include::get-dfanalytics.asciidoc[leveloffset=+2]
 include::get-dfanalytics-stats.asciidoc[leveloffset=+2]
 include::get-trained-models.asciidoc[leveloffset=+2]
 include::get-trained-models-stats.asciidoc[leveloffset=+2]
+//PREVIEW
+include::preview-dfanalytics.asciidoc[leveloffset=+2]
 //SET/START/STOP
 include::start-dfanalytics.asciidoc[leveloffset=+2]
 include::stop-dfanalytics.asciidoc[leveloffset=+2]

+ 1 - 0
docs/reference/ml/df-analytics/apis/ml-df-analytics-apis.asciidoc

@@ -5,6 +5,7 @@
 
 You can use the following APIs to perform {ml} {dfanalytics} activities.
 
+* <<preview-dfanalytics,Preview {dfanalytics}>>
 * <<put-dfanalytics,Create {dfanalytics-jobs}>>
 * <<update-dfanalytics,Update {dfanalytics-jobs}>>
 * <<delete-dfanalytics,Delete {dfanalytics-jobs}>>

+ 107 - 0
docs/reference/ml/df-analytics/apis/preview-dfanalytics.asciidoc

@@ -0,0 +1,107 @@
+[role="xpack"]
+[testenv="platinum"]
+[[preview-dfanalytics]]
+= Preview {dfanalytics} API
+
+[subs="attributes"]
+++++
+<titleabbrev>Preview {dfanalytics}</titleabbrev>
+++++
+
+Previews the features used by a {dataframe-analytics-config}.
+
+beta::[]
+
+
+[[ml-preview-dfanalytics-request]]
+== {api-request-title}
+
+`GET _ml/data_frame/analytics/_preview` +
+
+`POST _ml/data_frame/analytics/_preview` +
+
+`GET _ml/data_frame/analytics/<data_frame_analytics_id>/_preview` +
+
+`POST _ml/data_frame/analytics/<data_frame_analytics_id>/_preview`
+
+
+[[ml-preview-dfanalytics-prereq]]
+== {api-prereq-title}
+
+If the {es} {security-features} are enabled, you must have the following
+privileges:
+
+* cluster: `monitor_ml`
+
+For more information, see <<security-privileges>> and {ml-docs-setup-privileges}.
+
+
+[[ml-preview-dfanalytics-desc]]
+== {api-description-title}
+
+This API provides preview of the extracted features for a {dataframe-analytics-config}
+that either exists already or one that has not been created yet.
+
+
+[[ml-preview-dfanalytics-path-params]]
+== {api-path-parms-title}
+
+`<data_frame_analytics_id>`::
+(Optional, string)
+include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=job-id-data-frame-analytics]
+
+[[ml-preview-dfanalytics-request-body]]
+== {api-request-body-title}
+
+`config`::
+(Optional, object)
+A {dataframe-analytics-config} as described in <<put-dfanalytics>>.
+Note that `id` and `dest` don't need to be provided in the context of this API.
+
+[role="child_attributes"]
+[[ml-preview-dfanalytics-results]]
+== {api-response-body-title}
+
+The API returns a response that contains the following:
+
+`feature_values`::
+(array)
+An array of objects that contain feature name and value pairs. The features have
+been processed and indicate what will be sent to the model for training.
+
+[[ml-preview-dfanalytics-example]]
+== {api-examples-title}
+
+[source,console]
+--------------------------------------------------
+POST _ml/data_frame/analytics/_preview
+{
+  "config": {
+    "source": {
+      "index": "houses_sold_last_10_yrs"
+    },
+    "analysis": {
+      "regression": {
+        "dependent_variable": "price"
+      }
+    }
+  }
+}
+--------------------------------------------------
+// TEST[skip:TBD]
+
+The API returns the following results:
+
+[source,console-result]
+----
+{
+  "feature_values": [
+    {
+      "number_of_bedrooms": "1",
+      "postcode": "29655",
+      "price": "140.4"
+    },
+    ...
+  ]
+}
+----

+ 175 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PreviewDataFrameAnalyticsAction.java

@@ -0,0 +1,175 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+package org.elasticsearch.xpack.core.ml.action;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+
+public class PreviewDataFrameAnalyticsAction extends ActionType<PreviewDataFrameAnalyticsAction.Response> {
+
+    public static final PreviewDataFrameAnalyticsAction INSTANCE = new PreviewDataFrameAnalyticsAction();
+    public static final String NAME = "cluster:admin/xpack/ml/data_frame/analytics/preview";
+
+    private PreviewDataFrameAnalyticsAction() {
+        super(NAME, PreviewDataFrameAnalyticsAction.Response::new);
+    }
+
+    public static class Request extends ActionRequest {
+
+        public static final ParseField CONFIG = new ParseField("config");
+
+        private final DataFrameAnalyticsConfig config;
+
+        static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>(
+            "preview_data_frame_analytics_response",
+            Request.Builder::new
+        );
+        static {
+            PARSER.declareObject(Request.Builder::setConfig, DataFrameAnalyticsConfig.STRICT_PARSER::apply, CONFIG);
+        }
+
+        public static Request.Builder fromXContent(XContentParser parser) {
+            return PARSER.apply(parser, null);
+        }
+
+        public Request(DataFrameAnalyticsConfig config) {
+            this.config = ExceptionsHelper.requireNonNull(config, CONFIG);
+        }
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            this.config = new DataFrameAnalyticsConfig(in);
+        }
+
+        public DataFrameAnalyticsConfig getConfig() {
+            return config;
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            config.writeTo(out);
+        }
+
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Request request = (Request) o;
+            return Objects.equals(config, request.config);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(config);
+        }
+
+        public static class Builder {
+            private DataFrameAnalyticsConfig config;
+
+            private Builder setConfig(DataFrameAnalyticsConfig.Builder config) {
+                this.config = config.buildForExplain();
+                return this;
+            }
+
+            public Builder setConfig(DataFrameAnalyticsConfig config) {
+                this.config = config;
+                return this;
+            }
+
+            public DataFrameAnalyticsConfig getConfig() {
+                return config;
+            }
+
+            public Request build() {
+                return new Request(config);
+            }
+        }
+    }
+
+    public static class Response extends ActionResponse implements ToXContentObject {
+
+        public static final ParseField TYPE = new ParseField("preview_data_frame_analytics_response");
+        public static final ParseField FEATURE_VALUES = new ParseField("feature_values");
+
+        @SuppressWarnings("unchecked")
+        static final ConstructingObjectParser<Response, Void> PARSER =
+            new ConstructingObjectParser<>(
+                TYPE.getPreferredName(),
+                args -> new Response((List<Map<String, Object>>) args[0]));
+
+        static {
+            PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> p.map(), FEATURE_VALUES);
+        }
+
+        private final List<Map<String, Object>> featureValues;
+
+        public Response(List<Map<String, Object>> featureValues) {
+            this.featureValues = Objects.requireNonNull(featureValues);
+        }
+
+        public Response(StreamInput in) throws IOException {
+            super(in);
+            this.featureValues = in.readList(StreamInput::readMap);
+        }
+
+        public List<Map<String, Object>> getFeatureValues() {
+            return featureValues;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeCollection(featureValues, StreamOutput::writeMap);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            builder.field(FEATURE_VALUES.getPreferredName(), featureValues);
+            builder.endObject();
+            return builder;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (this == other) return true;
+            if (other == null || getClass() != other.getClass()) return false;
+
+            Response that = (Response) other;
+            return Objects.equals(featureValues, that.featureValues);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(featureValues);
+        }
+    }
+}

+ 5 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java

@@ -40,6 +40,9 @@ import static org.elasticsearch.xpack.core.ml.utils.ToXContentParams.EXCLUDE_GEN
 
 public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
 
+    public static final String BLANK_ID = "blank_data_frame_id";
+    public static final String BLANK_DEST_INDEX = "blank_dest_index";
+
     public static final String TYPE = "data_frame_analytics_config";
 
     public static final ByteSizeValue DEFAULT_MODEL_MEMORY_LIMIT = ByteSizeValue.ofGb(1);
@@ -456,10 +459,10 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
          */
         public DataFrameAnalyticsConfig buildForExplain() {
             return new DataFrameAnalyticsConfig(
-                id != null ? id : "dummy",
+                id != null ? id : BLANK_ID,
                 description,
                 source,
-                dest != null ? dest : new DataFrameAnalyticsDest("dummy", null),
+                dest != null ? dest : new DataFrameAnalyticsDest(BLANK_DEST_INDEX, null),
                 analysis,
                 headers,
                 modelMemoryLimit,

+ 45 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PreviewDataFrameAnalyticsActionRequestTests.java

@@ -0,0 +1,45 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.ml.action;
+
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.search.SearchModule;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.core.ml.action.PreviewDataFrameAnalyticsAction.Request;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests;
+import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider;
+import org.elasticsearch.xpack.core.ml.inference.MlInferenceNamedXContentProvider;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+
+public class PreviewDataFrameAnalyticsActionRequestTests extends AbstractWireSerializingTestCase<Request> {
+
+    @Override
+    protected NamedWriteableRegistry getNamedWriteableRegistry() {
+        List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>();
+        namedWriteables.addAll(new MlDataFrameAnalysisNamedXContentProvider().getNamedWriteables());
+        namedWriteables.addAll(new MlInferenceNamedXContentProvider().getNamedWriteables());
+        namedWriteables.addAll(new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedWriteables());
+        return new NamedWriteableRegistry(namedWriteables);
+    }
+
+    @Override
+    protected Writeable.Reader<Request> instanceReader() {
+        return Request::new;
+    }
+
+    @Override
+    protected Request createTestInstance() {
+        return new Request(DataFrameAnalyticsConfigTests.createRandom(randomAlphaOfLength(10)));
+    }
+}

+ 39 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PreviewDataFrameAnalyticsActionResponseTests.java

@@ -0,0 +1,39 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+package org.elasticsearch.xpack.core.ml.action;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.core.ml.action.PreviewDataFrameAnalyticsAction.Response;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class PreviewDataFrameAnalyticsActionResponseTests extends AbstractWireSerializingTestCase<Response> {
+
+    @Override
+    protected Response createTestInstance() {
+        return new Response(
+            Stream.generate(() -> randomHashMap("foo", "bar", "baz"))
+                .limit(randomIntBetween(1, 10))
+                .collect(Collectors.toList())
+        );
+    }
+
+    private static Map<String, Object> randomHashMap(String... keys) {
+        return Arrays.stream(keys).collect(Collectors.toMap(Function.identity(), k -> randomAlphaOfLength(10)));
+
+    }
+
+    @Override
+    protected Writeable.Reader<Response> instanceReader() {
+        return Response::new;
+    }
+}

+ 2 - 2
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java

@@ -417,7 +417,7 @@ public class DataFrameAnalyticsConfigTests extends AbstractBWCSerializationTestC
 
         DataFrameAnalyticsConfig config = builder.buildForExplain();
 
-        assertThat(config.getId(), equalTo("dummy"));
+        assertThat(config.getId(), equalTo(DataFrameAnalyticsConfig.BLANK_ID));
     }
 
     public void testBuildForExplain_MissingDest() {
@@ -428,7 +428,7 @@ public class DataFrameAnalyticsConfigTests extends AbstractBWCSerializationTestC
 
         DataFrameAnalyticsConfig config = builder.buildForExplain();
 
-        assertThat(config.getDest().getIndex(), equalTo("dummy"));
+        assertThat(config.getDest().getIndex(), equalTo(DataFrameAnalyticsConfig.BLANK_DEST_INDEX));
     }
 
     public void testPreventCreateTimeInjection() throws IOException {

+ 4 - 0
x-pack/plugin/ml/qa/ml-with-security/build.gradle

@@ -126,6 +126,10 @@ tasks.named("yamlRestTest").configure {
     'ml/explain_data_frame_analytics/Test both job id and body',
     'ml/explain_data_frame_analytics/Test missing job',
     'ml/explain_data_frame_analytics/Test empty data frame given body',
+    'ml/preview_data_frame_analytics/Test neither job id nor body',
+    'ml/preview_data_frame_analytics/Test both job id and body',
+    'ml/preview_data_frame_analytics/Test missing job',
+    'ml/preview_data_frame_analytics/Test id that matches multiple jobs',
     'ml/delete_job_force/Test cannot force delete a non-existent job',
     'ml/delete_model_snapshot/Test delete snapshot missing snapshotId',
     'ml/delete_model_snapshot/Test delete snapshot missing job_id',

+ 63 - 1
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java

@@ -40,7 +40,6 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
 import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams;
 import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification;
 import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider;
-import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression;
 import org.elasticsearch.xpack.core.ml.dataframe.evaluation.classification.Accuracy;
 import org.elasticsearch.xpack.core.ml.dataframe.evaluation.classification.AucRoc;
 import org.elasticsearch.xpack.core.ml.dataframe.evaluation.classification.MulticlassConfusionMatrix;
@@ -66,12 +65,14 @@ import java.util.Set;
 import static java.util.stream.Collectors.toList;
 import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
 import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.emptyString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.in;
@@ -893,6 +894,67 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
         assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField);
     }
 
+    public void testPreview() throws Exception {
+        initialize("preview_analytics");
+        indexData(sourceIndex, 300, 50, KEYWORD_FIELD);
+        DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD));
+        putAnalytics(config);
+
+        List<Map<String, Object>> preview = previewDataFrame(jobId).getFeatureValues();
+        for (Map<String, Object> feature : preview) {
+            assertThat(feature.keySet(), containsInAnyOrder(
+                BOOLEAN_FIELD,
+                KEYWORD_FIELD,
+                NUMERICAL_FIELD,
+                DISCRETE_NUMERICAL_FIELD,
+                TEXT_FIELD+".keyword",
+                NESTED_FIELD,
+                ALIAS_TO_KEYWORD_FIELD,
+                ALIAS_TO_NESTED_FIELD
+            ));
+        }
+    }
+
+    public void testPreviewWithProcessors() throws Exception {
+        initialize("processed_preview_analytics");
+        indexData(sourceIndex, 300, 50, KEYWORD_FIELD);
+        DataFrameAnalyticsConfig config =
+            buildAnalytics(jobId, sourceIndex, destIndex, null,
+                new Classification(
+                    KEYWORD_FIELD,
+                    BoostedTreeParams.builder().setNumTopFeatureImportanceValues(0).build(),
+                    null,
+                    null,
+                    2,
+                    10.0,
+                    42L,
+                    Arrays.asList(
+                        new OneHotEncoding(NESTED_FIELD, MapBuilder.<String, String>newMapBuilder()
+                            .put(KEYWORD_FIELD_VALUES.get(0), "cat_column_custom_2")
+                            .put(KEYWORD_FIELD_VALUES.get(1), "dog_column_custom_2").map(), true),
+                        new OneHotEncoding(TEXT_FIELD, MapBuilder.<String, String>newMapBuilder()
+                            .put(KEYWORD_FIELD_VALUES.get(0), "cat_column_custom_3")
+                            .put(KEYWORD_FIELD_VALUES.get(1), "dog_column_custom_3").map(), true)
+                    ),
+                    null));
+        putAnalytics(config);
+
+        List<Map<String, Object>> preview = previewDataFrame(jobId).getFeatureValues();
+        for (Map<String, Object> feature : preview) {
+            assertThat(feature.keySet(), hasItems(
+                BOOLEAN_FIELD,
+                KEYWORD_FIELD,
+                NUMERICAL_FIELD,
+                DISCRETE_NUMERICAL_FIELD,
+                "cat_column_custom_2",
+                "dog_column_custom_2",
+                "cat_column_custom_3",
+                "dog_column_custom_3"
+            ));
+            assertThat(feature.keySet(), not(hasItems(NESTED_FIELD, TEXT_FIELD)));
+        }
+    }
+
     private static <T> T getOnlyElement(List<T> list) {
         assertThat(list, hasSize(1));
         return list.get(0);

+ 10 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java

@@ -31,6 +31,7 @@ import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction;
 import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.action.PreviewDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
@@ -190,6 +191,15 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
         return client().execute(EvaluateDataFrameAction.INSTANCE, request).actionGet();
     }
 
+    protected PreviewDataFrameAnalyticsAction.Response previewDataFrame(String id) {
+        List<DataFrameAnalyticsConfig> analytics = getAnalytics(id);
+        assertThat(analytics, hasSize(1));
+        return client().execute(
+            PreviewDataFrameAnalyticsAction.INSTANCE,
+            new PreviewDataFrameAnalyticsAction.Request(analytics.get(0))
+        ).actionGet();
+    }
+
     protected static DataFrameAnalyticsConfig buildAnalytics(String id, String sourceIndex, String destIndex,
                                                              @Nullable String resultsField, DataFrameAnalysis analysis) throws Exception {
         return buildAnalytics(id, sourceIndex, destIndex, resultsField, analysis, QueryBuilders.matchAllQuery());

+ 41 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java

@@ -48,10 +48,12 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.Matchers.emptyString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
@@ -773,6 +775,45 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
             "Finished analysis");
     }
 
+    public void testPreview() throws Exception {
+        initialize("preview_analytics");
+        indexData(sourceIndex, 300, 50);
+
+        DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Regression(DEPENDENT_VARIABLE_FIELD));
+        putAnalytics(config);
+        List<Map<String, Object>> preview = previewDataFrame(jobId).getFeatureValues();
+        for (Map<String, Object> feature : preview) {
+            assertThat(feature.keySet(), hasItems(NUMERICAL_FEATURE_FIELD, DISCRETE_NUMERICAL_FEATURE_FIELD, DEPENDENT_VARIABLE_FIELD));
+        }
+    }
+
+    public void testPreviewWithProcessors() throws Exception {
+        initialize("processed_preview_analytics");
+        indexData(sourceIndex, 300, 50);
+
+        DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null,
+            new Regression(
+                DEPENDENT_VARIABLE_FIELD,
+                BoostedTreeParams.builder().setNumTopFeatureImportanceValues(1).build(),
+                null,
+                null,
+                null,
+                null,
+                null,
+                Arrays.asList(
+                    new OneHotEncoding(DISCRETE_NUMERICAL_FEATURE_FIELD,
+                        Collections.singletonMap(DISCRETE_NUMERICAL_FEATURE_VALUES.get(0).toString(), "tenner"), true)
+                ),
+                null)
+        );
+        putAnalytics(config);
+        List<Map<String, Object>> preview = previewDataFrame(jobId).getFeatureValues();
+        for (Map<String, Object> feature : preview) {
+            assertThat(feature.keySet(), hasItems(NUMERICAL_FEATURE_FIELD, "tenner", DEPENDENT_VARIABLE_FIELD));
+            assertThat(feature, not(hasKey(DISCRETE_NUMERICAL_FEATURE_VALUES)));
+        }
+    }
+
     private void initialize(String jobId) {
         this.jobId = jobId;
         this.sourceIndex = jobId + "_source_index";

+ 37 - 1
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java

@@ -30,7 +30,6 @@ import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection;
 import org.junit.After;
 import org.junit.Before;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -41,6 +40,8 @@ import static org.hamcrest.Matchers.emptyString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -158,6 +159,41 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
             "Finished analysis");
     }
 
+    public void testPreview() throws Exception {
+        String sourceIndex = "test-outlier-detection-preview";
+
+        client().admin().indices().prepareCreate(sourceIndex)
+            .setMapping("numeric_1", "type=double", "numeric_2", "type=unsigned_long", "categorical_1", "type=keyword")
+            .get();
+
+        BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
+        bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+
+        for (int i = 0; i < 5; i++) {
+            IndexRequest indexRequest = new IndexRequest(sourceIndex);
+
+            // We insert one odd value out of 5 for one feature
+            String docId = i == 0 ? "outlier" : "normal" + i;
+            indexRequest.id(docId);
+            indexRequest.source("numeric_1", i == 0 ? 100.0 : 1.0, "numeric_2", 1, "categorical_1", "foo_" + i);
+            bulkRequestBuilder.add(indexRequest);
+        }
+        BulkResponse bulkResponse = bulkRequestBuilder.get();
+        if (bulkResponse.hasFailures()) {
+            fail("Failed to index data: " + bulkResponse.buildFailureMessage());
+        }
+
+        String id = "test_outlier_detection_preview";
+        DataFrameAnalyticsConfig config = buildAnalytics(id, sourceIndex, sourceIndex + "-results", null,
+            new OutlierDetection.Builder().build());
+        putAnalytics(config);
+        List<Map<String, Object>> preview = previewDataFrame(id).getFeatureValues();
+        for (Map<String, Object> feature : preview) {
+            assertThat(feature.keySet(), hasItems("numeric_1", "numeric_2"));
+            assertThat(feature, not(hasKey("categorical_1")));
+        }
+    }
+
     public void testOutlierDetectionWithEnoughDocumentsToScroll() throws Exception {
         String sourceIndex = "test-outlier-detection-with-enough-docs-to-scroll";
 

+ 5 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -96,6 +96,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteTrainedModelAliasAction;
 import org.elasticsearch.xpack.core.ml.action.EstimateModelMemoryAction;
 import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction;
 import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.action.PreviewDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
 import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
 import org.elasticsearch.xpack.core.ml.action.ForecastJobAction;
@@ -175,6 +176,7 @@ import org.elasticsearch.xpack.ml.action.TransportDeleteTrainedModelAliasAction;
 import org.elasticsearch.xpack.ml.action.TransportEstimateModelMemoryAction;
 import org.elasticsearch.xpack.ml.action.TransportEvaluateDataFrameAction;
 import org.elasticsearch.xpack.ml.action.TransportExplainDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.ml.action.TransportPreviewDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.ml.action.TransportFinalizeJobExecutionAction;
 import org.elasticsearch.xpack.ml.action.TransportFlushJobAction;
 import org.elasticsearch.xpack.ml.action.TransportForecastJobAction;
@@ -308,6 +310,7 @@ import org.elasticsearch.xpack.ml.rest.datafeeds.RestUpdateDatafeedAction;
 import org.elasticsearch.xpack.ml.rest.dataframe.RestDeleteDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.ml.rest.dataframe.RestEvaluateDataFrameAction;
 import org.elasticsearch.xpack.ml.rest.dataframe.RestExplainDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.ml.rest.dataframe.RestPreviewDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.ml.rest.dataframe.RestGetDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.ml.rest.dataframe.RestGetDataFrameAnalyticsStatsAction;
 import org.elasticsearch.xpack.ml.rest.dataframe.RestPostDataFrameAnalyticsUpdateAction;
@@ -945,6 +948,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
             new RestUpgradeJobModelSnapshotAction(),
             new RestPutTrainedModelAliasAction(),
             new RestDeleteTrainedModelAliasAction(),
+            new RestPreviewDataFrameAnalyticsAction(),
             // CAT Handlers
             new RestCatJobsAction(),
             new RestCatTrainedModelsAction(),
@@ -1030,6 +1034,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
                 new ActionHandler<>(UpgradeJobModelSnapshotAction.INSTANCE, TransportUpgradeJobModelSnapshotAction.class),
                 new ActionHandler<>(PutTrainedModelAliasAction.INSTANCE, TransportPutTrainedModelAliasAction.class),
                 new ActionHandler<>(DeleteTrainedModelAliasAction.INSTANCE, TransportDeleteTrainedModelAliasAction.class),
+                new ActionHandler<>(PreviewDataFrameAnalyticsAction.INSTANCE, TransportPreviewDataFrameAnalyticsAction.class),
                 usageAction,
                 infoAction);
     }

+ 117 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDataFrameAnalyticsAction.java

@@ -0,0 +1,117 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+package org.elasticsearch.xpack.ml.action;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.client.ParentTaskAssigningClient;
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.license.LicenseUtils;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.XPackField;
+import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.ml.action.PreviewDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.action.PreviewDataFrameAnalyticsAction.Request;
+import org.elasticsearch.xpack.core.ml.action.PreviewDataFrameAnalyticsAction.Response;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
+import org.elasticsearch.xpack.core.security.SecurityContext;
+import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
+import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
+import org.elasticsearch.xpack.ml.dataframe.extractor.ExtractedFieldsDetectorFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.elasticsearch.xpack.core.ClientHelper.filterSecurityHeaders;
+import static org.elasticsearch.xpack.ml.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;
+
+/**
+ * Previews the data that is sent to the analytics model for training
+ */
+public class TransportPreviewDataFrameAnalyticsAction extends HandledTransportAction<Request, Response> {
+
+    private final XPackLicenseState licenseState;
+    private final NodeClient client;
+    private final SecurityContext securityContext;
+    private final ThreadPool threadPool;
+
+    @Inject
+    public TransportPreviewDataFrameAnalyticsAction(
+        TransportService transportService,
+        ActionFilters actionFilters,
+        NodeClient client,
+        XPackLicenseState licenseState,
+        Settings settings,
+        ThreadPool threadPool
+    ) {
+        super(PreviewDataFrameAnalyticsAction.NAME, transportService, actionFilters, Request::new);
+        this.client = Objects.requireNonNull(client);
+        this.licenseState = licenseState;
+        this.threadPool = threadPool;
+        this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings)
+            ? new SecurityContext(settings, threadPool.getThreadContext())
+            : null;
+    }
+
+    private static Map<String, Object> mergeRow(DataFrameDataExtractor.Row row, List<String> fieldNames) {
+        return row.getValues() == null
+            ? Collections.emptyMap()
+            : IntStream.range(0, row.getValues().length).boxed().collect(Collectors.toMap(fieldNames::get, i -> row.getValues()[i]));
+    }
+
+    @Override
+    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
+        if (licenseState.checkFeature(XPackLicenseState.Feature.MACHINE_LEARNING) == false) {
+            listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING));
+            return;
+        }
+        if (licenseState.isSecurityEnabled()) {
+            useSecondaryAuthIfAvailable(this.securityContext, () -> {
+                // Set the auth headers (preferring the secondary headers) to the caller's.
+                // Regardless if the config was previously stored or not.
+                DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder(request.getConfig()).setHeaders(
+                    filterSecurityHeaders(threadPool.getThreadContext().getHeaders())
+                ).build();
+                preview(task, config, listener);
+            });
+        } else {
+            preview(task, request.getConfig(), listener);
+        }
+    }
+
+    void preview(Task task, DataFrameAnalyticsConfig config, ActionListener<Response> listener) {
+        final ExtractedFieldsDetectorFactory extractedFieldsDetectorFactory = new ExtractedFieldsDetectorFactory(
+            new ParentTaskAssigningClient(client, task.getParentTaskId())
+        );
+        extractedFieldsDetectorFactory.createFromSource(config, ActionListener.wrap(extractedFieldsDetector -> {
+            DataFrameDataExtractor extractor = DataFrameDataExtractorFactory.createForSourceIndices(
+                client,
+                task.getParentTaskId().toString(),
+                config,
+                extractedFieldsDetector.detect().v1()
+            ).newExtractor(false);
+            extractor.preview(ActionListener.wrap(
+                rows -> {
+                    List<String> fieldNames = extractor.getFieldNames();
+                    listener.onResponse(new Response(rows.stream().map((r) -> mergeRow(r, fieldNames)).collect(Collectors.toList())));
+                },
+                listener::onFailure
+            ));
+        }, listener::onFailure));
+    }
+
+}

+ 66 - 7
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java

@@ -118,6 +118,57 @@ public class DataFrameDataExtractor {
         return hits;
     }
 
+    /**
+     * Provides a preview of the data. Assumes this was created from the source indices.
+     * Does no sorting of the results.
+     * @param listener To alert with the extracted rows
+     */
+    public void preview(ActionListener<List<Row>> listener) {
+
+        SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
+            // This ensures the search throws if there are failures and the scroll context gets cleared automatically
+            .setAllowPartialSearchResults(false)
+            .setIndices(context.indices)
+            .setSize(context.scrollSize)
+            .setQuery(QueryBuilders.boolQuery().filter(context.query));
+
+        setFetchSource(searchRequestBuilder);
+
+        for (ExtractedField docValueField : context.extractedFields.getDocValueFields()) {
+            searchRequestBuilder.addDocValueField(docValueField.getSearchField(), docValueField.getDocValueFormat());
+        }
+
+        searchRequestBuilder.setRuntimeMappings(context.runtimeMappings);
+
+        ClientHelper.executeWithHeadersAsync(
+            context.headers,
+            ClientHelper.ML_ORIGIN,
+            client,
+            SearchAction.INSTANCE,
+            searchRequestBuilder.request(),
+            ActionListener.wrap(
+                searchResponse -> {
+                    if (searchResponse.getHits().getHits().length == 0) {
+                        listener.onResponse(Collections.emptyList());
+                        return;
+                    }
+
+                    final SearchHit[] hits = searchResponse.getHits().getHits();
+                    List<Row> rows = new ArrayList<>(hits.length);
+                    for (SearchHit hit : hits) {
+                        String[] extractedValues = extractValues(hit);
+                        rows.add(extractedValues == null ?
+                            new Row(null, hit, true) :
+                            new Row(extractedValues, hit, false)
+                        );
+                    }
+                    listener.onResponse(rows);
+                },
+                listener::onFailure
+            )
+        );
+    }
+
     protected List<Row> nextSearch() throws IOException {
         return tryRequestWithSearchResponse(() -> executeSearchRequest(buildSearchRequest()));
     }
@@ -266,29 +317,37 @@ public class DataFrameDataExtractor {
     }
 
     private Row createRow(SearchHit hit) {
+        String[] extractedValues = extractValues(hit);
+        if (extractedValues == null) {
+            return new Row(null, hit, true);
+        }
+        boolean isTraining = trainTestSplitter.get().isTraining(extractedValues);
+        Row row = new Row(extractedValues, hit, isTraining);
+        LOGGER.trace(() -> new ParameterizedMessage("[{}] Extracted row: sort key = [{}], is_training = [{}], values = {}",
+            context.jobId, row.getSortKey(), isTraining, Arrays.toString(row.values)));
+        return row;
+    }
+
+    private String[] extractValues(SearchHit hit) {
         String[] extractedValues = new String[organicFeatures.length + processedFeatures.length];
         int i = 0;
         for (String organicFeature : organicFeatures) {
             String extractedValue = extractNonProcessedValues(hit, organicFeature);
             if (extractedValue == null) {
-                return new Row(null, hit, true);
+                return null;
             }
             extractedValues[i++] = extractedValue;
         }
         for (ProcessedField processedField : context.extractedFields.getProcessedFields()) {
             String[] processedValues = extractProcessedValue(processedField, hit);
             if (processedValues == null) {
-                return new Row(null, hit, true);
+                return null;
             }
             for (String processedValue : processedValues) {
                 extractedValues[i++] = processedValue;
             }
         }
-        boolean isTraining = trainTestSplitter.get().isTraining(extractedValues);
-        Row row = new Row(extractedValues, hit, isTraining);
-        LOGGER.trace(() -> new ParameterizedMessage("[{}] Extracted row: sort key = [{}], is_training = [{}], values = {}",
-            context.jobId, row.getSortKey(), isTraining, Arrays.toString(row.values)));
-        return row;
+        return extractedValues;
     }
 
     private void markScrollAsErrored() {

+ 100 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestPreviewDataFrameAnalyticsAction.java

@@ -0,0 +1,100 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+package org.elasticsearch.xpack.ml.rest.dataframe;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.xpack.core.ml.action.PreviewDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.ml.MachineLearning;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.rest.RestRequest.Method.GET;
+import static org.elasticsearch.rest.RestRequest.Method.POST;
+
+public class RestPreviewDataFrameAnalyticsAction extends BaseRestHandler {
+
+    @Override
+    public List<Route> routes() {
+        return List.of(
+            new Route(GET, MachineLearning.BASE_PATH + "data_frame/analytics/_preview"),
+            new Route(POST, MachineLearning.BASE_PATH + "data_frame/analytics/_preview"),
+            new Route(
+                GET,
+                MachineLearning.BASE_PATH + "data_frame/analytics/{" + DataFrameAnalyticsConfig.ID.getPreferredName() + "}/_preview"
+            ),
+            new Route(
+                POST,
+                MachineLearning.BASE_PATH + "data_frame/analytics/{" + DataFrameAnalyticsConfig.ID.getPreferredName() + "}/_preview"
+            )
+        );
+    }
+
+    @Override
+    public String getName() {
+        return "ml_preview_data_frame_analytics_action";
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
+        final String jobId = restRequest.param(DataFrameAnalyticsConfig.ID.getPreferredName());
+
+        if (Strings.isNullOrEmpty(jobId) && restRequest.hasContentOrSourceParam() == false) {
+            throw ExceptionsHelper.badRequestException(
+                "Please provide a job [{}] or the config object",
+                DataFrameAnalyticsConfig.ID.getPreferredName()
+            );
+        }
+
+        if (Strings.isNullOrEmpty(jobId) == false && restRequest.hasContentOrSourceParam()) {
+            throw ExceptionsHelper.badRequestException(
+                "Please provide either a job [{}] or the config object but not both",
+                DataFrameAnalyticsConfig.ID.getPreferredName()
+            );
+        }
+        final PreviewDataFrameAnalyticsAction.Request.Builder requestBuilder = Strings.isNullOrEmpty(jobId) ?
+            PreviewDataFrameAnalyticsAction.Request.fromXContent(restRequest.contentOrSourceParamParser()) :
+            new PreviewDataFrameAnalyticsAction.Request.Builder();
+
+        return channel -> {
+            RestToXContentListener<PreviewDataFrameAnalyticsAction.Response> listener = new RestToXContentListener<>(channel);
+
+            if (requestBuilder.getConfig() != null) {
+                client.execute(PreviewDataFrameAnalyticsAction.INSTANCE, requestBuilder.build(), listener);
+            } else {
+                GetDataFrameAnalyticsAction.Request getRequest = new GetDataFrameAnalyticsAction.Request(jobId);
+                getRequest.setAllowNoResources(false);
+                client.execute(GetDataFrameAnalyticsAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> {
+                    List<DataFrameAnalyticsConfig> jobs = getResponse.getResources().results();
+                    if (jobs.size() > 1) {
+                        listener.onFailure(
+                            ExceptionsHelper.badRequestException(
+                                "expected only one config but matched {}",
+                                jobs.stream().map(DataFrameAnalyticsConfig::getId).collect(Collectors.toList())
+                            )
+                        );
+                    } else {
+                        client.execute(
+                            PreviewDataFrameAnalyticsAction.INSTANCE,
+                            requestBuilder.setConfig(jobs.get(0)).build(),
+                            listener
+                        );
+                    }
+                }, listener::onFailure));
+            }
+        };
+    }
+}

+ 1 - 0
x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

@@ -119,6 +119,7 @@ public class Constants {
         "cluster:admin/xpack/ml/data_frame/analytics/delete",
         "cluster:admin/xpack/ml/data_frame/analytics/explain",
         "cluster:admin/xpack/ml/data_frame/analytics/put",
+        "cluster:admin/xpack/ml/data_frame/analytics/preview",
         "cluster:admin/xpack/ml/data_frame/analytics/start",
         "cluster:admin/xpack/ml/data_frame/analytics/stop",
         "cluster:admin/xpack/ml/data_frame/analytics/update",

+ 43 - 0
x-pack/plugin/src/test/resources/rest-api-spec/api/ml.preview_data_frame_analytics.json

@@ -0,0 +1,43 @@
+{
+  "ml.preview_data_frame_analytics":{
+    "documentation":{
+      "url":"http://www.elastic.co/guide/en/elasticsearch/reference/current/preview-dfanalytics.html",
+      "description":"Previews that will be analyzed given a data frame analytics config."
+    },
+    "stability":"beta",
+    "visibility":"public",
+    "headers":{
+      "accept":[ "application/json"],
+      "content_type":["application/json"]
+    },
+    "url":{
+      "paths":[
+        {
+          "path":"/_ml/data_frame/analytics/_preview",
+          "methods":[
+            "GET",
+            "POST"
+          ],
+          "parts":{}
+        },
+        {
+          "path":"/_ml/data_frame/analytics/{id}/_preview",
+          "methods":[
+            "GET",
+            "POST"
+          ],
+          "parts":{
+            "id":{
+              "type":"string",
+              "description":"The ID of the data frame analytics to preview"
+            }
+          }
+        }
+      ]
+    },
+    "body":{
+      "description":"The data frame analytics config to preview",
+      "required":false
+    }
+  }
+}

+ 0 - 4
x-pack/plugin/src/test/resources/rest-api-spec/test/ml/explain_data_frame_analytics.yml

@@ -241,10 +241,6 @@
 
 ---
 "Test field_selection given job":
-  - skip:
-      version: "all"
-      reason: "Awaits fix: https://github.com/elastic/elasticsearch/issues/68337"
-
   - do:
       indices.create:
         index: index-source

+ 139 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/ml/preview_data_frame_analytics.yml

@@ -0,0 +1,139 @@
+---
+"Test neither job id nor body":
+  - do:
+      catch: /Please provide a job \[id\] or the config object/
+      ml.preview_data_frame_analytics:
+        id: ""
+
+---
+"Test both job id and body":
+  - do:
+      catch: /Please provide either a job \[id\] or the config object but not both/
+      ml.preview_data_frame_analytics:
+        id: "foo"
+        body:
+          config:
+            source: { index: "index-source" }
+            analysis: { outlier_detection: {} }
+
+---
+"Test missing job":
+  - do:
+      catch: missing
+      ml.preview_data_frame_analytics:
+        id: "no_such_job"
+
+---
+"Test id that matches multiple jobs":
+
+  - do:
+      indices.create:
+        index: index-source
+
+  - do:
+      ml.put_data_frame_analytics:
+        id: "foo-1"
+        body: >
+          {
+            "source": {
+              "index": "index-source"
+            },
+            "dest": {
+              "index": "index-dest"
+            },
+            "analysis": {"outlier_detection":{}}
+          }
+
+  - do:
+      ml.put_data_frame_analytics:
+        id: "foo-2"
+        body: >
+          {
+            "source": {
+              "index": "index-source"
+            },
+            "dest": {
+              "index": "index-dest"
+            },
+            "analysis": {"outlier_detection":{}}
+          }
+
+  - do:
+      catch: /expected only one config but matched \[foo-1, foo-2\]/
+      ml.preview_data_frame_analytics:
+        id: "foo-*"
+
+---
+"Test feature preview given body":
+
+  - do:
+      indices.create:
+        index: index-source
+        body:
+          mappings:
+            properties:
+              field_1:
+                type: integer
+              field_2:
+                type: double
+              field_3:
+                type: date
+
+  - do:
+      index:
+        index: index-source
+        refresh: true
+        body: { field_1: 3, field_2: 3.14, field_3: "2019-11-11T00:00:00", field_4: "blah" }
+  - match: { result: "created" }
+
+  - do:
+      ml.preview_data_frame_analytics:
+        body:
+          config:
+            source: { index: "index-source" }
+            analysis: { regression: { dependent_variable: "field_1", feature_processors: [{n_gram_encoding: {field: "field_4", feature_prefix: "f", length: 2, n_grams: [1]}}] } }
+  - match: { feature_values.0: {"field_2": "3.14", "f.10": "b", "f.11": "l", "field_1": "3" } }
+  - is_false: feature_values.0.field_4
+
+---
+"Test feature preview given job":
+
+  - do:
+      indices.create:
+        index: index-source
+        body:
+          mappings:
+            properties:
+              field_1:
+                type: integer
+              field_2:
+                type: double
+              field_3:
+                type: date
+
+  - do:
+      index:
+        index: index-source
+        refresh: true
+        body: { field_1: 3, field_2: 3.14, field_3: "2019-11-11T00:00:00", field_4: "blah" }
+  - match: { result: "created" }
+
+  - do:
+      ml.put_data_frame_analytics:
+        id: "ngram-encoding-preview-job"
+        body: >
+          {
+            "source": {
+              "index": "index-source"
+            },
+            "dest": {
+              "index": "index-dest"
+            },
+            "analysis": { "regression": { "dependent_variable": "field_1", "feature_processors": [{"n_gram_encoding": {"field": "field_4", "feature_prefix": "f", "length": 2, "n_grams": [1]}}] } }
+          }
+  - do:
+      ml.preview_data_frame_analytics:
+        id: "ngram-encoding-preview-job"
+
+  - match: { feature_values.0: {"field_2": "3.14", "f.10": "b", "f.11": "l", "field_1": "3" } }
+  - is_false: feature_values.0.field_4