
Reduce InternalHistogram and InternalDateHistogram in a streaming fashion (#105359)

Ignacio Vera 1 year ago
parent
commit
f942605c83
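
This change replaces the buffer-everything-then-merge reduction in InternalDateHistogram and InternalHistogram with a streaming one. Previously, each shard-level aggregation was collected into a list and the sorted bucket lists were merged through a PriorityQueue at get() time; now each incoming aggregation's buckets are folded immediately into a new shared helper, LongKeyedMultiBucketsAggregatorReducer, which tracks per-key state in a LongObjectPagedHashMap allocated from reduceContext.bigArrays(). Because the hash map does not preserve order, the reduced buckets are explicitly sorted by key during the final reduce. The helper is Releasable, so both reducers now also implement close() to free the paged hash map and the per-key sub-aggregation reducers.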

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java

@@ -37,7 +37,7 @@ public abstract class InternalMultiBucketAggregation<
      * buckets is plenty fast to fail this quickly in pathological cases and
      * plenty large to keep the overhead minimal.
      */
-    protected static final int REPORT_EMPTY_EVERY = 10_000;
+    public static final int REPORT_EMPTY_EVERY = 10_000;
 
     public InternalMultiBucketAggregation(String name, Map<String, Object> metadata) {
         super(name, metadata);

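The widened visibility (protected → public) is needed because the new LongKeyedMultiBucketsAggregatorReducer introduced below lives outside this class hierarchy and references the constant directly as InternalMultiBucketAggregation.REPORT_EMPTY_EVERY when batching its calls to consumeBucketsAndMaybeBreak.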
+ 23 - 84
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java

@@ -8,12 +8,12 @@
 package org.elasticsearch.search.aggregations.bucket.histogram;
 
 import org.apache.lucene.util.CollectionUtil;
-import org.apache.lucene.util.PriorityQueue;
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.Rounding;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.AggregationReduceContext;
 import org.elasticsearch.search.aggregations.AggregatorReducer;
@@ -23,7 +23,6 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.InternalOrder;
 import org.elasticsearch.search.aggregations.KeyComparable;
-import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
 import org.elasticsearch.search.aggregations.support.SamplingContext;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -33,6 +32,7 @@ import java.time.Instant;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
@@ -323,85 +323,6 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
         return new Bucket(prototype.key, prototype.docCount, prototype.keyed, prototype.format, aggregations);
     }
 
-    private List<Bucket> reduceBuckets(List<InternalDateHistogram> aggregations, AggregationReduceContext reduceContext) {
-
-        final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<>(aggregations.size()) {
-            @Override
-            protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
-                return a.current().key < b.current().key;
-            }
-        };
-        for (InternalDateHistogram histogram : aggregations) {
-            if (histogram.buckets.isEmpty() == false) {
-                pq.add(new IteratorAndCurrent<>(histogram.buckets.iterator()));
-            }
-        }
-
-        int consumeBucketCount = 0;
-        List<Bucket> reducedBuckets = new ArrayList<>();
-        if (pq.size() > 0) {
-            // list of buckets coming from different shards that have the same key
-            List<Bucket> currentBuckets = new ArrayList<>();
-            double key = pq.top().current().key;
-
-            do {
-                final IteratorAndCurrent<Bucket> top = pq.top();
-
-                if (top.current().key != key) {
-                    // the key changes, reduce what we already buffered and reset the buffer for current buckets
-                    final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                    if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                        if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) {
-                            reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
-                            consumeBucketCount = 0;
-                        }
-                        reducedBuckets.add(reduced);
-                    }
-                    currentBuckets.clear();
-                    key = top.current().key;
-                }
-
-                currentBuckets.add(top.current());
-
-                if (top.hasNext()) {
-                    top.next();
-                    assert top.current().key > key : "shards must return data sorted by key";
-                    pq.updateTop();
-                } else {
-                    pq.pop();
-                }
-            } while (pq.size() > 0);
-
-            if (currentBuckets.isEmpty() == false) {
-                final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                    reducedBuckets.add(reduced);
-                    if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) {
-                        reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
-                        consumeBucketCount = 0;
-                    }
-                }
-            }
-        }
-        reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
-        return reducedBuckets;
-    }
-
-    /**
-     * Reduce a list of same-keyed buckets (from multiple shards) to a single bucket. This
-     * requires all buckets to have the same key.
-     */
-    private Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext context) {
-        assert buckets.isEmpty() == false;
-        long docCount = 0;
-        for (Bucket bucket : buckets) {
-            docCount += bucket.docCount;
-        }
-        final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
-        final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
-        return createBucket(buckets.get(0).key, docCount, aggs);
-    }
-
     private void addEmptyBuckets(List<Bucket> list, AggregationReduceContext reduceContext) {
         /*
          * Make sure we have space for the empty buckets we're going to add by
@@ -513,17 +434,30 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
     protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceContext, int size) {
         return new AggregatorReducer() {
 
-            final List<InternalDateHistogram> aggregations = new ArrayList<>(size);
+            final LongKeyedMultiBucketsAggregatorReducer<Bucket> reducer = new LongKeyedMultiBucketsAggregatorReducer<>(
+                reduceContext,
+                size,
+                minDocCount
+            ) {
+                @Override
+                protected Bucket createBucket(long key, long docCount, InternalAggregations aggregations) {
+                    return InternalDateHistogram.this.createBucket(key, docCount, aggregations);
+                }
+            };
 
             @Override
             public void accept(InternalAggregation aggregation) {
-                aggregations.add((InternalDateHistogram) aggregation);
+                InternalDateHistogram dateHistogram = (InternalDateHistogram) aggregation;
+                for (Bucket bucket : dateHistogram.buckets) {
+                    reducer.accept(bucket.key, bucket);
+                }
             }
 
             @Override
             public InternalAggregation get() {
-                List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
+                List<Bucket> reducedBuckets = reducer.get();
                 if (reduceContext.isFinalReduce()) {
+                    reducedBuckets.sort(Comparator.comparingLong(b -> b.key));
                     if (minDocCount == 0) {
                         addEmptyBuckets(reducedBuckets, reduceContext);
                     }
@@ -552,6 +486,11 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
                     getMetadata()
                 );
             }
+
+            @Override
+            public void close() {
+                Releasables.close(reducer);
+            }
         };
     }
 
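For reference, the streaming pattern the new reducer follows can be sketched with plain JDK types. This is a minimal illustration of the idea under simplifying assumptions, not the production code: the real implementation uses a LongObjectPagedHashMap for circuit-breaker-tracked memory and also reduces each bucket's sub-aggregations, while this sketch only accumulates doc counts. All names here are illustrative.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Minimal sketch of streaming bucket reduction keyed by long: doc counts
    // are folded in one bucket at a time as shard responses arrive, instead
    // of buffering whole shard-level aggregations and merging at the end.
    class StreamingDocCountReducer {
        private final Map<Long, Long> docCounts = new HashMap<>();

        void accept(long key, long docCount) {
            docCounts.merge(key, docCount, Long::sum); // fold in one bucket
        }

        List<long[]> get() {
            // Hash map iteration order is arbitrary, so sort by key at the end,
            // mirroring reducedBuckets.sort(Comparator.comparingLong(b -> b.key)).
            List<long[]> buckets = new ArrayList<>(docCounts.size());
            docCounts.forEach((key, count) -> buckets.add(new long[] { key, count }));
            buckets.sort(Comparator.comparingLong(b -> b[0]));
            return buckets;
        }
    }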

+ 24 - 82
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java

@@ -8,9 +8,10 @@
 package org.elasticsearch.search.aggregations.bucket.histogram;
 
 import org.apache.lucene.util.CollectionUtil;
-import org.apache.lucene.util.PriorityQueue;
+import org.apache.lucene.util.NumericUtils;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.AggregationReduceContext;
 import org.elasticsearch.search.aggregations.AggregatorReducer;
@@ -20,7 +21,6 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.InternalOrder;
 import org.elasticsearch.search.aggregations.KeyComparable;
-import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
 import org.elasticsearch.search.aggregations.support.SamplingContext;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -28,6 +28,7 @@ import org.elasticsearch.xcontent.XContentBuilder;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
@@ -281,83 +282,6 @@ public class InternalHistogram extends InternalMultiBucketAggregation<InternalHi
         return new Bucket(prototype.key, prototype.docCount, prototype.keyed, prototype.format, aggregations);
     }
 
-    private List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
-        final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<>(aggregations.size()) {
-            @Override
-            protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
-                return Double.compare(a.current().key, b.current().key) < 0;
-            }
-        };
-        for (InternalAggregation aggregation : aggregations) {
-            InternalHistogram histogram = (InternalHistogram) aggregation;
-            if (histogram.buckets.isEmpty() == false) {
-                pq.add(new IteratorAndCurrent<>(histogram.buckets.iterator()));
-            }
-        }
-
-        int consumeBucketCount = 0;
-        List<Bucket> reducedBuckets = new ArrayList<>();
-        if (pq.size() > 0) {
-            // list of buckets coming from different shards that have the same key
-            List<Bucket> currentBuckets = new ArrayList<>();
-            double key = pq.top().current().key;
-
-            do {
-                final IteratorAndCurrent<Bucket> top = pq.top();
-
-                if (Double.compare(top.current().key, key) != 0) {
-                    // The key changes, reduce what we already buffered and reset the buffer for current buckets.
-                    // Using Double.compare instead of != to handle NaN correctly.
-                    final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                    if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                        reducedBuckets.add(reduced);
-                        if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) {
-                            reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
-                            consumeBucketCount = 0;
-                        }
-                    }
-                    currentBuckets.clear();
-                    key = top.current().key;
-                }
-
-                currentBuckets.add(top.current());
-
-                if (top.hasNext()) {
-                    top.next();
-                    assert Double.compare(top.current().key, key) > 0 : "shards must return data sorted by key";
-                    pq.updateTop();
-                } else {
-                    pq.pop();
-                }
-            } while (pq.size() > 0);
-
-            if (currentBuckets.isEmpty() == false) {
-                final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                    reducedBuckets.add(reduced);
-                    if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) {
-                        reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
-                        consumeBucketCount = 0;
-                    }
-                }
-            }
-        }
-
-        reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
-        return reducedBuckets;
-    }
-
-    private Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext context) {
-        assert buckets.isEmpty() == false;
-        long docCount = 0;
-        for (Bucket bucket : buckets) {
-            docCount += bucket.docCount;
-        }
-        final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
-        final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
-        return createBucket(buckets.get(0).key, docCount, aggs);
-    }
-
     private double nextKey(double key) {
         return round(key + emptyBucketInfo.interval + emptyBucketInfo.interval / 2);
     }
@@ -453,17 +377,30 @@ public class InternalHistogram extends InternalMultiBucketAggregation<InternalHi
     protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceContext, int size) {
         return new AggregatorReducer() {
 
-            List<InternalAggregation> aggregations = new ArrayList<>(size);
+            final LongKeyedMultiBucketsAggregatorReducer<Bucket> reducer = new LongKeyedMultiBucketsAggregatorReducer<>(
+                reduceContext,
+                size,
+                minDocCount
+            ) {
+                @Override
+                protected Bucket createBucket(long key, long docCount, InternalAggregations aggregations) {
+                    return InternalHistogram.this.createBucket(NumericUtils.sortableLongToDouble(key), docCount, aggregations);
+                }
+            };
 
             @Override
             public void accept(InternalAggregation aggregation) {
-                aggregations.add(aggregation);
+                InternalHistogram histogram = (InternalHistogram) aggregation;
+                for (Bucket bucket : histogram.buckets) {
+                    reducer.accept(NumericUtils.doubleToSortableLong(bucket.key), bucket);
+                }
             }
 
             @Override
             public InternalAggregation get() {
-                List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
+                List<Bucket> reducedBuckets = reducer.get();
                 if (reduceContext.isFinalReduce()) {
+                    reducedBuckets.sort(Comparator.comparingDouble(b -> b.key));
                     if (minDocCount == 0) {
                         addEmptyBuckets(reducedBuckets, reduceContext);
                     }
@@ -481,6 +418,11 @@ public class InternalHistogram extends InternalMultiBucketAggregation<InternalHi
                 }
                 return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, getMetadata());
             }
+
+            @Override
+            public void close() {
+                Releasables.close(reducer);
+            }
         };
     }
 
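InternalHistogram keys are doubles while the shared reducer is keyed by longs, so the change round-trips each key through Lucene's order-preserving sortable-long encoding: NumericUtils.doubleToSortableLong on the way in, NumericUtils.sortableLongToDouble when rebuilding the bucket. A small standalone sketch of the property being relied on, assuming only lucene-core on the classpath:

    import org.apache.lucene.util.NumericUtils;

    // Demonstrates that the sortable-long encoding is an exact, order-preserving
    // round trip: the long ordering of encoded values matches the double ordering,
    // so hashing and re-sorting on longs is safe for double-keyed buckets.
    public class SortableLongDemo {
        public static void main(String[] args) {
            double[] keys = { -2.5, -1.0, 0.0, 1.5, 3.0 };
            long previous = Long.MIN_VALUE;
            for (double key : keys) {
                long sortable = NumericUtils.doubleToSortableLong(key);
                if (sortable <= previous) {
                    throw new AssertionError("encoding must preserve order");
                }
                if (NumericUtils.sortableLongToDouble(sortable) != key) {
                    throw new AssertionError("round trip must be exact");
                }
                previous = sortable;
            }
            System.out.println("sortable-long encoding preserves order and is exact");
        }
    }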

+ 97 - 0
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/LongKeyedMultiBucketsAggregatorReducer.java

@@ -0,0 +1,97 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.search.aggregations.bucket.histogram;
+
+import org.elasticsearch.common.util.LongObjectPagedHashMap;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.search.aggregations.AggregationReduceContext;
+import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+import org.elasticsearch.search.aggregations.bucket.MultiBucketAggregatorsReducer;
+import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *  Reduces aggregations where buckets are represented by a long key. It uses a {@link LongObjectPagedHashMap}
+ *  to keep track of the different buckets.
+ */
+abstract class LongKeyedMultiBucketsAggregatorReducer<B extends MultiBucketsAggregation.Bucket> implements Releasable {
+
+    private final AggregationReduceContext reduceContext;
+    private final int size;
+    private final long minDocCount;
+    private final LongObjectPagedHashMap<MultiBucketAggregatorsReducer> bucketsReducer;
+    int consumeBucketCount = 0;
+
+    LongKeyedMultiBucketsAggregatorReducer(AggregationReduceContext reduceContext, int size, long minDocCount) {
+        this.reduceContext = reduceContext;
+        this.size = size;
+        this.minDocCount = minDocCount;
+        bucketsReducer = new LongObjectPagedHashMap<>(size, reduceContext.bigArrays());
+    }
+
+    /**
+     * The bucket to reduce with its corresponding long key.
+     */
+    public final void accept(long key, B bucket) {
+        MultiBucketAggregatorsReducer reducer = bucketsReducer.get(key);
+        if (reducer == null) {
+            reducer = new MultiBucketAggregatorsReducer(reduceContext, size);
+            bucketsReducer.put(key, reducer);
+        }
+        consumeBucketsAndMaybeBreak(reducer, bucket);
+        reducer.accept(bucket);
+    }
+
+    private void consumeBucketsAndMaybeBreak(MultiBucketAggregatorsReducer reducer, B bucket) {
+        if (reduceContext.isFinalReduce() == false || minDocCount == 0) {
+            if (reducer.getDocCount() == 0 && bucket.getDocCount() > 0) {
+                consumeBucketsAndMaybeBreak();
+            }
+        } else {
+            if (reducer.getDocCount() < minDocCount && (reducer.getDocCount() + bucket.getDocCount()) >= minDocCount) {
+                consumeBucketsAndMaybeBreak();
+            }
+        }
+    }
+
+    private void consumeBucketsAndMaybeBreak() {
+        if (consumeBucketCount++ >= InternalMultiBucketAggregation.REPORT_EMPTY_EVERY) {
+            reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
+            consumeBucketCount = 0;
+        }
+    }
+
+    /**
+     * Returns the reduced buckets.
+     */
+    public final List<B> get() {
+        reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
+        final List<B> reducedBuckets = new ArrayList<>((int) bucketsReducer.size());
+        bucketsReducer.iterator().forEachRemaining(entry -> {
+            if (reduceContext.isFinalReduce() == false || entry.value.getDocCount() >= minDocCount) {
+                reducedBuckets.add(createBucket(entry.key, entry.value.getDocCount(), entry.value.get()));
+            }
+        });
+        return reducedBuckets;
+    }
+
+    /**
+     * Builds a bucket provided the key, the number of documents and the sub-aggregations.
+     */
+    protected abstract B createBucket(long key, long docCount, InternalAggregations aggregations);
+
+    @Override
+    public final void close() {
+        bucketsReducer.iterator().forEachRemaining(r -> Releasables.close(r.value));
+        Releasables.close(bucketsReducer);
+    }
+}
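
Two details of the new helper are worth calling out. First, the circuit-breaker accounting in consumeBucketsAndMaybeBreak(reducer, bucket) counts a bucket only the first time it becomes eligible to be returned: on the transition from a zero to a nonzero doc count when minDocCount is 0 or the reduce is non-final, or when the accumulated doc count crosses minDocCount otherwise. Each surviving key is therefore reported to the breaker once rather than once per shard. Second, callers own the lifecycle; both histogram reducers above delegate their close() to it via Releasables.close(reducer). A hedged sketch of that lifecycle, mirroring the two call sites above (reduceContext, size, minDocCount, Bucket, incomingBuckets, and buildBucket are placeholders for the caller's own names, not real API):

    // Sketch of the intended call pattern: construct, accept buckets as they
    // stream in, call get() once, and always close to release paged memory.
    LongKeyedMultiBucketsAggregatorReducer<Bucket> reducer = new LongKeyedMultiBucketsAggregatorReducer<>(
        reduceContext,
        size,
        minDocCount
    ) {
        @Override
        protected Bucket createBucket(long key, long docCount, InternalAggregations aggregations) {
            return buildBucket(key, docCount, aggregations); // caller-specific bucket factory (placeholder)
        }
    };
    try {
        for (Bucket bucket : incomingBuckets) {   // one call per shard-level bucket
            reducer.accept(bucket.key, bucket);
        }
        List<Bucket> reduced = reducer.get();     // unordered; sort afterwards if order matters
    } finally {
        reducer.close();                          // frees the paged hash map and per-key reducers
    }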