|
@@ -42,7 +42,6 @@ import org.elasticsearch.search.SearchPhaseResult;
|
|
|
import org.elasticsearch.search.aggregations.InternalAggregation;
|
|
|
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
|
|
|
import org.elasticsearch.search.aggregations.InternalAggregations;
|
|
|
-import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
|
|
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
|
import org.elasticsearch.search.dfs.AggregatedDfs;
|
|
|
import org.elasticsearch.search.dfs.DfsSearchResult;
|
|
@@ -65,8 +64,6 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.function.Function;
|
|
|
import java.util.function.IntFunction;
|
|
|
-import java.util.stream.Collectors;
|
|
|
-import java.util.stream.StreamSupport;
|
|
|
|
|
|
public final class SearchPhaseController {
|
|
|
|
|
@@ -488,8 +485,8 @@ public final class SearchPhaseController {
|
|
|
reducedCompletionSuggestions = reducedSuggest.filter(CompletionSuggestion.class);
|
|
|
}
|
|
|
ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
|
|
|
- final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
|
|
|
- firstResult.pipelineAggregators(), reduceContext);
|
|
|
+ final InternalAggregations aggregations = aggregationsList.isEmpty() ? null :
|
|
|
+ InternalAggregations.reduce(aggregationsList, firstResult.pipelineAggregators(), reduceContext);
|
|
|
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
|
|
|
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size,
|
|
|
reducedCompletionSuggestions);
|
|
@@ -499,32 +496,6 @@ public final class SearchPhaseController {
|
|
|
firstResult.sortValueFormats(), numReducePhases, size, from, false);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Performs an intermediate reduce phase on the aggregations. For instance with this reduce phase never prune information
|
|
|
- * that relevant for the final reduce step. For final reduce see {@link #reduceAggs(List, List, ReduceContext)}
|
|
|
- */
|
|
|
- private InternalAggregations reduceAggsIncrementally(List<InternalAggregations> aggregationsList) {
|
|
|
- ReduceContext reduceContext = reduceContextFunction.apply(false);
|
|
|
- return aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
|
|
|
- null, reduceContext);
|
|
|
- }
|
|
|
-
|
|
|
- private static InternalAggregations reduceAggs(List<InternalAggregations> aggregationsList,
|
|
|
- List<SiblingPipelineAggregator> pipelineAggregators, ReduceContext reduceContext) {
|
|
|
- InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
|
|
|
- if (pipelineAggregators != null) {
|
|
|
- List<InternalAggregation> newAggs = StreamSupport.stream(aggregations.spliterator(), false)
|
|
|
- .map((p) -> (InternalAggregation) p)
|
|
|
- .collect(Collectors.toList());
|
|
|
- for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) {
|
|
|
- InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext);
|
|
|
- newAggs.add(newAgg);
|
|
|
- }
|
|
|
- return new InternalAggregations(newAggs);
|
|
|
- }
|
|
|
- return aggregations;
|
|
|
- }
|
|
|
-
|
|
|
public static final class ReducedQueryPhase {
|
|
|
// the sum of all hits across all reduces shards
|
|
|
final TotalHits totalHits;
|
|
@@ -644,7 +615,8 @@ public final class SearchPhaseController {
|
|
|
private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
|
|
|
if (index == bufferSize) {
|
|
|
if (hasAggs) {
|
|
|
- InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(aggsBuffer));
|
|
|
+ ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
|
|
|
+ InternalAggregations reducedAggs = InternalAggregations.reduce(Arrays.asList(aggsBuffer), reduceContext);
|
|
|
Arrays.fill(aggsBuffer, null);
|
|
|
aggsBuffer[0] = reducedAggs;
|
|
|
}
|