@@ -8,15 +8,17 @@
 package org.elasticsearch.search.aggregations;
 
 import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.CollectorManager;
 import org.elasticsearch.action.search.SearchShardTask;
 import org.elasticsearch.search.aggregations.support.TimeSeriesIndexSearcher;
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.search.query.QueryPhase;
-import org.elasticsearch.search.query.SingleThreadCollectorManager;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.function.Supplier;
 
 /**
  * Aggregation phase of a search request, used to collect aggregations
@@ -29,30 +31,50 @@ public class AggregationPhase {
         if (context.aggregations() == null) {
             return;
         }
-        BucketCollector bucketCollector;
+        final Supplier<Collector> collectorSupplier;
+        if (context.aggregations().isInSortOrderExecutionRequired()) {
+            executeInSortOrder(context, newBucketCollector(context));
+            collectorSupplier = () -> BucketCollector.NO_OP_COLLECTOR;
+        } else {
+            collectorSupplier = () -> newBucketCollector(context).asCollector();
+        }
+        context.aggregations().registerAggsCollectorManager(new CollectorManager<>() {
+            @Override
+            public Collector newCollector() {
+                return collectorSupplier.get();
+            }
+
+            @Override
+            public Void reduce(Collection<Collector> collectors) {
+                // we cannot run post-collection method here because we need to do it after the optional timeout
+                // has been removed from the index searcher. Therefore, we delay this processing to the
+                // AggregationPhase#execute method.
+                return null;
+            }
+        });
+    }
+
+    private static BucketCollector newBucketCollector(SearchContext context) {
         try {
-            context.aggregations().aggregators(context.aggregations().factories().createTopLevelAggregators());
-            bucketCollector = MultiBucketCollector.wrap(true, List.of(context.aggregations().aggregators()));
+            Aggregator[] aggregators = context.aggregations().factories().createTopLevelAggregators();
+            context.aggregations().aggregators(aggregators);
+            BucketCollector bucketCollector = MultiBucketCollector.wrap(true, List.of(aggregators));
             bucketCollector.preCollection();
+            return bucketCollector;
         } catch (IOException e) {
             throw new AggregationInitializationException("Could not initialize aggregators", e);
         }
-        final Collector collector;
-        if (context.aggregations().factories().context() != null
-            && context.aggregations().factories().context().isInSortOrderExecutionRequired()) {
-            TimeSeriesIndexSearcher searcher = new TimeSeriesIndexSearcher(context.searcher(), getCancellationChecks(context));
-            searcher.setMinimumScore(context.minimumScore());
-            searcher.setProfiler(context);
-            try {
-                searcher.search(context.rewrittenQuery(), bucketCollector);
-            } catch (IOException e) {
-                throw new AggregationExecutionException("Could not perform time series aggregation", e);
-            }
-            collector = BucketCollector.NO_OP_COLLECTOR;
-        } else {
-            collector = bucketCollector.asCollector();
+    }
+
+    private static void executeInSortOrder(SearchContext context, BucketCollector collector) {
+        TimeSeriesIndexSearcher searcher = new TimeSeriesIndexSearcher(context.searcher(), getCancellationChecks(context));
+        searcher.setMinimumScore(context.minimumScore());
+        searcher.setProfiler(context);
+        try {
+            searcher.search(context.rewrittenQuery(), collector);
+        } catch (IOException e) {
+            throw new AggregationExecutionException("Could not perform time series aggregation", e);
         }
-        context.aggregations().registerAggsCollectorManager(new SingleThreadCollectorManager(collector));
     }
 
     private static List<Runnable> getCancellationChecks(SearchContext context) {
@@ -86,20 +108,35 @@ public class AggregationPhase {
             return;
         }
 
-        Aggregator[] aggregators = context.aggregations().aggregators();
-
-        List<InternalAggregation> aggregations = new ArrayList<>(aggregators.length);
-        for (Aggregator aggregator : context.aggregations().aggregators()) {
-            try {
-                aggregator.postCollection();
-                aggregations.add(aggregator.buildTopLevel());
-            } catch (IOException e) {
-                throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
+        final List<InternalAggregations> internalAggregations = new ArrayList<>(context.aggregations().aggregators().size());
+        for (Aggregator[] aggregators : context.aggregations().aggregators()) {
+            final List<InternalAggregation> aggregations = new ArrayList<>(aggregators.length);
+            for (Aggregator aggregator : aggregators) {
+                try {
+                    aggregator.postCollection();
+                    aggregations.add(aggregator.buildTopLevel());
+                } catch (IOException e) {
+                    throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
+                }
+                // release the aggregator to claim the used bytes as we don't need it anymore
+                aggregator.releaseAggregations();
             }
-            // release the aggregator to claim the used bytes as we don't need it anymore
-            aggregator.releaseAggregations();
+            internalAggregations.add(InternalAggregations.from(aggregations));
+        }
+
+        if (internalAggregations.size() > 1) {
+            // we execute this search using more than one slice. In order to keep memory requirements
+            // low, we do a partial reduction here.
+            context.queryResult()
+                .aggregations(
+                    InternalAggregations.topLevelReduce(
+                        internalAggregations,
+                        context.aggregations().getAggregationReduceContextBuilder().forPartialReduction()
+                    )
+                );
+        } else {
+            context.queryResult().aggregations(internalAggregations.get(0));
         }
-        context.queryResult().aggregations(InternalAggregations.from(aggregations));
 
         // disable aggregations so that they don't run on next pages in case of scrolling
         context.aggregations(null);