
Add a PriorityQueue backed by BigArrays (#106361)

This implementation is heavily based on Lucene's implementation, but it uses an ObjectArray instead of plain Java object arrays.
Ignacio Vera, 1 year ago
commit 2a2e648836
18 changed files with 873 additions and 213 deletions
  1. +5 -0     docs/changelog/106361.yaml
  2. +276 -0   server/src/main/java/org/elasticsearch/common/util/ObjectArrayPriorityQueue.java
  3. +3 -3     server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java
  4. +24 -17   server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java
  5. +8 -2     server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java
  6. +26 -19   server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java
  7. +5 -4     server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/BucketPriorityQueue.java
  8. +18 -17   server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java
  9. +15 -14   server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java
  10. +5 -4    server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketPriorityQueue.java
  11. +5 -4    server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketSignificancePriorityQueue.java
  12. +28 -26  server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java
  13. +24 -23  server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java
  14. +27 -25  server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java
  15. +29 -28  server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java
  16. +341 -0  server/src/test/java/org/elasticsearch/common/util/ObjectArrayPriorityQueueTests.java
  17. +1 -1    server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java
  18. +33 -26  x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java

+ 5 - 0
docs/changelog/106361.yaml

@@ -0,0 +1,5 @@
+pr: 106361
+summary: Add a `PriorityQueue` backed by `BigArrays`
+area: Aggregations
+type: enhancement
+issues: []

+ 276 - 0
server/src/main/java/org/elasticsearch/common/util/ObjectArrayPriorityQueue.java

@@ -0,0 +1,276 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.common.util;
+
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * A priority queue maintains a partial ordering of its elements such that the least element can
+ * always be found in constant time. Add()'s and pop()'s require log(size) time but the remove()
+ * cost implemented here is linear.
+ *
+ * <p><b>NOTE</b>: Iteration order is not specified.
+ *
+ * Based on Lucene's {@link org.apache.lucene.util.PriorityQueue} but it uses an {@link ObjectArray} instead of a plain {@code Object[]}.
+ * This class only tracks the memory usage of the {@link ObjectArray}, not the memory usage of the elements. Furthermore,
+ * the elements are not closed even if they implement {@link Releasable}.
+ */
+public abstract class ObjectArrayPriorityQueue<T> implements Iterable<T>, Releasable {
+    private long size = 0;
+    private final long maxSize;
+    // package private for testing
+    final ObjectArray<T> heap;
+
+    /**
+     * Create a priority queue.
+     */
+    public ObjectArrayPriorityQueue(long maxSize, BigArrays bigArrays) {
+        final long heapSize;
+        if (0 == maxSize) {
+            // We allocate 1 extra to avoid if statement in top()
+            heapSize = 2;
+        } else {
+            if ((maxSize < 0) || (maxSize >= Long.MAX_VALUE)) {
+                // Throw exception to prevent confusing OOME:
+                throw new IllegalArgumentException("maxSize must be >= 0 and < " + (Long.MAX_VALUE) + "; got: " + maxSize);
+            }
+            // NOTE: we add +1 because all access to heap is
+            // 1-based not 0-based. heap[0] is unused.
+            heapSize = maxSize + 1;
+        }
+
+        this.heap = bigArrays.newObjectArray(heapSize);
+        this.maxSize = maxSize;
+    }
+
+    /**
+     * Determines the ordering of objects in this priority queue. Subclasses must define this one
+     * method.
+     *
+     * @return <code>true</code> iff parameter <code>a</code> is less than parameter <code>b</code>.
+     */
+    protected abstract boolean lessThan(T a, T b);
+
+    /**
+     * Adds an Object to a PriorityQueue in log(size) time. If one tries to add more objects than
+     * the maxSize given in the constructor, an {@link ArrayIndexOutOfBoundsException} is thrown.
+     *
+     * @return the new 'top' element in the queue.
+     */
+    public final T add(T element) {
+        // don't modify size until we know heap access didn't throw AIOOB.
+        long index = size + 1;
+        heap.set(index, element);
+        size = index;
+        upHeap(index);
+        return heap.get(1);
+    }
+
+    /**
+     * Adds all elements of the collection into the queue. This method should be preferred over
+     * calling {@link #add(Object)} in a loop if all elements are known in advance, as it builds the
+     * queue faster.
+     *
+     * <p>If one tries to add more objects than the maxSize passed in the constructor, an {@link
+     * ArrayIndexOutOfBoundsException} is thrown.
+     */
+    public void addAll(Collection<T> elements) {
+        if (this.size + elements.size() > this.maxSize) {
+            throw new ArrayIndexOutOfBoundsException(
+                "Cannot add " + elements.size() + " elements to a queue with remaining capacity: " + (maxSize - size)
+            );
+        }
+
+        // A heap of size S always occupies the first S elements of the array,
+        // so it is safe to fill the array further - no actual non-sentinel value will be overwritten.
+        Iterator<T> iterator = elements.iterator();
+        while (iterator.hasNext()) {
+            this.heap.set(size + 1, iterator.next());
+            this.size++;
+        }
+
+        // The loop goes down to 1 as heap is 1-based not 0-based.
+        for (long i = (size >>> 1); i >= 1; i--) {
+            downHeap(i);
+        }
+    }
+
+    /**
+     * Adds an Object to a PriorityQueue in log(size) time. It returns the object (if any) that was
+     * dropped off the heap because it was full. This can be the given parameter (in case it is
+     * smaller than the full heap's minimum, and couldn't be added), or another object that was
+     * previously the smallest value in the heap and now has been replaced by a larger one, or null if
+     * the queue wasn't yet full with maxSize elements.
+     */
+    public T insertWithOverflow(T element) {
+        if (size < maxSize) {
+            add(element);
+            return null;
+        } else if (size > 0 && lessThan(heap.get(1), element)) {
+            T ret = heap.get(1);
+            heap.set(1, element);
+            updateTop();
+            return ret;
+        } else {
+            return element;
+        }
+    }
+
+    /** Returns the least element of the PriorityQueue in constant time. */
+    public final T top() {
+        // We don't need to check size here: if maxSize is 0,
+        // then heap is length 2 array with both entries null.
+        // If size is 0 then heap[1] is already null.
+        return heap.get(1);
+    }
+
+    /** Removes and returns the least element of the PriorityQueue in log(size) time. */
+    public final T pop() {
+        if (size > 0) {
+            T result = heap.get(1); // save first value
+            heap.set(1, heap.get(size)); // move last to first
+            heap.set(size, null); // permit GC of objects
+            size--;
+            downHeap(1); // adjust heap
+            return result;
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Should be called when the Object at top changes values. Still log(n) worst case, but it's at
+     * least twice as fast to
+     *
+     * <pre class="prettyprint">
+     * pq.top().change();
+     * pq.updateTop();
+     * </pre>
+     *
+     * instead of
+     *
+     * <pre class="prettyprint">
+     * o = pq.pop();
+     * o.change();
+     * pq.push(o);
+     * </pre>
+     *
+     * @return the new 'top' element.
+     */
+    public final T updateTop() {
+        downHeap(1);
+        return heap.get(1);
+    }
+
+    /** Replace the top of the pq with {@code newTop} and run {@link #updateTop()}. */
+    public final T updateTop(T newTop) {
+        heap.set(1, newTop);
+        return updateTop();
+    }
+
+    /** Returns the number of elements currently stored in the PriorityQueue. */
+    public final long size() {
+        return size;
+    }
+
+    /** Removes all entries from the PriorityQueue. */
+    public final void clear() {
+        for (long i = 0; i <= size; i++) {
+            heap.set(i, null);
+        }
+        size = 0;
+    }
+
+    /**
+     * Removes an existing element currently stored in the PriorityQueue. Cost is linear with the size
+     * of the queue. (A specialization of PriorityQueue which tracks element positions would provide a
+     * constant remove time but the trade-off would be extra cost to all additions/insertions)
+     */
+    public final boolean remove(T element) {
+        for (long i = 1; i <= size; i++) {
+            if (heap.get(i) == element) {
+                heap.set(i, heap.get(size));
+                heap.set(size, null); // permit GC of objects
+                size--;
+                if (i <= size) {
+                    if (upHeap(i) == false) {
+                        downHeap(i);
+                    }
+                }
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean upHeap(long origPos) {
+        long i = origPos;
+        T node = heap.get(i); // save bottom node
+        long j = i >>> 1;
+        while (j > 0 && lessThan(node, heap.get(j))) {
+            heap.set(i, heap.get(j)); // shift parents down
+            i = j;
+            j = j >>> 1;
+        }
+        heap.set(i, node); // install saved node
+        return i != origPos;
+    }
+
+    private void downHeap(long i) {
+        T node = heap.get(i); // save top node
+        long j = i << 1; // find smaller child
+        long k = j + 1;
+        if (k <= size && lessThan(heap.get(k), heap.get(j))) {
+            j = k;
+        }
+        while (j <= size && lessThan(heap.get(j), node)) {
+            heap.set(i, heap.get(j)); // shift up child
+            i = j;
+            j = i << 1;
+            k = j + 1;
+            if (k <= size && lessThan(heap.get(k), heap.get(j))) {
+                j = k;
+            }
+        }
+        heap.set(i, node); // install saved node
+    }
+
+    @Override
+    public final Iterator<T> iterator() {
+        return new Iterator<>() {
+
+            long i = 1;
+
+            @Override
+            public boolean hasNext() {
+                return i <= size;
+            }
+
+            @Override
+            public T next() {
+                if (hasNext() == false) {
+                    throw new NoSuchElementException();
+                }
+                return heap.get(i++);
+            }
+        };
+    }
+
+    @Override
+    public final void close() {
+        Releasables.close(heap);
+        doClose();
+    }
+
+    protected void doClose() {}
+}
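
A minimal usage sketch (not part of the commit; IntegerQueue and demo are illustrative names): subclasses supply lessThan, pass a BigArrays instance to the constructor, and must close the queue, since it is Releasable and owns a paged ObjectArray.

class IntegerQueue extends ObjectArrayPriorityQueue<Integer> {
    IntegerQueue(long maxSize, BigArrays bigArrays) {
        super(maxSize, bigArrays);
    }

    @Override
    protected boolean lessThan(Integer a, Integer b) {
        return a < b; // min-heap over the natural integer order
    }
}

static void demo(BigArrays bigArrays) {
    // Closing the queue releases the underlying ObjectArray pages.
    try (IntegerQueue pq = new IntegerQueue(3, bigArrays)) {
        pq.insertWithOverflow(7);
        pq.insertWithOverflow(2);
        pq.insertWithOverflow(5);
        pq.insertWithOverflow(9); // queue is full: evicts and returns 2, the least element
        assert pq.size() == 3;    // note: size() returns long
        assert pq.top() == 5;     // least remaining element, constant time
    }
}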

+ 3 - 3
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java

@@ -191,9 +191,9 @@ public final class CompositeAggregator extends BucketsAggregator implements Size
             runDeferredCollections();
         }
 
-        int num = Math.min(size, queue.size());
+        int num = Math.min(size, (int) queue.size());
         final InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num];
-        long[] bucketOrdsToCollect = new long[queue.size()];
+        long[] bucketOrdsToCollect = new long[(int) queue.size()];
         for (int i = 0; i < queue.size(); i++) {
             bucketOrdsToCollect[i] = i;
         }
@@ -203,7 +203,7 @@ public final class CompositeAggregator extends BucketsAggregator implements Size
             CompositeKey key = queue.toCompositeKey(slot);
             InternalAggregations aggs = subAggsForBuckets.apply(slot);
             long docCount = queue.getDocCount(slot);
-            buckets[queue.size()] = new InternalComposite.InternalBucket(
+            buckets[(int) queue.size()] = new InternalComposite.InternalBucket(
                 sourceNames,
                 formats,
                 key,

+ 24 - 17
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java

@@ -11,11 +11,10 @@ package org.elasticsearch.search.aggregations.bucket.composite;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.CollectionTerminatedException;
-import org.apache.lucene.util.PriorityQueue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.LongArray;
 import org.elasticsearch.common.util.Maps;
-import org.elasticsearch.core.Releasable;
+import org.elasticsearch.common.util.ObjectArrayPriorityQueue;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
 
@@ -25,9 +24,9 @@ import java.util.Map;
 import static org.elasticsearch.core.Types.forciblyCast;
 
 /**
- * A specialized {@link PriorityQueue} implementation for composite buckets.
+ * A specialized {@link ObjectArrayPriorityQueue} implementation for composite buckets.
  */
-final class CompositeValuesCollectorQueue extends PriorityQueue<Integer> implements Releasable {
+final class CompositeValuesCollectorQueue extends ObjectArrayPriorityQueue<Integer> {
     private class Slot {
         final int value;
 
@@ -74,25 +73,33 @@ final class CompositeValuesCollectorQueue extends PriorityQueue<Integer> impleme
      * @param indexReader
      */
     CompositeValuesCollectorQueue(BigArrays bigArrays, SingleDimensionValuesSource<?>[] sources, int size, IndexReader indexReader) {
-        super(size);
+        super(size, bigArrays);
         this.bigArrays = bigArrays;
         this.maxSize = size;
         this.arrays = sources;
 
-        // If the leading source is a GlobalOrdinalValuesSource we can apply an optimization which requires
-        // tracking the highest competitive value.
-        if (arrays[0] instanceof GlobalOrdinalValuesSource globalOrdinalValuesSource) {
-            if (shouldApplyGlobalOrdinalDynamicPruningForLeadingSource(sources, size, indexReader)) {
-                competitiveBoundsChangedListener = globalOrdinalValuesSource::updateHighestCompetitiveValue;
+        boolean success = false;
+        try {
+            // If the leading source is a GlobalOrdinalValuesSource we can apply an optimization which requires
+            // tracking the highest competitive value.
+            if (arrays[0] instanceof GlobalOrdinalValuesSource globalOrdinalValuesSource) {
+                if (shouldApplyGlobalOrdinalDynamicPruningForLeadingSource(sources, size, indexReader)) {
+                    competitiveBoundsChangedListener = globalOrdinalValuesSource::updateHighestCompetitiveValue;
+                } else {
+                    competitiveBoundsChangedListener = null;
+                }
             } else {
                 competitiveBoundsChangedListener = null;
             }
-        } else {
-            competitiveBoundsChangedListener = null;
-        }
 
-        this.map = Maps.newMapWithExpectedSize(size);
-        this.docCounts = bigArrays.newLongArray(1, false);
+            this.map = Maps.newMapWithExpectedSize(size);
+            this.docCounts = bigArrays.newLongArray(1, false);
+            success = true;
+        } finally {
+            if (success == false) {
+                super.close();
+            }
+        }
     }
 
     private static boolean shouldApplyGlobalOrdinalDynamicPruningForLeadingSource(
@@ -385,7 +392,7 @@ final class CompositeValuesCollectorQueue extends PriorityQueue<Integer> impleme
             // and we recycle the deleted slot
             newSlot = slot;
         } else {
-            newSlot = size();
+            newSlot = (int) size();
         }
         // move the candidate key to its new slot
         copyCurrent(newSlot, inc);
@@ -399,7 +406,7 @@ final class CompositeValuesCollectorQueue extends PriorityQueue<Integer> impleme
     }
 
     @Override
-    public void close() {
+    protected void doClose() {
         Releasables.close(docCounts);
     }
 }
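
The constructor change above follows a guard pattern worth noting: super() already allocates the BigArrays-backed heap, so if anything later in the constructor throws (for example a circuit breaker trip while allocating docCounts), the heap would leak without the success flag. A condensed sketch of the idiom, with a hypothetical class name:

// GuardedQueue is illustrative, not part of this commit.
abstract class GuardedQueue<T> extends ObjectArrayPriorityQueue<T> {
    GuardedQueue(long maxSize, BigArrays bigArrays) {
        super(maxSize, bigArrays); // the heap is live from this point on
        boolean success = false;
        try {
            // ... allocate auxiliary structures that may throw ...
            success = true;
        } finally {
            if (success == false) {
                super.close(); // release the heap on partial construction
            }
        }
    }
}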

+ 8 - 2
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java

@@ -9,9 +9,10 @@
 package org.elasticsearch.search.aggregations.bucket.composite;
 
 import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.PriorityQueue;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.util.ObjectArrayPriorityQueue;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.AggregationReduceContext;
 import org.elasticsearch.search.aggregations.AggregatorReducer;
@@ -200,7 +201,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
     @Override
     protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceContext, int size) {
         return new AggregatorReducer() {
-            final PriorityQueue<BucketIterator> pq = new PriorityQueue<>(size) {
+            final ObjectArrayPriorityQueue<BucketIterator> pq = new ObjectArrayPriorityQueue<>(size, reduceContext.bigArrays()) {
                 @Override
                 protected boolean lessThan(BucketIterator a, BucketIterator b) {
                     return a.compareTo(b) < 0;
@@ -271,6 +272,11 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
                 reduced.validateAfterKey();
                 return reduced;
             }
+
+            @Override
+            public void close() {
+                Releasables.close(pq);
+            }
         };
     }
 

+ 26 - 19
server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java

@@ -92,27 +92,34 @@ class CountedTermsAggregator extends TermsAggregator {
             int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
 
             // as users can't control sort order, in practice we'll always sort by doc count descending
-            BucketPriorityQueue<StringTerms.Bucket> ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
-            StringTerms.Bucket spare = null;
-            BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
-            Supplier<StringTerms.Bucket> emptyBucketBuilder = () -> new StringTerms.Bucket(new BytesRef(), 0, null, false, 0, format);
-            while (ordsEnum.next()) {
-                long docCount = bucketDocCount(ordsEnum.ord());
-                otherDocCounts[ordIdx] += docCount;
-                if (spare == null) {
-                    spare = emptyBucketBuilder.get();
+            try (
+                BucketPriorityQueue<StringTerms.Bucket> ordered = new BucketPriorityQueue<>(
+                    size,
+                    bigArrays(),
+                    partiallyBuiltBucketComparator
+                )
+            ) {
+                StringTerms.Bucket spare = null;
+                BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
+                Supplier<StringTerms.Bucket> emptyBucketBuilder = () -> new StringTerms.Bucket(new BytesRef(), 0, null, false, 0, format);
+                while (ordsEnum.next()) {
+                    long docCount = bucketDocCount(ordsEnum.ord());
+                    otherDocCounts[ordIdx] += docCount;
+                    if (spare == null) {
+                        spare = emptyBucketBuilder.get();
+                    }
+                    ordsEnum.readValue(spare.getTermBytes());
+                    spare.setDocCount(docCount);
+                    spare.setBucketOrd(ordsEnum.ord());
+                    spare = ordered.insertWithOverflow(spare);
                 }
-                ordsEnum.readValue(spare.getTermBytes());
-                spare.setDocCount(docCount);
-                spare.setBucketOrd(ordsEnum.ord());
-                spare = ordered.insertWithOverflow(spare);
-            }
 
-            topBucketsPerOrd[ordIdx] = new StringTerms.Bucket[ordered.size()];
-            for (int i = ordered.size() - 1; i >= 0; --i) {
-                topBucketsPerOrd[ordIdx][i] = ordered.pop();
-                otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][i].getDocCount();
-                topBucketsPerOrd[ordIdx][i].setTermBytes(BytesRef.deepCopyOf(topBucketsPerOrd[ordIdx][i].getTermBytes()));
+                topBucketsPerOrd[ordIdx] = new StringTerms.Bucket[(int) ordered.size()];
+                for (int i = (int) ordered.size() - 1; i >= 0; --i) {
+                    topBucketsPerOrd[ordIdx][i] = ordered.pop();
+                    otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][i].getDocCount();
+                    topBucketsPerOrd[ordIdx][i].setTermBytes(BytesRef.deepCopyOf(topBucketsPerOrd[ordIdx][i].getTermBytes()));
+                }
             }
         }
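
The same two-part pattern repeats in every aggregator below: the queue moves into a try-with-resources block because ObjectArrayPriorityQueue is Releasable, and size(), which now returns long, is cast back to int when sizing plain arrays (safe here because maxSize was given as an int). A condensed, hypothetical sketch of the shape:

// Illustrative only; topN is not part of the commit.
static String[] topN(BigArrays bigArrays, List<String> terms, int size) {
    try (ObjectArrayPriorityQueue<String> ordered = new ObjectArrayPriorityQueue<>(size, bigArrays) {
        @Override
        protected boolean lessThan(String a, String b) {
            return a.compareTo(b) < 0; // min-heap: insertWithOverflow keeps the N greatest
        }
    }) {
        for (String t : terms) {
            ordered.insertWithOverflow(t); // once full, evicts whatever is currently least
        }
        String[] top = new String[(int) ordered.size()]; // size() is long
        for (int i = (int) ordered.size() - 1; i >= 0; --i) {
            top[i] = ordered.pop(); // drain least-first, filling the array backwards
        }
        return top;
    } // the ObjectArray behind the queue is released here
}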
 

+ 5 - 4
server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/BucketPriorityQueue.java

@@ -7,12 +7,13 @@
  */
 package org.elasticsearch.search.aggregations.bucket.geogrid;
 
-import org.apache.lucene.util.PriorityQueue;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ObjectArrayPriorityQueue;
 
-class BucketPriorityQueue<B extends InternalGeoGridBucket> extends PriorityQueue<B> {
+class BucketPriorityQueue<B extends InternalGeoGridBucket> extends ObjectArrayPriorityQueue<B> {
 
-    BucketPriorityQueue(int size) {
-        super(size);
+    BucketPriorityQueue(int size, BigArrays bigArrays) {
+        super(size, bigArrays);
     }
 
     @Override

+ 18 - 17
server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java

@@ -136,25 +136,26 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid<?>> extends Bu
         for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
             int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]), shardSize);
 
-            BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
-            InternalGeoGridBucket spare = null;
-            LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
-            while (ordsEnum.next()) {
-                if (spare == null) {
-                    spare = newEmptyBucket();
-                }
+            try (BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size, bigArrays())) {
+                InternalGeoGridBucket spare = null;
+                LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
+                while (ordsEnum.next()) {
+                    if (spare == null) {
+                        spare = newEmptyBucket();
+                    }
 
-                // need a special function to keep the source bucket
-                // up-to-date so it can get the appropriate key
-                spare.hashAsLong = ordsEnum.value();
-                spare.docCount = bucketDocCount(ordsEnum.ord());
-                spare.bucketOrd = ordsEnum.ord();
-                spare = ordered.insertWithOverflow(spare);
-            }
+                    // need a special function to keep the source bucket
+                    // up-to-date so it can get the appropriate key
+                    spare.hashAsLong = ordsEnum.value();
+                    spare.docCount = bucketDocCount(ordsEnum.ord());
+                    spare.bucketOrd = ordsEnum.ord();
+                    spare = ordered.insertWithOverflow(spare);
+                }
 
-            topBucketsPerOrd[ordIdx] = new InternalGeoGridBucket[ordered.size()];
-            for (int i = ordered.size() - 1; i >= 0; --i) {
-                topBucketsPerOrd[ordIdx][i] = ordered.pop();
+                topBucketsPerOrd[ordIdx] = new InternalGeoGridBucket[(int) ordered.size()];
+                for (int i = (int) ordered.size() - 1; i >= 0; --i) {
+                    topBucketsPerOrd[ordIdx][i] = ordered.pop();
+                }
             }
         }
         buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);

+ 15 - 14
server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java

@@ -78,12 +78,12 @@ public abstract class InternalGeoGrid<B extends InternalGeoGridBucket> extends I
     }
 
     @Override
-    protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceContext, int size) {
+    protected AggregatorReducer getLeaderReducer(AggregationReduceContext context, int size) {
         return new AggregatorReducer() {
 
             final LongObjectPagedHashMap<MultiBucketAggregatorsReducer> bucketsReducer = new LongObjectPagedHashMap<>(
                 size,
-                reduceContext.bigArrays()
+                context.bigArrays()
             );
 
             @Override
@@ -93,7 +93,7 @@ public abstract class InternalGeoGrid<B extends InternalGeoGridBucket> extends I
                 for (InternalGeoGridBucket bucket : grid.getBuckets()) {
                     MultiBucketAggregatorsReducer reducer = bucketsReducer.get(bucket.hashAsLong());
                     if (reducer == null) {
-                        reducer = new MultiBucketAggregatorsReducer(reduceContext, size);
+                        reducer = new MultiBucketAggregatorsReducer(context, size);
                         bucketsReducer.put(bucket.hashAsLong(), reducer);
                     }
                     reducer.accept(bucket);
@@ -103,19 +103,20 @@ public abstract class InternalGeoGrid<B extends InternalGeoGridBucket> extends I
             @Override
             public InternalAggregation get() {
                 final int size = Math.toIntExact(
-                    reduceContext.isFinalReduce() == false ? bucketsReducer.size() : Math.min(requiredSize, bucketsReducer.size())
+                    context.isFinalReduce() == false ? bucketsReducer.size() : Math.min(requiredSize, bucketsReducer.size())
                 );
-                final BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
-                bucketsReducer.iterator().forEachRemaining(entry -> {
-                    InternalGeoGridBucket bucket = createBucket(entry.key, entry.value.getDocCount(), entry.value.get());
-                    ordered.insertWithOverflow(bucket);
-                });
-                final InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()];
-                for (int i = ordered.size() - 1; i >= 0; i--) {
-                    list[i] = ordered.pop();
+                try (BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size, context.bigArrays())) {
+                    bucketsReducer.iterator().forEachRemaining(entry -> {
+                        InternalGeoGridBucket bucket = createBucket(entry.key, entry.value.getDocCount(), entry.value.get());
+                        ordered.insertWithOverflow(bucket);
+                    });
+                    final InternalGeoGridBucket[] list = new InternalGeoGridBucket[(int) ordered.size()];
+                    for (int i = (int) ordered.size() - 1; i >= 0; i--) {
+                        list[i] = ordered.pop();
+                    }
+                    context.consumeBucketsAndMaybeBreak(list.length);
+                    return create(getName(), requiredSize, Arrays.asList(list), getMetadata());
                 }
-                reduceContext.consumeBucketsAndMaybeBreak(list.length);
-                return create(getName(), requiredSize, Arrays.asList(list), getMetadata());
             }
 
             @Override

+ 5 - 4
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketPriorityQueue.java

@@ -7,16 +7,17 @@
  */
 package org.elasticsearch.search.aggregations.bucket.terms;
 
-import org.apache.lucene.util.PriorityQueue;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ObjectArrayPriorityQueue;
 
 import java.util.Comparator;
 
-public class BucketPriorityQueue<B> extends PriorityQueue<B> {
+public class BucketPriorityQueue<B> extends ObjectArrayPriorityQueue<B> {
 
     private final Comparator<? super B> comparator;
 
-    public BucketPriorityQueue(int size, Comparator<? super B> comparator) {
-        super(size);
+    public BucketPriorityQueue(int size, BigArrays bigArrays, Comparator<? super B> comparator) {
+        super(size, bigArrays);
         this.comparator = comparator;
     }
 

+ 5 - 4
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketSignificancePriorityQueue.java

@@ -8,12 +8,13 @@
 
 package org.elasticsearch.search.aggregations.bucket.terms;
 
-import org.apache.lucene.util.PriorityQueue;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ObjectArrayPriorityQueue;
 
-public class BucketSignificancePriorityQueue<B extends SignificantTerms.Bucket> extends PriorityQueue<B> {
+public class BucketSignificancePriorityQueue<B extends SignificantTerms.Bucket> extends ObjectArrayPriorityQueue<B> {
 
-    public BucketSignificancePriorityQueue(int size) {
-        super(size);
+    public BucketSignificancePriorityQueue(int size, BigArrays bigArrays) {
+        super(size, bigArrays);
     }
 
     @Override

+ 28 - 26
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

@@ -22,6 +22,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.LongArray;
 import org.elasticsearch.common.util.LongHash;
+import org.elasticsearch.common.util.ObjectArrayPriorityQueue;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
@@ -710,30 +711,31 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
                 } else {
                     size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
                 }
-                PriorityQueue<TB> ordered = buildPriorityQueue(size);
-                final int finalOrdIdx = ordIdx;
-                BucketUpdater<TB> updater = bucketUpdater(owningBucketOrds[ordIdx], lookupGlobalOrd);
-                collectionStrategy.forEach(owningBucketOrds[ordIdx], new BucketInfoConsumer() {
-                    TB spare = null;
-
-                    @Override
-                    public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException {
-                        otherDocCount[finalOrdIdx] += docCount;
-                        if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
-                            if (spare == null) {
-                                spare = buildEmptyTemporaryBucket();
+                try (ObjectArrayPriorityQueue<TB> ordered = buildPriorityQueue(size)) {
+                    final int finalOrdIdx = ordIdx;
+                    BucketUpdater<TB> updater = bucketUpdater(owningBucketOrds[ordIdx], lookupGlobalOrd);
+                    collectionStrategy.forEach(owningBucketOrds[ordIdx], new BucketInfoConsumer() {
+                        TB spare = null;
+
+                        @Override
+                        public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException {
+                            otherDocCount[finalOrdIdx] += docCount;
+                            if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
+                                if (spare == null) {
+                                    spare = buildEmptyTemporaryBucket();
+                                }
+                                updater.updateBucket(spare, globalOrd, bucketOrd, docCount);
+                                spare = ordered.insertWithOverflow(spare);
                             }
-                            updater.updateBucket(spare, globalOrd, bucketOrd, docCount);
-                            spare = ordered.insertWithOverflow(spare);
                         }
-                    }
-                });
+                    });
 
-                // Get the top buckets
-                topBucketsPreOrd[ordIdx] = buildBuckets(ordered.size());
-                for (int i = ordered.size() - 1; i >= 0; --i) {
-                    topBucketsPreOrd[ordIdx][i] = convertTempBucketToRealBucket(ordered.pop(), lookupGlobalOrd);
-                    otherDocCount[ordIdx] -= topBucketsPreOrd[ordIdx][i].getDocCount();
+                    // Get the top buckets
+                    topBucketsPreOrd[ordIdx] = buildBuckets((int) ordered.size());
+                    for (int i = (int) ordered.size() - 1; i >= 0; --i) {
+                        topBucketsPreOrd[ordIdx][i] = convertTempBucketToRealBucket(ordered.pop(), lookupGlobalOrd);
+                        otherDocCount[ordIdx] -= topBucketsPreOrd[ordIdx][i].getDocCount();
+                    }
                 }
             }
 
@@ -773,7 +775,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
          * Build a {@link PriorityQueue} to sort the buckets. After we've
          * collected all of the buckets we'll collect all entries in the queue.
          */
-        abstract PriorityQueue<TB> buildPriorityQueue(int size);
+        abstract ObjectArrayPriorityQueue<TB> buildPriorityQueue(int size);
 
         /**
          * Build an array to hold the "top" buckets for each ordinal.
@@ -858,8 +860,8 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
         }
 
         @Override
-        PriorityQueue<OrdBucket> buildPriorityQueue(int size) {
-            return new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
+        ObjectArrayPriorityQueue<OrdBucket> buildPriorityQueue(int size) {
+            return new BucketPriorityQueue<>(size, bigArrays(), partiallyBuiltBucketComparator);
         }
 
         @Override
@@ -1006,8 +1008,8 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
         }
 
         @Override
-        PriorityQueue<SignificantStringTerms.Bucket> buildPriorityQueue(int size) {
-            return new BucketSignificancePriorityQueue<>(size);
+        ObjectArrayPriorityQueue<SignificantStringTerms.Bucket> buildPriorityQueue(int size) {
+            return new BucketSignificancePriorityQueue<>(size, bigArrays());
         }
 
         @Override

+ 24 - 23
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java

@@ -227,33 +227,34 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
             public InternalAggregation get() {
                 final SignificanceHeuristic heuristic = getSignificanceHeuristic().rewrite(reduceContext);
                 final int size = reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size());
-                final BucketSignificancePriorityQueue<B> ordered = new BucketSignificancePriorityQueue<>(size);
-                for (ReducerAndProto<B> reducerAndProto : buckets.values()) {
-                    final B b = createBucket(
-                        reducerAndProto.subsetDf[0],
-                        globalSubsetSize,
-                        reducerAndProto.supersetDf[0],
-                        globalSupersetSize,
-                        reducerAndProto.reducer.get(),
-                        reducerAndProto.proto
-                    );
-                    b.updateScore(heuristic);
-                    if (((b.score > 0) && (b.subsetDf >= minDocCount)) || reduceContext.isFinalReduce() == false) {
-                        final B removed = ordered.insertWithOverflow(b);
-                        if (removed == null) {
-                            reduceContext.consumeBucketsAndMaybeBreak(1);
+                try (BucketSignificancePriorityQueue<B> ordered = new BucketSignificancePriorityQueue<>(size, reduceContext.bigArrays())) {
+                    for (ReducerAndProto<B> reducerAndProto : buckets.values()) {
+                        final B b = createBucket(
+                            reducerAndProto.subsetDf[0],
+                            globalSubsetSize,
+                            reducerAndProto.supersetDf[0],
+                            globalSupersetSize,
+                            reducerAndProto.reducer.get(),
+                            reducerAndProto.proto
+                        );
+                        b.updateScore(heuristic);
+                        if (((b.score > 0) && (b.subsetDf >= minDocCount)) || reduceContext.isFinalReduce() == false) {
+                            final B removed = ordered.insertWithOverflow(b);
+                            if (removed == null) {
+                                reduceContext.consumeBucketsAndMaybeBreak(1);
+                            } else {
+                                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
+                            }
                         } else {
-                            reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
+                            reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b));
                         }
-                    } else {
-                        reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b));
                     }
+                    final B[] list = createBucketsArray((int) ordered.size());
+                    for (int i = (int) ordered.size() - 1; i >= 0; i--) {
+                        list[i] = ordered.pop();
+                    }
+                    return create(globalSubsetSize, globalSupersetSize, Arrays.asList(list));
                 }
-                final B[] list = createBucketsArray(ordered.size());
-                for (int i = ordered.size() - 1; i >= 0; i--) {
-                    list[i] = ordered.pop();
-                }
-                return create(globalSubsetSize, globalSupersetSize, Arrays.asList(list));
             }
 
             @Override

+ 27 - 25
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java

@@ -13,6 +13,7 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefBuilder;
 import org.apache.lucene.util.PriorityQueue;
 import org.elasticsearch.common.util.LongArray;
+import org.elasticsearch.common.util.ObjectArrayPriorityQueue;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
@@ -250,28 +251,29 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato
                 collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx], excludeDeletedDocs);
                 int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
 
-                PriorityQueue<B> ordered = buildPriorityQueue(size);
-                B spare = null;
-                BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
-                Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]);
-                while (ordsEnum.next()) {
-                    long docCount = bucketDocCount(ordsEnum.ord());
-                    otherDocCounts[ordIdx] += docCount;
-                    if (docCount < bucketCountThresholds.getShardMinDocCount()) {
-                        continue;
-                    }
-                    if (spare == null) {
-                        spare = emptyBucketBuilder.get();
+                try (ObjectArrayPriorityQueue<B> ordered = buildPriorityQueue(size)) {
+                    B spare = null;
+                    BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
+                    Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]);
+                    while (ordsEnum.next()) {
+                        long docCount = bucketDocCount(ordsEnum.ord());
+                        otherDocCounts[ordIdx] += docCount;
+                        if (docCount < bucketCountThresholds.getShardMinDocCount()) {
+                            continue;
+                        }
+                        if (spare == null) {
+                            spare = emptyBucketBuilder.get();
+                        }
+                        updateBucket(spare, ordsEnum, docCount);
+                        spare = ordered.insertWithOverflow(spare);
                     }
-                    updateBucket(spare, ordsEnum, docCount);
-                    spare = ordered.insertWithOverflow(spare);
-                }
 
-                topBucketsPerOrd[ordIdx] = buildBuckets(ordered.size());
-                for (int i = ordered.size() - 1; i >= 0; --i) {
-                    topBucketsPerOrd[ordIdx][i] = ordered.pop();
-                    otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][i].getDocCount();
-                    finalizeBucket(topBucketsPerOrd[ordIdx][i]);
+                    topBucketsPerOrd[ordIdx] = buildBuckets((int) ordered.size());
+                    for (int i = (int) ordered.size() - 1; i >= 0; --i) {
+                        topBucketsPerOrd[ordIdx][i] = ordered.pop();
+                        otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][i].getDocCount();
+                        finalizeBucket(topBucketsPerOrd[ordIdx][i]);
+                    }
                 }
             }
 
@@ -310,7 +312,7 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato
          * Build a {@link PriorityQueue} to sort the buckets. After we've
          * collected all of the buckets we'll collect all entries in the queue.
          */
-        abstract PriorityQueue<B> buildPriorityQueue(int size);
+        abstract ObjectArrayPriorityQueue<B> buildPriorityQueue(int size);
 
         /**
          * Update fields in {@code spare} to reflect information collected for
@@ -408,8 +410,8 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato
         }
 
         @Override
-        PriorityQueue<StringTerms.Bucket> buildPriorityQueue(int size) {
-            return new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
+        ObjectArrayPriorityQueue<StringTerms.Bucket> buildPriorityQueue(int size) {
+            return new BucketPriorityQueue<>(size, bigArrays(), partiallyBuiltBucketComparator);
         }
 
         @Override
@@ -534,8 +536,8 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato
         }
 
         @Override
-        PriorityQueue<SignificantStringTerms.Bucket> buildPriorityQueue(int size) {
-            return new BucketSignificancePriorityQueue<>(size);
+        ObjectArrayPriorityQueue<SignificantStringTerms.Bucket> buildPriorityQueue(int size) {
+            return new BucketSignificancePriorityQueue<>(size, bigArrays());
         }
 
         @Override

+ 29 - 28
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java

@@ -11,8 +11,8 @@ import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SortedNumericDocValues;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.util.NumericUtils;
-import org.apache.lucene.util.PriorityQueue;
 import org.elasticsearch.common.util.LongArray;
+import org.elasticsearch.common.util.ObjectArrayPriorityQueue;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.index.fielddata.FieldData;
@@ -151,29 +151,30 @@ public final class NumericTermsAggregator extends TermsAggregator {
                 long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
 
                 int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
-                PriorityQueue<B> ordered = buildPriorityQueue(size);
-                B spare = null;
-                BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
-                Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]);
-                while (ordsEnum.next()) {
-                    long docCount = bucketDocCount(ordsEnum.ord());
-                    otherDocCounts[ordIdx] += docCount;
-                    if (docCount < bucketCountThresholds.getShardMinDocCount()) {
-                        continue;
-                    }
-                    if (spare == null) {
-                        spare = emptyBucketBuilder.get();
+                try (ObjectArrayPriorityQueue<B> ordered = buildPriorityQueue(size)) {
+                    B spare = null;
+                    BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
+                    Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]);
+                    while (ordsEnum.next()) {
+                        long docCount = bucketDocCount(ordsEnum.ord());
+                        otherDocCounts[ordIdx] += docCount;
+                        if (docCount < bucketCountThresholds.getShardMinDocCount()) {
+                            continue;
+                        }
+                        if (spare == null) {
+                            spare = emptyBucketBuilder.get();
+                        }
+                        updateBucket(spare, ordsEnum, docCount);
+                        spare = ordered.insertWithOverflow(spare);
                     }
-                    updateBucket(spare, ordsEnum, docCount);
-                    spare = ordered.insertWithOverflow(spare);
-                }
 
-                // Get the top buckets
-                B[] bucketsForOrd = buildBuckets(ordered.size());
-                topBucketsPerOrd[ordIdx] = bucketsForOrd;
-                for (int b = ordered.size() - 1; b >= 0; --b) {
-                    topBucketsPerOrd[ordIdx][b] = ordered.pop();
-                    otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount();
+                    // Get the top buckets
+                    B[] bucketsForOrd = buildBuckets((int) ordered.size());
+                    topBucketsPerOrd[ordIdx] = bucketsForOrd;
+                    for (int b = (int) ordered.size() - 1; b >= 0; --b) {
+                        topBucketsPerOrd[ordIdx][b] = ordered.pop();
+                        otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount();
+                    }
                 }
             }
 
@@ -228,10 +229,10 @@ public final class NumericTermsAggregator extends TermsAggregator {
         abstract void updateBucket(B spare, BucketOrdsEnum ordsEnum, long docCount) throws IOException;
 
         /**
-         * Build a {@link PriorityQueue} to sort the buckets. After we've
+         * Build a {@link ObjectArrayPriorityQueue} to sort the buckets. After we've
          * collected all of the buckets we'll collect all entries in the queue.
          */
-        abstract PriorityQueue<B> buildPriorityQueue(int size);
+        abstract ObjectArrayPriorityQueue<B> buildPriorityQueue(int size);
 
         /**
          * Build the sub-aggregations into the buckets. This will usually
@@ -271,8 +272,8 @@ public final class NumericTermsAggregator extends TermsAggregator {
         }
 
         @Override
-        final PriorityQueue<B> buildPriorityQueue(int size) {
-            return new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
+        final ObjectArrayPriorityQueue<B> buildPriorityQueue(int size) {
+            return new BucketPriorityQueue<>(size, bigArrays(), partiallyBuiltBucketComparator);
         }
 
         @Override
@@ -557,8 +558,8 @@ public final class NumericTermsAggregator extends TermsAggregator {
         }
 
         @Override
-        PriorityQueue<SignificantLongTerms.Bucket> buildPriorityQueue(int size) {
-            return new BucketSignificancePriorityQueue<>(size);
+        ObjectArrayPriorityQueue<SignificantLongTerms.Bucket> buildPriorityQueue(int size) {
+            return new BucketSignificancePriorityQueue<>(size, bigArrays());
         }
 
         @Override
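
Note the ownership convention introduced by the new abstract signature: each strategy constructs its queue in buildPriorityQueue, but the shared result-building loop closes it. A hypothetical sketch of that shape:

// Illustrative only; method and class names are not from the commit.
abstract class ResultStrategy<B> {
    abstract ObjectArrayPriorityQueue<B> buildPriorityQueue(int size);

    final void buildTopBuckets(int size) {
        try (ObjectArrayPriorityQueue<B> ordered = buildPriorityQueue(size)) {
            // insertWithOverflow() while collecting, then pop() to drain,
            // exactly as in the aggregator diffs above
        }
    }
}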

+ 341 - 0
server/src/test/java/org/elasticsearch/common/util/ObjectArrayPriorityQueueTests.java

@@ -0,0 +1,341 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.common.util;
+
+import org.apache.lucene.tests.util.TestUtil;
+import org.apache.lucene.util.CollectionUtil;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
+import org.elasticsearch.test.ESTestCase;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+
+public class ObjectArrayPriorityQueueTests extends ESTestCase {
+
+    private static BigArrays randombigArrays() {
+        return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
+    }
+
+    private static class IntegerQueue extends ObjectArrayPriorityQueue<Integer> {
+        IntegerQueue(int count, BigArrays bigArrays) {
+            super(count, bigArrays);
+        }
+
+        @Override
+        protected boolean lessThan(Integer a, Integer b) {
+            return (a < b);
+        }
+
+        protected final void checkValidity() {
+            for (int i = 1; i <= size(); i++) {
+                int parent = i >>> 1;
+                if (parent > 1) {
+                    if (lessThan(heap.get(parent), heap.get(i)) == false) {
+                        assertThat(heap.get(i), Matchers.equalTo(heap.get(parent)));
+                    }
+                }
+            }
+        }
+    }
+
+    public void testZeroSizedQueue() {
+        try (ObjectArrayPriorityQueue<Integer> pq = new IntegerQueue(0, randombigArrays())) {
+            assertEquals((Object) 1, pq.insertWithOverflow(1));
+            assertEquals(0, pq.size());
+
+            // should fail, but passes and modifies the top...
+            pq.add(1);
+            assertEquals((Object) 1, pq.top());
+        }
+    }
+
+    public void testNoExtraWorkOnEqualElements() {
+        class Value {
+            private final int index;
+            private final int value;
+
+            Value(int index, int value) {
+                this.index = index;
+                this.value = value;
+            }
+        }
+
+        try (ObjectArrayPriorityQueue<Value> pq = new ObjectArrayPriorityQueue<>(5, randombigArrays()) {
+            @Override
+            protected boolean lessThan(Value a, Value b) {
+                return a.value < b.value;
+            }
+        }) {
+
+            // Make all elements equal but record insertion order.
+            for (int i = 0; i < 100; i++) {
+                pq.insertWithOverflow(new Value(i, 0));
+            }
+
+            ArrayList<Integer> indexes = new ArrayList<>();
+            for (Value e : pq) {
+                indexes.add(e.index);
+            }
+
+            // All elements are "equal" so we should have exactly the indexes of those elements that were
+            // added first.
+            MatcherAssert.assertThat(indexes, Matchers.containsInAnyOrder(0, 1, 2, 3, 4));
+        }
+    }
+
+    public void testPQ() {
+        testPQ(atLeast(10000), random());
+    }
+
+    public static void testPQ(int count, Random gen) {
+        try (ObjectArrayPriorityQueue<Integer> pq = new IntegerQueue(count, randombigArrays())) {
+            int sum = 0, sum2 = 0;
+
+            for (int i = 0; i < count; i++) {
+                int next = gen.nextInt();
+                sum += next;
+                pq.add(next);
+            }
+
+            int last = Integer.MIN_VALUE;
+            for (int i = 0; i < count; i++) {
+                Integer next = pq.pop();
+                assertTrue(next.intValue() >= last);
+                last = next.intValue();
+                sum2 += last;
+            }
+
+            assertEquals(sum, sum2);
+        }
+    }
+
+    public void testFixedSize() {
+        try (ObjectArrayPriorityQueue<Integer> pq = new IntegerQueue(3, randombigArrays())) {
+            pq.insertWithOverflow(2);
+            pq.insertWithOverflow(3);
+            pq.insertWithOverflow(1);
+            pq.insertWithOverflow(5);
+            pq.insertWithOverflow(7);
+            pq.insertWithOverflow(1);
+            assertEquals(3, pq.size());
+            assertEquals((Integer) 3, pq.top());
+        }
+    }
+
+    public void testInsertWithOverflow() {
+        int size = 4;
+        try (ObjectArrayPriorityQueue<Integer> pq = new IntegerQueue(size, randombigArrays())) {
+            Integer i1 = 2;
+            Integer i2 = 3;
+            Integer i3 = 1;
+            Integer i4 = 5;
+            Integer i5 = 7;
+            Integer i6 = 1;
+
+            assertNull(pq.insertWithOverflow(i1));
+            assertNull(pq.insertWithOverflow(i2));
+            assertNull(pq.insertWithOverflow(i3));
+            assertNull(pq.insertWithOverflow(i4));
+            assertSame(pq.insertWithOverflow(i5), i3); // i3 should have been dropped
+            assertSame(pq.insertWithOverflow(i6), i6); // i6 should not have been inserted
+            assertEquals(size, pq.size());
+            assertEquals((Integer) 2, pq.top());
+        }
+    }
+
+    public void testAddAllToEmptyQueue() {
+        Random random = random();
+        int size = 10;
+        List<Integer> list = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            list.add(random.nextInt());
+        }
+        try (IntegerQueue pq = new IntegerQueue(size, randombigArrays())) {
+            pq.addAll(list);
+            pq.checkValidity();
+            assertOrderedWhenDrained(pq, list);
+        }
+    }
+
+    public void testAddAllToPartiallyFilledQueue() {
+        try (IntegerQueue pq = new IntegerQueue(20, randombigArrays())) {
+            List<Integer> oneByOne = new ArrayList<>();
+            List<Integer> bulkAdded = new ArrayList<>();
+            Random random = random();
+            for (int i = 0; i < 10; i++) {
+                bulkAdded.add(random.nextInt());
+
+                int x = random.nextInt();
+                pq.add(x);
+                oneByOne.add(x);
+            }
+
+            pq.addAll(bulkAdded);
+            pq.checkValidity();
+
+            oneByOne.addAll(bulkAdded); // Gather all "reference" data.
+            assertOrderedWhenDrained(pq, oneByOne);
+        }
+    }
+
+    public void testAddAllDoesNotFitIntoQueue() {
+        try (IntegerQueue pq = new IntegerQueue(20, randombigArrays())) {
+            List<Integer> list = new ArrayList<>();
+            Random random = random();
+            for (int i = 0; i < 11; i++) {
+                list.add(random.nextInt());
+                pq.add(random.nextInt());
+            }
+
+            assertThrows(
+                "Cannot add 11 elements to a queue with remaining capacity: 9",
+                ArrayIndexOutOfBoundsException.class,
+                () -> pq.addAll(list)
+            );
+        }
+    }
+
+    public void testRemovalsAndInsertions() {
+        Random random = random();
+        int numDocsInPQ = TestUtil.nextInt(random, 1, 100);
+        try (IntegerQueue pq = new IntegerQueue(numDocsInPQ, randombigArrays())) {
+            Integer lastLeast = null;
+
+            // Basic insertion of new content
+            ArrayList<Integer> sds = new ArrayList<Integer>(numDocsInPQ);
+            for (int i = 0; i < numDocsInPQ * 10; i++) {
+                Integer newEntry = Math.abs(random.nextInt());
+                sds.add(newEntry);
+                Integer evicted = pq.insertWithOverflow(newEntry);
+                pq.checkValidity();
+                if (evicted != null) {
+                    assertTrue(sds.remove(evicted));
+                    if (evicted != newEntry) {
+                        assertSame(evicted, lastLeast);
+                    }
+                }
+                Integer newLeast = pq.top();
+                if ((lastLeast != null) && (newLeast != newEntry) && (newLeast != lastLeast)) {
+                    // If the least entry changed and it wasn't our new addition, the new
+                    // least must lie between the previous least and the new entry
+                    assertTrue(newLeast <= newEntry);
+                    assertTrue(newLeast >= lastLeast);
+                }
+                lastLeast = newLeast;
+            }
+
+            // Try many random removals followed by insertions - the value of the
+            // least entry in the PQ should never decrease
+            for (int p = 0; p < 500000; p++) {
+                int element = (int) (random.nextFloat() * (sds.size() - 1));
+                Integer objectToRemove = sds.get(element);
+                assertSame(sds.remove(element), objectToRemove);
+                assertTrue(pq.remove(objectToRemove));
+                pq.checkValidity();
+                Integer newEntry = Math.abs(random.nextInt());
+                sds.add(newEntry);
+                assertNull(pq.insertWithOverflow(newEntry));
+                pq.checkValidity();
+                Integer newLeast = pq.top();
+                if ((objectToRemove != lastLeast) && (lastLeast != null) && (newLeast != newEntry)) {
+                    // If the least entry changed and it wasn't our new addition or the
+                    // loss of our randomly removed entry, the new least must lie between
+                    // the previous least and the new entry
+                    assertTrue(newLeast <= newEntry);
+                    assertTrue(newLeast >= lastLeast);
+                }
+                lastLeast = newLeast;
+            }
+        }
+    }
+
+    public void testIteratorEmpty() {
+        try (IntegerQueue queue = new IntegerQueue(3, randombigArrays())) {
+            Iterator<Integer> it = queue.iterator();
+            assertFalse(it.hasNext());
+            expectThrows(NoSuchElementException.class, () -> { it.next(); });
+        }
+    }
+
+    public void testIteratorOne() {
+        try (IntegerQueue queue = new IntegerQueue(3, randombigArrays())) {
+            queue.add(1);
+            Iterator<Integer> it = queue.iterator();
+            assertTrue(it.hasNext());
+            assertEquals(Integer.valueOf(1), it.next());
+            assertFalse(it.hasNext());
+            expectThrows(NoSuchElementException.class, () -> { it.next(); });
+        }
+    }
+
+    public void testIteratorTwo() {
+        try (IntegerQueue queue = new IntegerQueue(3, randombigArrays())) {
+            queue.add(1);
+            queue.add(2);
+            Iterator<Integer> it = queue.iterator();
+            assertTrue(it.hasNext());
+            assertEquals(Integer.valueOf(1), it.next());
+            assertTrue(it.hasNext());
+            assertEquals(Integer.valueOf(2), it.next());
+            assertFalse(it.hasNext());
+            expectThrows(NoSuchElementException.class, () -> { it.next(); });
+        }
+    }
+
+    public void testIteratorRandom() {
+        final int maxSize = TestUtil.nextInt(random(), 1, 20);
+        try (IntegerQueue queue = new IntegerQueue(maxSize, randombigArrays())) {
+            final int iters = atLeast(100);
+            final List<Integer> expected = new ArrayList<>();
+            for (int iter = 0; iter < iters; ++iter) {
+                if (queue.size() == 0 || (queue.size() < maxSize && random().nextBoolean())) {
+                    final Integer value = random().nextInt(10);
+                    queue.add(value);
+                    expected.add(value);
+                } else {
+                    expected.remove(queue.pop());
+                }
+                List<Integer> actual = new ArrayList<>();
+                for (Integer value : queue) {
+                    actual.add(value);
+                }
+                CollectionUtil.introSort(expected);
+                CollectionUtil.introSort(actual);
+                assertEquals(expected, actual);
+            }
+        }
+    }
+
+    public void testMaxIntSize() {
+        expectThrows(IllegalArgumentException.class, () -> {
+            new ObjectArrayPriorityQueue<Boolean>(Long.MAX_VALUE, randombigArrays()) {
+                @Override
+                public boolean lessThan(Boolean a, Boolean b) {
+                    // uncalled
+                    return true;
+                }
+            };
+        });
+    }
+
+    private void assertOrderedWhenDrained(IntegerQueue pq, List<Integer> referenceDataList) {
+        Collections.sort(referenceDataList);
+        int i = 0;
+        while (pq.size() > 0) {
+            assertEquals(pq.pop(), referenceDataList.get(i));
+            i++;
+        }
+    }
+}
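
For orientation, a minimal usage sketch of the new queue (not part of this commit; class name hypothetical). The signatures are the ones exercised by the tests above - a (long maxSize, BigArrays) constructor and an overridable lessThan(a, b) - and BigArrays.NON_RECYCLING_INSTANCE is assumed here purely for brevity; production call sites pass a breaker-aware BigArrays and, since the queue is Releasable, must close it:

    import org.elasticsearch.common.util.BigArrays;
    import org.elasticsearch.common.util.ObjectArrayPriorityQueue;

    class ObjectArrayPriorityQueueDemo {
        static void demo() {
            // An anonymous subclass supplies the ordering, as in the tests above.
            try (ObjectArrayPriorityQueue<Integer> pq = new ObjectArrayPriorityQueue<Integer>(16, BigArrays.NON_RECYCLING_INSTANCE) {
                @Override
                public boolean lessThan(Integer a, Integer b) {
                    return a < b; // smallest element sits on top
                }
            }) {
                pq.add(3);
                pq.add(1);
                pq.add(2);
                assert pq.top() == 1;   // least element, constant time
                assert pq.pop() == 1;   // removal, log(size)
                assert pq.size() == 2L; // size() is a long, unlike Lucene's int
            }
        }
    }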

+ 1 - 1
server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java

@@ -357,7 +357,7 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
                         }
                     }
                     assertEquals(size, Math.min(queue.size(), expected.length - pos));
-                    int ptr = pos + (queue.size() - 1);
+                    int ptr = pos + ((int) queue.size() - 1);
                     pos += queue.size();
                     last = null;
                    while (queue.size() > 0) {

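Note: the only change in this hunk is the narrowing cast. ObjectArrayPriorityQueue#size() returns a long - the BigArrays-backed queue is not bounded by Integer.MAX_VALUE - so call sites doing int pointer arithmetic, as here, now cast explicitly.
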
+ 33 - 26
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java

@@ -12,12 +12,12 @@ import org.apache.lucene.index.SortedNumericDocValues;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefBuilder;
-import org.apache.lucene.util.PriorityQueue;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.util.ObjectArrayPriorityQueue;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
@@ -237,33 +237,40 @@ class MultiTermsAggregator extends DeferableBucketAggregator {
             long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
 
             int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
-            PriorityQueue<InternalMultiTerms.Bucket> ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
-            InternalMultiTerms.Bucket spare = null;
-            BytesRef spareKey = null;
-            BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
-            while (ordsEnum.next()) {
-                long docCount = bucketDocCount(ordsEnum.ord());
-                otherDocCounts[ordIdx] += docCount;
-                if (docCount < bucketCountThresholds.getShardMinDocCount()) {
-                    continue;
-                }
-                if (spare == null) {
-                    spare = new InternalMultiTerms.Bucket(null, 0, null, showTermDocCountError, 0, formats, keyConverters);
-                    spareKey = new BytesRef();
+            try (
+                ObjectArrayPriorityQueue<InternalMultiTerms.Bucket> ordered = new BucketPriorityQueue<>(
+                    size,
+                    bigArrays(),
+                    partiallyBuiltBucketComparator
+                )
+            ) {
+                InternalMultiTerms.Bucket spare = null;
+                BytesRef spareKey = null;
+                BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
+                while (ordsEnum.next()) {
+                    long docCount = bucketDocCount(ordsEnum.ord());
+                    otherDocCounts[ordIdx] += docCount;
+                    if (docCount < bucketCountThresholds.getShardMinDocCount()) {
+                        continue;
+                    }
+                    if (spare == null) {
+                        spare = new InternalMultiTerms.Bucket(null, 0, null, showTermDocCountError, 0, formats, keyConverters);
+                        spareKey = new BytesRef();
+                    }
+                    ordsEnum.readValue(spareKey);
+                    spare.terms = unpackTerms(spareKey);
+                    spare.docCount = docCount;
+                    spare.bucketOrd = ordsEnum.ord();
+                    spare = ordered.insertWithOverflow(spare);
                 }
-                ordsEnum.readValue(spareKey);
-                spare.terms = unpackTerms(spareKey);
-                spare.docCount = docCount;
-                spare.bucketOrd = ordsEnum.ord();
-                spare = ordered.insertWithOverflow(spare);
-            }
 
-            // Get the top buckets
-            InternalMultiTerms.Bucket[] bucketsForOrd = new InternalMultiTerms.Bucket[ordered.size()];
-            topBucketsPerOrd[ordIdx] = bucketsForOrd;
-            for (int b = ordered.size() - 1; b >= 0; --b) {
-                topBucketsPerOrd[ordIdx][b] = ordered.pop();
-                otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount();
+                // Get the top buckets
+                InternalMultiTerms.Bucket[] bucketsForOrd = new InternalMultiTerms.Bucket[(int) ordered.size()];
+                topBucketsPerOrd[ordIdx] = bucketsForOrd;
+                for (int b = (int) ordered.size() - 1; b >= 0; --b) {
+                    topBucketsPerOrd[ordIdx][b] = ordered.pop();
+                    otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount();
+                }
             }
         }
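
The same mechanical pattern repeats across the aggregators touched by this commit: the queue allocation moves into try-with-resources because the BigArrays-backed queue is Releasable, and the now-long size() is narrowed with (int) casts wherever Java arrays are sized. For reference, an assumed shape of the BucketPriorityQueue subclass used above, inferred from its call-site arguments rather than copied from this commit's BucketPriorityQueue.java:

    import java.util.Comparator;
    import org.elasticsearch.common.util.BigArrays;
    import org.elasticsearch.common.util.ObjectArrayPriorityQueue;

    // Sketch only: the supplied comparator decides which bucket is least competitive.
    class BucketPriorityQueue<B> extends ObjectArrayPriorityQueue<B> {
        private final Comparator<? super B> comparator;

        BucketPriorityQueue(int size, BigArrays bigArrays, Comparator<? super B> comparator) {
            super(size, bigArrays);
            this.comparator = comparator;
        }

        @Override
        public boolean lessThan(B a, B b) {
            // Reversed comparison keeps the least competitive bucket on top,
            // so insertWithOverflow() evicts it once the queue is full.
            return comparator.compare(a, b) > 0;
        }
    }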