1
0
Эх сурвалжийг харах

Optimize composite aggregation based on index sorting (#48399)

Co-authored-by: Daniel Huang <danielhuang@tencent.com>

This is a spinoff of #48130 that generalizes the proposal to allow early termination with the composite aggregation when leading sources match a prefix or the entire index sort specification.
In such case the composite aggregation can use the index sort natural order to early terminate the collection when it reaches a composite key that is greater than the bottom of the queue.
The optimization is also applicable when a query other than match_all is provided. However the optimization is deactivated for sources that match the index sort in the following cases:
  * Multi-valued source, in such case early termination is not possible.
  * missing_bucket is set to true
Jim Ferenczi 5 жил өмнө
parent
commit
804a5042e7
18 өөрчлөгдсөн 662 нэмэгдсэн , 131 устгасан
  1. 124 1
      docs/reference/aggregations/bucket/composite-aggregation.asciidoc
  2. 2 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java
  3. 166 31
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java
  4. 46 14
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java
  5. 1 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java
  6. 13 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java
  7. 2 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java
  8. 2 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GeoTileGridValuesSourceBuilder.java
  9. 2 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java
  10. 20 7
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java
  11. 1 2
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SortedDocsProducer.java
  12. 1 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java
  13. 5 0
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregationBuilder.java
  14. 2 1
      server/src/main/java/org/elasticsearch/search/searchafter/SearchAfterBuilder.java
  15. 182 40
      server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java
  16. 52 21
      server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java
  17. 2 2
      server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/InternalCompositeTests.java
  18. 39 5
      test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java

+ 124 - 1
docs/reference/aggregations/bucket/composite-aggregation.asciidoc

@@ -116,6 +116,7 @@ Example:
 --------------------------------------------------
 GET /_search
 {
+    "size": 0,
     "aggs" : {
         "my_buckets": {
             "composite" : {
@@ -134,6 +135,7 @@ Like the `terms` aggregation it is also possible to use a script to create the v
 --------------------------------------------------
 GET /_search
 {
+    "size": 0,
     "aggs" : {
         "my_buckets": {
             "composite" : {
@@ -168,6 +170,7 @@ Example:
 --------------------------------------------------
 GET /_search
 {
+    "size": 0,
     "aggs" : {
         "my_buckets": {
             "composite" : {
@@ -186,6 +189,7 @@ The values are built from a numeric field or a script that return numerical valu
 --------------------------------------------------
 GET /_search
 {
+    "size": 0,
     "aggs" : {
         "my_buckets": {
             "composite" : {
@@ -218,6 +222,7 @@ is specified by date/time expression:
 --------------------------------------------------
 GET /_search
 {
+    "size": 0,
     "aggs" : {
         "my_buckets": {
             "composite" : {
@@ -247,6 +252,7 @@ the format specified with the format parameter:
 --------------------------------------------------
 GET /_search
 {
+    "size": 0,
     "aggs" : {
         "my_buckets": {
             "composite" : {
@@ -289,6 +295,7 @@ For example:
 --------------------------------------------------
 GET /_search
 {
+    "size": 0,
     "aggs" : {
         "my_buckets": {
             "composite" : {
@@ -311,6 +318,7 @@ in the composite buckets.
 --------------------------------------------------
 GET /_search
 {
+    "size": 0,
     "aggs" : {
         "my_buckets": {
             "composite" : {
@@ -340,6 +348,7 @@ For example:
 --------------------------------------------------
 GET /_search
 {
+    "size": 0,
     "aggs" : {
         "my_buckets": {
             "composite" : {
@@ -366,6 +375,7 @@ It is possible to include them in the response by setting `missing_bucket` to
 --------------------------------------------------
 GET /_search
 {
+    "size": 0,
     "aggs" : {
         "my_buckets": {
             "composite" : {
@@ -391,7 +401,7 @@ first 10 composite buckets created from the values source.
 The response contains the values for each composite bucket in an array containing the values extracted
 from each value source.
 
-==== After
+==== Pagination
 
 If the number of composite buckets is too high (or unknown) to be returned in a single response
 it is possible to split the retrieval in multiple requests.
@@ -405,6 +415,7 @@ For example:
 --------------------------------------------------
 GET /_search
 {
+    "size": 0,
     "aggs" : {
         "my_buckets": {
             "composite" : {
@@ -470,6 +481,7 @@ round of result can be retrieved with:
 --------------------------------------------------
 GET /_search
 {
+    "size": 0,
     "aggs" : {
         "my_buckets": {
             "composite" : {
@@ -487,6 +499,116 @@ GET /_search
 
 <1> Should restrict the aggregation to buckets that sort **after** the provided values.
 
+==== Early termination
+
+For optimal performance the <<index-modules-index-sorting,index sort>> should be set on the index so that it matches
+parts or fully the source order in the composite aggregation.
+For instance the following index sort:
+
+[source,console]
+--------------------------------------------------
+PUT twitter
+{
+    "settings" : {
+        "index" : {
+            "sort.field" : ["username", "timestamp"],   <1>
+            "sort.order" : ["asc", "desc"]              <2>
+        }
+    },
+    "mappings": {
+        "properties": {
+            "username": {
+                "type": "keyword",
+                "doc_values": true
+            },
+            "timestamp": {
+                "type": "date"
+            }
+        }
+    }
+}
+--------------------------------------------------
+
+<1> This index is sorted by `username` first then by `timestamp`.
+<2> ... in ascending order for the `username` field and in descending order for the `timestamp` field.
+
+.. could be used to optimize these composite aggregations:
+
+[source,console]
+--------------------------------------------------
+GET /_search
+{
+    "size": 0,
+    "aggs" : {
+        "my_buckets": {
+            "composite" : {
+                "sources" : [
+                    { "user_name": { "terms" : { "field": "user_name" } } }     <1>
+                ]
+            }
+        }
+     }
+}
+--------------------------------------------------
+
+<1> `user_name` is a prefix of the index sort and the order matches (`asc`).
+
+[source,console]
+--------------------------------------------------
+GET /_search
+{
+    "size": 0,
+    "aggs" : {
+        "my_buckets": {
+            "composite" : {
+                "sources" : [
+                    { "user_name": { "terms" : { "field": "user_name" } } }, <1>
+                    { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } } <2>
+                ]
+            }
+        }
+     }
+}
+--------------------------------------------------
+
+<1> `user_name` is a prefix of the index sort and the order matches (`asc`).
+<2> `timestamp` matches also the prefix and the order matches (`desc`).
+
+In order to optimize the early termination it is advised to set `track_total_hits` in the request
+to `false`. The number of total hits that match the request can be retrieved on the first request
+and it would be costly to compute this number on every page:
+
+[source,console]
+--------------------------------------------------
+GET /_search
+{
+    "size": 0,
+    "track_total_hits": false,
+    "aggs" : {
+        "my_buckets": {
+            "composite" : {
+                "sources" : [
+                    { "user_name": { "terms" : { "field": "user_name" } } },
+                    { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } }
+                ]
+            }
+        }
+     }
+}
+--------------------------------------------------
+
+Note that the order of the source is important, in the example below switching the `user_name` with the `timestamp`
+would deactivate the sort optimization since this configuration wouldn't match the index sort specification.
+If the order of sources do not matter for your use case you can follow these simple guidelines:
+
+  * Put the fields with the highest cardinality first.
+  * Make sure that the order of the field matches the order of the index sort.
+  * Put multi-valued fields last since they cannot be used for early termination.
+
+WARNING: <<index-modules-index-sorting,index sort>> can slowdown indexing, it is very important to test index sorting
+with your specific use case and dataset to ensure that it matches your requirement. If it doesn't note that `composite`
+aggregations will also try to early terminate on non-sorted indices if the query matches all document (`match_all` query).
+
 ==== Sub-aggregations
 
 Like any `multi-bucket` aggregations the `composite` aggregation can hold sub-aggregations.
@@ -499,6 +621,7 @@ per composite bucket:
 --------------------------------------------------
 GET /_search
 {
+    "size": 0,
     "aggs" : {
         "my_buckets": {
             "composite" : {

+ 2 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java

@@ -235,7 +235,8 @@ public class CompositeAggregationBuilder extends AbstractAggregationBuilder<Comp
         } else {
             afterKey = null;
         }
-        return new CompositeAggregationFactory(name, queryShardContext, parent, subfactoriesBuilder, metaData, size, configs, afterKey);
+        return new CompositeAggregationFactory(name, queryShardContext, parent, subfactoriesBuilder, metaData, size,
+            configs, afterKey);
     }
 
 

+ 166 - 31
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java

@@ -20,18 +20,28 @@
 package org.elasticsearch.search.aggregations.bucket.composite;
 
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.DocValues;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.SortedNumericDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
+import org.apache.lucene.queries.SearchAfterSortedDocQuery;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.CollectionTerminatedException;
 import org.apache.lucene.search.DocIdSet;
 import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.FieldDoc;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.RoaringDocIdSet;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.index.IndexSortConfig;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -46,6 +56,8 @@ import org.elasticsearch.search.aggregations.bucket.geogrid.CellIdSource;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.searchafter.SearchAfterBuilder;
+import org.elasticsearch.search.sort.SortAndFormats;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -60,11 +72,12 @@ import static org.elasticsearch.search.aggregations.MultiBucketConsumerService.M
 
 final class CompositeAggregator extends BucketsAggregator {
     private final int size;
-    private final SortedDocsProducer sortedDocsProducer;
     private final List<String> sourceNames;
     private final int[] reverseMuls;
     private final List<DocValueFormat> formats;
+    private final CompositeKey rawAfterKey;
 
+    private final CompositeValuesSourceConfig[] sourceConfigs;
     private final SingleDimensionValuesSource<?>[] sources;
     private final CompositeValuesCollectorQueue queue;
 
@@ -73,6 +86,8 @@ final class CompositeAggregator extends BucketsAggregator {
     private RoaringDocIdSet.Builder docIdSetBuilder;
     private BucketCollector deferredCollectors;
 
+    private boolean earlyTerminated;
+
     CompositeAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent,
                         List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData,
                         int size, CompositeValuesSourceConfig[] sourceConfigs, CompositeKey rawAfterKey) throws IOException {
@@ -89,11 +104,12 @@ final class CompositeAggregator extends BucketsAggregator {
                 " to: [" + bucketLimit + "] but was [" + size + "]. This limit can be set by changing the [" + MAX_BUCKET_SETTING.getKey() +
                 "] cluster level setting.", bucketLimit);
         }
+        this.sourceConfigs = sourceConfigs;
         for (int i = 0; i < sourceConfigs.length; i++) {
             this.sources[i] = createValuesSource(context.bigArrays(), context.searcher().getIndexReader(), sourceConfigs[i], size);
         }
         this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
-        this.sortedDocsProducer = sources[0].createSortedDocsProducerOrNull(context.searcher().getIndexReader(), context.query());
+        this.rawAfterKey = rawAfterKey;
     }
 
     @Override
@@ -121,7 +137,6 @@ final class CompositeAggregator extends BucketsAggregator {
     public InternalAggregation buildAggregation(long zeroBucket) throws IOException {
         assert zeroBucket == 0L;
         consumeBucketsAndMaybeBreak(queue.size());
-
         if (deferredCollectors != NO_OP_COLLECTOR) {
             // Replay all documents that contain at least one top bucket (collected during the first pass).
             runDeferredCollections();
@@ -138,13 +153,13 @@ final class CompositeAggregator extends BucketsAggregator {
         }
         CompositeKey lastBucket = num > 0 ? buckets[num-1].getRawKey() : null;
         return new InternalComposite(name, size, sourceNames, formats, Arrays.asList(buckets), lastBucket, reverseMuls,
-            pipelineAggregators(), metaData());
+            earlyTerminated, pipelineAggregators(), metaData());
     }
 
     @Override
     public InternalAggregation buildEmptyAggregation() {
         return new InternalComposite(name, size, sourceNames, formats, Collections.emptyList(), null, reverseMuls,
-            pipelineAggregators(), metaData());
+            false, pipelineAggregators(), metaData());
     }
 
     private void finishLeaf() {
@@ -156,58 +171,179 @@ final class CompositeAggregator extends BucketsAggregator {
         }
     }
 
+    /** Return true if the provided field may have multiple values per document in the leaf **/
+    private boolean isMaybeMultivalued(LeafReaderContext context, SortField sortField) throws IOException {
+        SortField.Type type = IndexSortConfig.getSortFieldType(sortField);
+        switch (type) {
+            case STRING:
+                final SortedSetDocValues v1 = context.reader().getSortedSetDocValues(sortField.getField());
+                return v1 != null && DocValues.unwrapSingleton(v1) == null;
+
+            case DOUBLE:
+            case FLOAT:
+            case LONG:
+            case INT:
+                final SortedNumericDocValues v2 = context.reader().getSortedNumericDocValues(sortField.getField());
+                return v2 != null && DocValues.unwrapSingleton(v2) == null;
+
+            default:
+                // we have no clue whether the field is multi-valued or not so we assume it is.
+                return true;
+        }
+    }
+
+    /**
+     * Returns the {@link Sort} prefix that is eligible to index sort
+     * optimization and null if index sort is not applicable.
+     */
+    private Sort buildIndexSortPrefix(LeafReaderContext context) throws IOException {
+        Sort indexSort = context.reader().getMetaData().getSort();
+        if (indexSort == null) {
+            return null;
+        }
+        List<SortField> sortFields = new ArrayList<>();
+        for (int i = 0; i < indexSort.getSort().length; i++) {
+            CompositeValuesSourceConfig sourceConfig = sourceConfigs[i];
+            SingleDimensionValuesSource<?> source = sources[i];
+            SortField indexSortField = indexSort.getSort()[i];
+            if (source.fieldType == null
+                    // TODO: can we handle missing bucket when using index sort optimization ?
+                    || source.missingBucket
+                    || indexSortField.getField().equals(source.fieldType.name()) == false
+                    || isMaybeMultivalued(context, indexSortField)
+                    || sourceConfig.hasScript()) {
+                break;
+            }
+
+            if (indexSortField.getReverse() != (source.reverseMul == -1)) {
+                if (i == 0) {
+                    // the leading index sort matches the leading source field but the order is reversed
+                    // so we don't check the other sources.
+                    return new Sort(indexSortField);
+                }
+                break;
+            }
+            sortFields.add(indexSortField);
+        }
+        return sortFields.isEmpty() ? null : new Sort(sortFields.toArray(new SortField[0]));
+    }
+
+    /**
+     * Return the number of leading sources that match the index sort.
+     *
+     * @param indexSortPrefix The index sort prefix that matches the sources
+     * @return The length of the index sort prefix if the sort order matches
+     *         or -1 if the leading index sort is in the reverse order of the
+     *         leading source. A value of 0 indicates that the index sort is
+     *         not applicable.
+     */
+    private int computeSortPrefixLen(Sort indexSortPrefix) {
+        if (indexSortPrefix == null) {
+            return 0;
+        }
+        if (indexSortPrefix.getSort()[0].getReverse() != (sources[0].reverseMul == -1)) {
+            assert indexSortPrefix.getSort().length == 1;
+            return -1;
+        } else {
+            return indexSortPrefix.getSort().length;
+        }
+    }
+
+    private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) throws IOException {
+        DocValueFormat[] formats = new DocValueFormat[indexSortPrefix.getSort().length];
+        for (int i = 0; i < formats.length; i++) {
+            formats[i] = sources[i].format;
+        }
+        FieldDoc fieldDoc = SearchAfterBuilder.buildFieldDoc(new SortAndFormats(indexSortPrefix, formats),
+            Arrays.copyOfRange(rawAfterKey.values(), 0, formats.length));
+        if (indexSortPrefix.getSort().length < sources.length) {
+            // include all docs that belong to the partial bucket
+            fieldDoc.doc = 0;
+        }
+        BooleanQuery newQuery = new BooleanQuery.Builder()
+            .add(context.query(), BooleanClause.Occur.MUST)
+            .add(new SearchAfterSortedDocQuery(indexSortPrefix, fieldDoc), BooleanClause.Occur.FILTER)
+            .build();
+        Weight weight = context.searcher().createWeight(context.searcher().rewrite(newQuery), ScoreMode.COMPLETE_NO_SCORES, 1f);
+        Scorer scorer = weight.scorer(ctx);
+        if (scorer != null) {
+            DocIdSetIterator docIt = scorer.iterator();
+            final LeafBucketCollector inner = queue.getLeafCollector(ctx,
+                getFirstPassCollector(docIdSetBuilder, indexSortPrefix.getSort().length));
+            inner.setScorer(scorer);
+            while (docIt.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
+                inner.collect(docIt.docID());
+            }
+        }
+    }
+
     @Override
     protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
         finishLeaf();
+
         boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR;
+
+        Sort indexSortPrefix = buildIndexSortPrefix(ctx);
+        int sortPrefixLen = computeSortPrefixLen(indexSortPrefix);
+
+        SortedDocsProducer sortedDocsProducer = sortPrefixLen == 0  ?
+            sources[0].createSortedDocsProducerOrNull(ctx.reader(), context.query()) : null;
         if (sortedDocsProducer != null) {
-            /*
-              The producer will visit documents sorted by the leading source of the composite definition
-              and terminates when the leading source value is guaranteed to be greater than the lowest
-              composite bucket in the queue.
-             */
+            // Visit documents sorted by the leading source of the composite definition and terminates
+            // when the leading source value is guaranteed to be greater than the lowest composite bucket
+            // in the queue.
             DocIdSet docIdSet = sortedDocsProducer.processLeaf(context.query(), queue, ctx, fillDocIdSet);
             if (fillDocIdSet) {
                 entries.add(new Entry(ctx, docIdSet));
             }
-
-            /*
-              We can bypass search entirely for this segment, all the processing has been done in the previous call.
-              Throwing this exception will terminate the execution of the search for this root aggregation,
-              see {@link org.apache.lucene.search.MultiCollector} for more details on how we handle early termination in aggregations.
-             */
+            // We can bypass search entirely for this segment, the processing is done in the previous call.
+            // Throwing this exception will terminate the execution of the search for this root aggregation,
+            // see {@link MultiCollector} for more details on how we handle early termination in aggregations.
+            earlyTerminated = true;
             throw new CollectionTerminatedException();
         } else {
             if (fillDocIdSet) {
                 currentLeaf = ctx;
                 docIdSetBuilder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc());
             }
-            final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder));
-            return new LeafBucketCollector() {
-                @Override
-                public void collect(int doc, long zeroBucket) throws IOException {
-                    assert zeroBucket == 0L;
-                    inner.collect(doc);
-                }
-            };
+            if (rawAfterKey != null && sortPrefixLen > 0) {
+                // We have an after key and index sort is applicable so we jump directly to the doc
+                // that is after the index sort prefix using the rawAfterKey and we start collecting
+                // document from there.
+                processLeafFromQuery(ctx, indexSortPrefix);
+                throw new CollectionTerminatedException();
+            } else {
+                final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder, sortPrefixLen));
+                return new LeafBucketCollector() {
+                    @Override
+                    public void collect(int doc, long zeroBucket) throws IOException {
+                        assert zeroBucket == 0L;
+                        inner.collect(doc);
+                    }
+                };
+            }
         }
     }
 
     /**
      * The first pass selects the top composite buckets from all matching documents.
      */
-    private LeafBucketCollector getFirstPassCollector(RoaringDocIdSet.Builder builder) {
+    private LeafBucketCollector getFirstPassCollector(RoaringDocIdSet.Builder builder, int indexSortPrefix) {
         return new LeafBucketCollector() {
             int lastDoc = -1;
 
             @Override
             public void collect(int doc, long bucket) throws IOException {
-                int slot = queue.addIfCompetitive();
-                if (slot != -1) {
-                    if (builder != null && lastDoc != doc) {
-                        builder.add(doc);
-                        lastDoc = doc;
+                try {
+                    if (queue.addIfCompetitive(indexSortPrefix)) {
+                        if (builder != null && lastDoc != doc) {
+                            builder.add(doc);
+                            lastDoc = doc;
+                        }
                     }
+                } catch (CollectionTerminatedException exc) {
+                    earlyTerminated = true;
+                    throw exc;
                 }
             }
         };
@@ -274,7 +410,6 @@ final class CompositeAggregator extends BucketsAggregator {
 
     private SingleDimensionValuesSource<?> createValuesSource(BigArrays bigArrays, IndexReader reader,
                                                               CompositeValuesSourceConfig config, int size) {
-
         final int reverseMul = config.reverseMul();
         if (config.valuesSource() instanceof ValuesSource.Bytes.WithOrdinals && reader instanceof DirectoryReader) {
             ValuesSource.Bytes.WithOrdinals vs = (ValuesSource.Bytes.WithOrdinals) config.valuesSource();

+ 46 - 14
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.search.aggregations.bucket.composite;
 
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.CollectionTerminatedException;
 import org.apache.lucene.util.PriorityQueue;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
@@ -63,6 +64,7 @@ final class CompositeValuesCollectorQueue extends PriorityQueue<Integer> impleme
     private final int maxSize;
     private final Map<Slot, Integer> map;
     private final SingleDimensionValuesSource<?>[] arrays;
+
     private IntArray docCounts;
     private boolean afterKeyIsSet = false;
 
@@ -153,7 +155,7 @@ final class CompositeValuesCollectorQueue extends PriorityQueue<Integer> impleme
                 cmp = arrays[i].compare(slot1, slot2);
             }
             if (cmp != 0) {
-                return cmp;
+                return cmp > 0 ? i+1 : -(i+1);
             }
         }
         return 0;
@@ -244,27 +246,57 @@ final class CompositeValuesCollectorQueue extends PriorityQueue<Integer> impleme
 
     /**
      * Check if the current candidate should be added in the queue.
-     * @return The target slot of the candidate or -1 is the candidate is not competitive.
+     * @return <code>true</code> if the candidate is competitive (added or already in the queue).
+     */
+    boolean addIfCompetitive() {
+        return addIfCompetitive(0);
+    }
+
+
+    /**
+     * Add or update the current composite key in the queue if the values are competitive.
+     *
+     * @param indexSortSourcePrefix 0 if the index sort is null or doesn't match any of the sources field,
+     *                              a value greater than 0 indicates the prefix len of the sources that match the index sort
+     *                              and a negative value indicates that the index sort match the source field but the order is reversed.
+     * @return <code>true</code> if the candidate is competitive (added or already in the queue).
+     *
+     * @throws CollectionTerminatedException if the current collection can be terminated early due to index sorting.
      */
-    int addIfCompetitive() {
+    boolean addIfCompetitive(int indexSortSourcePrefix) {
         // checks if the candidate key is competitive
         Integer topSlot = compareCurrent();
         if (topSlot != null) {
             // this key is already in the top N, skip it
             docCounts.increment(topSlot, 1);
-            return topSlot;
+            return true;
         }
-        if (afterKeyIsSet && compareCurrentWithAfter() <= 0) {
-            // this key is greater than the top value collected in the previous round, skip it
-            return -1;
+        if (afterKeyIsSet) {
+            int cmp = compareCurrentWithAfter();
+            if (cmp <= 0) {
+                if (indexSortSourcePrefix < 0 && cmp == indexSortSourcePrefix) {
+                    // the leading index sort is in the reverse order of the leading source
+                    // so we can early terminate when we reach a document that is smaller
+                    // than the after key (collected on a previous page).
+                    throw new CollectionTerminatedException();
+                }
+                // key was collected on a previous page, skip it (>= afterKey).
+                return false;
+            }
         }
-        if (size() >= maxSize
-                // the tree map is full, check if the candidate key should be kept
-                && compare(CANDIDATE_SLOT, top()) > 0) {
-            // the candidate key is not competitive, skip it
-            return -1;
+        if (size() >= maxSize) {
+            // the tree map is full, check if the candidate key should be kept
+            int cmp = compare(CANDIDATE_SLOT, top());
+            if (cmp > 0) {
+                if (cmp <= indexSortSourcePrefix) {
+                    // index sort guarantees that there is no key greater or equal than the
+                    // current one in the subsequent documents so we can early terminate.
+                    throw new CollectionTerminatedException();
+                }
+                // the candidate key is not competitive, skip it.
+                return false;
+            }
         }
-
         // the candidate key is competitive
         final int newSlot;
         if (size() >= maxSize) {
@@ -280,7 +312,7 @@ final class CompositeValuesCollectorQueue extends PriorityQueue<Integer> impleme
         copyCurrent(newSlot);
         map.put(new Slot(newSlot), newSlot);
         add(newSlot);
-        return newSlot;
+        return true;
     }
 
     @Override

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java

@@ -200,7 +200,7 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
     }
 
     /**
-     * If true an explicit `null bucket will represent documents with missing values.
+     * If <code>true</code> an explicit <code>null</code> bucket will represent documents with missing values.
      */
     @SuppressWarnings("unchecked")
     public AB missingBucket(boolean missingBucket) {

+ 13 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java

@@ -33,23 +33,28 @@ class CompositeValuesSourceConfig {
     private final DocValueFormat format;
     private final int reverseMul;
     private final boolean missingBucket;
+    private final boolean hasScript;
 
     /**
      * Creates a new {@link CompositeValuesSourceConfig}.
+     *
      * @param name The name of the source.
      * @param fieldType The field type or null if the source is a script.
      * @param vs The underlying {@link ValuesSource}.
      * @param format The {@link DocValueFormat} of this source.
      * @param order The sort order associated with this source.
+     * @param missingBucket If <code>true</code> an explicit <code>null</code> bucket will represent documents with missing values.
+     * @param hasScript <code>true</code> if the source contains a script that can change the value.
      */
     CompositeValuesSourceConfig(String name, @Nullable MappedFieldType fieldType, ValuesSource vs, DocValueFormat format,
-                                SortOrder order, boolean missingBucket) {
+                                SortOrder order, boolean missingBucket, boolean hasScript) {
         this.name = name;
         this.fieldType = fieldType;
         this.vs = vs;
         this.format = format;
         this.reverseMul = order == SortOrder.ASC ? 1 : -1;
         this.missingBucket = missingBucket;
+        this.hasScript = hasScript;
     }
 
     /**
@@ -88,6 +93,13 @@ class CompositeValuesSourceConfig {
         return missingBucket;
     }
 
+    /**
+     * Returns true if the source contains a script that can change the value.
+     */
+    boolean hasScript() {
+        return hasScript;
+    }
+
     /**
      * The sort order for the values source (e.g. -1 for descending and 1 for ascending).
      */

+ 2 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java

@@ -228,7 +228,8 @@ public class DateHistogramValuesSourceBuilder
             // is specified in the builder.
             final DocValueFormat docValueFormat = format() == null ? DocValueFormat.RAW : config.format();
             final MappedFieldType fieldType = config.fieldContext() != null ? config.fieldContext().fieldType() : null;
-            return new CompositeValuesSourceConfig(name, fieldType, vs, docValueFormat, order(), missingBucket());
+            return new CompositeValuesSourceConfig(name, fieldType, vs, docValueFormat, order(),
+                missingBucket(), config.script() != null);
         } else {
             throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName());
         }

+ 2 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GeoTileGridValuesSourceBuilder.java

@@ -113,7 +113,8 @@ public class GeoTileGridValuesSourceBuilder extends CompositeValuesSourceBuilder
             // is specified in the builder.
             final MappedFieldType fieldType = config.fieldContext() != null ? config.fieldContext().fieldType() : null;
             CellIdSource cellIdSource = new CellIdSource(geoPoint, precision, GeoTileUtils::longEncode);
-            return new CompositeValuesSourceConfig(name, fieldType, cellIdSource, DocValueFormat.GEOTILE, order(), missingBucket());
+            return new CompositeValuesSourceConfig(name, fieldType, cellIdSource, DocValueFormat.GEOTILE, order(),
+                missingBucket(), script() != null);
         } else {
             throw new IllegalArgumentException("invalid source, expected geo_point, got " + orig.getClass().getSimpleName());
         }

+ 2 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java

@@ -119,7 +119,8 @@ public class HistogramValuesSourceBuilder extends CompositeValuesSourceBuilder<H
             ValuesSource.Numeric numeric = (ValuesSource.Numeric) orig;
             final HistogramValuesSource vs = new HistogramValuesSource(numeric, interval);
             final MappedFieldType fieldType = config.fieldContext() != null ? config.fieldContext().fieldType() : null;
-            return new CompositeValuesSourceConfig(name, fieldType, vs, config.format(), order(), missingBucket());
+            return new CompositeValuesSourceConfig(name, fieldType, vs, config.format(), order(),
+                missingBucket(), script() != null);
         } else {
             throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName());
         }

+ 20 - 7
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.search.aggregations.bucket.composite;
 
 import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -53,8 +54,10 @@ public class InternalComposite
     private final List<String> sourceNames;
     private final List<DocValueFormat> formats;
 
+    private final boolean earlyTerminated;
+
     InternalComposite(String name, int size, List<String> sourceNames, List<DocValueFormat> formats,
-                      List<InternalBucket> buckets, CompositeKey afterKey, int[] reverseMuls,
+                      List<InternalBucket> buckets, CompositeKey afterKey, int[] reverseMuls, boolean earlyTerminated,
                       List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
         super(name, pipelineAggregators, metaData);
         this.sourceNames = sourceNames;
@@ -63,6 +66,7 @@ public class InternalComposite
         this.afterKey = afterKey;
         this.size = size;
         this.reverseMuls = reverseMuls;
+        this.earlyTerminated = earlyTerminated;
     }
 
     public InternalComposite(StreamInput in) throws IOException {
@@ -75,7 +79,8 @@ public class InternalComposite
         }
         this.reverseMuls = in.readIntArray();
         this.buckets = in.readList((input) -> new InternalBucket(input, sourceNames, formats, reverseMuls));
-        this.afterKey = in.readBoolean() ? new CompositeKey(in) : null;
+        this.afterKey = in.readOptionalWriteable(CompositeKey::new);
+        this.earlyTerminated = in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readBoolean() : false;
     }
 
     @Override
@@ -87,9 +92,9 @@ public class InternalComposite
         }
         out.writeIntArray(reverseMuls);
         out.writeList(buckets);
-        out.writeBoolean(afterKey != null);
-        if (afterKey != null) {
-            afterKey.writeTo(out);
+        out.writeOptionalWriteable(afterKey);
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeBoolean(earlyTerminated);
         }
     }
 
@@ -111,7 +116,7 @@ public class InternalComposite
          * to be able to retrieve the next page even if all buckets have been filtered.
          */
         return new InternalComposite(name, size, sourceNames, formats, newBuckets, afterKey,
-            reverseMuls, pipelineAggregators(), getMetaData());
+            reverseMuls, earlyTerminated, pipelineAggregators(), getMetaData());
     }
 
     @Override
@@ -137,6 +142,11 @@ public class InternalComposite
         return null;
     }
 
+    // Visible for tests
+    boolean isTerminatedEarly() {
+        return earlyTerminated;
+    }
+
     // Visible for tests
     int[] getReverseMuls() {
         return reverseMuls;
@@ -145,8 +155,10 @@ public class InternalComposite
     @Override
     public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
         PriorityQueue<BucketIterator> pq = new PriorityQueue<>(aggregations.size());
+        boolean earlyTerminated = false;
         for (InternalAggregation agg : aggregations) {
             InternalComposite sortedAgg = (InternalComposite) agg;
+            earlyTerminated |= sortedAgg.earlyTerminated;
             BucketIterator it = new BucketIterator(sortedAgg.buckets);
             if (it.next() != null) {
                 pq.add(it);
@@ -178,7 +190,8 @@ public class InternalComposite
             result.add(reduceBucket);
         }
         final CompositeKey lastKey = result.size() > 0 ? result.get(result.size()-1).getRawKey() : null;
-        return new InternalComposite(name, size, sourceNames, formats, result, lastKey, reverseMuls, pipelineAggregators(), metaData);
+        return new InternalComposite(name, size, sourceNames, formats, result, lastKey, reverseMuls,
+            earlyTerminated, pipelineAggregators(), metaData);
     }
 
     @Override

+ 1 - 2
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SortedDocsProducer.java

@@ -66,8 +66,7 @@ abstract class SortedDocsProducer {
             @Override
             public void collect(int doc, long bucket) throws IOException {
                 hasCollected[0] = true;
-                int slot = queue.addIfCompetitive();
-                if (slot != -1) {
+                if (queue.addIfCompetitive()) {
                     topCompositeCollected[0]++;
                     if (adder != null && doc != lastDoc) {
                         if (remainingBits == 0) {

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java

@@ -85,6 +85,6 @@ public class TermsValuesSourceBuilder extends CompositeValuesSourceBuilder<Terms
         } else {
             format = config.format();
         }
-        return new CompositeValuesSourceConfig(name, fieldType, vs, format, order(), missingBucket());
+        return new CompositeValuesSourceConfig(name, fieldType, vs, format, order(), missingBucket(), script() != null);
     }
 }

+ 5 - 0
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregationBuilder.java

@@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.ObjectParser;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.query.QueryRewriteContext;
 import org.elasticsearch.index.query.QueryShardContext;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
@@ -385,4 +386,8 @@ public class TermsAggregationBuilder extends ValuesSourceAggregationBuilder<Valu
         return NAME;
     }
 
+    @Override
+    protected AggregationBuilder doRewrite(QueryRewriteContext queryShardContext) throws IOException {
+        return super.doRewrite(queryShardContext);
+    }
 }

+ 2 - 1
server/src/main/java/org/elasticsearch/search/searchafter/SearchAfterBuilder.java

@@ -184,7 +184,8 @@ public class SearchAfterBuilder implements ToXContentObject, Writeable {
                     if (value instanceof Number) {
                         return ((Number) value).longValue();
                     }
-                    return Long.parseLong(value.toString());
+                    return format.parseLong(value.toString(), false,
+                        () -> { throw new IllegalStateException("now() is not allowed in [search_after] key"); });
 
                 case FLOAT:
                     if (value instanceof Number) {

+ 182 - 40
server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.search.aggregations.bucket.composite;
 
+import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.DoublePoint;
 import org.apache.lucene.document.Field;
@@ -31,17 +32,28 @@ import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.index.Term;
 import org.apache.lucene.search.DocValuesFieldExistsQuery;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.SortedNumericSortField;
+import org.apache.lucene.search.SortedSetSortField;
+import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.NumericUtils;
+import org.apache.lucene.util.TestUtil;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.common.geo.GeoPoint;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.time.DateFormatters;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.mapper.ContentPath;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.GeoPointFieldMapper;
@@ -63,6 +75,7 @@ import org.elasticsearch.search.aggregations.metrics.TopHits;
 import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
 import org.elasticsearch.search.aggregations.support.ValueType;
 import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.test.IndexSettingsModule;
 import org.junit.After;
 import org.junit.Before;
 
@@ -82,12 +95,13 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 
-public class CompositeAggregatorTests extends AggregatorTestCase {
+public class CompositeAggregatorTests  extends AggregatorTestCase {
     private static MappedFieldType[] FIELD_TYPES;
 
     @Override
@@ -109,6 +123,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
 
         DateFieldMapper.Builder builder = new DateFieldMapper.Builder("date");
         builder.docValues(true);
+        builder.format("yyyy-MM-dd||epoch_millis");
         DateFieldMapper fieldMapper =
             builder.build(new Mapper.BuilderContext(createIndexSettings().getSettings(), new ContentPath(0)));
         FIELD_TYPES[3] = fieldMapper.fieldType();
@@ -419,7 +434,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
         );
     }
 
-   public void testWithKeywordDesc() throws Exception {
+    public void testWithKeywordDesc() throws Exception {
         final List<Map<String, List<Object>>> dataset = new ArrayList<>();
         dataset.addAll(
             Arrays.asList(
@@ -485,19 +500,19 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
                 return new CompositeAggregationBuilder("name", Collections.singletonList(terms));
 
             }, (result) -> {
-                    assertEquals(5, result.getBuckets().size());
-                    assertEquals("{keyword=z}", result.afterKey().toString());
-                    assertEquals("{keyword=a}", result.getBuckets().get(0).getKeyAsString());
-                    assertEquals(2L, result.getBuckets().get(0).getDocCount());
-                    assertEquals("{keyword=b}", result.getBuckets().get(1).getKeyAsString());
-                    assertEquals(2L, result.getBuckets().get(1).getDocCount());
-                    assertEquals("{keyword=c}", result.getBuckets().get(2).getKeyAsString());
-                    assertEquals(1L, result.getBuckets().get(2).getDocCount());
-                    assertEquals("{keyword=d}", result.getBuckets().get(3).getKeyAsString());
-                    assertEquals(1L, result.getBuckets().get(3).getDocCount());
-                    assertEquals("{keyword=z}", result.getBuckets().get(4).getKeyAsString());
-                    assertEquals(1L, result.getBuckets().get(4).getDocCount());
-                }
+                assertEquals(5, result.getBuckets().size());
+                assertEquals("{keyword=z}", result.afterKey().toString());
+                assertEquals("{keyword=a}", result.getBuckets().get(0).getKeyAsString());
+                assertEquals(2L, result.getBuckets().get(0).getDocCount());
+                assertEquals("{keyword=b}", result.getBuckets().get(1).getKeyAsString());
+                assertEquals(2L, result.getBuckets().get(1).getDocCount());
+                assertEquals("{keyword=c}", result.getBuckets().get(2).getKeyAsString());
+                assertEquals(1L, result.getBuckets().get(2).getDocCount());
+                assertEquals("{keyword=d}", result.getBuckets().get(3).getKeyAsString());
+                assertEquals(1L, result.getBuckets().get(3).getDocCount());
+                assertEquals("{keyword=z}", result.getBuckets().get(4).getKeyAsString());
+                assertEquals(1L, result.getBuckets().get(4).getDocCount());
+            }
         );
 
         testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
@@ -589,10 +604,10 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
         );
         testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> new CompositeAggregationBuilder("name",
-                    Arrays.asList(
-                        new TermsValuesSourceBuilder("keyword").field("keyword"),
-                        new TermsValuesSourceBuilder("long").field("long")
-                    )
+                Arrays.asList(
+                    new TermsValuesSourceBuilder("keyword").field("keyword"),
+                    new TermsValuesSourceBuilder("long").field("long")
+                )
             ),
             (result) -> {
                 assertEquals(4, result.getBuckets().size());
@@ -610,11 +625,11 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
 
         testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> new CompositeAggregationBuilder("name",
-                    Arrays.asList(
-                        new TermsValuesSourceBuilder("keyword").field("keyword"),
-                        new TermsValuesSourceBuilder("long").field("long")
-                    )
-                ).aggregateAfter(createAfterKey("keyword", "a", "long", 100L)
+                Arrays.asList(
+                    new TermsValuesSourceBuilder("keyword").field("keyword"),
+                    new TermsValuesSourceBuilder("long").field("long")
+                )
+            ).aggregateAfter(createAfterKey("keyword", "a", "long", 100L)
             ),
             (result) -> {
                 assertEquals(2, result.getBuckets().size());
@@ -942,7 +957,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
                         new TermsValuesSourceBuilder("double").field("double")
                     )
                 ).aggregateAfter(createAfterKey("keyword", "a", "long", 100L, "double", 0.4d))
-            ,(result) -> {
+            , (result) -> {
                 assertEquals(10, result.getBuckets().size());
                 assertEquals("{keyword=z, long=0, double=0.09}", result.afterKey().toString());
                 assertEquals("{keyword=b, long=100, double=0.4}", result.getBuckets().get(0).getKeyAsString());
@@ -1152,8 +1167,9 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
                     return new CompositeAggregationBuilder("name", Collections.singletonList(histo))
                         .aggregateAfter(createAfterKey("date", "now"));
                 },
-                (result) -> {}
-        ));
+                (result) -> {
+                }
+            ));
         assertThat(exc.getCause(), instanceOf(IllegalArgumentException.class));
         assertThat(exc.getCause().getMessage(), containsString("now() is not supported in [after] key"));
 
@@ -1167,7 +1183,8 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
                     return new CompositeAggregationBuilder("name", Collections.singletonList(histo))
                         .aggregateAfter(createAfterKey("date", "1474329600000"));
                 },
-                (result) -> {}
+                (result) -> {
+                }
             ));
         assertThat(exc.getMessage(), containsString("failed to parse date field [1474329600000]"));
         assertWarnings("[interval] on [date_histogram] is deprecated, use [fixed_interval] or [calendar_interval] in the future.");
@@ -1486,7 +1503,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
                         new DateHistogramValuesSourceBuilder("date_histo").field("date")
                             .dateHistogramInterval(DateHistogramInterval.days(1))
                     )
-                ).aggregateAfter(createAfterKey("keyword","c", "date_histo", 1474329600000L))
+                ).aggregateAfter(createAfterKey("keyword", "c", "date_histo", 1474329600000L))
             , (result) -> {
                 assertEquals(4, result.getBuckets().size());
                 assertEquals("{keyword=z, date_histo=1474329600000}", result.afterKey().toString());
@@ -1668,7 +1685,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
             builders.add(new TermsValuesSourceBuilder("duplicate1").field("baz"));
             builders.add(new TermsValuesSourceBuilder("duplicate2").field("bar"));
             builders.add(new TermsValuesSourceBuilder("duplicate2").field("baz"));
-           new CompositeAggregationBuilder("foo", builders);
+            new CompositeAggregationBuilder("foo", builders);
         });
         assertThat(e.getMessage(), equalTo("Composite source names must be unique, found duplicates: [duplicate2, duplicate1]"));
     }
@@ -1705,7 +1722,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
         List<Map<String, List<Object>>> dataset = new ArrayList<>();
 
         Set<T> valuesSet = new HashSet<>();
-        Map<Comparable<?>, AtomicLong> expectedDocCounts = new HashMap<> ();
+        Map<Comparable<?>, AtomicLong> expectedDocCounts = new HashMap<>();
         for (int i = 0; i < numDocs; i++) {
             int numValues = randomIntBetween(1, 5);
             Set<Object> values = new HashSet<>();
@@ -1725,13 +1742,13 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
 
         List<Comparable<T>> seen = new ArrayList<>();
         AtomicBoolean finish = new AtomicBoolean(false);
-        int size = randomIntBetween(1,  expected.size());
+        int size = randomIntBetween(1, expected.size());
         while (finish.get() == false) {
             testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(field)), dataset,
                 () -> {
                     Map<String, Object> afterKey = null;
                     if (seen.size() > 0) {
-                        afterKey = Collections.singletonMap(field, seen.get(seen.size()-1));
+                        afterKey = Collections.singletonMap(field, seen.get(seen.size() - 1));
                     }
                     TermsValuesSourceBuilder source = new TermsValuesSourceBuilder(field).field(field);
                     return new CompositeAggregationBuilder("name", Collections.singletonList(source))
@@ -1838,44 +1855,130 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
         );
     }
 
+    public void testEarlyTermination() throws Exception {
+        final List<Map<String, List<Object>>> dataset = new ArrayList<>();
+        dataset.addAll(
+            Arrays.asList(
+                createDocument("keyword", "a", "long", 100L, "foo", "bar"),
+                createDocument("keyword", "c", "long", 100L, "foo", "bar"),
+                createDocument("keyword", "a", "long", 0L, "foo", "bar"),
+                createDocument("keyword", "d", "long", 10L, "foo", "bar"),
+                createDocument("keyword", "b", "long", 10L, "foo", "bar"),
+                createDocument("keyword", "c", "long", 10L, "foo", "bar"),
+                createDocument("keyword", "e", "long", 100L, "foo", "bar"),
+                createDocument("keyword", "e", "long", 10L, "foo", "bar")
+            )
+        );
+
+        executeTestCase(true, false, new TermQuery(new Term("foo", "bar")),
+            dataset,
+            () ->
+                new CompositeAggregationBuilder("name",
+                    Arrays.asList(
+                        new TermsValuesSourceBuilder("keyword").field("keyword"),
+                        new TermsValuesSourceBuilder("long").field("long")
+                    )).aggregateAfter(createAfterKey("keyword", "b", "long", 10L)).size(2),
+            (result) -> {
+                assertEquals(2, result.getBuckets().size());
+                assertEquals("{keyword=c, long=100}", result.afterKey().toString());
+                assertEquals("{keyword=c, long=10}", result.getBuckets().get(0).getKeyAsString());
+                assertEquals(1L, result.getBuckets().get(0).getDocCount());
+                assertEquals("{keyword=c, long=100}", result.getBuckets().get(1).getKeyAsString());
+                assertEquals(1L, result.getBuckets().get(1).getDocCount());
+                assertTrue(result.isTerminatedEarly());
+            }
+        );
+
+        // source field and index sorting config have different order
+        executeTestCase(true, false, new TermQuery(new Term("foo", "bar")),
+            dataset,
+            () ->
+                new CompositeAggregationBuilder("name",
+                    Arrays.asList(
+                        // reverse source order
+                        new TermsValuesSourceBuilder("keyword").field("keyword").order(SortOrder.DESC),
+                        new TermsValuesSourceBuilder("long").field("long").order(SortOrder.DESC)
+                    )
+                ).aggregateAfter(createAfterKey("keyword", "c", "long", 10L)).size(2),
+            (result) -> {
+                assertEquals(2, result.getBuckets().size());
+                assertEquals("{keyword=a, long=100}", result.afterKey().toString());
+                assertEquals("{keyword=b, long=10}", result.getBuckets().get(0).getKeyAsString());
+                assertEquals(1L, result.getBuckets().get(0).getDocCount());
+                assertEquals("{keyword=a, long=100}", result.getBuckets().get(1).getKeyAsString());
+                assertEquals(1L, result.getBuckets().get(1).getDocCount());
+                assertTrue(result.isTerminatedEarly());
+            }
+        );
+    }
+
     private void testSearchCase(List<Query> queries,
                                 List<Map<String, List<Object>>> dataset,
                                 Supplier<CompositeAggregationBuilder> create,
                                 Consumer<InternalComposite> verify) throws IOException {
         for (Query query : queries) {
-            executeTestCase(false, query, dataset, create, verify);
-            executeTestCase(true, query, dataset, create, verify);
+            executeTestCase(false, false, query, dataset, create, verify);
+            executeTestCase(false, true, query, dataset, create, verify);
+            executeTestCase(true, true, query, dataset, create, verify);
         }
     }
 
-    private void executeTestCase(boolean reduced,
+    private void executeTestCase(boolean useIndexSort,
+                                 boolean reduced,
                                  Query query,
                                  List<Map<String, List<Object>>> dataset,
                                  Supplier<CompositeAggregationBuilder> create,
                                  Consumer<InternalComposite> verify) throws IOException {
+        Map<String, MappedFieldType> types =
+            Arrays.stream(FIELD_TYPES).collect(Collectors.toMap(MappedFieldType::name,  Function.identity()));
+        CompositeAggregationBuilder aggregationBuilder = create.get();
+        Sort indexSort = useIndexSort ? buildIndexSort(aggregationBuilder.sources(), types) : null;
+        IndexSettings indexSettings = createIndexSettings(indexSort);
         try (Directory directory = newDirectory()) {
-            try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
+            IndexWriterConfig config = newIndexWriterConfig(random(), new MockAnalyzer(random()));
+            if (indexSort != null) {
+                config.setIndexSort(indexSort);
+                config.setCodec(TestUtil.getDefaultCodec());
+            }
+            try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, config)) {
                 Document document = new Document();
                 for (Map<String, List<Object>> fields : dataset) {
                     addToDocument(document, fields);
                     indexWriter.addDocument(document);
                     document.clear();
                 }
+                if (reduced == false && randomBoolean()) {
+                    indexWriter.forceMerge(1);
+                }
             }
             try (IndexReader indexReader = DirectoryReader.open(directory)) {
                 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
-                CompositeAggregationBuilder aggregationBuilder = create.get();
                 final InternalComposite composite;
                 if (reduced) {
-                    composite = searchAndReduce(indexSearcher, query, aggregationBuilder, FIELD_TYPES);
+                    composite = searchAndReduce(indexSettings, indexSearcher, query, aggregationBuilder, FIELD_TYPES);
                 } else {
-                    composite = search(indexSearcher, query, aggregationBuilder, FIELD_TYPES);
+                    composite = search(indexSettings, indexSearcher, query, aggregationBuilder, FIELD_TYPES);
                 }
                 verify.accept(composite);
             }
         }
     }
 
+    private static IndexSettings createIndexSettings(Sort sort) {
+        Settings.Builder builder = Settings.builder();
+        if (sort != null) {
+            String[] fields = Arrays.stream(sort.getSort())
+                .map(SortField::getField)
+                .toArray(String[]::new);
+            String[] orders = Arrays.stream(sort.getSort())
+                .map((o) -> o.getReverse() ? "desc" : "asc")
+                .toArray(String[]::new);
+            builder.putList("index.sort.field", fields);
+            builder.putList("index.sort.order", orders);
+        }
+        return IndexSettingsModule.newIndexSettings(new Index("_index", "0"), builder.build());
+    }
+
     private void addToDocument(Document doc, Map<String, List<Object>> keys) {
         for (Map.Entry<String, List<Object>> entry : keys.entrySet()) {
             final String name = entry.getKey();
@@ -1935,4 +2038,43 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
     private static long asLong(String dateTime) {
         return DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(dateTime)).toInstant().toEpochMilli();
     }
+
+    private static Sort buildIndexSort(List<CompositeValuesSourceBuilder<?>> sources, Map<String, MappedFieldType> fieldTypes) {
+        List<SortField> sortFields = new ArrayList<>();
+        for (CompositeValuesSourceBuilder<?> source : sources) {
+            MappedFieldType type = fieldTypes.get(source.field());
+            if (type instanceof KeywordFieldMapper.KeywordFieldType) {
+                sortFields.add(new SortedSetSortField(type.name(), false));
+            } else if (type instanceof DateFieldMapper.DateFieldType) {
+                sortFields.add(new SortedNumericSortField(type.name(), SortField.Type.LONG, false));
+            } else if (type instanceof NumberFieldMapper.NumberFieldType) {
+                boolean comp = false;
+                switch (type.typeName()) {
+                    case "byte":
+                    case "short":
+                    case "integer":
+                        comp = true;
+                        sortFields.add(new SortedNumericSortField(type.name(), SortField.Type.INT, false));
+                        break;
+
+                    case "long":
+                        sortFields.add(new SortedNumericSortField(type.name(), SortField.Type.LONG, false));
+                        break;
+
+                    case "float":
+                    case "double":
+                        comp = true;
+                        sortFields.add(new SortedNumericSortField(type.name(), SortField.Type.DOUBLE, false));
+                        break;
+
+                    default:
+                        break;
+                }
+                if (comp == false) {
+                    break;
+                }
+            }
+        }
+        return sortFields.size() > 0 ? new Sort(sortFields.toArray(new SortField[0])) : null;
+    }
 }

+ 52 - 21
server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java

@@ -29,10 +29,16 @@ import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.DocValues;
 import org.apache.lucene.index.IndexOptions;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.search.CollectionTerminatedException;
 import org.apache.lucene.search.DocIdSet;
 import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.SortedNumericSortField;
+import org.apache.lucene.search.SortedSetSortField;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
@@ -56,6 +62,7 @@ import java.util.Set;
 import static org.elasticsearch.index.mapper.NumberFieldMapper.NumberType.DOUBLE;
 import static org.elasticsearch.index.mapper.NumberFieldMapper.NumberType.LONG;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 
 public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
     static class ClassAndName {
@@ -133,31 +140,47 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
     }
 
     private void testRandomCase(ClassAndName... types) throws IOException {
-        testRandomCase(true, true, types);
-        testRandomCase(true, false, types);
-        testRandomCase(false, true, types);
-        testRandomCase(false, false, types);
+        for (int i = 0; i < types.length; i++) {
+            testRandomCase(true, true, i, types);
+            testRandomCase(true, false, i, types);
+            testRandomCase(false, true, i, types);
+            testRandomCase(false, false, i, types);
+        }
     }
 
-    private void testRandomCase(boolean forceMerge, boolean missingBucket, ClassAndName... types) throws IOException {
+    private void testRandomCase(boolean forceMerge,
+                                boolean missingBucket,
+                                int indexSortSourcePrefix,
+                                ClassAndName... types) throws IOException {
         final BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
         int numDocs = randomIntBetween(50, 100);
         List<Comparable<?>[]> possibleValues = new ArrayList<>();
-        for (ClassAndName type : types) {
+        SortField[] indexSortFields = indexSortSourcePrefix == 0 ? null : new SortField[indexSortSourcePrefix];
+        for (int i = 0; i < types.length; i++) {
+            ClassAndName type = types[i];
             final Comparable<?>[] values;
             int numValues = randomIntBetween(1, numDocs * 2);
             values = new Comparable[numValues];
             if (type.clazz == Long.class) {
-                for (int i = 0; i < numValues; i++) {
-                    values[i] = randomLong();
+                if (i < indexSortSourcePrefix) {
+                    indexSortFields[i] = new SortedNumericSortField(type.fieldType.name(), SortField.Type.LONG);
+                }
+                for (int j = 0; j < numValues; j++) {
+                    values[j] = randomLong();
                 }
             } else if (type.clazz == Double.class) {
-                for (int i = 0; i < numValues; i++) {
-                    values[i] = randomDouble();
+                if (i < indexSortSourcePrefix) {
+                    indexSortFields[i] = new SortedNumericSortField(type.fieldType.name(), SortField.Type.DOUBLE);
+                }
+                for (int j = 0; j < numValues; j++) {
+                    values[j] = randomDouble();
                 }
             } else if (type.clazz == BytesRef.class) {
-                for (int i = 0; i < numValues; i++) {
-                    values[i] = new BytesRef(randomAlphaOfLengthBetween(5, 50));
+                if (i < indexSortSourcePrefix) {
+                    indexSortFields[i] = new SortedSetSortField(type.fieldType.name(), false);
+                }
+                for (int j = 0; j < numValues; j++) {
+                    values[j] = new BytesRef(randomAlphaOfLengthBetween(5, 50));
                 }
             } else {
                 assert (false);
@@ -167,13 +190,17 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
 
         Set<CompositeKey> keys = new HashSet<>();
         try (Directory directory = newDirectory()) {
+            final IndexWriterConfig writerConfig = newIndexWriterConfig();
+            if (indexSortFields != null) {
+                writerConfig.setIndexSort(new Sort(indexSortFields));
+            }
             try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, new KeywordAnalyzer())) {
                 for (int i = 0; i < numDocs; i++) {
                     Document document = new Document();
                     List<List<Comparable<?>>> docValues = new ArrayList<>();
                     boolean hasAllField = true;
                     for (int j = 0; j < types.length; j++) {
-                        int numValues = randomIntBetween(0, 5);
+                        int numValues = indexSortSourcePrefix-1 >= j ? 1 : randomIntBetween(0, 5);
                         List<Comparable<?>> values = new ArrayList<>();
                         if (numValues == 0) {
                             hasAllField = false;
@@ -212,7 +239,7 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
                 }
             }
             IndexReader reader = DirectoryReader.open(directory);
-            int size = randomIntBetween(1, keys.size());
+            int size = keys.size() > 1 ? randomIntBetween(1, keys.size()) : 1;
             SingleDimensionValuesSource<?>[] sources = new SingleDimensionValuesSource[types.length];
             for (int i = 0; i < types.length; i++) {
                 final MappedFieldType fieldType = types[i].fieldType;
@@ -276,21 +303,25 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
                         new CompositeValuesCollectorQueue(BigArrays.NON_RECYCLING_INSTANCE, sources, size, last);
                     final SortedDocsProducer docsProducer = sources[0].createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery());
                     for (LeafReaderContext leafReaderContext : reader.leaves()) {
-                        final LeafBucketCollector leafCollector = new LeafBucketCollector() {
-                            @Override
-                            public void collect(int doc, long bucket) throws IOException {
-                                queue.addIfCompetitive();
-                            }
-                        };
                         if (docsProducer != null && withProducer) {
                             assertEquals(DocIdSet.EMPTY,
                                 docsProducer.processLeaf(new MatchAllDocsQuery(), queue, leafReaderContext, false));
                         } else {
+                            final LeafBucketCollector leafCollector = new LeafBucketCollector() {
+                                @Override
+                                public void collect(int doc, long bucket) throws IOException {
+                                    queue.addIfCompetitive(indexSortSourcePrefix);
+                                }
+                            };
                             final LeafBucketCollector queueCollector = queue.getLeafCollector(leafReaderContext, leafCollector);
                             final Bits liveDocs = leafReaderContext.reader().getLiveDocs();
                             for (int i = 0; i < leafReaderContext.reader().maxDoc(); i++) {
                                 if (liveDocs == null || liveDocs.get(i)) {
-                                    queueCollector.collect(i);
+                                    try {
+                                        queueCollector.collect(i);
+                                    } catch (CollectionTerminatedException exc) {
+                                        assertThat(indexSortSourcePrefix, greaterThan(0));
+                                    }
                                 }
                             }
                         }

+ 2 - 2
server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/InternalCompositeTests.java

@@ -170,7 +170,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa
         }
         Collections.sort(buckets, (o1, o2) -> o1.compareKey(o2));
         CompositeKey lastBucket = buckets.size() > 0 ? buckets.get(buckets.size()-1).getRawKey() : null;
-        return new InternalComposite(name, size, sourceNames, formats, buckets, lastBucket, reverseMuls,
+        return new InternalComposite(name, size, sourceNames, formats, buckets, lastBucket, reverseMuls, randomBoolean(),
             Collections.emptyList(), metaData);
     }
 
@@ -207,7 +207,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa
         }
         CompositeKey lastBucket = buckets.size() > 0 ? buckets.get(buckets.size()-1).getRawKey() : null;
         return new InternalComposite(instance.getName(), instance.getSize(), sourceNames, formats, buckets, lastBucket, reverseMuls,
-            instance.pipelineAggregators(), metaData);
+            randomBoolean(), instance.pipelineAggregators(), metaData);
     }
 
     @Override

+ 39 - 5
test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java

@@ -306,7 +306,15 @@ public abstract class AggregatorTestCase extends ESTestCase {
                                                                              Query query,
                                                                              AggregationBuilder builder,
                                                                              MappedFieldType... fieldTypes) throws IOException {
-        return search(searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
+        return search(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
+    }
+
+    protected <A extends InternalAggregation, C extends Aggregator> A search(IndexSettings indexSettings,
+                                                                             IndexSearcher searcher,
+                                                                             Query query,
+                                                                             AggregationBuilder builder,
+                                                                             MappedFieldType... fieldTypes) throws IOException {
+        return search(indexSettings, searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
     }
 
     protected <A extends InternalAggregation, C extends Aggregator> A search(IndexSearcher searcher,
@@ -314,8 +322,17 @@ public abstract class AggregatorTestCase extends ESTestCase {
                                                                              AggregationBuilder builder,
                                                                              int maxBucket,
                                                                              MappedFieldType... fieldTypes) throws IOException {
+        return search(createIndexSettings(), searcher, query, builder, maxBucket, fieldTypes);
+    }
+
+    protected <A extends InternalAggregation, C extends Aggregator> A search(IndexSettings indexSettings,
+                                                                             IndexSearcher searcher,
+                                                                             Query query,
+                                                                             AggregationBuilder builder,
+                                                                             int maxBucket,
+                                                                             MappedFieldType... fieldTypes) throws IOException {
         MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket);
-        C a = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes);
+        C a = createAggregator(query, builder, searcher, indexSettings, bucketConsumer, fieldTypes);
         a.preCollection();
         searcher.search(query, a);
         a.postCollection();
@@ -329,7 +346,23 @@ public abstract class AggregatorTestCase extends ESTestCase {
                                                                                       Query query,
                                                                                       AggregationBuilder builder,
                                                                                       MappedFieldType... fieldTypes) throws IOException {
-        return searchAndReduce(searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
+        return searchAndReduce(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
+    }
+
+    protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(IndexSettings indexSettings,
+                                                                                      IndexSearcher searcher,
+                                                                                      Query query,
+                                                                                      AggregationBuilder builder,
+                                                                                      MappedFieldType... fieldTypes) throws IOException {
+        return searchAndReduce(indexSettings, searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
+    }
+
+    protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(IndexSearcher searcher,
+                                                                                      Query query,
+                                                                                      AggregationBuilder builder,
+                                                                                      int maxBucket,
+                                                                                      MappedFieldType... fieldTypes) throws IOException {
+        return searchAndReduce(createIndexSettings(), searcher, query, builder, maxBucket, fieldTypes);
     }
 
     /**
@@ -337,7 +370,8 @@ public abstract class AggregatorTestCase extends ESTestCase {
      * builds an aggregator for each sub-searcher filtered by the provided {@link Query} and
      * returns the reduced {@link InternalAggregation}.
      */
-    protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(IndexSearcher searcher,
+    protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(IndexSettings indexSettings,
+                                                                                      IndexSearcher searcher,
                                                                                       Query query,
                                                                                       AggregationBuilder builder,
                                                                                       int maxBucket,
@@ -366,7 +400,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
 
         for (ShardSearcher subSearcher : subSearchers) {
             MultiBucketConsumer shardBucketConsumer = new MultiBucketConsumer(maxBucket);
-            C a = createAggregator(query, builder, subSearcher, shardBucketConsumer, fieldTypes);
+            C a = createAggregator(query, builder, subSearcher, indexSettings, shardBucketConsumer, fieldTypes);
             a.preCollection();
             subSearcher.search(weight, a);
             a.postCollection();