Browse Source

[ML][Data Frame] adds new pipeline field to dest config (#43124)

* [ML][Data Frame] adds new pipeline field to dest config

* Adding pipeline support to _preview

* removing unused import

* moving towards extracting _source from pipeline simulation

* fixing permission requirement, adding _index entry to doc
Benjamin Trent 6 years ago
parent
commit
9f2974985f
18 changed files with 410 additions and 78 deletions
  1. 50 4
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DestConfig.java
  2. 2 2
      client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java
  3. 2 1
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DestConfigTests.java
  4. 11 6
      client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java
  5. 11 1
      docs/java-rest/high-level/dataframe/put_data_frame.asciidoc
  6. 4 2
      docs/reference/data-frames/apis/put-transform.asciidoc
  7. 16 3
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformAction.java
  8. 26 4
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfig.java
  9. 1 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java
  10. 2 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfigTests.java
  11. 1 1
      x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java
  12. 126 35
      x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java
  13. 14 3
      x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java
  14. 1 1
      x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java
  15. 78 10
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java
  16. 3 0
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java
  17. 61 2
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml
  18. 1 1
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml

+ 50 - 4
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DestConfig.java

@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.util.Objects;
 
 import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
 
 /**
  * Configuration containing the destination index for the {@link DataFrameTransformConfig}
@@ -35,29 +36,40 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constru
 public class DestConfig implements ToXContentObject {
 
     public static final ParseField INDEX = new ParseField("index");
+    public static final ParseField PIPELINE = new ParseField("pipeline");
 
     public static final ConstructingObjectParser<DestConfig, Void> PARSER = new ConstructingObjectParser<>("data_frame_config_dest",
         true,
-        args -> new DestConfig((String)args[0]));
+        args -> new DestConfig((String)args[0], (String)args[1]));
 
     static {
         PARSER.declareString(constructorArg(), INDEX);
+        PARSER.declareString(optionalConstructorArg(), PIPELINE);
     }
 
     private final String index;
+    private final String pipeline;
 
-    public DestConfig(String index) {
+    DestConfig(String index, String pipeline) {
         this.index = Objects.requireNonNull(index, INDEX.getPreferredName());
+        this.pipeline = pipeline;
     }
 
     public String getIndex() {
         return index;
     }
 
+    public String getPipeline() {
+        return pipeline;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
         builder.field(INDEX.getPreferredName(), index);
+        if (pipeline != null) {
+            builder.field(PIPELINE.getPreferredName(), pipeline);
+        }
         builder.endObject();
         return builder;
     }
@@ -72,11 +84,45 @@ public class DestConfig implements ToXContentObject {
         }
 
         DestConfig that = (DestConfig) other;
-        return Objects.equals(index, that.index);
+        return Objects.equals(index, that.index) &&
+            Objects.equals(pipeline, that.pipeline);
     }
 
     @Override
     public int hashCode(){
-        return Objects.hash(index);
+        return Objects.hash(index, pipeline);
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private String index;
+        private String pipeline;
+
+        /**
+         * Sets which index to which to write the data
+         * @param index where to write the data
+         * @return The {@link Builder} with index set
+         */
+        public Builder setIndex(String index) {
+            this.index = Objects.requireNonNull(index, INDEX.getPreferredName());
+            return this;
+        }
+
+        /**
+         * Sets the pipeline through which the indexed documents should be processed
+         * @param pipeline The pipeline ID
+         * @return The {@link Builder} with pipeline set
+         */
+        public Builder setPipeline(String pipeline) {
+            this.pipeline = pipeline;
+            return this;
+        }
+
+        public DestConfig build() {
+            return new DestConfig(index, pipeline);
+        }
     }
 }

+ 2 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java

@@ -307,7 +307,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
         aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
         PivotConfig pivotConfig = PivotConfig.builder().setGroups(groupConfig).setAggregations(aggBuilder).build();
 
-        DestConfig destConfig = (destination != null) ? new DestConfig(destination) : null;
+        DestConfig destConfig = (destination != null) ? DestConfig.builder().setIndex(destination).build() : null;
 
         return DataFrameTransformConfig.builder()
             .setId(id)
@@ -333,7 +333,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
         DataFrameTransformConfig transform = DataFrameTransformConfig.builder()
             .setId(id)
             .setSource(SourceConfig.builder().setIndex(sourceIndex).setQuery(new MatchAllQueryBuilder()).build())
-            .setDest(new DestConfig("pivot-dest"))
+            .setDest(DestConfig.builder().setIndex("pivot-dest").build())
             .setPivotConfig(pivotConfig)
             .setDescription("transform for testing stats")
             .build();

+ 2 - 1
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DestConfigTests.java

@@ -27,7 +27,8 @@ import java.io.IOException;
 public class DestConfigTests extends AbstractXContentTestCase<DestConfig> {
 
     public static DestConfig randomDestConfig() {
-        return new DestConfig(randomAlphaOfLength(10));
+        return new DestConfig(randomAlphaOfLength(10),
+            randomBoolean() ? null : randomAlphaOfLength(10));
     }
 
     @Override

+ 11 - 6
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java

@@ -125,6 +125,11 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
             .setIndex("source-index")
             .setQueryConfig(queryConfig).build();
         // end::put-data-frame-transform-source-config
+        // tag::put-data-frame-transform-dest-config
+        DestConfig destConfig = DestConfig.builder()
+            .setIndex("pivot-destination")
+            .setPipeline("my-pipeline").build();
+        // end::put-data-frame-transform-dest-config
         // tag::put-data-frame-transform-group-config
         GroupConfig groupConfig = GroupConfig.builder()
             .groupBy("reviewer", // <1>
@@ -149,7 +154,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
             .builder()
             .setId("reviewer-avg-rating") // <1>
             .setSource(sourceConfig) // <2>
-            .setDest(new DestConfig("pivot-destination")) // <3>
+            .setDest(destConfig) // <3>
             .setPivotConfig(pivotConfig) // <4>
             .setDescription("This is my test transform") // <5>
             .build();
@@ -222,7 +227,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
         DataFrameTransformConfig transformConfig = DataFrameTransformConfig.builder()
             .setId("mega-transform")
             .setSource(SourceConfig.builder().setIndex("source-data").setQueryConfig(queryConfig).build())
-            .setDest(new DestConfig("pivot-dest"))
+            .setDest(DestConfig.builder().setIndex("pivot-dest").build())
             .setPivotConfig(pivotConfig)
             .build();
 
@@ -344,7 +349,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
                 .setIndex("source-data")
                 .setQuery(new MatchAllQueryBuilder())
                 .build())
-            .setDest(new DestConfig("pivot-dest"))
+            .setDest(DestConfig.builder().setIndex("pivot-dest").build())
             .setPivotConfig(pivotConfig)
             .build();
         DataFrameTransformConfig transformConfig2 = DataFrameTransformConfig.builder()
@@ -353,7 +358,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
                 .setIndex("source-data")
                 .setQuery(new MatchAllQueryBuilder())
                 .build())
-            .setDest(new DestConfig("pivot-dest2"))
+            .setDest(DestConfig.builder().setIndex("pivot-dest2").build())
             .setPivotConfig(pivotConfig)
             .build();
 
@@ -488,7 +493,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
                 .setIndex("source-data")
                 .setQuery(new MatchAllQueryBuilder())
                 .build())
-            .setDest(new DestConfig("pivot-dest"))
+            .setDest(DestConfig.builder().setIndex("pivot-dest").build())
             .setPivotConfig(pivotConfig)
             .build();
         client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig), RequestOptions.DEFAULT);
@@ -574,7 +579,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
                 .setIndex("source-data")
                 .setQuery(new MatchAllQueryBuilder())
                 .build())
-            .setDest(new DestConfig("pivot-dest"))
+            .setDest(DestConfig.builder().setIndex("pivot-dest").build())
             .setPivotConfig(pivotConfig)
             .build();
 

+ 11 - 1
docs/java-rest/high-level/dataframe/put_data_frame.asciidoc

@@ -33,7 +33,7 @@ include-tagged::{doc-tests-file}[{api}-config]
 --------------------------------------------------
 <1> The {dataframe-transform} ID
 <2> The source indices and query from which to gather data
-<3> The destination index
+<3> The destination index and optional pipeline
 <4> The PivotConfig
 <5> Optional free text description of the transform
 
@@ -49,6 +49,16 @@ If query is not set, a `match_all` query is used by default.
 include-tagged::{doc-tests-file}[{api}-source-config]
 --------------------------------------------------
 
+==== DestConfig
+
+The index where to write the data and the optional pipeline
+through which the docs should be indexed
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests-file}[{api}-dest-config]
+--------------------------------------------------
+
 ===== QueryConfig
 
 The query with which to select data from the source.

+ 4 - 2
docs/reference/data-frames/apis/put-transform.asciidoc

@@ -38,7 +38,8 @@ IMPORTANT:  You must use {kib} or this API to create a {dataframe-transform}.
 `source` (required):: (object) The source configuration, consisting of `index` and optionally
 a `query`.
 
-`dest` (required):: (object) The destination configuration, consisting of `index`.
+`dest` (required):: (object) The destination configuration, consisting of `index` and optionally a
+`pipeline` id.
 
 `pivot`:: (object) Defines the pivot function `group by` fields and the aggregation to
 reduce the data. See <<data-frame-transform-pivot, data frame transform pivot objects>>.
@@ -76,7 +77,8 @@ PUT _data_frame/transforms/ecommerce_transform
     }
   },
   "dest": {
-    "index": "kibana_sample_data_ecommerce_transform"
+    "index": "kibana_sample_data_ecommerce_transform",
+    "pipeline": "add_timestamp_pipeline"
   },
   "pivot": {
     "group_by": {

+ 16 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformAction.java

@@ -24,10 +24,11 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
+import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -66,8 +67,20 @@ public class PreviewDataFrameTransformAction extends Action<PreviewDataFrameTran
 
         public static Request fromXContent(final XContentParser parser) throws IOException {
             Map<String, Object> content = parser.map();
-            // Destination and ID are not required for Preview, so we just supply our own
-            content.put(DataFrameField.DESTINATION.getPreferredName(), Collections.singletonMap("index", "unused-transform-preview-index"));
+            // dest.index and ID are not required for Preview, so we just supply our own
+            Map<String, String> tempDestination = new HashMap<>();
+            tempDestination.put(DestConfig.INDEX.getPreferredName(), "unused-transform-preview-index");
+            // Users can still provide just dest.pipeline to preview what their data would look like given the pipeline ID
+            Object providedDestination = content.get(DataFrameField.DESTINATION.getPreferredName());
+            if (providedDestination instanceof Map) {
+                @SuppressWarnings("unchecked")
+                Map<String, String> destMap = (Map<String, String>)providedDestination;
+                String pipeline = destMap.get(DestConfig.PIPELINE.getPreferredName());
+                if (pipeline != null) {
+                    tempDestination.put(DestConfig.PIPELINE.getPreferredName(), pipeline);
+                }
+            }
+            content.put(DataFrameField.DESTINATION.getPreferredName(), tempDestination);
             content.put(DataFrameField.ID.getPreferredName(), "transform-preview");
             try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(content);
                 XContentParser newParser = XContentType.JSON

+ 26 - 4
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfig.java

@@ -6,6 +6,7 @@
 
 package org.elasticsearch.xpack.core.dataframe.transforms;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -20,10 +21,12 @@ import java.io.IOException;
 import java.util.Objects;
 
 import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
 
 public class DestConfig implements Writeable, ToXContentObject {
 
     public static final ParseField INDEX = new ParseField("index");
+    public static final ParseField PIPELINE = new ParseField("pipeline");
 
     public static final ConstructingObjectParser<DestConfig, Void> STRICT_PARSER = createParser(false);
     public static final ConstructingObjectParser<DestConfig, Void> LENIENT_PARSER = createParser(true);
@@ -31,25 +34,37 @@ public class DestConfig implements Writeable, ToXContentObject {
     private static ConstructingObjectParser<DestConfig, Void> createParser(boolean lenient) {
         ConstructingObjectParser<DestConfig, Void> parser = new ConstructingObjectParser<>("data_frame_config_dest",
             lenient,
-            args -> new DestConfig((String)args[0]));
+            args -> new DestConfig((String)args[0], (String) args[1]));
         parser.declareString(constructorArg(), INDEX);
+        parser.declareString(optionalConstructorArg(), PIPELINE);
         return parser;
     }
 
     private final String index;
+    private final String pipeline;
 
-    public DestConfig(String index) {
+    public DestConfig(String index, String pipeline) {
         this.index = ExceptionsHelper.requireNonNull(index, INDEX.getPreferredName());
+        this.pipeline = pipeline;
     }
 
     public DestConfig(final StreamInput in) throws IOException {
         index = in.readString();
+        if (in.getVersion().onOrAfter(Version.CURRENT)) {
+            pipeline = in.readOptionalString();
+        } else {
+            pipeline = null;
+        }
     }
 
     public String getIndex() {
         return index;
     }
 
+    public String getPipeline() {
+        return pipeline;
+    }
+
     public boolean isValid() {
         return index.isEmpty() == false;
     }
@@ -57,12 +72,18 @@ public class DestConfig implements Writeable, ToXContentObject {
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(index);
+        if (out.getVersion().onOrAfter(Version.CURRENT)) {
+            out.writeOptionalString(pipeline);
+        }
     }
 
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
         builder.field(INDEX.getPreferredName(), index);
+        if (pipeline != null) {
+            builder.field(PIPELINE.getPreferredName(), pipeline);
+        }
         builder.endObject();
         return builder;
     }
@@ -77,12 +98,13 @@ public class DestConfig implements Writeable, ToXContentObject {
         }
 
         DestConfig that = (DestConfig) other;
-        return Objects.equals(index, that.index);
+        return Objects.equals(index, that.index) &&
+            Objects.equals(pipeline, that.pipeline);
     }
 
     @Override
     public int hashCode(){
-        return Objects.hash(index);
+        return Objects.hash(index, pipeline);
     }
 
     public static DestConfig fromXContent(final XContentParser parser, boolean lenient) throws IOException {

+ 1 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java

@@ -40,7 +40,7 @@ public class PreviewDataFrameTransformActionRequestTests extends AbstractSeriali
     @Override
     protected Request createTestInstance() {
         DataFrameTransformConfig config = new DataFrameTransformConfig("transform-preview", randomSourceConfig(),
-                new DestConfig("unused-transform-preview-index"),
+                new DestConfig("unused-transform-preview-index", null),
                 null, PivotConfigTests.randomPivotConfig(), null);
         return new Request(config);
     }

+ 2 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfigTests.java

@@ -17,7 +17,8 @@ public class DestConfigTests extends AbstractSerializingDataFrameTestCase<DestCo
     private boolean lenient;
 
     public static DestConfig randomDestConfig() {
-        return new DestConfig(randomAlphaOfLength(10));
+        return new DestConfig(randomAlphaOfLength(10),
+            randomBoolean() ? null : randomAlphaOfLength(10));
     }
 
     @Before

+ 1 - 1
x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java

@@ -205,7 +205,7 @@ abstract class DataFrameIntegTestCase extends ESRestTestCase {
         return DataFrameTransformConfig.builder()
             .setId(id)
             .setSource(SourceConfig.builder().setIndex(sourceIndices).setQueryConfig(createQueryConfig(queryBuilder)).build())
-            .setDest(new DestConfig(destinationIndex))
+            .setDest(DestConfig.builder().setIndex(destinationIndex).build())
             .setPivotConfig(createPivotConfig(groups, aggregations))
             .setDescription("Test data frame transform config id: " + id)
             .build();

+ 126 - 35
x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java

@@ -55,7 +55,7 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
         String dataFrameIndex = "pivot_reviews";
         setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex);
 
-        createPivotReviewsTransform(transformId, dataFrameIndex, null, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
+        createPivotReviewsTransform(transformId, dataFrameIndex, null, null, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
 
         startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
 
@@ -77,7 +77,7 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
         setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex);
         String query = "\"match\": {\"user_id\": \"user_26\"}";
 
-        createPivotReviewsTransform(transformId, dataFrameIndex, query, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
+        createPivotReviewsTransform(transformId, dataFrameIndex, query, null, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
 
         startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
 
@@ -87,6 +87,46 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
         assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918);
     }
 
+    public void testPivotWithPipeline() throws Exception {
+        String transformId = "simple_pivot_with_pipeline";
+        String dataFrameIndex = "pivot_with_pipeline";
+        String pipelineId = "my-pivot-pipeline";
+        int pipelineValue = 42;
+        Request pipelineRequest = new Request("PUT", "/_ingest/pipeline/" + pipelineId);
+        pipelineRequest.setJsonEntity("{\n" +
+            "  \"description\" : \"my pivot pipeline\",\n" +
+            "  \"processors\" : [\n" +
+            "    {\n" +
+            "      \"set\" : {\n" +
+            "        \"field\": \"pipeline_field\",\n" +
+            "        \"value\": " + pipelineValue +
+            "      }\n" +
+            "    }\n" +
+            "  ]\n" +
+            "}");
+        client().performRequest(pipelineRequest);
+
+        setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex);
+        createPivotReviewsTransform(transformId, dataFrameIndex, null, pipelineId, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
+
+        startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
+
+        // we expect 27 documents as there shall be 27 user_id's
+        Map<String, Object> indexStats = getAsMap(dataFrameIndex + "/_stats");
+        assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
+
+        // get and check some users
+        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417);
+        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_5", 3.72);
+        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_11", 3.846153846);
+        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_20", 3.769230769);
+        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918);
+
+        Map<String, Object> searchResult = getAsMap(dataFrameIndex + "/_search?q=reviewer:user_0");
+        Integer actual = (Integer) ((List<?>) XContentMapValues.extractValue("hits.hits._source.pipeline_field", searchResult)).get(0);
+        assertThat(actual, equalTo(pipelineValue));
+    }
+
     public void testHistogramPivot() throws Exception {
         String transformId = "simple_histogram_pivot";
         String dataFrameIndex = "pivot_reviews_via_histogram";
@@ -138,38 +178,38 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
             + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},";
 
         config += " \"pivot\": {"
-                + "   \"group_by\": {"
-                + "     \"reviewer\": {"
-                + "       \"terms\": {"
-                + "         \"field\": \"user_id\""
-                + " } } },"
-                + "   \"aggregations\": {"
-                + "     \"avg_rating\": {"
-                + "       \"avg\": {"
-                + "         \"field\": \"stars\""
-                + " } },"
-                + "     \"sum_rating\": {"
-                + "       \"sum\": {"
-                + "         \"field\": \"stars\""
-                + " } },"
-                + "     \"cardinality_business\": {"
-                + "       \"cardinality\": {"
-                + "         \"field\": \"business_id\""
-                + " } },"
-                + "     \"min_rating\": {"
-                + "       \"min\": {"
-                + "         \"field\": \"stars\""
-                + " } },"
-                + "     \"max_rating\": {"
-                + "       \"max\": {"
-                + "         \"field\": \"stars\""
-                + " } },"
-                + "     \"count\": {"
-                + "       \"value_count\": {"
-                + "         \"field\": \"business_id\""
-                + " } }"
-                + " } }"
-                + "}";
+            + "   \"group_by\": {"
+            + "     \"reviewer\": {"
+            + "       \"terms\": {"
+            + "         \"field\": \"user_id\""
+            + " } } },"
+            + "   \"aggregations\": {"
+            + "     \"avg_rating\": {"
+            + "       \"avg\": {"
+            + "         \"field\": \"stars\""
+            + " } },"
+            + "     \"sum_rating\": {"
+            + "       \"sum\": {"
+            + "         \"field\": \"stars\""
+            + " } },"
+            + "     \"cardinality_business\": {"
+            + "       \"cardinality\": {"
+            + "         \"field\": \"business_id\""
+            + " } },"
+            + "     \"min_rating\": {"
+            + "       \"min\": {"
+            + "         \"field\": \"stars\""
+            + " } },"
+            + "     \"max_rating\": {"
+            + "       \"max\": {"
+            + "         \"field\": \"stars\""
+            + " } },"
+            + "     \"count\": {"
+            + "       \"value_count\": {"
+            + "         \"field\": \"business_id\""
+            + " } }"
+            + " } }"
+            + "}";
 
         createDataframeTransformRequest.setJsonEntity(config);
         Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
@@ -260,7 +300,7 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
         createPreviewRequest.setJsonEntity(config);
 
         Map<String, Object> previewDataframeResponse = entityAsMap(client().performRequest(createPreviewRequest));
-        List<Map<String, Object>> preview = (List<Map<String, Object>>)previewDataframeResponse.get("preview");
+        List<Map<String, Object>> preview = (List<Map<String, Object>>) previewDataframeResponse.get("preview");
         // preview is limited to 100
         assertThat(preview.size(), equalTo(100));
         Set<String> expectedTopLevelFields = new HashSet<>(Arrays.asList("user", "by_day"));
@@ -268,6 +308,57 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
         preview.forEach(p -> {
             Set<String> keys = p.keySet();
             assertThat(keys, equalTo(expectedTopLevelFields));
+            Map<String, Object> nestedObj = (Map<String, Object>) p.get("user");
+            keys = nestedObj.keySet();
+            assertThat(keys, equalTo(expectedNestedFields));
+        });
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testPreviewTransformWithPipeline() throws Exception {
+        String pipelineId = "my-preview-pivot-pipeline";
+        int pipelineValue = 42;
+        Request pipelineRequest = new Request("PUT", "/_ingest/pipeline/" + pipelineId);
+        pipelineRequest.setJsonEntity("{\n" +
+            "  \"description\" : \"my pivot preview pipeline\",\n" +
+            "  \"processors\" : [\n" +
+            "    {\n" +
+            "      \"set\" : {\n" +
+            "        \"field\": \"pipeline_field\",\n" +
+            "        \"value\": " + pipelineValue +
+            "      }\n" +
+            "    }\n" +
+            "  ]\n" +
+            "}");
+        client().performRequest(pipelineRequest);
+
+        setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
+        final Request createPreviewRequest = createRequestWithAuth("POST", DATAFRAME_ENDPOINT + "_preview", null);
+
+        String config = "{ \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"} ,"
+            + "\"dest\": {\"pipeline\": \"" + pipelineId + "\"},"
+            + " \"pivot\": {"
+            + "   \"group_by\": {"
+            + "     \"user.id\": {\"terms\": { \"field\": \"user_id\" }},"
+            + "     \"by_day\": {\"date_histogram\": {\"fixed_interval\": \"1d\",\"field\":\"timestamp\",\"format\":\"yyyy-MM-dd\"}}},"
+            + "   \"aggregations\": {"
+            + "     \"user.avg_rating\": {"
+            + "       \"avg\": {"
+            + "         \"field\": \"stars\""
+            + " } } } }"
+            + "}";
+        createPreviewRequest.setJsonEntity(config);
+
+        Map<String, Object> previewDataframeResponse = entityAsMap(client().performRequest(createPreviewRequest));
+        List<Map<String, Object>> preview = (List<Map<String, Object>>)previewDataframeResponse.get("preview");
+        // preview is limited to 100
+        assertThat(preview.size(), equalTo(100));
+        Set<String> expectedTopLevelFields = new HashSet<>(Arrays.asList("user", "by_day", "pipeline_field"));
+        Set<String> expectedNestedFields = new HashSet<>(Arrays.asList("id", "avg_rating"));
+        preview.forEach(p -> {
+            Set<String> keys = p.keySet();
+            assertThat(keys, equalTo(expectedTopLevelFields));
+            assertThat(p.get("pipeline_field"), equalTo(pipelineValue));
             Map<String, Object> nestedObj = (Map<String, Object>)p.get("user");
             keys = nestedObj.keySet();
             assertThat(keys, equalTo(expectedNestedFields));

+ 14 - 3
x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java

@@ -147,12 +147,23 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
         createPivotReviewsTransform(transformId, dataFrameIndex, query, null);
     }
 
-    protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query, String authHeader)
+    protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query, String pipeline)
+        throws IOException {
+        createPivotReviewsTransform(transformId, dataFrameIndex, query, pipeline, null);
+    }
+
+
+    protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query, String pipeline, String authHeader)
         throws IOException {
         final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId, authHeader);
 
-        String config = "{"
-            + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},";
+        String config = "{";
+
+        if (pipeline != null) {
+            config += " \"dest\": {\"index\":\"" + dataFrameIndex + "\", \"pipeline\":\"" + pipeline + "\"},";
+        } else {
+            config += " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},";
+        }
 
         if (query != null) {
             config += " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\", \"query\":{" + query + "}},";

+ 1 - 1
x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java

@@ -119,7 +119,7 @@ public class DataFrameTransformProgressIT extends ESRestTestCase {
     public void testGetProgress() throws Exception {
         createReviewsIndex();
         SourceConfig sourceConfig = new SourceConfig(REVIEWS_INDEX_NAME);
-        DestConfig destConfig = new DestConfig("unnecessary");
+        DestConfig destConfig = new DestConfig("unnecessary", null);
         GroupConfig histgramGroupConfig = new GroupConfig(Collections.emptyMap(),
             Collections.singletonMap("every_50", new HistogramGroupSource("count", 50.0)));
         AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();

+ 78 - 10
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java

@@ -6,8 +6,13 @@
 
 package org.elasticsearch.xpack.dataframe.action;
 
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ingest.SimulatePipelineAction;
+import org.elasticsearch.action.ingest.SimulatePipelineRequest;
+import org.elasticsearch.action.ingest.SimulatePipelineResponse;
 import org.elasticsearch.action.search.SearchAction;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
@@ -16,7 +21,14 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.rest.RestStatus;
@@ -26,6 +38,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.XPackField;
+import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
 import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
@@ -34,15 +47,19 @@ import org.elasticsearch.xpack.core.dataframe.transforms.SourceConfig;
 import org.elasticsearch.xpack.dataframe.transforms.pivot.AggregationResultUtils;
 import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.xpack.dataframe.transforms.DataFrameIndexer.COMPOSITE_AGGREGATION_NAME;
 
 public class TransportPreviewDataFrameTransformAction extends
     HandledTransportAction<PreviewDataFrameTransformAction.Request, PreviewDataFrameTransformAction.Response> {
 
+    private static final Logger logger = LogManager.getLogger(TransportPreviewDataFrameTransformAction.class);
     private static final int NUMBER_OF_PREVIEW_BUCKETS = 100;
     private final XPackLicenseState licenseState;
     private final Client client;
@@ -87,13 +104,41 @@ public class TransportPreviewDataFrameTransformAction extends
 
         Pivot pivot = new Pivot(config.getPivotConfig());
 
-        getPreview(pivot, config.getSource(), ActionListener.wrap(
-            previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)),
-            listener::onFailure
+        getPreview(pivot,
+            config.getSource(),
+            config.getDestination().getPipeline(),
+            config.getDestination().getIndex(),
+            ActionListener.wrap(
+                previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)),
+                error -> {
+                    logger.error("Failure gathering preview", error);
+                    listener.onFailure(error);
+                }
         ));
     }
 
-    private void getPreview(Pivot pivot, SourceConfig source, ActionListener<List<Map<String, Object>>> listener) {
+    @SuppressWarnings("unchecked")
+    private void getPreview(Pivot pivot,
+                            SourceConfig source,
+                            String pipeline,
+                            String dest,
+                            ActionListener<List<Map<String, Object>>> listener) {
+        ActionListener<SimulatePipelineResponse> pipelineResponseActionListener = ActionListener.wrap(
+            simulatePipelineResponse -> {
+                List<Map<String, Object>> response = new ArrayList<>(simulatePipelineResponse.getResults().size());
+                for(var simulateDocumentResult : simulatePipelineResponse.getResults()) {
+                    try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
+                        XContentBuilder content = simulateDocumentResult.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
+                        Map<String, Object> tempMap = XContentHelper.convertToMap(BytesReference.bytes(content),
+                            true,
+                            XContentType.JSON).v2();
+                        response.add((Map<String, Object>)XContentMapValues.extractValue("doc._source", tempMap));
+                    }
+                }
+                listener.onResponse(response);
+            },
+            listener::onFailure
+        );
         pivot.deduceMappings(client, source, ActionListener.wrap(
             deducedMappings -> {
                 ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(),
@@ -103,17 +148,40 @@ public class TransportPreviewDataFrameTransformAction extends
                     pivot.buildSearchRequest(source, null, NUMBER_OF_PREVIEW_BUCKETS),
                     ActionListener.wrap(
                         r -> {
-
                             try {
                                 final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
                                 DataFrameIndexerTransformStats stats = DataFrameIndexerTransformStats.withDefaultTransformId();
                                 // remove all internal fields
-                                List<Map<String, Object>> results = pivot.extractResults(agg, deducedMappings, stats)
-                                        .peek(record -> {
-                                            record.keySet().removeIf(k -> k.startsWith("_"));
-                                        }).collect(Collectors.toList());
 
-                                listener.onResponse(results);
+                                if (pipeline == null) {
+                                    List<Map<String, Object>> results = pivot.extractResults(agg, deducedMappings, stats)
+                                        .peek(doc -> doc.keySet().removeIf(k -> k.startsWith("_")))
+                                        .collect(Collectors.toList());
+                                    listener.onResponse(results);
+                                } else {
+                                    List<Map<String, Object>> results = pivot.extractResults(agg, deducedMappings, stats)
+                                        .map(doc -> {
+                                            Map<String, Object> src = new HashMap<>();
+                                            String id = (String) doc.get(DataFrameField.DOCUMENT_ID_FIELD);
+                                            doc.keySet().removeIf(k -> k.startsWith("_"));
+                                            src.put("_source", doc);
+                                            src.put("_id", id);
+                                            src.put("_index", dest);
+                                            return src;
+                                        }).collect(Collectors.toList());
+                                    try (XContentBuilder builder = jsonBuilder()) {
+                                        builder.startObject();
+                                        builder.field("docs", results);
+                                        builder.endObject();
+                                        var pipelineRequest = new SimulatePipelineRequest(BytesReference.bytes(builder), XContentType.JSON);
+                                        pipelineRequest.setId(pipeline);
+                                        ClientHelper.executeAsyncWithOrigin(client,
+                                            ClientHelper.DATA_FRAME_ORIGIN,
+                                            SimulatePipelineAction.INSTANCE,
+                                            pipelineRequest,
+                                            pipelineResponseActionListener);
+                                    }
+                                }
                             } catch (AggregationResultUtils.AggregationExtractionException extractionException) {
                                 listener.onFailure(
                                         new ElasticsearchStatusException(extractionException.getMessage(), RestStatus.BAD_REQUEST));

+ 3 - 0
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java

@@ -179,6 +179,9 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
             }
 
             IndexRequest request = new IndexRequest(indexName).source(builder).id(id);
+            if (transformConfig.getDestination().getPipeline() != null) {
+                request.setPipeline(transformConfig.getDestination().getPipeline());
+            }
             return request;
         });
     }

+ 61 - 2
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml

@@ -99,6 +99,49 @@ setup:
   - match: { preview.2.time.max: "2017-02-18T01:01:00.000Z" }
   - match: { preview.2.time.min: "2017-02-18T01:01:00.000Z" }
 
+  - do:
+      ingest.put_pipeline:
+        id: "data_frame_simple_pipeline"
+        body:  >
+          {
+            "processors": [
+             {
+               "set" : {
+                 "field" : "my_field",
+                 "value": 42
+               }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+  - do:
+      data_frame.preview_data_frame_transform:
+        body: >
+          {
+            "source": { "index": "airline-data" },
+            "dest": { "pipeline": "data_frame_simple_pipeline" },
+            "pivot": {
+              "group_by": {
+                "airline": {"terms": {"field": "airline"}},
+                "by-hour": {"date_histogram": {"fixed_interval": "1h", "field": "time", "format": "yyyy-MM-dd HH"}}},
+              "aggs": {
+                "avg_response": {"avg": {"field": "responsetime"}}
+              }
+            }
+          }
+  - match: { preview.0.airline: foo }
+  - match: { preview.0.by-hour: "2017-02-18 00" }
+  - match: { preview.0.avg_response: 1.0 }
+  - match: { preview.0.my_field: 42 }
+  - match: { preview.1.airline: bar }
+  - match: { preview.1.by-hour: "2017-02-18 01" }
+  - match: { preview.1.avg_response: 42.0 }
+  - match: { preview.1.my_field: 42 }
+  - match: { preview.2.airline: foo }
+  - match: { preview.2.by-hour: "2017-02-18 01" }
+  - match: { preview.2.avg_response: 42.0 }
+  - match: { preview.2.my_field: 42 }
+
 ---
 "Test preview transform with invalid config":
   - do:
@@ -127,7 +170,6 @@ setup:
               "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
             }
           }
-
 ---
 "Test preview returns bad request with invalid agg":
   - do:
@@ -161,4 +203,21 @@ setup:
               }
             }
           }
-
+---
+"Test preview with missing pipeline":
+  - do:
+      catch: bad_request
+      data_frame.preview_data_frame_transform:
+        body: >
+          {
+            "source": { "index": "airline-data" },
+            "dest": { "pipeline": "missing-pipeline" },
+            "pivot": {
+              "group_by": {
+                "time": {"date_histogram": {"fixed_interval": "1h", "field": "time", "format": "yyyy-MM-DD HH"}}},
+              "aggs": {
+                "avg_response": {"avg": {"field": "responsetime"}},
+                "time.min": {"min": {"field": "time"}}
+              }
+            }
+          }

+ 1 - 1
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml

@@ -74,7 +74,7 @@ setup:
         body: >
           {
             "source": { "index": "airline-data" },
-            "dest": { "index": "airline-data-by-airline-again" },
+            "dest": { "index": "airline-data-by-airline-again", "pipeline": "airline-pipeline" },
             "pivot": {
               "group_by": { "airline": {"terms": {"field": "airline"}}},
               "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}