@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.LongConsumer;
 
 /**
  * Implementation of {@link Histogram}.
@@ -289,7 +290,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
         for (InternalAggregation aggregation : aggregations) {
             InternalDateHistogram histogram = (InternalDateHistogram) aggregation;
             if (histogram.buckets.isEmpty() == false) {
-                pq.add(new IteratorAndCurrent(histogram.buckets.iterator()));
+                pq.add(new IteratorAndCurrent<Bucket>(histogram.buckets.iterator()));
             }
         }
 
@@ -351,14 +352,50 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
         return createBucket(buckets.get(0).key, docCount, aggs);
     }
 
+    /**
+     * When we pre-count the empty buckets we report them periodically
+     * because you can configure the date_histogram to create an astounding
+     * number of buckets. It'd take a while to count that high only to abort.
+     * So we report every ten thousand buckets. It'd be simpler to report
+     * every single bucket we plan to allocate, one at a time, but that'd
+     * cause needless overhead on the circuit breakers. Counting ten thousand
+     * buckets at a time is plenty fast to fail quickly in pathological cases
+     * and plenty large to keep the overhead minimal.
+     */
+    private static final int REPORT_EMPTY_EVERY = 10_000;
+
     private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
-        Bucket lastBucket = null;
-        LongBounds bounds = emptyBucketInfo.bounds;
+        /*
+         * Make sure we have space for the empty buckets we're going to add by
+         * counting all of the empties we plan to add and firing them into
+         * consumeBucketsAndMaybeBreak.
+         */
+        class Counter implements LongConsumer {
+            private int size = list.size();
+
+            @Override
+            public void accept(long key) {
+                size++;
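+                // Reporting in chunks keeps breaker overhead low while still failing early on huge ranges.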
+                if (size >= REPORT_EMPTY_EVERY) {
+                    reduceContext.consumeBucketsAndMaybeBreak(size);
+                    size = 0;
+                }
+            }
+        }
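+        // First pass: iterate the empty buckets purely to count them against the circuit breaker.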
+        Counter counter = new Counter();
+        iterateEmptyBuckets(list, list.listIterator(), counter);
+        reduceContext.consumeBucketsAndMaybeBreak(counter.size);
+
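+        // Second pass: actually insert the empty buckets, sharing a single reduced empty sub-aggregation.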
+        InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(List.of(emptyBucketInfo.subAggregations), reduceContext);
         ListIterator<Bucket> iter = list.listIterator();
+        iterateEmptyBuckets(list, iter, key -> iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs)));
+    }
+
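+    /**
+     * Walks the key of every empty bucket implied by the requested bounds and
+     * by gaps between the existing buckets in {@code list}, handing each key
+     * to {@code onBucket}.
+     */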
+    private void iterateEmptyBuckets(List<Bucket> list, ListIterator<Bucket> iter, LongConsumer onBucket) {
+        LongBounds bounds = emptyBucketInfo.bounds;
 
         // first adding all the empty buckets *before* the actual data (based on the extended_bounds.min the user requested)
-        InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(Collections.singletonList(emptyBucketInfo.subAggregations),
-            reduceContext);
+
         if (bounds != null) {
             Bucket firstBucket = iter.hasNext() ? list.get(iter.nextIndex()) : null;
             if (firstBucket == null) {
@@ -366,7 +403,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                     long key = bounds.getMin() + offset;
                     long max = bounds.getMax() + offset;
                     while (key <= max) {
-                        iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
+                        onBucket.accept(key);
                         key = nextKey(key).longValue();
                     }
                 }
@@ -375,7 +412,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                     long key = bounds.getMin() + offset;
                     if (key < firstBucket.key) {
                         while (key < firstBucket.key) {
-                            iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
+                            onBucket.accept(key);
                             key = nextKey(key).longValue();
                         }
                     }
@@ -383,6 +420,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
             }
         }
 
+        Bucket lastBucket = null;
         // now adding the empty buckets within the actual data,
         // e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6
         while (iter.hasNext()) {
@@ -390,7 +428,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
             if (lastBucket != null) {
                 long key = nextKey(lastBucket.key).longValue();
                 while (key < nextBucket.key) {
-                    iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
+                    onBucket.accept(key);
                     key = nextKey(key).longValue();
                 }
                 assert key == nextBucket.key : "key: " + key + ", nextBucket.key: " + nextBucket.key;
@@ -403,7 +441,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
             long key = nextKey(lastBucket.key).longValue();
             long max = bounds.getMax() + offset;
             while (key <= max) {
-                iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
+                onBucket.accept(key);
                 key = nextKey(key).longValue();
             }
         }
@@ -412,9 +450,11 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
     @Override
     public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
         List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
+        boolean alreadyAccountedForBuckets = false;
         if (reduceContext.isFinalReduce()) {
             if (minDocCount == 0) {
                 addEmptyBuckets(reducedBuckets, reduceContext);
+                alreadyAccountedForBuckets = true;
             }
             if (InternalOrder.isKeyDesc(order)) {
                 // we just need to reverse here...
@@ -428,7 +468,9 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                 CollectionUtil.introSort(reducedBuckets, order.comparator());
             }
         }
-        reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
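+        // addEmptyBuckets already counted its additions (and the pre-existing buckets) against the breaker.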
+        if (false == alreadyAccountedForBuckets) {
+            reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
+        }
         return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo,
             format, keyed, getMetadata());
     }