浏览代码

Increase search.max_buckets to 65,535 (#57042)

Increases the default search.max_buckets limit to 65,535, and only counts
buckets during reduce phase.

Closes #51731
Igor Motov 5 年之前
父节点
当前提交
29b5643c1a
共有 31 个文件被更改,包括 92 次插入179 次删除
  1. 1 1
      docs/reference/aggregations/bucket.asciidoc
  2. 13 10
      server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java
  3. 9 15
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java
  4. 0 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java
  5. 1 3
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java
  6. 0 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java
  7. 1 2
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java
  8. 3 4
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java
  9. 2 6
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java
  10. 1 12
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java
  11. 1 10
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java
  12. 1 10
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java
  13. 3 6
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java
  14. 0 2
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java
  15. 0 3
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java
  16. 0 3
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantStringTermsAggregator.java
  17. 0 3
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregator.java
  18. 0 2
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java
  19. 0 3
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java
  20. 4 1
      server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java
  21. 9 0
      server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregatorTestCase.java
  22. 8 0
      server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java
  23. 0 67
      server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java
  24. 1 1
      server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java
  25. 9 0
      server/src/test/java/org/elasticsearch/search/aggregations/bucket/range/DateRangeAggregatorTests.java
  26. 0 2
      test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
  27. 11 5
      test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java
  28. 8 0
      test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java
  29. 2 2
      x-pack/plugin/sql/qa/jdbc/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/JdbcErrorsTestCase.java
  30. 2 2
      x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java
  31. 2 2
      x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java

+ 1 - 1
docs/reference/aggregations/bucket.asciidoc

@@ -15,7 +15,7 @@ define fixed number of multiple buckets, and others dynamically create the bucke
 
 NOTE: The maximum number of buckets allowed in a single response is limited by a
 dynamic cluster setting named
-<<search-settings-max-buckets,`search.max_buckets`>>. It defaults to 10,000,
+<<search-settings-max-buckets,`search.max_buckets`>>. It defaults to 65,535,
 requests that try to return more than the limit will fail with an exception.
 
 include::bucket/adjacency-matrix-aggregation.asciidoc[]

+ 13 - 10
server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java

@@ -35,10 +35,10 @@ import java.util.function.IntConsumer;
  * An aggregation service that creates instances of {@link MultiBucketConsumer}.
  * The consumer is used by {@link BucketsAggregator} and {@link InternalMultiBucketAggregation} to limit the number of buckets created
  * in {@link Aggregator#buildAggregations} and {@link InternalAggregation#reduce}.
- * The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 10000.
+ * The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 65535.
  */
 public class MultiBucketConsumerService {
-    public static final int DEFAULT_MAX_BUCKETS = 10000;
+    public static final int DEFAULT_MAX_BUCKETS = 65535;
     public static final Setting<Integer> MAX_BUCKET_SETTING =
         Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);
 
@@ -102,6 +102,7 @@ public class MultiBucketConsumerService {
 
         // aggregations execute in a single thread so no atomic here
         private int count;
+        private int callCount = 0;
 
         public MultiBucketConsumer(int limit, CircuitBreaker breaker) {
             this.limit = limit;
@@ -110,15 +111,17 @@ public class MultiBucketConsumerService {
 
         @Override
         public void accept(int value) {
-            count += value;
-            if (count > limit) {
-                throw new TooManyBucketsException("Trying to create too many buckets. Must be less than or equal to: [" + limit
-                    + "] but was [" + count + "]. This limit can be set by changing the [" +
-                    MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
+            if (value != 0) {
+                count += value;
+                if (count > limit) {
+                    throw new TooManyBucketsException("Trying to create too many buckets. Must be less than or equal to: [" + limit
+                        + "] but was [" + count + "]. This limit can be set by changing the [" +
+                        MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
+                }
             }
-
-            // check parent circuit breaker every 1024 buckets
-            if (value > 0 && (count & 0x3FF) == 0) {
+            // check parent circuit breaker every 1024 calls
+            callCount++;
+            if ((callCount & 0x3FF) == 0) {
                 breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
             }
         }

+ 9 - 15
server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java

@@ -57,12 +57,12 @@ public abstract class BucketsAggregator extends AggregatorBase {
             Map<String, Object> metadata) throws IOException {
         super(name, factories, context, parent, metadata);
         bigArrays = context.bigArrays();
-        docCounts = bigArrays.newIntArray(1, true);
         if (context.aggregations() != null) {
             multiBucketConsumer = context.aggregations().multiBucketConsumer();
         } else {
             multiBucketConsumer = (count) -> {};
         }
+        docCounts = bigArrays.newIntArray(1, true);
     }
 
     /**
@@ -91,7 +91,12 @@ public abstract class BucketsAggregator extends AggregatorBase {
      * Same as {@link #collectBucket(LeafBucketCollector, int, long)}, but doesn't check if the docCounts needs to be re-sized.
      */
     public final void collectExistingBucket(LeafBucketCollector subCollector, int doc, long bucketOrd) throws IOException {
-        docCounts.increment(bucketOrd, 1);
+        if (docCounts.increment(bucketOrd, 1) == 1) {
+            // We calculate the final number of buckets only during the reduce phase. But we still need to
+            // trigger bucket consumer from time to time in order to give it a chance to check available memory and break
+            // the execution if we are running out. To achieve that we are passing 0 as a bucket count.
+            multiBucketConsumer.accept(0);
+        }
         subCollector.collect(doc, bucketOrd);
     }
 
@@ -137,14 +142,6 @@ public abstract class BucketsAggregator extends AggregatorBase {
         }
     }
 
-    /**
-     * Adds {@code count} buckets to the global count for the request and fails if this number is greater than
-     * the maximum number of buckets allowed in a response
-     */
-    protected final void consumeBucketsAndMaybeBreak(int count) {
-        multiBucketConsumer.accept(count);
-    }
-
     /**
      * Hook to allow taking an action before building buckets.
      */
@@ -186,7 +183,7 @@ public abstract class BucketsAggregator extends AggregatorBase {
                 public int size() {
                     return aggregations.length;
                 }
-            }); 
+            });
         }
         return result;
     }
@@ -267,7 +264,6 @@ public abstract class BucketsAggregator extends AggregatorBase {
     protected final <B> InternalAggregation[] buildAggregationsForFixedBucketCount(long[] owningBucketOrds, int bucketsPerOwningBucketOrd,
             BucketBuilderForFixedCount<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
         int totalBuckets = owningBucketOrds.length * bucketsPerOwningBucketOrd;
-        consumeBucketsAndMaybeBreak(totalBuckets);
         long[] bucketOrdsToCollect = new long[totalBuckets];
         int bucketOrdIdx = 0;
         for (long owningBucketOrd : owningBucketOrds) {
@@ -299,7 +295,7 @@ public abstract class BucketsAggregator extends AggregatorBase {
      * @param owningBucketOrds owning bucket ordinals for which to build the results
      * @param resultBuilder how to build a result from the sub aggregation results
      */
-    protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds, 
+    protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds,
                 SingleBucketResultBuilder resultBuilder) throws IOException {
         /*
          * It'd be entirely reasonable to call
@@ -328,7 +324,6 @@ public abstract class BucketsAggregator extends AggregatorBase {
     protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(long[] owningBucketOrds, LongHash bucketOrds,
             BucketBuilderForVariable<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
         assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
-        consumeBucketsAndMaybeBreak((int) bucketOrds.size());
         long[] bucketOrdsToCollect = new long[(int) bucketOrds.size()];
         for (int bucketOrd = 0; bucketOrd < bucketOrds.size(); bucketOrd++) {
             bucketOrdsToCollect[bucketOrd] = bucketOrd;
@@ -360,7 +355,6 @@ public abstract class BucketsAggregator extends AggregatorBase {
             throw new AggregationExecutionException("Can't collect more than [" + Integer.MAX_VALUE
                     + "] buckets but attempted [" + totalOrdsToCollect + "]");
         }
-        consumeBucketsAndMaybeBreak((int) totalOrdsToCollect);
         long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect];
         int b = 0;
         for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {

+ 0 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java

@@ -184,7 +184,6 @@ public class AdjacencyMatrixAggregator extends BucketsAggregator {
                 totalBucketsToBuild++;
             }
         }
-        consumeBucketsAndMaybeBreak(totalBucketsToBuild);
         long[] bucketOrdsToBuild = new long[totalBucketsToBuild];
         int builtBucketIndex = 0;
         for (int ord = 0; ord < maxOrd; ord++) {

+ 1 - 3
server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java

@@ -196,12 +196,10 @@ public class InternalAdjacencyMatrix
         for (List<InternalBucket> sameRangeList : bucketsMap.values()) {
             InternalBucket reducedBucket = reduceBucket(sameRangeList, reduceContext);
             if(reducedBucket.docCount >= 1){
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 reducedBuckets.add(reducedBucket);
-            } else {
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reducedBucket));
             }
         }
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
         Collections.sort(reducedBuckets, Comparator.comparing(InternalBucket::getKey));
 
         InternalAdjacencyMatrix reduced = new InternalAdjacencyMatrix(name, reducedBuckets, getMetadata());

+ 0 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java

@@ -136,7 +136,6 @@ final class CompositeAggregator extends BucketsAggregator {
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
         // Composite aggregator must be at the top of the aggregation tree
         assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0L;
-        consumeBucketsAndMaybeBreak(queue.size());
         if (deferredCollectors != NO_OP_COLLECTOR) {
             // Replay all documents that contain at least one top bucket (collected during the first pass).
             runDeferredCollections();

+ 1 - 2
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java

@@ -178,7 +178,6 @@ public class InternalComposite
             if (lastBucket != null && bucketIt.current.compareKey(lastBucket) != 0) {
                 InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
                 buckets.clear();
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 result.add(reduceBucket);
                 if (result.size() >= size) {
                     break;
@@ -192,7 +191,6 @@ public class InternalComposite
         }
         if (buckets.size() > 0) {
             InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             result.add(reduceBucket);
         }
 
@@ -205,6 +203,7 @@ public class InternalComposite
             reducedFormats = lastBucket.formats;
             lastKey = lastBucket.getRawKey();
         }
+        reduceContext.consumeBucketsAndMaybeBreak(result.size());
         return new InternalComposite(name, size, sourceNames, reducedFormats, result, lastKey, reverseMuls,
             earlyTerminated, metadata);
     }

+ 3 - 4
server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java

@@ -109,8 +109,7 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
         InternalGeoGridBucket[][] topBucketsPerOrd = new InternalGeoGridBucket[owningBucketOrds.length][];
         for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
             int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]), shardSize);
-            consumeBucketsAndMaybeBreak(size);
-    
+
             BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
             InternalGeoGridBucket spare = null;
             LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
@@ -118,7 +117,7 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
                 if (spare == null) {
                     spare = newEmptyBucket();
                 }
-    
+
                 // need a special function to keep the source bucket
                 // up-to-date so it can get the appropriate key
                 spare.hashAsLong = ordsEnum.value();
@@ -126,7 +125,7 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
                 spare.bucketOrd = ordsEnum.ord();
                 spare = ordered.insertWithOverflow(spare);
             }
-    
+
             topBucketsPerOrd[ordIdx] = new InternalGeoGridBucket[ordered.size()];
             for (int i = ordered.size() - 1; i >= 0; --i) {
                 topBucketsPerOrd[ordIdx][i] = ordered.pop();

+ 2 - 6
server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java

@@ -100,18 +100,14 @@ public abstract class InternalGeoGrid<B extends InternalGeoGridBucket>
         BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
         for (LongObjectPagedHashMap.Cursor<List<InternalGeoGridBucket>> cursor : buckets) {
             List<InternalGeoGridBucket> sameCellBuckets = cursor.value;
-            InternalGeoGridBucket removed = ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext));
-            if (removed != null) {
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
-            } else {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
-            }
+            ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext));
         }
         buckets.close();
         InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()];
         for (int i = ordered.size() - 1; i >= 0; i--) {
             list[i] = ordered.pop();
         }
+        reduceContext.consumeBucketsAndMaybeBreak(list.length);
         return create(getName(), requiredSize, Arrays.asList(list), getMetadata());
     }
 

+ 1 - 12
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java

@@ -328,7 +328,6 @@ public final class InternalAutoDateHistogram extends
                 if (reduceRounding.round(top.current.key) != key) {
                     // the key changes, reduce what we already buffered and reset the buffer for current buckets
                     final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     reducedBuckets.add(reduced);
                     currentBuckets.clear();
                     key = reduceRounding.round(top.current.key);
@@ -348,7 +347,6 @@ public final class InternalAutoDateHistogram extends
 
             if (currentBuckets.isEmpty() == false) {
                 final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 reducedBuckets.add(reduced);
             }
         }
@@ -376,22 +374,17 @@ public final class InternalAutoDateHistogram extends
             long roundedBucketKey = reduceRounding.round(bucket.key);
             if (Double.isNaN(key)) {
                 key = roundedBucketKey;
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
                 sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
             } else if (roundedBucketKey == key) {
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
                 sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
             } else {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
                 sameKeyedBuckets.clear();
                 key = roundedBucketKey;
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
                 sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
             }
         }
         if (sameKeyedBuckets.isEmpty() == false) {
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
         }
         reducedBuckets = mergedBuckets;
@@ -449,7 +442,6 @@ public final class InternalAutoDateHistogram extends
             if (lastBucket != null) {
                 long key = rounding.nextRoundingValue(lastBucket.key);
                 while (key < nextBucket.key) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     iter.add(new InternalAutoDateHistogram.Bucket(key, 0, format, reducedEmptySubAggs));
                     key = rounding.nextRoundingValue(key);
                 }
@@ -515,7 +507,7 @@ public final class InternalAutoDateHistogram extends
             // Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding
             reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext);
         }
-
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBucketsResult.buckets.size());
         BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundingInfos, reducedBucketsResult.roundingIdx,
                 this.bucketInfo.emptySubAggregations);
 
@@ -551,16 +543,13 @@ public final class InternalAutoDateHistogram extends
         for (int i = 0; i < reducedBuckets.size(); i++) {
             Bucket bucket = reducedBuckets.get(i);
             if (i % mergeInterval == 0 && sameKeyedBuckets.isEmpty() == false) {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
                 sameKeyedBuckets.clear();
                 key = roundingInfo.rounding.round(bucket.key);
             }
-            reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
             sameKeyedBuckets.add(new Bucket(Math.round(key), bucket.docCount, format, bucket.aggregations));
         }
         if (sameKeyedBuckets.isEmpty() == false) {
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
         }
         return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval);

+ 1 - 10
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java

@@ -329,10 +329,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                     // the key changes, reduce what we already buffered and reset the buffer for current buckets
                     final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
                     if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                        reduceContext.consumeBucketsAndMaybeBreak(1);
                         reducedBuckets.add(reduced);
-                    } else {
-                        reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                     }
                     currentBuckets.clear();
                     key = top.current.key;
@@ -353,10 +350,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
             if (currentBuckets.isEmpty() == false) {
                 final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
                 if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     reducedBuckets.add(reduced);
-                } else {
-                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                 }
             }
         }
@@ -396,7 +390,6 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                     long key = bounds.getMin() + offset;
                     long max = bounds.getMax() + offset;
                     while (key <= max) {
-                        reduceContext.consumeBucketsAndMaybeBreak(1);
                         iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                         key = nextKey(key).longValue();
                     }
@@ -406,7 +399,6 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                     long key = bounds.getMin() + offset;
                     if (key < firstBucket.key) {
                         while (key < firstBucket.key) {
-                            reduceContext.consumeBucketsAndMaybeBreak(1);
                             iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                             key = nextKey(key).longValue();
                         }
@@ -422,7 +414,6 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
             if (lastBucket != null) {
                 long key = nextKey(lastBucket.key).longValue();
                 while (key < nextBucket.key) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                     key = nextKey(key).longValue();
                 }
@@ -436,7 +427,6 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
             long key = nextKey(lastBucket.key).longValue();
             long max = bounds.getMax() + offset;
             while (key <= max) {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                 key = nextKey(key).longValue();
             }
@@ -462,6 +452,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                 CollectionUtil.introSort(reducedBuckets, order.comparator());
             }
         }
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
         return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo,
                 format, keyed, getMetadata());
     }

+ 1 - 10
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java

@@ -313,10 +313,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
                     // Using Double.compare instead of != to handle NaN correctly.
                     final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
                     if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                        reduceContext.consumeBucketsAndMaybeBreak(1);
                         reducedBuckets.add(reduced);
-                    } else {
-                        reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                     }
                     currentBuckets.clear();
                     key = top.current.key;
@@ -337,10 +334,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
             if (currentBuckets.isEmpty() == false) {
                 final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
                 if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     reducedBuckets.add(reduced);
-                } else {
-                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                 }
             }
         }
@@ -380,7 +374,6 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
         if (iter.hasNext() == false) {
             // fill with empty buckets
             for (double key = round(emptyBucketInfo.minBound); key <= emptyBucketInfo.maxBound; key = nextKey(key)) {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
             }
         } else {
@@ -388,7 +381,6 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
             if (Double.isFinite(emptyBucketInfo.minBound)) {
                 // fill with empty buckets until the first key
                 for (double key = round(emptyBucketInfo.minBound); key < first.key; key = nextKey(key)) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                 }
             }
@@ -401,7 +393,6 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
                 if (lastBucket != null) {
                     double key = nextKey(lastBucket.key);
                     while (key < nextBucket.key) {
-                        reduceContext.consumeBucketsAndMaybeBreak(1);
                         iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                         key = nextKey(key);
                     }
@@ -412,7 +403,6 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
 
             // finally, adding the empty buckets *after* the actual data (based on the extended_bounds.max requested by the user)
             for (double key = nextKey(lastBucket.key); key <= emptyBucketInfo.maxBound; key = nextKey(key)) {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
             }
         }
@@ -437,6 +427,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
                 CollectionUtil.introSort(reducedBuckets, order.comparator());
             }
         }
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
         return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, getMetadata());
     }
 

+ 3 - 6
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

@@ -392,7 +392,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
          * Iterate all of the buckets. Implementations take into account
          * the {@link BucketCountThresholds}. In particular,
          * if the {@link BucketCountThresholds#getMinDocCount()} is 0 then
-         * they'll make sure to iterate a bucket even if it was never 
+         * they'll make sure to iterate a bucket even if it was never
          * {{@link #collectGlobalOrd(int, long, LeafBucketCollector) collected}.
          * If {@link BucketCountThresholds#getMinDocCount()} is not 0 then
          * they'll skip all global ords that weren't collected.
@@ -500,7 +500,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
                 }
             }
         }
-            
+
 
         @Override
         public void close() {
@@ -543,9 +543,6 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
                         }
                         updateBucket(spare, globalOrd, bucketOrd, docCount);
                         spare = ordered.insertWithOverflow(spare);
-                        if (spare == null) {
-                            consumeBucketsAndMaybeBreak(1);
-                        }
                     }
                 }
             });
@@ -653,7 +650,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
             spare.docCount = docCount;
             otherDocCount += docCount;
         }
-        
+
         @Override
         PriorityQueue<OrdBucket> buildPriorityQueue(int size) {
             return new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);

+ 0 - 2
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java

@@ -113,8 +113,6 @@ public class LongRareTermsAggregator extends AbstractRareTermsAggregator<ValuesS
                     LongRareTerms.Bucket bucket = new LongRareTerms.Bucket(oldKey, docCount, null, format);
                     bucket.bucketOrd = newBucketOrd;
                     buckets.add(bucket);
-
-                    consumeBucketsAndMaybeBreak(1);
                 } else {
                     // Make a note when one of the ords has been deleted
                     deletionCount += 1;

+ 0 - 3
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java

@@ -172,9 +172,6 @@ public class NumericTermsAggregator extends TermsAggregator {
                     if (bucketCountThresholds.getShardMinDocCount() <= docCount) {
                         updateBucket(spare, ordsEnum, docCount);
                         spare = ordered.insertWithOverflow(spare);
-                        if (spare == null) {
-                            consumeBucketsAndMaybeBreak(1);
-                        }
                     }
                 }
 

+ 0 - 3
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantStringTermsAggregator.java

@@ -104,9 +104,6 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
 
             spare.bucketOrd = i;
             spare = ordered.insertWithOverflow(spare);
-            if (spare == null) {
-                consumeBucketsAndMaybeBreak(1);
-            }
         }
 
         final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];

+ 0 - 3
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregator.java

@@ -216,9 +216,6 @@ public class SignificantTextAggregator extends BucketsAggregator {
 
             spare.bucketOrd = i;
             spare = ordered.insertWithOverflow(spare);
-            if (spare == null) {
-                consumeBucketsAndMaybeBreak(1);
-            }
         }
 
         final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];

+ 0 - 2
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java

@@ -119,8 +119,6 @@ public class StringRareTermsAggregator extends AbstractRareTermsAggregator<Value
                     StringRareTerms.Bucket bucket = new StringRareTerms.Bucket(BytesRef.deepCopyOf(oldKey), docCount, null, format);
                     bucket.bucketOrd = newBucketOrd;
                     buckets.add(bucket);
-
-                    consumeBucketsAndMaybeBreak(1);
                 } else {
                     // Make a note when one of the ords has been deleted
                     deletionCount += 1;

+ 0 - 3
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java

@@ -147,9 +147,6 @@ public class StringTermsAggregator extends AbstractStringTermsAggregator {
             spare.bucketOrd = i;
             if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
                 spare = ordered.insertWithOverflow(spare);
-                if (spare == null) {
-                    consumeBucketsAndMaybeBreak(1);
-                }
             }
         }
 

+ 4 - 1
server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java

@@ -323,7 +323,10 @@ public class HierarchyCircuitBreakerServiceTests extends ESTestCase {
 
             // make sure used bytes is greater than the total circuit breaker limit
             breaker.addWithoutBreaking(200);
-
+            // make sure that we check on the the following call
+            for (int i = 0; i < 1023; i++) {
+                multiBucketConsumer.accept(0);
+            }
             CircuitBreakingException exception =
                 expectThrows(CircuitBreakingException.class, () -> multiBucketConsumer.accept(1024));
             assertThat(exception.getMessage(), containsString("[parent] Data too large, data for [allocated_buckets] would be"));

+ 9 - 0
server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregatorTestCase.java

@@ -36,9 +36,11 @@ import org.elasticsearch.common.geo.GeoBoundingBoxTests;
 import org.elasticsearch.common.geo.GeoUtils;
 import org.elasticsearch.index.mapper.GeoPointFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
@@ -304,4 +306,11 @@ public abstract class GeoGridAggregatorTestCase<T extends InternalGeoGridBucket>
         indexReader.close();
         directory.close();
     }
+
+    @Override
+    public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        /*
+         * No-op.
+         */
+    }
 }

+ 8 - 0
server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java

@@ -38,6 +38,7 @@ import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
@@ -869,4 +870,11 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
             i += 1;
         }
     }
+
+    @Override
+    public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        /*
+         * No-op.
+         */
+    }
 }

+ 0 - 67
server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java

@@ -38,10 +38,8 @@ import org.elasticsearch.common.time.DateFormatters;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.BucketOrder;
-import org.elasticsearch.search.aggregations.MultiBucketConsumerService.TooManyBucketsException;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
@@ -967,71 +965,6 @@ public class DateHistogramAggregatorTests extends AggregatorTestCase {
         );
     }
 
-    public void testMaxBucket() throws IOException {
-        Query query = new MatchAllDocsQuery();
-        List<String> timestamps = Arrays.asList(
-            "2010-01-01T00:00:00.000Z",
-            "2011-01-01T00:00:00.000Z",
-            "2017-01-01T00:00:00.000Z"
-        );
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchCase(query, timestamps,
-            aggregation -> aggregation.fixedInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE),
-            histogram -> {}, 2, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation -> aggregation.fixedInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE),
-            histogram -> {}, 2, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation -> aggregation.fixedInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE).minDocCount(0L),
-            histogram -> {}, 100, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation ->
-                aggregation.fixedInterval(DateHistogramInterval.seconds(5))
-                    .field(AGGREGABLE_DATE)
-                    .subAggregation(
-                        AggregationBuilders.dateHistogram("1")
-                            .fixedInterval(DateHistogramInterval.seconds(5))
-                            .field(AGGREGABLE_DATE)
-                    ),
-            histogram -> {}, 5, false));
-    }
-
-    public void testMaxBucketDeprecated() throws IOException {
-        Query query = new MatchAllDocsQuery();
-        List<String> timestamps = Arrays.asList(
-            "2010-01-01T00:00:00.000Z",
-            "2011-01-01T00:00:00.000Z",
-            "2017-01-01T00:00:00.000Z"
-        );
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchCase(query, timestamps,
-            aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE),
-            histogram -> {}, 2, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE),
-            histogram -> {}, 2, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE).minDocCount(0L),
-            histogram -> {}, 100, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation ->
-                aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5))
-                    .field(AGGREGABLE_DATE)
-                    .subAggregation(
-                        AggregationBuilders.dateHistogram("1")
-                            .dateHistogramInterval(DateHistogramInterval.seconds(5))
-                            .field(AGGREGABLE_DATE)
-                    ),
-            histogram -> {}, 5, false));
-        assertWarnings("[interval] on [date_histogram] is deprecated, use [fixed_interval] or [calendar_interval] in the future.");
-    }
-
     public void testFixedWithCalendar() throws IOException {
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> testSearchCase(new MatchAllDocsQuery(),
             Arrays.asList(

+ 1 - 1
server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java

@@ -147,7 +147,7 @@ public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregati
         /*
          * Guess the interval to use based on the roughly estimated
          * duration. It'll be accurate or it'll produce more buckets
-         * than we need but it is quick. 
+         * than we need but it is quick.
          */
         if (normalizedDuration != 0) {
             for (int j = roundingInfo.innerIntervals.length-1; j >= 0; j--) {

+ 9 - 0
server/src/test/java/org/elasticsearch/search/aggregations/bucket/range/DateRangeAggregatorTests.java

@@ -36,7 +36,9 @@ import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
 
 import java.io.IOException;
@@ -304,4 +306,11 @@ public class DateRangeAggregatorTests extends AggregatorTestCase {
             }
         }
     }
+
+    @Override
+    public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        /*
+         * No-op.
+         */
+    }
 }

+ 0 - 2
test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java

@@ -422,7 +422,6 @@ public abstract class AggregatorTestCase extends ESTestCase {
         a.postCollection();
         @SuppressWarnings("unchecked")
         A result = (A) a.buildTopLevel();
-        InternalAggregationTestCase.assertMultiBucketConsumer(result, bucketConsumer);
         return result;
     }
 
@@ -493,7 +492,6 @@ public abstract class AggregatorTestCase extends ESTestCase {
             a.postCollection();
             InternalAggregation agg = a.buildTopLevel();
             aggs.add(agg);
-            InternalAggregationTestCase.assertMultiBucketConsumer(agg, shardBucketConsumer);
         }
         if (aggs.isEmpty()) {
             return (A) root.buildEmptyAggregation();

+ 11 - 5
test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java

@@ -44,6 +44,7 @@ import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
 import org.elasticsearch.search.aggregations.ParsedAggregation;
 import org.elasticsearch.search.aggregations.bucket.adjacency.AdjacencyMatrixAggregationBuilder;
@@ -82,15 +83,15 @@ import org.elasticsearch.search.aggregations.bucket.range.ParsedRange;
 import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.sampler.InternalSampler;
 import org.elasticsearch.search.aggregations.bucket.sampler.ParsedSampler;
-import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantLongTerms;
-import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantStringTerms;
-import org.elasticsearch.search.aggregations.bucket.terms.SignificantLongTerms;
-import org.elasticsearch.search.aggregations.bucket.terms.SignificantStringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.ParsedDoubleTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantLongTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantStringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.SignificantLongTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.SignificantStringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;
@@ -387,10 +388,15 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
                 bigArrays, mockScriptService, bucketConsumer, PipelineTree.EMPTY);
         @SuppressWarnings("unchecked")
         T reduced = (T) inputs.get(0).reduce(toReduce, context);
-        assertMultiBucketConsumer(reduced, bucketConsumer);
+        doAssertReducedMultiBucketConsumer(reduced, bucketConsumer);
         assertReduced(reduced, inputs);
     }
 
+    protected void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        InternalAggregationTestCase.assertMultiBucketConsumer(agg, bucketConsumer);
+    }
+
+
     /**
      * overwrite in tests that need it
      */

+ 8 - 0
test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java

@@ -24,6 +24,7 @@ import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.ParsedAggregation;
 import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
@@ -184,4 +185,11 @@ public abstract class InternalMultiBucketAggregationTestCase<T extends InternalA
             }
         }
     }
+
+    @Override
+    public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        /*
+         * No-op.
+         */
+    }
 }

+ 2 - 2
x-pack/plugin/sql/qa/jdbc/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/JdbcErrorsTestCase.java

@@ -135,9 +135,9 @@ public abstract class JdbcErrorsTestCase extends JdbcIntegrationTestCase {
         try (Connection c = esJdbc()) {
             SQLException e = expectThrows(
                 SQLException.class,
-                () -> c.prepareStatement("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 12000").executeQuery()
+                () -> c.prepareStatement("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 120000").executeQuery()
             );
-            assertEquals("The maximum LIMIT for aggregate sorting is [10000], received [12000]", e.getMessage());
+            assertEquals("The maximum LIMIT for aggregate sorting is [65535], received [120000]", e.getMessage());
         }
     }
 }

+ 2 - 2
x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java

@@ -117,9 +117,9 @@ public abstract class ErrorsTestCase extends CliIntegrationTestCase implements o
     @Override
     public void testHardLimitForSortOnAggregate() throws Exception {
         index("test", body -> body.field("a", 1).field("b", 2));
-        String commandResult = command("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 12000");
+        String commandResult = command("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 120000");
         assertEquals(
-            START + "Bad request [[3;33;22mThe maximum LIMIT for aggregate sorting is [10000], received [12000]" + END,
+            START + "Bad request [[3;33;22mThe maximum LIMIT for aggregate sorting is [65535], received [120000]" + END,
             commandResult
         );
     }

+ 2 - 2
x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java

@@ -439,8 +439,8 @@ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements Err
     public void testHardLimitForSortOnAggregate() throws Exception {
         index("{\"a\": 1, \"b\": 2}");
         expectBadRequest(
-            () -> runSql(randomMode(), "SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 12000"),
-            containsString("The maximum LIMIT for aggregate sorting is [10000], received [12000]")
+            () -> runSql(randomMode(), "SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 120000"),
+            containsString("The maximum LIMIT for aggregate sorting is [65535], received [120000]")
         );
     }