
Reduce InternalVariableWidthHistogram in a streaming fashion (#105748)

Ignacio Vera
commit dbf72e1dad

+ 43 - 58
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java
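
For orientation, here is a minimal, self-contained sketch of the accumulation scheme the new reducer uses. The ShardBucket and ReducedBucket records and the plain java.util.HashMap are hypothetical stand-ins for the Elasticsearch classes; the real code in the diff accumulates into a circuit-breaker-aware LongObjectPagedHashMap of MultiBucketAggregatorsReducer instances, and only NumericUtils.doubleToSortableLong is the actual Lucene call it uses.

import org.apache.lucene.util.NumericUtils;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the shard-level and reduced bucket types.
record ShardBucket(double centroid, double min, double max, long docCount) {}

record ReducedBucket(double centroid, double min, double max, long docCount) {}

class StreamingCentroidReducer {
    // Per-centroid accumulator, mirroring ReducerAndExtraInfo in the diff:
    // running min/max bounds, a doc count, and a doc-count-weighted centroid sum.
    private static final class Acc {
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        double weightedSum = 0;
        long docCount = 0;
    }

    // Keyed by the sortable-long encoding of the centroid, as in the diff,
    // so buckets with bit-identical centroids collapse into one entry.
    private final Map<Long, Acc> accs = new HashMap<>();

    // Called once per bucket as each partial aggregation arrives; nothing is
    // buffered beyond the per-key accumulators, which is the streaming part.
    void accept(ShardBucket bucket) {
        long key = NumericUtils.doubleToSortableLong(bucket.centroid());
        Acc acc = accs.computeIfAbsent(key, k -> new Acc());
        acc.min = Math.min(acc.min, bucket.min());
        acc.max = Math.max(acc.max, bucket.max());
        acc.weightedSum += bucket.docCount() * bucket.centroid();
        acc.docCount += bucket.docCount();
    }

    // Materializes the reduced buckets, sorted by centroid; the real reducer
    // then merges them down to targetNumBuckets.
    List<ReducedBucket> get() {
        List<ReducedBucket> reduced = new ArrayList<>(accs.size());
        for (Acc acc : accs.values()) {
            reduced.add(new ReducedBucket(acc.weightedSum / acc.docCount, acc.min, acc.max, acc.docCount));
        }
        reduced.sort(Comparator.comparingDouble(ReducedBucket::centroid));
        return reduced;
    }
}

Keying by the sortable-long bit pattern means only bit-identical centroids are merged at this stage; buckets with merely nearby centroids are handled by the mergeBucketsIfNeeded(reducedBuckets, targetNumBuckets, reduceContext) pass that the diff runs on the sorted result.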

@@ -8,9 +8,11 @@
 
 package org.elasticsearch.search.aggregations.bucket.histogram;
 
-import org.apache.lucene.util.PriorityQueue;
+import org.apache.lucene.util.NumericUtils;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.util.LongObjectPagedHashMap;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.AggregationReduceContext;
 import org.elasticsearch.search.aggregations.AggregatorReducer;
@@ -18,7 +20,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.KeyComparable;
-import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
+import org.elasticsearch.search.aggregations.bucket.MultiBucketAggregatorsReducer;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
 import org.elasticsearch.search.aggregations.support.SamplingContext;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -324,59 +326,6 @@ public class InternalVariableWidthHistogram extends InternalMultiBucketAggregati
         return new Bucket(centroid, bounds, docCount, format, aggs);
     }
 
-    public List<Bucket> reduceBuckets(List<InternalVariableWidthHistogram> aggregations, AggregationReduceContext reduceContext) {
-        PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<>(aggregations.size()) {
-            @Override
-            protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
-                return Double.compare(a.current().centroid, b.current().centroid) < 0;
-            }
-        };
-        for (InternalVariableWidthHistogram histogram : aggregations) {
-            if (histogram.buckets.isEmpty() == false) {
-                pq.add(new IteratorAndCurrent<>(histogram.buckets.iterator()));
-            }
-        }
-
-        List<Bucket> reducedBuckets = new ArrayList<>();
-        if (pq.size() > 0) {
-            double key = pq.top().current().centroid();
-            // list of buckets coming from different shards that have the same key
-            List<Bucket> currentBuckets = new ArrayList<>();
-            do {
-                IteratorAndCurrent<Bucket> top = pq.top();
-
-                if (Double.compare(top.current().centroid(), key) != 0) {
-                    // The key changes, reduce what we already buffered and reset the buffer for current buckets.
-                    final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
-                    reducedBuckets.add(reduced);
-                    currentBuckets.clear();
-                    key = top.current().centroid();
-                }
-
-                currentBuckets.add(top.current());
-
-                if (top.hasNext()) {
-                    Bucket prev = top.current();
-                    top.next();
-                    assert top.current().compareKey(prev) >= 0 : "shards must return data sorted by centroid";
-                    pq.updateTop();
-                } else {
-                    pq.pop();
-                }
-            } while (pq.size() > 0);
-
-            if (currentBuckets.isEmpty() == false) {
-                final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                reduceContext.consumeBucketsAndMaybeBreak(1);
-                reducedBuckets.add(reduced);
-            }
-        }
-
-        mergeBucketsIfNeeded(reducedBuckets, targetNumBuckets, reduceContext);
-        return reducedBuckets;
-    }
-
     static class BucketRange {
         int startIdx;
         int endIdx;
@@ -530,16 +479,40 @@ public class InternalVariableWidthHistogram extends InternalMultiBucketAggregati
     @Override
     protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceContext, int size) {
         return new AggregatorReducer() {
-            final List<InternalVariableWidthHistogram> aggregations = new ArrayList<>(size);
+
+            final LongObjectPagedHashMap<ReducerAndExtraInfo> bucketsReducer = new LongObjectPagedHashMap<>(
+                getBuckets().size(),
+                reduceContext.bigArrays()
+            );
 
             @Override
             public void accept(InternalAggregation aggregation) {
-                aggregations.add((InternalVariableWidthHistogram) aggregation);
+                InternalVariableWidthHistogram histogram = (InternalVariableWidthHistogram) aggregation;
+                for (Bucket bucket : histogram.getBuckets()) {
+                    long key = NumericUtils.doubleToSortableLong(bucket.centroid());
+                    ReducerAndExtraInfo reducer = bucketsReducer.get(key);
+                    if (reducer == null) {
+                        reducer = new ReducerAndExtraInfo(new MultiBucketAggregatorsReducer(reduceContext, size));
+                        bucketsReducer.put(key, reducer);
+                        reduceContext.consumeBucketsAndMaybeBreak(1);
+                    }
+                    reducer.min[0] = Math.min(reducer.min[0], bucket.bounds.min);
+                    reducer.max[0] = Math.max(reducer.max[0], bucket.bounds.max);
+                    reducer.sum[0] += bucket.docCount * bucket.centroid;
+                    reducer.reducer.accept(bucket);
+                }
             }
 
             @Override
             public InternalAggregation get() {
-                final List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
+                final List<Bucket> reducedBuckets = new ArrayList<>((int) bucketsReducer.size());
+                bucketsReducer.iterator().forEachRemaining(entry -> {
+                    final double centroid = entry.value.sum[0] / entry.value.reducer.getDocCount();
+                    final Bucket.BucketBounds bounds = new Bucket.BucketBounds(entry.value.min[0], entry.value.max[0]);
+                    reducedBuckets.add(new Bucket(centroid, bounds, entry.value.reducer.getDocCount(), format, entry.value.reducer.get()));
+                });
+                reducedBuckets.sort(Comparator.comparing(Bucket::centroid));
+                mergeBucketsIfNeeded(reducedBuckets, targetNumBuckets, reduceContext);
                 if (reduceContext.isFinalReduce()) {
                     reducedBuckets.sort(Comparator.comparing(Bucket::min));
                     mergeBucketsWithSameMin(reducedBuckets, reduceContext);
@@ -547,9 +520,21 @@ public class InternalVariableWidthHistogram extends InternalMultiBucketAggregati
                 }
                 return new InternalVariableWidthHistogram(getName(), reducedBuckets, emptyBucketInfo, targetNumBuckets, format, metadata);
             }
+
+            @Override
+            public void close() {
+                bucketsReducer.iterator().forEachRemaining(entry -> Releasables.close(entry.value.reducer));
+                Releasables.close(bucketsReducer);
+            }
         };
     }
 
+    private record ReducerAndExtraInfo(MultiBucketAggregatorsReducer reducer, double[] min, double[] max, double[] sum) {
+        private ReducerAndExtraInfo(MultiBucketAggregatorsReducer reducer) {
+            this(reducer, new double[] { Double.POSITIVE_INFINITY }, new double[] { Double.NEGATIVE_INFINITY }, new double[] { 0 });
+        }
+    }
+
     @Override
     public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
         return new InternalVariableWidthHistogram(