瀏覽代碼

Don't use SearchRequest to create an AggregationReduceContext builder (#96249)

This commit removes the dependency between AggregationReduceContext and the SearchRequest.
Ignacio Vera 2 年之前
父節點
當前提交
b1a3dc6a17

+ 1 - 1
server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

@@ -87,7 +87,6 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
         this.executor = executor;
         this.circuitBreaker = circuitBreaker;
         this.progressListener = progressListener;
-        this.aggReduceContextBuilder = controller.getReduceContext(isCanceled, request);
         this.topNSize = getTopDocsSize(request);
         this.performFinalReduce = request.isFinalReduce();
         this.onPartialMergeFailure = onPartialMergeFailure;
@@ -100,6 +99,7 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
             : source.rankBuilder().buildRankCoordinatorContext(size, from);
         this.hasTopDocs = (source == null || size != 0) && rankCoordinatorContext == null;
         this.hasAggs = source != null && source.aggregations() != null;
+        this.aggReduceContextBuilder = hasAggs ? controller.getReduceContext(isCanceled, source.aggregations()) : null;
         int batchReduceSize = (hasAggs || hasTopDocs) ? Math.min(request.getBatchedReduceSize(), expectedResultSize) : expectedResultSize;
         this.pendingMerges = new PendingMerges(batchReduceSize, request.resolveTrackTotalHitsUpTo());
     }

+ 8 - 4
server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

@@ -31,6 +31,7 @@ import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.aggregations.AggregationReduceContext;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.dfs.AggregatedDfs;
@@ -64,10 +65,13 @@ import java.util.function.Supplier;
 public final class SearchPhaseController {
     private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];
 
-    private final BiFunction<Supplier<Boolean>, SearchRequest, AggregationReduceContext.Builder> requestToAggReduceContextBuilder;
+    private final BiFunction<
+        Supplier<Boolean>,
+        AggregatorFactories.Builder,
+        AggregationReduceContext.Builder> requestToAggReduceContextBuilder;
 
     public SearchPhaseController(
-        BiFunction<Supplier<Boolean>, SearchRequest, AggregationReduceContext.Builder> requestToAggReduceContextBuilder
+        BiFunction<Supplier<Boolean>, AggregatorFactories.Builder, AggregationReduceContext.Builder> requestToAggReduceContextBuilder
     ) {
         this.requestToAggReduceContextBuilder = requestToAggReduceContextBuilder;
     }
@@ -747,8 +751,8 @@ public final class SearchPhaseController {
         }
     }
 
-    AggregationReduceContext.Builder getReduceContext(Supplier<Boolean> isCanceled, SearchRequest request) {
-        return requestToAggReduceContextBuilder.apply(isCanceled, request);
+    AggregationReduceContext.Builder getReduceContext(Supplier<Boolean> isCanceled, AggregatorFactories.Builder aggs) {
+        return requestToAggReduceContextBuilder.apply(isCanceled, aggs);
     }
 
     /**

+ 1 - 1
server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java

@@ -83,7 +83,7 @@ final class SearchResponseMerger {
         this.size = size;
         this.trackTotalHitsUpTo = trackTotalHitsUpTo;
         this.searchTimeProvider = Objects.requireNonNull(searchTimeProvider);
-        this.aggReduceContextBuilder = Objects.requireNonNull(aggReduceContextBuilder);
+        this.aggReduceContextBuilder = aggReduceContextBuilder; // might be null if there are no aggregations
     }
 
     /**

+ 5 - 1
server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

@@ -316,13 +316,17 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
             } else {
                 final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).taskId();
                 if (shouldMinimizeRoundtrips(rewritten)) {
+                    final AggregationReduceContext.Builder aggregationReduceContextBuilder = rewritten.source() != null
+                        && rewritten.source().aggregations() != null
+                            ? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations())
+                            : null;
                     ccsRemoteReduce(
                         parentTaskId,
                         rewritten,
                         localIndices,
                         remoteClusterIndices,
                         timeProvider,
-                        searchService.aggReduceContextBuilder(task::isCancelled, rewritten),
+                        aggregationReduceContextBuilder,
                         remoteClusterService,
                         threadPool,
                         listener,

+ 4 - 6
server/src/main/java/org/elasticsearch/search/SearchService.java

@@ -21,7 +21,6 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.search.CanMatchNodeRequest;
 import org.elasticsearch.action.search.CanMatchNodeResponse;
-import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchShardTask;
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.action.support.TransportActions;
@@ -1656,14 +1655,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
     }
 
     /**
-     * Returns a builder for {@link AggregationReduceContext}. This
-     * builder retains a reference to the provided {@link SearchRequest}.
+     * Returns a builder for {@link AggregationReduceContext}.
      */
-    public AggregationReduceContext.Builder aggReduceContextBuilder(Supplier<Boolean> isCanceled, SearchRequest request) {
+    public AggregationReduceContext.Builder aggReduceContextBuilder(Supplier<Boolean> isCanceled, AggregatorFactories.Builder aggs) {
         return new AggregationReduceContext.Builder() {
             @Override
             public AggregationReduceContext forPartialReduction() {
-                return new AggregationReduceContext.ForPartial(bigArrays, scriptService, isCanceled, request.source().aggregations());
+                return new AggregationReduceContext.ForPartial(bigArrays, scriptService, isCanceled, aggs);
             }
 
             @Override
@@ -1672,7 +1670,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
                     bigArrays,
                     scriptService,
                     isCanceled,
-                    request.source().aggregations(),
+                    aggs,
                     multiBucketConsumerService.create()
                 );
             }

+ 3 - 3
server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java

@@ -117,16 +117,16 @@ public class SearchPhaseControllerTests extends ESTestCase {
     @Before
     public void setup() {
         reductions = new CopyOnWriteArrayList<>();
-        searchPhaseController = new SearchPhaseController((t, s) -> new AggregationReduceContext.Builder() {
+        searchPhaseController = new SearchPhaseController((t, agg) -> new AggregationReduceContext.Builder() {
             @Override
             public AggregationReduceContext forPartialReduction() {
                 reductions.add(false);
-                return new AggregationReduceContext.ForPartial(BigArrays.NON_RECYCLING_INSTANCE, null, t, s.source().aggregations());
+                return new AggregationReduceContext.ForPartial(BigArrays.NON_RECYCLING_INSTANCE, null, t, agg);
             }
 
             public AggregationReduceContext forFinalReduction() {
                 reductions.add(true);
-                return new AggregationReduceContext.ForFinal(BigArrays.NON_RECYCLING_INSTANCE, null, t, s.source().aggregations(), b -> {});
+                return new AggregationReduceContext.ForFinal(BigArrays.NON_RECYCLING_INSTANCE, null, t, agg, b -> {});
             };
         });
         threadPool = new TestThreadPool(SearchPhaseControllerTests.class.getName());

+ 1 - 1
server/src/test/java/org/elasticsearch/search/SearchServiceTests.java

@@ -1227,7 +1227,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
         SearchService service = getInstanceFromNode(SearchService.class);
         AggregationReduceContext.Builder reduceContextBuilder = service.aggReduceContextBuilder(
             () -> false,
-            new SearchRequest().source(new SearchSourceBuilder())
+            new SearchRequest().source(new SearchSourceBuilder()).source().aggregations()
         );
         {
             AggregationReduceContext reduceContext = reduceContextBuilder.forFinalReduction();

+ 4 - 1
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java

@@ -65,7 +65,10 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
         super(SubmitAsyncSearchAction.NAME, transportService, actionFilters, SubmitAsyncSearchRequest::new);
         this.clusterService = clusterService;
         this.nodeClient = nodeClient;
-        this.requestToAggReduceContextBuilder = (task, request) -> searchService.aggReduceContextBuilder(task, request).forFinalReduction();
+        this.requestToAggReduceContextBuilder = (task, request) -> searchService.aggReduceContextBuilder(
+            task,
+            request.source().aggregations()
+        ).forFinalReduction();
         this.searchAction = searchAction;
         this.threadContext = transportService.getThreadPool().getThreadContext();
         this.store = new AsyncTaskIndexService<>(