Browse Source

Improve reduction of terms aggregations (#61779)

* Improve reduction of terms aggregations

Today, the terms aggregation reduces multiple aggregations at once using a map
to group same buckets together. This operation can be costly since it requires
to lookup every bucket in a global map with no particular order.
This commit changes how term buckets are sorted by shards and partial reduces in
order to be able to reduce results using a merge-sort strategy.
For bwc, results are merged with the legacy code if any of the aggregations use
a different sort (if it was returned by a node in prior versions).

Relates #51857
Jim Ferenczi 5 years ago
parent
commit
49ae2bb56a
24 changed files with 421 additions and 226 deletions
  1. 53 0
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/IteratorAndCurrent.java
  2. 12 24
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java
  3. 12 25
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java
  4. 12 25
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java
  5. 13 25
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java
  6. 1 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java
  7. 5 5
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java
  8. 14 4
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java
  9. 2 2
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedTerms.java
  10. 175 50
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java
  11. 6 6
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java
  12. 13 4
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java
  13. 23 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java
  14. 5 5
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java
  15. 3 2
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
  16. 2 2
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java
  17. 5 7
      server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java
  18. 2 2
      server/src/test/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregationTests.java
  19. 6 2
      server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsTests.java
  20. 6 2
      server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsTests.java
  21. 6 2
      server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsTests.java
  22. 39 23
      server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java
  23. 2 3
      server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java
  24. 4 4
      x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java

+ 53 - 0
server/src/main/java/org/elasticsearch/search/aggregations/bucket/IteratorAndCurrent.java

@@ -0,0 +1,53 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket;
+
+import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+
+import java.util.Iterator;
+
+public class IteratorAndCurrent<B extends InternalMultiBucketAggregation.InternalBucket> implements Iterator<B> {
+    private final Iterator<B> iterator;
+    private B current;
+
+    public IteratorAndCurrent(Iterator<B> iterator) {
+        this.iterator = iterator;
+        this.current = iterator.next();
+    }
+
+    public Iterator<B> getIterator() {
+        return iterator;
+    }
+
+    public B current() {
+        return current;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return iterator.hasNext();
+    }
+
+    @Override
+    public B next() {
+        return current = iterator.next();
+    }
+}
+

+ 12 - 24
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java

@@ -29,6 +29,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.KeyComparable;
+import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
 import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo;
 
@@ -38,7 +39,6 @@ import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
@@ -270,17 +270,6 @@ public final class InternalAutoDateHistogram extends
         return new Bucket(prototype.key, prototype.docCount, prototype.format, aggregations);
     }
 
-    private static class IteratorAndCurrent {
-
-        private final Iterator<Bucket> iterator;
-        private Bucket current;
-
-        IteratorAndCurrent(Iterator<Bucket> iterator) {
-            this.iterator = iterator;
-            current = iterator.next();
-        }
-    }
-
     /**
      * This method works almost exactly the same as
      * InternalDateHistogram#reduceBuckets(List, ReduceContext), the different
@@ -305,10 +294,10 @@ public final class InternalAutoDateHistogram extends
         }
         Rounding.Prepared reduceRounding = prepare(reduceRoundingIdx, min, max);
 
-        final PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
+        final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<>(aggregations.size()) {
             @Override
-            protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
-                return a.current.key < b.current.key;
+            protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
+                return a.current().key < b.current().key;
             }
         };
         for (InternalAggregation aggregation : aggregations) {
@@ -322,25 +311,24 @@ public final class InternalAutoDateHistogram extends
         if (pq.size() > 0) {
             // list of buckets coming from different shards that have the same key
             List<Bucket> currentBuckets = new ArrayList<>();
-            long key = reduceRounding.round(pq.top().current.key);
+            long key = reduceRounding.round(pq.top().current().key);
 
             do {
-                final IteratorAndCurrent top = pq.top();
+                final IteratorAndCurrent<Bucket> top = pq.top();
 
-                if (reduceRounding.round(top.current.key) != key) {
+                if (reduceRounding.round(top.current().key) != key) {
                     // the key changes, reduce what we already buffered and reset the buffer for current buckets
                     final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
                     reducedBuckets.add(reduced);
                     currentBuckets.clear();
-                    key = reduceRounding.round(top.current.key);
+                    key = reduceRounding.round(top.current().key);
                 }
 
-                currentBuckets.add(top.current);
+                currentBuckets.add(top.current());
 
-                if (top.iterator.hasNext()) {
-                    final Bucket next = top.iterator.next();
-                    assert next.key > top.current.key : "shards must return data sorted by key";
-                    top.current = next;
+                if (top.hasNext()) {
+                    top.next();
+                    assert top.current().key > key: "shards must return data sorted by key";
                     pq.updateTop();
                 } else {
                     pq.pop();

+ 12 - 25
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java

@@ -32,6 +32,7 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.InternalOrder;
 import org.elasticsearch.search.aggregations.KeyComparable;
+import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
 
 import java.io.IOException;
@@ -39,7 +40,6 @@ import java.time.Instant;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
@@ -289,24 +289,12 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
         return new Bucket(prototype.key, prototype.docCount, prototype.keyed, prototype.format, aggregations);
     }
 
-    private static class IteratorAndCurrent {
-
-        private final Iterator<Bucket> iterator;
-        private Bucket current;
-
-        IteratorAndCurrent(Iterator<Bucket> iterator) {
-            this.iterator = iterator;
-            current = iterator.next();
-        }
-
-    }
-
     private List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
 
-        final PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
+        final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<>(aggregations.size()) {
             @Override
-            protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
-                return a.current.key < b.current.key;
+            protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
+                return a.current().key < b.current().key;
             }
         };
         for (InternalAggregation aggregation : aggregations) {
@@ -320,27 +308,26 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
         if (pq.size() > 0) {
             // list of buckets coming from different shards that have the same key
             List<Bucket> currentBuckets = new ArrayList<>();
-            double key = pq.top().current.key;
+            double key = pq.top().current().key;
 
             do {
-                final IteratorAndCurrent top = pq.top();
+                final IteratorAndCurrent<Bucket> top = pq.top();
 
-                if (top.current.key != key) {
+                if (top.current().key != key) {
                     // the key changes, reduce what we already buffered and reset the buffer for current buckets
                     final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
                     if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
                         reducedBuckets.add(reduced);
                     }
                     currentBuckets.clear();
-                    key = top.current.key;
+                    key = top.current().key;
                 }
 
-                currentBuckets.add(top.current);
+                currentBuckets.add(top.current());
 
-                if (top.iterator.hasNext()) {
-                    final Bucket next = top.iterator.next();
-                    assert next.key > top.current.key : "shards must return data sorted by key";
-                    top.current = next;
+                if (top.hasNext()) {
+                    top.next();
+                    assert top.current().key > key : "shards must return data sorted by key";
                     pq.updateTop();
                 } else {
                     pq.pop();

+ 12 - 25
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java

@@ -31,12 +31,12 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.InternalOrder;
 import org.elasticsearch.search.aggregations.KeyComparable;
+import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
@@ -279,24 +279,12 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
         return new Bucket(prototype.key, prototype.docCount, prototype.keyed, prototype.format, aggregations);
     }
 
-    private static class IteratorAndCurrent {
-
-        private final Iterator<Bucket> iterator;
-        private Bucket current;
-
-        IteratorAndCurrent(Iterator<Bucket> iterator) {
-            this.iterator = iterator;
-            current = iterator.next();
-        }
-
-    }
-
     private List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
 
-        final PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
+        final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<>(aggregations.size()) {
             @Override
-            protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
-                return Double.compare(a.current.key, b.current.key) < 0;
+            protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
+                return Double.compare(a.current().key, b.current().key) < 0;
             }
         };
         for (InternalAggregation aggregation : aggregations) {
@@ -310,12 +298,12 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
         if (pq.size() > 0) {
             // list of buckets coming from different shards that have the same key
             List<Bucket> currentBuckets = new ArrayList<>();
-            double key = pq.top().current.key;
+            double key = pq.top().current().key;
 
             do {
-                final IteratorAndCurrent top = pq.top();
+                final IteratorAndCurrent<Bucket> top = pq.top();
 
-                if (Double.compare(top.current.key, key) != 0) {
+                if (Double.compare(top.current().key, key) != 0) {
                     // The key changes, reduce what we already buffered and reset the buffer for current buckets.
                     // Using Double.compare instead of != to handle NaN correctly.
                     final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
@@ -323,15 +311,14 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
                         reducedBuckets.add(reduced);
                     }
                     currentBuckets.clear();
-                    key = top.current.key;
+                    key = top.current().key;
                 }
 
-                currentBuckets.add(top.current);
+                currentBuckets.add(top.current());
 
-                if (top.iterator.hasNext()) {
-                    final Bucket next = top.iterator.next();
-                    assert Double.compare(next.key, top.current.key) > 0 : "shards must return data sorted by key";
-                    top.current = next;
+                if (top.hasNext()) {
+                    top.next();
+                    assert Double.compare(top.current().key, key) > 0 : "shards must return data sorted by key";
                     pq.updateTop();
                 } else {
                     pq.pop();

+ 13 - 25
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java

@@ -29,13 +29,13 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.KeyComparable;
+import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -317,18 +317,6 @@ public class InternalVariableWidthHistogram
 =    */
     private double nextKey(double key){ return key + 1; }
 
-    private static class IteratorAndCurrent {
-
-        private final Iterator<Bucket> iterator;
-        private Bucket current;
-
-        IteratorAndCurrent(Iterator<Bucket> iterator) {
-            this.iterator = iterator;
-            current = iterator.next();
-        }
-
-    }
-
     @Override
     protected Bucket reduceBucket(List<Bucket> buckets, ReduceContext context) {
         List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
@@ -350,10 +338,10 @@ public class InternalVariableWidthHistogram
     }
 
     public List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
-        PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
+        PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<>(aggregations.size()) {
             @Override
-            protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
-                return Double.compare(a.current.centroid, b.current.centroid) < 0;
+            protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
+                return Double.compare(a.current().centroid, b.current().centroid) < 0;
             }
         };
         for (InternalAggregation aggregation : aggregations) {
@@ -365,27 +353,27 @@ public class InternalVariableWidthHistogram
 
         List<Bucket> reducedBuckets = new ArrayList<>();
         if(pq.size() > 0) {
-            double key = pq.top().current.centroid();
+            double key = pq.top().current().centroid();
             // list of buckets coming from different shards that have the same key
             List<Bucket> currentBuckets = new ArrayList<>();
             do {
-                IteratorAndCurrent top = pq.top();
+                IteratorAndCurrent<Bucket> top = pq.top();
 
-                if (Double.compare(top.current.centroid(), key) != 0) {
+                if (Double.compare(top.current().centroid(), key) != 0) {
                     // The key changes, reduce what we already buffered and reset the buffer for current buckets.
                     final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
                     reduceContext.consumeBucketsAndMaybeBreak(1);
                     reducedBuckets.add(reduced);
                     currentBuckets.clear();
-                    key = top.current.centroid();
+                    key = top.current().centroid();
                 }
 
-                currentBuckets.add(top.current);
+                currentBuckets.add(top.current());
 
-                if (top.iterator.hasNext()) {
-                    Bucket next = top.iterator.next();
-                    assert next.compareKey(top.current) >= 0 : "shards must return data sorted by centroid";
-                    top.current = next;
+                if (top.hasNext()) {
+                    Bucket prev = top.current();
+                    top.next();
+                    assert top.current().compareKey(prev) >= 0 : "shards must return data sorted by centroid";
                     pq.updateTop();
                 } else {
                     pq.pop();

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java

@@ -45,7 +45,7 @@ abstract class AbstractStringTermsAggregator extends TermsAggregator {
     }
 
     protected StringTerms buildEmptyTermsAggregation() {
-        return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
+        return new StringTerms(name, order, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
                 metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0);
     }
 

+ 5 - 5
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java

@@ -100,10 +100,10 @@ public class DoubleTerms extends InternalMappedTerms<DoubleTerms, DoubleTerms.Bu
         }
     }
 
-    public DoubleTerms(String name, BucketOrder order, int requiredSize, long minDocCount,
+    public DoubleTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
             Map<String, Object> metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
             List<Bucket> buckets, long docCountError) {
-        super(name, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError,
+        super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError,
                 otherDocCount, buckets, docCountError);
     }
 
@@ -121,7 +121,7 @@ public class DoubleTerms extends InternalMappedTerms<DoubleTerms, DoubleTerms.Bu
 
     @Override
     public DoubleTerms create(List<Bucket> buckets) {
-        return new DoubleTerms(name, order, requiredSize, minDocCount, metadata, format, shardSize,
+        return new DoubleTerms(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize,
                 showTermDocCountError, otherDocCount, buckets, docCountError);
     }
 
@@ -132,8 +132,8 @@ public class DoubleTerms extends InternalMappedTerms<DoubleTerms, DoubleTerms.Bu
     }
 
     @Override
-    protected DoubleTerms create(String name, List<Bucket> buckets, long docCountError, long otherDocCount) {
-        return new DoubleTerms(name, order, requiredSize, minDocCount, getMetadata(), format,
+    protected DoubleTerms create(String name, List<Bucket> buckets, BucketOrder reduceOrder, long docCountError, long otherDocCount) {
+        return new DoubleTerms(name, reduceOrder, order, requiredSize, minDocCount, getMetadata(), format,
                 shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
     }
 

+ 14 - 4
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

@@ -42,6 +42,7 @@ import org.elasticsearch.search.aggregations.BucketOrder;
 import org.elasticsearch.search.aggregations.CardinalityUpperBound;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+import org.elasticsearch.search.aggregations.InternalOrder;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
 import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
 import org.elasticsearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes;
@@ -58,6 +59,7 @@ import java.util.function.LongPredicate;
 import java.util.function.LongUnaryOperator;
 
 import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS;
+import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder;
 
 /**
  * An aggregator of string values that relies on global ordinals in order to build buckets.
@@ -275,6 +277,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
         LowCardinality(
             String name,
             AggregatorFactories factories,
+            Function<GlobalOrdinalsStringTermsAggregator, ResultStrategy<?, ?, ?>> resultStrategy,
             ValuesSource.Bytes.WithOrdinals valuesSource,
             BucketOrder order,
             DocValueFormat format,
@@ -286,8 +289,8 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
             boolean showTermDocCountError,
             Map<String, Object> metadata
         ) throws IOException {
-            super(name, factories, a -> a.new StandardTermsResults(), valuesSource, order, format, bucketCountThresholds, null,
-                context, parent, remapGlobalOrds, collectionMode, showTermDocCountError, CardinalityUpperBound.ONE, metadata);
+            super(name, factories, resultStrategy, valuesSource, order, format, bucketCountThresholds, null, context,
+                parent, remapGlobalOrds, collectionMode, showTermDocCountError, CardinalityUpperBound.ONE, metadata);
             assert factories == null || factories.countAggregators() == 0;
             this.segmentDocCounts = context.bigArrays().newIntArray(1, true);
         }
@@ -724,8 +727,15 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
 
         @Override
         StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bucket[] topBuckets) {
-            return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
-                metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
+            final BucketOrder reduceOrder;
+            if (isKeyOrder(order) == false) {
+                reduceOrder = InternalOrder.key(true);
+                Arrays.sort(topBuckets, reduceOrder.comparator());
+            } else {
+                reduceOrder = order;
+            }
+            return new StringTerms(name, reduceOrder, order, bucketCountThresholds.getRequiredSize(),
+                bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
                 otherDocCount, Arrays.asList(topBuckets), 0);
         }
 

+ 2 - 2
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedTerms.java

@@ -45,10 +45,10 @@ public abstract class InternalMappedTerms<A extends InternalTerms<A, B>, B exten
 
     protected long docCountError;
 
-    protected InternalMappedTerms(String name, BucketOrder order, int requiredSize, long minDocCount,
+    protected InternalMappedTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
             Map<String, Object> metadata, DocValueFormat format, int shardSize,
             boolean showTermDocCountError, long otherDocCount, List<B> buckets, long docCountError) {
-        super(name, order, requiredSize, minDocCount, metadata);
+        super(name, reduceOrder, order, requiredSize, minDocCount, metadata);
         this.format = format;
         this.shardSize = shardSize;
         this.showTermDocCountError = showTermDocCountError;

+ 175 - 50
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java

@@ -18,6 +18,8 @@
  */
 package org.elasticsearch.search.aggregations.bucket.terms;
 
+import org.apache.lucene.util.PriorityQueue;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -31,15 +33,20 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.InternalOrder;
 import org.elasticsearch.search.aggregations.KeyComparable;
+import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
+import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder;
+
 public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends InternalTerms.Bucket<B>>
         extends InternalMultiBucketAggregation<A, B> implements Terms {
 
@@ -152,12 +159,28 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
         }
     }
 
+    protected final BucketOrder reduceOrder;
     protected final BucketOrder order;
     protected final int requiredSize;
     protected final long minDocCount;
 
-    protected InternalTerms(String name, BucketOrder order, int requiredSize, long minDocCount, Map<String, Object> metadata) {
+    /**
+     * Creates a new {@link InternalTerms}
+     * @param name The name of the aggregation
+     * @param reduceOrder The {@link BucketOrder} that should be used to merge shard results.
+     * @param order The {@link BucketOrder} that should be used to sort the final reduce.
+     * @param requiredSize The number of top buckets.
+     * @param minDocCount The minimum number of documents allowed per bucket.
+     * @param metadata The metadata associated with the aggregation.
+     */
+    protected InternalTerms(String name,
+                            BucketOrder reduceOrder,
+                            BucketOrder order,
+                            int requiredSize,
+                            long minDocCount,
+                            Map<String, Object> metadata) {
         super(name, metadata);
+        this.reduceOrder = reduceOrder;
         this.order = order;
         this.requiredSize = requiredSize;
         this.minDocCount = minDocCount;
@@ -168,13 +191,21 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
      */
     protected InternalTerms(StreamInput in) throws IOException {
        super(in);
-       order = InternalOrder.Streams.readOrder(in);
+       reduceOrder = InternalOrder.Streams.readOrder(in);
+       if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+           order = InternalOrder.Streams.readOrder(in);
+       } else {
+           order = reduceOrder;
+       }
        requiredSize = readSize(in);
        minDocCount = in.readVLong();
     }
 
     @Override
     protected final void doWriteTo(StreamOutput out) throws IOException {
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            reduceOrder.writeTo(out);
+        }
         order.writeTo(out);
         writeSize(requiredSize, out);
         out.writeVLong(minDocCount);
@@ -189,21 +220,128 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
     @Override
     public abstract B getBucketByKey(String term);
 
-    @Override
+    private BucketOrder getReduceOrder(List<InternalAggregation> aggregations) {
+        BucketOrder thisReduceOrder = null;
+        for (InternalAggregation aggregation : aggregations) {
+            @SuppressWarnings("unchecked")
+            InternalTerms<A, B> terms = (InternalTerms<A, B>) aggregation;
+            if (terms.getBuckets().size() == 0) {
+                continue;
+            }
+            if (thisReduceOrder == null) {
+                thisReduceOrder = terms.reduceOrder;
+            } else if (thisReduceOrder.equals(terms.reduceOrder) == false) {
+                return order;
+            }
+        }
+        return thisReduceOrder != null ? thisReduceOrder : order;
+    }
+
+    private long getDocCountError(InternalTerms<?, ?> terms) {
+        int size = terms.getBuckets().size();
+        if (size == 0 || size < terms.getShardSize() || isKeyOrder(terms.order)) {
+            return 0;
+        } else if (InternalOrder.isCountDesc(terms.order)) {
+            if (terms.getDocCountError() > 0) {
+                // If there is an existing docCountError for this agg then
+                // use this as the error for this aggregation
+                return terms.getDocCountError();
+            } else {
+                // otherwise use the doc count of the last term in the
+                // aggregation
+                return terms.getBuckets().stream().mapToLong(Bucket::getDocCount).min().getAsLong();
+            }
+        } else {
+            return -1;
+        }
+    }
+
+    private List<B> reduceMergeSort(List<InternalAggregation> aggregations,
+                                    BucketOrder reduceOrder, ReduceContext reduceContext) {
+        assert isKeyOrder(reduceOrder);
+        final Comparator<MultiBucketsAggregation.Bucket> cmp = reduceOrder.comparator();
+        final PriorityQueue<IteratorAndCurrent<B>> pq = new PriorityQueue<>(aggregations.size()) {
+            @Override
+            protected boolean lessThan(IteratorAndCurrent<B> a, IteratorAndCurrent<B> b) {
+                return cmp.compare(a.current(), b.current()) < 0;
+            }
+        };
+        for (InternalAggregation aggregation : aggregations) {
+            @SuppressWarnings("unchecked")
+            InternalTerms<A, B> terms = (InternalTerms<A, B>) aggregation;
+            if (terms.getBuckets().isEmpty() == false) {
+                assert reduceOrder.equals(reduceOrder);
+                pq.add(new IteratorAndCurrent(terms.getBuckets().iterator()));
+            }
+        }
+        List<B> reducedBuckets = new ArrayList<>();
+        // list of buckets coming from different shards that have the same key
+        List<B> currentBuckets = new ArrayList<>();
+        B lastBucket = null;
+        while (pq.size() > 0) {
+            final IteratorAndCurrent<B> top = pq.top();
+            assert lastBucket == null || cmp.compare(top.current(), lastBucket) >= 0;
+            if (lastBucket != null && cmp.compare(top.current(), lastBucket) != 0) {
+                // the key changes, reduce what we already buffered and reset the buffer for current buckets
+                final B reduced = reduceBucket(currentBuckets, reduceContext);
+                reducedBuckets.add(reduced);
+                currentBuckets.clear();
+            }
+            lastBucket = top.current();
+            currentBuckets.add(top.current());
+            if (top.hasNext()) {
+                top.next();
+                assert cmp.compare(top.current(), lastBucket) > 0 : "shards must return data sorted by key";
+                pq.updateTop();
+            } else {
+                pq.pop();
+            }
+        }
+
+        if (currentBuckets.isEmpty() == false) {
+            final B reduced = reduceBucket(currentBuckets, reduceContext);
+            reducedBuckets.add(reduced);
+        }
+        return reducedBuckets;
+    }
+
+    private List<B> reduceLegacy(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+        Map<Object, List<B>> bucketMap = new HashMap<>();
+        for (InternalAggregation aggregation : aggregations) {
+            @SuppressWarnings("unchecked")
+            InternalTerms<A, B> terms = (InternalTerms<A, B>) aggregation;
+            if (terms.getBuckets().isEmpty() == false) {
+                for (B bucket : terms.getBuckets()) {
+                    List<B> bucketList = bucketMap.get(bucket.getKey());
+                    if (bucketList == null) {
+                        bucketList = new ArrayList<>();
+                        bucketMap.put(bucket.getKey(), bucketList);
+                    }
+                    bucketList.add(bucket);
+                }
+            }
+        }
+        List<B> reducedBuckets =  new ArrayList<>();
+        for (List<B> sameTermBuckets : bucketMap.values()) {
+            final B b = reduceBucket(sameTermBuckets, reduceContext);
+            reducedBuckets.add(b);
+        }
+        return reducedBuckets;
+    }
+
     public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
-        Map<Object, List<B>> buckets = new HashMap<>();
         long sumDocCountError = 0;
         long otherDocCount = 0;
         InternalTerms<A, B> referenceTerms = null;
         for (InternalAggregation aggregation : aggregations) {
             @SuppressWarnings("unchecked")
             InternalTerms<A, B> terms = (InternalTerms<A, B>) aggregation;
-            if (referenceTerms == null && !aggregation.getClass().equals(UnmappedTerms.class)) {
+            if (referenceTerms == null && aggregation.getClass().equals(UnmappedTerms.class) == false) {
                 referenceTerms = terms;
             }
             if (referenceTerms != null &&
-                    !referenceTerms.getClass().equals(terms.getClass()) &&
-                    !terms.getClass().equals(UnmappedTerms.class)) {
+                    referenceTerms.getClass().equals(terms.getClass()) == false &&
+                    terms.getClass().equals(UnmappedTerms.class) == false) {
                 // control gets into this loop when the same field name against which the query is executed
                 // is of different types in different indices.
                 throw new AggregationExecutionException("Merging/Reducing the aggregations failed when computing the aggregation ["
@@ -211,22 +349,7 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
                         + "types in two different indices");
             }
             otherDocCount += terms.getSumOfOtherDocCounts();
-            final long thisAggDocCountError;
-            if (terms.getBuckets().size() < getShardSize() || InternalOrder.isKeyOrder(order)) {
-                thisAggDocCountError = 0;
-            } else if (InternalOrder.isCountDesc(order)) {
-                if (terms.getDocCountError() > 0) {
-                    // If there is an existing docCountError for this agg then
-                    // use this as the error for this aggregation
-                    thisAggDocCountError = terms.getDocCountError();
-                } else {
-                    // otherwise use the doc count of the last term in the
-                    // aggregation
-                    thisAggDocCountError = terms.getBuckets().get(terms.getBuckets().size() - 1).docCount;
-                }
-            } else {
-                thisAggDocCountError = -1;
-            }
+            final long thisAggDocCountError = getDocCountError(terms);
             if (sumDocCountError != -1) {
                 if (thisAggDocCountError == -1) {
                     sumDocCountError = -1;
@@ -244,28 +367,30 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
                 // Note that if the error is unbounded (-1) this will be fixed
                 // later in this method.
                 bucket.docCountError -= thisAggDocCountError;
-                List<B> bucketList = buckets.get(bucket.getKey());
-                if (bucketList == null) {
-                    bucketList = new ArrayList<>();
-                    buckets.put(bucket.getKey(), bucketList);
-                }
-                bucketList.add(bucket);
             }
         }
-
+        /**
+         * Buckets returned by a partial reduce or a shard response are sorted by key since {@link Version#V_7_10_0}.
+         * That allows to perform a merge sort when reducing multiple aggregations together.
+         * For backward compatibility, we disable the merge sort and use ({@link InternalTerms#reduceLegacy} if any of
+         * the provided aggregations use a different {@link InternalTerms#reduceOrder}.
+         */
+        BucketOrder thisReduceOrder = getReduceOrder(aggregations);
+        List<B> reducedBuckets = isKeyOrder(thisReduceOrder) ?
+            reduceMergeSort(aggregations, thisReduceOrder, reduceContext) : reduceLegacy(aggregations, reduceContext);
         final B[] list;
         if (reduceContext.isFinalReduce()) {
-            final int size = Math.min(requiredSize, buckets.size());
+            final int size = Math.min(requiredSize, reducedBuckets.size());
+            // final comparator
             final BucketPriorityQueue<B> ordered = new BucketPriorityQueue<>(size, order.comparator());
-            for (List<B> sameTermBuckets : buckets.values()) {
-                final B b = reduceBucket(sameTermBuckets, reduceContext);
+            for (B bucket : reducedBuckets) {
                 if (sumDocCountError == -1) {
-                    b.docCountError = -1;
+                    bucket.docCountError = -1;
                 } else {
-                    b.docCountError += sumDocCountError;
+                    bucket.docCountError += sumDocCountError;
                 }
-                if (b.docCount >= minDocCount) {
-                    B removed = ordered.insertWithOverflow(b);
+                if (bucket.docCount >= minDocCount) {
+                    B removed = ordered.insertWithOverflow(bucket);
                     if (removed != null) {
                         otherDocCount += removed.getDocCount();
                         reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
@@ -273,7 +398,7 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
                         reduceContext.consumeBucketsAndMaybeBreak(1);
                     }
                 } else {
-                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b));
+                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket));
                 }
             }
             list = createBucketsArray(ordered.size());
@@ -281,19 +406,18 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
                 list[i] = ordered.pop();
             }
         } else {
-            // keep all buckets on partial reduce
-            // TODO: we could prune the buckets when sorting by key
-            list = createBucketsArray(buckets.size());
-            int pos = 0;
-            for (List<B> sameTermBuckets : buckets.values()) {
-                final B b = reduceBucket(sameTermBuckets, reduceContext);
+            // we can prune the list on partial reduce if the aggregation is ordered by key
+            // and not filtered (minDocCount == 0)
+            int size = isKeyOrder(order) && minDocCount == 0 ? Math.min(requiredSize, reducedBuckets.size()) : reducedBuckets.size();
+            list = createBucketsArray(size);
+            for (int i = 0; i < size; i++) {
                 reduceContext.consumeBucketsAndMaybeBreak(1);
+                list[i] = reducedBuckets.get(i);
                 if (sumDocCountError == -1) {
-                    b.docCountError = -1;
+                    list[i].docCountError = -1;
                 } else {
-                    b.docCountError += sumDocCountError;
+                    list[i].docCountError += sumDocCountError;
                 }
-                list[pos++] = b;
             }
         }
         long docCountError;
@@ -302,7 +426,7 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
         } else {
             docCountError = aggregations.size() == 1 ? 0 : sumDocCountError;
         }
-        return create(name, Arrays.asList(list), docCountError, otherDocCount);
+        return create(name, Arrays.asList(list), thisReduceOrder, docCountError, otherDocCount);
     }
 
     @Override
@@ -334,7 +458,7 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
 
     protected abstract int getShardSize();
 
-    protected abstract A create(String name, List<B> buckets, long docCountError, long otherDocCount);
+    protected abstract A create(String name, List<B> buckets, BucketOrder reduceOrder, long docCountError, long otherDocCount);
 
     /**
      * Create an array to hold some buckets. Used in collecting the results.
@@ -351,13 +475,14 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
 
         InternalTerms<?,?> that = (InternalTerms<?,?>) obj;
         return Objects.equals(minDocCount, that.minDocCount)
+                && Objects.equals(reduceOrder, that.reduceOrder)
                 && Objects.equals(order, that.order)
                 && Objects.equals(requiredSize, that.requiredSize);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), minDocCount, order, requiredSize);
+        return Objects.hash(super.hashCode(), minDocCount, reduceOrder, order, requiredSize);
     }
 
     protected static XContentBuilder doXContentCommon(XContentBuilder builder, Params params,

+ 6 - 6
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java

@@ -100,10 +100,10 @@ public class LongTerms extends InternalMappedTerms<LongTerms, LongTerms.Bucket>
         }
     }
 
-    public LongTerms(String name, BucketOrder order, int requiredSize, long minDocCount,
+    public LongTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
             Map<String, Object> metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
             List<Bucket> buckets, long docCountError) {
-        super(name, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError,
+        super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError,
                 otherDocCount, buckets, docCountError);
     }
 
@@ -121,7 +121,7 @@ public class LongTerms extends InternalMappedTerms<LongTerms, LongTerms.Bucket>
 
     @Override
     public LongTerms create(List<Bucket> buckets) {
-        return new LongTerms(name, order, requiredSize, minDocCount, metadata, format, shardSize,
+        return new LongTerms(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize,
                 showTermDocCountError, otherDocCount, buckets, docCountError);
     }
 
@@ -132,8 +132,8 @@ public class LongTerms extends InternalMappedTerms<LongTerms, LongTerms.Bucket>
     }
 
     @Override
-    protected LongTerms create(String name, List<Bucket> buckets, long docCountError, long otherDocCount) {
-        return new LongTerms(name, order, requiredSize, minDocCount, getMetadata(), format, shardSize,
+    protected LongTerms create(String name, List<Bucket> buckets, BucketOrder reduceOrder, long docCountError, long otherDocCount) {
+        return new LongTerms(name, reduceOrder, order, requiredSize, minDocCount, getMetadata(), format, shardSize,
                 showTermDocCountError, otherDocCount, buckets, docCountError);
     }
 
@@ -168,7 +168,7 @@ public class LongTerms extends InternalMappedTerms<LongTerms, LongTerms.Bucket>
                 bucket.getDocCount(), (InternalAggregations) bucket.getAggregations(), longTerms.showTermDocCountError,
                 longTerms.showTermDocCountError ? bucket.getDocCountError() : 0, decimalFormat));
         }
-        return new DoubleTerms(longTerms.getName(), longTerms.order, longTerms.requiredSize,
+        return new DoubleTerms(longTerms.getName(), longTerms.reduceOrder, longTerms.order, longTerms.requiredSize,
             longTerms.minDocCount,
             longTerms.metadata, longTerms.format, longTerms.shardSize,
             longTerms.showTermDocCountError, longTerms.otherDocCount,

+ 13 - 4
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java

@@ -50,6 +50,8 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.function.LongConsumer;
 
+import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder;
+
 /**
  * An aggregator of string values that hashes the strings on the fly rather
  * than up front like the {@link GlobalOrdinalsStringTermsAggregator}.
@@ -226,7 +228,7 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
                 PriorityQueue<B> ordered = buildPriorityQueue(size);
                 B spare = null;
                 BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
-                Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]); 
+                Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]);
                 while (ordsEnum.next()) {
                     long docCount = bucketDocCount(ordsEnum.ord());
                     otherDocCounts[ordIdx] += docCount;
@@ -416,9 +418,16 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
 
         @Override
         StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bucket[] topBuckets) {
-            return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
-                metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount,
-                Arrays.asList(topBuckets), 0);
+            final BucketOrder reduceOrder;
+            if (isKeyOrder(order) == false) {
+                reduceOrder = InternalOrder.key(true);
+                Arrays.sort(topBuckets, reduceOrder.comparator());
+            } else {
+                reduceOrder = order;
+            }
+            return new StringTerms(name, reduceOrder, order, bucketCountThresholds.getRequiredSize(),
+                bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
+                otherDocCount, Arrays.asList(topBuckets), 0);
         }
 
         @Override

+ 23 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java

@@ -47,6 +47,7 @@ import org.elasticsearch.search.internal.ContextIndexSearcher;
 import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
@@ -54,6 +55,7 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static java.util.Collections.emptyList;
+import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder;
 
 public class NumericTermsAggregator extends TermsAggregator {
     private final ResultStrategy<?, ?> resultStrategy;
@@ -189,7 +191,8 @@ public class NumericTermsAggregator extends TermsAggregator {
 
             InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
             for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
-                result[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCounts[ordIdx], topBucketsPerOrd[ordIdx]);
+                result[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCounts[ordIdx],
+                    topBucketsPerOrd[ordIdx]);
             }
             return result;
         }
@@ -363,8 +366,16 @@ public class NumericTermsAggregator extends TermsAggregator {
 
         @Override
         LongTerms buildResult(long owningBucketOrd, long otherDocCount, LongTerms.Bucket[] topBuckets) {
+            final BucketOrder reduceOrder;
+            if (isKeyOrder(order) == false) {
+                reduceOrder = InternalOrder.key(true);
+                Arrays.sort(topBuckets, reduceOrder.comparator());
+            } else {
+                reduceOrder = order;
+            }
             return new LongTerms(
                 name,
+                reduceOrder,
                 order,
                 bucketCountThresholds.getRequiredSize(),
                 bucketCountThresholds.getMinDocCount(),
@@ -383,6 +394,7 @@ public class NumericTermsAggregator extends TermsAggregator {
             return new LongTerms(
                 name,
                 order,
+                order,
                 bucketCountThresholds.getRequiredSize(),
                 bucketCountThresholds.getMinDocCount(),
                 metadata(),
@@ -397,6 +409,7 @@ public class NumericTermsAggregator extends TermsAggregator {
     }
 
     class DoubleTermsResults extends StandardTermsResultStrategy<DoubleTerms, DoubleTerms.Bucket> {
+
         DoubleTermsResults(boolean showTermDocCountError) {
             super(showTermDocCountError);
         }
@@ -435,8 +448,16 @@ public class NumericTermsAggregator extends TermsAggregator {
 
         @Override
         DoubleTerms buildResult(long owningBucketOrd, long otherDocCount, DoubleTerms.Bucket[] topBuckets) {
+            final BucketOrder reduceOrder;
+            if (isKeyOrder(order) == false) {
+                reduceOrder = InternalOrder.key(true);
+                Arrays.sort(topBuckets, reduceOrder.comparator());
+            } else {
+                reduceOrder = order;
+            }
             return new DoubleTerms(
                 name,
+                reduceOrder,
                 order,
                 bucketCountThresholds.getRequiredSize(),
                 bucketCountThresholds.getMinDocCount(),
@@ -455,6 +476,7 @@ public class NumericTermsAggregator extends TermsAggregator {
             return new DoubleTerms(
                 name,
                 order,
+                order,
                 bucketCountThresholds.getRequiredSize(),
                 bucketCountThresholds.getMinDocCount(),
                 metadata(),

+ 5 - 5
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java

@@ -103,10 +103,10 @@ public class StringTerms extends InternalMappedTerms<StringTerms, StringTerms.Bu
         }
     }
 
-    public StringTerms(String name, BucketOrder order, int requiredSize, long minDocCount,
+    public StringTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
             Map<String, Object> metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
             List<Bucket> buckets, long docCountError) {
-        super(name, order, requiredSize, minDocCount, metadata, format,
+        super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format,
                 shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
     }
 
@@ -124,7 +124,7 @@ public class StringTerms extends InternalMappedTerms<StringTerms, StringTerms.Bu
 
     @Override
     public StringTerms create(List<Bucket> buckets) {
-        return new StringTerms(name, order, requiredSize, minDocCount, metadata, format, shardSize,
+        return new StringTerms(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize,
                 showTermDocCountError, otherDocCount, buckets, docCountError);
     }
 
@@ -140,8 +140,8 @@ public class StringTerms extends InternalMappedTerms<StringTerms, StringTerms.Bu
     }
 
     @Override
-    protected StringTerms create(String name, List<Bucket> buckets, long docCountError, long otherDocCount) {
-        return new StringTerms(name, order, requiredSize, minDocCount, getMetadata(), format, shardSize,
+    protected StringTerms create(String name, List<Bucket> buckets, BucketOrder reduceOrder, long docCountError, long otherDocCount) {
+        return new StringTerms(name, reduceOrder, order, requiredSize, minDocCount, getMetadata(), format, shardSize,
                 showTermDocCountError, otherDocCount, buckets, docCountError);
     }
 

+ 3 - 2
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java

@@ -374,8 +374,9 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
                      *  which directly linked to maxOrd, so we need to limit).
                      */
                     return new GlobalOrdinalsStringTermsAggregator.LowCardinality(name, factories,
-                            ordinalsValuesSource, order, format, bucketCountThresholds, context, parent, false,
-                            subAggCollectMode, showTermDocCountError, metadata);
+                        a -> a.new StandardTermsResults(),
+                        ordinalsValuesSource, order, format, bucketCountThresholds, context, parent, false,
+                        subAggCollectMode, showTermDocCountError, metadata);
 
                 }
                 final IncludeExclude.OrdinalsFilter filter = includeExclude == null ? null : includeExclude.convertToOrdinalsFilter(format);

+ 2 - 2
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java

@@ -51,7 +51,7 @@ public class UnmappedTerms extends InternalTerms<UnmappedTerms, UnmappedTerms.Bu
     }
 
     public UnmappedTerms(String name, BucketOrder order, int requiredSize, long minDocCount, Map<String, Object> metadata) {
-        super(name, order, requiredSize, minDocCount, metadata);
+        super(name, order, order, requiredSize, minDocCount, metadata);
     }
 
     /**
@@ -92,7 +92,7 @@ public class UnmappedTerms extends InternalTerms<UnmappedTerms, UnmappedTerms.Bu
     }
 
     @Override
-    protected UnmappedTerms create(String name, List<Bucket> buckets, long docCountError, long otherDocCount) {
+    protected UnmappedTerms create(String name, List<Bucket> buckets, BucketOrder reduceOrder, long docCountError, long otherDocCount) {
         throw new UnsupportedOperationException("not supported for UnmappedTerms");
     }
 

+ 5 - 7
server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java

@@ -34,7 +34,6 @@ import org.elasticsearch.search.aggregations.pipeline.MaxBucketPipelineAggregati
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.InternalAggregationTestCase;
-import org.elasticsearch.test.VersionUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -57,7 +56,7 @@ public class InternalAggregationsTests extends ESTestCase {
     }
 
     public void testNonFinalReduceTopLevelPipelineAggs()  {
-        InternalAggregation terms = new StringTerms("name", BucketOrder.key(true),
+        InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), BucketOrder.key(true),
             10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0);
         List<InternalAggregations> aggs = singletonList(InternalAggregations.from(Collections.singletonList(terms)));
         InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, maxBucketReduceContext().forPartialReduction());
@@ -65,7 +64,7 @@ public class InternalAggregationsTests extends ESTestCase {
     }
 
     public void testFinalReduceTopLevelPipelineAggs()  {
-        InternalAggregation terms = new StringTerms("name", BucketOrder.key(true),
+        InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), BucketOrder.key(true),
             10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0);
 
         InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(terms));
@@ -103,11 +102,10 @@ public class InternalAggregationsTests extends ESTestCase {
 
     public void testSerialization() throws Exception {
         InternalAggregations aggregations = createTestInstance();
-        writeToAndReadFrom(aggregations, 0);
+        writeToAndReadFrom(aggregations, Version.CURRENT, 0);
     }
 
-    private void writeToAndReadFrom(InternalAggregations aggregations, int iteration) throws IOException {
-        Version version = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT);
+    private void writeToAndReadFrom(InternalAggregations aggregations, Version version, int iteration) throws IOException {
         try (BytesStreamOutput out = new BytesStreamOutput()) {
             out.setVersion(version);
             aggregations.writeTo(out);
@@ -116,7 +114,7 @@ public class InternalAggregationsTests extends ESTestCase {
                 InternalAggregations deserialized = InternalAggregations.readFrom(in);
                 assertEquals(aggregations.aggregations, deserialized.aggregations);
                 if (iteration < 2) {
-                    writeToAndReadFrom(deserialized, iteration + 1);
+                    writeToAndReadFrom(deserialized, version, iteration + 1);
                 }
             }
         }

+ 2 - 2
server/src/test/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregationTests.java

@@ -141,7 +141,7 @@ public class InternalMultiBucketAggregationTests extends ESTestCase {
             new BytesRef("foo".getBytes(StandardCharsets.UTF_8), 0, "foo".getBytes(StandardCharsets.UTF_8).length), 1,
             internalStringAggs, false, 0, DocValueFormat.RAW));
 
-        InternalTerms termsAgg = new StringTerms("string_terms", BucketOrder.count(false), 1, 0,
+        InternalTerms termsAgg = new StringTerms("string_terms", BucketOrder.count(false), BucketOrder.count(false), 1, 0,
             Collections.emptyMap(), DocValueFormat.RAW, 1, false, 0, stringBuckets, 0);
         InternalAggregations internalAggregations = InternalAggregations.from(Collections.singletonList(termsAgg));
         LongTerms.Bucket bucket = new LongTerms.Bucket(19, 1, internalAggregations, false, 0, DocValueFormat.RAW);
@@ -161,7 +161,7 @@ public class InternalMultiBucketAggregationTests extends ESTestCase {
             new BytesRef("foo".getBytes(StandardCharsets.UTF_8), 0, "foo".getBytes(StandardCharsets.UTF_8).length), 1,
             internalStringAggs, false, 0, DocValueFormat.RAW));
 
-        InternalTerms termsAgg = new StringTerms("string_terms", BucketOrder.count(false), 1, 0,
+        InternalTerms termsAgg = new StringTerms("string_terms", BucketOrder.count(false), BucketOrder.count(false), 1, 0,
             Collections.emptyMap(), DocValueFormat.RAW, 1, false, 0, stringBuckets, 0);
         InternalAggregations internalAggregations = InternalAggregations.from(Collections.singletonList(termsAgg));
         LongTerms.Bucket bucket = new LongTerms.Bucket(19, 1, internalAggregations, false, 0, DocValueFormat.RAW);

+ 6 - 2
server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsTests.java

@@ -25,6 +25,7 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -53,7 +54,9 @@ public class DoubleTermsTests extends InternalTermsTestCase {
             int docCount = randomIntBetween(1, 100);
             buckets.add(new DoubleTerms.Bucket(term, docCount, aggregations, showTermDocCountError, docCountError, format));
         }
-        return new DoubleTerms(name, order, requiredSize, minDocCount,
+        BucketOrder reduceOrder = rarely() ? order : BucketOrder.key(true);
+        Collections.sort(buckets, reduceOrder.comparator());
+        return new DoubleTerms(name, reduceOrder, order, requiredSize, minDocCount,
                 metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
     }
 
@@ -115,7 +118,8 @@ public class DoubleTermsTests extends InternalTermsTestCase {
             default:
                 throw new AssertionError("Illegal randomisation branch");
             }
-            return new DoubleTerms(name, order, requiredSize, minDocCount, metadata, format, shardSize,
+            Collections.sort(buckets, doubleTerms.reduceOrder.comparator());
+            return new DoubleTerms(name, doubleTerms.reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize,
                     showTermDocCountError, otherDocCount, buckets, docCountError);
         } else {
             String name = instance.getName();

+ 6 - 2
server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsTests.java

@@ -25,6 +25,7 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -53,7 +54,9 @@ public class LongTermsTests extends InternalTermsTestCase {
             int docCount = randomIntBetween(1, 100);
             buckets.add(new LongTerms.Bucket(term, docCount, aggregations, showTermDocCountError, docCountError, format));
         }
-        return new LongTerms(name, order, requiredSize, minDocCount,
+        BucketOrder reduceOrder = rarely() ? order : BucketOrder.key(true);
+        Collections.sort(buckets, reduceOrder.comparator());
+        return new LongTerms(name, reduceOrder, order, requiredSize, minDocCount,
                 metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
     }
 
@@ -115,7 +118,8 @@ public class LongTermsTests extends InternalTermsTestCase {
             default:
                 throw new AssertionError("Illegal randomisation branch");
             }
-            return new LongTerms(name, order, requiredSize, minDocCount, metadata, format, shardSize,
+            Collections.sort(buckets, longTerms.reduceOrder.comparator());
+            return new LongTerms(name, longTerms.reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize,
                     showTermDocCountError, otherDocCount, buckets, docCountError);
         } else {
             String name = instance.getName();

+ 6 - 2
server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsTests.java

@@ -26,6 +26,7 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -54,7 +55,9 @@ public class StringTermsTests extends InternalTermsTestCase {
             int docCount = randomIntBetween(1, 100);
             buckets.add(new StringTerms.Bucket(term, docCount, aggregations, showTermDocCountError, docCountError, format));
         }
-        return new StringTerms(name, order, requiredSize, minDocCount,
+        BucketOrder reduceOrder = rarely() ? order : BucketOrder.key(true);
+        Collections.sort(buckets, reduceOrder.comparator());
+        return new StringTerms(name, reduceOrder, order, requiredSize, minDocCount,
                 metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
     }
 
@@ -116,7 +119,8 @@ public class StringTermsTests extends InternalTermsTestCase {
             default:
                 throw new AssertionError("Illegal randomisation branch");
             }
-            return new StringTerms(name, order, requiredSize, minDocCount, metadata, format, shardSize,
+            Collections.sort(buckets, stringTerms.reduceOrder.comparator());
+            return new StringTerms(name, stringTerms.reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize,
                     showTermDocCountError, otherDocCount, buckets, docCountError);
         } else {
             String name = instance.getName();

+ 39 - 23
server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java

@@ -39,6 +39,7 @@ import org.apache.lucene.search.TotalHits;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.NumericUtils;
+import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.geo.GeoPoint;
 import org.elasticsearch.common.network.InetAddresses;
 import org.elasticsearch.common.settings.Settings;
@@ -73,6 +74,7 @@ import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.BucketOrder;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
 import org.elasticsearch.search.aggregations.bucket.filter.Filter;
 import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
@@ -254,7 +256,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                         aggregator.preCollection();
                         indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                         aggregator.postCollection();
-                        Terms result = (Terms) aggregator.buildTopLevel();
+                        Terms result = reduce(aggregator);
                         assertEquals(5, result.getBuckets().size());
                         assertEquals("", result.getBuckets().get(0).getKeyAsString());
                         assertEquals(2L, result.getBuckets().get(0).getDocCount());
@@ -319,11 +321,11 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                         .size(12)
                         .order(BucketOrder.key(true));
 
-                    Aggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
+                    TermsAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
                     aggregator.preCollection();
                     indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                     aggregator.postCollection();
-                    Terms result = (Terms) aggregator.buildTopLevel();
+                    Terms result = reduce(aggregator);
                     assertEquals(10, result.getBuckets().size());
                     assertEquals("val000", result.getBuckets().get(0).getKeyAsString());
                     assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -358,7 +360,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                     aggregator.preCollection();
                     indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                     aggregator.postCollection();
-                    result = (Terms) aggregator.buildTopLevel();
+                    result = reduce(aggregator);
                     assertEquals(5, result.getBuckets().size());
                     assertEquals("val001", result.getBuckets().get(0).getKeyAsString());
                     assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -382,7 +384,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                     aggregator.preCollection();
                     indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                     aggregator.postCollection();
-                    result = (Terms) aggregator.buildTopLevel();
+                    result = reduce(aggregator);
                     assertEquals(8, result.getBuckets().size());
                     assertEquals("val002", result.getBuckets().get(0).getKeyAsString());
                     assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -411,7 +413,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                     aggregator.preCollection();
                     indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                     aggregator.postCollection();
-                    result = (Terms) aggregator.buildTopLevel();
+                    result = reduce(aggregator);
                     assertEquals(2, result.getBuckets().size());
                     assertEquals("val010", result.getBuckets().get(0).getKeyAsString());
                     assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -428,7 +430,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                     aggregator.preCollection();
                     indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                     aggregator.postCollection();
-                    result = (Terms) aggregator.buildTopLevel();
+                    result = reduce(aggregator);
                     assertEquals(2, result.getBuckets().size());
                     assertEquals("val000", result.getBuckets().get(0).getKeyAsString());
                     assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -446,7 +448,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                     aggregator.preCollection();
                     indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                     aggregator.postCollection();
-                    result = (Terms) aggregator.buildTopLevel();
+                    result = reduce(aggregator);
                     assertEquals(2, result.getBuckets().size());
                     assertEquals("val000", result.getBuckets().get(0).getKeyAsString());
                     assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -497,11 +499,11 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                         .includeExclude(new IncludeExclude(new long[]{0, 5}, null))
                         .field("long_field")
                         .order(BucketOrder.key(true));
-                    Aggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
+                    TermsAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
                     aggregator.preCollection();
                     indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                     aggregator.postCollection();
-                    Terms result = (Terms) aggregator.buildTopLevel();
+                    Terms result = reduce(aggregator);
                     assertEquals(2, result.getBuckets().size());
                     assertEquals(0L, result.getBuckets().get(0).getKey());
                     assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -518,7 +520,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                     aggregator.preCollection();
                     indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                     aggregator.postCollection();
-                    result = (Terms) aggregator.buildTopLevel();
+                    result = reduce(aggregator);
                     assertEquals(4, result.getBuckets().size());
                     assertEquals(1L, result.getBuckets().get(0).getKey());
                     assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -541,7 +543,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                     aggregator.preCollection();
                     indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                     aggregator.postCollection();
-                    result = (Terms) aggregator.buildTopLevel();
+                    result = reduce(aggregator);
                     assertEquals(2, result.getBuckets().size());
                     assertEquals(0.0, result.getBuckets().get(0).getKey());
                     assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -558,7 +560,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                     aggregator.preCollection();
                     indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                     aggregator.postCollection();
-                    result = (Terms) aggregator.buildTopLevel();
+                    result = reduce(aggregator);
                     assertEquals(4, result.getBuckets().size());
                     assertEquals(1.0, result.getBuckets().get(0).getKey());
                     assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -724,7 +726,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                     aggregator.preCollection();
                     indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                     aggregator.postCollection();
-                    Terms result = (Terms) aggregator.buildTopLevel();
+                    Terms result = reduce(aggregator);
                     assertEquals(size, result.getBuckets().size());
                     for (int i = 0; i < size; i++) {
                         Map.Entry<T, Integer>  expected = expectedBuckets.get(i);
@@ -750,7 +752,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                         aggregator.preCollection();
                         indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                         aggregator.postCollection();
-                        result = ((Filter) aggregator.buildTopLevel()).getAggregations().get("_name2");
+                        result = ((Filter) reduce(aggregator)).getAggregations().get("_name2");
                         int expectedFilteredCounts = 0;
                         for (Integer count : filteredCounts.values()) {
                             if (count > 0) {
@@ -823,7 +825,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                     aggregator.preCollection();
                     indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                     aggregator.postCollection();
-                    Terms result = (Terms) aggregator.buildTopLevel();
+                    Terms result = reduce(aggregator);
                     assertEquals(size, result.getBuckets().size());
                     for (int i = 0; i < size; i++) {
                         Map.Entry<T, Long>  expected = expectedBuckets.get(i);
@@ -852,7 +854,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                     aggregator.preCollection();
                     indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                     aggregator.postCollection();
-                    Terms result = (Terms) aggregator.buildTopLevel();
+                    Terms result = reduce(aggregator);
                     assertEquals("_name", result.getName());
                     assertEquals(0, result.getBuckets().size());
 
@@ -862,7 +864,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                     aggregator.preCollection();
                     indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                     aggregator.postCollection();
-                    result = (Terms) aggregator.buildTopLevel();
+                    result = reduce(aggregator);
                     assertEquals("_name", result.getName());
                     assertEquals(0, result.getBuckets().size());
 
@@ -872,7 +874,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                     aggregator.preCollection();
                     indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                     aggregator.postCollection();
-                    result = (Terms) aggregator.buildTopLevel();
+                    result = reduce(aggregator);
                     assertEquals("_name", result.getName());
                     assertEquals(0, result.getBuckets().size());
                 }
@@ -895,7 +897,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                         aggregator.preCollection();
                         indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                         aggregator.postCollection();
-                        Terms result = (Terms) aggregator.buildTopLevel();
+                        Terms result = reduce(aggregator);
                         assertEquals("_name", result.getName());
                         assertEquals(0, result.getBuckets().size());
                         assertFalse(AggregationInspectionHelper.hasValue((InternalTerms)result));
@@ -931,7 +933,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                         aggregator.preCollection();
                         indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                         aggregator.postCollection();
-                        Terms result = (Terms) aggregator.buildTopLevel();
+                        Terms result = reduce(aggregator);
                         assertEquals("_name", result.getName());
                         assertEquals(1, result.getBuckets().size());
                         assertEquals(missingValues[i], result.getBuckets().get(0).getKey());
@@ -1003,7 +1005,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                     aggregator.preCollection();
                     indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                     aggregator.postCollection();
-                    Terms result = (Terms) aggregator.buildTopLevel();
+                    Terms result = reduce(aggregator);
                     assertEquals("_name", result.getName());
                     assertEquals(1, result.getBuckets().size());
                     assertEquals("192.168.100.42", result.getBuckets().get(0).getKey());
@@ -1051,7 +1053,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
                     aggregator.preCollection();
                     indexSearcher.search(new MatchAllDocsQuery(), aggregator);
                     aggregator.postCollection();
-                    Terms result = (Terms) aggregator.buildTopLevel();
+                    Terms result = reduce(aggregator);
                     assertEquals(3, result.getBuckets().size());
                     assertEquals("a", result.getBuckets().get(0).getKeyAsString());
                     assertEquals(1L, result.getBuckets().get(0).getDocCount());
@@ -1449,4 +1451,18 @@ public class TermsAggregatorTests extends AggregatorTestCase {
         return aggregator.buildTopLevel();
     }
 
+    private <T extends InternalAggregation> T reduce(Aggregator agg) throws IOException {
+        // now do the final reduce
+        MultiBucketConsumerService.MultiBucketConsumer reduceBucketConsumer =
+            new MultiBucketConsumerService.MultiBucketConsumer(Integer.MAX_VALUE,
+                new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST));
+        InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction(
+            agg.context().bigArrays(), getMockScriptService(), reduceBucketConsumer, PipelineTree.EMPTY);
+
+        T topLevel  = (T) agg.buildTopLevel();
+        T result = (T) topLevel.reduce(Collections.singletonList(topLevel), context);
+        doAssertReducedMultiBucketConsumer(result, reduceBucketConsumer);
+        return result;
+    }
+
 }

+ 2 - 3
server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java

@@ -42,7 +42,6 @@ import org.elasticsearch.search.internal.ShardSearchContextId;
 import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.search.suggest.SuggestTests;
 import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.test.VersionUtils;
 
 import static java.util.Collections.emptyList;
 
@@ -80,8 +79,8 @@ public class QuerySearchResultTests extends ESTestCase {
 
     public void testSerialization() throws Exception {
         QuerySearchResult querySearchResult = createTestInstance();
-        Version version = VersionUtils.randomVersion(random());
-        QuerySearchResult deserialized = copyWriteable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, version);
+        QuerySearchResult deserialized = copyWriteable(querySearchResult, namedWriteableRegistry,
+            QuerySearchResult::new, Version.CURRENT);
         assertEquals(querySearchResult.getContextId().getId(), deserialized.getContextId().getId());
         assertNull(deserialized.getSearchShardTarget());
         assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f);

+ 4 - 4
x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java

@@ -146,8 +146,8 @@ public class AsyncSearchTaskTests extends ESTestCase {
         AsyncSearchTask task = createAsyncSearchTask();
         task.getSearchProgressActionListener().onListShards(Collections.emptyList(), Collections.emptyList(),
             SearchResponse.Clusters.EMPTY, false);
-        InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(new StringTerms("name", BucketOrder.key(true), 1, 1,
-            Collections.emptyMap(), DocValueFormat.RAW, 1, false, 1, Collections.emptyList(), 0)));
+        InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(new StringTerms("name", BucketOrder.key(true),
+            BucketOrder.key(true), 1, 1, Collections.emptyMap(), DocValueFormat.RAW, 1, false, 1, Collections.emptyList(), 0)));
         //providing an empty named writeable registry will make the expansion fail, hence the delayed reduction will fail too
         //causing an exception when executing getResponse as part of the completion listener callback
         DelayableWriteable.Serialized<InternalAggregations> serializedAggs = DelayableWriteable.referencing(aggs)
@@ -184,8 +184,8 @@ public class AsyncSearchTaskTests extends ESTestCase {
         AsyncSearchTask task = createAsyncSearchTask();
         task.getSearchProgressActionListener().onListShards(Collections.emptyList(), Collections.emptyList(),
             SearchResponse.Clusters.EMPTY, false);
-        InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(new StringTerms("name", BucketOrder.key(true), 1, 1,
-            Collections.emptyMap(), DocValueFormat.RAW, 1, false, 1, Collections.emptyList(), 0)));
+        InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(new StringTerms("name", BucketOrder.key(true),
+            BucketOrder.key(true), 1, 1, Collections.emptyMap(), DocValueFormat.RAW, 1, false, 1, Collections.emptyList(), 0)));
         //providing an empty named writeable registry will make the expansion fail, hence the delayed reduction will fail too
         //causing an exception when executing getResponse as part of the completion listener callback
         DelayableWriteable.Serialized<InternalAggregations> serializedAggs = DelayableWriteable.referencing(aggs)