Browse Source

[ML] make source and dest objects in the transform config (#40337)

* [ML] make source and dest objects in the transform config

* addressing PR comments

* Fixing compilation post merge

* adding comment for Arrays.hashCode

* addressing changes for moving dest to object

* fixing data_frame yml tests

* fixing API test
Benjamin Trent 6 years ago
parent
commit
aaf383aa39
39 changed files with 924 additions and 231 deletions
  1. 16 28
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java
  2. 82 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DestConfig.java
  3. 3 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/QueryConfig.java
  4. 124 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/SourceConfig.java
  5. 8 3
      client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java
  6. 4 5
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PreviewDataFrameTransformRequestTests.java
  7. 1 3
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PutDataFrameTransformRequestTests.java
  8. 5 2
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java
  9. 48 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DestConfigTests.java
  10. 68 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/SourceConfigTests.java
  11. 14 10
      client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java
  12. 14 5
      docs/java-rest/high-level/dataframe/put_data_frame.asciidoc
  13. 2 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformAction.java
  14. 19 49
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java
  15. 91 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfig.java
  16. 8 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/QueryConfig.java
  17. 139 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/SourceConfig.java
  18. 3 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsActionResponseTests.java
  19. 8 6
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java
  20. 21 23
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java
  21. 48 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfigTests.java
  22. 62 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/SourceConfigTests.java
  23. 9 9
      x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java
  24. 4 5
      x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java
  25. 2 2
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java
  26. 17 17
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java
  27. 2 2
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java
  28. 16 2
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java
  29. 2 2
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataframeIndex.java
  30. 3 3
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java
  31. 3 3
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java
  32. 3 3
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java
  33. 3 3
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/SchemaUtil.java
  34. 5 11
      x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java
  35. 0 9
      x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/util/BatchedDataIteratorTests.java
  36. 2 2
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml
  37. 55 11
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml
  38. 2 2
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml
  39. 8 8
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml

+ 16 - 28
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java

@@ -38,32 +38,28 @@ public class DataFrameTransformConfig implements ToXContentObject {
     public static final ParseField ID = new ParseField("id");
     public static final ParseField SOURCE = new ParseField("source");
     public static final ParseField DEST = new ParseField("dest");
-    public static final ParseField QUERY = new ParseField("query");
     // types of transforms
     public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");
 
     private final String id;
-    private final String source;
-    private final String dest;
-    private final QueryConfig queryConfig;
+    private final SourceConfig source;
+    private final DestConfig dest;
     private final PivotConfig pivotConfig;
 
     public static final ConstructingObjectParser<DataFrameTransformConfig, String> PARSER =
             new ConstructingObjectParser<>("data_frame_transform", true,
                 (args) -> {
                     String id = (String) args[0];
-                    String source = (String) args[1];
-                    String dest = (String) args[2];
-                    QueryConfig queryConfig = (QueryConfig) args[3];
-                    PivotConfig pivotConfig = (PivotConfig) args[4];
-                    return new DataFrameTransformConfig(id, source, dest, queryConfig, pivotConfig);
+                    SourceConfig source = (SourceConfig) args[1];
+                    DestConfig dest = (DestConfig) args[2];
+                    PivotConfig pivotConfig = (PivotConfig) args[3];
+                    return new DataFrameTransformConfig(id, source, dest, pivotConfig);
                 });
 
     static {
         PARSER.declareString(constructorArg(), ID);
-        PARSER.declareString(constructorArg(), SOURCE);
-        PARSER.declareString(constructorArg(), DEST);
-        PARSER.declareObject(optionalConstructorArg(), (p, c) -> QueryConfig.fromXContent(p), QUERY);
+        PARSER.declareObject(constructorArg(), (p, c) -> SourceConfig.PARSER.apply(p, null), SOURCE);
+        PARSER.declareObject(constructorArg(), (p, c) -> DestConfig.PARSER.apply(p, null), DEST);
         PARSER.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p), PIVOT_TRANSFORM);
     }
 
@@ -73,14 +69,12 @@ public class DataFrameTransformConfig implements ToXContentObject {
 
 
     public DataFrameTransformConfig(final String id,
-                                    final String source,
-                                    final String dest,
-                                    final QueryConfig queryConfig,
+                                    final SourceConfig source,
+                                    final DestConfig dest,
                                     final PivotConfig pivotConfig) {
         this.id = id;
         this.source = source;
         this.dest = dest;
-        this.queryConfig = queryConfig;
         this.pivotConfig = pivotConfig;
     }
 
@@ -88,11 +82,11 @@ public class DataFrameTransformConfig implements ToXContentObject {
         return id;
     }
 
-    public String getSource() {
+    public SourceConfig getSource() {
         return source;
     }
 
-    public String getDestination() {
+    public DestConfig getDestination() {
         return dest;
     }
 
@@ -100,23 +94,18 @@ public class DataFrameTransformConfig implements ToXContentObject {
         return pivotConfig;
     }
 
-    public QueryConfig getQueryConfig() {
-        return queryConfig;
-    }
-
     @Override
     public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
         builder.startObject();
         if (id != null) {
             builder.field(ID.getPreferredName(), id);
         }
-        builder.field(SOURCE.getPreferredName(), source);
+        if (source != null) {
+            builder.field(SOURCE.getPreferredName(), source);
+        }
         if (dest != null) {
             builder.field(DEST.getPreferredName(), dest);
         }
-        if (queryConfig != null) {
-            builder.field(QUERY.getPreferredName(), queryConfig);
-        }
         if (pivotConfig != null) {
             builder.field(PIVOT_TRANSFORM.getPreferredName(), pivotConfig);
         }
@@ -139,13 +128,12 @@ public class DataFrameTransformConfig implements ToXContentObject {
         return Objects.equals(this.id, that.id)
                 && Objects.equals(this.source, that.source)
                 && Objects.equals(this.dest, that.dest)
-                && Objects.equals(this.queryConfig, that.queryConfig)
                 && Objects.equals(this.pivotConfig, that.pivotConfig);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(id, source, dest, queryConfig, pivotConfig);
+        return Objects.hash(id, source, dest, pivotConfig);
     }
 
     @Override

+ 82 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DestConfig.java

@@ -0,0 +1,82 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe.transforms;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
+
+/**
+ * Configuration containing the destination index for the {@link DataFrameTransformConfig}
+ */
+public class DestConfig implements ToXContentObject {
+
+    public static final ParseField INDEX = new ParseField("index");
+
+    public static final ConstructingObjectParser<DestConfig, Void> PARSER = new ConstructingObjectParser<>("data_frame_config_dest",
+        true,
+        args -> new DestConfig((String)args[0]));
+
+    static {
+        PARSER.declareString(constructorArg(), INDEX);
+    }
+
+    private final String index;
+
+    public DestConfig(String index) {
+        this.index = Objects.requireNonNull(index, INDEX.getPreferredName());
+    }
+
+    public String getIndex() {
+        return index;
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field(INDEX.getPreferredName(), index);
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other == this) {
+            return true;
+        }
+        if (other == null || other.getClass() != getClass()) {
+            return false;
+        }
+
+        DestConfig that = (DestConfig) other;
+        return Objects.equals(index, that.index);
+    }
+
+    @Override
+    public int hashCode(){
+        return Objects.hash(index);
+    }
+}

+ 3 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/QueryConfig.java

@@ -28,6 +28,9 @@ import org.elasticsearch.index.query.QueryBuilder;
 import java.io.IOException;
 import java.util.Objects;
 
+/**
+ * Object for encapsulating the desired Query for a DataFrameTransform
+ */
 public class QueryConfig implements ToXContentObject {
 
     private final QueryBuilder query;

+ 124 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/SourceConfig.java

@@ -0,0 +1,124 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe.transforms;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
+
+
+/**
+ * Class encapsulating all options for a {@link DataFrameTransformConfig} gathering data
+ */
+public class SourceConfig implements ToXContentObject {
+
+    public static final ParseField QUERY = new ParseField("query");
+    public static final ParseField INDEX = new ParseField("index");
+
+    public static final ConstructingObjectParser<SourceConfig, Void> PARSER = new ConstructingObjectParser<>("data_frame_config_source",
+    true,
+    args -> {
+        @SuppressWarnings("unchecked")
+        String[] index = ((List<String>)args[0]).toArray(new String[0]);
+        // default handling: if the user does not specify a query, we default to match_all
+        QueryConfig queryConfig = (QueryConfig) args[1];
+        return new SourceConfig(index, queryConfig);
+    });
+    static {
+        PARSER.declareStringArray(constructorArg(), INDEX);
+        PARSER.declareObject(optionalConstructorArg(), (p, c) -> QueryConfig.fromXContent(p), QUERY);
+    }
+
+    private final String[] index;
+    private final QueryConfig queryConfig;
+
+    /**
+     * Create a new SourceConfig for the provided indices.
+     *
+     * {@link QueryConfig} defaults to a MatchAll query.
+     *
+     * @param index Any number of indices. At least one non-null, non-empty, index should be provided
+     */
+    public SourceConfig(String... index) {
+        this.index = index;
+        this.queryConfig = null;
+    }
+
+    /**
+     * Create a new SourceConfig for the provided indices, from which data is gathered with the provided {@link QueryConfig}
+     *
+     * @param index Any number of indices. At least one non-null, non-empty, index should be provided
+     * @param queryConfig A QueryConfig object that contains the desired query. Defaults to MatchAll query.
+     */
+    public SourceConfig(String[] index, QueryConfig queryConfig) {
+        this.index = index;
+        this.queryConfig = queryConfig;
+    }
+
+    public String[] getIndex() {
+        return index;
+    }
+
+    public QueryConfig getQueryConfig() {
+        return queryConfig;
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        if (index != null) {
+            builder.array(INDEX.getPreferredName(), index);
+        }
+        if (queryConfig != null) {
+            builder.field(QUERY.getPreferredName(), queryConfig);
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other == this) {
+            return true;
+        }
+        if (other == null || other.getClass() != getClass()) {
+            return false;
+        }
+
+        SourceConfig that = (SourceConfig) other;
+        return Arrays.equals(index, that.index) && Objects.equals(queryConfig, that.queryConfig);
+    }
+
+    @Override
+    public int hashCode(){
+        // Using Arrays.hashCode as Objects.hash does not deeply hash nested arrays. Since we are doing Array.equals, this is necessary
+        int hash = Arrays.hashCode(index);
+        return 31 * hash + (queryConfig == null ? 0 : queryConfig.hashCode());
+    }
+}

+ 8 - 3
client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java

@@ -34,7 +34,9 @@ import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse;
 import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
+import org.elasticsearch.client.dataframe.transforms.DestConfig;
 import org.elasticsearch.client.dataframe.transforms.QueryConfig;
+import org.elasticsearch.client.dataframe.transforms.SourceConfig;
 import org.elasticsearch.client.dataframe.transforms.pivot.AggregationConfig;
 import org.elasticsearch.client.dataframe.transforms.pivot.GroupConfig;
 import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfig;
@@ -152,7 +154,8 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
         PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
 
         String id = "test-crud";
-        DataFrameTransformConfig transform = new DataFrameTransformConfig(id, sourceIndex, "pivot-dest", queryConfig, pivotConfig);
+        DataFrameTransformConfig transform = new DataFrameTransformConfig(id,
+            new SourceConfig(new String[]{sourceIndex}, queryConfig), new DestConfig("pivot-dest"), pivotConfig);
 
         DataFrameClient client = highLevelClient().dataFrame();
         AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform,
@@ -182,7 +185,8 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
         PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
 
         String id = "test-stop-start";
-        DataFrameTransformConfig transform = new DataFrameTransformConfig(id, sourceIndex, "pivot-dest", queryConfig, pivotConfig);
+        DataFrameTransformConfig transform = new DataFrameTransformConfig(id,
+            new SourceConfig(new String[]{sourceIndex}, queryConfig), new DestConfig("pivot-dest"), pivotConfig);
 
         DataFrameClient client = highLevelClient().dataFrame();
         AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform,
@@ -219,7 +223,8 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
         AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
         PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
 
-        DataFrameTransformConfig transform = new DataFrameTransformConfig("test-preview", sourceIndex, null, queryConfig, pivotConfig);
+        DataFrameTransformConfig transform = new DataFrameTransformConfig("test-preview",
+            new SourceConfig(new String[]{sourceIndex}, queryConfig), null, pivotConfig);
 
         DataFrameClient client = highLevelClient().dataFrame();
         PreviewDataFrameTransformResponse preview = execute(new PreviewDataFrameTransformRequest(transform),

+ 4 - 5
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PreviewDataFrameTransformRequestTests.java

@@ -22,7 +22,6 @@ package org.elasticsearch.client.dataframe;
 import org.elasticsearch.client.ValidationException;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigTests;
-import org.elasticsearch.client.dataframe.transforms.QueryConfigTests;
 import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfigTests;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -34,6 +33,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Optional;
 
+import static org.elasticsearch.client.dataframe.transforms.SourceConfigTests.randomSourceConfig;
 import static org.hamcrest.Matchers.containsString;
 
 public class PreviewDataFrameTransformRequestTests extends AbstractXContentTestCase<PreviewDataFrameTransformRequest> {
@@ -65,14 +65,13 @@ public class PreviewDataFrameTransformRequestTests extends AbstractXContentTestC
                 containsString("preview requires a non-null data frame config"));
 
         // null id and destination is valid
-        DataFrameTransformConfig config = new DataFrameTransformConfig(null, "source", null,
-                QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
+        DataFrameTransformConfig config = new DataFrameTransformConfig(null, randomSourceConfig(), null,
+            PivotConfigTests.randomPivotConfig());
 
         assertFalse(new PreviewDataFrameTransformRequest(config).validate().isPresent());
 
         // null source is not valid
-        config = new DataFrameTransformConfig(null, null, null,
-                QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
+        config = new DataFrameTransformConfig(null, null, null, PivotConfigTests.randomPivotConfig());
 
         Optional<ValidationException> error = new PreviewDataFrameTransformRequest(config).validate();
         assertTrue(error.isPresent());

+ 1 - 3
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PutDataFrameTransformRequestTests.java

@@ -22,7 +22,6 @@ package org.elasticsearch.client.dataframe;
 import org.elasticsearch.client.ValidationException;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigTests;
-import org.elasticsearch.client.dataframe.transforms.QueryConfigTests;
 import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfigTests;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -41,8 +40,7 @@ public class PutDataFrameTransformRequestTests extends AbstractXContentTestCase<
     public void testValidate() {
         assertFalse(createTestInstance().validate().isPresent());
 
-        DataFrameTransformConfig config = new DataFrameTransformConfig(null, null, null,
-                QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
+        DataFrameTransformConfig config = new DataFrameTransformConfig(null, null, null, PivotConfigTests.randomPivotConfig());
 
         Optional<ValidationException> error = new PutDataFrameTransformRequest(config).validate();
         assertTrue(error.isPresent());

+ 5 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java

@@ -30,11 +30,14 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.function.Predicate;
 
+import static org.elasticsearch.client.dataframe.transforms.DestConfigTests.randomDestConfig;
+import static org.elasticsearch.client.dataframe.transforms.SourceConfigTests.randomSourceConfig;
+
 public class DataFrameTransformConfigTests extends AbstractXContentTestCase<DataFrameTransformConfig> {
 
     public static DataFrameTransformConfig randomDataFrameTransformConfig() {
-        return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
-                randomAlphaOfLengthBetween(1, 10), QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
+        return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomSourceConfig(),
+                randomDestConfig(), PivotConfigTests.randomPivotConfig());
     }
 
     @Override

+ 48 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DestConfigTests.java

@@ -0,0 +1,48 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe.transforms;
+
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractXContentTestCase;
+
+import java.io.IOException;
+
+public class DestConfigTests extends AbstractXContentTestCase<DestConfig> {
+
+    public static DestConfig randomDestConfig() {
+        return new DestConfig(randomAlphaOfLength(10));
+    }
+
+    @Override
+    protected DestConfig doParseInstance(XContentParser parser) throws IOException {
+        return DestConfig.PARSER.apply(parser, null);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return true;
+    }
+
+    @Override
+    protected DestConfig createTestInstance() {
+        return randomDestConfig();
+    }
+
+}

+ 68 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/SourceConfigTests.java

@@ -0,0 +1,68 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe.transforms;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.search.SearchModule;
+import org.elasticsearch.test.AbstractXContentTestCase;
+
+import java.io.IOException;
+import java.util.function.Predicate;
+
+import static java.util.Collections.emptyList;
+
+
+public class SourceConfigTests extends AbstractXContentTestCase<SourceConfig> {
+
+    public static SourceConfig randomSourceConfig() {
+        return new SourceConfig(generateRandomStringArray(10, 10, false, false),
+            QueryConfigTests.randomQueryConfig());
+    }
+
+
+    @Override
+    protected SourceConfig doParseInstance(XContentParser parser) throws IOException {
+        return SourceConfig.PARSER.apply(parser, null);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return true;
+    }
+
+    @Override
+    protected Predicate<String> getRandomFieldsExcludeFilter() {
+        // allow unknown fields in the root of the object only as QueryConfig stores a Map<String, Object>
+        return field -> !field.isEmpty();
+    }
+
+    @Override
+    protected SourceConfig createTestInstance() {
+        return randomSourceConfig();
+    }
+
+    @Override
+    protected NamedXContentRegistry xContentRegistry() {
+        SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList());
+        return new NamedXContentRegistry(searchModule.getNamedXContents());
+    }
+}

+ 14 - 10
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java

@@ -34,7 +34,9 @@ import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse;
 import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
+import org.elasticsearch.client.dataframe.transforms.DestConfig;
 import org.elasticsearch.client.dataframe.transforms.QueryConfig;
+import org.elasticsearch.client.dataframe.transforms.SourceConfig;
 import org.elasticsearch.client.dataframe.transforms.pivot.AggregationConfig;
 import org.elasticsearch.client.dataframe.transforms.pivot.GroupConfig;
 import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfig;
@@ -106,6 +108,10 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
         // tag::put-data-frame-transform-query-config
         QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
         // end::put-data-frame-transform-query-config
+        // tag::put-data-frame-transform-source-config
+        SourceConfig sourceConfig =
+            new SourceConfig(new String[]{"source-index"}, queryConfig);
+        // end::put-data-frame-transform-source-config
         // tag::put-data-frame-transform-group-config
         GroupConfig groupConfig =
                 new GroupConfig(Collections.singletonMap("reviewer", // <1>
@@ -123,10 +129,9 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
         // tag::put-data-frame-transform-config
         DataFrameTransformConfig transformConfig =
                 new DataFrameTransformConfig("reviewer-avg-rating", // <1>
-                "source-index", // <2>
-                "pivot-destination",  // <3>
-                queryConfig,   // <4>
-                pivotConfig);  // <5>
+                sourceConfig, // <2>
+                new DestConfig("pivot-destination"),  // <3>
+                pivotConfig);  // <4>
         // end::put-data-frame-transform-config
 
         {
@@ -145,7 +150,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
         }
         {
             DataFrameTransformConfig configWithDifferentId = new DataFrameTransformConfig("reviewer-avg-rating2",
-                    transformConfig.getSource(), transformConfig.getDestination(), transformConfig.getQueryConfig(),
+                    transformConfig.getSource(), transformConfig.getDestination(),
                     transformConfig.getPivotConfig());
             PutDataFrameTransformRequest request = new PutDataFrameTransformRequest(configWithDifferentId);
 
@@ -191,7 +196,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
         PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
 
         DataFrameTransformConfig transformConfig = new DataFrameTransformConfig("mega-transform",
-                "source-data", "pivot-dest", queryConfig, pivotConfig);
+                new SourceConfig(new String[]{"source-data"}, queryConfig), new DestConfig("pivot-dest"), pivotConfig);
 
         client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig), RequestOptions.DEFAULT);
         transformsToClean.add(transformConfig.getId());
@@ -308,9 +313,9 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
         PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
 
         DataFrameTransformConfig transformConfig1 = new DataFrameTransformConfig("mega-transform",
-                "source-data", "pivot-dest", queryConfig, pivotConfig);
+                new SourceConfig(new String[]{"source-data"}, queryConfig), new DestConfig("pivot-dest"), pivotConfig);
         DataFrameTransformConfig transformConfig2 = new DataFrameTransformConfig("mega-transform2",
-                "source-data", "pivot-dest2", queryConfig, pivotConfig);
+                new SourceConfig(new String[]{"source-data"}, queryConfig), new DestConfig("pivot-dest2"), pivotConfig);
 
         client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig1), RequestOptions.DEFAULT);
         client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig2), RequestOptions.DEFAULT);
@@ -375,9 +380,8 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
         // tag::preview-data-frame-transform-request
         DataFrameTransformConfig transformConfig =
                 new DataFrameTransformConfig(null,  // <1>
-                "source-data",
+                new SourceConfig(new String[]{"source-data"}, queryConfig),
                 null,                               // <2>
-                queryConfig,
                 pivotConfig);
 
         PreviewDataFrameTransformRequest request =

+ 14 - 5
docs/java-rest/high-level/dataframe/put_data_frame.asciidoc

@@ -32,16 +32,25 @@ configuration and contains the following arguments:
 include-tagged::{doc-tests-file}[{api}-config]
 --------------------------------------------------
 <1> The {dataframe-transform} ID
-<2> The source index or index pattern
+<2> The source indices and query from which to gather data
 <3> The destination index
-<4> Optionally a QueryConfig
-<5> The PivotConfig
+<4> The PivotConfig
 
 [id="{upid}-{api}-query-config"]
-==== QueryConfig
+
+==== SourceConfig
+
+The indices and the query from which to collect data.
+If query is not set, a `match_all` query is used by default.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests-file}[{api}-source-config]
+--------------------------------------------------
+
+===== QueryConfig
 
 The query with which to select data from the source.
-If not set a `match_all` query is used by default.
 
 ["source","java",subs="attributes,callouts,macros"]
 --------------------------------------------------

+ 2 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformAction.java

@@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfi
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -59,7 +60,7 @@ public class PreviewDataFrameTransformAction extends Action<PreviewDataFrameTran
         public static Request fromXContent(final XContentParser parser) throws IOException {
             Map<String, Object> content = parser.map();
             // Destination and ID are not required for Preview, so we just supply our own
-            content.put(DataFrameField.DESTINATION.getPreferredName(), "unused-transform-preview-index");
+            content.put(DataFrameField.DESTINATION.getPreferredName(), Collections.singletonMap("index", "unused-transform-preview-index"));
             content.put(DataFrameField.ID.getPreferredName(), "transform-preview");
             try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(content);
                 XContentParser newParser = XContentType.JSON

+ 19 - 49
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java

@@ -17,7 +17,6 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.index.query.MatchAllQueryBuilder;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
 import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig;
@@ -38,7 +37,6 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
 
     public static final String NAME = "data_frame_transform_config";
     public static final ParseField HEADERS = new ParseField("headers");
-    public static final ParseField QUERY = new ParseField("query");
 
     // types of transforms
     public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");
@@ -47,14 +45,13 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
     private static final ConstructingObjectParser<DataFrameTransformConfig, String> LENIENT_PARSER = createParser(true);
 
     private final String id;
-    private final String source;
-    private final String dest;
+    private final SourceConfig source;
+    private final DestConfig dest;
 
     // headers store the user context from the creating user, which allows us to run the transform as this user
     // the header only contains name, groups and other context but no authorization keys
     private Map<String, String> headers;
 
-    private final QueryConfig queryConfig;
     private final PivotConfig pivotConfig;
 
     private static ConstructingObjectParser<DataFrameTransformConfig, String> createParser(boolean lenient) {
@@ -70,8 +67,8 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
                                 DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_INCONSISTENT_ID, id, optionalId));
                     }
 
-                    String source = (String) args[1];
-                    String dest = (String) args[2];
+                    SourceConfig source = (SourceConfig) args[1];
+                    DestConfig dest = (DestConfig) args[2];
 
                     // ignored, only for internal storage: String docType = (String) args[3];
 
@@ -83,26 +80,16 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
                     @SuppressWarnings("unchecked")
                     Map<String, String> headers = (Map<String, String>) args[4];
 
-                    // default handling: if the user does not specify a query, we default to match_all
-                    QueryConfig queryConfig = null;
-                    if (args[5] == null) {
-                        queryConfig = new QueryConfig(Collections.singletonMap(MatchAllQueryBuilder.NAME, Collections.emptyMap()),
-                                new MatchAllQueryBuilder());
-                    } else {
-                        queryConfig = (QueryConfig) args[5];
-                    }
-
-                    PivotConfig pivotConfig = (PivotConfig) args[6];
-                    return new DataFrameTransformConfig(id, source, dest, headers, queryConfig, pivotConfig);
+                    PivotConfig pivotConfig = (PivotConfig) args[5];
+                    return new DataFrameTransformConfig(id, source, dest, headers, pivotConfig);
                 });
 
         parser.declareString(optionalConstructorArg(), DataFrameField.ID);
-        parser.declareString(constructorArg(), DataFrameField.SOURCE);
-        parser.declareString(constructorArg(), DataFrameField.DESTINATION);
+        parser.declareObject(constructorArg(), (p, c) -> SourceConfig.fromXContent(p, lenient), DataFrameField.SOURCE);
+        parser.declareObject(constructorArg(), (p, c) -> DestConfig.fromXContent(p, lenient), DataFrameField.DESTINATION);
 
         parser.declareString(optionalConstructorArg(), DataFrameField.INDEX_DOC_TYPE);
         parser.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), HEADERS);
-        parser.declareObject(optionalConstructorArg(), (p, c) -> QueryConfig.fromXContent(p, lenient), QUERY);
         parser.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p, lenient), PIVOT_TRANSFORM);
 
         return parser;
@@ -113,15 +100,13 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
     }
 
     public DataFrameTransformConfig(final String id,
-                                    final String source,
-                                    final String dest,
+                                    final SourceConfig source,
+                                    final DestConfig dest,
                                     final Map<String, String> headers,
-                                    final QueryConfig queryConfig,
                                     final PivotConfig pivotConfig) {
         this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
         this.source = ExceptionsHelper.requireNonNull(source, DataFrameField.SOURCE.getPreferredName());
         this.dest = ExceptionsHelper.requireNonNull(dest, DataFrameField.DESTINATION.getPreferredName());
-        this.queryConfig = ExceptionsHelper.requireNonNull(queryConfig, QUERY.getPreferredName());
         this.setHeaders(headers == null ? Collections.emptyMap() : headers);
         this.pivotConfig = pivotConfig;
 
@@ -133,10 +118,9 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
 
     public DataFrameTransformConfig(final StreamInput in) throws IOException {
         id = in.readString();
-        source = in.readString();
-        dest = in.readString();
+        source = new SourceConfig(in);
+        dest = new DestConfig(in);
         setHeaders(in.readMap(StreamInput::readString, StreamInput::readString));
-        queryConfig = in.readOptionalWriteable(QueryConfig::new);
         pivotConfig = in.readOptionalWriteable(PivotConfig::new);
     }
 
@@ -144,11 +128,11 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
         return id;
     }
 
-    public String getSource() {
+    public SourceConfig getSource() {
         return source;
     }
 
-    public String getDestination() {
+    public DestConfig getDestination() {
         return dest;
     }
 
@@ -164,30 +148,20 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
         return pivotConfig;
     }
 
-    public QueryConfig getQueryConfig() {
-        return queryConfig;
-    }
-
     public boolean isValid() {
-        // collect validation results from all child objects
-        if (queryConfig != null && queryConfig.isValid() == false) {
-            return false;
-        }
-
         if (pivotConfig != null && pivotConfig.isValid() == false) {
             return false;
         }
 
-        return true;
+        return source.isValid() && dest.isValid();
     }
 
     @Override
     public void writeTo(final StreamOutput out) throws IOException {
         out.writeString(id);
-        out.writeString(source);
-        out.writeString(dest);
+        source.writeTo(out);
+        dest.writeTo(out);
         out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
-        out.writeOptionalWriteable(queryConfig);
         out.writeOptionalWriteable(pivotConfig);
     }
 
@@ -197,9 +171,6 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
         builder.field(DataFrameField.ID.getPreferredName(), id);
         builder.field(DataFrameField.SOURCE.getPreferredName(), source);
         builder.field(DataFrameField.DESTINATION.getPreferredName(), dest);
-        if (queryConfig != null) {
-            builder.field(QUERY.getPreferredName(), queryConfig);
-        }
         if (pivotConfig != null) {
             builder.field(PIVOT_TRANSFORM.getPreferredName(), pivotConfig);
         }
@@ -230,13 +201,12 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
                 && Objects.equals(this.source, that.source)
                 && Objects.equals(this.dest, that.dest)
                 && Objects.equals(this.headers, that.headers)
-                && Objects.equals(this.queryConfig, that.queryConfig)
                 && Objects.equals(this.pivotConfig, that.pivotConfig);
     }
 
     @Override
-    public int hashCode() {
-        return Objects.hash(id, source, dest, headers, queryConfig, pivotConfig);
+    public int hashCode(){
+        return Objects.hash(id, source, dest, headers, pivotConfig);
     }
 
     @Override

+ 91 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfig.java

@@ -0,0 +1,91 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.dataframe.transforms;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
+
+public class DestConfig implements Writeable, ToXContentObject {
+
+    public static final ParseField INDEX = new ParseField("index");
+
+    public static final ConstructingObjectParser<DestConfig, Void> STRICT_PARSER = createParser(false);
+    public static final ConstructingObjectParser<DestConfig, Void> LENIENT_PARSER = createParser(true);
+
+    private static ConstructingObjectParser<DestConfig, Void> createParser(boolean lenient) {
+        ConstructingObjectParser<DestConfig, Void> parser = new ConstructingObjectParser<>("data_frame_config_dest",
+            lenient,
+            args -> new DestConfig((String)args[0]));
+        parser.declareString(constructorArg(), INDEX);
+        return parser;
+    }
+
+    private final String index;
+
+    public DestConfig(String index) {
+        this.index = ExceptionsHelper.requireNonNull(index, INDEX.getPreferredName());
+    }
+
+    public DestConfig(final StreamInput in) throws IOException {
+        index = in.readString();
+    }
+
+    public String getIndex() {
+        return index;
+    }
+
+    public boolean isValid() {
+        return index.isEmpty() == false;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeString(index);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field(INDEX.getPreferredName(), index);
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other == this) {
+            return true;
+        }
+        if (other == null || other.getClass() != getClass()) {
+            return false;
+        }
+
+        DestConfig that = (DestConfig) other;
+        return Objects.equals(index, that.index);
+    }
+
+    @Override
+    public int hashCode(){
+        return Objects.hash(index);
+    }
+
+    public static DestConfig fromXContent(final XContentParser parser, boolean lenient) throws IOException {
+        return lenient ? LENIENT_PARSER.apply(parser, null) : STRICT_PARSER.apply(parser, null);
+    }
+}

+ 8 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/QueryConfig.java

@@ -21,10 +21,12 @@ import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.AbstractQueryBuilder;
+import org.elasticsearch.index.query.MatchAllQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 
@@ -37,6 +39,11 @@ public class QueryConfig extends AbstractDiffable<QueryConfig> implements Writea
     private final Map<String, Object> source;
     private final QueryBuilder query;
 
+    static QueryConfig matchAll() {
+        return new QueryConfig(Collections.singletonMap(MatchAllQueryBuilder.NAME, Collections.emptyMap()),
+            new MatchAllQueryBuilder());
+    }
+
     public QueryConfig(final Map<String, Object> source, final QueryBuilder query) {
         this.source = Objects.requireNonNull(source);
         this.query = query;
@@ -110,4 +117,4 @@ public class QueryConfig extends AbstractDiffable<QueryConfig> implements Writea
     public boolean isValid() {
         return this.query != null;
     }
-}
+}

+ 139 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/SourceConfig.java

@@ -0,0 +1,139 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.dataframe.transforms;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
+
+
+public class SourceConfig implements Writeable, ToXContentObject {
+
+    public static final ParseField QUERY = new ParseField("query");
+    public static final ParseField INDEX = new ParseField("index");
+
+    public static final ConstructingObjectParser<SourceConfig, Void> STRICT_PARSER = createParser(false);
+    public static final ConstructingObjectParser<SourceConfig, Void> LENIENT_PARSER = createParser(true);
+
+    private static ConstructingObjectParser<SourceConfig, Void> createParser(boolean lenient) {
+        ConstructingObjectParser<SourceConfig, Void> parser = new ConstructingObjectParser<>("data_frame_config_source",
+            lenient,
+            args -> {
+                @SuppressWarnings("unchecked")
+                String[] index = ((List<String>)args[0]).toArray(new String[0]);
+                // default handling: if the user does not specify a query, we default to match_all
+                QueryConfig queryConfig = args[1] == null ? QueryConfig.matchAll() : (QueryConfig) args[1];
+                return new SourceConfig(index, queryConfig);
+            });
+        parser.declareStringArray(constructorArg(), INDEX);
+        parser.declareObject(optionalConstructorArg(), (p, c) -> QueryConfig.fromXContent(p, lenient), QUERY);
+        return parser;
+    }
+
+    private final String[] index;
+    private final QueryConfig queryConfig;
+
+    /**
+     * Create a new SourceConfig for the provided indices.
+     *
+     * {@link QueryConfig} defaults to a MatchAll query.
+     *
+     * @param index Any number of indices. At least one non-null, non-empty, index should be provided
+     */
+    public SourceConfig(String... index) {
+        this(index, QueryConfig.matchAll());
+    }
+
+    /**
+     * Create a new SourceConfig for the provided indices, from which data is gathered with the provided {@link QueryConfig}
+     *
+     * @param index Any number of indices. At least one non-null, non-empty, index should be provided
+     * @param queryConfig A QueryConfig object that contains the desired query, needs to be non-null
+     */
+    public SourceConfig(String[] index, QueryConfig queryConfig) {
+        ExceptionsHelper.requireNonNull(index, INDEX.getPreferredName());
+        if (index.length == 0) {
+            throw new IllegalArgumentException("must specify at least one index");
+        }
+        if (Arrays.stream(index).anyMatch(Strings::isNullOrEmpty)) {
+            throw new IllegalArgumentException("all indices need to be non-null and non-empty");
+        }
+        this.index = index;
+        this.queryConfig = ExceptionsHelper.requireNonNull(queryConfig, QUERY.getPreferredName());
+    }
+
+    public SourceConfig(final StreamInput in) throws IOException {
+        index = in.readStringArray();
+        queryConfig = new QueryConfig(in);
+    }
+
+    public String[] getIndex() {
+        return index;
+    }
+
+    public QueryConfig getQueryConfig() {
+        return queryConfig;
+    }
+
+    public boolean isValid() {
+        return queryConfig.isValid();
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeStringArray(index);
+        queryConfig.writeTo(out);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.array(INDEX.getPreferredName(), index);
+        builder.field(QUERY.getPreferredName(), queryConfig);
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other == this) {
+            return true;
+        }
+        if (other == null || other.getClass() != getClass()) {
+            return false;
+        }
+
+        SourceConfig that = (SourceConfig) other;
+        return Arrays.equals(index, that.index) && Objects.equals(queryConfig, that.queryConfig);
+    }
+
+    @Override
+    public int hashCode(){
+        // Using Arrays.hashCode as Objects.hash does not deeply hash nested arrays. Since we are doing Array.equals, this is necessary
+        int hash = Arrays.hashCode(index);
+        return 31 * hash + (queryConfig == null ? 0 : queryConfig.hashCode());
+    }
+
+    public static SourceConfig fromXContent(final XContentParser parser, boolean lenient) throws IOException {
+        return lenient ? LENIENT_PARSER.apply(parser, null) : STRICT_PARSER.apply(parser, null);
+    }
+}

+ 3 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsActionResponseTests.java

@@ -44,6 +44,7 @@ public class GetDataFrameTransformsActionResponseTests extends AbstractWireSeria
         assertWarnings(LoggerMessageFormat.format(Response.INVALID_TRANSFORMS_DEPRECATION_WARNING, 2));
     }
 
+    @SuppressWarnings("unchecked")
     public void testNoHeaderInResponse() throws IOException {
         List<DataFrameTransformConfig> transforms = new ArrayList<>();
 
@@ -62,7 +63,8 @@ public class GetDataFrameTransformsActionResponseTests extends AbstractWireSeria
 
         assertEquals(transforms.size(), transformsResponse.size());
         for (int i = 0; i < transforms.size(); ++i) {
-            assertEquals(transforms.get(i).getSource(), XContentMapValues.extractValue("source", transformsResponse.get(i)));
+            assertArrayEquals(transforms.get(i).getSource().getIndex(),
+                ((ArrayList<String>)XContentMapValues.extractValue("source.index", transformsResponse.get(i))).toArray(new String[0]));
             assertEquals(null, XContentMapValues.extractValue("headers", transformsResponse.get(i)));
         }
     }

+ 8 - 6
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java

@@ -17,13 +17,14 @@ import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.test.AbstractStreamableXContentTestCase;
 import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction.Request;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
-import org.elasticsearch.xpack.core.dataframe.transforms.QueryConfigTests;
+import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig;
 import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfigTests;
 import org.junit.Before;
 
 import java.io.IOException;
 
 import static java.util.Collections.emptyList;
+import static org.elasticsearch.xpack.core.dataframe.transforms.SourceConfigTests.randomSourceConfig;
 
 public class PreviewDataFrameTransformActionRequestTests extends AbstractStreamableXContentTestCase<Request> {
 
@@ -65,8 +66,8 @@ public class PreviewDataFrameTransformActionRequestTests extends AbstractStreama
 
     @Override
     protected Request createTestInstance() {
-        DataFrameTransformConfig config = new DataFrameTransformConfig("transform-preview", randomAlphaOfLength(10),
-                "unused-transform-preview-index", null, QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
+        DataFrameTransformConfig config = new DataFrameTransformConfig("transform-preview", randomSourceConfig(),
+                new DestConfig("unused-transform-preview-index"), null, PivotConfigTests.randomPivotConfig());
         return new Request(config);
     }
 
@@ -74,8 +75,9 @@ public class PreviewDataFrameTransformActionRequestTests extends AbstractStreama
         // id & dest fields will be set by the parser
         BytesArray json = new BytesArray(
                 "{ " +
-                    "\"source\":\"foo\", " +
-                    "\"query\": {\"match_all\": {}}," +
+                    "\"source\":{" +
+                    "   \"index\":\"foo\", " +
+                    "   \"query\": {\"match_all\": {}}}," +
                     "\"pivot\": {" +
                         "\"group_by\": {\"destination-field2\": {\"terms\": {\"field\": \"term-field\"}}}," +
                         "\"aggs\": {\"avg_response\": {\"avg\": {\"field\": \"responsetime\"}}}" +
@@ -87,7 +89,7 @@ public class PreviewDataFrameTransformActionRequestTests extends AbstractStreama
 
             Request request = Request.fromXContent(parser);
             assertEquals("transform-preview", request.getConfig().getId());
-            assertEquals("unused-transform-preview-index", request.getConfig().getDestination());
+            assertEquals("unused-transform-preview-index", request.getConfig().getDestination().getIndex());
         }
     }
 }

+ 21 - 23
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java

@@ -22,6 +22,9 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.elasticsearch.test.TestMatchers.matchesPattern;
+import static org.elasticsearch.xpack.core.dataframe.transforms.DestConfigTests.randomDestConfig;
+import static org.elasticsearch.xpack.core.dataframe.transforms.SourceConfigTests.randomInvalidSourceConfig;
+import static org.elasticsearch.xpack.core.dataframe.transforms.SourceConfigTests.randomSourceConfig;
 
 public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameTestCase<DataFrameTransformConfig> {
 
@@ -29,35 +32,30 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
     private boolean runWithHeaders;
 
     public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHeaders() {
-        return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
-                randomAlphaOfLengthBetween(1, 10), null, QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
+        return randomDataFrameTransformConfigWithoutHeaders(randomAlphaOfLengthBetween(1, 10));
     }
 
     public static DataFrameTransformConfig randomDataFrameTransformConfig() {
-        return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
-                randomAlphaOfLengthBetween(1, 10), randomHeaders(), QueryConfigTests.randomQueryConfig(),
-                PivotConfigTests.randomPivotConfig());
+        return randomDataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10));
     }
 
     public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHeaders(String id) {
-        return new DataFrameTransformConfig(id, randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), null,
-                QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
+        return new DataFrameTransformConfig(id, randomSourceConfig(), randomDestConfig(), null,
+                PivotConfigTests.randomPivotConfig());
     }
 
     public static DataFrameTransformConfig randomDataFrameTransformConfig(String id) {
-        return new DataFrameTransformConfig(id, randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomHeaders(),
-                QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
+        return new DataFrameTransformConfig(id, randomSourceConfig(), randomDestConfig(), randomHeaders(),
+                PivotConfigTests.randomPivotConfig());
     }
 
     public static DataFrameTransformConfig randomInvalidDataFrameTransformConfig() {
         if (randomBoolean()) {
-            return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
-                    randomAlphaOfLengthBetween(1, 10), randomHeaders(), QueryConfigTests.randomInvalidQueryConfig(),
-                    PivotConfigTests.randomPivotConfig());
+            return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomInvalidSourceConfig(),
+                    randomDestConfig(), randomHeaders(), PivotConfigTests.randomPivotConfig());
         } // else
-        return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
-                randomAlphaOfLengthBetween(1, 10), randomHeaders(), QueryConfigTests.randomQueryConfig(),
-                PivotConfigTests.randomInvalidPivotConfig());
+        return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomSourceConfig(),
+                randomDestConfig(), randomHeaders(), PivotConfigTests.randomInvalidPivotConfig());
     }
 
     @Before
@@ -99,8 +97,8 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
 
     public void testDefaultMatchAll() throws IOException {
         String pivotTransform = "{"
-                + " \"source\" : \"src\","
-                + " \"dest\" : \"dest\","
+                + " \"source\" : {\"index\":\"src\"},"
+                + " \"dest\" : {\"index\": \"dest\"},"
                 + " \"pivot\" : {"
                 + " \"group_by\": {"
                 + "   \"id\": {"
@@ -114,8 +112,8 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
                 + "} } } } }";
 
         DataFrameTransformConfig dataFrameTransformConfig = createDataFrameTransformConfigFromString(pivotTransform, "test_match_all");
-        assertNotNull(dataFrameTransformConfig.getQueryConfig());
-        assertTrue(dataFrameTransformConfig.getQueryConfig().isValid());
+        assertNotNull(dataFrameTransformConfig.getSource().getQueryConfig());
+        assertTrue(dataFrameTransformConfig.getSource().getQueryConfig().isValid());
 
         try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
             XContentBuilder content = dataFrameTransformConfig.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
@@ -128,8 +126,8 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
     public void testPreventHeaderInjection() throws IOException {
         String pivotTransform = "{"
                 + " \"headers\" : {\"key\" : \"value\" },"
-                + " \"source\" : \"src\","
-                + " \"dest\" : \"dest\","
+                + " \"source\" : {\"index\":\"src\"},"
+                + " \"dest\" : {\"index\": \"dest\"},"
                 + " \"pivot\" : {"
                 + " \"group_by\": {"
                 + "   \"id\": {"
@@ -167,8 +165,8 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
     public void testSetIdInBody() throws IOException {
         String pivotTransform = "{"
                 + " \"id\" : \"body_id\","
-                + " \"source\" : \"src\","
-                + " \"dest\" : \"dest\","
+                + " \"source\" : {\"index\":\"src\"},"
+                + " \"dest\" : {\"index\": \"dest\"},"
                 + " \"pivot\" : {"
                 + " \"group_by\": {"
                 + "   \"id\": {"

+ 48 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfigTests.java

@@ -0,0 +1,48 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.dataframe.transforms;
+
+import org.elasticsearch.common.io.stream.Writeable.Reader;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.junit.Before;
+
+import java.io.IOException;
+
+public class DestConfigTests extends AbstractSerializingDataFrameTestCase<DestConfig> {
+
+    private boolean lenient;
+
+    public static DestConfig randomDestConfig() {
+        return new DestConfig(randomAlphaOfLength(10));
+    }
+
+    @Before
+    public void setRandomFeatures() {
+        lenient = randomBoolean();
+    }
+
+    @Override
+    protected DestConfig doParseInstance(XContentParser parser) throws IOException {
+        return DestConfig.fromXContent(parser, lenient);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return lenient;
+    }
+
+    @Override
+    protected DestConfig createTestInstance() {
+        return randomDestConfig();
+    }
+
+    @Override
+    protected Reader<DestConfig> instanceReader() {
+        return DestConfig::new;
+    }
+
+}

+ 62 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/SourceConfigTests.java

@@ -0,0 +1,62 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.dataframe.transforms;
+
+import org.elasticsearch.common.io.stream.Writeable.Reader;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.function.Predicate;
+
+public class SourceConfigTests extends AbstractSerializingDataFrameTestCase<SourceConfig> {
+
+    private boolean lenient;
+
+    public static SourceConfig randomSourceConfig() {
+        return new SourceConfig(generateRandomStringArray(10, 10, false, false),
+            QueryConfigTests.randomQueryConfig());
+    }
+
+    public static SourceConfig randomInvalidSourceConfig() {
+        // create something broken but with a source
+        return new SourceConfig(generateRandomStringArray(10, 10, false, false),
+            QueryConfigTests.randomInvalidQueryConfig());
+    }
+
+    @Before
+    public void setRandomFeatures() {
+        lenient = randomBoolean();
+    }
+
+    @Override
+    protected SourceConfig doParseInstance(XContentParser parser) throws IOException {
+        return SourceConfig.fromXContent(parser, lenient);
+    }
+
+    @Override
+    protected SourceConfig createTestInstance() {
+        return lenient ? randomBoolean() ? randomSourceConfig() : randomInvalidSourceConfig() : randomSourceConfig();
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return lenient;
+    }
+
+    @Override
+    protected Predicate<String> getRandomFieldsExcludeFilter() {
+        // allow unknown fields in the root of the object only as QueryConfig stores a Map<String, Object>
+        return field -> !field.isEmpty();
+    }
+
+    @Override
+    protected Reader<SourceConfig> instanceReader() {
+        return SourceConfig::new;
+    }
+
+}

+ 9 - 9
x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java

@@ -96,8 +96,8 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
             BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
 
         String config = "{"
-            + " \"source\": \"" + REVIEWS_INDEX_NAME + "\","
-            + " \"dest\": \"" + dataFrameIndex + "\",";
+            + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
+            + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},";
 
         config += " \"pivot\": {"
             + "   \"group_by\": {"
@@ -134,8 +134,8 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
             BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
 
         String config = "{"
-                + " \"source\": \"reviews\","
-                + " \"dest\": \"" + dataFrameIndex + "\",";
+            + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
+            + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},";
 
         config += " \"pivot\": {"
                 + "   \"group_by\": {"
@@ -209,8 +209,8 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
             BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
 
         String config = "{"
-            + " \"source\": \"" + REVIEWS_INDEX_NAME + "\","
-            + " \"dest\": \"" + dataFrameIndex + "\",";
+            + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
+            + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},";
 
         config += " \"pivot\": {"
             + "   \"group_by\": {"
@@ -245,7 +245,7 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
             BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
 
         String config = "{"
-            + " \"source\": \"" + REVIEWS_INDEX_NAME + "\",";
+            + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"}  ,";
 
         config += " \"pivot\": {"
             + "   \"group_by\": {"
@@ -277,8 +277,8 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
             BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
 
         String config = "{"
-            + " \"source\": \"" + REVIEWS_INDEX_NAME + "\","
-            + " \"dest\": \"" + dataFrameIndex + "\",";
+            + " \"source\": {\"index\": \"" + REVIEWS_INDEX_NAME + "\"},"
+            + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},";
 
         config +="    \"pivot\": { \n" +
             "        \"group_by\": {\n" +

+ 4 - 5
x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java

@@ -139,13 +139,12 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
         final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId, authHeader);
 
         String config = "{"
-                + " \"source\": \"" + REVIEWS_INDEX_NAME + "\","
-                + " \"dest\": \"" + dataFrameIndex + "\",";
+            + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},";
 
         if (query != null) {
-            config += "\"query\": {"
-                    + query
-                    + "},";
+            config += " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\", \"query\":{" + query + "}},";
+        } else {
+            config += " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},";
         }
 
         config += " \"pivot\": {"

+ 2 - 2
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java

@@ -60,8 +60,8 @@ public class TransportPreviewDataFrameTransformAction extends
 
         final DataFrameTransformConfig config = request.getConfig();
 
-        Pivot pivot = new Pivot(config.getSource(),
-            config.getQueryConfig().getQuery(),
+        Pivot pivot = new Pivot(config.getSource().getIndex(),
+            config.getSource().getQueryConfig().getQuery(),
             config.getPivotConfig());
 
         getPreview(pivot, ActionListener.wrap(

+ 17 - 17
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java

@@ -30,7 +30,6 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
-import org.elasticsearch.persistent.PersistentTasksService;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -72,7 +71,6 @@ public class TransportPutDataFrameTransformAction
     public TransportPutDataFrameTransformAction(Settings settings, TransportService transportService, ThreadPool threadPool,
                                                 ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
                                                 ClusterService clusterService, XPackLicenseState licenseState,
-                                                PersistentTasksService persistentTasksService,
                                                 DataFrameTransformsConfigManager dataFrameTransformsConfigManager, Client client) {
         super(PutDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
                 PutDataFrameTransformAction.Request::new);
@@ -119,36 +117,36 @@ public class TransportPutDataFrameTransformAction
             return;
         }
 
-        String[] dest = indexNameExpressionResolver.concreteIndexNames(clusterState,
+        final String[] dest = indexNameExpressionResolver.concreteIndexNames(clusterState,
             IndicesOptions.lenientExpandOpen(),
-            config.getDestination());
+            config.getDestination().getIndex());
 
         if (dest.length > 0) {
             listener.onFailure(new ElasticsearchStatusException(
-                DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_INDEX_ALREADY_EXISTS, config.getDestination()),
+                DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_INDEX_ALREADY_EXISTS,
+                    config.getDestination().getIndex()),
                 RestStatus.BAD_REQUEST));
             return;
         }
 
-        String[] src = indexNameExpressionResolver.concreteIndexNames(clusterState,
-            IndicesOptions.lenientExpandOpen(),
-            config.getSource());
-        if (src.length == 0) {
-            listener.onFailure(new ElasticsearchStatusException(
-                DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, config.getSource()),
-                RestStatus.BAD_REQUEST));
-            return;
+        for(String src : config.getSource().getIndex()) {
+            if (indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src).length == 0) {
+                listener.onFailure(new ElasticsearchStatusException(
+                    DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src),
+                    RestStatus.BAD_REQUEST));
+                return;
+            }
         }
 
         // Early check to verify that the user can create the destination index and can read from the source
         if (licenseState.isAuthAllowed()) {
             final String username = securityContext.getUser().principal();
             RoleDescriptor.IndicesPrivileges sourceIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder()
-                .indices(config.getSource())
+                .indices(config.getSource().getIndex())
                 .privileges("read")
                 .build();
             RoleDescriptor.IndicesPrivileges destIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder()
-                .indices(config.getDestination())
+                .indices(config.getDestination().getIndex())
                 .privileges("read", "index", "create_index")
                 .build();
 
@@ -196,7 +194,9 @@ public class TransportPutDataFrameTransformAction
 
     private void putDataFrame(DataFrameTransformConfig config, ActionListener<Response> listener) {
 
-        final Pivot pivot = new Pivot(config.getSource(), config.getQueryConfig().getQuery(), config.getPivotConfig());
+        final Pivot pivot = new Pivot(config.getSource().getIndex(),
+            config.getSource().getQueryConfig().getQuery(),
+            config.getPivotConfig());
 
 
         // <5> Return the listener, or clean up destination index on failure.
@@ -206,7 +206,7 @@ public class TransportPutDataFrameTransformAction
                 ClientHelper.executeAsyncWithOrigin(client,
                     ClientHelper.DATA_FRAME_ORIGIN,
                     DeleteIndexAction.INSTANCE,
-                    new DeleteIndexRequest(config.getDestination()), ActionListener.wrap(
+                    new DeleteIndexRequest(config.getDestination().getIndex()), ActionListener.wrap(
                         deleteIndexResponse -> listener.onFailure(putTransformConfigurationException),
                         deleteIndexException -> {
                             String msg = "Failed to delete destination index after creating transform [" + config.getId() + "] failed";

+ 2 - 2
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java

@@ -68,7 +68,7 @@ public class DataFrameTransformsCheckpointService {
         long timeUpperBound = 0;
 
         // 1st get index to see the indexes the user has access to
-        GetIndexRequest getIndexRequest = new GetIndexRequest().indices(transformConfig.getSource());
+        GetIndexRequest getIndexRequest = new GetIndexRequest().indices(transformConfig.getSource().getIndex());
 
         ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, GetIndexAction.INSTANCE,
                 getIndexRequest, ActionListener.wrap(getIndexResponse -> {
@@ -76,7 +76,7 @@ public class DataFrameTransformsCheckpointService {
 
                     // 2nd get stats request
                     ClientHelper.executeAsyncWithOrigin(client, ClientHelper.DATA_FRAME_ORIGIN, IndicesStatsAction.INSTANCE,
-                            new IndicesStatsRequest().indices(transformConfig.getSource()), ActionListener.wrap(response -> {
+                            new IndicesStatsRequest().indices(transformConfig.getSource().getIndex()), ActionListener.wrap(response -> {
                                 if (response.getFailedShards() != 0) {
                                     throw new CheckpointException("Source has [" + response.getFailedShards() + "] failed shards");
                                 }

+ 16 - 2
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java

@@ -15,6 +15,8 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
+import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig;
+import org.elasticsearch.xpack.core.dataframe.transforms.SourceConfig;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -39,6 +41,7 @@ public final class DataFrameInternalIndex {
     public static final String DYNAMIC = "dynamic";
     public static final String PROPERTIES = "properties";
     public static final String TYPE = "type";
+    public static final String ENABLED = "enabled";
     public static final String DATE = "date";
     public static final String TEXT = "text";
     public static final String FIELDS = "fields";
@@ -138,10 +141,21 @@ public final class DataFrameInternalIndex {
                 .field(TYPE, KEYWORD)
             .endObject()
             .startObject(DataFrameField.SOURCE.getPreferredName())
-                .field(TYPE, KEYWORD)
+                .startObject(PROPERTIES)
+                    .startObject(SourceConfig.INDEX.getPreferredName())
+                        .field(TYPE, KEYWORD)
+                    .endObject()
+                    .startObject(SourceConfig.QUERY.getPreferredName())
+                        .field(ENABLED, "false")
+                    .endObject()
+                .endObject()
             .endObject()
             .startObject(DataFrameField.DESTINATION.getPreferredName())
-                .field(TYPE, KEYWORD)
+                .startObject(PROPERTIES)
+                    .startObject(DestConfig.INDEX.getPreferredName())
+                        .field(TYPE, KEYWORD)
+                    .endObject()
+                .endObject()
             .endObject();
     }
 

+ 2 - 2
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataframeIndex.java

@@ -38,7 +38,7 @@ public final class DataframeIndex {
 
     public static void createDestinationIndex(Client client, DataFrameTransformConfig transformConfig, Map<String, String> mappings,
             final ActionListener<Boolean> listener) {
-        CreateIndexRequest request = new CreateIndexRequest(transformConfig.getDestination());
+        CreateIndexRequest request = new CreateIndexRequest(transformConfig.getDestination().getIndex());
 
         // TODO: revisit number of shards, number of replicas
         request.settings(Settings.builder() // <1>
@@ -50,7 +50,7 @@ public final class DataframeIndex {
             listener.onResponse(true);
         }, e -> {
             String message = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_CREATE_DESTINATION_INDEX,
-                    transformConfig.getDestination(), transformConfig.getId());
+                    transformConfig.getDestination().getIndex(), transformConfig.getId());
             logger.error(message);
             listener.onFailure(new RuntimeException(message, e));
         }));

+ 3 - 3
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java

@@ -48,9 +48,9 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
 
     @Override
     protected void onStartJob(long now) {
-        QueryBuilder queryBuilder = getConfig().getQueryConfig().getQuery();
+        QueryBuilder queryBuilder = getConfig().getSource().getQueryConfig().getQuery();
 
-        pivot = new Pivot(getConfig().getSource(), queryBuilder, getConfig().getPivotConfig());
+        pivot = new Pivot(getConfig().getSource().getIndex(), queryBuilder, getConfig().getPivotConfig());
     }
 
     @Override
@@ -70,7 +70,7 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
      */
     private Stream<IndexRequest> processBucketsToIndexRequests(CompositeAggregation agg) {
         final DataFrameTransformConfig transformConfig = getConfig();
-        String indexName = transformConfig.getDestination();
+        String indexName = transformConfig.getDestination().getIndex();
 
         return pivot.extractResults(agg, getFieldMappings(), getStats()).map(document -> {
             XContentBuilder builder;

+ 3 - 3
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -299,13 +299,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
 
             if (fieldMappings == null) {
                 CountDownLatch latch = new CountDownLatch(1);
-                SchemaUtil.getDestinationFieldMappings(client, transformConfig.getDestination(), new LatchedActionListener<>(
+                SchemaUtil.getDestinationFieldMappings(client, transformConfig.getDestination().getIndex(), new LatchedActionListener<>(
                     ActionListener.wrap(
                         destinationMappings -> fieldMappings = destinationMappings,
                         e -> {
                             throw new RuntimeException(
                                 DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS,
-                                    transformConfig.getDestination()),
+                                    transformConfig.getDestination().getIndex()),
                                 e);
                         }), latch));
                 try {
@@ -313,7 +313,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 } catch (InterruptedException e) {
                    throw new RuntimeException(
                                 DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS,
-                                    transformConfig.getDestination()),
+                                    transformConfig.getDestination().getIndex()),
                                 e);
                 }
             }

+ 3 - 3
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java

@@ -38,13 +38,13 @@ public class Pivot {
     private static final String COMPOSITE_AGGREGATION_NAME = "_data_frame";
 
     private final PivotConfig config;
-    private final String source;
+    private final String[] source;
 
     // objects for re-using
     private final CompositeAggregationBuilder cachedCompositeAggregation;
     private final SearchRequest cachedSearchRequest;
 
-    public Pivot(String source, QueryBuilder query, PivotConfig config) {
+    public Pivot(String[] source, QueryBuilder query, PivotConfig config) {
         this.source = source;
         this.config = config;
         this.cachedCompositeAggregation = createCompositeAggregation(config);
@@ -108,7 +108,7 @@ public class Pivot {
         }));
     }
 
-    private static SearchRequest createSearchRequest(String index, QueryBuilder query, CompositeAggregationBuilder compositeAggregation) {
+    private static SearchRequest createSearchRequest(String[] index, QueryBuilder query, CompositeAggregationBuilder compositeAggregation) {
         SearchRequest searchRequest = new SearchRequest(index);
         SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
         sourceBuilder.aggregation(compositeAggregation);

+ 3 - 3
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/SchemaUtil.java

@@ -57,7 +57,7 @@ public final class SchemaUtil {
      */
     public static void deduceMappings(final Client client,
                                       final PivotConfig config,
-                                      final String source,
+                                      final String[] source,
                                       final ActionListener<Map<String, String>> listener) {
         // collects the fieldnames used as source for aggregations
         Map<String, String> aggregationSourceFieldNames = new HashMap<>();
@@ -156,7 +156,7 @@ public final class SchemaUtil {
     /*
      * Very "magic" helper method to extract the source mappings
      */
-    private static void getSourceFieldMappings(Client client, String index, String[] fields,
+    private static void getSourceFieldMappings(Client client, String[] index, String[] fields,
             ActionListener<Map<String, String>> listener) {
         GetFieldMappingsRequest fieldMappingRequest = new GetFieldMappingsRequest();
         fieldMappingRequest.indices(index);
@@ -182,7 +182,7 @@ public final class SchemaUtil {
                             final Map<?, ?> map = (Map<?, ?>) typeMap;
                             if (map.containsKey("type")) {
                                 String type = map.get("type").toString();
-                                logger.debug("Extracted type for [" + fieldName + "] : [" + type + "]");
+                                logger.debug("Extracted type for [" + fieldName + "] : [" + type + "] from index [" + indexName +"]");
                                 // TODO: overwrites types, requires resolve if
                                 // types are mixed
                                 extractedTypes.put(fieldName, type);

+ 5 - 11
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java

@@ -85,13 +85,13 @@ public class PivotTests extends ESTestCase {
     }
 
     public void testValidateExistingIndex() throws Exception {
-        Pivot pivot = new Pivot("existing_source_index", new MatchAllQueryBuilder(), getValidPivotConfig());
+        Pivot pivot = new Pivot(new String[]{"existing_source_index"}, new MatchAllQueryBuilder(), getValidPivotConfig());
 
         assertValidTransform(client, pivot);
     }
 
     public void testValidateNonExistingIndex() throws Exception {
-        Pivot pivot = new Pivot("non_existing_source_index", new MatchAllQueryBuilder(), getValidPivotConfig());
+        Pivot pivot = new Pivot(new String[]{"non_existing_source_index"}, new MatchAllQueryBuilder(), getValidPivotConfig());
 
         assertInvalidTransform(client, pivot);
     }
@@ -99,7 +99,7 @@ public class PivotTests extends ESTestCase {
     public void testSearchFailure() throws Exception {
         // test a failure during the search operation, transform creation fails if
         // search has failures although they might just be temporary
-        Pivot pivot = new Pivot("existing_source_index_with_failing_shards",
+        Pivot pivot = new Pivot(new String[]{"existing_source_index_with_failing_shards"},
             new MatchAllQueryBuilder(),
             getValidPivotConfig());
 
@@ -110,10 +110,7 @@ public class PivotTests extends ESTestCase {
         for (String agg : supportedAggregations) {
             AggregationConfig aggregationConfig = getAggregationConfig(agg);
 
-            Pivot pivot = new Pivot("existing_source",
-                new MatchAllQueryBuilder(),
-                getValidPivotConfig(aggregationConfig));
-
+            Pivot pivot = new Pivot(new String[]{"existing_source"}, new MatchAllQueryBuilder(), getValidPivotConfig(aggregationConfig));
             assertValidTransform(client, pivot);
         }
     }
@@ -122,10 +119,7 @@ public class PivotTests extends ESTestCase {
         for (String agg : unsupportedAggregations) {
             AggregationConfig aggregationConfig = getAggregationConfig(agg);
 
-            Pivot pivot = new Pivot("existing_source",
-                new MatchAllQueryBuilder(),
-                getValidPivotConfig(aggregationConfig));
-
+            Pivot pivot = new Pivot(new String[]{"existing_source"}, new MatchAllQueryBuilder(), getValidPivotConfig(aggregationConfig));
             assertInvalidTransform(client, pivot);
         }
     }

+ 0 - 9
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/util/BatchedDataIteratorTests.java

@@ -315,15 +315,6 @@ public class BatchedDataIteratorTests extends ESTestCase {
             fields = new HashMap<>();
         }
 
-        public SearchHitBuilder addField(String name, Object value) {
-            return addField(name, Arrays.asList(value));
-        }
-
-        public SearchHitBuilder addField(String name, List<Object> values) {
-            fields.put(name, new DocumentField(name, values));
-            return this;
-        }
-
         public SearchHitBuilder setSource(String sourceJson) {
             hit.sourceRef(new BytesArray(sourceJson));
             return this;

+ 2 - 2
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml

@@ -71,7 +71,7 @@ setup:
       data_frame.preview_data_frame_transform:
         body: >
           {
-            "source": "airline-data",
+            "source": { "index": "airline-data" },
             "pivot": {
               "group_by": {
                 "airline": {"terms": {"field": "airline"}},
@@ -96,7 +96,7 @@ setup:
       data_frame.preview_data_frame_transform:
         body: >
           {
-            "source": "airline-data",
+            "source": { "index": "airline-data" },
             "pivot": {
               "group_by": {"airline": {"terms": {"not_a_terms_param": "airline"}}},
               "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}

+ 55 - 11
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml

@@ -44,8 +44,8 @@ setup:
         transform_id: "missing-source-transform"
         body: >
           {
-            "source": "missing-index",
-            "dest": "missing-source-dest",
+            "source": { "index": "missing-index" },
+            "dest": { "index": "missing-source-dest" },
             "pivot": {
               "group_by": { "airline": {"terms": {"field": "airline"}}},
               "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
@@ -58,8 +58,8 @@ setup:
         transform_id: "airline-transform"
         body: >
           {
-            "source": "airline-data",
-            "dest": "airline-data-by-airline",
+            "source": { "index": "airline-data" },
+            "dest": { "index": "airline-data-by-airline" },
             "pivot": {
               "group_by": { "airline": {"terms": {"field": "airline"}}},
               "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
@@ -72,8 +72,8 @@ setup:
         transform_id: "airline-transform-dos"
         body: >
           {
-            "source": "airline-data",
-            "dest": "airline-data-by-airline-again",
+            "source": { "index": "airline-data" },
+            "dest": { "index": "airline-data-by-airline-again" },
             "pivot": {
               "group_by": { "airline": {"terms": {"field": "airline"}}},
               "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
@@ -86,9 +86,9 @@ setup:
         transform_id: "airline-transform"
   - match: { count: 1 }
   - match: { transforms.0.id: "airline-transform" }
-  - match: { transforms.0.source: "airline-data" }
-  - match: { transforms.0.dest: "airline-data-by-airline" }
-  - is_true: transforms.0.query.match_all
+  - match: { transforms.0.source.index.0: "airline-data" }
+  - match: { transforms.0.dest.index: "airline-data-by-airline" }
+  - is_true: transforms.0.source.query.match_all
   - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" }
   - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" }
 
@@ -136,6 +136,50 @@ setup:
   - match: { transforms.0.id: "airline-transform-dos" }
 
 ---
+"Test transform with query and array of indices in source":
+  - do:
+      indices.create:
+        index: airline-data-other
+        body:
+          mappings:
+            properties:
+              time:
+                type: date
+              airline:
+                type: keyword
+              responsetime:
+                type: float
+              event_rate:
+                type: integer
+
+  - do:
+      data_frame.put_data_frame_transform:
+        transform_id: "airline-transform"
+        body: >
+          {
+            "source": {
+              "index": ["airline-data", "airline-data-other"],
+              "query": {"bool":{"filter":{"term":{"airline":"FOO"}}}}
+            },
+            "dest": { "index": "airline-data-by-airline" },
+            "pivot": {
+              "group_by": { "airline": {"terms": {"field": "airline"}}},
+              "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
+            }
+          }
+  - match: { acknowledged: true }
+  - do:
+      data_frame.get_data_frame_transform:
+        transform_id: "airline-transform"
+  - match: { count: 1 }
+  - match: { transforms.0.id: "airline-transform" }
+  - match: { transforms.0.source.index.0: "airline-data" }
+  - match: { transforms.0.source.index.1: "airline-data-other" }
+  - match: { transforms.0.source.query.bool.filter.term.airline: "FOO" }
+  - match: { transforms.0.dest.index: "airline-data-by-airline" }
+  - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" }
+  - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" }
+---
 "Test transform with invalid page parameter":
   - do:
       catch: /Param \[size\] has a max acceptable value of \[1000\]/
@@ -151,8 +195,8 @@ setup:
         transform_id: "airline-transform"
         body: >
           {
-            "source": "airline-data",
-            "dest": "airline-data-by-airline",
+            "source": { "index": "airline-data" },
+            "dest": { "index": "airline-data-by-airline" },
             "pivot": {
               "group_by": { "airline": {"terms": {"field": "airline"}}},
               "aggs": {"avg_response": {"avg": {"field": "responsetime"}}, "time": {"max": {"field": "time"}}}

+ 2 - 2
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml

@@ -18,8 +18,8 @@ setup:
         transform_id: "airline-transform-start-stop"
         body: >
           {
-            "source": "airline-data",
-            "dest": "airline-data-by-airline-start-stop",
+            "source": { "index": "airline-data" },
+            "dest": { "index": "airline-data-by-airline-start-stop" },
             "pivot": {
               "group_by": { "airline": {"terms": {"field": "airline"}}},
               "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}

+ 8 - 8
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml

@@ -18,8 +18,8 @@ setup:
         transform_id: "airline-transform-stats"
         body: >
           {
-            "source": "airline-data",
-            "dest": "airline-data-by-airline-stats",
+            "source": { "index": "airline-data" },
+            "dest": { "index": "airline-data-by-airline-stats" },
             "pivot": {
               "group_by": { "airline": {"terms": {"field": "airline"}}},
               "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
@@ -74,8 +74,8 @@ teardown:
         transform_id: "airline-transform-stats-dos"
         body: >
           {
-            "source": "airline-data",
-            "dest": "airline-data-by-airline-stats-dos",
+            "source": { "index": "airline-data" },
+            "dest": { "index": "airline-data-by-airline-stats-dos" },
             "pivot": {
               "group_by": { "airline": {"terms": {"field": "airline"}}},
               "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
@@ -112,8 +112,8 @@ teardown:
         transform_id: "airline-transform-stats-dos"
         body: >
           {
-            "source": "airline-data",
-            "dest": "airline-data-by-airline-stats-dos",
+            "source": { "index": "airline-data" },
+            "dest": { "index": "airline-data-by-airline-stats-dos" },
             "pivot": {
               "group_by": { "airline": {"terms": {"field": "airline"}}},
               "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
@@ -145,8 +145,8 @@ teardown:
         transform_id: "airline-transform-stats-dos"
         body: >
           {
-            "source": "airline-data",
-            "dest": "airline-data-by-airline-stats-dos",
+            "source": { "index": "airline-data" },
+            "dest": { "index": "airline-data-by-airline-stats-dos" },
             "pivot": {
               "group_by": { "airline": {"terms": {"field": "airline"}}},
               "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}