Browse Source

Use the breadth first collection mode for significant terms aggs.

This helps avoid memory issues when computing deep sub-aggregations. Because it
should be rare to use sub-aggregations with significant terms, we opted to always
choose breadth first as opposed to exposing a `collect_mode` option.

Closes #28652.
Antonio Matarrese 6 years ago
parent
commit
badb8559fb

+ 6 - 0
docs/reference/aggregations/bucket/significantterms-aggregation.asciidoc

@@ -542,6 +542,12 @@ It is possible (although rarely required) to filter the values for which buckets
 `exclude` parameters which are based on a regular expression string or arrays of exact terms. This functionality mirrors the features
 described in the <<search-aggregations-bucket-terms-aggregation,terms aggregation>> documentation.
 
+==== Collect mode
+
+To avoid memory issues, the `significant_terms` aggregation always computes child aggregations in `breadth_first` mode.
+A description of the different collection modes can be found in the
+<<search-aggregations-bucket-terms-aggregation-collect, terms aggregation>> documentation.
+
 ==== Execution hint
 
 There are different mechanisms by which terms aggregations can be executed:

+ 1 - 0
docs/reference/aggregations/bucket/terms-aggregation.asciidoc

@@ -775,6 +775,7 @@ fields, then use `copy_to` in your mapping to create a new dedicated field at
 index time which contains the values from both fields.  You can aggregate on
 this single field, which will benefit from the global ordinals optimization.
 
+[[search-aggregations-bucket-terms-aggregation-collect]]
 ==== Collect mode
 
 Deferring calculation of child aggregations

+ 9 - 2
server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java

@@ -65,7 +65,7 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri
                                                     List<PipelineAggregator> pipelineAggregators,
                                                     Map<String, Object> metaData) throws IOException {
         super(name, factories, valuesSource, null, format, bucketCountThresholds, includeExclude, context, parent,
-            forceRemapGlobalOrds, SubAggCollectionMode.DEPTH_FIRST, false, pipelineAggregators, metaData);
+            forceRemapGlobalOrds, SubAggCollectionMode.BREADTH_FIRST, false, pipelineAggregators, metaData);
         this.significanceHeuristic = significanceHeuristic;
         this.termsAggFactory = termsAggFactory;
         this.numCollectedDocs = 0;
@@ -146,12 +146,19 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri
         }
 
         final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
+        final long[] survivingBucketOrds = new long[ordered.size()];
         for (int i = ordered.size() - 1; i >= 0; i--) {
             final SignificantStringTerms.Bucket bucket = ordered.pop();
+            survivingBucketOrds[i] = bucket.bucketOrd;
+            list[i] = bucket;
+        }
+
+        runDeferredCollections(survivingBucketOrds);
+
+        for (SignificantStringTerms.Bucket bucket : list) {
             // the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point
             bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
             bucket.aggregations = bucketAggregations(bucket.bucketOrd);
-            list[i] = bucket;
         }
 
         return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),

+ 11 - 3
server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsAggregator.java

@@ -50,7 +50,7 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
             List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
 
         super(name, factories, valuesSource, format, null, bucketCountThresholds, context, parent,
-                SubAggCollectionMode.DEPTH_FIRST, false, includeExclude, pipelineAggregators, metaData);
+                SubAggCollectionMode.BREADTH_FIRST, false, includeExclude, pipelineAggregators, metaData);
         this.significanceHeuristic = significanceHeuristic;
         this.termsAggFactory = termsAggFactory;
     }
@@ -106,12 +106,20 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
             }
         }
 
-        final SignificantLongTerms.Bucket[] list = new SignificantLongTerms.Bucket[ordered.size()];
+        SignificantLongTerms.Bucket[] list = new SignificantLongTerms.Bucket[ordered.size()];
+        final long[] survivingBucketOrds = new long[ordered.size()];
         for (int i = ordered.size() - 1; i >= 0; i--) {
             final SignificantLongTerms.Bucket bucket = ordered.pop();
-            bucket.aggregations = bucketAggregations(bucket.bucketOrd);
+            survivingBucketOrds[i] = bucket.bucketOrd;
             list[i] = bucket;
         }
+
+        runDeferredCollections(survivingBucketOrds);
+
+        for (SignificantLongTerms.Bucket bucket : list) {
+            bucket.aggregations = bucketAggregations(bucket.bucketOrd);
+        }
+
         return new SignificantLongTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
                 pipelineAggregators(), metaData(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list));
     }

+ 11 - 3
server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsAggregator.java

@@ -57,7 +57,7 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
             List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
 
         super(name, factories, valuesSource, null, format, bucketCountThresholds, includeExclude, aggregationContext, parent,
-                SubAggCollectionMode.DEPTH_FIRST, false, pipelineAggregators, metaData);
+                SubAggCollectionMode.BREADTH_FIRST, false, pipelineAggregators, metaData);
         this.significanceHeuristic = significanceHeuristic;
         this.termsAggFactory = termsAggFactory;
     }
@@ -113,12 +113,20 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
         }
 
         final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
+        final long[] survivingBucketOrds = new long[ordered.size()];
         for (int i = ordered.size() - 1; i >= 0; i--) {
             final SignificantStringTerms.Bucket bucket = ordered.pop();
-            // the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point
+            survivingBucketOrds[i] = bucket.bucketOrd;
+            list[i] = bucket;
+        }
+
+        runDeferredCollections(survivingBucketOrds);
+
+        for (SignificantStringTerms.Bucket bucket : list) {
+            // the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be
+            // recycled at some point
             bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
             bucket.aggregations = bucketAggregations(bucket.bucketOrd);
-            list[i] = bucket;
         }
 
         return new SignificantStringTerms( name, bucketCountThresholds.getRequiredSize(),

+ 33 - 0
server/src/test/java/org/elasticsearch/search/aggregations/bucket/SignificantTermsSignificanceScoreIT.java

@@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.QueryShardException;
 import org.elasticsearch.plugins.Plugin;
@@ -38,6 +39,7 @@ import org.elasticsearch.script.MockScriptPlugin;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptType;
 import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
 import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms;
@@ -543,6 +545,37 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
         }
     }
 
+    /**
+     * A simple test that adds a sub-aggregation to a significant terms aggregation,
+     * to help check that sub-aggregation collection is handled correctly.
+     */
+    public void testSubAggregations() throws Exception {
+        indexEqualTestData();
+
+        QueryBuilder query = QueryBuilders.termsQuery(TEXT_FIELD, "a", "b");
+        AggregationBuilder subAgg = terms("class").field(CLASS_FIELD);
+        AggregationBuilder agg = significantTerms("significant_terms")
+            .field(TEXT_FIELD)
+            .executionHint(randomExecutionHint())
+            .significanceHeuristic(new ChiSquare(true, true))
+            .minDocCount(1).shardSize(1000).size(1000)
+            .subAggregation(subAgg);
+
+        SearchResponse response = client().prepareSearch("test")
+            .setQuery(query)
+            .addAggregation(agg)
+            .get();
+        assertSearchResponse(response);
+
+        SignificantTerms sigTerms = response.getAggregations().get("significant_terms");
+        assertThat(sigTerms.getBuckets().size(), equalTo(2));
+
+        for (SignificantTerms.Bucket bucket : sigTerms) {
+            StringTerms terms = bucket.getAggregations().get("class");
+            assertThat(terms.getBuckets().size(), equalTo(2));
+        }
+    }
+
     private void indexEqualTestData() throws ExecutionException, InterruptedException {
         assertAcked(prepareCreate("test")
             .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))