Browse Source

Add finalReduce flag to SearchRequest (#38104)

With #37000 we made sure that fnial reduction is automatically disabled
whenever a localClusterAlias is provided with a SearchRequest.

While working on #37838, we found a scenario where we do need to set a
localClusterAlias yet we would like to perform a final reduction in the
remote cluster: when searching on a single remote cluster.

Relates to #32125

This commit adds support for a separate finalReduce flag to
SearchRequest and makes use of it in TransportSearchAction in case we
are searching against a single remote cluster.

This also makes sure that num_reduce_phases is correct when searching
against a single remote cluster: it makes little sense to return
`num_reduce_phases` set to `2`, which looks especially weird in case
the search was performed against a single remote shard. We should
perform one reduction phase only in this case and `num_reduce_phases`
should reflect that.

* line length
Luca Cavanna 6 years ago
parent
commit
e18cac3659

+ 0 - 1
docs/reference/modules/cross-cluster-search.asciidoc

@@ -65,7 +65,6 @@ GET /cluster_one:twitter/_search
 {
   "took": 150,
   "timed_out": false,
-  "num_reduce_phases": 2,
   "_shards": {
     "total": 1,
     "successful": 1,

+ 9 - 0
qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml

@@ -36,6 +36,7 @@
               terms:
                 field: f1.keyword
 
+  - match: { num_reduce_phases: 3 }
   - match: {_clusters.total: 2}
   - match: {_clusters.successful: 2}
   - match: {_clusters.skipped: 0}
@@ -63,6 +64,7 @@
               terms:
                 field: f1.keyword
 
+  - match: { num_reduce_phases: 3 }
   - match: {_clusters.total: 2}
   - match: {_clusters.successful: 2}
   - match: {_clusters.skipped: 0}
@@ -83,6 +85,7 @@
               terms:
                 field: f1.keyword
 
+  - is_false: num_reduce_phases
   - match: {_clusters.total: 1}
   - match: {_clusters.successful: 1}
   - match: {_clusters.skipped: 0}
@@ -103,6 +106,7 @@
               terms:
                 field: f1.keyword
 
+  - is_false: num_reduce_phases
   - is_false: _clusters
   - match: { _shards.total: 2 }
   - match: { hits.total: 5}
@@ -133,6 +137,7 @@
         rest_total_hits_as_int: true
         index: test_remote_cluster:test_index
 
+  - is_false: num_reduce_phases
   - match: {_clusters.total: 1}
   - match: {_clusters.successful: 1}
   - match: {_clusters.skipped: 0}
@@ -162,6 +167,7 @@
         rest_total_hits_as_int: true
         index: "*:test_index"
 
+  - match: { num_reduce_phases: 3 }
   - match: {_clusters.total: 2}
   - match: {_clusters.successful: 2}
   - match: {_clusters.skipped: 0}
@@ -176,6 +182,7 @@
         rest_total_hits_as_int: true
         index: my_remote_cluster:aliased_test_index
 
+  - is_false: num_reduce_phases
   - match: {_clusters.total: 1}
   - match: {_clusters.successful: 1}
   - match: {_clusters.skipped: 0}
@@ -192,6 +199,7 @@
         rest_total_hits_as_int: true
         index: my_remote_cluster:aliased_test_index,my_remote_cluster:field_caps_index_1
 
+  - is_false: num_reduce_phases
   - match: {_clusters.total: 1}
   - match: {_clusters.successful: 1}
   - match: {_clusters.skipped: 0}
@@ -208,6 +216,7 @@
         rest_total_hits_as_int: true
         index: "my_remote_cluster:single_doc_index"
 
+  - is_false: num_reduce_phases
   - match: {_clusters.total: 1}
   - match: {_clusters.successful: 1}
   - match: {_clusters.skipped: 0}

+ 2 - 0
qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml

@@ -12,6 +12,7 @@
           query:
             match_all: {}
 
+  - is_false: num_reduce_phases
   - match: {_clusters.total: 1}
   - match: {_clusters.successful: 1}
   - match: {_clusters.skipped: 0}
@@ -28,6 +29,7 @@
         rest_total_hits_as_int: true
         body: { "scroll_id": "$scroll_id", "scroll": "1m"}
 
+  - is_false: num_reduce_phases
   - is_false: _clusters
   - match: {hits.total:      6    }
   - length: {hits.hits:      2    }

+ 2 - 4
server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

@@ -714,20 +714,18 @@ public final class SearchPhaseController {
         final boolean hasAggs = source != null && source.aggregations() != null;
         final boolean hasTopDocs = source == null || source.size() != 0;
         final int trackTotalHitsUpTo = resolveTrackTotalHits(request);
-        final boolean finalReduce = request.getLocalClusterAlias() == null;
-
         if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
             // no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
             if (request.getBatchedReduceSize() < numShards) {
                 // only use this if there are aggs and if there are more shards than we should reduce at once
                 return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs,
-                    trackTotalHitsUpTo, finalReduce);
+                    trackTotalHitsUpTo, request.isFinalReduce());
             }
         }
         return new InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
             @Override
             ReducedQueryPhase reduce() {
-                return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHitsUpTo, finalReduce);
+                return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce());
             }
         };
     }

+ 38 - 12
server/src/main/java/org/elasticsearch/action/search/SearchRequest.java

@@ -67,6 +67,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
 
     private final String localClusterAlias;
     private final long absoluteStartMillis;
+    private final boolean finalReduce;
 
     private SearchType searchType = SearchType.DEFAULT;
 
@@ -102,13 +103,15 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
     public SearchRequest() {
         this.localClusterAlias = null;
         this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
+        this.finalReduce = true;
     }
 
     /**
      * Constructs a new search request from the provided search request
      */
     public SearchRequest(SearchRequest searchRequest) {
-        this(searchRequest, searchRequest.indices, searchRequest.localClusterAlias, searchRequest.absoluteStartMillis);
+        this(searchRequest, searchRequest.indices, searchRequest.localClusterAlias,
+            searchRequest.absoluteStartMillis, searchRequest.finalReduce);
     }
 
     /**
@@ -132,25 +135,30 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
     }
 
     /**
-     * Creates a new search request by providing the search request to copy all fields from, the indices to search against,
-     * the alias of the cluster where it will be executed, as well as the start time in milliseconds from the epoch time.
-     * Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search request performing local reduction
-     * on each cluster. The coordinating CCS node provides the original search request, the indices to search against as well as the
-     * alias to prefix index names with in the returned search results, and the absolute start time to be used on the remote clusters
-     * to ensure that the same value is used.
+     * Creates a new search request by providing the search request to copy all fields from, the indices to search against, the alias of
+     * the cluster where it will be executed, as well as the start time in milliseconds from the epoch time and whether the reduction
+     * should be final or not. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search request
+     * performing reduction on each cluster in order to minimize network round-trips between the coordinating node and the remote clusters.
+     *
+     * @param originalSearchRequest the original search request
+     * @param indices the indices to search against
+     * @param localClusterAlias the alias to prefix index names with in the returned search results
+     * @param absoluteStartMillis the absolute start time to be used on the remote clusters to ensure that the same value is used
+     * @param finalReduce whether the reduction should be final or not
      */
     static SearchRequest withLocalReduction(SearchRequest originalSearchRequest, String[] indices,
-                                            String localClusterAlias, long absoluteStartMillis) {
+                                            String localClusterAlias, long absoluteStartMillis, boolean finalReduce) {
         Objects.requireNonNull(originalSearchRequest, "search request must not be null");
         validateIndices(indices);
         Objects.requireNonNull(localClusterAlias, "cluster alias must not be null");
         if (absoluteStartMillis < 0) {
             throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]");
         }
-        return new SearchRequest(originalSearchRequest, indices, localClusterAlias, absoluteStartMillis);
+        return new SearchRequest(originalSearchRequest, indices, localClusterAlias, absoluteStartMillis, finalReduce);
     }
 
-    private SearchRequest(SearchRequest searchRequest, String[] indices, String localClusterAlias, long absoluteStartMillis) {
+    private SearchRequest(SearchRequest searchRequest, String[] indices, String localClusterAlias, long absoluteStartMillis,
+                          boolean finalReduce) {
         this.allowPartialSearchResults = searchRequest.allowPartialSearchResults;
         this.batchedReduceSize = searchRequest.batchedReduceSize;
         this.ccsMinimizeRoundtrips = searchRequest.ccsMinimizeRoundtrips;
@@ -167,6 +175,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
         this.types = searchRequest.types;
         this.localClusterAlias = localClusterAlias;
         this.absoluteStartMillis = absoluteStartMillis;
+        this.finalReduce = finalReduce;
     }
 
     /**
@@ -203,6 +212,12 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
             localClusterAlias = null;
             absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
         }
+        //TODO move to the 6_7_0 branch once backported to 6.x
+        if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
+            finalReduce = in.readBoolean();
+        } else {
+            finalReduce = true;
+        }
         if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
             ccsMinimizeRoundtrips = in.readBoolean();
         }
@@ -232,6 +247,10 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
                 out.writeVLong(absoluteStartMillis);
             }
         }
+        //TODO move to the 6_7_0 branch once backported to 6.x
+        if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
+            out.writeBoolean(finalReduce);
+        }
         if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
             out.writeBoolean(ccsMinimizeRoundtrips);
         }
@@ -277,11 +296,18 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
         return localClusterAlias;
     }
 
+    /**
+     * Returns whether the reduction phase that will be performed needs to be final or not.
+     */
+    boolean isFinalReduce() {
+        return finalReduce;
+    }
+
     /**
      * Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to
      * ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search
-     * request. When created through {@link #withLocalReduction(SearchRequest, String[], String, long)}, this method returns the provided
-     * current time, otherwise it will return {@link System#currentTimeMillis()}.
+     * request. When created through {@link #withLocalReduction(SearchRequest, String[], String, long, boolean)}, this method returns
+     * the provided current time, otherwise it will return {@link System#currentTimeMillis()}.
      *
      */
     long getOrCreateAbsoluteStartMillis() {

+ 12 - 0
server/src/main/java/org/elasticsearch/action/search/SearchResponse.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.search;
 
+import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.common.Nullable;
@@ -35,8 +36,10 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentParser.Token;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.action.RestActions;
+import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.internal.InternalSearchResponse;
 import org.elasticsearch.search.profile.ProfileShardResult;
 import org.elasticsearch.search.profile.SearchProfileShardResults;
@@ -47,6 +50,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Supplier;
 
 import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure;
 import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
@@ -497,4 +501,12 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
             return "Clusters{total=" + total + ", successful=" + successful + ", skipped=" + skipped + '}';
         }
     }
+
+    static SearchResponse empty(Supplier<Long> tookInMillisSupplier, Clusters clusters) {
+        SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN);
+        InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits,
+            InternalAggregations.EMPTY, null, null, false, null, 0);
+        return new SearchResponse(internalSearchResponse, null, 0, 0, 0, tookInMillisSupplier.get(),
+            ShardSearchFailure.EMPTY_ARRAY, clusters);
+    }
 }

+ 1 - 5
server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java

@@ -115,11 +115,7 @@ final class SearchResponseMerger {
         //if the search is only across remote clusters, none of them are available, and all of them have skip_unavailable set to true,
         //we end up calling merge without anything to merge, we just return an empty search response
         if (searchResponses.size() == 0) {
-            SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN);
-            InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits,
-                InternalAggregations.EMPTY, null, null, false, null, 0);
-            return new SearchResponse(internalSearchResponse, null, 0, 0, 0, searchTimeProvider.buildTookInMillis(),
-                ShardSearchFailure.EMPTY_ARRAY, clusters);
+            return SearchResponse.empty(searchTimeProvider::buildTookInMillis, clusters);
         }
         int totalShards = 0;
         int skippedShards = 0;

+ 64 - 23
server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

@@ -48,9 +48,13 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.internal.AliasFilter;
+import org.elasticsearch.search.internal.InternalSearchResponse;
 import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.profile.ProfileShardResult;
+import org.elasticsearch.search.profile.SearchProfileShardResults;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.RemoteClusterAware;
@@ -253,30 +257,66 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                                 SearchTimeProvider timeProvider, Function<Boolean, InternalAggregation.ReduceContext> reduceContext,
                                 RemoteClusterService remoteClusterService, ThreadPool threadPool, ActionListener<SearchResponse> listener,
                                 BiConsumer<SearchRequest, ActionListener<SearchResponse>> localSearchConsumer) {
-        SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider, reduceContext);
-        AtomicInteger skippedClusters = new AtomicInteger(0);
-        final AtomicReference<Exception> exceptions = new AtomicReference<>();
-        int totalClusters = remoteIndices.size() + (localIndices == null ? 0 : 1);
-        final CountDown countDown = new CountDown(totalClusters);
-        for (Map.Entry<String, OriginalIndices> entry : remoteIndices.entrySet()) {
+
+        if (localIndices == null && remoteIndices.size() == 1) {
+            //if we are searching against a single remote cluster, we simply forward the original search request to such cluster
+            //and we directly perform final reduction in the remote cluster
+            Map.Entry<String, OriginalIndices> entry = remoteIndices.entrySet().iterator().next();
             String clusterAlias = entry.getKey();
             boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
             OriginalIndices indices = entry.getValue();
             SearchRequest ccsSearchRequest = SearchRequest.withLocalReduction(searchRequest, indices.indices(),
-                clusterAlias, timeProvider.getAbsoluteStartMillis());
-            ActionListener<SearchResponse> ccsListener = createCCSListener(clusterAlias, skipUnavailable, countDown,
-                skippedClusters, exceptions, searchResponseMerger, totalClusters,  listener);
+                clusterAlias, timeProvider.getAbsoluteStartMillis(), true);
             Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
-            remoteClusterClient.search(ccsSearchRequest, ccsListener);
-        }
-        if (localIndices != null) {
-            ActionListener<SearchResponse> ccsListener = createCCSListener(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
-                false, countDown, skippedClusters, exceptions, searchResponseMerger, totalClusters, listener);
-            //here we provide the empty string a cluster alias, which means no prefix in index name,
-            //but the coord node will perform non final reduce as it's not null.
-            SearchRequest ccsLocalSearchRequest = SearchRequest.withLocalReduction(searchRequest, localIndices.indices(),
-                RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis());
-            localSearchConsumer.accept(ccsLocalSearchRequest, ccsListener);
+            remoteClusterClient.search(ccsSearchRequest, new ActionListener<SearchResponse>() {
+                @Override
+                public void onResponse(SearchResponse searchResponse) {
+                    Map<String, ProfileShardResult> profileResults = searchResponse.getProfileResults();
+                    SearchProfileShardResults profile = profileResults == null || profileResults.isEmpty()
+                        ? null : new SearchProfileShardResults(profileResults);
+                    InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchResponse.getHits(),
+                        (InternalAggregations) searchResponse.getAggregations(), searchResponse.getSuggest(), profile,
+                        searchResponse.isTimedOut(), searchResponse.isTerminatedEarly(), searchResponse.getNumReducePhases());
+                    listener.onResponse(new SearchResponse(internalSearchResponse, searchResponse.getScrollId(),
+                        searchResponse.getTotalShards(), searchResponse.getSuccessfulShards(), searchResponse.getSkippedShards(),
+                        timeProvider.buildTookInMillis(), searchResponse.getShardFailures(), new SearchResponse.Clusters(1, 1, 0)));
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    if (skipUnavailable) {
+                        listener.onResponse(SearchResponse.empty(timeProvider::buildTookInMillis, new SearchResponse.Clusters(1, 0, 1)));
+                    } else {
+                        listener.onFailure(wrapRemoteClusterFailure(clusterAlias, e));
+                    }
+                }
+            });
+        } else {
+            SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider, reduceContext);
+            AtomicInteger skippedClusters = new AtomicInteger(0);
+            final AtomicReference<Exception> exceptions = new AtomicReference<>();
+            int totalClusters = remoteIndices.size() + (localIndices == null ? 0 : 1);
+            final CountDown countDown = new CountDown(totalClusters);
+            for (Map.Entry<String, OriginalIndices> entry : remoteIndices.entrySet()) {
+                String clusterAlias = entry.getKey();
+                boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
+                OriginalIndices indices = entry.getValue();
+                SearchRequest ccsSearchRequest = SearchRequest.withLocalReduction(searchRequest, indices.indices(),
+                    clusterAlias, timeProvider.getAbsoluteStartMillis(), false);
+                ActionListener<SearchResponse> ccsListener = createCCSListener(clusterAlias, skipUnavailable, countDown,
+                    skippedClusters, exceptions, searchResponseMerger, totalClusters,  listener);
+                Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
+                remoteClusterClient.search(ccsSearchRequest, ccsListener);
+            }
+            if (localIndices != null) {
+                ActionListener<SearchResponse> ccsListener = createCCSListener(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
+                    false, countDown, skippedClusters, exceptions, searchResponseMerger, totalClusters, listener);
+                //here we provide the empty string a cluster alias, which means no prefix in index name,
+                //but the coord node will perform non final reduce as it's not null.
+                SearchRequest ccsLocalSearchRequest = SearchRequest.withLocalReduction(searchRequest, localIndices.indices(),
+                    RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
+                localSearchConsumer.accept(ccsLocalSearchRequest, ccsListener);
+            }
         }
     }
 
@@ -297,9 +337,6 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
             //here we modify the original source so we can re-use it by setting it to each outgoing search request
             source.from(0);
             source.size(from + size);
-            //TODO when searching only against a remote cluster, we could ask directly for the final number of results and let
-            //the remote cluster do a final reduction, yet that is not possible as we are providing a localClusterAlias which
-            //will automatically make the reduction non final
         }
         return new SearchResponseMerger(from, size, trackTotalHitsUpTo, timeProvider, reduceContextFunction);
     }
@@ -604,7 +641,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
             } else {
                 Exception exception = e;
                 if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) == false) {
-                    exception = new RemoteTransportException("error while communicating with remote cluster [" + clusterAlias + "]", e);
+                    exception = wrapRemoteClusterFailure(clusterAlias, e);
                 }
                 if (exceptions.compareAndSet(null, exception) == false) {
                     exceptions.accumulateAndGet(exception, (previous, current) -> {
@@ -636,4 +673,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
 
         abstract FinalResponse createFinalResponse();
     }
+
+    private static RemoteTransportException wrapRemoteClusterFailure(String clusterAlias, Exception e) {
+        return new RemoteTransportException("error while communicating with remote cluster [" + clusterAlias + "]", e);
+    }
 }

+ 7 - 7
server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java

@@ -641,23 +641,23 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R
      * Add an aggregation to perform as part of the search.
      */
     public SearchSourceBuilder aggregation(AggregationBuilder aggregation) {
-            if (aggregations == null) {
+        if (aggregations == null) {
             aggregations = AggregatorFactories.builder();
-            }
+        }
         aggregations.addAggregator(aggregation);
-            return this;
+        return this;
     }
 
     /**
      * Add an aggregation to perform as part of the search.
      */
     public SearchSourceBuilder aggregation(PipelineAggregationBuilder aggregation) {
-            if (aggregations == null) {
+        if (aggregations == null) {
             aggregations = AggregatorFactories.builder();
-            }
-        aggregations.addPipelineAggregator(aggregation);
-            return this;
         }
+        aggregations.addPipelineAggregator(aggregation);
+        return this;
+    }
 
     /**
      * Gets the bytes representing the aggregation builders for this request.

+ 13 - 8
server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java

@@ -30,6 +30,7 @@ import org.apache.lucene.search.TotalHits.Relation;
 import org.apache.lucene.search.grouping.CollapseTopFieldDocs;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.action.OriginalIndices;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
 import org.elasticsearch.common.text.Text;
@@ -313,9 +314,14 @@ public class SearchPhaseControllerTests extends ESTestCase {
         return fetchResults;
     }
 
+    private static SearchRequest randomSearchRequest() {
+        return randomBoolean() ? new SearchRequest() : SearchRequest.withLocalReduction(new SearchRequest(),
+            Strings.EMPTY_ARRAY, "remote", 0, randomBoolean());
+    }
+
     public void testConsumer() {
         int bufferSize = randomIntBetween(2, 3);
-        SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
+        SearchRequest request = randomSearchRequest();
         request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
         request.setBatchedReduceSize(bufferSize);
         InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer = searchPhaseController.newSearchPhaseResults(request, 3);
@@ -377,7 +383,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
         int expectedNumResults = randomIntBetween(1, 100);
         int bufferSize = randomIntBetween(2, 200);
 
-        SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
+        SearchRequest request = randomSearchRequest();
         request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
         request.setBatchedReduceSize(bufferSize);
         InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
@@ -424,7 +430,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
     public void testConsumerOnlyAggs() {
         int expectedNumResults = randomIntBetween(1, 100);
         int bufferSize = randomIntBetween(2, 200);
-        SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
+        SearchRequest request = randomSearchRequest();
         request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0));
         request.setBatchedReduceSize(bufferSize);
         InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
@@ -460,7 +466,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
     public void testConsumerOnlyHits() {
         int expectedNumResults = randomIntBetween(1, 100);
         int bufferSize = randomIntBetween(2, 200);
-        SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
+        SearchRequest request = randomSearchRequest();
         if (randomBoolean()) {
             request.source(new SearchSourceBuilder().size(randomIntBetween(1, 10)));
         }
@@ -493,8 +499,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
 
     private void assertFinalReduction(SearchRequest searchRequest) {
         assertThat(reductions.size(), greaterThanOrEqualTo(1));
-        //the last reduction step was the final one only if no cluster alias was provided with the search request
-        assertEquals(searchRequest.getLocalClusterAlias() == null, reductions.get(reductions.size() - 1));
+        assertEquals(searchRequest.isFinalReduce(), reductions.get(reductions.size() - 1));
     }
 
     public void testNewSearchPhaseResults() {
@@ -568,7 +573,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
     public void testConsumerSortByField() {
         int expectedNumResults = randomIntBetween(1, 100);
         int bufferSize = randomIntBetween(2, 200);
-        SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
+        SearchRequest request = randomSearchRequest();
         int size = randomIntBetween(1, 10);
         request.setBatchedReduceSize(bufferSize);
         InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
@@ -604,7 +609,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
     public void testConsumerFieldCollapsing() {
         int expectedNumResults = randomIntBetween(30, 100);
         int bufferSize = randomIntBetween(2, 200);
-        SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
+        SearchRequest request = randomSearchRequest();
         int size = randomIntBetween(5, 10);
         request.setBatchedReduceSize(bufferSize);
         InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =

+ 17 - 7
server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java

@@ -54,17 +54,20 @@ public class SearchRequestTests extends AbstractSearchTestCase {
         }
         //clusterAlias and absoluteStartMillis do not have public getters/setters hence we randomize them only in this test specifically.
         return SearchRequest.withLocalReduction(request, request.indices(),
-            randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong());
+            randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong(), randomBoolean());
     }
 
     public void testWithLocalReduction() {
-        expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(null, Strings.EMPTY_ARRAY, "", 0));
+        expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(null, Strings.EMPTY_ARRAY, "", 0, randomBoolean()));
         SearchRequest request = new SearchRequest();
-        expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, null, "", 0));
-        expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, new String[]{null}, "", 0));
-        expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, Strings.EMPTY_ARRAY, null, 0));
-        expectThrows(IllegalArgumentException.class, () -> SearchRequest.withLocalReduction(request, Strings.EMPTY_ARRAY, "", -1));
-        SearchRequest searchRequest = SearchRequest.withLocalReduction(request, Strings.EMPTY_ARRAY, "", 0);
+        expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, null, "", 0, randomBoolean()));
+        expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request,
+            new String[]{null}, "", 0, randomBoolean()));
+        expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request,
+            Strings.EMPTY_ARRAY, null, 0, randomBoolean()));
+        expectThrows(IllegalArgumentException.class, () -> SearchRequest.withLocalReduction(request,
+            Strings.EMPTY_ARRAY, "", -1, randomBoolean()));
+        SearchRequest searchRequest = SearchRequest.withLocalReduction(request, Strings.EMPTY_ARRAY, "", 0, randomBoolean());
         assertNull(searchRequest.validate());
     }
 
@@ -92,6 +95,12 @@ public class SearchRequestTests extends AbstractSearchTestCase {
             assertEquals(searchRequest.getLocalClusterAlias(), deserializedRequest.getLocalClusterAlias());
             assertEquals(searchRequest.getOrCreateAbsoluteStartMillis(), deserializedRequest.getOrCreateAbsoluteStartMillis());
         }
+        //TODO move to the 6_7_0 branch once backported to 6.x
+        if (version.before(Version.V_7_0_0)) {
+            assertTrue(deserializedRequest.isFinalReduce());
+        } else {
+            assertEquals(searchRequest.isFinalReduce(), deserializedRequest.isFinalReduce());
+        }
     }
 
     public void testReadFromPre6_7_0() throws IOException {
@@ -103,6 +112,7 @@ public class SearchRequestTests extends AbstractSearchTestCase {
             assertNull(searchRequest.getLocalClusterAlias());
             assertAbsoluteStartMillisIsCurrentTime(searchRequest);
             assertTrue(searchRequest.isCcsMinimizeRoundtrips());
+            assertTrue(searchRequest.isFinalReduce());
         }
     }
 

+ 61 - 6
server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java

@@ -27,13 +27,17 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.support.ValueType;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.test.ESSingleNodeTestCase;
 
 public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
 
     public void testLocalClusterAlias() {
-        long nowInMillis = System.currentTimeMillis();
+        long nowInMillis = randomLongBetween(0, Long.MAX_VALUE);
         IndexRequest indexRequest = new IndexRequest("test");
         indexRequest.id("1");
         indexRequest.source("field", "value");
@@ -42,7 +46,8 @@ public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
         assertEquals(RestStatus.CREATED, indexResponse.status());
 
         {
-            SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "local", nowInMillis);
+            SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY,
+                "local", nowInMillis, randomBoolean());
             SearchResponse searchResponse = client().search(searchRequest).actionGet();
             assertEquals(1, searchResponse.getHits().getTotalHits().value);
             SearchHit[] hits = searchResponse.getHits().getHits();
@@ -53,7 +58,8 @@ public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
             assertEquals("1", hit.getId());
         }
         {
-            SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", nowInMillis);
+            SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY,
+                "", nowInMillis, randomBoolean());
             SearchResponse searchResponse = client().search(searchRequest).actionGet();
             assertEquals(1, searchResponse.getHits().getTotalHits().value);
             SearchHit[] hits = searchResponse.getHits().getHits();
@@ -94,19 +100,22 @@ public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
             assertEquals(0, searchResponse.getTotalShards());
         }
         {
-            SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", 0);
+            SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(),
+                Strings.EMPTY_ARRAY, "", 0, randomBoolean());
             SearchResponse searchResponse = client().search(searchRequest).actionGet();
             assertEquals(2, searchResponse.getHits().getTotalHits().value);
         }
         {
-            SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", 0);
+            SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(),
+                Strings.EMPTY_ARRAY, "", 0, randomBoolean());
             searchRequest.indices("<test-{now/d}>");
             SearchResponse searchResponse = client().search(searchRequest).actionGet();
             assertEquals(1, searchResponse.getHits().getTotalHits().value);
             assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
         }
         {
-            SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", 0);
+            SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(),
+                Strings.EMPTY_ARRAY, "", 0, randomBoolean());
             SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
             RangeQueryBuilder rangeQuery = new RangeQueryBuilder("date");
             rangeQuery.gte("1970-01-01");
@@ -118,4 +127,50 @@ public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
             assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
         }
     }
+
+    public void testFinalReduce()  {
+        long nowInMillis = randomLongBetween(0, Long.MAX_VALUE);
+        {
+            IndexRequest indexRequest = new IndexRequest("test");
+            indexRequest.id("1");
+            indexRequest.source("price", 10);
+            IndexResponse indexResponse = client().index(indexRequest).actionGet();
+            assertEquals(RestStatus.CREATED, indexResponse.status());
+        }
+        {
+            IndexRequest indexRequest = new IndexRequest("test");
+            indexRequest.id("2");
+            indexRequest.source("price", 100);
+            IndexResponse indexResponse = client().index(indexRequest).actionGet();
+            assertEquals(RestStatus.CREATED, indexResponse.status());
+        }
+        client().admin().indices().prepareRefresh("test").get();
+
+        SearchRequest originalRequest = new SearchRequest();
+        SearchSourceBuilder source = new SearchSourceBuilder();
+        source.size(0);
+        originalRequest.source(source);
+        TermsAggregationBuilder terms = new TermsAggregationBuilder("terms", ValueType.NUMERIC);
+        terms.field("price");
+        terms.size(1);
+        source.aggregation(terms);
+
+        {
+            SearchRequest searchRequest = randomBoolean() ? originalRequest : SearchRequest.withLocalReduction(originalRequest,
+                Strings.EMPTY_ARRAY, "remote", nowInMillis, true);
+            SearchResponse searchResponse = client().search(searchRequest).actionGet();
+            assertEquals(2, searchResponse.getHits().getTotalHits().value);
+            Aggregations aggregations = searchResponse.getAggregations();
+            LongTerms longTerms = aggregations.get("terms");
+            assertEquals(1, longTerms.getBuckets().size());
+        }
+        {
+            SearchRequest searchRequest = SearchRequest.withLocalReduction(originalRequest,
+                Strings.EMPTY_ARRAY, "remote", nowInMillis, false);
+            SearchResponse searchResponse = client().search(searchRequest).actionGet();
+            assertEquals(2, searchResponse.getHits().getTotalHits().value);
+            Aggregations aggregations = searchResponse.getAggregations();
+            LongTerms longTerms = aggregations.get("terms");
+        }
+    }
 }

+ 3 - 4
server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

@@ -402,7 +402,7 @@ public class TransportSearchActionTests extends ESTestCase {
     }
 
     public void testCCSRemoteReduce() throws Exception {
-        int numClusters = randomIntBetween(2, 10);
+        int numClusters = randomIntBetween(1, 10);
         DiscoveryNode[] nodes = new DiscoveryNode[numClusters];
         Map<String, OriginalIndices> remoteIndicesByCluster = new HashMap<>();
         Settings.Builder builder = Settings.builder();
@@ -440,7 +440,7 @@ public class TransportSearchActionTests extends ESTestCase {
                 assertEquals(0, searchResponse.getClusters().getSkipped());
                 assertEquals(totalClusters, searchResponse.getClusters().getTotal());
                 assertEquals(totalClusters, searchResponse.getClusters().getSuccessful());
-                assertEquals(totalClusters + 1, searchResponse.getNumReducePhases());
+                assertEquals(totalClusters == 1 ? 1 : totalClusters + 1, searchResponse.getNumReducePhases());
             }
             {
                 SearchRequest searchRequest = new SearchRequest();
@@ -510,7 +510,6 @@ public class TransportSearchActionTests extends ESTestCase {
                 awaitLatch(latch, 5, TimeUnit.SECONDS);
                 assertNotNull(failure.get());
                 assertThat(failure.get(), instanceOf(RemoteTransportException.class));
-                RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get();
                 assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster ["));
                 assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class));
             }
@@ -583,7 +582,7 @@ public class TransportSearchActionTests extends ESTestCase {
                 assertEquals(0, searchResponse.getClusters().getSkipped());
                 assertEquals(totalClusters, searchResponse.getClusters().getTotal());
                 assertEquals(totalClusters, searchResponse.getClusters().getSuccessful());
-                assertEquals(totalClusters + 1, searchResponse.getNumReducePhases());
+                assertEquals(totalClusters == 1 ? 1 : totalClusters + 1, searchResponse.getNumReducePhases());
             }
             assertEquals(0, service.getConnectionManager().size());
         } finally {