
Add a new cluster setting to limit the total number of buckets returned by a request (#27581)

This commit adds a new dynamic cluster setting named `search.max_buckets` that can be used to limit the number of buckets created per shard or by the reduce phase. Each multi-bucket aggregator can consume buckets during the final build of the aggregation at the shard level, or during the reduce phase (final or not) on the coordinating node. When an aggregator consumes a bucket, a global count for the request is incremented; if this count exceeds the limit, a `TooManyBucketsException` is thrown.
This change adds the ability for multi-bucket aggregators to "consume" buckets against the global limit, which defaults to 10,000. The consumer is opt-in, so each multi-bucket aggregator must explicitly call it when a bucket is added to the response.

Closes #27452, #26012
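
A minimal, self-contained sketch of the opt-in accounting pattern this commit introduces (the class names here are illustrative; the real implementation is `MultiBucketConsumerService.MultiBucketConsumer`, shown in full below):

    import java.util.function.IntConsumer;

    // Illustrative stand-in for MultiBucketConsumerService.MultiBucketConsumer:
    // a stateful IntConsumer that trips once the running total exceeds the limit.
    final class BucketBudget implements IntConsumer {
        private final int limit;
        private int count; // aggregations execute in a single thread, so no atomics

        BucketBudget(int limit) {
            this.limit = limit;
        }

        @Override
        public void accept(int value) {
            count += value;
            if (count > limit) {
                throw new IllegalStateException("too many buckets: " + count + " > " + limit);
            }
        }
    }

    class BucketBudgetDemo {
        public static void main(String[] args) {
            IntConsumer budget = new BucketBudget(10000); // the default limit
            budget.accept(9000); // a shard-level aggregator builds 9,000 buckets
            budget.accept(1000); // still allowed: exactly at the limit
            budget.accept(1);    // 10,001 > 10,000: throws
        }
    }
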
Jim Ferenczi, 7 years ago
commit caea6b70fa
49 changed files with 658 additions and 84 deletions
  1. +5 -1  core/src/main/java/org/elasticsearch/ElasticsearchException.java
  2. +11 -7  core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
  3. +2 -0  core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
  4. +8 -6  core/src/main/java/org/elasticsearch/node/Node.java
  5. +11 -1  core/src/main/java/org/elasticsearch/search/SearchService.java
  6. +1 -0  core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java
  7. +16 -0  core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java
  8. +34 -0  core/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java
  9. +126 -0  core/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java
  10. +18 -1  core/src/main/java/org/elasticsearch/search/aggregations/SearchContextAggregations.java
  11. +15 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java
  12. +2 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java
  13. +3 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java
  14. +1 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java
  15. +2 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java
  16. +1 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java
  17. +2 -1  core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java
  18. +1 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridAggregator.java
  19. +6 -1  core/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java
  20. +2 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
  21. +1 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregator.java
  22. +10 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java
  23. +10 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java
  24. +1 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/BinaryRangeAggregator.java
  25. +1 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java
  26. +1 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java
  27. +1 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java
  28. +3 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java
  29. +8 -1  core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java
  30. +3 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsAggregator.java
  31. +3 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsAggregator.java
  32. +21 -18  core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregator.java
  33. +1 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java
  34. +5 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java
  35. +4 -2  core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java
  36. +3 -0  core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java
  37. +10 -0  core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java
  38. +7 -3  core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java
  39. +13 -6  core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java
  40. +3 -1  core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
  41. +17 -0  core/src/test/java/org/elasticsearch/search/aggregations/EquivalenceIT.java
  42. +1 -1  core/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java
  43. +64 -7  core/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java
  44. +2 -3  core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java
  45. +4 -0  docs/reference/aggregations/bucket.asciidoc
  46. +7 -1  docs/reference/migration/migrate_7_0/aggregations.asciidoc
  47. +110 -0  rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/240_max_buckets.yml
  48. +62 -21  test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
  49. +15 -2  test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java

+ 5 - 1
core/src/main/java/org/elasticsearch/ElasticsearchException.java

@@ -34,6 +34,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.transport.TcpTransport;

 import java.io.IOException;
@@ -986,7 +987,10 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
         SHARD_LOCK_OBTAIN_FAILED_EXCEPTION(org.elasticsearch.env.ShardLockObtainFailedException.class,
                                            org.elasticsearch.env.ShardLockObtainFailedException::new, 147, Version.V_5_0_2),
         UNKNOWN_NAMED_OBJECT_EXCEPTION(org.elasticsearch.common.xcontent.NamedXContentRegistry.UnknownNamedObjectException.class,
-                org.elasticsearch.common.xcontent.NamedXContentRegistry.UnknownNamedObjectException::new, 148, Version.V_5_2_0);
+                org.elasticsearch.common.xcontent.NamedXContentRegistry.UnknownNamedObjectException::new, 148, Version.V_5_2_0),
+        TOO_MANY_BUCKETS_EXCEPTION(MultiBucketConsumerService.TooManyBucketsException.class,
+                                   MultiBucketConsumerService.TooManyBucketsException::new, 149,
+            Version.V_7_0_0_alpha1);

         final Class<? extends ElasticsearchException> exceptionClass;
         final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;

+ 11 - 7
core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

@@ -65,6 +65,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.function.IntFunction;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -73,13 +74,16 @@ public final class SearchPhaseController extends AbstractComponent {

     private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];

-    private final BigArrays bigArrays;
-    private final ScriptService scriptService;
+    private final Function<Boolean, ReduceContext> reduceContextFunction;

-    public SearchPhaseController(Settings settings, BigArrays bigArrays, ScriptService scriptService) {
+    /**
+     * Constructor.
+     * @param settings Node settings
+     * @param reduceContextFunction A function that builds a context for the reduce of an {@link InternalAggregation}
+     */
+    public SearchPhaseController(Settings settings, Function<Boolean, ReduceContext> reduceContextFunction) {
         super(settings);
-        this.bigArrays = bigArrays;
-        this.scriptService = scriptService;
+        this.reduceContextFunction = reduceContextFunction;
     }

     public AggregatedDfs aggregateDfs(Collection<DfsSearchResult> results) {
@@ -496,7 +500,7 @@ public final class SearchPhaseController extends AbstractComponent {
             }
         }
         final Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
-        ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService, true);
+        ReduceContext reduceContext = reduceContextFunction.apply(true);
         final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
             firstResult.pipelineAggregators(), reduceContext);
         final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
@@ -513,7 +517,7 @@ public final class SearchPhaseController extends AbstractComponent {
      * that relevant for the final reduce step. For final reduce see {@link #reduceAggs(List, List, ReduceContext)}
      */
     private InternalAggregations reduceAggsIncrementally(List<InternalAggregations> aggregationsList) {
-        ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService, false);
+        ReduceContext reduceContext = reduceContextFunction.apply(false);
         return aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
             null, reduceContext);
     }
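
With this change `SearchPhaseController` no longer holds `BigArrays` and `ScriptService` itself; callers supply a factory keyed on whether the reduce is final. A hedged sketch of how a caller might wire it up (a real node passes `searchService::createReduceContext`; `BigArrays.NON_RECYCLING_INSTANCE` and the null script service are test-style assumptions, not what production uses):

    import java.util.function.Function;

    import org.elasticsearch.action.search.SearchPhaseController;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.util.BigArrays;
    import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
    import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
    import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;

    class ReduceContextWiring {
        static SearchPhaseController build() {
            // A fresh consumer per reduce round, so each round's global bucket count
            // starts at zero (SearchService.createReduceContext does the same).
            Function<Boolean, ReduceContext> reduceContextFunction = isFinalReduce ->
                new ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, /* no scripts */ null,
                    new MultiBucketConsumer(MultiBucketConsumerService.DEFAULT_MAX_BUCKETS),
                    isFinalReduce);
            return new SearchPhaseController(Settings.EMPTY, reduceContextFunction);
        }
    }
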

+ 2 - 0
core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -85,6 +85,7 @@ import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.SearchService;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.RemoteClusterAware;
@@ -360,6 +361,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
                     SearchService.DEFAULT_KEEPALIVE_SETTING,
                     SearchService.KEEPALIVE_INTERVAL_SETTING,
                     SearchService.MAX_KEEPALIVE_SETTING,
+                    MultiBucketConsumerService.MAX_BUCKET_SETTING,
                     SearchService.LOW_LEVEL_CANCELLATION_SETTING,
                     Node.WRITE_PORTS_FILE_SETTING,
                     Node.NODE_NAME_SETTING,

+ 8 - 6
core/src/main/java/org/elasticsearch/node/Node.java

@@ -100,7 +100,6 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.indices.cluster.IndicesClusterStateService;
-import org.elasticsearch.indices.mapper.MapperRegistry;
 import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
 import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
 import org.elasticsearch.indices.recovery.RecoverySettings;
@@ -449,6 +448,11 @@ public class Node implements Closeable {
                 transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
                 httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
                 searchTransportService);
+
+            final SearchService searchService = newSearchService(clusterService, indicesService,
+                threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
+                responseCollectorService);
+
             modules.add(b -> {
                     b.bind(Node.class).toInstance(this);
                     b.bind(NodeService.class).toInstance(nodeService);
@@ -470,12 +474,10 @@ public class Node implements Closeable {
                     b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
                     b.bind(MetaStateService.class).toInstance(metaStateService);
                     b.bind(IndicesService.class).toInstance(indicesService);
-                    b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService,
-                        threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
-                        responseCollectorService));
+                    b.bind(SearchService.class).toInstance(searchService);
                     b.bind(SearchTransportService.class).toInstance(searchTransportService);
-                    b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays,
-                            scriptModule.getScriptService()));
+                    b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings,
+                        searchService::createReduceContext));
                     b.bind(Transport.class).toInstance(transport);
                     b.bind(TransportService.class).toInstance(transportService);
                     b.bind(NetworkService.class).toInstance(networkService);

+ 11 - 1
core/src/main/java/org/elasticsearch/search/SearchService.java

@@ -60,6 +60,8 @@ import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.script.SearchScript;
 import org.elasticsearch.search.aggregations.AggregationInitializationException;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.SearchContextAggregations;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.collapse.CollapseContext;
@@ -118,6 +120,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         Setting.positiveTimeSetting("search.max_keep_alive", timeValueHours(24), Property.NodeScope, Property.Dynamic);
         Setting.positiveTimeSetting("search.max_keep_alive", timeValueHours(24), Property.NodeScope, Property.Dynamic);
     public static final Setting<TimeValue> KEEPALIVE_INTERVAL_SETTING =
     public static final Setting<TimeValue> KEEPALIVE_INTERVAL_SETTING =
         Setting.positiveTimeSetting("search.keep_alive_interval", timeValueMinutes(1), Property.NodeScope);
         Setting.positiveTimeSetting("search.keep_alive_interval", timeValueMinutes(1), Property.NodeScope);
+
     /**
     /**
      * Enables low-level, frequent search cancellation checks. Enabling low-level checks will make long running searches to react
      * Enables low-level, frequent search cancellation checks. Enabling low-level checks will make long running searches to react
      * to the cancellation request faster. However, since it will produce more cancellation checks it might slow the search performance
      * to the cancellation request faster. However, since it will produce more cancellation checks it might slow the search performance
@@ -163,6 +166,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

     private final ConcurrentMapLong<SearchContext> activeContexts = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();

+    private final MultiBucketConsumerService multiBucketConsumerService;
+
     public SearchService(ClusterService clusterService, IndicesService indicesService,
                          ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase,
                          ResponseCollectorService responseCollectorService) {
@@ -175,6 +180,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         this.bigArrays = bigArrays;
         this.queryPhase = new QueryPhase(settings);
         this.fetchPhase = fetchPhase;
+        this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings);

         TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
         setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings));
@@ -741,7 +747,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         if (source.aggregations() != null) {
             try {
                 AggregatorFactories factories = source.aggregations().build(context, null);
-                context.aggregations(new SearchContextAggregations(factories));
+                context.aggregations(new SearchContextAggregations(factories, multiBucketConsumerService.create()));
             } catch (IOException e) {
                 throw new AggregationInitializationException("Failed to create aggregators", e);
             }
@@ -1017,4 +1023,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     public IndicesService getIndicesService() {
         return indicesService;
     }
+
+    public InternalAggregation.ReduceContext createReduceContext(boolean finalReduce) {
+        return new InternalAggregation.ReduceContext(bigArrays, scriptService, multiBucketConsumerService.create(), finalReduce);
+    }
 }

+ 1 - 0
core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java

@@ -123,6 +123,7 @@ public class AggregationPhase implements SearchPhase {
         }

         List<InternalAggregation> aggregations = new ArrayList<>(aggregators.length);
+        context.aggregations().resetBucketMultiConsumer();
         for (Aggregator aggregator : context.aggregations().aggregators()) {
             try {
                 aggregator.postCollection();

+ 16 - 0
core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java

@@ -22,6 +22,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.NamedWriteable;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.util.BigArray;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.rest.action.search.RestSearchAction;
@@ -33,6 +34,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.IntConsumer;

 /**
  * An internal implementation of {@link Aggregation}. Serves as a base class for all aggregation implementations.
@@ -43,11 +45,17 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable

         private final BigArrays bigArrays;
         private final ScriptService scriptService;
+        private final IntConsumer multiBucketConsumer;
         private final boolean isFinalReduce;

         public ReduceContext(BigArrays bigArrays, ScriptService scriptService, boolean isFinalReduce) {
+            this(bigArrays, scriptService, (s) -> {}, isFinalReduce);
+        }
+
+        public ReduceContext(BigArrays bigArrays, ScriptService scriptService, IntConsumer multiBucketConsumer, boolean isFinalReduce) {
             this.bigArrays = bigArrays;
             this.scriptService = scriptService;
+            this.multiBucketConsumer = multiBucketConsumer;
             this.isFinalReduce = isFinalReduce;
         }

@@ -67,6 +75,14 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable
         public ScriptService scriptService() {
             return scriptService;
         }
+
+        /**
+         * Adds <tt>count</tt> buckets to the global count for the request and fails if this number is greater than
+         * the maximum number of buckets allowed in a response
+         */
+        public void consumeBucketsAndMaybeBreak(int size) {
+            multiBucketConsumer.accept(size);
+        }
     }

     protected final String name;

+ 34 - 0
core/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java

@@ -22,6 +22,7 @@ package org.elasticsearch.search.aggregations;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
+import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

 import java.io.IOException;
@@ -82,6 +83,39 @@ public abstract class InternalMultiBucketAggregation<A extends InternalMultiBuck
         }
     }

+    /**
+     * Counts the number of inner buckets inside the provided {@link InternalBucket}
+     */
+    public static int countInnerBucket(InternalBucket bucket) {
+        int count = 0;
+        for (Aggregation agg : bucket.getAggregations().asList()) {
+            count += countInnerBucket(agg);
+        }
+        return count;
+    }
+
+    /**
+     * Counts the number of inner buckets inside the provided {@link Aggregation}
+     */
+    public static int countInnerBucket(Aggregation agg) {
+        int size = 0;
+        if (agg instanceof MultiBucketsAggregation) {
+            MultiBucketsAggregation multi = (MultiBucketsAggregation) agg;
+            for (MultiBucketsAggregation.Bucket bucket : multi.getBuckets()) {
+                ++ size;
+                for (Aggregation bucketAgg : bucket.getAggregations().asList()) {
+                    size += countInnerBucket(bucketAgg);
+                }
+            }
+        } else if (agg instanceof SingleBucketAggregation) {
+            SingleBucketAggregation single = (SingleBucketAggregation) agg;
+            for (Aggregation bucketAgg : single.getAggregations().asList()) {
+                size += countInnerBucket(bucketAgg);
+            }
+        }
+        return size;
+    }
+
     public abstract static class InternalBucket implements Bucket, Writeable {

         public Object getProperty(String containingAggName, List<String> path) {
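
The reduce-phase changes below rely on `countInnerBucket` to credit back the whole subtree of a bucket that gets pruned. A toy, self-contained analogue of that recursion (the `Toy*` classes are hypothetical stand-ins, not Elasticsearch types):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    // Toy analogue: an aggregation holds buckets, each bucket holds sub-aggregations.
    class ToyAgg {
        final List<ToyBucket> buckets;
        ToyAgg(List<ToyBucket> buckets) { this.buckets = buckets; }
    }

    class ToyBucket {
        final List<ToyAgg> subAggs;
        ToyBucket(List<ToyAgg> subAggs) { this.subAggs = subAggs; }
    }

    class CountInnerBucketDemo {
        // Mirrors countInnerBucket(Aggregation): one per bucket, plus everything nested below.
        static int countInnerBucket(ToyAgg agg) {
            int size = 0;
            for (ToyBucket bucket : agg.buckets) {
                ++size;
                for (ToyAgg sub : bucket.subAggs) {
                    size += countInnerBucket(sub);
                }
            }
            return size;
        }

        public static void main(String[] args) {
            List<ToyAgg> none = Collections.emptyList();
            // A terms agg with 2 buckets, each wrapping a histogram with 3 empty buckets:
            ToyAgg histo = new ToyAgg(Arrays.asList(
                new ToyBucket(none), new ToyBucket(none), new ToyBucket(none)));
            ToyAgg terms = new ToyAgg(Arrays.asList(
                new ToyBucket(Arrays.asList(histo)), new ToyBucket(Arrays.asList(histo))));
            System.out.println(countInnerBucket(terms)); // 2 + 2 * 3 = 8
        }
    }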

+ 126 - 0
core/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java

@@ -0,0 +1,126 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations;
+
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
+
+import java.io.IOException;
+import java.util.function.IntConsumer;
+
+/**
+ * An aggregation service that creates instances of {@link MultiBucketConsumer}.
+ * The consumer is used by {@link BucketsAggregator} and {@link InternalMultiBucketAggregation} to limit the number of buckets created
+ * in {@link Aggregator#buildAggregation} and {@link InternalAggregation#reduce}.
+ * The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 10000.
+ */
+public class MultiBucketConsumerService {
+    public static final int DEFAULT_MAX_BUCKETS = 10000;
+    public static final Setting<Integer> MAX_BUCKET_SETTING =
+        Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);
+
+    private volatile int maxBucket;
+
+    public MultiBucketConsumerService(ClusterService clusterService, Settings settings) {
+       this.maxBucket = MAX_BUCKET_SETTING.get(settings);
+       clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_BUCKET_SETTING, this::setMaxBucket);
+    }
+
+    private void setMaxBucket(int maxBucket) {
+        this.maxBucket = maxBucket;
+    }
+
+    public static class TooManyBucketsException extends AggregationExecutionException {
+        private final int maxBuckets;
+
+        public TooManyBucketsException(String message, int maxBuckets) {
+            super(message);
+            this.maxBuckets = maxBuckets;
+        }
+
+        public TooManyBucketsException(StreamInput in) throws IOException {
+            super(in);
+            maxBuckets = in.readInt();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeInt(maxBuckets);
+        }
+
+        public int getMaxBuckets() {
+            return maxBuckets;
+        }
+
+        @Override
+        public RestStatus status() {
+            return RestStatus.SERVICE_UNAVAILABLE;
+        }
+
+        @Override
+        protected void metadataToXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.field("max_buckets", maxBuckets);
+        }
+    }
+
+    /**
+     * An {@link IntConsumer} that throws a {@link TooManyBucketsException}
+     * when the sum of the provided values is above the limit (`search.max_buckets`).
+     * It is used by aggregators to limit the number of bucket creation during
+     * {@link Aggregator#buildAggregation} and {@link InternalAggregation#reduce}.
+     */
+    public static class MultiBucketConsumer implements IntConsumer {
+        private final int limit;
+        // aggregations execute in a single thread so no atomic here
+        private int count;
+
+        public MultiBucketConsumer(int limit) {
+            this.limit = limit;
+        }
+
+        @Override
+        public void accept(int value) {
+            count += value;
+            if (count > limit) {
+                throw new TooManyBucketsException("Trying to create too many buckets. Must be less than or equal to: [" + limit
+                    + "] but was [" + count + "]. This limit can be set by changing the [" +
+                    MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
+            }
+        }
+
+        public void reset() {
+            this.count = 0;
+        }
+
+        public int getCount() {
+            return count;
+        }
+    }
+
+    public MultiBucketConsumer create() {
+        return new MultiBucketConsumer(maxBucket);
+    }
+}
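
A usage sketch of the class above, based only on the methods shown in this file: a consumer created from the service enforces the configured limit and can be reset between uses (as `AggregationPhase` does per shard via `resetBucketMultiConsumer`):

    import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
    import org.elasticsearch.search.aggregations.MultiBucketConsumerService.TooManyBucketsException;

    class MultiBucketConsumerDemo {
        public static void main(String[] args) {
            // Stand-alone construction; on a node the service builds this via create()
            // with the current value of the dynamic search.max_buckets setting.
            MultiBucketConsumer consumer = new MultiBucketConsumer(100);
            consumer.accept(60);
            consumer.accept(40);                      // exactly at the limit: still fine
            System.out.println(consumer.getCount());  // 100
            try {
                consumer.accept(1);                   // 101 > 100: trips the limit
            } catch (TooManyBucketsException e) {
                System.out.println("limit = " + e.getMaxBuckets());
            }
            consumer.reset();                         // cleared before reuse
            System.out.println(consumer.getCount());  // 0
        }
    }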

+ 18 - 1
core/src/main/java/org/elasticsearch/search/aggregations/SearchContextAggregations.java

@@ -18,19 +18,25 @@
  */
 package org.elasticsearch.search.aggregations;

+import java.util.function.IntConsumer;
+
+import static org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
+
 /**
  * The aggregation context that is part of the search context.
  */
 public class SearchContextAggregations {

     private final AggregatorFactories factories;
+    private final MultiBucketConsumer multiBucketConsumer;
     private Aggregator[] aggregators;

     /**
      * Creates a new aggregation context with the parsed aggregator factories
      */
-    public SearchContextAggregations(AggregatorFactories factories) {
+    public SearchContextAggregations(AggregatorFactories factories, MultiBucketConsumer multiBucketConsumer) {
         this.factories = factories;
+        this.multiBucketConsumer = multiBucketConsumer;
     }

     public AggregatorFactories factories() {
@@ -50,4 +56,15 @@ public class SearchContextAggregations {
         this.aggregators = aggregators;
     }

+    /**
+     * Returns a consumer for multi bucket aggregation that checks the total number of buckets
+     * created in the response
+     */
+    public IntConsumer multiBucketConsumer() {
+        return multiBucketConsumer;
+    }
+
+    void resetBucketMultiConsumer() {
+        multiBucketConsumer.reset();
+    }
 }

+ 15 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java

@@ -34,10 +34,12 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.function.IntConsumer;

 public abstract class BucketsAggregator extends AggregatorBase {

     private final BigArrays bigArrays;
+    private final IntConsumer multiBucketConsumer;
     private IntArray docCounts;

     public BucketsAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent,
@@ -45,6 +47,11 @@ public abstract class BucketsAggregator extends AggregatorBase {
         super(name, factories, context, parent, pipelineAggregators, metaData);
         bigArrays = context.bigArrays();
         docCounts = bigArrays.newIntArray(1, true);
+        if (context.aggregations() != null) {
+            multiBucketConsumer = context.aggregations().multiBucketConsumer();
+        } else {
+            multiBucketConsumer = (count) -> {};
+        }
     }

     /**
@@ -104,6 +111,14 @@ public abstract class BucketsAggregator extends AggregatorBase {
         }
     }

+    /**
+     * Adds <tt>count</tt> buckets to the global count for the request and fails if this number is greater than
+     * the maximum number of buckets allowed in a response
+     */
+    protected final void consumeBucketsAndMaybeBreak(int count) {
+        multiBucketConsumer.accept(count);
+    }
+
     /**
      * Required method to build the child aggregations of the given bucket (identified by the bucket ordinal).
      */

+ 2 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java

@@ -210,6 +210,7 @@ public class AdjacencyMatrixAggregator extends BucketsAggregator {
                 InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(keys[i],
                         docCount, bucketAggregations(bucketOrd));
                 buckets.add(bucket);
+                consumeBucketsAndMaybeBreak(1);
             }
         }
         int pos = keys.length;
@@ -223,6 +224,7 @@ public class AdjacencyMatrixAggregator extends BucketsAggregator {
                     InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(intersectKey,
                             docCount, bucketAggregations(bucketOrd));
                     buckets.add(bucket);
+                    consumeBucketsAndMaybeBreak(1);
                 }
                 pos++;
             }

+ 3 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java

@@ -214,7 +214,10 @@ public class InternalAdjacencyMatrix
         for (List<InternalBucket> sameRangeList : bucketsMap.values()) {
             InternalBucket reducedBucket = sameRangeList.get(0).reduce(sameRangeList, reduceContext);
             if(reducedBucket.docCount >= 1){
+                reduceContext.consumeBucketsAndMaybeBreak(1);
                 reducedBuckets.add(reducedBucket);
+            } else {
+                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reducedBucket));
             }
         }
         Collections.sort(reducedBuckets, Comparator.comparing(InternalBucket::getKey));

+ 1 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java

@@ -83,6 +83,7 @@ final class CompositeAggregator extends BucketsAggregator {
     @Override
     public InternalAggregation buildAggregation(long zeroBucket) throws IOException {
         assert zeroBucket == 0L;
+        consumeBucketsAndMaybeBreak(keys.size());

         // Replay all documents that contain at least one top bucket (collected during the first pass).
         grow(keys.size()+1);

+ 2 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java

@@ -132,6 +132,7 @@ public class InternalComposite
             if (lastBucket != null && bucketIt.current.compareKey(lastBucket) != 0) {
                 InternalBucket reduceBucket = buckets.get(0).reduce(buckets, reduceContext);
                 buckets.clear();
+                reduceContext.consumeBucketsAndMaybeBreak(1);
                 result.add(reduceBucket);
                 if (result.size() >= size) {
                     break;
@@ -145,6 +146,7 @@ public class InternalComposite
         }
         if (buckets.size() > 0) {
             InternalBucket reduceBucket = buckets.get(0).reduce(buckets, reduceContext);
+            reduceContext.consumeBucketsAndMaybeBreak(1);
             result.add(reduceBucket);
         }
         return new InternalComposite(name, size, sourceNames, result, reverseMuls, pipelineAggregators(), metaData);

+ 1 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java

@@ -166,6 +166,7 @@ public class FiltersAggregator extends BucketsAggregator {

     @Override
     public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
+        consumeBucketsAndMaybeBreak(keys.length + (showOtherBucket ? 1 : 0));
         List<InternalFilters.InternalBucket> buckets = new ArrayList<>(keys.length);
         for (int i = 0; i < keys.length; i++) {
             long bucketOrd = bucketOrd(owningBucketOrdinal, i);

+ 2 - 1
core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java

@@ -223,7 +223,8 @@ public class InternalFilters extends InternalMultiBucketAggregation<InternalFilt
             }
         }

-        InternalFilters reduced = new InternalFilters(name, new ArrayList<InternalBucket>(bucketsList.size()), keyed, pipelineAggregators(),
+        reduceContext.consumeBucketsAndMaybeBreak(bucketsList.size());
+        InternalFilters reduced = new InternalFilters(name, new ArrayList<>(bucketsList.size()), keyed, pipelineAggregators(),
                 getMetaData());
         for (List<InternalBucket> sameRangeList : bucketsList) {
             reduced.buckets.add((sameRangeList.get(0)).reduce(sameRangeList, reduceContext));

+ 1 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridAggregator.java

@@ -106,6 +106,7 @@ public class GeoHashGridAggregator extends BucketsAggregator {
     public InternalGeoHashGrid buildAggregation(long owningBucketOrdinal) throws IOException {
         assert owningBucketOrdinal == 0;
         final int size = (int) Math.min(bucketOrds.size(), shardSize);
+        consumeBucketsAndMaybeBreak(size);

         InternalGeoHashGrid.BucketPriorityQueue ordered = new InternalGeoHashGrid.BucketPriorityQueue(size);
         OrdinalBucket spare = null;

+ 6 - 1
core/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java

@@ -211,7 +211,12 @@ public class InternalGeoHashGrid extends InternalMultiBucketAggregation<Internal
         BucketPriorityQueue ordered = new BucketPriorityQueue(size);
         for (LongObjectPagedHashMap.Cursor<List<Bucket>> cursor : buckets) {
             List<Bucket> sameCellBuckets = cursor.value;
-            ordered.insertWithOverflow(sameCellBuckets.get(0).reduce(sameCellBuckets, reduceContext));
+            Bucket removed = ordered.insertWithOverflow(sameCellBuckets.get(0).reduce(sameCellBuckets, reduceContext));
+            if (removed != null) {
+                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
+            } else {
+                reduceContext.consumeBucketsAndMaybeBreak(1);
+            }
         }
         buckets.close();
         Bucket[] list = new Bucket[ordered.size()];

+ 2 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java

@@ -127,6 +127,8 @@ class DateHistogramAggregator extends BucketsAggregator {
     @Override
     public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
         assert owningBucketOrdinal == 0;
+        consumeBucketsAndMaybeBreak((int) bucketOrds.size());
+
         List<InternalDateHistogram.Bucket> buckets = new ArrayList<>((int) bucketOrds.size());
         for (long i = 0; i < bucketOrds.size(); i++) {
             buckets.add(new InternalDateHistogram.Bucket(bucketOrds.get(i), bucketDocCount(i), keyed, formatter, bucketAggregations(i)));

+ 1 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregator.java

@@ -131,6 +131,7 @@ class HistogramAggregator extends BucketsAggregator {
     @Override
     public InternalAggregation buildAggregation(long bucket) throws IOException {
         assert bucket == 0;
+        consumeBucketsAndMaybeBreak((int) bucketOrds.size());
         List<InternalHistogram.Bucket> buckets = new ArrayList<>((int) bucketOrds.size());
         for (long i = 0; i < bucketOrds.size(); i++) {
             double roundKey = Double.longBitsToDouble(bucketOrds.get(i));

+ 10 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java

@@ -344,7 +344,10 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                     // the key changes, reduce what we already buffered and reset the buffer for current buckets
                     final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceContext);
                     if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
+                        reduceContext.consumeBucketsAndMaybeBreak(1);
                         reducedBuckets.add(reduced);
+                    } else {
+                        reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                     }
                     currentBuckets.clear();
                     key = top.current.key;
@@ -365,7 +368,10 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
             if (currentBuckets.isEmpty() == false) {
                 final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceContext);
                 if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
+                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     reducedBuckets.add(reduced);
+                } else {
+                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                 }
             }
         }
@@ -388,6 +394,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                     long key = bounds.getMin() + offset;
                     long max = bounds.getMax() + offset;
                     while (key <= max) {
+                        reduceContext.consumeBucketsAndMaybeBreak(1);
                         iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                         key = nextKey(key).longValue();
                     }
@@ -397,6 +404,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                     long key = bounds.getMin() + offset;
                     if (key < firstBucket.key) {
                         while (key < firstBucket.key) {
+                            reduceContext.consumeBucketsAndMaybeBreak(1);
                             iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                             key = nextKey(key).longValue();
                         }
@@ -412,6 +420,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
             if (lastBucket != null) {
                 long key = nextKey(lastBucket.key).longValue();
                 while (key < nextBucket.key) {
+                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                     key = nextKey(key).longValue();
                 }
@@ -425,6 +434,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
             long key = nextKey(lastBucket.key).longValue();
             long max = bounds.getMax() + offset;
             while (key <= max) {
+                reduceContext.consumeBucketsAndMaybeBreak(1);
                 iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                 key = nextKey(key).longValue();
             }

+ 10 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java

@@ -326,7 +326,10 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
                     // Using Double.compare instead of != to handle NaN correctly.
                     final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceContext);
                     if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
+                        reduceContext.consumeBucketsAndMaybeBreak(1);
                         reducedBuckets.add(reduced);
+                    } else {
+                        reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                     }
                     currentBuckets.clear();
                     key = top.current.key;
@@ -347,7 +350,10 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
             if (currentBuckets.isEmpty() == false) {
                 final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceContext);
                 if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
+                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     reducedBuckets.add(reduced);
+                } else {
+                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                 }
             }
         }
@@ -374,6 +380,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
         if (iter.hasNext() == false) {
             // fill with empty buckets
             for (double key = round(emptyBucketInfo.minBound); key <= emptyBucketInfo.maxBound; key = nextKey(key)) {
+                reduceContext.consumeBucketsAndMaybeBreak(1);
                 iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
             }
         } else {
@@ -381,6 +388,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
             if (Double.isFinite(emptyBucketInfo.minBound)) {
                 // fill with empty buckets until the first key
                 for (double key = round(emptyBucketInfo.minBound); key < first.key; key = nextKey(key)) {
+                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                 }
             }
@@ -393,6 +401,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
                 if (lastBucket != null) {
                     double key = nextKey(lastBucket.key);
                     while (key < nextBucket.key) {
+                        reduceContext.consumeBucketsAndMaybeBreak(1);
                         iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                         key = nextKey(key);
                     }
@@ -403,6 +412,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
 
             // finally, adding the empty buckets *after* the actual data (based on the extended_bounds.max requested by the user)
             for (double key = nextKey(lastBucket.key); key <= emptyBucketInfo.maxBound; key = nextKey(key)) {
+                reduceContext.consumeBucketsAndMaybeBreak(1);
                 iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
             }
         }
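
The negative consumption in the two reduce branches above relies on knowing how many buckets were already counted beneath a pruned bucket. Roughly what a countInnerBucket-style helper computes (an illustrative standalone version; the real method lives in InternalMultiBucketAggregation and also descends through single-bucket aggregations):

import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;

final class InnerBucketCounter {
    // Counts every bucket nested under the given bucket so that pruning it can
    // be refunded with reduceContext.consumeBucketsAndMaybeBreak(-n).
    static int countInnerBuckets(MultiBucketsAggregation.Bucket bucket) {
        int count = 0;
        for (Aggregation agg : bucket.getAggregations()) {
            if (agg instanceof MultiBucketsAggregation) {
                for (MultiBucketsAggregation.Bucket sub : ((MultiBucketsAggregation) agg).getBuckets()) {
                    count += 1 + countInnerBuckets(sub);
                }
            }
        }
        return count;
    }
}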

+ 1 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/BinaryRangeAggregator.java

@@ -325,6 +325,7 @@ public final class BinaryRangeAggregator extends BucketsAggregator {
 
     @Override
     public InternalAggregation buildAggregation(long bucket) throws IOException {
+        consumeBucketsAndMaybeBreak(ranges.length);
         List<InternalBinaryRange.Bucket> buckets = new ArrayList<>(ranges.length);
         for (int i = 0; i < ranges.length; ++i) {
             long bucketOrd = bucket * ranges.length + i;

+ 1 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java

@@ -241,6 +241,7 @@ public final class InternalBinaryRange
 
     @Override
     public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+        reduceContext.consumeBucketsAndMaybeBreak(buckets.size());
         long[] docCounts = new long[buckets.size()];
         InternalAggregations[][] aggs = new InternalAggregations[buckets.size()][];
         for (int i = 0; i < aggs.length; ++i) {

+ 1 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java

@@ -302,6 +302,7 @@ public class InternalRange<B extends InternalRange.Bucket, R extends InternalRan
     @SuppressWarnings("unchecked")
     @Override
     public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+        reduceContext.consumeBucketsAndMaybeBreak(ranges.size());
         List<Bucket>[] rangeList = new List[ranges.size()];
         for (int i = 0; i < rangeList.length; ++i) {
             rangeList[i] = new ArrayList<>();

+ 1 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java

@@ -323,6 +323,7 @@ public class RangeAggregator extends BucketsAggregator {
 
     @Override
     public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
+        consumeBucketsAndMaybeBreak(ranges.length);
         List<org.elasticsearch.search.aggregations.bucket.range.Range.Bucket> buckets = new ArrayList<>(ranges.length);
         for (int i = 0; i < ranges.length; i++) {
             Range range = ranges[i];

+ 3 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java

@@ -131,6 +131,9 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri
             // global stats
             spare.updateScore(significanceHeuristic);
             spare = ordered.insertWithOverflow(spare);
+            if (spare == null) {
+                consumeBucketsAndMaybeBreak(1);
+            }
         }
 
         final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
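
This if (spare == null) guard, repeated in all the terms-style aggregators below, leans on Lucene's PriorityQueue contract: insertWithOverflow returns null only when the queue actually grew, and otherwise returns the evicted or rejected entry. Counting on null therefore charges exactly one consumption per bucket that will survive into the shard response. A self-contained sketch of that accounting (the Bucket class here is hypothetical, not the aggregator's own):

import org.apache.lucene.util.PriorityQueue;

public class TopBucketsSketch {
    static final class Bucket {
        final long docCount;
        Bucket(long docCount) { this.docCount = docCount; }
    }

    public static void main(String[] args) {
        final int size = 3; // plays the role of shard_size
        PriorityQueue<Bucket> ordered = new PriorityQueue<Bucket>(size) {
            @Override
            protected boolean lessThan(Bucket a, Bucket b) {
                return a.docCount < b.docCount; // keep the largest doc counts
            }
        };
        int consumed = 0; // stands in for consumeBucketsAndMaybeBreak(1) calls
        for (long docCount : new long[] {5, 1, 7, 3, 9}) {
            Bucket spare = ordered.insertWithOverflow(new Bucket(docCount));
            if (spare == null) {
                consumed++; // the queue grew: one more bucket in the response
            }
        }
        // prints "consumed=3, queued=3": only queue growth is ever charged
        System.out.println("consumed=" + consumed + ", queued=" + ordered.size());
    }
}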

+ 8 - 1
core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java

@@ -241,7 +241,14 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
             final B b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext);
             b.updateScore(heuristic);
             if (((b.score > 0) && (b.subsetDf >= minDocCount)) || reduceContext.isFinalReduce() == false) {
-                ordered.insertWithOverflow(b);
+                B removed = ordered.insertWithOverflow(b);
+                if (removed == null) {
+                    reduceContext.consumeBucketsAndMaybeBreak(1);
+                } else {
+                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
+                }
+            } else {
+                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b));
             }
         }
         B[] list = createBucketsArray(ordered.size());

+ 3 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsAggregator.java

@@ -101,6 +101,9 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
 
             spare.bucketOrd = i;
             spare = ordered.insertWithOverflow(spare);
+            if (spare == null) {
+                consumeBucketsAndMaybeBreak(1);
+            }
         }
 
         final SignificantLongTerms.Bucket[] list = new SignificantLongTerms.Bucket[ordered.size()];

+ 3 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsAggregator.java

@@ -107,6 +107,9 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
 
             spare.bucketOrd = i;
             spare = ordered.insertWithOverflow(spare);
+            if (spare == null) {
+                consumeBucketsAndMaybeBreak(1);
+            }
         }
 
         final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];

+ 21 - 18
core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregator.java

@@ -59,7 +59,7 @@ import java.util.stream.Collectors;
 import static java.util.Collections.emptyList;
 
 public class SignificantTextAggregator extends BucketsAggregator {
-    
+
     private final StringFilter includeExclude;
     protected final BucketCountThresholds bucketCountThresholds;
     protected long numCollectedDocs;
@@ -90,20 +90,20 @@ public class SignificantTextAggregator extends BucketsAggregator {
         this.sourceFieldNames = sourceFieldNames;
         bucketOrds = new BytesRefHash(1, context.bigArrays());
         if(filterDuplicateText){
-            dupSequenceSpotter = new DuplicateByteSequenceSpotter();        
+            dupSequenceSpotter = new DuplicateByteSequenceSpotter();
             lastTrieSize = dupSequenceSpotter.getEstimatedSizeInBytes();
         }
     }
 
-    
-    
+
+
 
     @Override
     public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
             final LeafBucketCollector sub) throws IOException {
         final BytesRefBuilder previous = new BytesRefBuilder();
         return new LeafBucketCollectorBase(sub, null) {
-            
+
             @Override
             public void collect(int doc, long bucket) throws IOException {
                 collectFromSource(doc, bucket, fieldName, sourceFieldNames);
@@ -112,8 +112,8 @@ public class SignificantTextAggregator extends BucketsAggregator {
                     dupSequenceSpotter.startNewSequence();
                 }
             }
-            
-            private void processTokenStream(int doc, long bucket, TokenStream ts, BytesRefHash inDocTerms, String fieldText) 
+
+            private void processTokenStream(int doc, long bucket, TokenStream ts, BytesRefHash inDocTerms, String fieldText)
                     throws IOException{
                 if (dupSequenceSpotter != null) {
                     ts = new DeDuplicatingTokenFilter(ts, dupSequenceSpotter);
@@ -151,35 +151,35 @@ public class SignificantTextAggregator extends BucketsAggregator {
                     ts.close();
                 }
             }
-            
+
             private void collectFromSource(int doc, long bucket, String indexedFieldName, String[] sourceFieldNames) throws IOException {
                 MappedFieldType fieldType = context.getQueryShardContext().fieldMapper(indexedFieldName);
                 if(fieldType == null){
                     throw new IllegalArgumentException("Aggregation [" + name + "] cannot process field ["+indexedFieldName
-                            +"] since it is not present");                    
+                            +"] since it is not present");
                 }
 
                 SourceLookup sourceLookup = context.lookup().source();
                 sourceLookup.setSegmentAndDocument(ctx, doc);
                 BytesRefHash inDocTerms = new BytesRefHash(256, context.bigArrays());
-                
-                try {                
+
+                try {
                     for (String sourceField : sourceFieldNames) {
-                        List<Object> textsToHighlight = sourceLookup.extractRawValues(sourceField);    
+                        List<Object> textsToHighlight = sourceLookup.extractRawValues(sourceField);
                         textsToHighlight = textsToHighlight.stream().map(obj -> {
                             if (obj instanceof BytesRef) {
                                 return fieldType.valueForDisplay(obj).toString();
                             } else {
                                 return obj;
                             }
-                        }).collect(Collectors.toList());                
-                        
-                        Analyzer analyzer = fieldType.indexAnalyzer();                
+                        }).collect(Collectors.toList());
+
+                        Analyzer analyzer = fieldType.indexAnalyzer();
                         for (Object fieldValue : textsToHighlight) {
                             String fieldText = fieldValue.toString();
                             TokenStream ts = analyzer.tokenStream(indexedFieldName, fieldText);
-                            processTokenStream(doc, bucket, ts, inDocTerms, fieldText);                     
-                        }                    
+                            processTokenStream(doc, bucket, ts, inDocTerms, fieldText);
+                        }
                     }
                 } finally{
                     Releasables.close(inDocTerms);
@@ -220,7 +220,10 @@ public class SignificantTextAggregator extends BucketsAggregator {
             spare.updateScore(significanceHeuristic);
 
             spare.bucketOrd = i;
-            spare = ordered.insertWithOverflow(spare);            
+            spare = ordered.insertWithOverflow(spare);
+            if (spare == null) {
+                consumeBucketsAndMaybeBreak(1);
+            }
         }
 
         final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];

+ 1 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

@@ -204,6 +204,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
             if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
                 spare = ordered.insertWithOverflow(spare);
                 if (spare == null) {
+                    consumeBucketsAndMaybeBreak(1);
                     spare = new OrdBucket(-1, 0, null, showTermDocCountError, 0);
                 }
             }

+ 5 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java

@@ -293,7 +293,12 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
                 B removed = ordered.insertWithOverflow(b);
                 if (removed != null) {
                     otherDocCount += removed.getDocCount();
+                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
+                } else {
+                    reduceContext.consumeBucketsAndMaybeBreak(1);
                 }
+            } else {
+                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b));
             }
         }
         B[] list = createBucketsArray(ordered.size());
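
The reduce-side bookkeeping in InternalTerms (and InternalSignificantTerms above) boils down to one rule with three outcomes. A compact restatement as a hypothetical helper (the real code inlines these branches exactly as shown in the diff):

public final class ReduceDeltaSketch {
    // Net adjustment to the request-wide bucket count after offering a reduced
    // bucket to the size-bounded priority queue; "inner" counts are what
    // countInnerBucket would report for the corresponding bucket.
    static int reduceDelta(boolean kept, Object evicted, int innerOfEvicted, int innerOfCandidate) {
        if (kept == false) {
            return -innerOfCandidate;  // candidate pruned: refund its sub-buckets
        }
        if (evicted == null) {
            return 1;                  // queue grew: charge one top-level bucket
        }
        return -innerOfEvicted;        // queue full: refund the evicted sub-tree
    }
}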

+ 4 - 2
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java

@@ -125,7 +125,6 @@ public class LongTermsAggregator extends TermsAggregator {
         }
 
         final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
-
         long otherDocCount = 0;
         BucketPriorityQueue<LongTerms.Bucket> ordered = new BucketPriorityQueue<>(size, order.comparator(this));
         LongTerms.Bucket spare = null;
@@ -138,7 +137,10 @@ public class LongTermsAggregator extends TermsAggregator {
             otherDocCount += spare.docCount;
             spare.bucketOrd = i;
             if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
-                spare = (LongTerms.Bucket) ordered.insertWithOverflow(spare);
+                spare = ordered.insertWithOverflow(spare);
+                if (spare == null) {
+                    consumeBucketsAndMaybeBreak(1);
+                }
             }
         }
 

+ 3 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java

@@ -144,6 +144,9 @@ public class StringTermsAggregator extends AbstractStringTermsAggregator {
             spare.bucketOrd = i;
             if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
                 spare = ordered.insertWithOverflow(spare);
+                if (spare == null) {
+                    consumeBucketsAndMaybeBreak(1);
+                }
             }
         }
 

+ 10 - 0
core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java

@@ -72,6 +72,7 @@ import org.elasticsearch.search.SearchContextMissingException;
 import org.elasticsearch.search.SearchException;
 import org.elasticsearch.search.SearchParseException;
 import org.elasticsearch.search.SearchShardTarget;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.snapshots.Snapshot;
 import org.elasticsearch.snapshots.SnapshotException;
@@ -364,6 +365,14 @@ public class ExceptionSerializationTests extends ESTestCase {
         assertEquals(0, ex.getBytesWanted());
     }
 
+    public void testTooManyBucketsException() throws IOException {
+        MultiBucketConsumerService.TooManyBucketsException ex =
+            serialize(new MultiBucketConsumerService.TooManyBucketsException("Too many buckets", 100),
+                randomFrom(Version.V_7_0_0_alpha1));
+        assertEquals("Too many buckets", ex.getMessage());
+        assertEquals(100, ex.getMaxBuckets());
+    }
+
     public void testTimestampParsingException() throws IOException {
         TimestampParsingException ex = serialize(new TimestampParsingException("TIMESTAMP", null));
         assertEquals("failed to parse timestamp [TIMESTAMP]", ex.getMessage());
@@ -805,6 +814,7 @@ public class ExceptionSerializationTests extends ESTestCase {
         ids.put(146, org.elasticsearch.tasks.TaskCancelledException.class);
         ids.put(147, org.elasticsearch.env.ShardLockObtainFailedException.class);
         ids.put(148, org.elasticsearch.common.xcontent.NamedXContentRegistry.UnknownNamedObjectException.class);
+        ids.put(149, MultiBucketConsumerService.TooManyBucketsException.class);
 
         Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
         for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {

+ 7 - 3
core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java

@@ -30,6 +30,7 @@ import org.elasticsearch.index.Index;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchShardTarget;
+import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.dfs.DfsSearchResult;
 import org.elasticsearch.search.query.QuerySearchRequest;
 import org.elasticsearch.search.query.QuerySearchResult;
@@ -56,7 +57,8 @@ public class DfsQueryPhaseTests extends ESTestCase {
         results.get(0).termsStatistics(new Term[0], new TermStatistics[0]);
         results.get(1).termsStatistics(new Term[0], new TermStatistics[0]);
 
-        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
+        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY,
+            (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
         SearchTransportService searchTransportService = new SearchTransportService(
             Settings.builder().put("search.remote.connect", false).build(), null, null) {
 
@@ -113,7 +115,8 @@
         results.get(0).termsStatistics(new Term[0], new TermStatistics[0]);
         results.get(1).termsStatistics(new Term[0], new TermStatistics[0]);
 
-        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
+        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY,
+            (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
         SearchTransportService searchTransportService = new SearchTransportService(
             Settings.builder().put("search.remote.connect", false).build(), null, null) {
 
@@ -169,7 +172,8 @@
         results.get(0).termsStatistics(new Term[0], new TermStatistics[0]);
         results.get(1).termsStatistics(new Term[0], new TermStatistics[0]);
 
-        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
+        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY,
+            (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
         SearchTransportService searchTransportService = new SearchTransportService(
             Settings.builder().put("search.remote.connect", false).build(), null, null) {
 

+ 13 - 6
core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java

@@ -29,6 +29,7 @@ import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchShardTarget;
+import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.fetch.FetchSearchResult;
 import org.elasticsearch.search.fetch.QueryFetchSearchResult;
 import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
@@ -44,7 +45,8 @@ import java.util.concurrent.atomic.AtomicReference;
 public class FetchSearchPhaseTests extends ESTestCase {
 
     public void testShortcutQueryAndFetchOptimization() throws IOException {
-        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
+        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY,
+            (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
         MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1);
         InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> results =
             controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 1);
@@ -85,7 +87,8 @@
 
     public void testFetchTwoDocument() throws IOException {
         MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
-        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
+        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY,
+            (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
         InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> results =
             controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
         AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
@@ -139,7 +142,8 @@
 
     public void testFailFetchOneDoc() throws IOException {
         MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
-        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
+        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY,
+            (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
         InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> results =
             controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
         AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
@@ -197,7 +201,8 @@
         int resultSetSize = randomIntBetween(0, 100);
         // we use at least 2 hits otherwise this is subject to single shard optimization and we trip an assert...
         int numHits = randomIntBetween(2, 100); // also numshards --> 1 hit per shard
-        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
+        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY,
+            (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
         MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(numHits);
         InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> results =
             controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), numHits);
@@ -253,7 +258,8 @@
 
     public void testExceptionFailsPhase() throws IOException {
         MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
-        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
+        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY,
+            (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
         InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> results =
             controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
         AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
@@ -306,7 +312,8 @@
 
     public void testCleanupIrrelevantContexts() throws IOException { // contexts that are not fetched should be cleaned up
         MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
-        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
+        SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY,
+            (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
         InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> results =
             controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
         AtomicReference<SearchResponse> responseRef = new AtomicReference<>();

+ 3 - 1
core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java

@@ -31,6 +31,7 @@ import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -66,7 +67,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
 
     @Before
     public void setup() {
-        searchPhaseController = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
+        searchPhaseController = new SearchPhaseController(Settings.EMPTY,
+            (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
     }
 
     public void testSort() throws Exception {

+ 17 - 0
core/src/test/java/org/elasticsearch/search/aggregations/EquivalenceIT.java

@@ -41,6 +41,8 @@ import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory;
 import org.elasticsearch.search.aggregations.metrics.sum.Sum;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.junit.After;
+import org.junit.Before;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -90,6 +92,21 @@ public class EquivalenceIT extends ESIntegTestCase {
         }
     }
 
+    @Before
+    private void setupMaxBuckets() {
+        // disables the max bucket limit for this test
+        client().admin().cluster().prepareUpdateSettings()
+            .setTransientSettings(Collections.singletonMap("search.max_buckets", Integer.MAX_VALUE))
+            .get();
+    }
+
+    @After
+    private void cleanupMaxBuckets() {
+        client().admin().cluster().prepareUpdateSettings()
+            .setTransientSettings(Collections.singletonMap("search.max_buckets", null))
+            .get();
+    }
+
     // Make sure that unordered, reversed, disjoint and/or overlapping ranges are supported
     // Duel with filters
     public void testRandomRanges() throws Exception {

+ 1 - 1
core/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java

@@ -1048,7 +1048,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
                 if (reduced) {
                     composite = searchAndReduce(indexSearcher, query, aggregationBuilder, FIELD_TYPES);
                 } else {
-                    composite = search(indexSearcher, query, aggregationBuilder, indexSettings, FIELD_TYPES);
+                    composite = search(indexSearcher, query, aggregationBuilder, FIELD_TYPES);
                 }
                 verify.accept(composite);
             }

+ 64 - 7
core/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java

@@ -31,6 +31,7 @@ import org.apache.lucene.search.MatchNoDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.store.Directory;
 import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 
 import java.io.IOException;
@@ -39,6 +40,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.function.Consumer;
 
+import static org.elasticsearch.search.aggregations.MultiBucketConsumerService.TooManyBucketsException;
+
 public class DateHistogramAggregatorTests extends AggregatorTestCase {
 
     private static final String DATE_FIELD = "date";
@@ -335,28 +338,82 @@ public class DateHistogramAggregatorTests extends AggregatorTestCase {
         );
     }
 
+    public void testMaxBucket() throws IOException {
+        Query query = new MatchAllDocsQuery();
+        List<String> timestamps = Arrays.asList(
+            "2010-01-01T00:00:00.000Z",
+            "2011-01-01T00:00:00.000Z",
+            "2017-01-01T00:00:00.000Z"
+        );
+
+        TooManyBucketsException exc = expectThrows(TooManyBucketsException.class, () -> testSearchCase(query, timestamps,
+            aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(DATE_FIELD),
+            histogram -> {}, 2));
+
+        exc = expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
+            aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(DATE_FIELD),
+            histogram -> {}, 2));
+
+        exc = expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
+            aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(DATE_FIELD).minDocCount(0L),
+            histogram -> {}, 100));
+
+        exc = expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
+            aggregation ->
+                aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5))
+                    .field(DATE_FIELD)
+                    .subAggregation(
+                        AggregationBuilders.dateHistogram("1")
+                            .dateHistogramInterval(DateHistogramInterval.seconds(5))
+                            .field(DATE_FIELD)
+                    ),
+            histogram -> {}, 5));
+    }
+
     private void testSearchCase(Query query, List<String> dataset,
                                 Consumer<DateHistogramAggregationBuilder> configure,
                                 Consumer<Histogram> verify) throws IOException {
-        executeTestCase(false, query, dataset, configure, verify);
+        testSearchCase(query, dataset, configure, verify, 10000);
+    }
+
+    private void testSearchCase(Query query, List<String> dataset,
+                                Consumer<DateHistogramAggregationBuilder> configure,
+                                Consumer<Histogram> verify,
+                                int maxBucket) throws IOException {
+        executeTestCase(false, query, dataset, configure, verify, maxBucket);
     }
 
     private void testSearchAndReduceCase(Query query, List<String> dataset,
                                          Consumer<DateHistogramAggregationBuilder> configure,
                                          Consumer<Histogram> verify) throws IOException {
-        executeTestCase(true, query, dataset, configure, verify);
+        testSearchAndReduceCase(query, dataset, configure, verify, 1000);
+    }
+
+    private void testSearchAndReduceCase(Query query, List<String> dataset,
+                                         Consumer<DateHistogramAggregationBuilder> configure,
+                                         Consumer<Histogram> verify,
+                                         int maxBucket) throws IOException {
+        executeTestCase(true, query, dataset, configure, verify, maxBucket);
    }
 
     private void testBothCases(Query query, List<String> dataset,
                                Consumer<DateHistogramAggregationBuilder> configure,
                                Consumer<Histogram> verify) throws IOException {
-        testSearchCase(query, dataset, configure, verify);
-        testSearchAndReduceCase(query, dataset, configure, verify);
+        testBothCases(query, dataset, configure, verify, 10000);
+    }
+
+    private void testBothCases(Query query, List<String> dataset,
+                               Consumer<DateHistogramAggregationBuilder> configure,
+                               Consumer<Histogram> verify,
+                               int maxBucket) throws IOException {
+        testSearchCase(query, dataset, configure, verify, maxBucket);
+        testSearchAndReduceCase(query, dataset, configure, verify, maxBucket);
     }
 
     private void executeTestCase(boolean reduced, Query query, List<String> dataset,
                                  Consumer<DateHistogramAggregationBuilder> configure,
-                                 Consumer<Histogram> verify) throws IOException {
+                                 Consumer<Histogram> verify,
+                                 int maxBucket) throws IOException {
 
         try (Directory directory = newDirectory()) {
             try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
@@ -389,9 +446,9 @@ public class DateHistogramAggregatorTests extends AggregatorTestCase {
 
                 InternalDateHistogram histogram;
                 if (reduced) {
-                    histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType);
+                    histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, maxBucket, fieldType);
                 } else {
-                    histogram = search(indexSearcher, query, aggregationBuilder, fieldType);
+                    histogram = search(indexSearcher, query, aggregationBuilder, maxBucket, fieldType);
                 }
                 verify.accept(histogram);
             }

+ 2 - 3
core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java

@@ -72,15 +72,14 @@ public class TermsAggregatorTests extends AggregatorTestCase {
 
     private boolean randomizeAggregatorImpl = true;
 
-    @Override
     protected <A extends Aggregator> A createAggregator(AggregationBuilder aggregationBuilder,
-            IndexSearcher indexSearcher, IndexSettings indexSettings, MappedFieldType... fieldTypes) throws IOException {
+            IndexSearcher indexSearcher, MappedFieldType... fieldTypes) throws IOException {
         try {
             if (randomizeAggregatorImpl) {
                 TermsAggregatorFactory.COLLECT_SEGMENT_ORDS = randomBoolean();
                 TermsAggregatorFactory.REMAP_GLOBAL_ORDS = randomBoolean();
             }
-            return super.createAggregator(aggregationBuilder, indexSearcher, indexSettings, fieldTypes);
+            return super.createAggregator(aggregationBuilder, indexSearcher, fieldTypes);
         } finally {
             TermsAggregatorFactory.COLLECT_SEGMENT_ORDS = null;
             TermsAggregatorFactory.REMAP_GLOBAL_ORDS = null;

+ 4 - 0
docs/reference/aggregations/bucket.asciidoc

@@ -13,6 +13,10 @@ aggregated for the buckets created by their "parent" bucket aggregation.
 There are different bucket aggregators, each with a different "bucketing" strategy. Some define a single bucket, some
 define fixed number of multiple buckets, and others dynamically create the buckets during the aggregation process.
 
+NOTE: The maximum number of buckets allowed in a single response is limited by a dynamic cluster
+setting named `search.max_buckets`. It defaults to 10,000; requests that try to return more than
+the limit will fail with an exception.
+
 include::bucket/adjacency-matrix-aggregation.asciidoc[]
 
 include::bucket/children-aggregation.asciidoc[]
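
Since search.max_buckets is dynamic, it can be raised or reset at runtime without a node restart. A hedged example of doing so from test code, mirroring the EquivalenceIT change above (the class name here is hypothetical; the same key can also be set through the cluster update settings REST API, as the YAML tests below do):

import java.util.Collections;
import org.elasticsearch.test.ESIntegTestCase;

public class MaxBucketsSettingIT extends ESIntegTestCase {
    public void testRaiseThenResetMaxBuckets() {
        // raise the per-request bucket limit for this test cluster
        client().admin().cluster().prepareUpdateSettings()
            .setTransientSettings(Collections.singletonMap("search.max_buckets", 20000))
            .get();
        // passing null removes the transient value, restoring the 10,000 default
        client().admin().cluster().prepareUpdateSettings()
            .setTransientSettings(Collections.singletonMap("search.max_buckets", null))
            .get();
    }
}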

+ 7 - 1
docs/reference/migration/migrate_7_0/aggregations.asciidoc

@@ -3,4 +3,10 @@
 
 ==== Deprecated `global_ordinals_hash` and `global_ordinals_low_cardinality` execution hints for terms aggregations have been removed
 
-These `execution_hint` are removed and should be replaced by `global_ordinals`.
+These `execution_hint` are removed and should be replaced by `global_ordinals`.
+
+==== New `search.max_buckets` cluster setting
+
+The dynamic cluster setting named `search.max_buckets` now defaults
+to 10,000 (instead of unlimited in the previous version).
+Requests that try to return more than the limit will fail with an exception.

+ 110 - 0
rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/240_max_buckets.yml

@@ -0,0 +1,110 @@
+---
+setup:
+  - do:
+        indices.create:
+          index: test
+          body:
+              mappings:
+                doc:
+                  properties:
+                    keyword:
+                      type: keyword
+                    date:
+                      type: date
+
+  - do:
+      index:
+        index: test
+        type:  doc
+        id:    1
+        body:  { "date": "2014-03-03T00:00:00", "keyword": "foo" }
+
+  - do:
+      index:
+        index: test
+        type:  doc
+        id:    2
+        body:  { "date": "2015-03-03T00:00:00", "keyword": "bar" }
+
+  - do:
+      index:
+        index: test
+        type:  doc
+        id:    3
+        body:  { "date": "2016-03-03T00:00:00", "keyword": "foobar" }
+
+  - do:
+      index:
+        index: test
+        type:  doc
+        id:    4
+        body:  { "date": "2017-03-03T00:00:00" }
+
+  - do:
+      indices.refresh:
+        index: [test]
+
+---
+teardown:
+
+  - do:
+      cluster.put_settings:
+        body:
+          transient:
+            search.max_buckets: null
+
+---
+"Max bucket":
+  - skip:
+      version: " - 6.99.99"
+      reason:  search.max_buckets limit has been added in 7.0
+
+  - do:
+      cluster.put_settings:
+        body:
+          transient:
+            search.max_buckets: 3
+
+  - do:
+      catch: /.*Trying to create too many buckets.*/
+      search:
+        index: test
+        body:
+          aggregations:
+            test:
+              date_histogram:
+                field: date
+                interval: 1d
+
+  - do:
+      catch: /.*Trying to create too many buckets.*/
+      search:
+        index: test
+        body:
+          aggregations:
+            test:
+              terms:
+                field: keyword
+              aggs:
+                2:
+                  date_histogram:
+                    field: date
+                    interval: 1d
+
+  - do:
+      cluster.put_settings:
+        body:
+          transient:
+            search.max_buckets: 100
+
+  - do:
+      catch: /.*Trying to create too many buckets.*/
+      search:
+        index: test
+        body:
+          aggregations:
+            test:
+              date_histogram:
+                field: date
+                interval: 1d
+                min_doc_count: 0
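
The last case fails even with `search.max_buckets: 100` because `min_doc_count: 0` makes the date histogram materialize one bucket per day between the first and last indexed dates (2014-03-03 through 2017-03-03, roughly 1,100 daily buckets). Each multi-bucket aggregator opts in by reporting the buckets it adds to the response; a minimal sketch of that call pattern, using simplified stand-in types rather than the real aggregator classes:

```java
import java.util.List;
import java.util.function.IntConsumer;

// Sketch of the opt-in consumption pattern: the aggregator reports each
// batch of buckets it adds, and the shared per-request consumer throws
// once the running total exceeds the limit.
class SketchHistogramAggregator {
    private final IntConsumer multiBucketConsumer; // shared for the whole request

    SketchHistogramAggregator(IntConsumer multiBucketConsumer) {
        this.multiBucketConsumer = multiBucketConsumer;
    }

    List<Long> buildAggregation(List<Long> bucketKeys) {
        // Opt-in call made when buckets are added to the final response.
        multiBucketConsumer.accept(bucketKeys.size());
        return bucketKeys;
    }
}
```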

+ 62 - 21
test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java

@@ -59,17 +59,18 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
 import org.elasticsearch.mock.orig.Mockito;
+import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
 import org.elasticsearch.search.fetch.FetchPhase;
 import org.elasticsearch.search.fetch.subphase.DocValueFieldsFetchSubPhase;
 import org.elasticsearch.search.fetch.subphase.FetchSourceSubPhase;
 import org.elasticsearch.search.internal.ContextIndexSearcher;
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.search.lookup.SearchLookup;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.InternalAggregationTestCase;
 import org.junit.After;
 import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -82,6 +83,7 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.elasticsearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS;
 
 
 /**
  * Base class for testing {@link Aggregator} implementations.
@@ -96,16 +98,20 @@ public abstract class AggregatorTestCase extends ESTestCase {
     protected AggregatorFactory<?> createAggregatorFactory(AggregationBuilder aggregationBuilder,
                                                            IndexSearcher indexSearcher,
                                                            MappedFieldType... fieldTypes) throws IOException {
-        return createAggregatorFactory(aggregationBuilder, indexSearcher, createIndexSettings(), fieldTypes);
+        return createAggregatorFactory(aggregationBuilder, indexSearcher, createIndexSettings(),
+            new MultiBucketConsumer(DEFAULT_MAX_BUCKETS), fieldTypes);
     }
 
     /** Create a factory for the given aggregation builder. */
     protected AggregatorFactory<?> createAggregatorFactory(AggregationBuilder aggregationBuilder,
                                                            IndexSearcher indexSearcher,
                                                            IndexSettings indexSettings,
+                                                           MultiBucketConsumer bucketConsumer,
                                                            MappedFieldType... fieldTypes) throws IOException {
         SearchContext searchContext = createSearchContext(indexSearcher, indexSettings);
         CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
+        when(searchContext.aggregations())
+            .thenReturn(new SearchContextAggregations(AggregatorFactories.EMPTY, bucketConsumer));
         when(searchContext.bigArrays()).thenReturn(new MockBigArrays(Settings.EMPTY, circuitBreakerService));
         // TODO: now just needed for top_hits, this will need to be revised for other agg unit tests:
         MapperService mapperService = mapperServiceMock();
@@ -116,12 +122,8 @@ public abstract class AggregatorTestCase extends ESTestCase {
         IndexFieldDataService ifds = new IndexFieldDataService(indexSettings,
                 new IndicesFieldDataCache(Settings.EMPTY, new IndexFieldDataCache.Listener() {
                 }), circuitBreakerService, mapperService);
-        when(searchContext.getForField(Mockito.any(MappedFieldType.class))).thenAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                return ifds.getForField((MappedFieldType) invocationOnMock.getArguments()[0]);
-            }
-        });
+        when(searchContext.getForField(Mockito.any(MappedFieldType.class)))
+            .thenAnswer(invocationOnMock -> ifds.getForField((MappedFieldType) invocationOnMock.getArguments()[0]));
 
 
         SearchLookup searchLookup = new SearchLookup(mapperService, ifds::getForField, new String[]{TYPE_NAME});
         when(searchContext.lookup()).thenReturn(searchLookup);
@@ -139,15 +141,32 @@ public abstract class AggregatorTestCase extends ESTestCase {
     protected <A extends Aggregator> A createAggregator(AggregationBuilder aggregationBuilder,
                                                         IndexSearcher indexSearcher,
                                                         MappedFieldType... fieldTypes) throws IOException {
-        return createAggregator(aggregationBuilder, indexSearcher, createIndexSettings(), fieldTypes);
+        return createAggregator(aggregationBuilder, indexSearcher, createIndexSettings(),
+            new MultiBucketConsumer(DEFAULT_MAX_BUCKETS), fieldTypes);
     }
 
     protected <A extends Aggregator> A createAggregator(AggregationBuilder aggregationBuilder,
                                                         IndexSearcher indexSearcher,
                                                         IndexSettings indexSettings,
                                                         MappedFieldType... fieldTypes) throws IOException {
+        return createAggregator(aggregationBuilder, indexSearcher, indexSettings,
+            new MultiBucketConsumer(DEFAULT_MAX_BUCKETS), fieldTypes);
+    }
+
+    protected <A extends Aggregator> A createAggregator(AggregationBuilder aggregationBuilder,
+                                                        IndexSearcher indexSearcher,
+                                                        MultiBucketConsumer bucketConsumer,
+                                                        MappedFieldType... fieldTypes) throws IOException {
+        return createAggregator(aggregationBuilder, indexSearcher, createIndexSettings(), bucketConsumer, fieldTypes);
+    }
+
+    protected <A extends Aggregator> A createAggregator(AggregationBuilder aggregationBuilder,
+                                                        IndexSearcher indexSearcher,
+                                                        IndexSettings indexSettings,
+                                                        MultiBucketConsumer bucketConsumer,
+                                                        MappedFieldType... fieldTypes) throws IOException {
         @SuppressWarnings("unchecked")
-        A aggregator = (A) createAggregatorFactory(aggregationBuilder, indexSearcher, indexSettings, fieldTypes)
+        A aggregator = (A) createAggregatorFactory(aggregationBuilder, indexSearcher, indexSettings, bucketConsumer, fieldTypes)
             .create(null, true);
         return aggregator;
     }
@@ -233,24 +252,33 @@ public abstract class AggregatorTestCase extends ESTestCase {
                                                                              Query query,
                                                                              AggregationBuilder builder,
                                                                              MappedFieldType... fieldTypes) throws IOException {
-        return search(searcher, query, builder, createIndexSettings(), fieldTypes);
+        return search(searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
     }
 
     protected <A extends InternalAggregation, C extends Aggregator> A search(IndexSearcher searcher,
                                                                              Query query,
                                                                              AggregationBuilder builder,
-                                                                             IndexSettings indexSettings,
+                                                                             int maxBucket,
                                                                              MappedFieldType... fieldTypes) throws IOException {
-        C a = createAggregator(builder, searcher, fieldTypes);
+        MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket);
+        C a = createAggregator(builder, searcher, bucketConsumer, fieldTypes);
         a.preCollection();
         searcher.search(query, a);
         a.postCollection();
         @SuppressWarnings("unchecked")
         A internalAgg = (A) a.buildAggregation(0L);
+        InternalAggregationTestCase.assertMultiBucketConsumer(internalAgg, bucketConsumer);
         return internalAgg;
 
     }
 
+    protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(IndexSearcher searcher,
+                                                                                      Query query,
+                                                                                      AggregationBuilder builder,
+                                                                                      MappedFieldType... fieldTypes) throws IOException {
+        return searchAndReduce(searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
+    }
+
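
With the `maxBucket` overloads above, an aggregator unit test can exercise the limit directly. A hedged sketch of such a test — the test class, the `dateFieldType()` and `indexSearcher()` fixtures, and the expected exception type are all assumptions, since none of them appear in this diff:

```java
import java.io.IOException;

import org.apache.lucene.search.MatchAllDocsQuery;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;

public class MaxBucketSketchTests extends AggregatorTestCase {
    public void testTooManyBuckets() throws IOException {
        DateHistogramAggregationBuilder builder = new DateHistogramAggregationBuilder("histo")
            .field("date")
            .dateHistogramInterval(DateHistogramInterval.DAY)
            .minDocCount(0);
        MappedFieldType dateFieldType = dateFieldType();   // hypothetical fixture
        // Reducing a multi-day histogram under a 3-bucket limit should throw.
        expectThrows(RuntimeException.class, () ->
            searchAndReduce(indexSearcher(), new MatchAllDocsQuery(), builder, 3, dateFieldType));
    }
}
```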
     /**
      * Divides the provided {@link IndexSearcher} into sub-searchers, one for each segment,
      * builds an aggregator for each sub-searcher filtered by the provided {@link Query} and
@@ -259,6 +287,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
     protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(IndexSearcher searcher,
                                                                                       Query query,
                                                                                       AggregationBuilder builder,
+                                                                                      int maxBucket,
                                                                                       MappedFieldType... fieldTypes) throws IOException {
         final IndexReaderContext ctx = searcher.getTopReaderContext();
 
@@ -279,14 +308,18 @@ public abstract class AggregatorTestCase extends ESTestCase {
         List<InternalAggregation> aggs = new ArrayList<> ();
         Query rewritten = searcher.rewrite(query);
         Weight weight = searcher.createWeight(rewritten, true, 1f);
-        C root = createAggregator(builder, searcher, fieldTypes);
+        MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket);
+        C root = createAggregator(builder, searcher, bucketConsumer, fieldTypes);
 
 
         for (ShardSearcher subSearcher : subSearchers) {
-            C a = createAggregator(builder, subSearcher, fieldTypes);
+            MultiBucketConsumer shardBucketConsumer = new MultiBucketConsumer(maxBucket);
+            C a = createAggregator(builder, subSearcher, shardBucketConsumer, fieldTypes);
             a.preCollection();
             subSearcher.search(weight, a);
             a.postCollection();
-            aggs.add(a.buildAggregation(0L));
+            InternalAggregation agg = a.buildAggregation(0L);
+            aggs.add(agg);
+            InternalAggregationTestCase.assertMultiBucketConsumer(agg, shardBucketConsumer);
         }
         if (aggs.isEmpty()) {
             return null;
@@ -297,15 +330,23 @@ public abstract class AggregatorTestCase extends ESTestCase {
                 Collections.shuffle(aggs, random());
                 int r = randomIntBetween(1, toReduceSize);
                 List<InternalAggregation> toReduce = aggs.subList(0, r);
-                A reduced = (A) aggs.get(0).doReduce(toReduce,
-                    new InternalAggregation.ReduceContext(root.context().bigArrays(), null, false));
+                MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket);
+                InternalAggregation.ReduceContext context =
+                    new InternalAggregation.ReduceContext(root.context().bigArrays(), null,
+                        reduceBucketConsumer, false);
+                A reduced = (A) aggs.get(0).doReduce(toReduce, context);
+                InternalAggregationTestCase.assertMultiBucketConsumer(reduced, reduceBucketConsumer);
                 aggs = new ArrayList<>(aggs.subList(r, toReduceSize));
                 aggs.add(reduced);
             }
             // now do the final reduce
+            MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket);
+            InternalAggregation.ReduceContext context =
+                new InternalAggregation.ReduceContext(root.context().bigArrays(), null, reduceBucketConsumer, true);
+
             @SuppressWarnings("unchecked")
-            A internalAgg = (A) aggs.get(0).doReduce(aggs, new InternalAggregation.ReduceContext(root.context().bigArrays(), null,
-                true));
+            A internalAgg = (A) aggs.get(0).doReduce(aggs, context);
+            InternalAggregationTestCase.assertMultiBucketConsumer(internalAgg, reduceBucketConsumer);
             return internalAgg;
         }
 
 

+ 15 - 2
test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java

@@ -38,7 +38,9 @@ import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
 import org.elasticsearch.search.aggregations.ParsedAggregation;
+import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
 import org.elasticsearch.search.aggregations.bucket.adjacency.AdjacencyMatrixAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.adjacency.ParsedAdjacencyMatrix;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
@@ -140,10 +142,13 @@ import java.util.stream.Collectors;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonMap;
 import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
+import static org.elasticsearch.search.aggregations.InternalMultiBucketAggregation.countInnerBucket;
 import static org.elasticsearch.test.XContentTestUtils.insertRandomFields;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
+import static org.hamcrest.Matchers.equalTo;
 
 
 public abstract class InternalAggregationTestCase<T extends InternalAggregation> extends AbstractWireSerializingTestCase<T> {
+    public static final int DEFAULT_MAX_BUCKETS = 100000;
 
 
     private final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(
             new SearchModule(Settings.EMPTY, false, emptyList()).getNamedWriteables());
@@ -237,17 +242,21 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
             Collections.shuffle(toReduce, random());
             int r = randomIntBetween(1, toReduceSize);
             List<InternalAggregation> internalAggregations = toReduce.subList(0, r);
+            MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS);
             InternalAggregation.ReduceContext context =
-                new InternalAggregation.ReduceContext(bigArrays, mockScriptService, false);
+                new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer, false);
             @SuppressWarnings("unchecked")
             T reduced = (T) inputs.get(0).reduce(internalAggregations, context);
+            assertMultiBucketConsumer(reduced, bucketConsumer);
             toReduce = new ArrayList<>(toReduce.subList(r, toReduceSize));
             toReduce.add(reduced);
         }
+        MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS);
         InternalAggregation.ReduceContext context =
-            new InternalAggregation.ReduceContext(bigArrays, mockScriptService, true);
+            new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer, true);
         @SuppressWarnings("unchecked")
         T reduced = (T) inputs.get(0).reduce(toReduce, context);
+        assertMultiBucketConsumer(reduced, bucketConsumer);
         assertReduced(reduced, inputs);
     }
 
@@ -392,4 +401,8 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
         formats.add(() -> new DocValueFormat.Decimal(randomFrom("###.##", "###,###.##")));
         return randomFrom(formats).get();
     }
+
+    public static void assertMultiBucketConsumer(Aggregation agg, MultiBucketConsumer bucketConsumer) {
+        assertThat(bucketConsumer.getCount(), equalTo(countInnerBucket(agg)));
+    }
 }
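
`assertMultiBucketConsumer` cross-checks the consumer's running count against `countInnerBucket`, the helper added to `InternalMultiBucketAggregation`, which recursively counts every bucket nested inside an aggregation. A simplified sketch of that recursion over stand-in types (the real helper walks `MultiBucketsAggregation` buckets and their sub-aggregations):

```java
import java.util.List;

// Simplified stand-ins for the aggregation tree.
interface SketchAgg {}

class SketchBucket {
    final List<SketchAgg> subAggregations;
    SketchBucket(List<SketchAgg> subAggregations) {
        this.subAggregations = subAggregations;
    }
}

class SketchMultiBucketAgg implements SketchAgg {
    final List<SketchBucket> buckets;
    SketchMultiBucketAgg(List<SketchBucket> buckets) {
        this.buckets = buckets;
    }
}

class BucketCounter {
    // Count each bucket once, plus everything nested below it, mirroring
    // what a countInnerBucket-style helper does.
    static int countInnerBucket(SketchAgg agg) {
        int count = 0;
        if (agg instanceof SketchMultiBucketAgg) {
            for (SketchBucket bucket : ((SketchMultiBucketAgg) agg).buckets) {
                count++;
                for (SketchAgg sub : bucket.subAggregations) {
                    count += countInnerBucket(sub);
                }
            }
        }
        return count;
    }
}
```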