Browse Source

[ML][Data Frame] Add version and create_time to transform config (#43384)

* [ML][Data Frame] Add version and create_time to transform config

* s/transform_version/version s/Date/Instant

* fixing getter/setter for version
Benjamin Trent 6 years ago
parent
commit
ade8766bbf
12 changed files with 306 additions and 37 deletions
  1. 42 9
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java
  2. 59 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/util/TimeUtil.java
  3. 2 2
      client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java
  4. 9 2
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java
  5. 0 2
      client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java
  6. 96 14
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java
  7. 11 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/utils/TimeUtils.java
  8. 60 4
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java
  9. 12 2
      x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java
  10. 7 0
      x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java
  11. 6 2
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java
  12. 2 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml

+ 42 - 9
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java

@@ -19,16 +19,20 @@
 
 package org.elasticsearch.client.dataframe.transforms;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfig;
+import org.elasticsearch.client.dataframe.transforms.util.TimeUtil;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.Objects;
 
 import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
@@ -40,6 +44,8 @@ public class DataFrameTransformConfig implements ToXContentObject {
     public static final ParseField SOURCE = new ParseField("source");
     public static final ParseField DEST = new ParseField("dest");
     public static final ParseField DESCRIPTION = new ParseField("description");
+    public static final ParseField VERSION = new ParseField("version");
+    public static final ParseField CREATE_TIME = new ParseField("create_time");
     // types of transforms
     public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");
 
@@ -48,6 +54,8 @@ public class DataFrameTransformConfig implements ToXContentObject {
     private final DestConfig dest;
     private final PivotConfig pivotConfig;
     private final String description;
+    private final Version transformVersion;
+    private final Instant createTime;
 
     public static final ConstructingObjectParser<DataFrameTransformConfig, Void> PARSER =
             new ConstructingObjectParser<>("data_frame_transform", true,
@@ -57,7 +65,9 @@ public class DataFrameTransformConfig implements ToXContentObject {
                     DestConfig dest = (DestConfig) args[2];
                     PivotConfig pivotConfig = (PivotConfig) args[3];
                     String description = (String)args[4];
-                    return new DataFrameTransformConfig(id, source, dest, pivotConfig, description);
+                    Instant createTime = (Instant)args[5];
+                    String transformVersion = (String)args[6];
+                    return new DataFrameTransformConfig(id, source, dest, pivotConfig, description, createTime, transformVersion);
                 });
 
     static {
@@ -66,6 +76,9 @@ public class DataFrameTransformConfig implements ToXContentObject {
         PARSER.declareObject(constructorArg(), (p, c) -> DestConfig.PARSER.apply(p, null), DEST);
         PARSER.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p), PIVOT_TRANSFORM);
         PARSER.declareString(optionalConstructorArg(), DESCRIPTION);
+        PARSER.declareField(optionalConstructorArg(),
+            p -> TimeUtil.parseTimeFieldToInstant(p, CREATE_TIME.getPreferredName()), CREATE_TIME, ObjectParser.ValueType.VALUE);
+        PARSER.declareString(optionalConstructorArg(), VERSION);
     }
 
     public static DataFrameTransformConfig fromXContent(final XContentParser parser) {
@@ -84,19 +97,23 @@ public class DataFrameTransformConfig implements ToXContentObject {
      * @return A DataFrameTransformConfig to preview, NOTE it will have a {@code null} id, destination and index.
      */
     public static DataFrameTransformConfig forPreview(final SourceConfig source, final PivotConfig pivotConfig) {
-        return new DataFrameTransformConfig(null, source, null, pivotConfig, null);
+        return new DataFrameTransformConfig(null, source, null, pivotConfig, null, null, null);
     }
 
     DataFrameTransformConfig(final String id,
                              final SourceConfig source,
                              final DestConfig dest,
                              final PivotConfig pivotConfig,
-                             final String description) {
+                             final String description,
+                             final Instant createTime,
+                             final String version) {
         this.id = id;
         this.source = source;
         this.dest = dest;
         this.pivotConfig = pivotConfig;
         this.description = description;
+        this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());
+        this.transformVersion = version == null ? null : Version.fromString(version);
     }
 
     public String getId() {
@@ -115,6 +132,14 @@ public class DataFrameTransformConfig implements ToXContentObject {
         return pivotConfig;
     }
 
+    public Version getVersion() {
+        return transformVersion;
+    }
+
+    public Instant getCreateTime() {
+        return createTime;
+    }
+
     @Nullable
     public String getDescription() {
         return description;
@@ -138,6 +163,12 @@ public class DataFrameTransformConfig implements ToXContentObject {
         if (description != null) {
             builder.field(DESCRIPTION.getPreferredName(), description);
         }
+        if (createTime != null) {
+            builder.timeField(CREATE_TIME.getPreferredName(), CREATE_TIME.getPreferredName() + "_string", createTime.toEpochMilli());
+        }
+        if (transformVersion != null) {
+            builder.field(VERSION.getPreferredName(), transformVersion);
+        }
         builder.endObject();
         return builder;
     }
@@ -155,15 +186,17 @@ public class DataFrameTransformConfig implements ToXContentObject {
         final DataFrameTransformConfig that = (DataFrameTransformConfig) other;
 
         return Objects.equals(this.id, that.id)
-                && Objects.equals(this.source, that.source)
-                && Objects.equals(this.dest, that.dest)
-                && Objects.equals(this.description, that.description)
-                && Objects.equals(this.pivotConfig, that.pivotConfig);
+            && Objects.equals(this.source, that.source)
+            && Objects.equals(this.dest, that.dest)
+            && Objects.equals(this.description, that.description)
+            && Objects.equals(this.transformVersion, that.transformVersion)
+            && Objects.equals(this.createTime, that.createTime)
+            && Objects.equals(this.pivotConfig, that.pivotConfig);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(id, source, dest, pivotConfig, description);
+        return Objects.hash(id, source, dest, pivotConfig, description, createTime, transformVersion);
     }
 
     @Override
@@ -209,7 +242,7 @@ public class DataFrameTransformConfig implements ToXContentObject {
         }
 
         public DataFrameTransformConfig build() {
-            return new DataFrameTransformConfig(id, source, dest, pivotConfig, description);
+            return new DataFrameTransformConfig(id, source, dest, pivotConfig, description, null, null);
         }
     }
 }

+ 59 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/util/TimeUtil.java

@@ -0,0 +1,59 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.client.dataframe.transforms.util;
+
+import org.elasticsearch.common.time.DateFormatters;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+
+public final class TimeUtil {
+
+    /**
+     * Parse out a Date object given the current parser and field name.
+     *
+     * @param parser current XContentParser
+     * @param fieldName the field's preferred name (utilized in exception)
+     * @return parsed Date object
+     * @throws IOException from XContentParser
+     */
+    public static Date parseTimeField(XContentParser parser, String fieldName) throws IOException {
+        if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) {
+            return new Date(parser.longValue());
+        } else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
+            return new Date(DateFormatters.from(DateTimeFormatter.ISO_INSTANT.parse(parser.text())).toInstant().toEpochMilli());
+        }
+        throw new IllegalArgumentException(
+            "unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]");
+    }
+
+    public static Instant parseTimeFieldToInstant(XContentParser parser, String fieldName) throws IOException {
+        if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) {
+            return Instant.ofEpochMilli(parser.longValue());
+        } else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
+            return DateFormatters.from(DateTimeFormatter.ISO_INSTANT.parse(parser.text())).toInstant();
+        }
+        throw new IllegalArgumentException(
+            "unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]");
+    }
+
+}

+ 2 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java

@@ -195,7 +195,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
                 client::getDataFrameTransformAsync);
         assertNull(getResponse.getInvalidTransforms());
         assertThat(getResponse.getTransformConfigurations(), hasSize(1));
-        assertEquals(transform, getResponse.getTransformConfigurations().get(0));
+        assertEquals(transform.getId(), getResponse.getTransformConfigurations().get(0).getId());
     }
 
     public void testGetAllAndPageTransforms() throws IOException {
@@ -219,7 +219,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
                 client::getDataFrameTransformAsync);
         assertNull(getResponse.getInvalidTransforms());
         assertThat(getResponse.getTransformConfigurations(), hasSize(2));
-        assertEquals(transform, getResponse.getTransformConfigurations().get(1));
+        assertEquals(transform.getId(), getResponse.getTransformConfigurations().get(1).getId());
 
         getRequest.setPageParams(new PageParams(0,1));
         getResponse = execute(getRequest, client::getDataFrameTransform,

+ 9 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.client.dataframe.transforms;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfigTests;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -27,6 +28,7 @@ import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.test.AbstractXContentTestCase;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.function.Predicate;
 
@@ -36,8 +38,13 @@ import static org.elasticsearch.client.dataframe.transforms.SourceConfigTests.ra
 public class DataFrameTransformConfigTests extends AbstractXContentTestCase<DataFrameTransformConfig> {
 
     public static DataFrameTransformConfig randomDataFrameTransformConfig() {
-        return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomSourceConfig(),
-                randomDestConfig(), PivotConfigTests.randomPivotConfig(), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 100));
+        return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10),
+            randomSourceConfig(),
+            randomDestConfig(),
+            PivotConfigTests.randomPivotConfig(),
+            randomBoolean() ? null : randomAlphaOfLengthBetween(1, 100),
+            randomBoolean() ? null : Instant.now(),
+            randomBoolean() ? null : Version.CURRENT.toString());
     }
 
     @Override

+ 0 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java

@@ -478,7 +478,6 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
 
         RestHighLevelClient client = highLevelClient();
 
-        QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
         GroupConfig groupConfig = GroupConfig.builder().groupBy("reviewer",
             TermsGroupSource.builder().setField("user_id").build()).build();
         AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
@@ -564,7 +563,6 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
     public void testGetDataFrameTransform() throws IOException, InterruptedException {
         createIndex("source-data");
 
-        QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
         GroupConfig groupConfig = GroupConfig.builder().groupBy("reviewer",
             TermsGroupSource.builder().setField("user_id").build()).build();
         AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();

+ 96 - 14
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java

@@ -6,6 +6,7 @@
 
 package org.elasticsearch.xpack.core.dataframe.transforms;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.cluster.AbstractDiffable;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseField;
@@ -14,6 +15,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
@@ -21,8 +23,10 @@ import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
 import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig;
 import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.core.dataframe.utils.TimeUtils;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
@@ -42,6 +46,8 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
     public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");
 
     public static final ParseField DESCRIPTION = new ParseField("description");
+    public static final ParseField VERSION = new ParseField("version");
+    public static final ParseField CREATE_TIME = new ParseField("create_time");
     private static final ConstructingObjectParser<DataFrameTransformConfig, String> STRICT_PARSER = createParser(false);
     private static final ConstructingObjectParser<DataFrameTransformConfig, String> LENIENT_PARSER = createParser(true);
     private static final int MAX_DESCRIPTION_LENGTH = 1_000;
@@ -53,9 +59,17 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
     // headers store the user context from the creating user, which allows us to run the transform as this user
     // the header only contains name, groups and other context but no authorization keys
     private Map<String, String> headers;
+    private Version transformVersion;
+    private Instant createTime;
 
     private final PivotConfig pivotConfig;
 
+    private static void validateStrictParsingParams(Object arg, String parameterName) {
+        if (arg != null) {
+            throw new IllegalArgumentException("Found [" + parameterName + "], not allowed for strict parsing");
+        }
+    }
+
     private static ConstructingObjectParser<DataFrameTransformConfig, String> createParser(boolean lenient) {
         ConstructingObjectParser<DataFrameTransformConfig, String> parser = new ConstructingObjectParser<>(NAME, lenient,
                 (args, optionalId) -> {
@@ -74,9 +88,11 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
 
                     // ignored, only for internal storage: String docType = (String) args[3];
 
-                    // on strict parsing do not allow injection of headers
-                    if (lenient == false && args[4] != null) {
-                        throw new IllegalArgumentException("Found [headers], not allowed for strict parsing");
+                    // on strict parsing do not allow injection of headers, transform version, or create time
+                    if (lenient == false) {
+                        validateStrictParsingParams(args[4], HEADERS.getPreferredName());
+                        validateStrictParsingParams(args[7], CREATE_TIME.getPreferredName());
+                        validateStrictParsingParams(args[8], VERSION.getPreferredName());
                     }
 
                     @SuppressWarnings("unchecked")
@@ -84,7 +100,14 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
 
                     PivotConfig pivotConfig = (PivotConfig) args[5];
                     String description = (String)args[6];
-                    return new DataFrameTransformConfig(id, source, dest, headers, pivotConfig, description);
+                    return new DataFrameTransformConfig(id,
+                        source,
+                        dest,
+                        headers,
+                        pivotConfig,
+                        description,
+                        (Instant)args[7],
+                        (String)args[8]);
                 });
 
         parser.declareString(optionalConstructorArg(), DataFrameField.ID);
@@ -95,7 +118,9 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
         parser.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), HEADERS);
         parser.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p, lenient), PIVOT_TRANSFORM);
         parser.declareString(optionalConstructorArg(), DESCRIPTION);
-
+        parser.declareField(optionalConstructorArg(),
+            p -> TimeUtils.parseTimeFieldToInstant(p, CREATE_TIME.getPreferredName()), CREATE_TIME, ObjectParser.ValueType.VALUE);
+        parser.declareString(optionalConstructorArg(), VERSION);
         return parser;
     }
 
@@ -103,12 +128,14 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
         return NAME + "-" + transformId;
     }
 
-    public DataFrameTransformConfig(final String id,
-                                    final SourceConfig source,
-                                    final DestConfig dest,
-                                    final Map<String, String> headers,
-                                    final PivotConfig pivotConfig,
-                                    final String description) {
+    DataFrameTransformConfig(final String id,
+                             final SourceConfig source,
+                             final DestConfig dest,
+                             final Map<String, String> headers,
+                             final PivotConfig pivotConfig,
+                             final String description,
+                             final Instant createTime,
+                             final String version){
         this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
         this.source = ExceptionsHelper.requireNonNull(source, DataFrameField.SOURCE.getPreferredName());
         this.dest = ExceptionsHelper.requireNonNull(dest, DataFrameField.DESTINATION.getPreferredName());
@@ -123,6 +150,17 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
         if (this.description != null && this.description.length() > MAX_DESCRIPTION_LENGTH) {
             throw new IllegalArgumentException("[description] must be less than 1000 characters in length.");
         }
+        this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());
+        this.transformVersion = version == null ? null : Version.fromString(version);
+    }
+
+    public DataFrameTransformConfig(final String id,
+                                    final SourceConfig source,
+                                    final DestConfig dest,
+                                    final Map<String, String> headers,
+                                    final PivotConfig pivotConfig,
+                                    final String description) {
+        this(id, source, dest, headers, pivotConfig, description, null, null);
     }
 
     public DataFrameTransformConfig(final StreamInput in) throws IOException {
@@ -132,6 +170,13 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
         setHeaders(in.readMap(StreamInput::readString, StreamInput::readString));
         pivotConfig = in.readOptionalWriteable(PivotConfig::new);
         description = in.readOptionalString();
+        if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
+            createTime = in.readOptionalInstant();
+            transformVersion = in.readBoolean() ? Version.readVersion(in) : null;
+        } else {
+            createTime = null;
+            transformVersion = null;
+        }
     }
 
     public String getId() {
@@ -150,8 +195,28 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
         return headers;
     }
 
-    public void setHeaders(Map<String, String> headers) {
+    public DataFrameTransformConfig setHeaders(Map<String, String> headers) {
         this.headers = headers;
+        return this;
+    }
+
+    public Version getVersion() {
+        return transformVersion;
+    }
+
+    public DataFrameTransformConfig setVersion(Version transformVersion) {
+        this.transformVersion = transformVersion;
+        return this;
+    }
+
+    public Instant getCreateTime() {
+        return createTime;
+    }
+
+    public DataFrameTransformConfig setCreateTime(Instant createTime) {
+        ExceptionsHelper.requireNonNull(createTime, CREATE_TIME.getPreferredName());
+        this.createTime = Instant.ofEpochMilli(createTime.toEpochMilli());
+        return this;
     }
 
     public PivotConfig getPivotConfig() {
@@ -179,6 +244,15 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
         out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
         out.writeOptionalWriteable(pivotConfig);
         out.writeOptionalString(description);
+        if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
+            out.writeOptionalInstant(createTime);
+           if (transformVersion != null) {
+                out.writeBoolean(true);
+                Version.writeVersion(transformVersion, out);
+            } else {
+                out.writeBoolean(false);
+            }
+        }
     }
 
     @Override
@@ -199,6 +273,12 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
         if (description != null) {
             builder.field(DESCRIPTION.getPreferredName(), description);
         }
+        if (transformVersion != null) {
+            builder.field(VERSION.getPreferredName(), transformVersion);
+        }
+        if (createTime != null) {
+            builder.timeField(CREATE_TIME.getPreferredName(), CREATE_TIME.getPreferredName() + "_string", createTime.toEpochMilli());
+        }
         builder.endObject();
         return builder;
     }
@@ -220,12 +300,14 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
                 && Objects.equals(this.dest, that.dest)
                 && Objects.equals(this.headers, that.headers)
                 && Objects.equals(this.pivotConfig, that.pivotConfig)
-                && Objects.equals(this.description, that.description);
+                && Objects.equals(this.description, that.description)
+                && Objects.equals(this.createTime, that.createTime)
+                && Objects.equals(this.transformVersion, that.transformVersion);
     }
 
     @Override
     public int hashCode(){
-        return Objects.hash(id, source, dest, headers, pivotConfig, description);
+        return Objects.hash(id, source, dest, headers, pivotConfig, description, createTime, transformVersion);
     }
 
     @Override

+ 11 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/utils/TimeUtils.java

@@ -12,6 +12,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.Date;
 import java.util.concurrent.TimeUnit;
 
@@ -31,6 +32,16 @@ public final class TimeUtils {
                 "unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]");
     }
 
+    public static Instant parseTimeFieldToInstant(XContentParser parser, String fieldName) throws IOException {
+        if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) {
+            return Instant.ofEpochMilli(parser.longValue());
+        } else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
+            return Instant.ofEpochMilli(TimeUtils.dateStringToEpoch(parser.text()));
+        }
+        throw new IllegalArgumentException(
+            "unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]");
+    }
+
     /**
      * First tries to parse the date first as a Long and convert that to an
      * epoch time. If the long number has more than 10 digits it is considered a

+ 60 - 4
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java

@@ -6,6 +6,7 @@
 
 package org.elasticsearch.xpack.core.dataframe.transforms;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.common.xcontent.DeprecationHandler;
@@ -18,6 +19,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfigTests;
 import org.junit.Before;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -41,13 +43,25 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
     }
 
     public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHeaders(String id) {
-        return new DataFrameTransformConfig(id, randomSourceConfig(), randomDestConfig(), null,
-                PivotConfigTests.randomPivotConfig(), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000));
+        return new DataFrameTransformConfig(id,
+            randomSourceConfig(),
+            randomDestConfig(),
+            null,
+            PivotConfigTests.randomPivotConfig(),
+            randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
+            null,
+            null);
     }
 
     public static DataFrameTransformConfig randomDataFrameTransformConfig(String id) {
-        return new DataFrameTransformConfig(id, randomSourceConfig(), randomDestConfig(), randomHeaders(),
-                PivotConfigTests.randomPivotConfig(), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000));
+        return new DataFrameTransformConfig(id,
+            randomSourceConfig(),
+            randomDestConfig(),
+            randomHeaders(),
+            PivotConfigTests.randomPivotConfig(),
+            randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
+            randomBoolean() ? null : Instant.now(),
+            randomBoolean() ? null : Version.CURRENT.toString());
     }
 
     public static DataFrameTransformConfig randomInvalidDataFrameTransformConfig() {
@@ -147,6 +161,48 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
                 () -> createDataFrameTransformConfigFromString(pivotTransform, "test_header_injection"));
     }
 
+    public void testPreventCreateTimeInjection() throws IOException {
+        String pivotTransform = "{"
+            + " \"create_time\" : " + Instant.now().toEpochMilli() + " },"
+            + " \"source\" : {\"index\":\"src\"},"
+            + " \"dest\" : {\"index\": \"dest\"},"
+            + " \"pivot\" : {"
+            + " \"group_by\": {"
+            + "   \"id\": {"
+            + "     \"terms\": {"
+            + "       \"field\": \"id\""
+            + "} } },"
+            + " \"aggs\": {"
+            + "   \"avg\": {"
+            + "     \"avg\": {"
+            + "       \"field\": \"points\""
+            + "} } } } }";
+
+        expectThrows(IllegalArgumentException.class,
+            () -> createDataFrameTransformConfigFromString(pivotTransform, "test_createTime_injection"));
+    }
+
+    public void testPreventVersionInjection() throws IOException {
+        String pivotTransform = "{"
+            + " \"version\" : \"7.3.0\","
+            + " \"source\" : {\"index\":\"src\"},"
+            + " \"dest\" : {\"index\": \"dest\"},"
+            + " \"pivot\" : {"
+            + " \"group_by\": {"
+            + "   \"id\": {"
+            + "     \"terms\": {"
+            + "       \"field\": \"id\""
+            + "} } },"
+            + " \"aggs\": {"
+            + "   \"avg\": {"
+            + "     \"avg\": {"
+            + "       \"field\": \"points\""
+            + "} } } } }";
+
+        expectThrows(IllegalArgumentException.class,
+            () -> createDataFrameTransformConfigFromString(pivotTransform, "test_createTime_injection"));
+    }
+
     public void testXContentForInternalStorage() throws IOException {
         DataFrameTransformConfig dataFrameTransformConfig = randomDataFrameTransformConfig();
 

+ 12 - 2
x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java

@@ -15,6 +15,8 @@ import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.core.AcknowledgedResponse;
 import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.GetDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.GetDataFrameTransformResponse;
 import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsRequest;
 import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsResponse;
 import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
@@ -57,6 +59,7 @@ import java.time.ZoneId;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -118,6 +121,11 @@ abstract class DataFrameIntegTestCase extends ESRestTestCase {
         return restClient.dataFrame().getDataFrameTransformStats(new GetDataFrameTransformStatsRequest(id), RequestOptions.DEFAULT);
     }
 
+    protected GetDataFrameTransformResponse getDataFrameTransform(String id) throws IOException {
+        RestHighLevelClient restClient = new TestRestHighLevelClient();
+        return restClient.dataFrame().getDataFrameTransform(new GetDataFrameTransformRequest(id), RequestOptions.DEFAULT);
+    }
+
     protected void waitUntilCheckpoint(String id, long checkpoint) throws Exception {
         waitUntilCheckpoint(id, checkpoint, TimeValue.timeValueSeconds(30));
     }
@@ -321,9 +329,11 @@ abstract class DataFrameIntegTestCase extends ESRestTestCase {
             .build();
     }
 
-    private class TestRestHighLevelClient extends RestHighLevelClient {
+    private static class TestRestHighLevelClient extends RestHighLevelClient {
+        private static final List<NamedXContentRegistry.Entry> X_CONTENT_ENTRIES =
+            new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents();
         TestRestHighLevelClient() {
-            super(client(), restClient -> {}, Collections.emptyList());
+            super(client(), restClient -> {}, X_CONTENT_ENTRIES);
         }
     }
 }

+ 7 - 0
x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java

@@ -6,6 +6,7 @@
 
 package org.elasticsearch.xpack.dataframe.integration;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.core.IndexerState;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
@@ -17,6 +18,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInter
 import org.junit.After;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -58,6 +60,11 @@ public class DataFrameTransformIT extends DataFrameIntegTestCase {
             assertThat(getDataFrameTransformStats(config.getId()).getTransformsStateAndStats().get(0).getTransformState().getIndexerState(),
                 equalTo(IndexerState.STOPPED)));
         stopDataFrameTransform(config.getId());
+
+        DataFrameTransformConfig storedConfig = getDataFrameTransform(config.getId()).getTransformConfigurations().get(0);
+        assertThat(storedConfig.getVersion(), equalTo(Version.CURRENT));
+        Instant now = Instant.now();
+        assertTrue("[create_time] is not before current time", storedConfig.getCreateTime().isBefore(now));
         deleteDataFrameTransform(config.getId());
     }
 

+ 6 - 2
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.dataframe.action;
 
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceAlreadyExistsException;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.IndicesOptions;
@@ -51,6 +52,7 @@ import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigMa
 import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -110,8 +112,10 @@ public class TransportPutDataFrameTransformAction
                     .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
                     .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
-        DataFrameTransformConfig config = request.getConfig();
-        config.setHeaders(filteredHeaders);
+        DataFrameTransformConfig config = request.getConfig()
+            .setHeaders(filteredHeaders)
+            .setCreateTime(Instant.now())
+            .setVersion(Version.CURRENT);
 
         String transformId = config.getId();
         // quick check whether a transform has already been created under that name

+ 2 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml

@@ -90,6 +90,8 @@ setup:
   - match: { transforms.0.source.index.0: "airline-data" }
   - match: { transforms.0.dest.index: "airline-data-by-airline" }
   - is_true: transforms.0.source.query.match_all
+  - is_true: transforms.0.create_time
+  - is_true: transforms.0.version
   - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" }
   - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" }
   - match: { transforms.0.description: "yaml test transform on airline-data" }