@@ -19,18 +19,11 @@

 package org.elasticsearch.action.search;

-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.function.IntFunction;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
+import com.carrotsearch.hppc.IntArrayList;
+import com.carrotsearch.hppc.ObjectObjectHashMap;

+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.CollectionStatistics;
 import org.apache.lucene.search.FieldDoc;
@@ -44,6 +37,8 @@ import org.apache.lucene.search.TotalHits;
 import org.apache.lucene.search.TotalHits.Relation;
 import org.apache.lucene.search.grouping.CollapseTopFieldDocs;
 import org.elasticsearch.common.collect.HppcMaps;
+import org.elasticsearch.common.io.stream.DelayableWriteable;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.SearchHit;
@@ -67,16 +62,28 @@ import org.elasticsearch.search.suggest.Suggest;
 import org.elasticsearch.search.suggest.Suggest.Suggestion;
 import org.elasticsearch.search.suggest.completion.CompletionSuggestion;

-import com.carrotsearch.hppc.IntArrayList;
-import com.carrotsearch.hppc.ObjectObjectHashMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;

 public final class SearchPhaseController {
+    private static final Logger logger = LogManager.getLogger(SearchPhaseController.class);
     private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];

+    private final NamedWriteableRegistry namedWriteableRegistry;
     private final Function<SearchRequest, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;

-    public SearchPhaseController(
+    public SearchPhaseController(NamedWriteableRegistry namedWriteableRegistry,
             Function<SearchRequest, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder) {
+        this.namedWriteableRegistry = namedWriteableRegistry;
         this.requestToAggReduceContextBuilder = requestToAggReduceContextBuilder;
     }

@@ -430,7 +437,8 @@ public final class SearchPhaseController {
     * @see QuerySearchResult#consumeProfileResult()
     */
    private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults,
-                                                List<Supplier<InternalAggregations>> bufferedAggs, List<TopDocs> bufferedTopDocs,
+                                                List<Supplier<InternalAggregations>> bufferedAggs,
+                                                List<TopDocs> bufferedTopDocs,
                                                 TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest,
                                                 InternalAggregation.ReduceContextBuilder aggReduceContextBuilder,
                                                 boolean performFinalReduce) {
@@ -522,7 +530,7 @@ public final class SearchPhaseController {
    private InternalAggregations reduceAggs(
        InternalAggregation.ReduceContextBuilder aggReduceContextBuilder,
        boolean performFinalReduce,
-        List<Supplier<InternalAggregations>> aggregationsList
+        List<? extends Supplier<InternalAggregations>> aggregationsList
    ) {
        /*
         * Parse the aggregations, clearing the list as we go so bits backing
@@ -617,8 +625,9 @@ public final class SearchPhaseController {
     * iff the buffer is exhausted.
     */
    static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhaseResult> {
+        private final NamedWriteableRegistry namedWriteableRegistry;
        private final SearchShardTarget[] processedShards;
-        private final Supplier<InternalAggregations>[] aggsBuffer;
+        private final DelayableWriteable.Serialized<InternalAggregations>[] aggsBuffer;
        private final TopDocs[] topDocsBuffer;
        private final boolean hasAggs;
        private final boolean hasTopDocs;
@@ -631,6 +640,8 @@ public final class SearchPhaseController {
        private final int topNSize;
        private final InternalAggregation.ReduceContextBuilder aggReduceContextBuilder;
        private final boolean performFinalReduce;
+        private long aggsCurrentBufferSize;
+        private long aggsMaxBufferSize;

        /**
         * Creates a new {@link QueryPhaseResultConsumer}
@@ -641,12 +652,14 @@ public final class SearchPhaseController {
         * @param bufferSize the size of the reduce buffer. if the buffer size is smaller than the number of expected results
         *                   the buffer is used to incrementally reduce aggregation results before all shards responded.
         */
-        private QueryPhaseResultConsumer(SearchProgressListener progressListener, SearchPhaseController controller,
+        private QueryPhaseResultConsumer(NamedWriteableRegistry namedWriteableRegistry, SearchProgressListener progressListener,
+                                         SearchPhaseController controller,
                                          int expectedResultSize, int bufferSize, boolean hasTopDocs, boolean hasAggs,
                                          int trackTotalHitsUpTo, int topNSize,
                                          InternalAggregation.ReduceContextBuilder aggReduceContextBuilder,
                                          boolean performFinalReduce) {
            super(expectedResultSize);
+            this.namedWriteableRegistry = namedWriteableRegistry;
            if (expectedResultSize != 1 && bufferSize < 2) {
                throw new IllegalArgumentException("buffer size must be >= 2 if there is more than one expected result");
            }
@@ -661,7 +674,7 @@ public final class SearchPhaseController {
            this.processedShards = new SearchShardTarget[expectedResultSize];
            // no need to buffer anything if we have less expected results. in this case we don't consume any results ahead of time.
            @SuppressWarnings("unchecked")
-            Supplier<InternalAggregations>[] aggsBuffer = new Supplier[hasAggs ? bufferSize : 0];
+            DelayableWriteable.Serialized<InternalAggregations>[] aggsBuffer = new DelayableWriteable.Serialized[hasAggs ? bufferSize : 0];
            this.aggsBuffer = aggsBuffer;
            this.topDocsBuffer = new TopDocs[hasTopDocs ? bufferSize : 0];
            this.hasTopDocs = hasTopDocs;
@@ -684,15 +697,21 @@ public final class SearchPhaseController {
        private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
            if (querySearchResult.isNull() == false) {
                if (index == bufferSize) {
+                    InternalAggregations reducedAggs = null;
                    if (hasAggs) {
                        List<InternalAggregations> aggs = new ArrayList<>(aggsBuffer.length);
                        for (int i = 0; i < aggsBuffer.length; i++) {
                            aggs.add(aggsBuffer[i].get());
                            aggsBuffer[i] = null; // null the buffer so it can be GCed now.
                        }
-                        InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(
-                            aggs, aggReduceContextBuilder.forPartialReduction());
-                        aggsBuffer[0] = () -> reducedAggs;
+                        reducedAggs = InternalAggregations.topLevelReduce(aggs, aggReduceContextBuilder.forPartialReduction());
+                        aggsBuffer[0] = DelayableWriteable.referencing(reducedAggs)
+                                .asSerialized(InternalAggregations::new, namedWriteableRegistry);
+                        long previousBufferSize = aggsCurrentBufferSize;
+                        aggsMaxBufferSize = Math.max(aggsMaxBufferSize, aggsCurrentBufferSize);
+                        aggsCurrentBufferSize = aggsBuffer[0].ramBytesUsed();
+                        logger.trace("aggs partial reduction [{}->{}] max [{}]",
+                            previousBufferSize, aggsCurrentBufferSize, aggsMaxBufferSize);
                    }
                    if (hasTopDocs) {
                        TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer),
@@ -705,12 +724,13 @@ public final class SearchPhaseController {
                    index = 1;
                    if (hasAggs || hasTopDocs) {
                        progressListener.notifyPartialReduce(SearchProgressListener.buildSearchShards(processedShards),
-                            topDocsStats.getTotalHits(), hasAggs ? aggsBuffer[0].get() : null, numReducePhases);
+                            topDocsStats.getTotalHits(), reducedAggs, numReducePhases);
                    }
                }
                final int i = index++;
                if (hasAggs) {
-                    aggsBuffer[i] = querySearchResult.consumeAggs();
+                    aggsBuffer[i] = querySearchResult.consumeAggs().asSerialized(InternalAggregations::new, namedWriteableRegistry);
+                    aggsCurrentBufferSize += aggsBuffer[i].ramBytesUsed();
                }
                if (hasTopDocs) {
                    final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
@@ -723,7 +743,7 @@ public final class SearchPhaseController {
        }

        private synchronized List<Supplier<InternalAggregations>> getRemainingAggs() {
-            return hasAggs ? Arrays.asList(aggsBuffer).subList(0, index) : null;
+            return hasAggs ? Arrays.asList((Supplier<InternalAggregations>[]) aggsBuffer).subList(0, index) : null;
        }

        private synchronized List<TopDocs> getRemainingTopDocs() {
@@ -732,6 +752,8 @@ public final class SearchPhaseController {

        @Override
        public ReducedQueryPhase reduce() {
+            aggsMaxBufferSize = Math.max(aggsMaxBufferSize, aggsCurrentBufferSize);
+            logger.trace("aggs final reduction [{}] max [{}]", aggsCurrentBufferSize, aggsMaxBufferSize);
            ReducedQueryPhase reducePhase = controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(),
                topDocsStats, numReducePhases, false, aggReduceContextBuilder, performFinalReduce);
            progressListener.notifyFinalReduce(SearchProgressListener.buildSearchShards(results.asList()),
@@ -766,8 +788,8 @@ public final class SearchPhaseController {
            if (request.getBatchedReduceSize() < numShards) {
                int topNSize = getTopDocsSize(request);
                // only use this if there are aggs and if there are more shards than we should reduce at once
-                return new QueryPhaseResultConsumer(listener, this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs,
-                    trackTotalHitsUpTo, topNSize, aggReduceContextBuilder, request.isFinalReduce());
+                return new QueryPhaseResultConsumer(namedWriteableRegistry, listener, this, numShards, request.getBatchedReduceSize(),
+                    hasTopDocs, hasAggs, trackTotalHitsUpTo, topNSize, aggReduceContextBuilder, request.isFinalReduce());
            }
        }
        return new ArraySearchPhaseResults<SearchPhaseResult>(numShards) {