Browse Source

Support CCS minimize round trips in async search (#96012)

* Support CCS minimize round trips in async search

This commit makes the smallest set of changes to allow async-search based cross-cluster search
to work with the CCS minimize_round_trips feature without changing the internals/architecture of
the search action.

When ccsMinimizeRoundtrips is set to true on SubmitAsyncSearchRequest, the AsyncSearchTask on the
primary CCS coordinator sends a synchronous SearchRequest to all to clusters for a remote coordinator
to orchestrate and return the entire result set to the CCS coordinator as a single response.

This is the same functionality provided by synchronous CCS search using minimize_roundtrips.
Since this is an async search, it means that the async search coordinator has no visibility
into search progress on the remote clusters while they are running the search, thus losing one of
the key features of async search. However, this is a good first approach for improving overall search
latency for cross cluster searches that query a large number of shards on remote clusters, since
Kibana does not currently expose incremental progress of an async search to users.

Relates #73971
Michael Peterson 2 years ago
parent
commit
8b1cd47455

+ 4 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java

@@ -300,6 +300,10 @@ public class RestHighLevelClient implements Closeable {
         return client;
     }
 
+    public final XContentParserConfiguration getParserConfig() {
+        return parserConfig;
+    }
+
     @Override
     public final void close() throws IOException {
         doClose.accept(client);

+ 5 - 0
docs/changelog/96012.yaml

@@ -0,0 +1,5 @@
+pr: 96012
+summary: Support CCS minimize round trips in async search
+area: Search
+type: enhancement
+issues: []

+ 26 - 27
docs/reference/search/async-search.asciidoc

@@ -2,13 +2,13 @@
 [[async-search]]
 === Async search
 
-The async search API let you asynchronously execute a search request, monitor 
+The async search API let you asynchronously execute a search request, monitor
 its progress, and retrieve partial results as they become available.
 
 [[submit-async-search]]
 ==== Submit async search API
 
-Executes a search request asynchronously. It accepts the same parameters and 
+Executes a search request asynchronously. It accepts the same parameters and
 request body as the <<search-search,search API>>.
 
 [source,console,id=submit-async-search-date-histogram-example]
@@ -31,9 +31,9 @@ POST /sales*/_async_search?size=0
 // TEST[setup:sales]
 // TEST[s/size=0/size=0&wait_for_completion_timeout=10s&keep_on_completion=true/]
 
-The response contains an identifier of the search being executed. You can use 
-this ID to later retrieve the search's final results. The currently available 
-search results are returned as part of the 
+The response contains an identifier of the search being executed. You can use
+this ID to later retrieve the search's final results. The currently available
+search results are returned as part of the
 <<search-api-response-body,`response`>> object.
 
 [source,console-result]
@@ -102,16 +102,16 @@ The `keep_on_completion` parameter, which defaults to `false`, can be set to
 `true` to request that results are stored for later retrieval also when the
 search completes within the `wait_for_completion_timeout`.
 
-You can also specify how long the async search needs to be available through the 
-`keep_alive` parameter, which defaults to `5d` (five days). Ongoing async 
+You can also specify how long the async search needs to be available through the
+`keep_alive` parameter, which defaults to `5d` (five days). Ongoing async
 searches and any saved search results are deleted after this period.
 
-NOTE: When the primary sort of the results is an indexed field, shards get 
-sorted based on minimum and maximum value that they hold for that field, hence 
+NOTE: When the primary sort of the results is an indexed field, shards get
+sorted based on minimum and maximum value that they hold for that field, hence
 partial results become available following the sort criteria that was requested.
 
-The submit async search API supports the same 
-<<search-search-api-query-params,parameters>> as the search API, though some 
+The submit async search API supports the same
+<<search-search-api-query-params,parameters>> as the search API, though some
 have different default values:
 
 * `batched_reduce_size` defaults to `5`: this affects how often partial results
@@ -123,13 +123,12 @@ number of new shard responses (`5` by default).
 enforce the execution of a pre-filter roundtrip to retrieve statistics from
 each shard so that the ones that surely don't hold any document matching the
 query get skipped.
-* `ccs_minimize_roundtrips` defaults to `false`, which is also the only
-supported value
+* `ccs_minimize_roundtrips` defaults to `false`. When doing a cross-cluster search,
+setting it to `true` may improve overall search latency, particularly when
+searching clusters with a large number of nodes.
 
 WARNING: Async search does not support <<scroll-search-results,scroll>>
 nor search requests that only include the <<search-suggesters,suggest section>>.
-{ccs-cap} is supported only with 
-<<ccs-min-roundtrips,`ccs_minimize_roundtrips`>> set to `false`.
 
 WARNING: By default, {es} doesn't allow to store an async search response
 larger than 10Mb, and an attempt to do this results in an error. The maximum
@@ -139,8 +138,8 @@ allowed size for a stored async search response can be set by changing the
 [[get-async-search]]
 ==== Get async search
 
-The get async search API retrieves the results of a previously submitted async 
-search request given its id. If the {es} {security-features} are enabled, the 
+The get async search API retrieves the results of a previously submitted async
+search request given its id. If the {es} {security-features} are enabled, the
 access to the results of a specific async search is restricted to
 <<can-access-resources-check,the user or API key that submitted it>>.
 
@@ -213,24 +212,24 @@ completed the execution of the query.
 The `wait_for_completion_timeout` parameter can also be provided when calling
 the Get Async Search API, in order to wait for the search to be completed up
 until the provided timeout. Final results will be returned if available before
-the timeout expires, otherwise the currently available results will be returned 
-once the timeout expires. By default no timeout is set meaning that the 
+the timeout expires, otherwise the currently available results will be returned
+once the timeout expires. By default no timeout is set meaning that the
 currently available results will be returned without any additional wait.
 
 The `keep_alive` parameter specifies how long the async search should be
 available in the cluster. When not specified, the `keep_alive` set with the
 corresponding submit async request will be used. Otherwise, it is possible to
 override such value and extend the validity of the request. When this period
-expires, the search, if still running, is cancelled. If the search is completed, 
+expires, the search, if still running, is cancelled. If the search is completed,
 its saved results are deleted.
 
 
 [[get-async-search-status]]
 ==== Get async search status
 
-The get async search status API, without retrieving search results, shows only 
-the status of a previously submitted async search request given its `id`. If the 
-{es} {security-features} are enabled, the access to the get async search status 
+The get async search status API, without retrieving search results, shows only
+the status of a previously submitted async search request given its `id`. If the
+{es} {security-features} are enabled, the access to the get async search status
 API is restricted to the <<built-in-roles, monitoring_user role>>.
 
 [source,console,id=get-async-search-status-example]
@@ -259,8 +258,8 @@ GET /_async_search/status/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVm
 
 <1> Indicates how many shards have executed the query so far.
 
-For an async search that has been completed, the status response has an 
-additional `completion_status` field that shows the status code of the completed 
+For an async search that has been completed, the status response has an
+additional `completion_status` field that shows the status code of the completed
 async search.
 
 [source,console-result]
@@ -310,8 +309,8 @@ async search.
 [[delete-async-search]]
 ==== Delete async search
 
-You can use the delete async search API to manually delete an async search by 
-ID. If the search is still running, the search request will be cancelled. 
+You can use the delete async search API to manually delete an async search by
+ID. If the search is still running, the search request will be cancelled.
 Otherwise, the saved search results are deleted.
 
 [source,console,id=delete-async-search-date-histogram-example]

File diff suppressed because it is too large
+ 517 - 116
qa/multi-cluster-search/src/test/java/org/elasticsearch/search/CCSDuelIT.java


+ 1 - 1
server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CrossClusterSearchIT.java

@@ -223,7 +223,7 @@ public class CrossClusterSearchIT extends AbstractMultiClustersTestCase {
         PlainActionFuture<SearchResponse> queryFuture = new PlainActionFuture<>();
         SearchRequest searchRequest = new SearchRequest("demo", "cluster_a:prod");
         searchRequest.allowPartialSearchResults(false);
-        searchRequest.setCcsMinimizeRoundtrips(false);
+        searchRequest.setCcsMinimizeRoundtrips(randomBoolean());
         searchRequest.source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(1000));
         client(LOCAL_CLUSTER).search(searchRequest, queryFuture);
         SearchListenerPlugin.waitSearchStarted();

+ 66 - 3
server/src/main/java/org/elasticsearch/action/search/SearchResponse.java

@@ -479,7 +479,42 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
         private final int total;
         private final int successful;
         private final int skipped;
+        // NOTE: these two new fields (remoteClusters and ccsMinimizeRoundtrips) have not been added to the wire protocol
+        // or equals/hashCode methods. They are needed for CCS only (async-search CCS in particular). If we need to write
+        // these to the .async-search system index in the future, we may want to refactor Clusters to allow async-search
+        // to subclass it.
+        private final transient int remoteClusters;
+        private final transient boolean ccsMinimizeRoundtrips;
 
+        /**
+         * A Clusters object meant for use with CCS holding additional information about
+         * the number of remote clusters and whether ccsMinimizeRoundtrips is being used.
+         * @param total total number of clusters in the search
+         * @param successful number of clusters that have successfully completed the search
+         * @param skipped number of clusters that were skipped (e.g., unavailable or other error)
+         * @param remoteClusters number of remote clusters in the search
+         * @param ccsMinimizeRoundtrips specifies whether a CCS search is using minimizeRoundtrips feature
+         */
+        public Clusters(int total, int successful, int skipped, int remoteClusters, boolean ccsMinimizeRoundtrips) {
+            assert total >= 0 && successful >= 0 && skipped >= 0 && remoteClusters >= 0
+                : "total: " + total + " successful: " + successful + " skipped: " + skipped + " remote: " + remoteClusters;
+            assert successful <= total : "total: " + total + " successful: " + successful + " skipped: " + skipped;
+            assert remoteClusters <= total : "total: " + total + " remote: " + remoteClusters;
+            assert ccsMinimizeRoundtrips == false || remoteClusters > 0
+                : "ccsMinimizeRoundtrips is true but remoteClusters count is not a positive number: " + remoteClusters;
+            int localCount = total - remoteClusters;
+            assert localCount == 0 || localCount == 1 : "total - remoteClusters should only be 0 or 1";
+            this.total = total;
+            this.successful = successful;
+            this.skipped = skipped;
+            this.remoteClusters = remoteClusters;
+            this.ccsMinimizeRoundtrips = ccsMinimizeRoundtrips;
+        }
+
+        /**
+         * Assumes ccsMinimizeRoundtrips=false.
+         * We are not tracking number of remote clusters in this search.
+         */
         public Clusters(int total, int successful, int skipped) {
             assert total >= 0 && successful >= 0 && skipped >= 0
                 : "total: " + total + " successful: " + successful + " skipped: " + skipped;
@@ -488,6 +523,8 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
             this.total = total;
             this.successful = successful;
             this.skipped = skipped;
+            this.remoteClusters = -1;  // means "unknown" and not needed for this usage
+            this.ccsMinimizeRoundtrips = false;
         }
 
         private Clusters(StreamInput in) throws IOException {
@@ -514,26 +551,41 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
         }
 
         /**
-         * Returns how many total clusters the search was requested to be executed on
+         * @return how many total clusters the search was requested to be executed on
          */
         public int getTotal() {
             return total;
         }
 
         /**
-         * Returns how many total clusters the search was executed successfully on
+         * @return how many total clusters the search was executed successfully on
          */
         public int getSuccessful() {
             return successful;
         }
 
         /**
-         * Returns how many total clusters were during the execution of the search request
+         * @return how many total clusters were used during the execution of the search request
          */
         public int getSkipped() {
             return skipped;
         }
 
+        /**
+         * @return how many remote clusters were using during the execution of the search request
+         *         If not set, returns -1, meaning 'unknown'.
+         */
+        public int getRemoteClusters() {
+            return remoteClusters;
+        }
+
+        /**
+         * @return whether this search was a cross cluster search done with ccsMinimizeRoundtrips=true
+         */
+        public boolean isCcsMinimizeRoundtrips() {
+            return ccsMinimizeRoundtrips;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) {
@@ -555,6 +607,17 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
         public String toString() {
             return "Clusters{total=" + total + ", successful=" + successful + ", skipped=" + skipped + '}';
         }
+
+        public String toStringExtended() {
+            return Strings.format(
+                "Clusters{total=%d, successful=%d, skipped=%d, remote=%d, ccsMinimizeRoundtrips=%s}",
+                total,
+                successful,
+                skipped,
+                remoteClusters,
+                ccsMinimizeRoundtrips
+            );
+        }
     }
 
     // public for tests

+ 10 - 3
server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

@@ -310,6 +310,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                     rewritten,
                     localIndices,
                     clusterState,
+                    SearchResponse.Clusters.EMPTY,
                     searchContext,
                     searchPhaseProvider.apply(listener)
                 );
@@ -320,6 +321,12 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                         && rewritten.source().aggregations() != null
                             ? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations())
                             : null;
+                    final int totalClusters = (localIndices == null ? 0 : 1) + remoteClusterIndices.size();
+                    var initClusters = new SearchResponse.Clusters(totalClusters, 0, 0, remoteClusterIndices.size(), true);
+                    if (localIndices == null) {
+                        // Notify the progress listener that a CCS with minimize_roundtrips is happening remote-only (no local shards)
+                        task.getProgressListener().notifyListShards(Collections.emptyList(), Collections.emptyList(), initClusters, false);
+                    }
                     ccsRemoteReduce(
                         parentTaskId,
                         rewritten,
@@ -336,6 +343,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                             r,
                             localIndices,
                             clusterState,
+                            initClusters,
                             searchContext,
                             searchPhaseProvider.apply(l)
                         )
@@ -458,7 +466,6 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
         ActionListener<SearchResponse> listener,
         BiConsumer<SearchRequest, ActionListener<SearchResponse>> localSearchConsumer
     ) {
-
         if (localIndices == null && remoteIndices.size() == 1) {
             // if we are searching against a single remote cluster, we simply forward the original search request to such cluster
             // and we directly perform final reduction in the remote cluster
@@ -721,6 +728,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
         SearchRequest searchRequest,
         OriginalIndices localIndices,
         ClusterState clusterState,
+        SearchResponse.Clusters clusterInfo,
         SearchContextId searchContext,
         SearchPhaseProvider searchPhaseProvider
     ) {
@@ -733,7 +741,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
             (clusterName, nodeId) -> null,
             clusterState,
             Collections.emptyMap(),
-            SearchResponse.Clusters.EMPTY,
+            clusterInfo,
             searchContext,
             searchPhaseProvider
         );
@@ -872,7 +880,6 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
         @Nullable SearchContextId searchContext,
         SearchPhaseProvider searchPhaseProvider
     ) {
-
         clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
         if (searchRequest.allowPartialSearchResults() == null) {
             // No user preference defined in search request - apply cluster service default

+ 12 - 1
server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java

@@ -135,7 +135,18 @@ public class SearchResponseTests extends ESTestCase {
         int totalClusters = randomIntBetween(0, 10);
         int successfulClusters = randomIntBetween(0, totalClusters);
         int skippedClusters = totalClusters - successfulClusters;
-        return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters);
+        if (randomBoolean()) {
+            return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters);
+        } else {
+            int remoteClusters = totalClusters;
+            if (totalClusters > 0 && randomBoolean()) {
+                // remoteClusters can be same as total cluster count or one less (when doing local search)
+                remoteClusters--;
+            }
+            // Clusters has an assert that if ccsMinimizeRoundtrips = true, then remoteClusters must be > 0
+            boolean ccsMinimizeRoundtrips = (remoteClusters > 0 ? randomBoolean() : false);
+            return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters, remoteClusters, ccsMinimizeRoundtrips);
+        }
     }
 
     /**

+ 0 - 2
server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

@@ -63,7 +63,6 @@ import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.SearchShardTarget;
-import org.elasticsearch.search.aggregations.AggregationReduceContext;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.collapse.CollapseBuilder;
@@ -501,7 +500,6 @@ public class TransportSearchActionTests extends ESTestCase {
         boolean local = randomBoolean();
         OriginalIndices localIndices = local ? new OriginalIndices(new String[] { "index" }, SearchRequest.DEFAULT_INDICES_OPTIONS) : null;
         TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0);
-        Function<Boolean, AggregationReduceContext> reduceContext = finalReduce -> null;
         try (
             MockTransportService service = MockTransportService.createNewService(
                 settings,

+ 11 - 4
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java

@@ -58,6 +58,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
 
     private final Map<String, String> originHeaders;
 
+    private boolean ccsMinimizeRoundtrips;
     private boolean hasInitialized;
     private boolean hasCompleted;
     private long completionId;
@@ -225,7 +226,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
             if (hasCompleted || waitForCompletion.getMillis() == 0) {
                 executeImmediately = true;
             } else {
-                // ensure that we consumes the listener only once
+                // ensure that we consume the listener only once
                 AtomicBoolean hasRun = new AtomicBoolean(false);
                 long id = completionId++;
                 final Cancellable cancellable;
@@ -365,6 +366,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
     }
 
     class Listener extends SearchProgressActionListener {
+
         @Override
         protected void onQueryResult(int shardIndex) {
             checkCancellation();
@@ -403,6 +405,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
         protected void onListShards(List<SearchShard> shards, List<SearchShard> skipped, Clusters clusters, boolean fetchPhase) {
             // best effort to cancel expired tasks
             checkCancellation();
+            ccsMinimizeRoundtrips = clusters.isCcsMinimizeRoundtrips();
             searchResponse.compareAndSet(
                 null,
                 new MutableSearchResponse(shards.size() + skipped.size(), skipped.size(), clusters, threadPool.getThreadContext())
@@ -430,19 +433,23 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
                  */
                 reducedAggs = () -> InternalAggregations.topLevelReduce(singletonList(aggregations), aggReduceContextSupplier.get());
             }
-            searchResponse.get().updatePartialResponse(shards.size(), totalHits, reducedAggs, reducePhase);
+            searchResponse.get().updatePartialResponse(shards.size(), totalHits, reducedAggs, reducePhase, false);
         }
 
+        /**
+         * Called when the final reduce of <b>local</b> shards is done.
+         * During a CCS search, there may still be shard searches in progress on remote clusters when this is called.
+         */
         @Override
         public void onFinalReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggregations, int reducePhase) {
             // best effort to cancel expired tasks
             checkCancellation();
-            searchResponse.get().updatePartialResponse(shards.size(), totalHits, () -> aggregations, reducePhase);
+            searchResponse.get().updatePartialResponse(shards.size(), totalHits, () -> aggregations, reducePhase, true);
         }
 
         @Override
         public void onResponse(SearchResponse response) {
-            searchResponse.get().updateFinalResponse(response);
+            searchResponse.get().updateFinalResponse(response, ccsMinimizeRoundtrips);
             executeCompletionListeners();
         }
 

+ 74 - 7
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java

@@ -6,12 +6,15 @@
  */
 package org.elasticsearch.xpack.search;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchResponse.Clusters;
 import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.TimeValue;
@@ -35,10 +38,12 @@ import static org.elasticsearch.xpack.core.async.AsyncTaskIndexService.restoreRe
  * run concurrently to 1 and ensures that we pause the search progress when an {@link AsyncSearchResponse} is built.
  */
 class MutableSearchResponse {
+
+    private static final Logger logger = LogManager.getLogger(MutableSearchResponse.class);
     private static final TotalHits EMPTY_TOTAL_HITS = new TotalHits(0L, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
     private final int totalShards;
     private final int skippedShards;
-    private final Clusters clusters;
+    private Clusters clusters;
     private final AtomicArray<ShardSearchFailure> queryFailures;
     private final ThreadContext threadContext;
 
@@ -74,6 +79,7 @@ class MutableSearchResponse {
     MutableSearchResponse(int totalShards, int skippedShards, Clusters clusters, ThreadContext threadContext) {
         this.totalShards = totalShards;
         this.skippedShards = skippedShards;
+
         this.clusters = clusters;
         this.queryFailures = totalShards == -1 ? null : new AtomicArray<>(totalShards - skippedShards);
         this.isPartial = true;
@@ -84,13 +90,16 @@ class MutableSearchResponse {
     /**
      * Updates the response with the result of a partial reduction.
      * @param reducedAggs is a strategy for producing the reduced aggs
+     * @param isFinalLocalReduce true if the local cluster search has finished (during CCS with minimize_roundtrips, this can be true
+     *                           even while the overall search is still running on remote clusters)
      */
     @SuppressWarnings("HiddenField")
     synchronized void updatePartialResponse(
         int successfulShards,
         TotalHits totalHits,
         Supplier<InternalAggregations> reducedAggs,
-        int reducePhase
+        int reducePhase,
+        boolean isFinalLocalReduce
     ) {
         failIfFrozen();
         if (reducePhase < this.reducePhase) {
@@ -103,18 +112,32 @@ class MutableSearchResponse {
         this.totalHits = totalHits;
         this.reducedAggsSource = reducedAggs;
         this.reducePhase = reducePhase;
+        if (isFinalLocalReduce && clusters.isCcsMinimizeRoundtrips()) {
+            // currently only ccsMinimizeRoundTrip=true creates Clusters in their initial state (where successful=0)
+            // ccsMinimizeRoundtrips=false creates Clusters in its final state even at the beginning (successful+skipped=total)
+            // so update the clusters object 'successful' count if local cluster search is done AND ccsMinimizeRoundtrips=true
+            Clusters newClusters = new Clusters(
+                clusters.getTotal(),
+                clusters.getSuccessful() + 1,
+                clusters.getSkipped(),
+                clusters.getRemoteClusters(),
+                clusters.isCcsMinimizeRoundtrips()
+            );
+            this.clusters = newClusters;
+            logger.debug("Updating Clusters info to indicate that the local cluster search has completed: {}", newClusters);
+        }
     }
 
     /**
      * Updates the response with the final {@link SearchResponse} once the
      * search is complete.
      */
-    synchronized void updateFinalResponse(SearchResponse response) {
+    synchronized void updateFinalResponse(SearchResponse response, boolean ccsMinimizeRoundtrips) {
         failIfFrozen();
-        assert response.getTotalShards() == totalShards
-            : "received number of total shards differs from the one " + "notified through onListShards";
-        assert response.getSkippedShards() == skippedShards
-            : "received number of skipped shards differs from the one " + "notified through onListShards";
+
+        assert shardsInResponseMatchExpected(response, ccsMinimizeRoundtrips)
+            : getShardsInResponseMismatchInfo(response, ccsMinimizeRoundtrips);
+
         this.responseHeaders = threadContext.getResponseHeaders();
         this.finalResponse = response;
         this.isPartial = false;
@@ -298,4 +321,48 @@ class MutableSearchResponse {
         }
         return failures.toArray(ShardSearchFailure[]::new);
     }
+
+    private boolean shardsInResponseMatchExpected(SearchResponse response, boolean ccsMinimizeRoundtrips) {
+        if (ccsMinimizeRoundtrips) {
+            return response.getTotalShards() >= totalShards && response.getSkippedShards() >= skippedShards;
+        } else {
+            return response.getTotalShards() == totalShards && response.getSkippedShards() == skippedShards;
+        }
+    }
+
+    private String getShardsInResponseMismatchInfo(SearchResponse response, boolean ccsMinimizeRoundtrips) {
+        if (ccsMinimizeRoundtrips) {
+            if (response.getTotalShards() < totalShards) {
+                return Strings.format(
+                    "received number of shards (%d) is less than the value notified via onListShards (%d)",
+                    response.getTotalShards(),
+                    totalShards
+                );
+            }
+            if (response.getSkippedShards() < skippedShards) {
+                return Strings.format(
+                    "received number of skipped shards (%d) is less than the value notified via onListShards (%d)",
+                    response.getSkippedShards(),
+                    skippedShards
+                );
+            }
+            throw new IllegalStateException("assert method hit unexpected case for ccsMinimizeRoundtrips=true");
+        } else {
+            if (response.getTotalShards() != totalShards) {
+                return Strings.format(
+                    "received number of shards (%d) differs from the one notified via onListShards (%d)",
+                    response.getTotalShards(),
+                    totalShards
+                );
+            }
+            if (response.getSkippedShards() != skippedShards) {
+                return Strings.format(
+                    "received number of skipped shards (%d) differs from the one notified via onListShards (%d)",
+                    response.getSkippedShards(),
+                    skippedShards
+                );
+            }
+            throw new IllegalStateException("assert method hit unexpected case for ccsMinimizeRoundtrips=false");
+        }
+    }
 }

+ 3 - 2
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchAction.java

@@ -52,9 +52,10 @@ public final class RestSubmitAsyncSearchAction extends BaseRestHandler {
     protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
         SubmitAsyncSearchRequest submit = new SubmitAsyncSearchRequest();
         IntConsumer setSize = size -> submit.getSearchRequest().source().size(size);
-        // for simplicity, we share parsing with ordinary search. That means a couple of unsupported parameters, like scroll,
-        // pre_filter_shard_size and ccs_minimize_roundtrips get set to the search request although the REST spec don't list
+        // for simplicity, we share parsing with ordinary search. That means a couple of unsupported parameters, like scroll
+        // and pre_filter_shard_size get set to the search request although the REST spec don't list
         // them as supported. We rely on SubmitAsyncSearchRequest#validate to fail in case they are set.
+        // Note that ccs_minimize_roundtrips is also set this way, which is a supported option.
         request.withContentOrSourceParamParserOrNull(
             parser -> parseSearchRequest(
                 submit.getSearchRequest(),

+ 8 - 0
x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchActionTests.java

@@ -91,6 +91,14 @@ public class RestSubmitAsyncSearchActionTests extends RestActionTestCase {
             batchedReduceSize,
             r -> r.getSearchRequest().getBatchedReduceSize()
         );
+
+        boolean ccsMinimizeRoundtrips = randomBoolean();
+        doTestParameter(
+            "ccs_minimize_roundtrips",
+            Boolean.toString(ccsMinimizeRoundtrips),
+            ccsMinimizeRoundtrips,
+            r -> r.getSearchRequest().isCcsMinimizeRoundtrips()
+        );
     }
 
     @SuppressWarnings("unchecked")

+ 1 - 3
x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/SubmitAsyncSearchRequestTests.java

@@ -85,9 +85,7 @@ public class SubmitAsyncSearchRequestTests extends AbstractWireSerializingTransf
         SubmitAsyncSearchRequest req = new SubmitAsyncSearchRequest();
         req.getSearchRequest().setCcsMinimizeRoundtrips(true);
         ActionRequestValidationException exc = req.validate();
-        assertNotNull(exc);
-        assertThat(exc.validationErrors().size(), equalTo(1));
-        assertThat(exc.validationErrors().get(0), containsString("[ccs_minimize_roundtrips]"));
+        assertNull(exc);
     }
 
     public void testValidateScroll() {

+ 19 - 6
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/SubmitAsyncSearchRequest.java

@@ -82,6 +82,25 @@ public class SubmitAsyncSearchRequest extends ActionRequest {
         return request.getBatchedReduceSize();
     }
 
+    /**
+     * Returns whether network round-trips should be minimized when executing cross-cluster search requests.
+     * Defaults to <code>false</code>.
+     */
+    public boolean isCcsMinimizeRoundtrips() {
+        return request.isCcsMinimizeRoundtrips();
+    }
+
+    /**
+     * Sets whether network round-trips should be minimized when executing cross-cluster search requests.
+     * Defaults to <code>false</code>.
+     *
+     * <p>WARNING: The progress and partial responses of searches executed on remote clusters will not be
+     * available during the search if {@code ccsMinimizeRoundtrips} is enabled.</p>
+     */
+    public void setCcsMinimizeRoundtrips(boolean ccsMinimizeRoundtrips) {
+        request.setCcsMinimizeRoundtrips(ccsMinimizeRoundtrips);
+    }
+
     /**
      * Sets the minimum time that the request should wait before returning a partial result (defaults to 1 second).
      */
@@ -140,12 +159,6 @@ public class SubmitAsyncSearchRequest extends ActionRequest {
                 validationException
             );
         }
-        if (request.isCcsMinimizeRoundtrips()) {
-            validationException = addValidationError(
-                "[ccs_minimize_roundtrips] is not supported on async search queries",
-                validationException
-            );
-        }
         if (request.getPreFilterShardSize() == null || request.getPreFilterShardSize() != 1) {
             validationException = addValidationError(
                 "[pre_filter_shard_size] cannot be changed for async search queries",

Some files were not shown because too many files changed in this diff