|
@@ -14,6 +14,7 @@ import org.elasticsearch.common.Strings;
|
|
|
import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
|
|
import org.elasticsearch.common.io.stream.Writeable;
|
|
|
+import org.elasticsearch.common.unit.TimeValue;
|
|
|
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
|
|
import org.elasticsearch.common.xcontent.ObjectParser;
|
|
|
import org.elasticsearch.common.xcontent.ToXContentObject;
|
|
@@ -56,6 +57,7 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|
|
private final String id;
|
|
|
private final SourceConfig source;
|
|
|
private final DestConfig dest;
|
|
|
+ private final TimeValue frequency;
|
|
|
private final SyncConfig syncConfig;
|
|
|
private final String description;
|
|
|
// headers store the user context from the creating user, which allows us to run the transform as this user
|
|
@@ -88,35 +90,40 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|
|
SourceConfig source = (SourceConfig) args[1];
|
|
|
DestConfig dest = (DestConfig) args[2];
|
|
|
|
|
|
- SyncConfig syncConfig = (SyncConfig) args[3];
|
|
|
- // ignored, only for internal storage: String docType = (String) args[4];
|
|
|
+ TimeValue frequency =
|
|
|
+ args[3] == null ? null : TimeValue.parseTimeValue((String) args[3], DataFrameField.FREQUENCY.getPreferredName());
|
|
|
+
|
|
|
+ SyncConfig syncConfig = (SyncConfig) args[4];
|
|
|
+ // ignored, only for internal storage: String docType = (String) args[5];
|
|
|
|
|
|
// on strict parsing do not allow injection of headers, transform version, or create time
|
|
|
if (lenient == false) {
|
|
|
- validateStrictParsingParams(args[5], HEADERS.getPreferredName());
|
|
|
- validateStrictParsingParams(args[8], CREATE_TIME.getPreferredName());
|
|
|
- validateStrictParsingParams(args[9], VERSION.getPreferredName());
|
|
|
+ validateStrictParsingParams(args[6], HEADERS.getPreferredName());
|
|
|
+ validateStrictParsingParams(args[9], CREATE_TIME.getPreferredName());
|
|
|
+ validateStrictParsingParams(args[10], VERSION.getPreferredName());
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- Map<String, String> headers = (Map<String, String>) args[5];
|
|
|
+ Map<String, String> headers = (Map<String, String>) args[6];
|
|
|
|
|
|
- PivotConfig pivotConfig = (PivotConfig) args[6];
|
|
|
- String description = (String)args[7];
|
|
|
+ PivotConfig pivotConfig = (PivotConfig) args[7];
|
|
|
+ String description = (String)args[8];
|
|
|
return new DataFrameTransformConfig(id,
|
|
|
source,
|
|
|
dest,
|
|
|
+ frequency,
|
|
|
syncConfig,
|
|
|
headers,
|
|
|
pivotConfig,
|
|
|
description,
|
|
|
- (Instant)args[8],
|
|
|
- (String)args[9]);
|
|
|
+ (Instant)args[9],
|
|
|
+ (String)args[10]);
|
|
|
});
|
|
|
|
|
|
parser.declareString(optionalConstructorArg(), DataFrameField.ID);
|
|
|
parser.declareObject(constructorArg(), (p, c) -> SourceConfig.fromXContent(p, lenient), DataFrameField.SOURCE);
|
|
|
parser.declareObject(constructorArg(), (p, c) -> DestConfig.fromXContent(p, lenient), DataFrameField.DESTINATION);
|
|
|
+ parser.declareString(optionalConstructorArg(), DataFrameField.FREQUENCY);
|
|
|
|
|
|
parser.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p, lenient), DataFrameField.SYNC);
|
|
|
|
|
@@ -146,6 +153,7 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|
|
DataFrameTransformConfig(final String id,
|
|
|
final SourceConfig source,
|
|
|
final DestConfig dest,
|
|
|
+ final TimeValue frequency,
|
|
|
final SyncConfig syncConfig,
|
|
|
final Map<String, String> headers,
|
|
|
final PivotConfig pivotConfig,
|
|
@@ -155,6 +163,7 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|
|
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
|
|
|
this.source = ExceptionsHelper.requireNonNull(source, DataFrameField.SOURCE.getPreferredName());
|
|
|
this.dest = ExceptionsHelper.requireNonNull(dest, DataFrameField.DESTINATION.getPreferredName());
|
|
|
+ this.frequency = frequency;
|
|
|
this.syncConfig = syncConfig;
|
|
|
this.setHeaders(headers == null ? Collections.emptyMap() : headers);
|
|
|
this.pivotConfig = pivotConfig;
|
|
@@ -174,17 +183,23 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|
|
public DataFrameTransformConfig(final String id,
|
|
|
final SourceConfig source,
|
|
|
final DestConfig dest,
|
|
|
+ final TimeValue frequency,
|
|
|
final SyncConfig syncConfig,
|
|
|
final Map<String, String> headers,
|
|
|
final PivotConfig pivotConfig,
|
|
|
final String description) {
|
|
|
- this(id, source, dest, syncConfig, headers, pivotConfig, description, null, null);
|
|
|
+ this(id, source, dest, frequency, syncConfig, headers, pivotConfig, description, null, null);
|
|
|
}
|
|
|
|
|
|
public DataFrameTransformConfig(final StreamInput in) throws IOException {
|
|
|
id = in.readString();
|
|
|
source = new SourceConfig(in);
|
|
|
dest = new DestConfig(in);
|
|
|
+ if (in.getVersion().onOrAfter(Version.CURRENT)) {
|
|
|
+ frequency = in.readOptionalTimeValue();
|
|
|
+ } else {
|
|
|
+ frequency = null;
|
|
|
+ }
|
|
|
setHeaders(in.readMap(StreamInput::readString, StreamInput::readString));
|
|
|
pivotConfig = in.readOptionalWriteable(PivotConfig::new);
|
|
|
description = in.readOptionalString();
|
|
@@ -211,6 +226,10 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|
|
return dest;
|
|
|
}
|
|
|
|
|
|
+ public TimeValue getFrequency() {
|
|
|
+ return frequency;
|
|
|
+ }
|
|
|
+
|
|
|
public SyncConfig getSyncConfig() {
|
|
|
return syncConfig;
|
|
|
}
|
|
@@ -269,6 +288,9 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|
|
out.writeString(id);
|
|
|
source.writeTo(out);
|
|
|
dest.writeTo(out);
|
|
|
+ if (out.getVersion().onOrAfter(Version.CURRENT)) {
|
|
|
+ out.writeOptionalTimeValue(frequency);
|
|
|
+ }
|
|
|
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
|
|
|
out.writeOptionalWriteable(pivotConfig);
|
|
|
out.writeOptionalString(description);
|
|
@@ -290,6 +312,9 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|
|
builder.field(DataFrameField.ID.getPreferredName(), id);
|
|
|
builder.field(DataFrameField.SOURCE.getPreferredName(), source);
|
|
|
builder.field(DataFrameField.DESTINATION.getPreferredName(), dest);
|
|
|
+ if (frequency != null) {
|
|
|
+ builder.field(DataFrameField.FREQUENCY.getPreferredName(), frequency.getStringRep());
|
|
|
+ }
|
|
|
if (syncConfig != null) {
|
|
|
builder.startObject(DataFrameField.SYNC.getPreferredName());
|
|
|
builder.field(syncConfig.getWriteableName(), syncConfig);
|
|
@@ -332,6 +357,7 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|
|
return Objects.equals(this.id, that.id)
|
|
|
&& Objects.equals(this.source, that.source)
|
|
|
&& Objects.equals(this.dest, that.dest)
|
|
|
+ && Objects.equals(this.frequency, that.frequency)
|
|
|
&& Objects.equals(this.syncConfig, that.syncConfig)
|
|
|
&& Objects.equals(this.headers, that.headers)
|
|
|
&& Objects.equals(this.pivotConfig, that.pivotConfig)
|
|
@@ -342,7 +368,7 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
|
|
|
|
|
@Override
|
|
|
public int hashCode(){
|
|
|
- return Objects.hash(id, source, dest, syncConfig, headers, pivotConfig, description, createTime, transformVersion);
|
|
|
+ return Objects.hash(id, source, dest, frequency, syncConfig, headers, pivotConfig, description, createTime, transformVersion);
|
|
|
}
|
|
|
|
|
|
@Override
|