Browse Source

Fold some of sig_terms into terms (#57361)

This merges the global-ordinals-based implementation for
`significant_terms` into the global-ordinals-based implementation of
`terms`, removing a bunch of copy and pasted code that is subtly
different across the two implementations and replacing it with an
explicit `ResultStrategy` with nice stuff like Javadoc.

The actual behavior is mostly unchanged, though I was able to remove a
redundant copy of bytes representing the string from the result
construction phase of `significant_terms`.
Nik Everett 5 years ago
parent
commit
10dc144693

+ 12 - 3
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java

@@ -19,11 +19,13 @@
 
 package org.elasticsearch.search.aggregations.bucket.terms;
 
+import org.apache.lucene.index.IndexReader;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.BucketOrder;
-import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
+import org.elasticsearch.search.internal.ContextIndexSearcher;
 import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
@@ -42,10 +44,17 @@ abstract class AbstractStringTermsAggregator extends TermsAggregator {
         this.showTermDocCountError = showTermDocCountError;
     }
 
-    @Override
-    public InternalAggregation buildEmptyAggregation() {
+    protected StringTerms buildEmptyTermsAggregation() {
         return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
                 metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0);
     }
 
+    protected SignificantStringTerms buildEmptySignificantTermsAggregation(SignificanceHeuristic significanceHeuristic) {
+        // We need to account for the significance of a miss in our global stats - provide corpus size as context
+        ContextIndexSearcher searcher = context.searcher();
+        IndexReader topReader = searcher.getIndexReader();
+        int supersetSize = topReader.numDocs();
+        return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
+                metadata(), format, 0, supersetSize, significanceHeuristic, emptyList());
+    }
 }

+ 0 - 160
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsSignificantTermsAggregator.java

@@ -1,160 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.search.aggregations.bucket.terms;
-
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.Aggregator;
-import org.elasticsearch.search.aggregations.AggregatorFactories;
-import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.LeafBucketCollector;
-import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
-import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
-import org.elasticsearch.search.aggregations.support.ValuesSource;
-import org.elasticsearch.search.internal.ContextIndexSearcher;
-import org.elasticsearch.search.internal.SearchContext;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Map;
-
-import static java.util.Collections.emptyList;
-
-/**
- * An global ordinal based implementation of significant terms, based on {@link SignificantStringTermsAggregator}.
- */
-public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStringTermsAggregator {
-
-    protected long numCollectedDocs;
-    protected final SignificantTermsAggregatorFactory termsAggFactory;
-    private final SignificanceHeuristic significanceHeuristic;
-
-    public GlobalOrdinalsSignificantTermsAggregator(String name,
-                                                    AggregatorFactories factories,
-                                                    ValuesSource.Bytes.WithOrdinals.FieldData valuesSource,
-                                                    DocValueFormat format,
-                                                    BucketCountThresholds bucketCountThresholds,
-                                                    IncludeExclude.OrdinalsFilter includeExclude,
-                                                    SearchContext context,
-                                                    Aggregator parent,
-                                                    boolean forceRemapGlobalOrds,
-                                                    SignificanceHeuristic significanceHeuristic,
-                                                    SignificantTermsAggregatorFactory termsAggFactory,
-                                                    Map<String, Object> metadata) throws IOException {
-        super(name, factories, valuesSource, null, format, bucketCountThresholds, includeExclude, context, parent,
-            forceRemapGlobalOrds, SubAggCollectionMode.BREADTH_FIRST, false, metadata);
-        this.significanceHeuristic = significanceHeuristic;
-        this.termsAggFactory = termsAggFactory;
-        this.numCollectedDocs = 0;
-    }
-
-    @Override
-    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
-        return new LeafBucketCollectorBase(super.getLeafCollector(ctx, sub), null) {
-            @Override
-            public void collect(int doc, long bucket) throws IOException {
-                super.collect(doc, bucket);
-                numCollectedDocs++;
-            }
-        };
-    }
-
-    @Override
-    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
-        assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
-        if (valueCount == 0) { // no context in this reader
-            return new InternalAggregation[] {buildEmptyAggregation()};
-        }
-
-        final int size;
-        if (bucketCountThresholds.getMinDocCount() == 0) {
-            // if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns
-            size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
-        } else {
-            size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
-        }
-        long supersetSize = termsAggFactory.getSupersetNumDocs();
-        long subsetSize = numCollectedDocs;
-
-        BucketSignificancePriorityQueue<SignificantStringTerms.Bucket> ordered = new BucketSignificancePriorityQueue<>(size);
-        collectionStrategy.forEach(new BucketInfoConsumer() {
-            SignificantStringTerms.Bucket spare = null;
-
-            @Override
-            public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException {
-                if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
-                    if (spare == null) {
-                        spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
-                    }
-                    spare.bucketOrd = bucketOrd;
-                    copy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
-                    spare.subsetDf = docCount;
-                    spare.subsetSize = subsetSize;
-                    spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
-                    spare.supersetSize = supersetSize;
-                    // During shard-local down-selection we use subset/superset stats
-                    // that are for this shard only
-                    // Back at the central reducer these properties will be updated with
-                    // global stats
-                    spare.updateScore(significanceHeuristic);
-                    spare = ordered.insertWithOverflow(spare);
-                    if (spare == null) {
-                        consumeBucketsAndMaybeBreak(1);
-                    }
-                }
-            }
-        });
-
-        final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
-        for (int i = ordered.size() - 1; i >= 0; i--) {
-            list[i] = ordered.pop();
-            /*
-             * The terms are owned by the BytesRefHash which will close after
-             * we're finished building the aggregation so we need to pull a copy.
-             */
-            list[i].termBytes = BytesRef.deepCopyOf(list[i].termBytes);
-        }
-        buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
-
-        return new InternalAggregation[] {
-            new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
-                metadata(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list))
-        };
-    }
-
-    @Override
-    public SignificantStringTerms buildEmptyAggregation() {
-        // We need to account for the significance of a miss in our global stats - provide corpus size as context
-        ContextIndexSearcher searcher = context.searcher();
-        IndexReader topReader = searcher.getIndexReader();
-        int supersetSize = topReader.numDocs();
-        return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
-                metadata(), format, numCollectedDocs, supersetSize, significanceHeuristic, emptyList());
-    }
-
-    @Override
-    protected void doClose() {
-        super.doClose();
-        Releasables.close(termsAggFactory);
-    }
-}
-

+ 377 - 122
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

@@ -26,6 +26,7 @@ import org.apache.lucene.index.SortedDocValues;
 import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.PriorityQueue;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
@@ -38,8 +39,10 @@ import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.BucketOrder;
 import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
 import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.internal.SearchContext;
 
@@ -47,6 +50,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.function.BiConsumer;
+import java.util.function.Function;
 import java.util.function.LongPredicate;
 import java.util.function.LongUnaryOperator;
 
@@ -56,7 +60,7 @@ import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS;
  * An aggregator of string values that relies on global ordinals in order to build buckets.
  */
 public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggregator {
-
+    protected final ResultStrategy<?, ?, ?> resultStrategy;
     protected final ValuesSource.Bytes.WithOrdinals valuesSource;
 
     // TODO: cache the acceptedglobalValues per aggregation definition.
@@ -65,28 +69,33 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
     // first defined one.
     // So currently for each instance of this aggregator the acceptedglobalValues will be computed, this is unnecessary
     // especially if this agg is on a second layer or deeper.
-    protected final LongPredicate acceptedGlobalOrdinals;
-    protected final long valueCount;
-    protected final GlobalOrdLookupFunction lookupGlobalOrd;
+    private final LongPredicate acceptedGlobalOrdinals;
+    private final long valueCount;
+    private final GlobalOrdLookupFunction lookupGlobalOrd;
     protected final CollectionStrategy collectionStrategy;
 
     public interface GlobalOrdLookupFunction {
         BytesRef apply(long ord) throws IOException;
     }
 
-    public GlobalOrdinalsStringTermsAggregator(String name, AggregatorFactories factories,
-                                               ValuesSource.Bytes.WithOrdinals valuesSource,
-                                               BucketOrder order,
-                                               DocValueFormat format,
-                                               BucketCountThresholds bucketCountThresholds,
-                                               IncludeExclude.OrdinalsFilter includeExclude,
-                                               SearchContext context,
-                                               Aggregator parent,
-                                               boolean remapGlobalOrds,
-                                               SubAggCollectionMode collectionMode,
-                                               boolean showTermDocCountError,
-                                               Map<String, Object> metadata) throws IOException {
+    public GlobalOrdinalsStringTermsAggregator(
+        String name,
+        AggregatorFactories factories,
+        Function<GlobalOrdinalsStringTermsAggregator, ResultStrategy<?, ?, ?>> resultStrategy,
+        ValuesSource.Bytes.WithOrdinals valuesSource,
+        BucketOrder order,
+        DocValueFormat format,
+        BucketCountThresholds bucketCountThresholds,
+        IncludeExclude.OrdinalsFilter includeExclude,
+        SearchContext context,
+        Aggregator parent,
+        boolean remapGlobalOrds,
+        SubAggCollectionMode collectionMode,
+        boolean showTermDocCountError,
+        Map<String, Object> metadata
+    ) throws IOException {
         super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
+        this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
         this.valuesSource = valuesSource;
         final IndexReader reader = context.searcher().getIndexReader();
         final SortedSetDocValues values = reader.leaves().size() > 0 ?
@@ -107,105 +116,50 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
     }
 
     @Override
-    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
         SortedSetDocValues globalOrds = getGlobalOrds(ctx);
         collectionStrategy.globalOrdsReady(globalOrds);
         SortedDocValues singleValues = DocValues.unwrapSingleton(globalOrds);
         if (singleValues != null) {
-            return new LeafBucketCollectorBase(sub, globalOrds) {
+            return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, globalOrds) {
                 @Override
-                public void collect(int doc, long bucket) throws IOException {
-                    assert bucket == 0;
+                public void collect(int doc, long owningBucketOrd) throws IOException {
+                    assert owningBucketOrd == 0;
                     if (singleValues.advanceExact(doc)) {
                         int ord = singleValues.ordValue();
                         collectionStrategy.collectGlobalOrd(doc, ord, sub);
                     }
                 }
-            };
+            });
         }
-        return new LeafBucketCollectorBase(sub, globalOrds) {
+        return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, globalOrds) {
             @Override
-            public void collect(int doc, long bucket) throws IOException {
-                assert bucket == 0;
+            public void collect(int doc, long owningBucketOrd) throws IOException {
+                assert owningBucketOrd == 0;
                 if (globalOrds.advanceExact(doc)) {
                     for (long globalOrd = globalOrds.nextOrd(); globalOrd != NO_MORE_ORDS; globalOrd = globalOrds.nextOrd()) {
                         collectionStrategy.collectGlobalOrd(doc, globalOrd, sub);
                     }
                 }
             }
-        };
-    }
-
-    protected static void copy(BytesRef from, BytesRef to) {
-        if (to.bytes.length < from.length) {
-            to.bytes = new byte[ArrayUtil.oversize(from.length, 1)];
-        }
-        to.offset = 0;
-        to.length = from.length;
-        System.arraycopy(from.bytes, from.offset, to.bytes, 0, from.length);
+        });
     }
 
     @Override
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
-        assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
-        if (valueCount == 0) { // no context in this reader
-            return new InternalAggregation[] {buildEmptyAggregation()};
-        }
-
-        final int size;
-        if (bucketCountThresholds.getMinDocCount() == 0) {
-            // if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns
-            size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
-        } else {
-            size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
-        }
-        long[] otherDocCount = new long[1];
-        BucketPriorityQueue<OrdBucket> ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
-        collectionStrategy.forEach(new BucketInfoConsumer() {
-            OrdBucket spare = null;
-
-            @Override
-            public void accept(long globalOrd, long bucketOrd, long docCount) {
-                otherDocCount[0] += docCount;
-                if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
-                    if (spare == null) {
-                        spare = new OrdBucket(showTermDocCountError, format);
-                    }
-                    spare.globalOrd = globalOrd;
-                    spare.bucketOrd = bucketOrd;
-                    spare.docCount = docCount;
-                    spare = ordered.insertWithOverflow(spare);
-                    if (spare == null) {
-                        consumeBucketsAndMaybeBreak(1);
-                    }
-                }
-            }
-        });
+        return resultStrategy.buildAggregations(owningBucketOrds);
+    }
 
-        // Get the top buckets
-        final StringTerms.Bucket[] list = new StringTerms.Bucket[ordered.size()];
-        for (int i = ordered.size() - 1; i >= 0; --i) {
-            final OrdBucket bucket = ordered.pop();
-            BytesRef scratch = new BytesRef();
-            copy(lookupGlobalOrd.apply(bucket.globalOrd), scratch);
-            list[i] = new StringTerms.Bucket(scratch, bucket.docCount, null, showTermDocCountError, 0, format);
-            list[i].bucketOrd = bucket.bucketOrd;
-            otherDocCount[0] -= list[i].docCount;
-            list[i].docCountError = 0;
-        }
-        buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
-
-        return new InternalAggregation[] {
-            new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
-                metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
-                otherDocCount[0], Arrays.asList(list), 0)
-        };
+    @Override
+    public InternalAggregation buildEmptyAggregation() {
+        return resultStrategy.buildEmptyResult();
     }
 
     @Override
     public void collectDebugInfo(BiConsumer<String, Object> add) {
         super.collectDebugInfo(add);
         add.accept("collection_strategy", collectionStrategy.describe());
+        add.accept("result_strategy", resultStrategy.describe());
     }
 
     /**
@@ -251,40 +205,46 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
 
     @Override
     protected void doClose() {
-        Releasables.close(collectionStrategy);
+        Releasables.close(resultStrategy, collectionStrategy);
     }
 
     /**
-     * Variant of {@link GlobalOrdinalsStringTermsAggregator} that resolves global ordinals post segment collection
-     * instead of on the fly for each match.This is beneficial for low cardinality fields, because it can reduce
-     * the amount of look-ups significantly.
+     * Variant of {@link GlobalOrdinalsStringTermsAggregator} that
+     * resolves global ordinals post segment collection instead of on the fly
+     * for each match.This is beneficial for low cardinality fields, because
+     * it can reduce the amount of look-ups significantly.
+     * <p>
+     * This is only supported for the standard {@code terms} aggregation and
+     * doesn't support {@code significant_terms} so this forces
+     * {@link StandardTermsResults}.
      */
     static class LowCardinality extends GlobalOrdinalsStringTermsAggregator {
 
         private LongUnaryOperator mapping;
         private IntArray segmentDocCounts;
 
-        LowCardinality(String name,
-                       AggregatorFactories factories,
-                       ValuesSource.Bytes.WithOrdinals valuesSource,
-                       BucketOrder order,
-                       DocValueFormat format,
-                       BucketCountThresholds bucketCountThresholds,
-                       SearchContext context,
-                       Aggregator parent,
-                       boolean forceDenseMode,
-                       SubAggCollectionMode collectionMode,
-                       boolean showTermDocCountError,
-                       Map<String, Object> metadata) throws IOException {
-            super(name, factories, valuesSource, order, format, bucketCountThresholds, null,
+        LowCardinality(
+            String name,
+            AggregatorFactories factories,
+            ValuesSource.Bytes.WithOrdinals valuesSource,
+            BucketOrder order,
+            DocValueFormat format,
+            BucketCountThresholds bucketCountThresholds,
+            SearchContext context,
+            Aggregator parent,
+            boolean forceDenseMode,
+            SubAggCollectionMode collectionMode,
+            boolean showTermDocCountError,
+            Map<String, Object> metadata
+        ) throws IOException {
+            super(name, factories, a -> a.new StandardTermsResults(), valuesSource, order, format, bucketCountThresholds, null,
                 context, parent, forceDenseMode, collectionMode, showTermDocCountError, metadata);
             assert factories == null || factories.countAggregators() == 0;
             this.segmentDocCounts = context.bigArrays().newIntArray(1, true);
         }
 
         @Override
-        public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
-                                                    final LeafBucketCollector sub) throws IOException {
+        public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
             if (mapping != null) {
                 mapSegmentCountsToGlobalCounts(mapping);
             }
@@ -294,29 +254,28 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
             final SortedDocValues singleValues = DocValues.unwrapSingleton(segmentOrds);
             mapping = valuesSource.globalOrdinalsMapping(ctx);
             if (singleValues != null) {
-                return new LeafBucketCollectorBase(sub, segmentOrds) {
+                return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, segmentOrds) {
                     @Override
-                    public void collect(int doc, long bucket) throws IOException {
-                        assert bucket == 0;
+                    public void collect(int doc, long owningBucketOrd) throws IOException {
+                        assert owningBucketOrd == 0;
                         if (singleValues.advanceExact(doc)) {
                             final int ord = singleValues.ordValue();
                             segmentDocCounts.increment(ord + 1, 1);
                         }
                     }
-                };
-            } else {
-                return new LeafBucketCollectorBase(sub, segmentOrds) {
-                    @Override
-                    public void collect(int doc, long bucket) throws IOException {
-                        assert bucket == 0;
-                        if (segmentOrds.advanceExact(doc)) {
-                            for (long segmentOrd = segmentOrds.nextOrd(); segmentOrd != NO_MORE_ORDS; segmentOrd = segmentOrds.nextOrd()) {
-                                segmentDocCounts.increment(segmentOrd + 1, 1);
-                            }
+                });
+            }
+            return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, segmentOrds) {
+                @Override
+                public void collect(int doc, long owningBucketOrd) throws IOException {
+                    assert owningBucketOrd == 0;
+                    if (segmentOrds.advanceExact(doc)) {
+                        for (long segmentOrd = segmentOrds.nextOrd(); segmentOrd != NO_MORE_ORDS; segmentOrd = segmentOrds.nextOrd()) {
+                            segmentDocCounts.increment(segmentOrd + 1, 1);
                         }
                     }
-                };
-            }
+                }
+            });
         }
 
         @Override
@@ -329,7 +288,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
 
         @Override
         protected void doClose() {
-            Releasables.close(segmentDocCounts);
+            Releasables.close(resultStrategy, segmentDocCounts, collectionStrategy);
         }
 
         private void mapSegmentCountsToGlobalCounts(LongUnaryOperator mapping) throws IOException {
@@ -396,7 +355,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
     /**
      * Strategy for collecting global ordinals.
      * <p>
-     * The {@link GlobalOrdinalsSignificantTermsAggregator} uses one of these
+     * The {@link GlobalOrdinalsStringTermsAggregator} uses one of these
      * to collect the global ordinals by calling
      * {@link CollectionStrategy#collectGlobalOrd(int, long, LeafBucketCollector)}
      * for each global ordinal that it hits and then calling
@@ -545,4 +504,300 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
             bucketOrds.close();
         }
     }
+
+    /**
+     * Strategy for building results.
+     */
+    abstract class ResultStrategy<
+        R extends InternalAggregation,
+        B extends InternalMultiBucketAggregation.InternalBucket,
+        TB extends InternalMultiBucketAggregation.InternalBucket> implements Releasable {
+
+        private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
+            assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
+            if (valueCount == 0) { // no context in this reader
+                return new InternalAggregation[] {buildEmptyAggregation()};
+            }
+
+            final int size;
+            if (bucketCountThresholds.getMinDocCount() == 0) {
+                // if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns
+                size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
+            } else {
+                size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
+            }
+            long[] otherDocCount = new long[1];
+            PriorityQueue<TB> ordered = buildPriorityQueue(size);
+            collectionStrategy.forEach(new BucketInfoConsumer() {
+                TB spare = null;
+
+                @Override
+                public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException {
+                    otherDocCount[0] += docCount;
+                    if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
+                        if (spare == null) {
+                            spare = buildEmptyTemporaryBucket();
+                        }
+                        updateBucket(spare, globalOrd, bucketOrd, docCount);
+                        spare = ordered.insertWithOverflow(spare);
+                        if (spare == null) {
+                            consumeBucketsAndMaybeBreak(1);
+                        }
+                    }
+                }
+            });
+
+            // Get the top buckets
+            B[] topBuckets = buildBuckets(ordered.size());
+            for (int i = ordered.size() - 1; i >= 0; --i) {
+                topBuckets[i] = convertTempBucketToRealBucket(ordered.pop());
+            }
+            buildSubAggs(topBuckets);
+
+            return new InternalAggregation[] {
+                buildResult(topBuckets)
+            };
+        }
+
+        /**
+         * Short description of the collection mechanism added to the profile
+         * output to help with debugging.
+         */
+        abstract String describe();
+
+        /**
+         * Wrap the "standard" numeric terms collector to collect any more
+         * information that this result type may need.
+         */
+        abstract LeafBucketCollector wrapCollector(LeafBucketCollector primary);
+
+        /**
+         * Build an empty temporary bucket.
+         */
+        abstract TB buildEmptyTemporaryBucket();
+
+        /**
+         * Update fields in {@code spare} to reflect information collected for
+         * this bucket ordinal.
+         */
+        abstract void updateBucket(TB spare, long globalOrd, long bucketOrd, long docCount) throws IOException;
+
+        /**
+         * Build a {@link PriorityQueue} to sort the buckets. After we've
+         * collected all of the buckets we'll collect all entries in the queue.
+         */
+        abstract PriorityQueue<TB> buildPriorityQueue(int size);
+
+        /**
+         * Build an array of buckets for a particular ordinal to collect the
+         * results. The populated list is passed to {@link #buildResult}.
+         */
+        abstract B[] buildBuckets(int size);
+
+        /**
+         * Convert a temporary bucket into a real bucket.
+         */
+        abstract B convertTempBucketToRealBucket(TB temp) throws IOException;
+
+        /**
+         * Build the sub-aggregations into the buckets. This will usually
+         * delegate to {@link #buildSubAggsForAllBuckets}.
+         */
+        abstract void buildSubAggs(B[] topBuckets) throws IOException;
+
+        /**
+         * Turn the buckets into an aggregation result.
+         */
+        abstract R buildResult(B[] topBuckets);
+
+        /**
+         * Build an "empty" result. Only called if there isn't any data on this
+         * shard.
+         */
+        abstract R buildEmptyResult();
+    }
+
+    /**
+     * Builds results for the standard {@code terms} aggregation.
+     */
+    class StandardTermsResults extends ResultStrategy<StringTerms, StringTerms.Bucket, OrdBucket> {
+        private long otherDocCount;
+
+        @Override
+        String describe() {
+            return "terms";
+        }
+
+        @Override
+        LeafBucketCollector wrapCollector(LeafBucketCollector primary) {
+            return primary;
+        }
+
+        @Override
+        StringTerms.Bucket[] buildBuckets(int size) {
+            return new StringTerms.Bucket[size];
+        }
+
+        @Override
+        OrdBucket buildEmptyTemporaryBucket() {
+            return new OrdBucket(showTermDocCountError, format);
+        }
+
+        @Override
+        void updateBucket(OrdBucket spare, long globalOrd, long bucketOrd, long docCount) throws IOException {
+            spare.globalOrd = globalOrd;
+            spare.bucketOrd = bucketOrd;
+            spare.docCount = docCount;
+            otherDocCount += docCount;
+        }
+        
+        @Override
+        PriorityQueue<OrdBucket> buildPriorityQueue(int size) {
+            return new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
+        }
+
+        StringTerms.Bucket convertTempBucketToRealBucket(OrdBucket temp) throws IOException {
+            BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd));
+            StringTerms.Bucket result = new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format);
+            result.bucketOrd = temp.bucketOrd;
+            otherDocCount -= temp.docCount;
+            result.docCountError = 0;
+            return result;
+        }
+
+        @Override
+        void buildSubAggs(StringTerms.Bucket[] topBuckets) throws IOException {
+            buildSubAggsForBuckets(topBuckets, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
+        }
+
+        @Override
+        StringTerms buildResult(StringTerms.Bucket[] topBuckets) {
+            return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
+                metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
+                otherDocCount, Arrays.asList(topBuckets), 0);
+        }
+
+        @Override
+        StringTerms buildEmptyResult() {
+            return buildEmptyTermsAggregation();
+        }
+
+        @Override
+        public void close() {}
+    }
+
+    /**
+     * Builds results for the {@code significant_terms} aggregation.
+     */
+    class SignificantTermsResults extends ResultStrategy<
+        SignificantStringTerms,
+        SignificantStringTerms.Bucket,
+        SignificantStringTerms.Bucket> {
+
+        // TODO a reference to the factory is weird - probably should be reference to what we need from it.
+        private final SignificantTermsAggregatorFactory termsAggFactory;
+        private final SignificanceHeuristic significanceHeuristic;
+
+        private long subsetSize = 0;
+
+        SignificantTermsResults(SignificantTermsAggregatorFactory termsAggFactory, SignificanceHeuristic significanceHeuristic) {
+            this.termsAggFactory = termsAggFactory;
+            this.significanceHeuristic = significanceHeuristic;
+        }
+
+        @Override
+        String describe() {
+            return "terms";
+        }
+
+        @Override
+        LeafBucketCollector wrapCollector(LeafBucketCollector primary) {
+            return new LeafBucketCollectorBase(primary, null) {
+                @Override
+                public void collect(int doc, long owningBucketOrd) throws IOException {
+                    super.collect(doc, owningBucketOrd);
+                    subsetSize++;
+                }
+            };
+        }
+
+        @Override
+        SignificantStringTerms.Bucket[] buildBuckets(int size) {
+            return new SignificantStringTerms.Bucket[size];
+        }
+
+        @Override
+        SignificantStringTerms.Bucket buildEmptyTemporaryBucket() {
+            return new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
+        }
+
+        @Override
+        void updateBucket(SignificantStringTerms.Bucket spare, long globalOrd, long bucketOrd, long docCount) throws IOException {
+            spare.bucketOrd = bucketOrd;
+            oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
+            spare.subsetDf = docCount;
+            spare.subsetSize = subsetSize;
+            spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
+            spare.supersetSize = termsAggFactory.getSupersetNumDocs();
+            /*
+             * During shard-local down-selection we use subset/superset stats
+             * that are for this shard only. Back at the central reducer these
+             * properties will be updated with global stats.
+             */
+            spare.updateScore(significanceHeuristic);
+        }
+
+        @Override
+        PriorityQueue<SignificantStringTerms.Bucket> buildPriorityQueue(int size) {
+            return new BucketSignificancePriorityQueue<>(size);
+        }
+
+        @Override
+        SignificantStringTerms.Bucket convertTempBucketToRealBucket(SignificantStringTerms.Bucket temp) throws IOException {
+            return temp;
+        }
+
+        @Override
+        void buildSubAggs(SignificantStringTerms.Bucket[] topBuckets) throws IOException {
+            buildSubAggsForBuckets(topBuckets, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
+        }
+
+        @Override
+        SignificantStringTerms buildResult(SignificantStringTerms.Bucket[] topBuckets) {
+            return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
+                metadata(), format, subsetSize, termsAggFactory.getSupersetNumDocs(), significanceHeuristic, Arrays.asList(topBuckets));
+        }
+
+        @Override
+        SignificantStringTerms buildEmptyResult() {
+            return buildEmptySignificantTermsAggregation(significanceHeuristic);
+        }
+
+        @Override
+        public void close() {
+            termsAggFactory.close();
+        }
+
+        /**
+         * Copies the bytes from {@code from} into {@code to}, oversizing
+         * the destination array if the bytes won't fit into the array.
+         * <p>
+         * This is fairly similar in spirit to
+         * {@link BytesRef#deepCopyOf(BytesRef)} in that it is a way to read
+         * bytes from a mutable {@link BytesRef} into
+         * <strong>something</strong> that won't mutate out from under you.
+         * Unlike {@linkplain BytesRef#deepCopyOf(BytesRef)} its designed to
+         * be run over and over again into the same destination. In particular,
+         * oversizing the destination bytes helps to keep from allocating
+         * a bunch of little arrays over and over and over again.
+         */
+        private void oversizedCopy(BytesRef from, BytesRef to) {
+            if (to.bytes.length < from.length) {
+                to.bytes = new byte[ArrayUtil.oversize(from.length, 1)];
+            }
+            to.offset = 0;
+            to.length = from.length;
+            System.arraycopy(from.bytes, from.offset, to.bytes, 0, from.length);
+        }
+    }
 }

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java

@@ -78,7 +78,7 @@ public class NumericTermsAggregator extends TermsAggregator {
     )
         throws IOException {
         super(name, factories, aggregationContext, parent, bucketCountThresholds, order, format, subAggCollectMode, metadata);
-        this.resultStrategy = resultStrategy.apply(this);
+        this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
         this.valuesSource = valuesSource;
         this.longFilter = longFilter;
         bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);

+ 17 - 4
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java

@@ -389,10 +389,23 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                      **/
                     remapGlobalOrd = false;
                 }
-                return new GlobalOrdinalsSignificantTermsAggregator(name, factories,
-                        (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, format, bucketCountThresholds, filter,
-                        aggregationContext, parent, remapGlobalOrd, significanceHeuristic, termsAggregatorFactory, metadata);
-
+                
+                return new GlobalOrdinalsStringTermsAggregator(
+                    name,
+                    factories,
+                    a -> a.new SignificantTermsResults(termsAggregatorFactory, significanceHeuristic),
+                    (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource,
+                    null,
+                    format,
+                    bucketCountThresholds,
+                    filter,
+                    aggregationContext,
+                    parent,
+                    remapGlobalOrd,
+                    SubAggCollectionMode.BREADTH_FIRST,
+                    false,
+                    metadata
+                );
             }
         };
 

+ 5 - 0
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java

@@ -171,6 +171,11 @@ public class StringTermsAggregator extends AbstractStringTermsAggregator {
         };
     }
 
+    @Override
+    public InternalAggregation buildEmptyAggregation() {
+        return buildEmptyTermsAggregation();
+    }
+
     @Override
     public void doClose() {
         Releasables.close(bucketOrds);

+ 16 - 3
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java

@@ -391,9 +391,22 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
                          remapGlobalOrds = false;
                     }
                 }
-                return new GlobalOrdinalsStringTermsAggregator(name, factories, ordinalsValuesSource, order,
-                        format, bucketCountThresholds, filter, context, parent, remapGlobalOrds, subAggCollectMode, showTermDocCountError,
-                        metadata);
+                return new GlobalOrdinalsStringTermsAggregator(
+                    name,
+                    factories,
+                    a -> a.new StandardTermsResults(),
+                    ordinalsValuesSource,
+                    order,
+                    format,
+                    bucketCountThresholds,
+                    filter,
+                    context,
+                    parent,
+                    remapGlobalOrds,
+                    subAggCollectMode,
+                    showTermDocCountError,
+                    metadata
+                );
             }
         };