|
@@ -20,36 +20,37 @@ package org.elasticsearch.client.ml.datafeed;
|
|
|
|
|
|
import org.elasticsearch.client.ml.job.config.Job;
|
|
|
import org.elasticsearch.common.ParseField;
|
|
|
+import org.elasticsearch.common.bytes.BytesArray;
|
|
|
+import org.elasticsearch.common.bytes.BytesReference;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
|
|
+import org.elasticsearch.common.xcontent.ObjectParser;
|
|
|
import org.elasticsearch.common.xcontent.ToXContentObject;
|
|
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
|
|
+import org.elasticsearch.common.xcontent.XContentHelper;
|
|
|
import org.elasticsearch.common.xcontent.XContentParser;
|
|
|
-import org.elasticsearch.index.query.AbstractQueryBuilder;
|
|
|
+import org.elasticsearch.common.xcontent.XContentType;
|
|
|
+import org.elasticsearch.common.xcontent.json.JsonXContent;
|
|
|
import org.elasticsearch.index.query.QueryBuilder;
|
|
|
-import org.elasticsearch.index.query.QueryBuilders;
|
|
|
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
|
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
import java.util.Collections;
|
|
|
import java.util.Comparator;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
|
|
|
/**
|
|
|
- * Datafeed configuration options pojo. Describes where to proactively pull input
|
|
|
- * data from.
|
|
|
- * <p>
|
|
|
- * If a value has not been set it will be <code>null</code>. Object wrappers are
|
|
|
- * used around integral types and booleans so they can take <code>null</code>
|
|
|
- * values.
|
|
|
+ * The datafeed configuration object. It specifies which indices
|
|
|
+ * to get the data from and offers parameters for customizing different
|
|
|
+ * aspects of the process.
|
|
|
*/
|
|
|
public class DatafeedConfig implements ToXContentObject {
|
|
|
|
|
|
- public static final int DEFAULT_SCROLL_SIZE = 1000;
|
|
|
-
|
|
|
public static final ParseField ID = new ParseField("datafeed_id");
|
|
|
public static final ParseField QUERY_DELAY = new ParseField("query_delay");
|
|
|
public static final ParseField FREQUENCY = new ParseField("frequency");
|
|
@@ -59,7 +60,6 @@ public class DatafeedConfig implements ToXContentObject {
|
|
|
public static final ParseField QUERY = new ParseField("query");
|
|
|
public static final ParseField SCROLL_SIZE = new ParseField("scroll_size");
|
|
|
public static final ParseField AGGREGATIONS = new ParseField("aggregations");
|
|
|
- public static final ParseField AGGS = new ParseField("aggs");
|
|
|
public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields");
|
|
|
public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config");
|
|
|
|
|
@@ -77,9 +77,8 @@ public class DatafeedConfig implements ToXContentObject {
|
|
|
builder.setQueryDelay(TimeValue.parseTimeValue(val, QUERY_DELAY.getPreferredName())), QUERY_DELAY);
|
|
|
PARSER.declareString((builder, val) ->
|
|
|
builder.setFrequency(TimeValue.parseTimeValue(val, FREQUENCY.getPreferredName())), FREQUENCY);
|
|
|
- PARSER.declareObject(Builder::setQuery, (p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), QUERY);
|
|
|
- PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGREGATIONS);
|
|
|
- PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGS);
|
|
|
+ PARSER.declareField(Builder::setQuery, DatafeedConfig::parseBytes, QUERY, ObjectParser.ValueType.OBJECT);
|
|
|
+ PARSER.declareField(Builder::setAggregations, DatafeedConfig::parseBytes, AGGREGATIONS, ObjectParser.ValueType.OBJECT);
|
|
|
PARSER.declareObject(Builder::setScriptFields, (p, c) -> {
|
|
|
List<SearchSourceBuilder.ScriptField> parsedScriptFields = new ArrayList<>();
|
|
|
while (p.nextToken() != XContentParser.Token.END_OBJECT) {
|
|
@@ -91,29 +90,26 @@ public class DatafeedConfig implements ToXContentObject {
|
|
|
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG);
|
|
|
}
|
|
|
|
|
|
+ private static BytesReference parseBytes(XContentParser parser) throws IOException {
|
|
|
+ XContentBuilder contentBuilder = JsonXContent.contentBuilder();
|
|
|
+ contentBuilder.generator().copyCurrentStructure(parser);
|
|
|
+ return BytesReference.bytes(contentBuilder);
|
|
|
+ }
|
|
|
+
|
|
|
private final String id;
|
|
|
private final String jobId;
|
|
|
-
|
|
|
- /**
|
|
|
- * The delay before starting to query a period of time
|
|
|
- */
|
|
|
private final TimeValue queryDelay;
|
|
|
-
|
|
|
- /**
|
|
|
- * The frequency with which queries are executed
|
|
|
- */
|
|
|
private final TimeValue frequency;
|
|
|
-
|
|
|
private final List<String> indices;
|
|
|
private final List<String> types;
|
|
|
- private final QueryBuilder query;
|
|
|
- private final AggregatorFactories.Builder aggregations;
|
|
|
+ private final BytesReference query;
|
|
|
+ private final BytesReference aggregations;
|
|
|
private final List<SearchSourceBuilder.ScriptField> scriptFields;
|
|
|
private final Integer scrollSize;
|
|
|
private final ChunkingConfig chunkingConfig;
|
|
|
|
|
|
private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
|
|
|
- QueryBuilder query, AggregatorFactories.Builder aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
|
|
|
+ BytesReference query, BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
|
|
|
Integer scrollSize, ChunkingConfig chunkingConfig) {
|
|
|
this.id = id;
|
|
|
this.jobId = jobId;
|
|
@@ -156,11 +152,11 @@ public class DatafeedConfig implements ToXContentObject {
|
|
|
return scrollSize;
|
|
|
}
|
|
|
|
|
|
- public QueryBuilder getQuery() {
|
|
|
+ public BytesReference getQuery() {
|
|
|
return query;
|
|
|
}
|
|
|
|
|
|
- public AggregatorFactories.Builder getAggregations() {
|
|
|
+ public BytesReference getAggregations() {
|
|
|
return aggregations;
|
|
|
}
|
|
|
|
|
@@ -183,11 +179,17 @@ public class DatafeedConfig implements ToXContentObject {
|
|
|
if (frequency != null) {
|
|
|
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
|
|
|
}
|
|
|
- builder.field(INDICES.getPreferredName(), indices);
|
|
|
- builder.field(TYPES.getPreferredName(), types);
|
|
|
- builder.field(QUERY.getPreferredName(), query);
|
|
|
+ if (indices != null) {
|
|
|
+ builder.field(INDICES.getPreferredName(), indices);
|
|
|
+ }
|
|
|
+ if (types != null) {
|
|
|
+ builder.field(TYPES.getPreferredName(), types);
|
|
|
+ }
|
|
|
+ if (query != null) {
|
|
|
+ builder.field(QUERY.getPreferredName(), asMap(query));
|
|
|
+ }
|
|
|
if (aggregations != null) {
|
|
|
- builder.field(AGGREGATIONS.getPreferredName(), aggregations);
|
|
|
+ builder.field(AGGREGATIONS.getPreferredName(), asMap(aggregations));
|
|
|
}
|
|
|
if (scriptFields != null) {
|
|
|
builder.startObject(SCRIPT_FIELDS.getPreferredName());
|
|
@@ -196,7 +198,9 @@ public class DatafeedConfig implements ToXContentObject {
|
|
|
}
|
|
|
builder.endObject();
|
|
|
}
|
|
|
- builder.field(SCROLL_SIZE.getPreferredName(), scrollSize);
|
|
|
+ if (scrollSize != null) {
|
|
|
+ builder.field(SCROLL_SIZE.getPreferredName(), scrollSize);
|
|
|
+ }
|
|
|
if (chunkingConfig != null) {
|
|
|
builder.field(CHUNKING_CONFIG.getPreferredName(), chunkingConfig);
|
|
|
}
|
|
@@ -205,10 +209,18 @@ public class DatafeedConfig implements ToXContentObject {
|
|
|
return builder;
|
|
|
}
|
|
|
|
|
|
+ private static Map<String, Object> asMap(BytesReference bytesReference) {
|
|
|
+ return bytesReference == null ? null : XContentHelper.convertToMap(bytesReference, true, XContentType.JSON).v2();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* The lists of indices and types are compared for equality but they are not
|
|
|
* sorted first so this test could fail simply because the indices and types
|
|
|
* lists are in different orders.
|
|
|
+ *
|
|
|
+ * Also note this could be a heavy operation when a query or aggregations
|
|
|
+ * are set as we need to convert the bytes references into maps to correctly
|
|
|
+ * compare them.
|
|
|
*/
|
|
|
@Override
|
|
|
public boolean equals(Object other) {
|
|
@@ -228,31 +240,40 @@ public class DatafeedConfig implements ToXContentObject {
|
|
|
&& Objects.equals(this.queryDelay, that.queryDelay)
|
|
|
&& Objects.equals(this.indices, that.indices)
|
|
|
&& Objects.equals(this.types, that.types)
|
|
|
- && Objects.equals(this.query, that.query)
|
|
|
+ && Objects.equals(asMap(this.query), asMap(that.query))
|
|
|
&& Objects.equals(this.scrollSize, that.scrollSize)
|
|
|
- && Objects.equals(this.aggregations, that.aggregations)
|
|
|
+ && Objects.equals(asMap(this.aggregations), asMap(that.aggregations))
|
|
|
&& Objects.equals(this.scriptFields, that.scriptFields)
|
|
|
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Note this could be a heavy operation when a query or aggregations
|
|
|
+ * are set as we need to convert the bytes references into maps to
|
|
|
+ * compute a stable hash code.
|
|
|
+ */
|
|
|
@Override
|
|
|
public int hashCode() {
|
|
|
- return Objects.hash(id, jobId, frequency, queryDelay, indices, types, query, scrollSize, aggregations, scriptFields,
|
|
|
+ return Objects.hash(id, jobId, frequency, queryDelay, indices, types, asMap(query), scrollSize, asMap(aggregations), scriptFields,
|
|
|
chunkingConfig);
|
|
|
}
|
|
|
|
|
|
+ public static Builder builder(String id, String jobId) {
|
|
|
+ return new Builder(id, jobId);
|
|
|
+ }
|
|
|
+
|
|
|
public static class Builder {
|
|
|
|
|
|
private String id;
|
|
|
private String jobId;
|
|
|
private TimeValue queryDelay;
|
|
|
private TimeValue frequency;
|
|
|
- private List<String> indices = Collections.emptyList();
|
|
|
- private List<String> types = Collections.emptyList();
|
|
|
- private QueryBuilder query = QueryBuilders.matchAllQuery();
|
|
|
- private AggregatorFactories.Builder aggregations;
|
|
|
+ private List<String> indices;
|
|
|
+ private List<String> types;
|
|
|
+ private BytesReference query;
|
|
|
+ private BytesReference aggregations;
|
|
|
private List<SearchSourceBuilder.ScriptField> scriptFields;
|
|
|
- private Integer scrollSize = DEFAULT_SCROLL_SIZE;
|
|
|
+ private Integer scrollSize;
|
|
|
private ChunkingConfig chunkingConfig;
|
|
|
|
|
|
public Builder(String id, String jobId) {
|
|
@@ -279,8 +300,12 @@ public class DatafeedConfig implements ToXContentObject {
|
|
|
return this;
|
|
|
}
|
|
|
|
|
|
+ public Builder setIndices(String... indices) {
|
|
|
+ return setIndices(Arrays.asList(indices));
|
|
|
+ }
|
|
|
+
|
|
|
public Builder setTypes(List<String> types) {
|
|
|
- this.types = Objects.requireNonNull(types, TYPES.getPreferredName());
|
|
|
+ this.types = types;
|
|
|
return this;
|
|
|
}
|
|
|
|
|
@@ -294,16 +319,36 @@ public class DatafeedConfig implements ToXContentObject {
|
|
|
return this;
|
|
|
}
|
|
|
|
|
|
- public Builder setQuery(QueryBuilder query) {
|
|
|
- this.query = Objects.requireNonNull(query, QUERY.getPreferredName());
|
|
|
+ private Builder setQuery(BytesReference query) {
|
|
|
+ this.query = query;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Builder setQuery(String queryAsJson) {
|
|
|
+ this.query = queryAsJson == null ? null : new BytesArray(queryAsJson);
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Builder setQuery(QueryBuilder query) throws IOException {
|
|
|
+ this.query = query == null ? null : xContentToBytes(query);
|
|
|
return this;
|
|
|
}
|
|
|
|
|
|
- public Builder setAggregations(AggregatorFactories.Builder aggregations) {
|
|
|
+ private Builder setAggregations(BytesReference aggregations) {
|
|
|
this.aggregations = aggregations;
|
|
|
return this;
|
|
|
}
|
|
|
|
|
|
+ public Builder setAggregations(String aggsAsJson) {
|
|
|
+ this.aggregations = aggsAsJson == null ? null : new BytesArray(aggsAsJson);
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Builder setAggregations(AggregatorFactories.Builder aggregations) throws IOException {
|
|
|
+ this.aggregations = aggregations == null ? null : xContentToBytes(aggregations);
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
public Builder setScriptFields(List<SearchSourceBuilder.ScriptField> scriptFields) {
|
|
|
List<SearchSourceBuilder.ScriptField> sorted = new ArrayList<>(scriptFields);
|
|
|
sorted.sort(Comparator.comparing(SearchSourceBuilder.ScriptField::fieldName));
|
|
@@ -325,5 +370,12 @@ public class DatafeedConfig implements ToXContentObject {
|
|
|
return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
|
|
|
chunkingConfig);
|
|
|
}
|
|
|
+
|
|
|
+ private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {
|
|
|
+ try (XContentBuilder builder = JsonXContent.contentBuilder()) {
|
|
|
+ object.toXContent(builder, ToXContentObject.EMPTY_PARAMS);
|
|
|
+ return BytesReference.bytes(builder);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|