|
@@ -293,15 +293,17 @@ public final class InternalAutoDateHistogram extends
|
|
|
// First we need to find the highest level rounding used across all the
|
|
|
// shards
|
|
|
int reduceRoundingIdx = 0;
|
|
|
+ long min = Long.MAX_VALUE;
|
|
|
+ long max = Long.MIN_VALUE;
|
|
|
for (InternalAggregation aggregation : aggregations) {
|
|
|
- int aggRoundingIdx = ((InternalAutoDateHistogram) aggregation).bucketInfo.roundingIdx;
|
|
|
- if (aggRoundingIdx > reduceRoundingIdx) {
|
|
|
- reduceRoundingIdx = aggRoundingIdx;
|
|
|
+ InternalAutoDateHistogram agg = ((InternalAutoDateHistogram) aggregation);
|
|
|
+ reduceRoundingIdx = Math.max(agg.bucketInfo.roundingIdx, reduceRoundingIdx);
|
|
|
+ if (false == agg.buckets.isEmpty()) {
|
|
|
+ min = Math.min(min, agg.buckets.get(0).key);
|
|
|
+ max = Math.max(max, agg.buckets.get(agg.buckets.size() - 1).key);
|
|
|
}
|
|
|
}
|
|
|
- // This rounding will be used to reduce all the buckets
|
|
|
- RoundingInfo reduceRoundingInfo = bucketInfo.roundingInfos[reduceRoundingIdx];
|
|
|
- Rounding reduceRounding = reduceRoundingInfo.rounding;
|
|
|
+ Rounding.Prepared reduceRounding = prepare(reduceRoundingIdx, min, max);
|
|
|
|
|
|
final PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
|
|
|
@Override
|
|
@@ -351,21 +353,33 @@ public final class InternalAutoDateHistogram extends
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- return mergeBucketsIfNeeded(reducedBuckets, reduceRoundingIdx, reduceRoundingInfo, reduceContext);
|
|
|
+ return mergeBucketsIfNeeded(
|
|
|
+ new BucketReduceResult(reducedBuckets, reduceRoundingIdx, 1, reduceRounding, min, max),
|
|
|
+ reduceContext
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
- private BucketReduceResult mergeBucketsIfNeeded(List<Bucket> reducedBuckets, int reduceRoundingIdx, RoundingInfo reduceRoundingInfo,
|
|
|
- ReduceContext reduceContext) {
|
|
|
- while (reducedBuckets.size() > (targetBuckets * reduceRoundingInfo.getMaximumInnerInterval())
|
|
|
- && reduceRoundingIdx < bucketInfo.roundingInfos.length - 1) {
|
|
|
- reduceRoundingIdx++;
|
|
|
- reduceRoundingInfo = bucketInfo.roundingInfos[reduceRoundingIdx];
|
|
|
- reducedBuckets = mergeBuckets(reducedBuckets, reduceRoundingInfo.rounding, reduceContext);
|
|
|
- }
|
|
|
- return new BucketReduceResult(reducedBuckets, reduceRoundingInfo, reduceRoundingIdx, 1);
|
|
|
+ private BucketReduceResult mergeBucketsIfNeeded(BucketReduceResult firstPassResult, ReduceContext reduceContext) {
|
|
|
+ int idx = firstPassResult.roundingIdx;
|
|
|
+ RoundingInfo info = bucketInfo.roundingInfos[idx];
|
|
|
+ List<Bucket> buckets = firstPassResult.buckets;
|
|
|
+ Rounding.Prepared prepared = firstPassResult.preparedRounding;
|
|
|
+ while (buckets.size() > (targetBuckets * info.getMaximumInnerInterval())
|
|
|
+ && idx < bucketInfo.roundingInfos.length - 1) {
|
|
|
+ idx++;
|
|
|
+ info = bucketInfo.roundingInfos[idx];
|
|
|
+ prepared = prepare(idx, firstPassResult.min, firstPassResult.max);
|
|
|
+ buckets = mergeBuckets(buckets, prepared, reduceContext);
|
|
|
+ }
|
|
|
+ return new BucketReduceResult(buckets, idx, 1, prepared, firstPassResult.min, firstPassResult.max);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Rounding.Prepared prepare(int idx, long min, long max) {
|
|
|
+ Rounding rounding = bucketInfo.roundingInfos[idx].rounding;
|
|
|
+ return min <= max ? rounding.prepare(min, max) : rounding.prepareForUnknown();
|
|
|
}
|
|
|
|
|
|
- private List<Bucket> mergeBuckets(List<Bucket> reducedBuckets, Rounding reduceRounding, ReduceContext reduceContext) {
|
|
|
+ private List<Bucket> mergeBuckets(List<Bucket> reducedBuckets, Rounding.Prepared reduceRounding, ReduceContext reduceContext) {
|
|
|
List<Bucket> mergedBuckets = new ArrayList<>();
|
|
|
|
|
|
List<Bucket> sameKeyedBuckets = new ArrayList<>();
|
|
@@ -405,35 +419,51 @@ public final class InternalAutoDateHistogram extends
|
|
|
}
|
|
|
|
|
|
private static class BucketReduceResult {
|
|
|
- List<Bucket> buckets;
|
|
|
- RoundingInfo roundingInfo;
|
|
|
- int roundingIdx;
|
|
|
- long innerInterval;
|
|
|
-
|
|
|
- BucketReduceResult(List<Bucket> buckets, RoundingInfo roundingInfo, int roundingIdx, long innerInterval) {
|
|
|
+ final List<Bucket> buckets;
|
|
|
+ final int roundingIdx;
|
|
|
+ final long innerInterval;
|
|
|
+ final Rounding.Prepared preparedRounding;
|
|
|
+ final long min;
|
|
|
+ final long max;
|
|
|
+
|
|
|
+ BucketReduceResult(
|
|
|
+ List<Bucket> buckets,
|
|
|
+ int roundingIdx,
|
|
|
+ long innerInterval,
|
|
|
+ Rounding.Prepared preparedRounding,
|
|
|
+ long min,
|
|
|
+ long max
|
|
|
+ ) {
|
|
|
this.buckets = buckets;
|
|
|
- this.roundingInfo = roundingInfo;
|
|
|
this.roundingIdx = roundingIdx;
|
|
|
this.innerInterval = innerInterval;
|
|
|
+ this.preparedRounding = preparedRounding;
|
|
|
+ this.min = min;
|
|
|
+ this.max = max;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, ReduceContext reduceContext) {
|
|
|
- List<Bucket> list = currentResult.buckets;
|
|
|
+ private BucketReduceResult addEmptyBuckets(BucketReduceResult current, ReduceContext reduceContext) {
|
|
|
+ List<Bucket> list = current.buckets;
|
|
|
if (list.isEmpty()) {
|
|
|
- return currentResult;
|
|
|
- }
|
|
|
- int roundingIdx = getAppropriateRounding(list.get(0).key, list.get(list.size() - 1).key, currentResult.roundingIdx,
|
|
|
- bucketInfo.roundingInfos, targetBuckets);
|
|
|
- RoundingInfo roundingInfo = bucketInfo.roundingInfos[roundingIdx];
|
|
|
- Rounding rounding = roundingInfo.rounding;
|
|
|
+ return current;
|
|
|
+ }
|
|
|
+ int roundingIdx = getAppropriateRounding(
|
|
|
+ list.get(0).key,
|
|
|
+ list.get(list.size() - 1).key,
|
|
|
+ current.roundingIdx,
|
|
|
+ bucketInfo.roundingInfos,
|
|
|
+ targetBuckets
|
|
|
+ );
|
|
|
+ Rounding.Prepared rounding = current.roundingIdx == roundingIdx
|
|
|
+ ? current.preparedRounding
|
|
|
+ : prepare(roundingIdx, current.min, current.max);
|
|
|
// merge buckets using the new rounding
|
|
|
list = mergeBuckets(list, rounding, reduceContext);
|
|
|
|
|
|
Bucket lastBucket = null;
|
|
|
ListIterator<Bucket> iter = list.listIterator();
|
|
|
- InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(Collections.singletonList(bucketInfo.emptySubAggregations),
|
|
|
- reduceContext);
|
|
|
+ InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(List.of(bucketInfo.emptySubAggregations), reduceContext);
|
|
|
|
|
|
// Add the empty buckets within the data,
|
|
|
// e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6
|
|
@@ -449,7 +479,7 @@ public final class InternalAutoDateHistogram extends
|
|
|
}
|
|
|
lastBucket = iter.next();
|
|
|
}
|
|
|
- return new BucketReduceResult(list, roundingInfo, roundingIdx, currentResult.innerInterval);
|
|
|
+ return new BucketReduceResult(list, roundingIdx, 1, rounding, current.min, current.max);
|
|
|
}
|
|
|
|
|
|
static int getAppropriateRounding(long minKey, long maxKey, int roundingIdx,
|
|
@@ -501,8 +531,7 @@ public final class InternalAutoDateHistogram extends
|
|
|
reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, reduceContext);
|
|
|
|
|
|
// Adding empty buckets may have tipped us over the target so merge the buckets again if needed
|
|
|
- reducedBucketsResult = mergeBucketsIfNeeded(reducedBucketsResult.buckets, reducedBucketsResult.roundingIdx,
|
|
|
- reducedBucketsResult.roundingInfo, reduceContext);
|
|
|
+ reducedBucketsResult = mergeBucketsIfNeeded(reducedBucketsResult, reduceContext);
|
|
|
|
|
|
// Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding
|
|
|
reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext);
|
|
@@ -515,11 +544,9 @@ public final class InternalAutoDateHistogram extends
|
|
|
getMetadata(), reducedBucketsResult.innerInterval);
|
|
|
}
|
|
|
|
|
|
- private BucketReduceResult maybeMergeConsecutiveBuckets(BucketReduceResult reducedBucketsResult,
|
|
|
- ReduceContext reduceContext) {
|
|
|
- List<Bucket> buckets = reducedBucketsResult.buckets;
|
|
|
- RoundingInfo roundingInfo = reducedBucketsResult.roundingInfo;
|
|
|
- int roundingIdx = reducedBucketsResult.roundingIdx;
|
|
|
+ private BucketReduceResult maybeMergeConsecutiveBuckets(BucketReduceResult current, ReduceContext reduceContext) {
|
|
|
+ List<Bucket> buckets = current.buckets;
|
|
|
+ RoundingInfo roundingInfo = bucketInfo.roundingInfos[current.roundingIdx];
|
|
|
if (buckets.size() > targetBuckets) {
|
|
|
for (int interval : roundingInfo.innerIntervals) {
|
|
|
int resultingBuckets = buckets.size() / interval;
|
|
@@ -527,32 +554,38 @@ public final class InternalAutoDateHistogram extends
|
|
|
resultingBuckets++;
|
|
|
}
|
|
|
if (resultingBuckets <= targetBuckets) {
|
|
|
- return mergeConsecutiveBuckets(buckets, interval, roundingIdx, roundingInfo, reduceContext);
|
|
|
+ return mergeConsecutiveBuckets(current, interval, reduceContext);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- return reducedBucketsResult;
|
|
|
+ return current;
|
|
|
}
|
|
|
|
|
|
- private BucketReduceResult mergeConsecutiveBuckets(List<Bucket> reducedBuckets, int mergeInterval, int roundingIdx,
|
|
|
- RoundingInfo roundingInfo, ReduceContext reduceContext) {
|
|
|
+ private BucketReduceResult mergeConsecutiveBuckets(BucketReduceResult current, int mergeInterval, ReduceContext reduceContext) {
|
|
|
List<Bucket> mergedBuckets = new ArrayList<>();
|
|
|
List<Bucket> sameKeyedBuckets = new ArrayList<>();
|
|
|
|
|
|
- double key = roundingInfo.rounding.round(reducedBuckets.get(0).key);
|
|
|
- for (int i = 0; i < reducedBuckets.size(); i++) {
|
|
|
- Bucket bucket = reducedBuckets.get(i);
|
|
|
+ double key = current.preparedRounding.round(current.buckets.get(0).key);
|
|
|
+ for (int i = 0; i < current.buckets.size(); i++) {
|
|
|
+ Bucket bucket = current.buckets.get(i);
|
|
|
if (i % mergeInterval == 0 && sameKeyedBuckets.isEmpty() == false) {
|
|
|
mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
|
|
|
sameKeyedBuckets.clear();
|
|
|
- key = roundingInfo.rounding.round(bucket.key);
|
|
|
+ key = current.preparedRounding.round(bucket.key);
|
|
|
}
|
|
|
sameKeyedBuckets.add(new Bucket(Math.round(key), bucket.docCount, format, bucket.aggregations));
|
|
|
}
|
|
|
if (sameKeyedBuckets.isEmpty() == false) {
|
|
|
mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
|
|
|
}
|
|
|
- return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval);
|
|
|
+ return new BucketReduceResult(
|
|
|
+ mergedBuckets,
|
|
|
+ current.roundingIdx,
|
|
|
+ mergeInterval,
|
|
|
+ current.preparedRounding,
|
|
|
+ current.min,
|
|
|
+ current.max
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
@Override
|