|
@@ -16,12 +16,18 @@ import org.apache.lucene.search.BooleanClause;
|
|
|
import org.apache.lucene.search.BooleanQuery;
|
|
|
import org.apache.lucene.search.Collector;
|
|
|
import org.apache.lucene.search.FieldDoc;
|
|
|
+import org.apache.lucene.search.MultiCollector;
|
|
|
import org.apache.lucene.search.Query;
|
|
|
import org.apache.lucene.search.ScoreDoc;
|
|
|
+import org.apache.lucene.search.ScoreMode;
|
|
|
+import org.apache.lucene.search.SimpleCollector;
|
|
|
import org.apache.lucene.search.Sort;
|
|
|
import org.apache.lucene.search.TopDocs;
|
|
|
import org.apache.lucene.search.TotalHits;
|
|
|
+import org.apache.lucene.search.Weight;
|
|
|
import org.elasticsearch.common.lucene.Lucene;
|
|
|
+import org.elasticsearch.common.lucene.MinimumScoreCollector;
|
|
|
+import org.elasticsearch.common.lucene.search.FilteredCollector;
|
|
|
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
|
|
|
import org.elasticsearch.common.util.concurrent.EWMATrackingEsThreadPoolExecutor;
|
|
|
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
|
|
@@ -33,6 +39,7 @@ import org.elasticsearch.search.aggregations.AggregationPhase;
|
|
|
import org.elasticsearch.search.internal.ContextIndexSearcher;
|
|
|
import org.elasticsearch.search.internal.ScrollContext;
|
|
|
import org.elasticsearch.search.internal.SearchContext;
|
|
|
+import org.elasticsearch.search.profile.Profilers;
|
|
|
import org.elasticsearch.search.profile.query.InternalProfileCollector;
|
|
|
import org.elasticsearch.search.rescore.RescorePhase;
|
|
|
import org.elasticsearch.search.sort.SortAndFormats;
|
|
@@ -40,13 +47,12 @@ import org.elasticsearch.search.suggest.SuggestPhase;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
-import java.util.LinkedList;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
|
|
|
-import static org.elasticsearch.search.query.QueryCollectorContext.createAggsCollectorContext;
|
|
|
-import static org.elasticsearch.search.query.QueryCollectorContext.createEarlyTerminationCollectorContext;
|
|
|
-import static org.elasticsearch.search.query.QueryCollectorContext.createFilteredCollectorContext;
|
|
|
-import static org.elasticsearch.search.query.QueryCollectorContext.createMinScoreCollectorContext;
|
|
|
+import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_MIN_SCORE;
|
|
|
+import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_MULTI;
|
|
|
+import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_POST_FILTER;
|
|
|
+import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_TERMINATE_AFTER_COUNT;
|
|
|
import static org.elasticsearch.search.query.TopDocsCollectorContext.createTopDocsCollectorContext;
|
|
|
|
|
|
/**
|
|
@@ -125,30 +131,65 @@ public class QueryPhase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- final LinkedList<QueryCollectorContext> collectors = new LinkedList<>();
|
|
|
- // whether the chain contains a collector that filters documents
|
|
|
- boolean hasFilterCollector = false;
|
|
|
+ // create the top docs collector last when the other collectors are known
|
|
|
+ final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(
|
|
|
+ searchContext,
|
|
|
+ searchContext.parsedPostFilter() != null || searchContext.minimumScore() != null
|
|
|
+ );
|
|
|
+
|
|
|
+ Collector collector = wrapWithProfilerCollectorIfNeeded(
|
|
|
+ searchContext.getProfilers(),
|
|
|
+ topDocsFactory.collector(),
|
|
|
+ topDocsFactory.profilerName
|
|
|
+ );
|
|
|
+
|
|
|
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
|
|
|
// add terminate_after before the filter collectors
|
|
|
// it will only be applied on documents accepted by these filter collectors
|
|
|
- collectors.add(createEarlyTerminationCollectorContext(searchContext.terminateAfter()));
|
|
|
+ EarlyTerminatingCollector earlyTerminatingCollector = new EarlyTerminatingCollector(
|
|
|
+ EMPTY_COLLECTOR,
|
|
|
+ searchContext.terminateAfter(),
|
|
|
+ true
|
|
|
+ );
|
|
|
+ collector = wrapWithProfilerCollectorIfNeeded(
|
|
|
+ searchContext.getProfilers(),
|
|
|
+ MultiCollector.wrap(earlyTerminatingCollector, collector),
|
|
|
+ REASON_SEARCH_TERMINATE_AFTER_COUNT,
|
|
|
+ collector
|
|
|
+ );
|
|
|
}
|
|
|
if (searchContext.parsedPostFilter() != null) {
|
|
|
// add post filters before aggregations
|
|
|
// it will only be applied to top hits
|
|
|
- collectors.add(createFilteredCollectorContext(searcher, searchContext.parsedPostFilter().query()));
|
|
|
- // this collector can filter documents during the collection
|
|
|
- hasFilterCollector = true;
|
|
|
+ final Weight filterWeight = searcher.createWeight(
|
|
|
+ searcher.rewrite(searchContext.parsedPostFilter().query()),
|
|
|
+ ScoreMode.COMPLETE_NO_SCORES,
|
|
|
+ 1f
|
|
|
+ );
|
|
|
+ collector = wrapWithProfilerCollectorIfNeeded(
|
|
|
+ searchContext.getProfilers(),
|
|
|
+ new FilteredCollector(collector, filterWeight),
|
|
|
+ REASON_SEARCH_POST_FILTER,
|
|
|
+ collector
|
|
|
+ );
|
|
|
}
|
|
|
if (searchContext.getAggsCollector() != null) {
|
|
|
- // plug in additional collectors, like aggregations
|
|
|
- collectors.add(createAggsCollectorContext(searchContext.getAggsCollector()));
|
|
|
+ collector = wrapWithProfilerCollectorIfNeeded(
|
|
|
+ searchContext.getProfilers(),
|
|
|
+ MultiCollector.wrap(collector, searchContext.getAggsCollector()),
|
|
|
+ REASON_SEARCH_MULTI,
|
|
|
+ collector,
|
|
|
+ searchContext.getAggsCollector()
|
|
|
+ );
|
|
|
}
|
|
|
if (searchContext.minimumScore() != null) {
|
|
|
// apply the minimum score after multi collector so we filter aggs as well
|
|
|
- collectors.add(createMinScoreCollectorContext(searchContext.minimumScore()));
|
|
|
- // this collector can filter documents during the collection
|
|
|
- hasFilterCollector = true;
|
|
|
+ collector = wrapWithProfilerCollectorIfNeeded(
|
|
|
+ searchContext.getProfilers(),
|
|
|
+ new MinimumScoreCollector(collector, searchContext.minimumScore()),
|
|
|
+ REASON_SEARCH_MIN_SCORE,
|
|
|
+ collector
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
boolean timeoutSet = scrollContext == null
|
|
@@ -171,7 +212,8 @@ public class QueryPhase {
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
- searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, timeoutSet);
|
|
|
+ searchWithCollector(searchContext, searcher, query, collector, timeoutSet);
|
|
|
+ queryResult.topDocs(topDocsFactory.topDocsAndMaxScore(), topDocsFactory.sortValueFormats);
|
|
|
ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
|
|
|
assert executor instanceof EWMATrackingEsThreadPoolExecutor
|
|
|
|| (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */)
|
|
@@ -192,30 +234,31 @@ public class QueryPhase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static Collector wrapWithProfilerCollectorIfNeeded(
|
|
|
+ Profilers profilers,
|
|
|
+ Collector collector,
|
|
|
+ String profilerName,
|
|
|
+ Collector... children
|
|
|
+ ) {
|
|
|
+ if (profilers == null) {
|
|
|
+ return collector;
|
|
|
+ }
|
|
|
+ return new InternalProfileCollector(collector, profilerName, children);
|
|
|
+ }
|
|
|
+
|
|
|
private static void searchWithCollector(
|
|
|
SearchContext searchContext,
|
|
|
ContextIndexSearcher searcher,
|
|
|
Query query,
|
|
|
- LinkedList<QueryCollectorContext> collectors,
|
|
|
- boolean hasFilterCollector,
|
|
|
+ Collector collector,
|
|
|
boolean timeoutSet
|
|
|
) throws IOException {
|
|
|
- // create the top docs collector last when the other collectors are known
|
|
|
- final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, hasFilterCollector);
|
|
|
- // add the top docs collector, the first collector context in the chain
|
|
|
- collectors.addFirst(topDocsFactory);
|
|
|
-
|
|
|
- final Collector queryCollector;
|
|
|
if (searchContext.getProfilers() != null) {
|
|
|
- InternalProfileCollector profileCollector = QueryCollectorContext.createQueryCollectorWithProfiler(collectors);
|
|
|
- searchContext.getProfilers().getCurrentQueryProfiler().setCollector(profileCollector);
|
|
|
- queryCollector = profileCollector;
|
|
|
- } else {
|
|
|
- queryCollector = QueryCollectorContext.createQueryCollector(collectors);
|
|
|
+ searchContext.getProfilers().getCurrentQueryProfiler().setCollector((InternalProfileCollector) collector);
|
|
|
}
|
|
|
QuerySearchResult queryResult = searchContext.queryResult();
|
|
|
try {
|
|
|
- searcher.search(query, queryCollector);
|
|
|
+ searcher.search(query, collector);
|
|
|
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
|
|
|
queryResult.terminatedEarly(true);
|
|
|
} catch (TimeExceededException e) {
|
|
@@ -229,9 +272,6 @@ public class QueryPhase {
|
|
|
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
|
|
|
queryResult.terminatedEarly(false);
|
|
|
}
|
|
|
- for (QueryCollectorContext ctx : collectors) {
|
|
|
- ctx.postProcess(queryResult);
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -253,4 +293,14 @@ public class QueryPhase {
|
|
|
}
|
|
|
|
|
|
public static class TimeExceededException extends RuntimeException {}
|
|
|
+
|
|
|
+ private static final Collector EMPTY_COLLECTOR = new SimpleCollector() {
|
|
|
+ @Override
|
|
|
+ public void collect(int doc) {}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ScoreMode scoreMode() {
|
|
|
+ return ScoreMode.COMPLETE_NO_SCORES;
|
|
|
+ }
|
|
|
+ };
|
|
|
}
|