Browse Source

Reduce InternalIpPrefix in a streaming fashion (#105479)

Ignacio Vera 1 year ago
parent
commit
ce265438b2

+ 28 - 66
server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/InternalIpPrefix.java

@@ -9,9 +9,9 @@
 package org.elasticsearch.search.aggregations.bucket.prefix;
 
 import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.PriorityQueue;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.AggregationReduceContext;
 import org.elasticsearch.search.aggregations.AggregatorReducer;
@@ -19,12 +19,14 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.KeyComparable;
-import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
+import org.elasticsearch.search.aggregations.bucket.MultiBucketAggregatorsReducer;
 import org.elasticsearch.xcontent.XContentBuilder;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -223,75 +225,46 @@ public class InternalIpPrefix extends InternalMultiBucketAggregation<InternalIpP
     @Override
     protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceContext, int size) {
         return new AggregatorReducer() {
-            final List<InternalIpPrefix> aggregations = new ArrayList<>(size);
+            final Map<BytesRef, ReducerAndProto> buckets = new HashMap<>();
 
             @Override
             public void accept(InternalAggregation aggregation) {
-                aggregations.add((InternalIpPrefix) aggregation);
+                final InternalIpPrefix ipPrefix = (InternalIpPrefix) aggregation;
+                for (Bucket bucket : ipPrefix.getBuckets()) {
+                    ReducerAndProto reducerAndProto = buckets.computeIfAbsent(
+                        bucket.key,
+                        k -> new ReducerAndProto(new MultiBucketAggregatorsReducer(reduceContext, size), bucket)
+                    );
+                    reducerAndProto.reducer.accept(bucket);
+                }
             }
 
             @Override
             public InternalAggregation get() {
-                List<InternalIpPrefix.Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
+                final List<Bucket> reducedBuckets = new ArrayList<>(buckets.size());
+                for (ReducerAndProto reducerAndProto : buckets.values()) {
+                    if (false == reduceContext.isFinalReduce() || reducerAndProto.reducer.getDocCount() >= minDocCount) {
+                        reducedBuckets.add(
+                            createBucket(reducerAndProto.proto, reducerAndProto.reducer.get(), reducerAndProto.reducer.getDocCount())
+                        );
+                    }
+                }
                 reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
+                reducedBuckets.sort(Comparator.comparing(a -> a.key));
                 return new InternalIpPrefix(getName(), format, keyed, minDocCount, reducedBuckets, metadata);
             }
-        };
-    }
 
-    private List<Bucket> reduceBuckets(List<InternalIpPrefix> aggregations, AggregationReduceContext reduceContext) {
-        final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<>(aggregations.size()) {
             @Override
-            protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
-                return a.current().key.compareTo(b.current().key) < 0;
-            }
-        };
-        for (InternalIpPrefix ipPrefix : aggregations) {
-            if (ipPrefix.buckets.isEmpty() == false) {
-                pq.add(new IteratorAndCurrent<>(ipPrefix.buckets.iterator()));
-            }
-        }
-
-        List<Bucket> reducedBuckets = new ArrayList<>();
-        if (pq.size() > 0) {
-            // list of buckets coming from different shards that have the same value
-            List<Bucket> currentBuckets = new ArrayList<>();
-            BytesRef value = pq.top().current().key;
-
-            do {
-                final IteratorAndCurrent<Bucket> top = pq.top();
-                if (top.current().key.equals(value) == false) {
-                    final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                    if (false == reduceContext.isFinalReduce() || reduced.getDocCount() >= minDocCount) {
-                        reducedBuckets.add(reduced);
-                    }
-                    currentBuckets.clear();
-                    value = top.current().key;
-                }
-
-                currentBuckets.add(top.current());
-
-                if (top.hasNext()) {
-                    top.next();
-                    assert top.current().key.compareTo(value) > 0
-                        : "shards must return data sorted by value [" + top.current().key + "] and [" + value + "]";
-                    pq.updateTop();
-                } else {
-                    pq.pop();
-                }
-            } while (pq.size() > 0);
-
-            if (currentBuckets.isEmpty() == false) {
-                final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                if (false == reduceContext.isFinalReduce() || reduced.getDocCount() >= minDocCount) {
-                    reducedBuckets.add(reduced);
+            public void close() {
+                for (ReducerAndProto reducerAndProto : buckets.values()) {
+                    Releasables.close(reducerAndProto.reducer);
                 }
             }
-        }
-
-        return reducedBuckets;
+        };
     }
 
+    private record ReducerAndProto(MultiBucketAggregatorsReducer reducer, Bucket proto) {}
+
     @Override
     public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
         if (keyed) {
@@ -342,17 +315,6 @@ public class InternalIpPrefix extends InternalMultiBucketAggregation<InternalIpP
         );
     }
 
-    private Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext context) {
-        assert buckets.isEmpty() == false;
-        long docCount = 0;
-        for (InternalIpPrefix.Bucket bucket : buckets) {
-            docCount += bucket.docCount;
-        }
-        final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
-        final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
-        return createBucket(buckets.get(0), aggs, docCount);
-    }
-
     @Override
     public List<Bucket> getBuckets() {
         return Collections.unmodifiableList(buckets);