|
@@ -21,28 +21,52 @@ package org.elasticsearch.search.aggregations.bucket.terms;
|
|
|
import org.apache.lucene.search.IndexSearcher;
|
|
|
import org.elasticsearch.common.ParseField;
|
|
|
import org.elasticsearch.common.ParseFieldMatcher;
|
|
|
+import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
+import org.elasticsearch.common.io.stream.StreamOutput;
|
|
|
+import org.elasticsearch.common.xcontent.XContentBuilder;
|
|
|
import org.elasticsearch.search.aggregations.AggregationExecutionException;
|
|
|
import org.elasticsearch.search.aggregations.Aggregator;
|
|
|
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
|
|
|
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
|
|
import org.elasticsearch.search.aggregations.InternalAggregation;
|
|
|
import org.elasticsearch.search.aggregations.NonCollectingAggregator;
|
|
|
+import org.elasticsearch.search.aggregations.bucket.BucketUtils;
|
|
|
+import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order;
|
|
|
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
|
|
|
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
|
|
|
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
|
|
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
|
|
+import org.elasticsearch.search.aggregations.support.ValueType;
|
|
|
import org.elasticsearch.search.aggregations.support.ValuesSource;
|
|
|
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
|
|
|
-import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
|
|
|
+import org.elasticsearch.search.aggregations.support.ValuesSourceType;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Objects;
|
|
|
|
|
|
+/**
|
|
|
+ *
|
|
|
+ */
|
|
|
/**
|
|
|
*
|
|
|
*/
|
|
|
public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory<ValuesSource> {
|
|
|
|
|
|
+ public static final ParseField EXECUTION_HINT_FIELD_NAME = new ParseField("execution_hint");
|
|
|
+ public static final ParseField SHARD_SIZE_FIELD_NAME = new ParseField("shard_size");
|
|
|
+ public static final ParseField MIN_DOC_COUNT_FIELD_NAME = new ParseField("min_doc_count");
|
|
|
+ public static final ParseField SHARD_MIN_DOC_COUNT_FIELD_NAME = new ParseField("shard_min_doc_count");
|
|
|
+ public static final ParseField REQUIRED_SIZE_FIELD_NAME = new ParseField("size");
|
|
|
+
|
|
|
+ static final TermsAggregator.BucketCountThresholds DEFAULT_BUCKET_COUNT_THRESHOLDS = new TermsAggregator.BucketCountThresholds(1, 0, 10,
|
|
|
+ -1);
|
|
|
+ public static final ParseField SHOW_TERM_DOC_COUNT_ERROR = new ParseField("show_term_doc_count_error");
|
|
|
+ public static final ParseField ORDER_FIELD = new ParseField("order");
|
|
|
+
|
|
|
public enum ExecutionMode {
|
|
|
|
|
|
MAP(new ParseField("map")) {
|
|
@@ -155,28 +179,100 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory<Values
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private final Terms.Order order;
|
|
|
- private final IncludeExclude includeExclude;
|
|
|
- private final String executionHint;
|
|
|
- private final SubAggCollectionMode collectMode;
|
|
|
- private final TermsAggregator.BucketCountThresholds bucketCountThresholds;
|
|
|
- private final boolean showTermDocCountError;
|
|
|
-
|
|
|
- public TermsAggregatorFactory(String name, ValuesSourceParser.Input input, Terms.Order order,
|
|
|
- TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude, String executionHint,
|
|
|
- SubAggCollectionMode executionMode, boolean showTermDocCountError) {
|
|
|
- super(name, StringTerms.TYPE, input);
|
|
|
- this.order = order;
|
|
|
- this.includeExclude = includeExclude;
|
|
|
- this.executionHint = executionHint;
|
|
|
+ private List<Terms.Order> orders = Collections.singletonList(Terms.Order.count(false));
|
|
|
+ private IncludeExclude includeExclude = null;
|
|
|
+ private String executionHint = null;
|
|
|
+ private SubAggCollectionMode collectMode = SubAggCollectionMode.DEPTH_FIRST;
|
|
|
+ private TermsAggregator.BucketCountThresholds bucketCountThresholds = new TermsAggregator.BucketCountThresholds(
|
|
|
+ DEFAULT_BUCKET_COUNT_THRESHOLDS);
|
|
|
+ private boolean showTermDocCountError = false;
|
|
|
+
|
|
|
+ public TermsAggregatorFactory(String name, ValuesSourceType valuesSourceType, ValueType valueType) {
|
|
|
+ super(name, StringTerms.TYPE, valuesSourceType, valueType);
|
|
|
+ }
|
|
|
+
|
|
|
+ public TermsAggregator.BucketCountThresholds bucketCountThresholds() {
|
|
|
+ return bucketCountThresholds;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void bucketCountThresholds(TermsAggregator.BucketCountThresholds bucketCountThresholds) {
|
|
|
this.bucketCountThresholds = bucketCountThresholds;
|
|
|
- this.collectMode = executionMode;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Sets the order in which the buckets will be returned.
|
|
|
+ */
|
|
|
+ public void order(List<Terms.Order> order) {
|
|
|
+ this.orders = order;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Gets the order in which the buckets will be returned.
|
|
|
+ */
|
|
|
+ public List<Terms.Order> order() {
|
|
|
+ return orders;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Expert: sets an execution hint to the aggregation.
|
|
|
+ */
|
|
|
+ public void executionHint(String executionHint) {
|
|
|
+ this.executionHint = executionHint;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Expert: gets an execution hint to the aggregation.
|
|
|
+ */
|
|
|
+ public String executionHint() {
|
|
|
+ return executionHint;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Expert: set the collection mode.
|
|
|
+ */
|
|
|
+ public void collectMode(SubAggCollectionMode mode) {
|
|
|
+ this.collectMode = mode;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Expert: get the collection mode.
|
|
|
+ */
|
|
|
+ public SubAggCollectionMode collectMode() {
|
|
|
+ return collectMode;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Set terms to include and exclude from the aggregation results
|
|
|
+ */
|
|
|
+ public void includeExclude(IncludeExclude includeExclude) {
|
|
|
+ this.includeExclude = includeExclude;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get terms to include and exclude from the aggregation results
|
|
|
+ */
|
|
|
+ public IncludeExclude includeExclude() {
|
|
|
+ return includeExclude;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get whether doc count error will be return for individual terms
|
|
|
+ */
|
|
|
+ public boolean showTermDocCountError() {
|
|
|
+ return showTermDocCountError;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Set whether doc count error will be return for individual terms
|
|
|
+ */
|
|
|
+ public void showTermDocCountError(boolean showTermDocCountError) {
|
|
|
this.showTermDocCountError = showTermDocCountError;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent,
|
|
|
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
|
|
|
+ Terms.Order order = resolveOrder(orders);
|
|
|
final InternalAggregation aggregation = new UnmappedTerms(name, order, bucketCountThresholds.getRequiredSize(),
|
|
|
bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), pipelineAggregators, metaData);
|
|
|
return new NonCollectingAggregator(name, aggregationContext, parent, factories, pipelineAggregators, metaData) {
|
|
@@ -192,13 +288,38 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory<Values
|
|
|
};
|
|
|
}
|
|
|
|
|
|
+ private Order resolveOrder(List<Order> orders) {
|
|
|
+ Terms.Order order;
|
|
|
+ if (orders.size() == 1 && (orders.get(0) == InternalOrder.TERM_ASC || orders.get(0) == InternalOrder.TERM_DESC)) {
|
|
|
+ // If order is only terms order then we don't need compound
|
|
|
+ // ordering
|
|
|
+ order = orders.get(0);
|
|
|
+ } else {
|
|
|
+ // for all other cases we need compound order so term order asc
|
|
|
+ // can be added to make the order deterministic
|
|
|
+ order = Order.compound(orders);
|
|
|
+ }
|
|
|
+ return order;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected Aggregator doCreateInternal(ValuesSource valuesSource, AggregationContext aggregationContext, Aggregator parent,
|
|
|
boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
|
|
|
throws IOException {
|
|
|
+ Terms.Order order = resolveOrder(orders);
|
|
|
if (collectsFromSingleBucket == false) {
|
|
|
return asMultiBucketAggregator(this, aggregationContext, parent);
|
|
|
}
|
|
|
+ BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(this.bucketCountThresholds);
|
|
|
+ if (!(order == InternalOrder.TERM_ASC || order == InternalOrder.TERM_DESC)
|
|
|
+ && bucketCountThresholds.getShardSize() == DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize()) {
|
|
|
+ // The user has not made a shardSize selection. Use default
|
|
|
+ // heuristic to avoid any wrong-ranking caused by distributed
|
|
|
+ // counting
|
|
|
+ bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
|
|
|
+ aggregationContext.searchContext().numberOfShards()));
|
|
|
+ }
|
|
|
+ bucketCountThresholds.ensureValidity();
|
|
|
if (valuesSource instanceof ValuesSource.Bytes) {
|
|
|
ExecutionMode execution = null;
|
|
|
if (executionHint != null) {
|
|
@@ -278,4 +399,76 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory<Values
|
|
|
+ "]. It can only be applied to numeric or string fields.");
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
|
|
|
+ bucketCountThresholds.toXContent(builder, params);
|
|
|
+ builder.field(SHOW_TERM_DOC_COUNT_ERROR.getPreferredName(), showTermDocCountError);
|
|
|
+ if (executionHint != null) {
|
|
|
+ builder.field(TermsAggregatorFactory.EXECUTION_HINT_FIELD_NAME.getPreferredName(), executionHint);
|
|
|
+ }
|
|
|
+ builder.startArray(ORDER_FIELD.getPreferredName());
|
|
|
+ for (Terms.Order order : orders) {
|
|
|
+ order.toXContent(builder, params);
|
|
|
+ }
|
|
|
+ builder.endArray();
|
|
|
+ builder.field(SubAggCollectionMode.KEY.getPreferredName(), collectMode.parseField().getPreferredName());
|
|
|
+ if (includeExclude != null) {
|
|
|
+ includeExclude.toXContent(builder, params);
|
|
|
+ }
|
|
|
+ return builder;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected ValuesSourceAggregatorFactory<ValuesSource> innerReadFrom(String name, ValuesSourceType valuesSourceType,
|
|
|
+ ValueType targetValueType, StreamInput in) throws IOException {
|
|
|
+ TermsAggregatorFactory factory = new TermsAggregatorFactory(name, valuesSourceType, targetValueType);
|
|
|
+ factory.bucketCountThresholds = BucketCountThresholds.readFromStream(in);
|
|
|
+ factory.collectMode = SubAggCollectionMode.BREADTH_FIRST.readFrom(in);
|
|
|
+ factory.executionHint = in.readOptionalString();
|
|
|
+ if (in.readBoolean()) {
|
|
|
+ factory.includeExclude = IncludeExclude.readFromStream(in);
|
|
|
+ }
|
|
|
+ int numOrders = in.readVInt();
|
|
|
+ List<Terms.Order> orders = new ArrayList<>(numOrders);
|
|
|
+ for (int i = 0; i < numOrders; i++) {
|
|
|
+ orders.add(InternalOrder.Streams.readOrder(in));
|
|
|
+ }
|
|
|
+ factory.orders = orders;
|
|
|
+ factory.showTermDocCountError = in.readBoolean();
|
|
|
+ return factory;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void innerWriteTo(StreamOutput out) throws IOException {
|
|
|
+ bucketCountThresholds.writeTo(out);
|
|
|
+ collectMode.writeTo(out);
|
|
|
+ out.writeOptionalString(executionHint);
|
|
|
+ boolean hasIncExc = includeExclude != null;
|
|
|
+ out.writeBoolean(hasIncExc);
|
|
|
+ if (hasIncExc) {
|
|
|
+ includeExclude.writeTo(out);
|
|
|
+ }
|
|
|
+ out.writeVInt(orders.size());
|
|
|
+ for (Terms.Order order : orders) {
|
|
|
+ InternalOrder.Streams.writeOrder(order, out);
|
|
|
+ }
|
|
|
+ out.writeBoolean(showTermDocCountError);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected int innerHashCode() {
|
|
|
+ return Objects.hash(bucketCountThresholds, collectMode, executionHint, includeExclude, orders, showTermDocCountError);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected boolean innerEquals(Object obj) {
|
|
|
+ TermsAggregatorFactory other = (TermsAggregatorFactory) obj;
|
|
|
+ return Objects.equals(bucketCountThresholds, other.bucketCountThresholds)
|
|
|
+ && Objects.equals(collectMode, other.collectMode)
|
|
|
+ && Objects.equals(executionHint, other.executionHint)
|
|
|
+ && Objects.equals(includeExclude, other.includeExclude)
|
|
|
+ && Objects.equals(orders, other.orders)
|
|
|
+ && Objects.equals(showTermDocCountError, other.showTermDocCountError);
|
|
|
+ }
|
|
|
+
|
|
|
}
|