|
|
@@ -310,6 +310,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ int consumeBucketCount = 0;
|
|
|
List<Bucket> reducedBuckets = new ArrayList<>();
|
|
|
if (pq.size() > 0) {
|
|
|
// list of buckets coming from different shards that have the same key
|
|
|
@@ -323,6 +324,10 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
|
|
|
// the key changes, reduce what we already buffered and reset the buffer for current buckets
|
|
|
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
|
|
|
if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
|
|
|
+ if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) {
|
|
|
+ reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
|
|
|
+ consumeBucketCount = 0;
|
|
|
+ }
|
|
|
reducedBuckets.add(reduced);
|
|
|
}
|
|
|
currentBuckets.clear();
|
|
|
@@ -344,10 +349,14 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
|
|
|
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
|
|
|
if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
|
|
|
reducedBuckets.add(reduced);
|
|
|
+ if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) {
|
|
|
+ reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
|
|
|
+ consumeBucketCount = 0;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+ reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
|
|
|
return reducedBuckets;
|
|
|
}
|
|
|
|
|
|
@@ -387,7 +396,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
|
|
|
* consumeBucketsAndMaybeBreak.
|
|
|
*/
|
|
|
class Counter implements LongConsumer {
|
|
|
- private int size = list.size();
|
|
|
+ private int size;
|
|
|
|
|
|
@Override
|
|
|
public void accept(long key) {
|
|
|
@@ -490,11 +499,9 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
|
|
|
@Override
|
|
|
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
|
|
|
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
|
|
|
- boolean alreadyAccountedForBuckets = false;
|
|
|
if (reduceContext.isFinalReduce()) {
|
|
|
if (minDocCount == 0) {
|
|
|
addEmptyBuckets(reducedBuckets, reduceContext);
|
|
|
- alreadyAccountedForBuckets = true;
|
|
|
}
|
|
|
if (InternalOrder.isKeyDesc(order)) {
|
|
|
// we just need to reverse here...
|
|
|
@@ -508,9 +515,6 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
|
|
|
CollectionUtil.introSort(reducedBuckets, order.comparator());
|
|
|
}
|
|
|
}
|
|
|
- if (false == alreadyAccountedForBuckets) {
|
|
|
- reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
|
|
|
- }
|
|
|
return new InternalDateHistogram(
|
|
|
getName(),
|
|
|
reducedBuckets,
|