
Save memory when significant_text is not on top (#58145)

This merges the aggregator for `significant_text` into
`significant_terms`, applying the optimization built in #55873 to save
memory when the aggregation is not on top. The `significant_text`
aggregation is pretty memory intensive all on its own and this doesn't
particularly help with that, but it'll help with the memory usage of any
sub-aggregations.
Nik Everett committed 5 years ago
commit 91813ea5cf
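
To make the scenario concrete, this is the shape of request the change targets: `significant_text` running under another bucketing aggregation rather than at the top level. A minimal sketch using the builder classes that appear in the tests below; the field names and import paths (`bucket.terms`, matching the file paths in this commit) are assumptions, and the surrounding search setup is elided:

```java
import org.elasticsearch.search.aggregations.bucket.terms.SignificantTextAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;

public class NestedSigTextExample {
    public static TermsAggregationBuilder request() {
        // significant_text is not on top here: it runs once per "category" bucket.
        // Before this change the framework cloned the whole aggregator per parent
        // bucket; now a single MapStringTermsAggregator serves all of them.
        SignificantTextAggregationBuilder sigText =
            new SignificantTextAggregationBuilder("sig_text", "text").filterDuplicateText(true);
        return new TermsAggregationBuilder("category").field("category").subAggregation(sigText);
    }
}
```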

+ 3 - 7
server/src/main/java/org/apache/lucene/analysis/miscellaneous/DuplicateByteSequenceSpotter.java

@@ -25,20 +25,18 @@ import org.apache.lucene.util.RamUsageEstimator;
  * A Trie structure for analysing byte streams for duplicate sequences. Bytes
  * from a stream are added one at a time using the addByte method and the number
  * of times it has been seen as part of a sequence is returned.
- * 
+ * <p>
  * The minimum required length for a duplicate sequence detected is 6 bytes.
- * 
+ * <p>
  * The design goals are to maximize speed of lookup while minimizing the space
  * required to do so. This has led to a hybrid solution for representing the
  * bytes that make up a sequence in the trie.
- * 
+ * <p>
  * If we have 6 bytes in sequence e.g. abcdef then they are represented as
  * object nodes in the tree as follows:
  * <p>
  * (a)-(b)-(c)-(def as an int)
  * <p>
- * 
- * 
  * {@link RootTreeNode} objects are used for the first two levels of the tree
  * (representing bytes a and b in the example sequence). The combinations of
  * objects at these 2 levels are few so internally these objects allocate an
@@ -61,11 +59,9 @@ import org.apache.lucene.util.RamUsageEstimator;
  * reached
  * <li>halting any growth of the tree
  * </ol>
- * 
  * Tests on real-world-text show that the size of the tree is a multiple of the
  * input text where that multiplier varies between 10 and 5 times as the content
  * size increased from 10 to 100 megabytes of content.
- * 
  */
 public class DuplicateByteSequenceSpotter {
     public static final int TREE_DEPTH = 6;
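
The javadoc above describes the contract; here is a small sketch of how a caller drives the spotter, under the assumption (per the javadoc and the call sites later in this commit) that `addByte` returns how often the trailing sequence has been seen and `getEstimatedSizeInBytes` tracks trie growth. Note the class ships in Elasticsearch's server module under an `org.apache.lucene` package, as the file path above shows:

```java
import org.apache.lucene.analysis.miscellaneous.DuplicateByteSequenceSpotter;

import java.nio.charset.StandardCharsets;

public class SpotterSketch {
    public static void main(String[] args) {
        DuplicateByteSequenceSpotter spotter = new DuplicateByteSequenceSpotter();
        long lastTrieSize = spotter.getEstimatedSizeInBytes();
        for (String value : new String[] { "duplicate duplicate", "duplicate duplicate" }) {
            spotter.startNewSequence(); // one sequence per value, as the old aggregator did
            for (byte b : value.getBytes(StandardCharsets.UTF_8)) {
                // Per the javadoc: returns how many times the trailing
                // >= 6 byte sequence has been seen so far.
                spotter.addByte(b);
            }
        }
        // The trie only grows; callers report that growth to the circuit breaker.
        System.out.println("trie grew by " + (spotter.getEstimatedSizeInBytes() - lastTrieSize));
    }
}
```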

+ 99 - 34
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java

@@ -47,22 +47,23 @@ import java.util.Map;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
+import java.util.function.LongConsumer;
 import java.util.function.Supplier;
 
 /**
  * An aggregator of string values that hashes the strings on the fly rather
  * than up front like the {@link GlobalOrdinalsStringTermsAggregator}.
  */
 public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
+    private final CollectorSource collectorSource;
     private final ResultStrategy<?, ?> resultStrategy;
-    private final ValuesSource valuesSource;
     private final BytesKeyedBucketOrds bucketOrds;
     private final IncludeExclude.StringFilter includeExclude;
 
     public MapStringTermsAggregator(
         String name,
         AggregatorFactories factories,
+        CollectorSource collectorSource,
         Function<MapStringTermsAggregator, ResultStrategy<?, ?>> resultStrategy,
-        ValuesSource valuesSource,
         BucketOrder order,
         DocValueFormat format,
         BucketCountThresholds bucketCountThresholds,
@@ -75,56 +76,39 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
         Map<String, Object> metadata
     ) throws IOException {
         super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
+        this.collectorSource = collectorSource;
         this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
-        this.valuesSource = valuesSource;
         this.includeExclude = includeExclude;
         bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
     }
 
     @Override
     public ScoreMode scoreMode() {
-        if (valuesSource != null && valuesSource.needsScores()) {
+        if (collectorSource.needsScores()) {
             return ScoreMode.COMPLETE;
         }
         return super.scoreMode();
     }
 
     @Override
-    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
-            final LeafBucketCollector sub) throws IOException {
-        SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
-        return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, values) {
-            final BytesRefBuilder previous = new BytesRefBuilder();
-
-            @Override
-            public void collect(int doc, long owningBucketOrd) throws IOException {
-                if (false == values.advanceExact(doc)) {
-                    return;
-                }
-                int valuesCount = values.docValueCount();
-
-                // SortedBinaryDocValues don't guarantee uniqueness so we
-                // need to take care of dups
-                previous.clear();
-                for (int i = 0; i < valuesCount; ++i) {
-                    final BytesRef bytes = values.nextValue();
-                    if (includeExclude != null && false == includeExclude.accept(bytes)) {
-                        continue;
-                    }
-                    if (i > 0 && previous.get().equals(bytes)) {
-                        continue;
-                    }
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
+        return resultStrategy.wrapCollector(
+            collectorSource.getLeafCollector(
+                includeExclude,
+                ctx,
+                sub,
+                this::addRequestCircuitBreakerBytes,
+                (s, doc, owningBucketOrd, bytes) -> {
                     long bucketOrdinal = bucketOrds.add(owningBucketOrd, bytes);
                     if (bucketOrdinal < 0) { // already seen
                         bucketOrdinal = -1 - bucketOrdinal;
-                        collectExistingBucket(sub, doc, bucketOrdinal);
+                        collectExistingBucket(s, doc, bucketOrdinal);
                     } else {
-                        collectBucket(sub, doc, bucketOrdinal);
+                        collectBucket(s, doc, bucketOrdinal);
                     }
-                    previous.copyBytes(bytes);
                 }
-            }
-        });
+            )
+        );
     }
 
     @Override
@@ -146,7 +130,82 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
 
     @Override
     public void doClose() {
-        Releasables.close(bucketOrds, resultStrategy);
+        Releasables.close(collectorSource, resultStrategy, bucketOrds);
+    }
+
+    /**
+     * Abstraction on top of building collectors to fetch values.
+     */
+    public interface CollectorSource extends Releasable {
+        boolean needsScores();
+
+        LeafBucketCollector getLeafCollector(
+            IncludeExclude.StringFilter includeExclude,
+            LeafReaderContext ctx,
+            LeafBucketCollector sub,
+            LongConsumer addRequestCircuitBreakerBytes,
+            CollectConsumer consumer
+        ) throws IOException;
+    }
+
+    @FunctionalInterface
+    public interface CollectConsumer {
+        void accept(LeafBucketCollector sub, int doc, long owningBucketOrd, BytesRef bytes) throws IOException;
+    }
+
+    /**
+     * Fetch values from a {@link ValuesSource}.
+     */
+    public static class ValuesSourceCollectorSource implements CollectorSource {
+        private final ValuesSource valuesSource;
+
+        public ValuesSourceCollectorSource(ValuesSource valuesSource) {
+            this.valuesSource = valuesSource;
+        }
+
+        @Override
+        public boolean needsScores() {
+            return valuesSource.needsScores();
+        }
+
+        @Override
+        public LeafBucketCollector getLeafCollector(
+            IncludeExclude.StringFilter includeExclude,
+            LeafReaderContext ctx,
+            LeafBucketCollector sub,
+            LongConsumer addRequestCircuitBreakerBytes,
+            CollectConsumer consumer
+        ) throws IOException {
+            SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
+            return new LeafBucketCollectorBase(sub, values) {
+                final BytesRefBuilder previous = new BytesRefBuilder();
+
+                @Override
+                public void collect(int doc, long owningBucketOrd) throws IOException {
+                    if (false == values.advanceExact(doc)) {
+                        return;
+                    }
+                    int valuesCount = values.docValueCount();
+
+                    // SortedBinaryDocValues don't guarantee uniqueness so we
+                    // need to take care of dups
+                    previous.clear();
+                    for (int i = 0; i < valuesCount; ++i) {
+                        BytesRef bytes = values.nextValue();
+                        if (includeExclude != null && false == includeExclude.accept(bytes)) {
+                            continue;
+                        }
+                        if (i > 0 && previous.get().equals(bytes)) {
+                            continue;
+                        }
+                        previous.copyBytes(bytes);
+                        consumer.accept(sub, doc, owningBucketOrd, bytes);
+                    }
+                }
+            };
+        }
+
+        @Override
+        public void close() {}
     }
 
     /**
@@ -270,6 +329,12 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
      * Builds results for the standard {@code terms} aggregation.
      */
     class StandardTermsResults extends ResultStrategy<StringTerms, StringTerms.Bucket> {
+        private final ValuesSource valuesSource;
+
+        StandardTermsResults(ValuesSource valuesSource) {
+            this.valuesSource = valuesSource;
+        }
+
         @Override
         String describe() {
             return "terms";
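
The `bucketOrdinal < 0` branch in `getLeafCollector` above relies on the hash-add convention used throughout these classes: `add` returns a fresh non-negative ordinal for an unseen key and `-1 - existingOrdinal` for a repeat. A self-contained illustration of the encoding, with a plain `HashMap` standing in for `BytesKeyedBucketOrds`, which this is not:

```java
import java.util.HashMap;
import java.util.Map;

public class OrdEncodingSketch {
    private final Map<String, Long> ords = new HashMap<>();

    // Mimics the convention of BytesKeyedBucketOrds.add: non-negative ordinal
    // for a new key, -1 - existingOrdinal when the key was already added.
    long add(String key) {
        Long existing = ords.get(key);
        if (existing != null) {
            return -1 - existing;
        }
        long ord = ords.size();
        ords.put(key, ord);
        return ord;
    }

    public static void main(String[] args) {
        OrdEncodingSketch ords = new OrdEncodingSketch();
        long first = ords.add("term");   // 0: collectBucket path
        long repeat = ords.add("term");  // -1: decode via -1 - repeat, collectExistingBucket path
        System.out.println(first + " / " + (-1 - repeat));
    }
}
```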

+ 11 - 7
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificanceLookup.java

@@ -36,10 +36,11 @@ import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.BytesRefHash;
 import org.elasticsearch.common.util.LongArray;
 import org.elasticsearch.common.util.LongHash;
+import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryShardContext;
+import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
-import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
 
 import java.io.IOException;
 
@@ -62,14 +63,17 @@ class SignificanceLookup {
     }
 
     private final QueryShardContext context;
-    private final ValuesSourceConfig config;
+    private final MappedFieldType fieldType;
+    private final DocValueFormat format;
     private final Query backgroundFilter;
     private final int supersetNumDocs;
     private TermsEnum termsEnum;
 
-    SignificanceLookup(QueryShardContext context, ValuesSourceConfig config, QueryBuilder backgroundFilter) throws IOException {
+    SignificanceLookup(QueryShardContext context, MappedFieldType fieldType, DocValueFormat format, QueryBuilder backgroundFilter)
+        throws IOException {
         this.context = context;
-        this.config = config;
+        this.fieldType = fieldType;
+        this.format = format;
         this.backgroundFilter = backgroundFilter == null ? null : backgroundFilter.toQuery(context);
         /*
          * We need to use a superset size that includes deleted docs or we
@@ -129,7 +133,7 @@ class SignificanceLookup {
      * Get the background frequency of a {@link BytesRef} term.
      */
     private long getBackgroundFrequency(BytesRef term) throws IOException {
-        return getBackgroundFrequency(config.fieldContext().fieldType().termQuery(config.format().format(term).toString(), context));
+        return getBackgroundFrequency(fieldType.termQuery(format.format(term).toString(), context));
     }
 
     /**
@@ -174,7 +178,7 @@ class SignificanceLookup {
      * Get the background frequency of a {@code long} term.
      */
     private long getBackgroundFrequency(long term) throws IOException {
-        return getBackgroundFrequency(config.fieldContext().fieldType().termQuery(config.format().format(term).toString(), context));
+        return getBackgroundFrequency(fieldType.termQuery(format.format(term).toString(), context));
     }
 
     private long getBackgroundFrequency(Query query) throws IOException {
@@ -201,7 +205,7 @@ class SignificanceLookup {
             return termsEnum;
         }
         IndexReader reader = context.getIndexReader();
-        termsEnum = new FilterableTermsEnum(reader, config.fieldContext().field(), PostingsEnum.NONE, backgroundFilter);
+        termsEnum = new FilterableTermsEnum(reader, fieldType.name(), PostingsEnum.NONE, backgroundFilter);
         return termsEnum;
     }
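
Behind `getBackgroundFrequency`, the lookup counts how often the term occurs in the background set. The naive path, visible verbatim in the code this commit deletes from `SignificantTextAggregatorFactory` below, intersects the term query with the background filter and counts matches; a sketch of just that step, with searcher and query construction assumed:

```java
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;

import java.io.IOException;

public class BackgroundFrequencySketch {
    static long backgroundFrequency(IndexSearcher searcher, Query termQuery, Query backgroundFilter) throws IOException {
        if (backgroundFilter == null) {
            return searcher.count(termQuery); // whole index is the background set
        }
        return searcher.count(
            new BooleanQuery.Builder()
                .add(termQuery, Occur.FILTER)        // the term being scored
                .add(backgroundFilter, Occur.FILTER) // restrict to the background set
                .build()
        );
    }
}
```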
 

+ 7 - 2
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java

@@ -227,7 +227,12 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
             bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
         }
 
-        SignificanceLookup lookup = new SignificanceLookup(queryShardContext, config, backgroundFilter);
+        SignificanceLookup lookup = new SignificanceLookup(
+            queryShardContext,
+            config.fieldContext().fieldType(),
+            config.format(),
+            backgroundFilter
+        );
 
         return sigTermsAggregatorSupplier.build(name, factories, config.getValuesSource(), config.format(),
             bucketCountThresholds, includeExclude, executionHint, searchContext, parent,
@@ -256,8 +261,8 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                 return new MapStringTermsAggregator(
                     name,
                     factories,
+                    new MapStringTermsAggregator.ValuesSourceCollectorSource(valuesSource),
                     a -> a.new SignificantTermsResults(lookup, significanceHeuristic, collectsFromSingleBucket),
-                    valuesSource,
                     null,
                     format,
                     bucketCountThresholds,

+ 0 - 251
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregator.java

@@ -1,251 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.search.aggregations.bucket.terms;
-
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.TokenStream;
-import org.apache.lucene.analysis.miscellaneous.DeDuplicatingTokenFilter;
-import org.apache.lucene.analysis.miscellaneous.DuplicateByteSequenceSpotter;
-import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.BytesRefBuilder;
-import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.common.util.BytesRefHash;
-import org.elasticsearch.index.mapper.MappedFieldType;
-import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.Aggregator;
-import org.elasticsearch.search.aggregations.AggregatorFactories;
-import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.LeafBucketCollector;
-import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
-import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
-import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude.StringFilter;
-import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
-import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
-import org.elasticsearch.search.internal.ContextIndexSearcher;
-import org.elasticsearch.search.internal.SearchContext;
-import org.elasticsearch.search.lookup.SourceLookup;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static java.util.Collections.emptyList;
-
-public class SignificantTextAggregator extends BucketsAggregator {
-
-    private final StringFilter includeExclude;
-    protected final BucketCountThresholds bucketCountThresholds;
-    protected long numCollectedDocs;
-    private final BytesRefHash bucketOrds;
-    private final SignificanceHeuristic significanceHeuristic;
-    private SignificantTextAggregatorFactory termsAggFactory;
-    private final DocValueFormat format = DocValueFormat.RAW;
-    private final String fieldName;
-    private final String[] sourceFieldNames;
-    private DuplicateByteSequenceSpotter dupSequenceSpotter = null ;
-    private long lastTrieSize;
-    private static final int MEMORY_GROWTH_REPORTING_INTERVAL_BYTES = 5000;
-
-
-
-    public SignificantTextAggregator(String name, AggregatorFactories factories,
-            SearchContext context, Aggregator parent,
-            BucketCountThresholds bucketCountThresholds, IncludeExclude.StringFilter includeExclude,
-            SignificanceHeuristic significanceHeuristic, SignificantTextAggregatorFactory termsAggFactory,
-            String fieldName, String [] sourceFieldNames, boolean filterDuplicateText,
-            Map<String, Object> metadata) throws IOException {
-        super(name, factories, context, parent, metadata);
-        this.bucketCountThresholds = bucketCountThresholds;
-        this.includeExclude = includeExclude;
-        this.significanceHeuristic = significanceHeuristic;
-        this.termsAggFactory = termsAggFactory;
-        this.fieldName = fieldName;
-        this.sourceFieldNames = sourceFieldNames;
-        bucketOrds = new BytesRefHash(1, context.bigArrays());
-        if(filterDuplicateText){
-            dupSequenceSpotter = new DuplicateByteSequenceSpotter();
-            lastTrieSize = dupSequenceSpotter.getEstimatedSizeInBytes();
-        }
-    }
-
-
-
-
-    @Override
-    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
-            final LeafBucketCollector sub) throws IOException {
-        final BytesRefBuilder previous = new BytesRefBuilder();
-        return new LeafBucketCollectorBase(sub, null) {
-
-            @Override
-            public void collect(int doc, long bucket) throws IOException {
-                collectFromSource(doc, bucket, fieldName, sourceFieldNames);
-                numCollectedDocs++;
-                if (dupSequenceSpotter != null) {
-                    dupSequenceSpotter.startNewSequence();
-                }
-            }
-
-            private void processTokenStream(int doc, long bucket, TokenStream ts, BytesRefHash inDocTerms, String fieldText)
-                    throws IOException{
-                if (dupSequenceSpotter != null) {
-                    ts = new DeDuplicatingTokenFilter(ts, dupSequenceSpotter);
-                }
-                CharTermAttribute termAtt = ts.addAttribute(CharTermAttribute.class);
-                ts.reset();
-                try {
-                    while (ts.incrementToken()) {
-                        if (dupSequenceSpotter != null) {
-                            long newTrieSize = dupSequenceSpotter.getEstimatedSizeInBytes();
-                            long growth = newTrieSize - lastTrieSize;
-                            // Only update the circuitbreaker after
-                            if (growth > MEMORY_GROWTH_REPORTING_INTERVAL_BYTES) {
-                                addRequestCircuitBreakerBytes(growth);
-                                lastTrieSize = newTrieSize;
-                            }
-                        }
-                        previous.clear();
-                        previous.copyChars(termAtt);
-                        BytesRef bytes = previous.get();
-                        if (inDocTerms.add(bytes) >= 0) {
-                            if (includeExclude == null || includeExclude.accept(bytes)) {
-                                long bucketOrdinal = bucketOrds.add(bytes);
-                                if (bucketOrdinal < 0) { // already seen
-                                    bucketOrdinal = -1 - bucketOrdinal;
-                                    collectExistingBucket(sub, doc, bucketOrdinal);
-                                } else {
-                                    collectBucket(sub, doc, bucketOrdinal);
-                                }
-                            }
-                        }
-                    }
-
-                } finally{
-                    ts.close();
-                }
-            }
-
-            private void collectFromSource(int doc, long bucket, String indexedFieldName, String[] sourceFieldNames) throws IOException {
-                MappedFieldType fieldType = context.getQueryShardContext().fieldMapper(indexedFieldName);
-                if(fieldType == null){
-                    throw new IllegalArgumentException("Aggregation [" + name + "] cannot process field ["+indexedFieldName
-                            +"] since it is not present");
-                }
-
-                SourceLookup sourceLookup = context.lookup().source();
-                sourceLookup.setSegmentAndDocument(ctx, doc);
-                BytesRefHash inDocTerms = new BytesRefHash(256, context.bigArrays());
-
-                try {
-                    for (String sourceField : sourceFieldNames) {
-                        List<Object> textsToHighlight = sourceLookup.extractRawValues(sourceField);
-                        textsToHighlight = textsToHighlight.stream().map(obj -> {
-                            if (obj instanceof BytesRef) {
-                                return fieldType.valueForDisplay(obj).toString();
-                            } else {
-                                return obj;
-                            }
-                        }).collect(Collectors.toList());
-
-                        Analyzer analyzer = fieldType.indexAnalyzer();
-                        for (Object fieldValue : textsToHighlight) {
-                            String fieldText = fieldValue.toString();
-                            TokenStream ts = analyzer.tokenStream(indexedFieldName, fieldText);
-                            processTokenStream(doc, bucket, ts, inDocTerms, fieldText);
-                        }
-                    }
-                } finally{
-                    Releasables.close(inDocTerms);
-                }
-            }
-        };
-    }
-
-    @Override
-    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
-        assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
-
-        final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
-        long supersetSize = termsAggFactory.getSupersetNumDocs();
-        long subsetSize = numCollectedDocs;
-
-        BucketSignificancePriorityQueue<SignificantStringTerms.Bucket> ordered = new BucketSignificancePriorityQueue<>(size);
-        SignificantStringTerms.Bucket spare = null;
-        for (int i = 0; i < bucketOrds.size(); i++) {
-            final int docCount = bucketDocCount(i);
-            if (docCount < bucketCountThresholds.getShardMinDocCount()) {
-                continue;
-            }
-
-            if (spare == null) {
-                spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
-            }
-
-            bucketOrds.get(i, spare.termBytes);
-            spare.subsetDf = docCount;
-            spare.subsetSize = subsetSize;
-            spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
-            spare.supersetSize = supersetSize;
-            // During shard-local down-selection we use subset/superset stats
-            // that are for this shard only
-            // Back at the central reducer these properties will be updated with
-            // global stats
-            spare.updateScore(significanceHeuristic);
-
-            spare.bucketOrd = i;
-            spare = ordered.insertWithOverflow(spare);
-        }
-
-        final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
-        for (int i = ordered.size() - 1; i >= 0; i--) {
-            list[i] = ordered.pop();
-            // the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point
-            list[i].termBytes = BytesRef.deepCopyOf(list[i].termBytes);
-        }
-        buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
-
-        return new InternalAggregation[] { new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(),
-                bucketCountThresholds.getMinDocCount(),
-                metadata(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list))
-        };
-    }
-
-
-    @Override
-    public SignificantStringTerms buildEmptyAggregation() {
-        // We need to account for the significance of a miss in our global stats - provide corpus size as context
-        ContextIndexSearcher searcher = context.searcher();
-        IndexReader topReader = searcher.getIndexReader();
-        int supersetSize = topReader.numDocs();
-        return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
-                metadata(), format, 0, supersetSize, significanceHeuristic, emptyList());
-    }
-
-    @Override
-    public void doClose() {
-        Releasables.close(bucketOrds, termsAggFactory);
-    }
-
-}
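
Note the `assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0` in the deleted `buildAggregations`: this aggregator could only ever collect bucket 0, which is why `createInternal` had to fall back to `asMultiBucketAggregator` and clone the whole thing (term hash, spotter, sub-aggregations) per parent bucket. The replacement keeps per-bucket state in a lazily grown array instead, one spotter per `owningBucketOrd`. A standalone sketch of that pattern, with an `ArrayList` standing in for Elasticsearch's `ObjectArray`/`BigArrays`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class PerBucketStateSketch<T> {
    private final List<T> states = new ArrayList<>(); // ObjectArray stand-in
    private final Supplier<T> factory;

    PerBucketStateSketch(Supplier<T> factory) {
        this.factory = factory;
    }

    // Mirrors dupSequenceSpotters in the next file: grow to cover the ordinal,
    // then create the state on first use so empty buckets cost nothing.
    T getOrCreate(long owningBucketOrd) {
        int ord = Math.toIntExact(owningBucketOrd);
        while (states.size() <= ord) {
            states.add(null);
        }
        T state = states.get(ord);
        if (state == null) {
            state = factory.get();
            states.set(ord, state);
        }
        return state;
    }

    public static void main(String[] args) {
        PerBucketStateSketch<StringBuilder> spotters = new PerBucketStateSketch<>(StringBuilder::new);
        System.out.println(spotters.getOrCreate(3) == spotters.getOrCreate(3)); // true: created once
    }
}
```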

+ 187 - 104
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorFactory.java

@@ -19,52 +19,56 @@
 
 package org.elasticsearch.search.aggregations.bucket.terms;
 
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.PostingsEnum;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.BooleanClause.Occur;
-import org.apache.lucene.search.BooleanQuery;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.miscellaneous.DeDuplicatingTokenFilter;
+import org.apache.lucene.analysis.miscellaneous.DuplicateByteSequenceSpotter;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.common.lease.Releasable;
-import org.elasticsearch.common.lucene.index.FilterableTermsEnum;
-import org.elasticsearch.common.lucene.index.FreqTermsEnum;
+import org.apache.lucene.util.BytesRefBuilder;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.BytesRefHash;
+import org.elasticsearch.common.util.ObjectArray;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryShardContext;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
 import org.elasticsearch.search.aggregations.bucket.BucketUtils;
+import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude.StringFilter;
+import org.elasticsearch.search.aggregations.bucket.terms.MapStringTermsAggregator.CollectConsumer;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
 import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
 import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.lookup.SourceLookup;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.function.LongConsumer;
 
-public class SignificantTextAggregatorFactory extends AggregatorFactory
-        implements Releasable {
+public class SignificantTextAggregatorFactory extends AggregatorFactory {
+    private static final int MEMORY_GROWTH_REPORTING_INTERVAL_BYTES = 5000;
 
     private final IncludeExclude includeExclude;
-    private String indexedFieldName;
-    private MappedFieldType fieldType;
+    private final String indexedFieldName;
+    private final MappedFieldType fieldType;
     private final String[] sourceFieldNames;
-    private FilterableTermsEnum termsEnum;
-    private int numberOfAggregatorsCreated;
-    private final Query filter;
-    private final int supersetNumDocs;
+    private final QueryBuilder backgroundFilter;
     private final TermsAggregator.BucketCountThresholds bucketCountThresholds;
     private final SignificanceHeuristic significanceHeuristic;
-    private final DocValueFormat format = DocValueFormat.RAW;
     private final boolean filterDuplicateText;
 
     public SignificantTextAggregatorFactory(String name,
                                                 IncludeExclude includeExclude,
-                                                QueryBuilder filterBuilder,
+                                                QueryBuilder backgroundFilter,
                                                 TermsAggregator.BucketCountThresholds bucketCountThresholds,
                                                 SignificanceHeuristic significanceHeuristic,
                                                 QueryShardContext queryShardContext,
@@ -84,97 +88,18 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory
                 "requires an analyzed field");
         }
         this.indexedFieldName = fieldType != null ? fieldType.name() : fieldName;
-        this.sourceFieldNames = sourceFieldNames == null
-            ? new String[] { indexedFieldName }
-            : sourceFieldNames;
+        this.sourceFieldNames = sourceFieldNames == null ? new String[] { indexedFieldName } : sourceFieldNames;
 
         this.includeExclude = includeExclude;
-        this.filter = filterBuilder == null
-                ? null
-                : filterBuilder.toQuery(queryShardContext);
+        this.backgroundFilter = backgroundFilter;
         this.filterDuplicateText = filterDuplicateText;
-        IndexSearcher searcher = queryShardContext.searcher();
-        // Important - need to use the doc count that includes deleted docs
-        // or we have this issue: https://github.com/elastic/elasticsearch/issues/7951
-        this.supersetNumDocs = filter == null
-                ? searcher.getIndexReader().maxDoc()
-                : searcher.count(filter);
         this.bucketCountThresholds = bucketCountThresholds;
         this.significanceHeuristic = significanceHeuristic;
     }
 
-    /**
-     * Get the number of docs in the superset.
-     */
-    public long getSupersetNumDocs() {
-        return supersetNumDocs;
-    }
-
-    private FilterableTermsEnum getTermsEnum(String field) throws IOException {
-        if (termsEnum != null) {
-            return termsEnum;
-        }
-        IndexReader reader = queryShardContext.getIndexReader();
-        if (numberOfAggregatorsCreated > 1) {
-            termsEnum = new FreqTermsEnum(reader, field, true, false, filter, queryShardContext.bigArrays());
-        } else {
-            termsEnum = new FilterableTermsEnum(reader, indexedFieldName, PostingsEnum.NONE, filter);
-        }
-        return termsEnum;
-    }
-
-    private long getBackgroundFrequency(String value) throws IOException {
-        // fieldType can be null if the field is unmapped, but theoretically this method should only be called
-        // when constructing buckets.  Assert to ensure this is the case
-        // TODO this is a bad setup and it should be refactored
-        assert fieldType != null;
-        Query query = fieldType.termQuery(value, queryShardContext);
-        if (query instanceof TermQuery) {
-            // for types that use the inverted index, we prefer using a caching terms
-            // enum that will do a better job at reusing index inputs
-            Term term = ((TermQuery) query).getTerm();
-            FilterableTermsEnum termsEnum = getTermsEnum(term.field());
-            if (termsEnum.seekExact(term.bytes())) {
-                return termsEnum.docFreq();
-            } else {
-                return 0;
-            }
-        }
-        // otherwise do it the naive way
-        if (filter != null) {
-            query = new BooleanQuery.Builder()
-                    .add(query, Occur.FILTER)
-                    .add(filter, Occur.FILTER)
-                    .build();
-        }
-        return queryShardContext.searcher().count(query);
-    }
-
-    public long getBackgroundFrequency(BytesRef termBytes) throws IOException {
-        String value = format.format(termBytes).toString();
-        return getBackgroundFrequency(value);
-    }
-
-
-    @Override
-    public void close() {
-        try {
-            if (termsEnum instanceof Releasable) {
-                ((Releasable) termsEnum).close();
-            }
-        } finally {
-            termsEnum = null;
-        }
-    }
-
     @Override
     protected Aggregator createInternal(SearchContext searchContext, Aggregator parent, boolean collectsFromSingleBucket,
                                         Map<String, Object> metadata) throws IOException {
-        if (collectsFromSingleBucket == false) {
-            return asMultiBucketAggregator(this, searchContext, parent);
-        }
-
-        numberOfAggregatorsCreated++;
         BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(this.bucketCountThresholds);
         if (bucketCountThresholds.getShardSize() == SignificantTextAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize()) {
             // The user has not made a shardSize selection.
@@ -194,8 +119,166 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory
         IncludeExclude.StringFilter incExcFilter = includeExclude == null ? null:
             includeExclude.convertToStringFilter(DocValueFormat.RAW);
 
-        return new SignificantTextAggregator(name, factories, searchContext, parent, bucketCountThresholds,
-                incExcFilter, significanceHeuristic, this, indexedFieldName, sourceFieldNames, filterDuplicateText, metadata);
+        MapStringTermsAggregator.CollectorSource collectorSource = new SignificantTextCollectorSource(
+            queryShardContext.lookup().source(),
+            queryShardContext.bigArrays(),
+            fieldType,
+            sourceFieldNames,
+            filterDuplicateText
+        );
+        SignificanceLookup lookup = new SignificanceLookup(queryShardContext, fieldType, DocValueFormat.RAW, backgroundFilter);
+        return new MapStringTermsAggregator(
+            name,
+            factories,
+            collectorSource,
+            a -> a.new SignificantTermsResults(lookup, significanceHeuristic, collectsFromSingleBucket),
+            null,
+            DocValueFormat.RAW,
+            bucketCountThresholds,
+            incExcFilter,
+            searchContext,
+            parent,
+            SubAggCollectionMode.BREADTH_FIRST,
+            false,
+            collectsFromSingleBucket,
+            metadata
+        );
+    }
+
+    private static class SignificantTextCollectorSource implements MapStringTermsAggregator.CollectorSource {
+        private final SourceLookup sourceLookup;
+        private final BigArrays bigArrays;
+        private final MappedFieldType fieldType;
+        private final String[] sourceFieldNames;
+        private ObjectArray<DuplicateByteSequenceSpotter> dupSequenceSpotters;
+
+        SignificantTextCollectorSource(
+            SourceLookup sourceLookup,
+            BigArrays bigArrays,
+            MappedFieldType fieldType,
+            String[] sourceFieldNames,
+            boolean filterDuplicateText
+        ) {
+            this.sourceLookup = sourceLookup;
+            this.bigArrays = bigArrays;
+            this.fieldType = fieldType;
+            this.sourceFieldNames = sourceFieldNames;
+            dupSequenceSpotters = filterDuplicateText ? bigArrays.newObjectArray(1) : null;
+        }
+
+        @Override
+        public boolean needsScores() {
+            return false;
+        }
+
+        @Override
+        public LeafBucketCollector getLeafCollector(
+            StringFilter includeExclude,
+            LeafReaderContext ctx,
+            LeafBucketCollector sub,
+            LongConsumer addRequestCircuitBreakerBytes,
+            CollectConsumer consumer
+        ) throws IOException {
+            return new LeafBucketCollectorBase(sub, null) {
+                private final BytesRefBuilder scratch = new BytesRefBuilder();
+
+                @Override
+                public void collect(int doc, long owningBucketOrd) throws IOException {
+                    if (dupSequenceSpotters == null) {
+                        collectFromSource(doc, owningBucketOrd, null);
+                        return;
+                    }
+                    dupSequenceSpotters = bigArrays.grow(dupSequenceSpotters, owningBucketOrd + 1);
+                    DuplicateByteSequenceSpotter spotter = dupSequenceSpotters.get(owningBucketOrd);
+                    if (spotter == null) {
+                        spotter = new DuplicateByteSequenceSpotter();
+                        dupSequenceSpotters.set(owningBucketOrd, spotter);
+                    }
+                    collectFromSource(doc, owningBucketOrd, spotter);
+                    spotter.startNewSequence();
+                }
+
+                private void collectFromSource(int doc, long owningBucketOrd, DuplicateByteSequenceSpotter spotter) throws IOException {
+                    sourceLookup.setSegmentAndDocument(ctx, doc);
+                    BytesRefHash inDocTerms = new BytesRefHash(256, bigArrays);
+
+                    try {
+                        for (String sourceField : sourceFieldNames) {
+                            Iterator<String> itr = sourceLookup.extractRawValues(sourceField).stream()
+                                .map(obj -> {
+                                    if (obj == null) {
+                                        return null;
+                                    }
+                                    if (obj instanceof BytesRef) {
+                                        return fieldType.valueForDisplay(obj).toString();
+                                    }
+                                    return obj.toString();
+                                })
+                                .iterator();
+                            Analyzer analyzer = fieldType.indexAnalyzer();
+                            while (itr.hasNext()) {
+                                TokenStream ts = analyzer.tokenStream(fieldType.name(), itr.next());
+                                processTokenStream(doc, owningBucketOrd, ts, inDocTerms, spotter);
+                            }
+                        }
+                    } finally {
+                        Releasables.close(inDocTerms);
+                    }
+                }
 
+                private void processTokenStream(
+                    int doc,
+                    long owningBucketOrd,
+                    TokenStream ts,
+                    BytesRefHash inDocTerms,
+                    DuplicateByteSequenceSpotter spotter
+                ) throws IOException {
+                    long lastTrieSize = 0;
+                    if (spotter != null) {
+                        lastTrieSize = spotter.getEstimatedSizeInBytes();
+                        ts = new DeDuplicatingTokenFilter(ts, spotter);
+                    }
+                    CharTermAttribute termAtt = ts.addAttribute(CharTermAttribute.class);
+                    ts.reset();
+                    try {
+                        while (ts.incrementToken()) {
+                            if (spotter != null) {
+                                long newTrieSize = spotter.getEstimatedSizeInBytes();
+                                long growth = newTrieSize - lastTrieSize;
+                                // Only update the circuit breaker after the trie has grown by at least the reporting interval
+                                if (growth > MEMORY_GROWTH_REPORTING_INTERVAL_BYTES) {
+                                    addRequestCircuitBreakerBytes.accept(growth);
+                                    lastTrieSize = newTrieSize;
+                                }
+                            }
+
+                            scratch.clear();
+                            scratch.copyChars(termAtt);
+                            BytesRef bytes = scratch.get();
+                            if (includeExclude != null && false == includeExclude.accept(bytes)) {
+                                continue;
+                            }
+                            if (inDocTerms.add(bytes) < 0) {
+                                continue;
+                            }
+                            consumer.accept(sub, doc, owningBucketOrd, bytes);
+                        }
+                    } finally {
+                        ts.close();
+                    }
+                    if (spotter != null) {
+                        long growth = spotter.getEstimatedSizeInBytes() - lastTrieSize;
+                        if (growth > 0) {
+                            addRequestCircuitBreakerBytes.accept(growth);
+                        }
+                    }
+                }
+            };
+        }
+
+        @Override
+        public void close() {
+            Releasables.close(dupSequenceSpotters);
+        }
     }
 }
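
The `processTokenStream` above reports trie growth to the request circuit breaker in chunks rather than per token: it remembers the last reported size, only calls the `LongConsumer` once growth exceeds `MEMORY_GROWTH_REPORTING_INTERVAL_BYTES`, and flushes the remainder at the end. The same bookkeeping in isolation, fed a made-up series of size observations:

```java
import java.util.function.LongConsumer;

public class BreakerReportingSketch {
    private static final int MEMORY_GROWTH_REPORTING_INTERVAL_BYTES = 5000;

    static void report(long[] observedTrieSizes, LongConsumer addRequestCircuitBreakerBytes) {
        long lastTrieSize = 0;
        for (long newTrieSize : observedTrieSizes) {
            long growth = newTrieSize - lastTrieSize;
            // Batch breaker updates to avoid calling it on every token.
            if (growth > MEMORY_GROWTH_REPORTING_INTERVAL_BYTES) {
                addRequestCircuitBreakerBytes.accept(growth);
                lastTrieSize = newTrieSize;
            }
        }
        // Flush whatever growth never crossed the interval.
        long tail = observedTrieSizes.length == 0 ? 0 : observedTrieSizes[observedTrieSizes.length - 1] - lastTrieSize;
        if (tail > 0) {
            addRequestCircuitBreakerBytes.accept(tail);
        }
    }

    public static void main(String[] args) {
        // Reserves 7000 bytes once the interval is crossed, then the 500-byte tail.
        report(new long[] { 1_000, 7_000, 7_500 }, growth -> System.out.println("reserved " + growth + " bytes"));
    }
}
```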

+ 2 - 2
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java

@@ -311,8 +311,8 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
                 return new MapStringTermsAggregator(
                     name,
                     factories,
-                    a -> a.new StandardTermsResults(),
-                    valuesSource,
+                    new MapStringTermsAggregator.ValuesSourceCollectorSource(valuesSource),
+                    a -> a.new StandardTermsResults(valuesSource),
                     order,
                     format,
                     bucketCountThresholds,

+ 0 - 11
server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorTests.java

@@ -40,7 +40,6 @@ import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.index.analysis.AnalyzerScope;
 import org.elasticsearch.index.analysis.NamedAnalyzer;
 import org.elasticsearch.index.mapper.BinaryFieldMapper;
-import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.index.mapper.NumberFieldMapper.NumberFieldType;
@@ -56,7 +55,6 @@ import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.bucket.terms.SignificantTermsAggregatorFactory.ExecutionMode;
 import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
 import org.elasticsearch.search.aggregations.support.ValuesSourceType;
-import org.junit.Before;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -70,15 +68,6 @@ import static org.elasticsearch.search.aggregations.AggregationBuilders.signific
 import static org.hamcrest.Matchers.equalTo;
 
 public class SignificantTermsAggregatorTests extends AggregatorTestCase {
-
-    private MappedFieldType fieldType;
-
-    @Before
-    public void setUpTest() throws Exception {
-        super.setUp();
-        fieldType = new KeywordFieldMapper.KeywordFieldType("field");
-    }
-
     @Override
     protected AggregationBuilder createAggBuilderForTypeTest(MappedFieldType fieldType, String fieldName) {
         return new SignificantTermsAggregationBuilder("foo").field(fieldName);

+ 44 - 5
server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregatorTests.java

@@ -22,6 +22,7 @@ package org.elasticsearch.search.aggregations.bucket.terms;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
+import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.document.StoredField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
@@ -29,6 +30,7 @@ import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
@@ -102,7 +104,7 @@ public class SignificantTextAggregatorTests extends AggregatorTestCase {
         indexWriterConfig.setMaxBufferedDocs(100);
         indexWriterConfig.setRAMBufferSizeMB(100); // flush on open to have a single segment
         try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, indexWriterConfig)) {
-            indexDocuments(w, textFieldType);
+            indexDocuments(w);
 
             SignificantTextAggregationBuilder sigAgg = new SignificantTextAggregationBuilder("sig_text", "text").filterDuplicateText(true);
             if(randomBoolean()){
@@ -150,7 +152,7 @@ public class SignificantTextAggregatorTests extends AggregatorTestCase {
         indexWriterConfig.setMaxBufferedDocs(100);
         indexWriterConfig.setRAMBufferSizeMB(100); // flush on open to have a single segment
         try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, indexWriterConfig)) {
-            indexDocuments(w, textFieldType);
+            indexDocuments(w);
 
             SignificantTextAggregationBuilder agg = significantText("sig_text", "text")
                 .filterDuplicateText(true);
@@ -193,14 +195,50 @@ public class SignificantTextAggregatorTests extends AggregatorTestCase {
         }
     }
 
-    private void indexDocuments(IndexWriter writer, TextFieldType textFieldType) throws IOException {
+    public void testInsideTermsAgg() throws IOException {
+        TextFieldType textFieldType = new TextFieldType("text");
+        textFieldType.setIndexAnalyzer(new NamedAnalyzer("my_analyzer", AnalyzerScope.GLOBAL, new StandardAnalyzer()));
+
+        IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
+        indexWriterConfig.setMaxBufferedDocs(100);
+        indexWriterConfig.setRAMBufferSizeMB(100); // flush on open to have a single segment
+        try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, indexWriterConfig)) {
+            indexDocuments(w);
+
+            SignificantTextAggregationBuilder sigAgg = new SignificantTextAggregationBuilder("sig_text", "text").filterDuplicateText(true);
+            TermsAggregationBuilder aggBuilder = new TermsAggregationBuilder("terms").field("kwd").subAggregation(sigAgg);
+
+            try (IndexReader reader = DirectoryReader.open(w)) {
+                assertEquals("test expects a single segment", 1, reader.leaves().size());
+                IndexSearcher searcher = new IndexSearcher(reader);
+
+                StringTerms terms = searchAndReduce(searcher, new MatchAllDocsQuery(), aggBuilder, textFieldType, keywordField("kwd"));
+                SignificantTerms sigOdd = terms.getBucketByKey("odd").getAggregations().get("sig_text");
+                assertNull(sigOdd.getBucketByKey("even"));
+                assertNull(sigOdd.getBucketByKey("duplicate"));
+                assertNull(sigOdd.getBucketByKey("common"));
+                assertNotNull(sigOdd.getBucketByKey("odd"));
+
+                SignificantStringTerms sigEven = terms.getBucketByKey("even").getAggregations().get("sig_text");
+                assertNull(sigEven.getBucketByKey("odd"));
+                assertNull(sigEven.getBucketByKey("duplicate"));
+                assertNull(sigEven.getBucketByKey("common"));
+                assertNull(sigEven.getBucketByKey("separator2"));
+                assertNull(sigEven.getBucketByKey("separator4"));
+                assertNull(sigEven.getBucketByKey("separator6"));
+                assertNotNull(sigEven.getBucketByKey("even"));
+            }
+        }
+    }
+
+    private void indexDocuments(IndexWriter writer) throws IOException {
         for (int i = 0; i < 10; i++) {
             Document doc = new Document();
             StringBuilder text = new StringBuilder("common ");
             if (i % 2 == 0) {
-                text.append("odd ");
-            } else {
                 text.append("even separator" + i + " duplicate duplicate duplicate duplicate duplicate duplicate ");
+            } else {
+                text.append("odd ");
             }
 
             doc.add(new Field("text", text.toString(), TextFieldMapper.Defaults.FIELD_TYPE));
@@ -208,6 +246,7 @@ public class SignificantTextAggregatorTests extends AggregatorTestCase {
                 " \"json_only_field\" : \"" + text.toString() + "\"" +
                 " }";
             doc.add(new StoredField("_source", new BytesRef(json)));
+            doc.add(new SortedSetDocValuesField("kwd", i % 2 == 0 ? new BytesRef("even") : new BytesRef("odd")));
             writer.addDocument(doc);
         }
     }