@@ -36,14 +36,18 @@ import org.elasticsearch.search.aggregations.NonCollectingAggregator;
 import org.elasticsearch.search.aggregations.bucket.BucketUtils;
 import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude.StringFilter;
 import org.elasticsearch.search.aggregations.bucket.terms.MapStringTermsAggregator.CollectConsumer;
+import org.elasticsearch.search.aggregations.bucket.terms.MapStringTermsAggregator.CollectorSource;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
 import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
 import org.elasticsearch.search.aggregations.support.AggregationContext;
 import org.elasticsearch.search.lookup.SourceLookup;
+import org.elasticsearch.search.profile.Timer;

 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.function.BiConsumer;
 import java.util.function.LongConsumer;

 public class SignificantTextAggregatorFactory extends AggregatorFactory {
@@ -66,7 +70,7 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory {
         AggregatorFactory parent,
         AggregatorFactories.Builder subFactoriesBuilder,
         String fieldName,
-        String [] sourceFieldNames,
+        String[] sourceFieldNames,
         boolean filterDuplicateText,
         Map<String, Object> metadata) throws IOException {
         super(name, context, parent, subFactoriesBuilder, metadata);
@@ -76,7 +80,7 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory {
             if (supportsAgg(fieldType) == false) {
                 throw new IllegalArgumentException("Field [" + fieldType.name() + "] has no analyzer, but SignificantText " +
                     "requires an analyzed field");
-            }
+            }
             String indexedFieldName = fieldType.name();
             this.sourceFieldNames = sourceFieldNames == null ? new String[] {indexedFieldName} : sourceFieldNames;
         } else {
@@ -89,7 +93,7 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory {
         this.bucketCountThresholds = bucketCountThresholds;
         this.significanceHeuristic = significanceHeuristic;
     }
-
+
     protected Aggregator createUnmapped(Aggregator parent, Map<String, Object> metadata) throws IOException {
         final InternalAggregation aggregation = new UnmappedSignificantTerms(name, bucketCountThresholds.getRequiredSize(),
                 bucketCountThresholds.getMinDocCount(), metadata);
@@ -99,7 +103,7 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory {
                 return aggregation;
             }
         };
-    }
+    }

     private static boolean supportsAgg(MappedFieldType ft) {
         return ft.getTextSearchInfo() != TextSearchInfo.NONE
@@ -109,11 +113,11 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory {
     @Override
     protected Aggregator createInternal(Aggregator parent, CardinalityUpperBound cardinality, Map<String, Object> metadata)
         throws IOException {
-
+
         if (fieldType == null) {
             return createUnmapped(parent, metadata);
         }
-
+
         BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(this.bucketCountThresholds);
         if (bucketCountThresholds.getShardSize() == SignificantTextAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize()) {
             // The user has not made a shardSize selection.
@@ -133,21 +137,12 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory {
         IncludeExclude.StringFilter incExcFilter = includeExclude == null ? null:
                 includeExclude.convertToStringFilter(DocValueFormat.RAW);

-        MapStringTermsAggregator.CollectorSource collectorSource = new SignificantTextCollectorSource(
-            context.lookup().source(),
-            context.bigArrays(),
-            fieldType,
-            context.getIndexAnalyzer(f -> {
-                throw new IllegalArgumentException("No analyzer configured for field " + f);
-            }),
-            sourceFieldNames,
-            filterDuplicateText
-        );
+
         SignificanceLookup lookup = new SignificanceLookup(context, fieldType, DocValueFormat.RAW, backgroundFilter);
         return new MapStringTermsAggregator(
             name,
             factories,
-            collectorSource,
+            createCollectorSource(),
             a -> a.new SignificantTermsResults(lookup, significanceHeuristic, cardinality),
             null,
             DocValueFormat.RAW,
@@ -162,12 +157,58 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory {
         );
     }

+    /**
+     * Create the {@link CollectorSource}, gathering some timing information
+     * if we're profiling.
+     * <p>
+     * When profiling aggregations we can only time the {@link LeafBucketCollector#collect(int, long)}
+     * method out of the box, but our implementation of that method does three things that are
+     * useful to get timing for:
+     * <ul>
+     * <li>Fetch field values from {@code _source}
+     * <li>Analyze the field
+     * <li>Do all the normal {@code terms} agg stuff with its terms
+     * </ul>
+     * <p>
+     * The most convenient way to measure all of these is to time the fetch and all
+     * the normal {@code terms} agg stuff. You can then subtract those timings from
+     * the overall collect time to get the analyze time. You can also get the total
+     * number of terms that we analyzed by looking at the invocation count on the
+     * {@code terms} agg stuff.
+     * <p>
+     * While we're at it we count the number of values we fetch from source.
+     */
+    private CollectorSource createCollectorSource() {
+        Analyzer analyzer = context.getIndexAnalyzer(f -> {
+            throw new IllegalArgumentException("No analyzer configured for field " + f);
+        });
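+        // Only wire in the timing-aware source when this search is being profiled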
+        if (context.profiling()) {
+            return new ProfilingSignificantTextCollectorSource(
+                context.lookup().source(),
+                context.bigArrays(),
+                fieldType,
+                analyzer,
+                sourceFieldNames,
+                filterDuplicateText
+            );
+        }
+        return new SignificantTextCollectorSource(
+            context.lookup().source(),
+            context.bigArrays(),
+            fieldType,
+            analyzer,
+            sourceFieldNames,
+            filterDuplicateText
+        );
+    }
+
     private static class SignificantTextCollectorSource implements MapStringTermsAggregator.CollectorSource {
         private final SourceLookup sourceLookup;
         private final BigArrays bigArrays;
         private final MappedFieldType fieldType;
         private final Analyzer analyzer;
         private final String[] sourceFieldNames;
+        private final BytesRefBuilder scratch = new BytesRefBuilder();
         private ObjectArray<DuplicateByteSequenceSpotter> dupSequenceSpotters;

         SignificantTextCollectorSource(
@@ -186,6 +227,15 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory {
             dupSequenceSpotters = filterDuplicateText ? bigArrays.newObjectArray(1) : null;
         }

+        @Override
+        public String describe() {
+            return "analyze " + fieldType.name() + " from _source";
+        }
+
+        @Override
+        public void collectDebugInfo(BiConsumer<String, Object> add) {
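+            // Intentionally empty: the plain source has nothing to report; the profiling subclass overrides this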
+        }
+
         @Override
         public boolean needsScores() {
             return false;
@@ -200,8 +250,6 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory {
             CollectConsumer consumer
         ) throws IOException {
             return new LeafBucketCollectorBase(sub, null) {
-                private final BytesRefBuilder scratch = new BytesRefBuilder();
-
                 @Override
                 public void collect(int doc, long owningBucketOrd) throws IOException {
                     if (dupSequenceSpotters == null) {
@@ -224,7 +272,7 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory {

                     try {
                         for (String sourceField : sourceFieldNames) {
-                            Iterator<String> itr = sourceLookup.extractRawValues(sourceField).stream()
+                            Iterator<String> itr = extractRawValues(sourceField).stream()
                                 .map(obj -> {
                                     if (obj == null) {
                                         return null;
@@ -236,63 +284,87 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory {
                                 })
                                 .iterator();
                             while (itr.hasNext()) {
-                                TokenStream ts = analyzer.tokenStream(fieldType.name(), itr.next());
-                                processTokenStream(doc, owningBucketOrd, ts, inDocTerms, spotter);
+                                String text = itr.next();
+                                TokenStream ts = analyzer.tokenStream(fieldType.name(), text);
+                                processTokenStream(
+                                    includeExclude,
+                                    doc,
+                                    owningBucketOrd,
+                                    text,
+                                    ts,
+                                    inDocTerms,
+                                    spotter,
+                                    addRequestCircuitBreakerBytes,
+                                    sub,
+                                    consumer
+                                );
                             }
                         }
                     } finally {
                         Releasables.close(inDocTerms);
                     }
                 }
+            };
+        }

-                private void processTokenStream(
-                    int doc,
-                    long owningBucketOrd,
-                    TokenStream ts,
-                    BytesRefHash inDocTerms,
-                    DuplicateByteSequenceSpotter spotter
-                ) throws IOException {
-                    long lastTrieSize = 0;
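+        // Protected (not private) so the profiling collector source can override it and time the interesting parts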
+        protected void processTokenStream(
+            StringFilter includeExclude,
+            int doc,
+            long owningBucketOrd,
+            String text,
+            TokenStream ts,
+            BytesRefHash inDocTerms,
+            DuplicateByteSequenceSpotter spotter,
+            LongConsumer addRequestCircuitBreakerBytes,
+            LeafBucketCollector sub,
+            CollectConsumer consumer
+        ) throws IOException {
+            long lastTrieSize = 0;
+            if (spotter != null) {
+                lastTrieSize = spotter.getEstimatedSizeInBytes();
+                ts = new DeDuplicatingTokenFilter(ts, spotter);
+            }
+            CharTermAttribute termAtt = ts.addAttribute(CharTermAttribute.class);
+            ts.reset();
+            try {
+                while (ts.incrementToken()) {
                     if (spotter != null) {
-                        lastTrieSize = spotter.getEstimatedSizeInBytes();
-                        ts = new DeDuplicatingTokenFilter(ts, spotter);
+                        long newTrieSize = spotter.getEstimatedSizeInBytes();
+                        long growth = newTrieSize - lastTrieSize;
+                        // Only update the circuitbreaker after
+                        if (growth > MEMORY_GROWTH_REPORTING_INTERVAL_BYTES) {
+                            addRequestCircuitBreakerBytes.accept(growth);
+                            lastTrieSize = newTrieSize;
+                        }
                     }
-                    CharTermAttribute termAtt = ts.addAttribute(CharTermAttribute.class);
-                    ts.reset();
-                    try {
-                        while (ts.incrementToken()) {
-                            if (spotter != null) {
-                                long newTrieSize = spotter.getEstimatedSizeInBytes();
-                                long growth = newTrieSize - lastTrieSize;
-                                // Only update the circuitbreaker after
-                                if (growth > MEMORY_GROWTH_REPORTING_INTERVAL_BYTES) {
-                                    addRequestCircuitBreakerBytes.accept(growth);
-                                    lastTrieSize = newTrieSize;
-                                }
-                            }

-                            scratch.clear();
-                            scratch.copyChars(termAtt);
-                            BytesRef bytes = scratch.get();
-                            if (includeExclude != null && false == includeExclude.accept(bytes)) {
-                                continue;
-                            }
-                            if (inDocTerms.add(bytes) < 0) {
-                                continue;
-                            }
-                            consumer.accept(sub, doc, owningBucketOrd, bytes);
-                        }
-                    } finally {
-                        ts.close();
+                    scratch.clear();
+                    scratch.copyChars(termAtt);
+                    BytesRef bytes = scratch.get();
+                    if (includeExclude != null && false == includeExclude.accept(bytes)) {
+                        continue;
                     }
-                    if (spotter != null) {
-                        long growth = spotter.getEstimatedSizeInBytes() - lastTrieSize;
-                        if (growth > 0) {
-                            addRequestCircuitBreakerBytes.accept(growth);
-                        }
+                    if (inDocTerms.add(bytes) < 0) {
+                        continue;
                     }
+                    consumer.accept(sub, doc, owningBucketOrd, bytes);
                 }
-            };
+            } finally {
+                ts.close();
+            }
+            if (spotter != null) {
+                long growth = spotter.getEstimatedSizeInBytes() - lastTrieSize;
+                if (growth > 0) {
+                    addRequestCircuitBreakerBytes.accept(growth);
+                }
+            }
+        }
+
+        /**
+         * Extract values from {@code _source}.
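+         * The profiling source overrides this to time the fetch.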
+         */
+        protected List<Object> extractRawValues(String field) {
+            return sourceLookup.extractRawValues(field);
         }

         @Override
@@ -300,4 +372,79 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory {
             Releasables.close(dupSequenceSpotters);
         }
     }
+
+    private static class ProfilingSignificantTextCollectorSource extends SignificantTextCollectorSource {
+        private final Timer extract = new Timer();
+        private final Timer collectAnalyzed = new Timer();
+        private long valuesFetched;
+        private long charsFetched;
+
+        private ProfilingSignificantTextCollectorSource(
+            SourceLookup sourceLookup,
+            BigArrays bigArrays,
+            MappedFieldType fieldType,
+            Analyzer analyzer,
+            String[] sourceFieldNames,
+            boolean filterDuplicateText
+        ) {
+            super(sourceLookup, bigArrays, fieldType, analyzer, sourceFieldNames, filterDuplicateText);
+        }
+
+        @Override
+        protected void processTokenStream(
+            StringFilter includeExclude,
+            int doc,
+            long owningBucketOrd,
+            String text,
+            TokenStream ts,
+            BytesRefHash inDocTerms,
+            DuplicateByteSequenceSpotter spotter,
+            LongConsumer addRequestCircuitBreakerBytes,
+            LeafBucketCollector sub,
+            CollectConsumer consumer
+        ) throws IOException {
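+            // Count each value pulled from _source and its size in characters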
+            valuesFetched++;
+            charsFetched += text.length();
+            super.processTokenStream(
+                includeExclude,
+                doc,
+                owningBucketOrd,
+                text,
+                ts,
+                inDocTerms,
+                spotter,
+                addRequestCircuitBreakerBytes,
+                sub,
+                (subCollector, d, o, bytes) -> {
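+                    // Time only the terms-agg style collection; analyze time is whatever remains of collect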
+                    collectAnalyzed.start();
+                    try {
+                        consumer.accept(subCollector, d, o, bytes);
+                    } finally {
+                        collectAnalyzed.stop();
+                    }
+                }
+            );
+        }
+
+        @Override
+        protected List<Object> extractRawValues(String field) {
+            extract.start();
+            try {
+                return super.extractRawValues(field);
+            } finally {
+                extract.stop();
+            }
+        }
+
+        @Override
+        public void collectDebugInfo(BiConsumer<String, Object> add) {
+            super.collectDebugInfo(add);
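+            // Timings are approximate nanoseconds; counts are invocation counts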
+            add.accept("extract_ns", extract.getApproximateTiming());
+            add.accept("extract_count", extract.getCount());
+            add.accept("collect_analyzed_ns", collectAnalyzed.getApproximateTiming());
+            add.accept("collect_analyzed_count", collectAnalyzed.getCount());
+            add.accept("values_fetched", valuesFetched);
+            add.accept("chars_fetched", charsFetched);
+        }
+    }
 }