|
@@ -171,13 +171,14 @@ public abstract class AbstractInternalTerms<A extends AbstractInternalTerms<A, B
|
|
|
pq.add(new IteratorAndCurrent<>(buckets.iterator()));
|
|
|
}
|
|
|
// list of buckets coming from different shards that have the same key
|
|
|
- List<B> sameTermBuckets = new ArrayList<>();
|
|
|
+ ArrayList<B> sameTermBuckets = new ArrayList<>();
|
|
|
B lastBucket = null;
|
|
|
while (pq.size() > 0) {
|
|
|
final IteratorAndCurrent<B> top = pq.top();
|
|
|
assert lastBucket == null || cmp.compare(top.current(), lastBucket) >= 0;
|
|
|
if (lastBucket != null && cmp.compare(top.current(), lastBucket) != 0) {
|
|
|
// the key changed so bundle up the last key's worth of buckets
|
|
|
+ sameTermBuckets.trimToSize();
|
|
|
sink.accept(new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets));
|
|
|
sameTermBuckets = new ArrayList<>();
|
|
|
}
|
|
@@ -198,18 +199,20 @@ public abstract class AbstractInternalTerms<A extends AbstractInternalTerms<A, B
|
|
|
}
|
|
|
|
|
|
if (sameTermBuckets.isEmpty() == false) {
|
|
|
+ sameTermBuckets.trimToSize();
|
|
|
sink.accept(new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void reduceLegacy(List<List<B>> bucketsList, AggregationReduceContext reduceContext, Consumer<DelayedBucket<B>> sink) {
|
|
|
- final Map<Object, List<B>> bucketMap = new HashMap<>();
|
|
|
+ final Map<Object, ArrayList<B>> bucketMap = new HashMap<>();
|
|
|
for (List<B> buckets : bucketsList) {
|
|
|
for (B bucket : buckets) {
|
|
|
bucketMap.computeIfAbsent(bucket.getKey(), k -> new ArrayList<>()).add(bucket);
|
|
|
}
|
|
|
}
|
|
|
- for (List<B> sameTermBuckets : bucketMap.values()) {
|
|
|
+ for (ArrayList<B> sameTermBuckets : bucketMap.values()) {
|
|
|
+ sameTermBuckets.trimToSize();
|
|
|
sink.accept(new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets));
|
|
|
}
|
|
|
}
|