1
0
Эх сурвалжийг харах

Add specific cluster error info, shard info and additional metadata for CCS when minimizing roundtrips (#97731)

For CCS searches with ccs_minimize_roundtrips=true, when an error is returned, it is unclear which cluster
caused the problem. This commit adds additional accounting and error information to the search response
for each cluster involved in a cross-cluster search.

The _clusters section of the SearchResponse has a new details section added with an entry for each cluster
(remote and local). It includes status info, shard accounting counters and error information that are added
incrementally as the search happens.

The search on each cluster can be in one of 5 states:
RUNNING
SUCCESSFUL - all shards were successfully searched (successful or skipped)
PARTIAL - some shard searches failed, but at least one succeeded and partial data has been returned
SKIPPED - no shards were successfully searched (all failed or cluster unavailable) when skip_unavailable=true
FAILED - no shards were successfully searched (all failed or cluster unavailable) when skip_unavailable=false

A new SearchResponse.Cluster object has been added. Each TransportSearchAction.CCSActionListener
(one for each cluster) has a reference to a separate Cluster instance and updates once it gets back 
information from its cluster.

The SearchResponse.Clusters object only uses the new Cluster object for CCS minimize_roundtrips=true.
For local-only searches and CCS minimize_roundtrips=false, it uses the current  Clusters object as before.

Follow on work will change CCS minimize_roundtrips=false to also use the new Cluster model and update
state in the _cluster/details section.

The Cluster objects are immutable, so a CAS operation is required to swap in new state to the 
map of Cluster objects held by the `SearchResponse.Clusters` class. This concurrency model is 
a little bit of overkill for the minimize_roundtrips=true use case, but it will be necessary for 
supporting minimize_roundtrips=false, since updates there will be done per shard, not per cluster.
Michael Peterson 2 жил өмнө
parent
commit
169f7d1774
14 өөрчлөгдсөн 2080 нэмэгдсэн , 233 устгасан
  1. 138 52
      docs/reference/search/search-your-data/search-across-clusters.asciidoc
  2. 2 1
      server/src/main/java/org/elasticsearch/TransportVersion.java
  3. 361 65
      server/src/main/java/org/elasticsearch/action/search/SearchResponse.java
  4. 131 14
      server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
  5. 1 12
      server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java
  6. 25 0
      server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java
  7. 28 9
      test/framework/src/main/java/org/elasticsearch/test/AbstractMultiClustersTestCase.java
  8. 748 37
      x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CrossClusterAsyncSearchIT.java
  9. 2 2
      x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java
  10. 10 24
      x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java
  11. 503 5
      x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchResponseTests.java
  12. 100 10
      x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncStatusResponseTests.java
  13. 30 1
      x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/ThrowingQueryBuilder.java
  14. 1 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncStatusResponse.java

+ 138 - 52
docs/reference/search/search-your-data/search-across-clusters.asciidoc

@@ -130,15 +130,29 @@ The API returns the following response:
   "took": 150,
   "timed_out": false,
   "_shards": {
-    "total": 1,
-    "successful": 1,
+    "total": 3,
+    "successful": 3,
     "failed": 0,
     "skipped": 0
   },
   "_clusters": {
     "total": 1,
     "successful": 1,
-    "skipped": 0
+    "skipped": 0,
+    "details": {
+      "cluster_one": {  <1>
+        "status": "successful",
+        "indices": "my-index-000001",
+        "took": 148,
+        "timed_out": false,
+        "_shards": {
+          "total": 3,
+          "successful": 3,
+          "skipped": 0,
+          "failed": 0
+        }
+      }
+    }
   },
   "hits": {
     "total" : {
@@ -148,7 +162,7 @@ The API returns the following response:
     "max_score": 1,
     "hits": [
       {
-        "_index": "cluster_one:my-index-000001", <1>
+        "_index": "cluster_one:my-index-000001", <2>
         "_id": "0",
         "_score": 1,
         "_source": {
@@ -171,10 +185,18 @@ The API returns the following response:
 // TESTRESPONSE[s/"took": 150/"took": "$body.took"/]
 // TESTRESPONSE[s/"max_score": 1/"max_score": "$body.hits.max_score"/]
 // TESTRESPONSE[s/"_score": 1/"_score": "$body.hits.hits.0._score"/]
-
-<1> The search response body includes the name of the remote cluster in the
+// TESTRESPONSE[s/"total": 3/"total": "$body._shards.total"/]
+// TESTRESPONSE[s/"successful": 3/"successful": "$body._shards.successful"/]
+// TESTRESPONSE[s/"skipped": 0/"skipped": "$body._shards.skipped"/]
+// TESTRESPONSE[s/"failed": 3/"failed": "$body._shards.failed"/]
+// TESTRESPONSE[s/"took": 148/"took": "$body._clusters.details.cluster_one.took"/]
+
+<1> The details section shows information about the search on each cluster.
+<2> The search response body includes the name of the remote cluster in the
 `_index` parameter.
 
+
+
 [discrete]
 [[ccs-search-multi-remote-cluster]]
 ==== Search multiple remote clusters
@@ -208,15 +230,53 @@ The API returns the following response:
   "timed_out": false,
   "num_reduce_phases": 4,
   "_shards": {
-    "total": 3,
-    "successful": 3,
+    "total": 12,
+    "successful": 12,
     "failed": 0,
     "skipped": 0
   },
   "_clusters": {
     "total": 3,
     "successful": 3,
-    "skipped": 0
+    "skipped": 0,
+    "details": {
+      "(local)": {            <1>
+        "status": "successful",
+        "indices": "my-index-000001",
+        "took": 21,
+        "timed_out": false,
+        "_shards": {
+          "total": 5,
+          "successful": 5,
+          "skipped": 0,
+          "failed": 0
+        }
+      },
+      "cluster_one": {
+        "status": "successful",
+        "indices": "my-index-000001",
+        "took": 48,
+        "timed_out": false,
+        "_shards": {
+          "total": 4,
+          "successful": 4,
+          "skipped": 0,
+          "failed": 0
+        }
+      },
+      "cluster_two": {
+        "status": "successful",
+        "indices": "my-index-000001",
+        "took": 141,
+        "timed_out": false,
+        "_shards": {
+          "total" : 3,
+          "successful" : 3,
+          "skipped": 0,
+          "failed": 0
+        }
+      }
+    }
   },
   "hits": {
     "total" : {
@@ -226,7 +286,7 @@ The API returns the following response:
     "max_score": 1,
     "hits": [
       {
-        "_index": "my-index-000001", <1>
+        "_index": "my-index-000001", <2>
         "_id": "0",
         "_score": 2,
         "_source": {
@@ -243,7 +303,7 @@ The API returns the following response:
         }
       },
       {
-        "_index": "cluster_one:my-index-000001", <2>
+        "_index": "cluster_one:my-index-000001", <3>
         "_id": "0",
         "_score": 1,
         "_source": {
@@ -260,7 +320,7 @@ The API returns the following response:
         }
       },
       {
-        "_index": "cluster_two:my-index-000001", <3>
+        "_index": "cluster_two:my-index-000001", <4>
         "_id": "0",
         "_score": 1,
         "_source": {
@@ -284,14 +344,23 @@ The API returns the following response:
 // TESTRESPONSE[s/"max_score": 1/"max_score": "$body.hits.max_score"/]
 // TESTRESPONSE[s/"_score": 1/"_score": "$body.hits.hits.0._score"/]
 // TESTRESPONSE[s/"_score": 2/"_score": "$body.hits.hits.1._score"/]
-
-<1> This document's `_index` parameter doesn't include a cluster name. This
+// TESTRESPONSE[s/"total": 12/"total": "$body._shards.total"/]
+// TESTRESPONSE[s/"successful": 12/"successful": "$body._shards.successful"/]
+// TESTRESPONSE[s/"total": 5/"total": "$body._clusters.details.(local)._shards.total"/]
+// TESTRESPONSE[s/"successful": 5/"successful": "$body._clusters.details.(local)._shards.successful"/]
+// TESTRESPONSE[s/"took": 21/"took": "$body._clusters.details.(local).took"/]
+// TESTRESPONSE[s/"total": 4/"total": "$body._clusters.details.cluster_one._shards.total"/]
+// TESTRESPONSE[s/"successful": 4/"successful": "$body._clusters.details.cluster_one._shards.successful"/]
+// TESTRESPONSE[s/"took": 48/"took": "$body._clusters.details.cluster_one.took"/]
+// TESTRESPONSE[s/"total" : 3/"total": "$body._clusters.details.cluster_two._shards.total"/]
+// TESTRESPONSE[s/"successful" : 3/"successful": "$body._clusters.details.cluster_two._shards.successful"/]
+// TESTRESPONSE[s/"took": 141/"took": "$body._clusters.details.cluster_two.took"/]
+
+<1> The local (querying) cluster is identified as "(local)".
+<2> This document's `_index` parameter doesn't include a cluster name. This
 means the document came from the local cluster.
-<2> This document came from `cluster_one`.
-<3> This document came from `cluster_two`.
-
-
-
+<3> This document came from `cluster_one`.
+<4> This document came from `cluster_two`.
 
 
 [discrete]
@@ -321,7 +390,7 @@ POST /my-index-000001,cluster_one:my-index-000001,cluster_two:my-index-000001/_a
 }
 --------------------------------------------------
 // TEST[continued]
-// TEST[s/ccs_minimize_roundtrips=true/ccs_minimize_roundtrips=true&wait_for_completion_timeout=1s&keep_on_completion=true/]
+// TEST[s/ccs_minimize_roundtrips=true/ccs_minimize_roundtrips=true&wait_for_completion_timeout=100ms&keep_on_completion=true/]
 
 
 The API returns the following response:
@@ -360,21 +429,7 @@ The API returns the following response:
   }
 }
 --------------------------------------------------
-// TESTRESPONSE[s/FklQYndoTDJ2VEFlMEVBTzFJMGhJVFEaLVlKYndBWWZSMUdicUc4WVlEaFl4ZzoxNTU=/$body.id/]
-// TESTRESPONSE[s/"is_partial": true/"is_partial": $body.is_partial/]
-// TESTRESPONSE[s/"is_running": true/"is_running": $body.is_running/]
-// TESTRESPONSE[s/1685563581380/$body.start_time_in_millis/]
-// TESTRESPONSE[s/1685995581380/$body.expiration_time_in_millis/]
-// TESTRESPONSE[s/"response"/"completion_time_in_millis": $body.completion_time_in_millis,\n  "response"/]
-// TESTRESPONSE[s/"num_reduce_phases": 0/"num_reduce_phases": "$body.response.num_reduce_phases"/]
-// TESTRESPONSE[s/"took": 1020/"took": "$body.response.took"/]
-// TESTRESPONSE[s/"total": 8/"total": $body.response._shards.total/]
-// TESTRESPONSE[s/"successful": 0/"successful": $body.response._shards.successful/]
-// TESTRESPONSE[s/"successful" : 0/"successful": $body.response._clusters.successful/]
-// TESTRESPONSE[s/"value": 0/"value": "$body.response.hits.total.value"/]
-// TESTRESPONSE[s/"max_score": null/"max_score": "$body.response.hits.max_score"/]
-// TESTRESPONSE[s/"hits": \[\]/"hits": $body.response.hits.hits/]
-
+// TEST[skip: terminated_early is absent from final results so is hard to reproduce here]
 
 <1> The async search id.
 <2> When `ccs_minimize_roundtrips` = `true` and searches on the remote clusters
@@ -385,7 +440,6 @@ across all clusters only when the search is completed.
 and all are currently running (since `successful` and `skipped` both equal 0).
 
 
-
 If you query the <<get-async-search,get async search>> endpoint while the query is
 still running, you will see an update in the `_clusters` and `_shards` section of
 the response when the local search has finished.
@@ -394,7 +448,7 @@ the response when the local search has finished.
 --------------------------------------------------
 GET /_async_search/FklQYndoTDJ2VEFlMEVBTzFJMGhJVFEaLVlKYndBWWZSMUdicUc4WVlEaFl4ZzoxNTU=
 --------------------------------------------------
-// TEST[skip: terminated_early is absent from final results so is hard to reproduce here]
+// TEST[continued s/FklQYndoTDJ2VEFlMEVBTzFJMGhJVFEaLVlKYndBWWZSMUdicUc4WVlEaFl4ZzoxNTU=/\${body.id}/]
 
 Response:
 
@@ -443,8 +497,6 @@ until all remote searches have finished (either successfully or been skipped).
 shown until searches on all clusters have been completed and merged.
 
 
-
-
 After searches on all the clusters have completed, when you query the
 <<get-async-search,get async search>> endpoint, you will see the final
 status of the `_clusters` and `_shards` section as well as the hits.
@@ -480,7 +532,45 @@ Response:
     "_clusters": {
       "total": 3,
       "successful": 3,   <3>
-      "skipped": 0
+      "skipped": 0,
+      "details": {
+        "(local)": {
+          "status": "successful",
+          "indices": "my-index-000001",
+          "took": 14382,
+          "timed_out": false,
+          "_shards": {
+            "total": 10,
+            "successful": 10,
+            "skipped": 0,
+            "failed": 0
+          }
+        },
+        "cluster_one": {
+          "status": "successful",
+          "indices": "my-index-000001",
+          "took": 22193,
+          "timed_out": false,
+          "_shards": {
+            "total": 12,
+            "successful": 12,
+            "skipped": 0,
+            "failed": 0
+          }
+        },
+        "cluster_two": {
+          "status": "successful",
+          "indices": "my-index-000001",
+          "took": 27550,
+          "timed_out": false,
+          "_shards": {
+            "total": 6,
+            "successful": 6,
+            "skipped": 0,
+            "failed": 0
+          }
+        }
+      }
     },
     "hits": {
       "total": {
@@ -500,6 +590,9 @@ Response:
 // TESTRESPONSE[s/1685996911108/$body.expiration_time_in_millis/]
 // TESTRESPONSE[s/1685564938727/$body.completion_time_in_millis/]
 // TESTRESPONSE[s/"took": 27619/"took": "$body.response.took"/]
+// TESTRESPONSE[s/"took": 14382/"took": "$body.$_path"/]
+// TESTRESPONSE[s/"took": 22193/"took": "$body.$_path"/]
+// TESTRESPONSE[s/"took": 27550/"took": "$body.$_path"/]
 // TESTRESPONSE[s/"total": 28/"total": $body.response._shards.total/]
 // TESTRESPONSE[s/"successful": 28/"successful": $body.response._shards.successful/]
 // TESTRESPONSE[s/"successful": 3/"successful": $body.response._clusters.successful/]
@@ -507,6 +600,8 @@ Response:
 // TESTRESPONSE[s/"relation": "eq"/"relation": "$body.response.hits.total.relation"/]
 // TESTRESPONSE[s/"max_score": 1.8293576/"max_score": "$body.response.hits.max_score"/]
 // TESTRESPONSE[s/"hits": \[...list of hits here...\]/"hits": $body.response.hits.hits/]
+// TESTRESPONSE[s/"total": \d+/"total": $body.$_path/]
+// TESTRESPONSE[s/"successful": \d+/"successful": $body.$_path/]
 
 
 <1> Once the search has finished, the completion_time is present.
@@ -515,7 +610,6 @@ were searched across all clusters and that all were successful.
 <3> The `_clusters` section shows that searches on all 3 clusters were successful.
 
 
-
 [discrete]
 [[ccs-async-search-minimize-roundtrips-false]]
 === Using async search for {ccs} with ccs_minimize_roundtrips=false
@@ -540,7 +634,7 @@ Example using the same set up as in the previous section (`ccs_minimize_roundtri
 
 [source,console]
 --------------------------------------------------
-GET /my-index-000001,cluster_one:my-index-000001,cluster_two:my-index-000001/_async_search?ccs_minimize_roundtrips=false
+POST /my-index-000001,cluster_one:my-index-000001,cluster_two:my-index-000001/_async_search?ccs_minimize_roundtrips=false
 {
   "query": {
     "match": {
@@ -551,7 +645,7 @@ GET /my-index-000001,cluster_one:my-index-000001,cluster_two:my-index-000001/_as
 }
 --------------------------------------------------
 // TEST[continued]
-// TEST[s/ccs_minimize_roundtrips=false/ccs_minimize_roundtrips=false&wait_for_completion_timeout=1s&keep_on_completion=true/]
+// TEST[s/ccs_minimize_roundtrips=false/ccs_minimize_roundtrips=false&wait_for_completion_timeout=2s&keep_on_completion=true/]
 
 
 The API returns the following response if the query takes longer than
@@ -568,7 +662,6 @@ the `wait_for_completion_timeout` duration (see <<async-search>>).
   "response": {
     "took": 1020,
     "timed_out": false,
-    "num_reduce_phases": 0,
     "_shards": {
       "total": 28,     <1>
       "successful": 0,
@@ -597,13 +690,8 @@ the `wait_for_completion_timeout` duration (see <<async-search>>).
 // TESTRESPONSE[s/1685563581380/$body.start_time_in_millis/]
 // TESTRESPONSE[s/1685995581380/$body.expiration_time_in_millis/]
 // TESTRESPONSE[s/"response"/"completion_time_in_millis": $body.completion_time_in_millis,\n  "response"/]
-// TESTRESPONSE[s/"num_reduce_phases": 0/"num_reduce_phases": "$body.response.num_reduce_phases"/]
-// TESTRESPONSE[s/"took": 1020/"took": "$body.response.took"/]
-// TESTRESPONSE[s/"total": 28/"total": $body.response._shards.total/]
-// TESTRESPONSE[s/"successful": 0/"successful": $body.response._shards.successful/]
-// TESTRESPONSE[s/"successful": 3/"successful": $body.response._clusters.successful/]
-// TESTRESPONSE[s/"value": 0/"value": "$body.response.hits.total.value"/]
 // TESTRESPONSE[s/"max_score": null/"max_score": "$body.response.hits.max_score"/]
+// TESTRESPONSE[s/\d+/$body.$_path/]
 // TESTRESPONSE[s/"hits": \[\]/"hits": $body.response.hits.hits/]
 
 <1> All shards from all clusters in scope for the search are listed here. Watch this
@@ -612,8 +700,6 @@ section for updates to monitor search progress.
 gathered from all 3 clusters and that all will be searched (none are being skipped).
 
 
-
-
 [discrete]
 [[skip-unavailable-clusters]]
 === Optional remote clusters

+ 2 - 1
server/src/main/java/org/elasticsearch/TransportVersion.java

@@ -175,9 +175,10 @@ public record TransportVersion(int id) implements VersionId<TransportVersion> {
     public static final TransportVersion V_8_500_050 = registerTransportVersion(8_500_050, "69722fa2-7c0a-4227-86fb-6d6a9a0a0321");
     public static final TransportVersion V_8_500_051 = registerTransportVersion(8_500_051, "a28b43bc-bb5f-4406-afcf-26900aa98a71");
     public static final TransportVersion V_8_500_052 = registerTransportVersion(8_500_052, "2d382b3d-9838-4cce-84c8-4142113e5c2b");
+    public static final TransportVersion V_8_500_053 = registerTransportVersion(8_500_053, "aa603bae-01e2-380a-8950-6604468e8c6d");
 
     private static class CurrentHolder {
-        private static final TransportVersion CURRENT = findCurrent(V_8_500_052);
+        private static final TransportVersion CURRENT = findCurrent(V_8_500_053);
 
         // finds the pluggable current version, or uses the given fallback
         private static TransportVersion findCurrent(TransportVersion fallback) {

+ 361 - 65
server/src/main/java/org/elasticsearch/action/search/SearchResponse.java

@@ -11,6 +11,7 @@ package org.elasticsearch.action.search;
 import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -30,6 +31,7 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
 import org.elasticsearch.search.profile.SearchProfileResults;
 import org.elasticsearch.search.profile.SearchProfileShardResult;
 import org.elasticsearch.search.suggest.Suggest;
+import org.elasticsearch.transport.RemoteClusterAware;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.ToXContentFragment;
@@ -39,10 +41,15 @@ import org.elasticsearch.xcontent.XContentParser.Token;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure;
@@ -465,7 +472,8 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
 
     /**
      * Holds info about the clusters that the search was executed on: how many in total, how many of them were successful
-     * and how many of them were skipped.
+     * and how many of them were skipped and further details in a Map of Cluster objects
+     * (when doing a cross-cluster search).
      */
     public static class Clusters implements ToXContentFragment, Writeable {
 
@@ -477,67 +485,88 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
         static final ParseField TOTAL_FIELD = new ParseField("total");
 
         private final int total;
-        private final int successful;
-        private final int skipped;
-        // NOTE: these two new fields (remoteClusters and ccsMinimizeRoundtrips) have not been added to the wire protocol
-        // or equals/hashCode methods. They are needed for CCS only (async-search CCS in particular). If we need to write
-        // these to the .async-search system index in the future, we may want to refactor Clusters to allow async-search
-        // to subclass it.
-        private final transient int remoteClusters;
+        private final int successful; // not used for minimize_roundtrips=true; dynamically determined from clusterInfo map
+        private final int skipped;    // not used for minimize_roundtrips=true; dynamically determined from clusterInfo map
+
+        // key to map is clusterAlias on the primary querying cluster of a CCS minimize_roundtrips=true query
+        // the Map itself is immutable after construction - all Clusters will be accounted for at the start of the search
+        // updates to the Cluster occur by CAS swapping in new Cluster objects into the AtomicReference in the map.
+        private final Map<String, AtomicReference<Cluster>> clusterInfo;
+
+        // this field is not Writeable, as it is only needed on the initial "querying cluster" coordinator of a CCS search
         private final transient boolean ccsMinimizeRoundtrips;
 
         /**
-         * A Clusters object meant for use with CCS holding additional information about
-         * the number of remote clusters and whether ccsMinimizeRoundtrips is being used.
-         * @param total total number of clusters in the search
-         * @param successful number of clusters that have successfully completed the search
-         * @param skipped number of clusters that were skipped (e.g., unavailable or other error)
-         * @param remoteClusters number of remote clusters in the search
-         * @param ccsMinimizeRoundtrips specifies whether a CCS search is using minimizeRoundtrips feature
+         * For use with cross-cluster searches.
+         * When minimizing roundtrips, the number of successful and skipped clusters is not known until
+         * the end of the search and it the information in SearchResponse.Cluster object will be updated
+         * as each cluster returns.
+         * @param localIndices The localIndices to be searched - null if no local indices are to be searched
+         * @param remoteClusterIndices mapping of clusterAlias -> OriginalIndices for each remote cluster
+         * @param ccsMinimizeRoundtrips whether minimizing roundtrips for the CCS
          */
-        public Clusters(int total, int successful, int skipped, int remoteClusters, boolean ccsMinimizeRoundtrips) {
-            assert total >= 0 && successful >= 0 && skipped >= 0 && remoteClusters >= 0
-                : "total: " + total + " successful: " + successful + " skipped: " + skipped + " remote: " + remoteClusters;
-            assert successful <= total : "total: " + total + " successful: " + successful + " skipped: " + skipped;
-            assert remoteClusters <= total : "total: " + total + " remote: " + remoteClusters;
-            assert ccsMinimizeRoundtrips == false || remoteClusters > 0
-                : "ccsMinimizeRoundtrips is true but remoteClusters count is not a positive number: " + remoteClusters;
-            int localCount = total - remoteClusters;
-            assert localCount == 0 || localCount == 1 : "total - remoteClusters should only be 0 or 1";
-            this.total = total;
-            this.successful = successful;
-            this.skipped = skipped;
-            this.remoteClusters = remoteClusters;
+        public Clusters(
+            @Nullable OriginalIndices localIndices,
+            Map<String, OriginalIndices> remoteClusterIndices,
+            boolean ccsMinimizeRoundtrips
+        ) {
+            this.total = remoteClusterIndices.size() + (localIndices == null ? 0 : 1);
+            assert total >= 1 : "No local indices or remote clusters passed in";
+            this.successful = 0; // calculated from clusterInfo map for minimize_roundtrips
+            this.skipped = 0;    // calculated from clusterInfo map for minimize_roundtrips
             this.ccsMinimizeRoundtrips = ccsMinimizeRoundtrips;
+            Map<String, AtomicReference<Cluster>> m = new HashMap<>();
+            if (localIndices != null) {
+                String localKey = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
+                Cluster c = new Cluster(localKey, String.join(",", localIndices.indices()));
+                m.put(localKey, new AtomicReference<>(c));
+            }
+            for (Map.Entry<String, OriginalIndices> remote : remoteClusterIndices.entrySet()) {
+                String clusterAlias = remote.getKey();
+                Cluster c = new Cluster(clusterAlias, String.join(",", remote.getValue().indices()));
+                m.put(clusterAlias, new AtomicReference<>(c));
+            }
+            this.clusterInfo = Collections.unmodifiableMap(m);
         }
 
         /**
-         * Assumes ccsMinimizeRoundtrips=false.
-         * We are not tracking number of remote clusters in this search.
+         * Used for searches that are either not cross-cluster or CCS with minimize_roundtrips=false.
+         * For CCS minimize_roundtrips=true use {@code Clusters(OriginalIndices, Map<String, OriginalIndices>, boolean)}
+         * @param total total number of clusters in the search
+         * @param successful number of successful clusters in the search
+         * @param skipped number of skipped clusters (skipped can only happen for remote clusters with skip_unavailable=true)
          */
         public Clusters(int total, int successful, int skipped) {
-            this(total, successful, skipped, true);
-        }
-
-        /**
-         * @param finalState if true, then do an assert that total = successful + skipped. This is true
-         *                   only when the cluster is in its final state, not an initial or intermediate state.
-         */
-        Clusters(int total, int successful, int skipped, boolean finalState) {
             assert total >= 0 && successful >= 0 && skipped >= 0 && successful <= total
                 : "total: " + total + " successful: " + successful + " skipped: " + skipped;
-            assert finalState == false || skipped == total - successful
-                : "total: " + total + " successful: " + successful + " skipped: " + skipped;
+            assert skipped == total - successful : "total: " + total + " successful: " + successful + " skipped: " + skipped;
             this.total = total;
             this.successful = successful;
             this.skipped = skipped;
-            this.remoteClusters = -1;  // means "unknown" and not needed for this usage
             this.ccsMinimizeRoundtrips = false;
+            this.clusterInfo = Collections.emptyMap();  // will never be used if created from this constructor
         }
 
         public Clusters(StreamInput in) throws IOException {
-            // when coming across the wire, we don't have context to know if this Cluster is in a final state, so set finalState=false
-            this(in.readVInt(), in.readVInt(), in.readVInt(), false);
+            this.total = in.readVInt();
+            this.successful = in.readVInt();
+            this.skipped = in.readVInt();
+            if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_500_053)) {
+                List<Cluster> clusterList = in.readList(Cluster::new);
+                if (clusterList.isEmpty()) {
+                    this.clusterInfo = Collections.emptyMap();
+                } else {
+                    Map<String, AtomicReference<Cluster>> m = new HashMap<>();
+                    clusterList.forEach(c -> m.put(c.getClusterAlias(), new AtomicReference<>(c)));
+                    this.clusterInfo = Collections.unmodifiableMap(m);
+                }
+            } else {
+                this.clusterInfo = Collections.emptyMap();
+            }
+            this.ccsMinimizeRoundtrips = false;
+            assert total >= 0 : "total is negative: " + total;
+            assert total >= successful + skipped
+                : "successful + skipped is larger than total. total: " + total + " successful: " + successful + " skipped: " + skipped;
         }
 
         @Override
@@ -545,6 +574,14 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
             out.writeVInt(total);
             out.writeVInt(successful);
             out.writeVInt(skipped);
+            if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_500_053)) {
+                if (clusterInfo != null) {
+                    List<Cluster> clusterList = clusterInfo.values().stream().map(AtomicReference::get).toList();
+                    out.writeList(clusterList);
+                } else {
+                    out.writeList(Collections.emptyList());
+                }
+            }
         }
 
         @Override
@@ -552,8 +589,16 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
             if (total > 0) {
                 builder.startObject(_CLUSTERS_FIELD.getPreferredName());
                 builder.field(TOTAL_FIELD.getPreferredName(), total);
-                builder.field(SUCCESSFUL_FIELD.getPreferredName(), successful);
-                builder.field(SKIPPED_FIELD.getPreferredName(), skipped);
+                builder.field(SUCCESSFUL_FIELD.getPreferredName(), getSuccessful());
+                builder.field(SKIPPED_FIELD.getPreferredName(), getSkipped());
+                // TODO: add FAILED_FIELD
+                if (clusterInfo.size() > 0) {
+                    builder.startObject("details");
+                    for (AtomicReference<Cluster> cluster : clusterInfo.values()) {
+                        cluster.get().toXContent(builder, params);
+                    }
+                    builder.endObject();
+                }
                 builder.endObject();
             }
             return builder;
@@ -570,22 +615,37 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
          * @return how many total clusters the search was executed successfully on
          */
         public int getSuccessful() {
-            return successful;
+            if (clusterInfo.isEmpty()) {
+                return successful;
+            } else {
+                return determineCountFromClusterInfo(
+                    cluster -> cluster.getStatus() == Cluster.Status.SUCCESSFUL || cluster.getStatus() == Cluster.Status.PARTIAL
+                );
+            }
         }
 
         /**
-         * @return how many total clusters were used during the execution of the search request
+         * When Clusters is using the clusterInfo map (and Cluster objects are being updated in various
+         * ActionListener threads), this method will count how many clusters match the passed in predicate.
+         *
+         * @param predicate to evaluate
+         * @return count of clusters matching the predicate
          */
-        public int getSkipped() {
-            return skipped;
+        private int determineCountFromClusterInfo(Predicate<Cluster> predicate) {
+            return (int) clusterInfo.values().stream().filter(c -> predicate.test(c.get())).count();
         }
 
         /**
-         * @return how many remote clusters were using during the execution of the search request
-         *         If not set, returns -1, meaning 'unknown'.
+         * @return how many total clusters were used during the execution of the search request
          */
-        public int getRemoteClusters() {
-            return remoteClusters;
+        public int getSkipped() {
+            if (clusterInfo.isEmpty()) {
+                return skipped;
+            } else {
+                return determineCountFromClusterInfo(cluster ->
+                // TODO: change this after adding an XContent field for FAILED clusters
+                cluster.getStatus() == Cluster.Status.SKIPPED || cluster.getStatus() == Cluster.Status.FAILED);
+            }
         }
 
         /**
@@ -595,6 +655,14 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
             return ccsMinimizeRoundtrips;
         }
 
+        /**
+         * @param clusterAlias The cluster alias as specified in the cluster collection
+         * @return Cluster object associated with teh clusterAlias or null if not present
+         */
+        public AtomicReference<Cluster> getCluster(String clusterAlias) {
+            return clusterInfo.get(clusterAlias);
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) {
@@ -614,18 +682,246 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
 
         @Override
         public String toString() {
-            return "Clusters{total=" + total + ", successful=" + successful + ", skipped=" + skipped + '}';
-        }
-
-        public String toStringExtended() {
-            return Strings.format(
-                "Clusters{total=%d, successful=%d, skipped=%d, remote=%d, ccsMinimizeRoundtrips=%s}",
-                total,
-                successful,
-                skipped,
-                remoteClusters,
-                ccsMinimizeRoundtrips
-            );
+            return "Clusters{total=" + total + ", successful=" + getSuccessful() + ", skipped=" + getSkipped() + '}';
+        }
+
+        /**
+         * @return true if any underlying Cluster objects have PARTIAL, SKIPPED, FAILED or RUNNING status.
+         */
+        public boolean hasPartialResults() {
+            for (AtomicReference<Cluster> cluster : clusterInfo.values()) {
+                switch (cluster.get().getStatus()) {
+                    case PARTIAL, SKIPPED, FAILED, RUNNING -> {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
+    }
+
+    /**
+     * Represents the search metadata about a particular cluster involved in a cross-cluster search.
+     * The Cluster object can represent either the local cluster or a remote cluster.
+     * For the local cluster, clusterAlias should be specified as RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.
+     * Its XContent is put into the "details" section the "_clusters" entry in the SearchResponse.
+     * This is an immutable class, so updates made during the search progress (especially important for async
+     * CCS searches) must be done by replacing the Cluster object with a new one.
+     * See the Clusters clusterInfo Map for details.
+     */
+    public static class Cluster implements ToXContentFragment, Writeable {
+        private final String clusterAlias;
+        private final String indexExpression; // original index expression from the user for this cluster
+        private final Status status;
+        private final Integer totalShards;
+        private final Integer successfulShards;
+        private final Integer skippedShards;
+        private final Integer failedShards;
+        private final List<ShardSearchFailure> failures;
+        private final TimeValue took;  // search latency in millis for this cluster sub-search
+        private final boolean timedOut;
+
+        /**
+         * Marks the status of a Cluster search involved in a Cross-Cluster search.
+         */
+        public enum Status {
+            RUNNING,     // still running
+            SUCCESSFUL,  // all shards completed search
+            PARTIAL,     // only some shards completed the search, partial results from cluster
+            SKIPPED,     // entire cluster was skipped
+            FAILED;      // search was failed due to errors on this cluster
+
+            @Override
+            public String toString() {
+                return this.name().toLowerCase(Locale.ROOT);
+            }
+        }
+
+        /**
+         * Create a Cluster object representing the initial RUNNING state of a Cluster.
+         *
+         * @param clusterAlias clusterAlias as defined in the remote cluster settings or RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY
+         *                     for the local cluster
+         * @param indexExpression the original (not resolved/concrete) indices expression provided for this cluster.
+         */
+        public Cluster(String clusterAlias, String indexExpression) {
+            this(clusterAlias, indexExpression, Status.RUNNING, null, null, null, null, null, null, false);
+        }
+
+        /**
+         * Create a Cluster with a new Status and one or more ShardSearchFailures. This constructor
+         * should only be used for fatal failures where shard counters (total, successful, skipped, failed)
+         * are not known (unset).
+         * @param clusterAlias clusterAlias as defined in the remote cluster settings or RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY
+         *                     for the local cluster
+         * @param indexExpression the original (not resolved/concrete) indices expression provided for this cluster.
+         * @param status current status of the search on this Cluster
+         * @param failures list of failures that occurred during the search on this Cluster
+         */
+        public Cluster(String clusterAlias, String indexExpression, Status status, List<ShardSearchFailure> failures) {
+            this(clusterAlias, indexExpression, status, null, null, null, null, failures, null, false);
+        }
+
+        public Cluster(
+            String clusterAlias,
+            String indexExpression,
+            Status status,
+            Integer totalShards,
+            Integer successfulShards,
+            Integer skippedShards,
+            Integer failedShards,
+            List<ShardSearchFailure> failures,
+            TimeValue took,
+            boolean timedOut
+        ) {
+            assert clusterAlias != null : "clusterAlias cannot be null";
+            assert indexExpression != null : "indexExpression of Cluster cannot be null";
+            assert status != null : "status of Cluster cannot be null";
+            this.clusterAlias = clusterAlias;
+            this.indexExpression = indexExpression;
+            this.status = status;
+            this.totalShards = totalShards;
+            this.successfulShards = successfulShards;
+            this.skippedShards = skippedShards;
+            this.failedShards = failedShards;
+            this.failures = failures == null ? Collections.emptyList() : Collections.unmodifiableList(failures);
+            this.took = took;
+            this.timedOut = timedOut;
+        }
+
+        public Cluster(StreamInput in) throws IOException {
+            this.clusterAlias = in.readString();
+            this.indexExpression = in.readString();
+            this.status = Status.valueOf(in.readString().toUpperCase(Locale.ROOT));
+            this.totalShards = in.readOptionalInt();
+            this.successfulShards = in.readOptionalInt();
+            this.skippedShards = in.readOptionalInt();
+            this.failedShards = in.readOptionalInt();
+            Long took = in.readOptionalLong();
+            if (took == null) {
+                this.took = null;
+            } else {
+                this.took = new TimeValue(took);
+            }
+            this.timedOut = in.readBoolean();
+            this.failures = Collections.unmodifiableList(in.readList(ShardSearchFailure::readShardSearchFailure));
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeString(clusterAlias);
+            out.writeString(indexExpression);
+            out.writeString(status.toString());
+            out.writeOptionalInt(totalShards);
+            out.writeOptionalInt(successfulShards);
+            out.writeOptionalInt(skippedShards);
+            out.writeOptionalInt(failedShards);
+            out.writeOptionalLong(took == null ? null : took.millis());
+            out.writeBoolean(timedOut);
+            out.writeList(failures);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            String name = clusterAlias;
+            if (clusterAlias.equals("")) {
+                name = "(local)";
+            }
+            builder.startObject(name);
+            {
+                builder.field("status", getStatus().toString());
+                builder.field("indices", indexExpression);
+                if (took != null) {
+                    builder.field("took", took.millis());
+                }
+                builder.field("timed_out", timedOut);
+                if (totalShards != null) {
+                    builder.startObject("_shards");
+                    builder.field("total", totalShards);
+                    if (successfulShards != null) {
+                        builder.field("successful", successfulShards);
+                    }
+                    if (skippedShards != null) {
+                        builder.field("skipped", skippedShards);
+                    }
+                    if (failedShards != null) {
+                        builder.field("failed", failedShards);
+                    }
+                    builder.endObject();
+                }
+                if (failures != null && failures.size() > 0) {
+                    builder.startArray("failures");
+                    for (ShardSearchFailure failure : failures) {
+                        failure.toXContent(builder, params);
+                    }
+                    builder.endArray();
+                }
+            }
+            builder.endObject();
+            return builder;
+        }
+
+        public String getClusterAlias() {
+            return clusterAlias;
+        }
+
+        public String getIndexExpression() {
+            return indexExpression;
+        }
+
+        public Status getStatus() {
+            return status;
+        }
+
+        public boolean isTimedOut() {
+            return timedOut;
+        }
+
+        public List<ShardSearchFailure> getFailures() {
+            return failures;
+        }
+
+        public TimeValue getTook() {
+            return took;
+        }
+
+        public Integer getTotalShards() {
+            return totalShards;
+        }
+
+        public Integer getSuccessfulShards() {
+            return successfulShards;
+        }
+
+        public Integer getSkippedShards() {
+            return skippedShards;
+        }
+
+        public Integer getFailedShards() {
+            return failedShards;
+        }
+
+        @Override
+        public String toString() {
+            return "Cluster{"
+                + "clusterAlias='"
+                + clusterAlias
+                + '\''
+                + ", status="
+                + status
+                + ", failures="
+                + failures
+                + ", totalShards="
+                + totalShards
+                + ", successfulShards="
+                + successfulShards
+                + ", skippedShards="
+                + skippedShards
+                + ", failedShards="
+                + failedShards
+                + ", searchLatencyMillis="
+                + took
+                + '}';
         }
     }
 

+ 131 - 14
server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

@@ -8,6 +8,8 @@
 
 package org.elasticsearch.action.search;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
@@ -106,6 +108,7 @@ import static org.elasticsearch.threadpool.ThreadPool.Names.SYSTEM_READ;
 
 public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
 
+    private static final Logger logger = LogManager.getLogger(TransportSearchAction.class);
     private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(TransportSearchAction.class);
     public static final String FROZEN_INDICES_DEPRECATION_MESSAGE = "Searching frozen indices [{}] is deprecated."
         + " Consider cold or frozen tiers in place of frozen indices. The frozen feature will be removed in a feature release.";
@@ -323,8 +326,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                         && rewritten.source().aggregations() != null
                             ? searchService.aggReduceContextBuilder(task::isCancelled, rewritten.source().aggregations())
                             : null;
-                    final int totalClusters = (localIndices == null ? 0 : 1) + remoteClusterIndices.size();
-                    var initClusters = new SearchResponse.Clusters(totalClusters, 0, 0, remoteClusterIndices.size(), true);
+                    SearchResponse.Clusters initClusters = new SearchResponse.Clusters(localIndices, remoteClusterIndices, true);
                     if (localIndices == null) {
                         // Notify the progress listener that a CCS with minimize_roundtrips is happening remote-only (no local shards)
                         task.getProgressListener().notifyListShards(Collections.emptyList(), Collections.emptyList(), initClusters, false);
@@ -334,6 +336,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                         rewritten,
                         localIndices,
                         remoteClusterIndices,
+                        initClusters,
                         timeProvider,
                         aggregationReduceContextBuilder,
                         remoteClusterService,
@@ -456,11 +459,15 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
             || source.collapse().getInnerHits().isEmpty();
     }
 
+    /**
+     * Handles ccs_minimize_roundtrips=true
+     */
     static void ccsRemoteReduce(
         TaskId parentTaskId,
         SearchRequest searchRequest,
         OriginalIndices localIndices,
         Map<String, OriginalIndices> remoteIndices,
+        SearchResponse.Clusters clusters,
         SearchTimeProvider timeProvider,
         AggregationReduceContext.Builder aggReduceContextBuilder,
         RemoteClusterService remoteClusterService,
@@ -493,10 +500,14 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
             remoteClusterClient.search(ccsSearchRequest, new ActionListener<SearchResponse>() {
                 @Override
                 public void onResponse(SearchResponse searchResponse) {
+                    // TODO: in CCS fail fast ticket we may need to fail the query if the cluster is marked as FAILED
+                    // overwrite the existing cluster entry with the updated one
+                    ccsClusterInfoUpdate(searchResponse, clusters.getCluster(clusterAlias), skipUnavailable);
                     Map<String, SearchProfileShardResult> profileResults = searchResponse.getProfileResults();
                     SearchProfileResults profile = profileResults == null || profileResults.isEmpty()
                         ? null
                         : new SearchProfileResults(profileResults);
+
                     InternalSearchResponse internalSearchResponse = new InternalSearchResponse(
                         searchResponse.getHits(),
                         (InternalAggregations) searchResponse.getAggregations(),
@@ -506,6 +517,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                         searchResponse.isTerminatedEarly(),
                         searchResponse.getNumReducePhases()
                     );
+
                     listener.onResponse(
                         new SearchResponse(
                             internalSearchResponse,
@@ -515,7 +527,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                             searchResponse.getSkippedShards(),
                             timeProvider.buildTookInMillis(),
                             searchResponse.getShardFailures(),
-                            new SearchResponse.Clusters(1, 1, 0),
+                            clusters,
                             searchResponse.pointInTimeId()
                         )
                     );
@@ -523,8 +535,10 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
 
                 @Override
                 public void onFailure(Exception e) {
+                    ShardSearchFailure failure = new ShardSearchFailure(e);
+                    ccsClusterInfoUpdate(failure, clusters.getCluster(clusterAlias), skipUnavailable);
                     if (skipUnavailable) {
-                        listener.onResponse(SearchResponse.empty(timeProvider::buildTookInMillis, new SearchResponse.Clusters(1, 0, 1)));
+                        listener.onResponse(SearchResponse.empty(timeProvider::buildTookInMillis, clusters));
                     } else {
                         listener.onFailure(wrapRemoteClusterFailure(clusterAlias, e));
                     }
@@ -559,7 +573,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                     skippedClusters,
                     exceptions,
                     searchResponseMerger,
-                    totalClusters,
+                    clusters,
                     listener
                 );
                 Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(
@@ -577,7 +591,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                     skippedClusters,
                     exceptions,
                     searchResponseMerger,
-                    totalClusters,
+                    clusters,
                     listener
                 );
                 SearchRequest ccsLocalSearchRequest = SearchRequest.subSearchRequest(
@@ -644,6 +658,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                     responsesCountDown,
                     skippedClusters,
                     exceptions,
+                    null,
                     listener
                 ) {
                     @Override
@@ -704,6 +719,9 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
         }
     }
 
+    /**
+     * Only used for ccs_minimize_roundtrips=true pathway
+     */
     private static ActionListener<SearchResponse> createCCSListener(
         String clusterAlias,
         boolean skipUnavailable,
@@ -711,7 +729,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
         AtomicInteger skippedClusters,
         AtomicReference<Exception> exceptions,
         SearchResponseMerger searchResponseMerger,
-        int totalClusters,
+        SearchResponse.Clusters clusters,
         ActionListener<SearchResponse> originalListener
     ) {
         return new CCSActionListener<SearchResponse, SearchResponse>(
@@ -720,25 +738,109 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
             countDown,
             skippedClusters,
             exceptions,
+            clusters.getCluster(clusterAlias),
             originalListener
         ) {
             @Override
             void innerOnResponse(SearchResponse searchResponse) {
+                // TODO: in CCS fail fast ticket we may need to fail the query if the cluster gets marked as FAILED
+                ccsClusterInfoUpdate(searchResponse, cluster, skipUnavailable);
                 searchResponseMerger.add(searchResponse);
             }
 
             @Override
             SearchResponse createFinalResponse() {
-                SearchResponse.Clusters clusters = new SearchResponse.Clusters(
-                    totalClusters,
-                    searchResponseMerger.numResponses(),
-                    skippedClusters.get()
-                );
                 return searchResponseMerger.getMergedResponse(clusters);
             }
         };
     }
 
+    /**
+     * Creates a new Cluster object using the {@link ShardSearchFailure} info and skip_unavailable
+     * flag to set Status. The new Cluster object is swapped into the clusterRef {@link AtomicReference}.
+     */
+    static void ccsClusterInfoUpdate(
+        ShardSearchFailure failure,
+        AtomicReference<SearchResponse.Cluster> clusterRef,
+        boolean skipUnavailable
+    ) {
+        SearchResponse.Cluster.Status status;
+        if (skipUnavailable) {
+            status = SearchResponse.Cluster.Status.SKIPPED;
+        } else {
+            status = SearchResponse.Cluster.Status.FAILED;
+        }
+        boolean swapped;
+        do {
+            SearchResponse.Cluster orig = clusterRef.get();
+            String clusterAlias = orig.getClusterAlias();
+            List<ShardSearchFailure> failures;
+            if (orig.getFailures() != null) {
+                failures = new ArrayList<>(orig.getFailures());
+            } else {
+                failures = new ArrayList<>(1);
+            }
+            failures.add(failure);
+            String indexExpression = orig.getIndexExpression();
+            SearchResponse.Cluster updated = new SearchResponse.Cluster(clusterAlias, indexExpression, status, failures);
+            swapped = clusterRef.compareAndSet(orig, updated);
+        } while (swapped == false);
+    }
+
+    /**
+     * Helper method common to multiple ccs_minimize_roundtrips=true code paths.
+     * Used to update a specific SearchResponse.Cluster object state based upon
+     * the SearchResponse coming from the cluster coordinator the search was performed on.
+     * @param searchResponse SearchResponse from cluster sub-search
+     * @param clusterRef AtomicReference of the Cluster object to be updated
+     */
+    private static void ccsClusterInfoUpdate(
+        SearchResponse searchResponse,
+        AtomicReference<SearchResponse.Cluster> clusterRef,
+        boolean skipUnavailable
+    ) {
+        /*
+         * Cluster Status logic:
+         * 1) FAILED if all shards failed and skip_unavailable=false
+         * 2) SKIPPED if all shards failed and skip_unavailable=true
+         * 3) PARTIAL if it timed out
+         * 4) PARTIAL if it at least one of the shards succeeded but not all
+         * 5) SUCCESSFUL if no shards failed (and did not time out)
+         */
+        SearchResponse.Cluster.Status status;
+        if (searchResponse.getFailedShards() >= searchResponse.getTotalShards()) {
+            if (skipUnavailable) {
+                status = SearchResponse.Cluster.Status.SKIPPED;
+            } else {
+                status = SearchResponse.Cluster.Status.FAILED;
+            }
+        } else if (searchResponse.isTimedOut()) {
+            status = SearchResponse.Cluster.Status.PARTIAL;
+        } else if (searchResponse.getFailedShards() > 0) {
+            status = SearchResponse.Cluster.Status.PARTIAL;
+        } else {
+            status = SearchResponse.Cluster.Status.SUCCESSFUL;
+        }
+
+        boolean swapped;
+        do {
+            SearchResponse.Cluster orig = clusterRef.get();
+            SearchResponse.Cluster updated = new SearchResponse.Cluster(
+                orig.getClusterAlias(),
+                orig.getIndexExpression(),
+                status,
+                searchResponse.getTotalShards(),
+                searchResponse.getSuccessfulShards(),
+                searchResponse.getSkippedShards(),
+                searchResponse.getFailedShards(),
+                Arrays.asList(searchResponse.getShardFailures()),
+                searchResponse.getTook(),
+                searchResponse.isTimedOut()
+            );
+            swapped = clusterRef.compareAndSet(orig, updated);
+        } while (swapped == false);
+    }
+
     void executeLocalSearch(
         Task task,
         SearchTimeProvider timeProvider,
@@ -1263,19 +1365,25 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
     }
 
     abstract static class CCSActionListener<Response, FinalResponse> implements ActionListener<Response> {
-        private final String clusterAlias;
-        private final boolean skipUnavailable;
+        protected final String clusterAlias;
+        protected final boolean skipUnavailable;
         private final CountDown countDown;
         private final AtomicInteger skippedClusters;
         private final AtomicReference<Exception> exceptions;
+        protected final AtomicReference<SearchResponse.Cluster> cluster;
         private final ActionListener<FinalResponse> originalListener;
+        protected final long startTime;
 
+        /**
+         * Used by both minimize_roundtrips true and false
+         */
         CCSActionListener(
             String clusterAlias,
             boolean skipUnavailable,
             CountDown countDown,
             AtomicInteger skippedClusters,
             AtomicReference<Exception> exceptions,
+            @Nullable AtomicReference<SearchResponse.Cluster> cluster, // null for ccs_minimize_roundtrips=false
             ActionListener<FinalResponse> originalListener
         ) {
             this.clusterAlias = clusterAlias;
@@ -1283,7 +1391,9 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
             this.countDown = countDown;
             this.skippedClusters = skippedClusters;
             this.exceptions = exceptions;
+            this.cluster = cluster;
             this.originalListener = originalListener;
+            this.startTime = System.currentTimeMillis();
         }
 
         @Override
@@ -1296,9 +1406,16 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
 
         @Override
         public final void onFailure(Exception e) {
+            ShardSearchFailure f = new ShardSearchFailure(e);
             if (skipUnavailable) {
+                if (cluster != null) {
+                    ccsClusterInfoUpdate(f, cluster, skipUnavailable);
+                }
                 skippedClusters.incrementAndGet();
             } else {
+                if (cluster != null) {
+                    ccsClusterInfoUpdate(f, cluster, skipUnavailable);
+                }
                 Exception exception = e;
                 if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) == false) {
                     exception = wrapRemoteClusterFailure(clusterAlias, e);

+ 1 - 12
server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java

@@ -135,18 +135,7 @@ public class SearchResponseTests extends ESTestCase {
         int totalClusters = randomIntBetween(0, 10);
         int successfulClusters = randomIntBetween(0, totalClusters);
         int skippedClusters = totalClusters - successfulClusters;
-        if (randomBoolean()) {
-            return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters);
-        } else {
-            int remoteClusters = totalClusters;
-            if (totalClusters > 0 && randomBoolean()) {
-                // remoteClusters can be same as total cluster count or one less (when doing local search)
-                remoteClusters--;
-            }
-            // Clusters has an assert that if ccsMinimizeRoundtrips = true, then remoteClusters must be > 0
-            boolean ccsMinimizeRoundtrips = (remoteClusters > 0 ? randomBoolean() : false);
-            return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters, remoteClusters, ccsMinimizeRoundtrips);
-        }
+        return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters);
     }
 
     /**

+ 25 - 0
server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

@@ -523,11 +523,14 @@ public class TransportSearchActionTests extends ESTestCase {
                 ActionListener.wrap(r -> fail("no response expected"), failure::set),
                 latch
             );
+            SearchResponse.Clusters initClusters = new SearchResponse.Clusters(localIndices, remoteIndicesByCluster, true);
+
             TransportSearchAction.ccsRemoteReduce(
                 new TaskId("n", 1),
                 searchRequest,
                 localIndices,
                 remoteIndicesByCluster,
+                initClusters,
                 timeProvider,
                 emptyReduceContextBuilder(),
                 remoteClusterService,
@@ -588,11 +591,14 @@ public class TransportSearchActionTests extends ESTestCase {
                     ActionTestUtils.assertNoFailureListener(response::set),
                     latch
                 );
+                SearchResponse.Clusters initClusters = new SearchResponse.Clusters(localIndices, remoteIndicesByCluster, true);
+
                 TransportSearchAction.ccsRemoteReduce(
                     new TaskId("n", 1),
                     searchRequest,
                     localIndices,
                     remoteIndicesByCluster,
+                    initClusters,
                     timeProvider,
                     emptyReduceContextBuilder(),
                     remoteClusterService,
@@ -626,11 +632,13 @@ public class TransportSearchActionTests extends ESTestCase {
                     ActionListener.wrap(r -> fail("no response expected"), failure::set),
                     latch
                 );
+                SearchResponse.Clusters initClusters = new SearchResponse.Clusters(localIndices, remoteIndicesByCluster, true);
                 TransportSearchAction.ccsRemoteReduce(
                     new TaskId("n", 1),
                     searchRequest,
                     localIndices,
                     remoteIndicesByCluster,
+                    initClusters,
                     timeProvider,
                     emptyReduceContextBuilder(),
                     remoteClusterService,
@@ -685,11 +693,14 @@ public class TransportSearchActionTests extends ESTestCase {
                     ActionListener.wrap(r -> fail("no response expected"), failure::set),
                     latch
                 );
+                SearchResponse.Clusters initClusters = new SearchResponse.Clusters(localIndices, remoteIndicesByCluster, true);
+
                 TransportSearchAction.ccsRemoteReduce(
                     new TaskId("n", 1),
                     searchRequest,
                     localIndices,
                     remoteIndicesByCluster,
+                    initClusters,
                     timeProvider,
                     emptyReduceContextBuilder(),
                     remoteClusterService,
@@ -726,11 +737,18 @@ public class TransportSearchActionTests extends ESTestCase {
                     ActionTestUtils.assertNoFailureListener(response::set),
                     latch
                 );
+                Set<String> clusterAliases = new HashSet<>(remoteClusterService.getRegisteredRemoteClusterNames());
+                if (localIndices != null) {
+                    clusterAliases.add("");
+                }
+                SearchResponse.Clusters initClusters = new SearchResponse.Clusters(localIndices, remoteIndicesByCluster, true);
+
                 TransportSearchAction.ccsRemoteReduce(
                     new TaskId("n", 1),
                     searchRequest,
                     localIndices,
                     remoteIndicesByCluster,
+                    initClusters,
                     timeProvider,
                     emptyReduceContextBuilder(),
                     remoteClusterService,
@@ -779,11 +797,18 @@ public class TransportSearchActionTests extends ESTestCase {
                     ActionTestUtils.assertNoFailureListener(response::set),
                     latch
                 );
+                Set<String> clusterAliases = new HashSet<>(remoteClusterService.getRegisteredRemoteClusterNames());
+                if (localIndices != null) {
+                    clusterAliases.add("");
+                }
+                SearchResponse.Clusters initClusters = new SearchResponse.Clusters(localIndices, remoteIndicesByCluster, true);
+
                 TransportSearchAction.ccsRemoteReduce(
                     new TaskId("n", 1),
                     searchRequest,
                     localIndices,
                     remoteIndicesByCluster,
+                    initClusters,
                     timeProvider,
                     emptyReduceContextBuilder(),
                     remoteClusterService,

+ 28 - 9
test/framework/src/main/java/org/elasticsearch/test/AbstractMultiClustersTestCase.java

@@ -12,13 +12,16 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.admin.cluster.remote.RemoteInfoAction;
 import org.elasticsearch.action.admin.cluster.remote.RemoteInfoRequest;
+import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.IOUtils;
+import org.elasticsearch.core.Strings;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.transport.RemoteClusterAware;
+import org.elasticsearch.transport.RemoteClusterService;
 import org.elasticsearch.transport.RemoteConnectionInfo;
 import org.elasticsearch.transport.TransportService;
 import org.junit.After;
@@ -46,6 +49,7 @@ import static org.hamcrest.Matchers.not;
 
 public abstract class AbstractMultiClustersTestCase extends ESTestCase {
     public static final String LOCAL_CLUSTER = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
+    public static final boolean DEFAULT_SKIP_UNAVAILABLE = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getDefault(Settings.EMPTY);
 
     private static final Logger LOGGER = LogManager.getLogger(AbstractMultiClustersTestCase.class);
 
@@ -55,6 +59,10 @@ public abstract class AbstractMultiClustersTestCase extends ESTestCase {
         return randomSubsetOf(List.of("cluster-a", "cluster-b"));
     }
 
+    protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
+        return Map.of("cluster-a", DEFAULT_SKIP_UNAVAILABLE, "cluster-b", DEFAULT_SKIP_UNAVAILABLE);
+    }
+
     protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
         return Collections.emptyList();
     }
@@ -171,26 +179,37 @@ public abstract class AbstractMultiClustersTestCase extends ESTestCase {
     protected void configureRemoteCluster(String clusterAlias, Collection<String> seedNodes) throws Exception {
         final String remoteClusterSettingPrefix = "cluster.remote." + clusterAlias + ".";
         Settings.Builder settings = Settings.builder();
-        final List<String> seedAdresses = seedNodes.stream().map(node -> {
+        final List<String> seedAddresses = seedNodes.stream().map(node -> {
             final TransportService transportService = cluster(clusterAlias).getInstance(TransportService.class, node);
             return transportService.boundAddress().publishAddress().toString();
         }).toList();
+        boolean skipUnavailable = skipUnavailableForRemoteClusters().containsKey(clusterAlias)
+            ? skipUnavailableForRemoteClusters().get(clusterAlias)
+            : DEFAULT_SKIP_UNAVAILABLE;
+        Settings.Builder builder;
         if (randomBoolean()) {
             LOGGER.info("--> use sniff mode with seed [{}], remote nodes [{}]", Collectors.joining(","), seedNodes);
-            settings.putNull(remoteClusterSettingPrefix + "proxy_address")
+            builder = settings.putNull(remoteClusterSettingPrefix + "proxy_address")
                 .put(remoteClusterSettingPrefix + "mode", "sniff")
-                .put(remoteClusterSettingPrefix + "seeds", String.join(",", seedAdresses))
-                .build();
+                .put(remoteClusterSettingPrefix + "seeds", String.join(",", seedAddresses));
         } else {
-            final String proxyNode = randomFrom(seedAdresses);
+            final String proxyNode = randomFrom(seedAddresses);
             LOGGER.info("--> use proxy node [{}], remote nodes [{}]", proxyNode, seedNodes);
-            settings.putNull(remoteClusterSettingPrefix + "seeds")
+            builder = settings.putNull(remoteClusterSettingPrefix + "seeds")
                 .put(remoteClusterSettingPrefix + "mode", "proxy")
-                .put(remoteClusterSettingPrefix + "proxy_address", proxyNode)
-                .build();
+                .put(remoteClusterSettingPrefix + "proxy_address", proxyNode);
+        }
+        if (skipUnavailable != DEFAULT_SKIP_UNAVAILABLE) {
+            builder.put(remoteClusterSettingPrefix + "skip_unavailable", String.valueOf(skipUnavailable));
+        }
+        builder.build();
+
+        ClusterUpdateSettingsResponse resp = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings).get();
+        if (skipUnavailable != DEFAULT_SKIP_UNAVAILABLE) {
+            String key = Strings.format("cluster.remote.%s.skip_unavailable", clusterAlias);
+            assertEquals(String.valueOf(skipUnavailable), resp.getPersistentSettings().get(key));
         }
 
-        client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings).get();
         assertBusy(() -> {
             List<RemoteConnectionInfo> remoteConnectionInfos = client().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest())
                 .actionGet()

+ 748 - 37
x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/CrossClusterAsyncSearchIT.java

@@ -14,12 +14,13 @@ import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksReque
 import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
 import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
-import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexModule;
@@ -37,6 +38,7 @@ import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.tasks.TaskInfo;
 import org.elasticsearch.test.AbstractMultiClustersTestCase;
 import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.transport.RemoteClusterAware;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentFactory;
@@ -58,7 +60,9 @@ import org.junit.Before;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -87,6 +91,11 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
         return List.of(REMOTE_CLUSTER);
     }
 
+    @Override
+    protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
+        return Map.of(REMOTE_CLUSTER, randomBoolean());
+    }
+
     @Override
     protected boolean reuseClusters() {
         return false;
@@ -116,17 +125,697 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
                     throw new IllegalStateException("not implemented");
                 }
             );
+            QuerySpec<ThrowingQueryBuilder> throwingSpec = new QuerySpec<>(ThrowingQueryBuilder.NAME, ThrowingQueryBuilder::new, p -> {
+                throw new IllegalStateException("not implemented");
+            });
+
+            return List.of(slowRunningSpec, throwingSpec);
+        }
+    }
+
+    public void testClusterDetailsAfterSuccessfulCCS() throws Exception {
+        Map<String, Object> testClusterInfo = setupTwoClusters();
+        String localIndex = (String) testClusterInfo.get("local.index");
+        String remoteIndex = (String) testClusterInfo.get("remote.index");
+        int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
+        int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
+
+        SearchListenerPlugin.blockQueryPhase();
+
+        SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
+        request.setCcsMinimizeRoundtrips(true);
+        request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
+        request.setKeepOnCompletion(true);
+        request.getSearchRequest().source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(1000));
+
+        AsyncSearchResponse response = submitAsyncSearch(request);
+        assertNotNull(response.getSearchResponse());
+        assertTrue(response.isRunning());
+
+        {
+            SearchResponse.Clusters clusters = response.getSearchResponse().getClusters();
+            assertThat(clusters.getTotal(), equalTo(2));
+            assertTrue("search cluster results should be marked as partial", clusters.hasPartialResults());
+
+            SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
+            assertNotNull(localClusterSearchInfo);
+            assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.RUNNING));
+
+            SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
+            assertNotNull(remoteClusterSearchInfo);
+            assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.RUNNING));
+        }
+
+        SearchListenerPlugin.waitSearchStarted();
+        SearchListenerPlugin.allowQueryPhase();
+
+        assertBusy(() -> {
+            AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
+            assertFalse(statusResponse.isRunning());
+            assertNotNull(statusResponse.getCompletionStatus());
+        });
+
+        {
+            AsyncSearchResponse finishedResponse = getAsyncSearch(response.getId());
+
+            SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
+            assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
+            assertThat(clusters.getTotal(), equalTo(2));
+            assertThat(clusters.getSuccessful(), equalTo(2));
+            assertThat(clusters.getSkipped(), equalTo(0));
+
+            SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
+            assertNotNull(localClusterSearchInfo);
+            assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
+            assertThat(localClusterSearchInfo.getTotalShards(), equalTo(localNumShards));
+            assertThat(localClusterSearchInfo.getSuccessfulShards(), equalTo(localNumShards));
+            assertThat(localClusterSearchInfo.getSkippedShards(), equalTo(0));
+            assertThat(localClusterSearchInfo.getFailedShards(), equalTo(0));
+            assertThat(localClusterSearchInfo.getFailures().size(), equalTo(0));
+            assertThat(localClusterSearchInfo.getTook().millis(), greaterThan(0L));
+
+            SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
+            assertNotNull(remoteClusterSearchInfo);
+            assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
+            assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
+            assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards));
+            assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
+            assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(0));
+            assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(0));
+            assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
+        }
+
+        // check that the async_search/status response includes the same cluster details
+        {
+            AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
+
+            SearchResponse.Clusters clusters = statusResponse.getClusters();
+            assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
+            assertThat(clusters.getTotal(), equalTo(2));
+            assertThat(clusters.getSuccessful(), equalTo(2));
+            assertThat(clusters.getSkipped(), equalTo(0));
+
+            SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
+            assertNotNull(localClusterSearchInfo);
+            assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
+            assertThat(localClusterSearchInfo.getTotalShards(), equalTo(localNumShards));
+            assertThat(localClusterSearchInfo.getSuccessfulShards(), equalTo(localNumShards));
+            assertThat(localClusterSearchInfo.getSkippedShards(), equalTo(0));
+            assertThat(localClusterSearchInfo.getFailedShards(), equalTo(0));
+            assertThat(localClusterSearchInfo.getFailures().size(), equalTo(0));
+            assertThat(localClusterSearchInfo.getTook().millis(), greaterThan(0L));
+
+            SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
+            assertNotNull(remoteClusterSearchInfo);
+            assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
+            assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
+            assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards));
+            assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
+            assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(0));
+            assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(0));
+            assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
+        }
+    }
+
+    public void testClusterDetailsAfterCCSWithFailuresOnAllShards() throws Exception {
+        Map<String, Object> testClusterInfo = setupTwoClusters();
+        String localIndex = (String) testClusterInfo.get("local.index");
+        String remoteIndex = (String) testClusterInfo.get("remote.index");
+        boolean skipUnavailable = (Boolean) testClusterInfo.get("remote.skip_unavailable");
+
+        SearchListenerPlugin.blockQueryPhase();
+
+        SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
+        request.setCcsMinimizeRoundtrips(true);
+        request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
+        request.setKeepOnCompletion(true);
+        // shardId -1 means to throw the Exception on all shards, so should result in complete search failure
+        ThrowingQueryBuilder queryBuilder = new ThrowingQueryBuilder(randomLong(), new IllegalStateException("index corrupted"), -1);
+        request.getSearchRequest().source(new SearchSourceBuilder().query(queryBuilder).size(10));
+
+        AsyncSearchResponse response = submitAsyncSearch(request);
+        assertNotNull(response.getSearchResponse());
+        assertTrue(response.isRunning());
+
+        {
+            SearchResponse.Clusters clusters = response.getSearchResponse().getClusters();
+            assertThat(clusters.getTotal(), equalTo(2));
+            assertTrue("search cluster results should be marked as partial", clusters.hasPartialResults());
+
+            SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
+            assertNotNull(localClusterSearchInfo);
+            assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.RUNNING));
+
+            SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
+            assertNotNull(remoteClusterSearchInfo);
+            assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.RUNNING));
+        }
+
+        SearchListenerPlugin.waitSearchStarted();
+        SearchListenerPlugin.allowQueryPhase();
+
+        assertBusy(() -> {
+            AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
+            assertFalse(statusResponse.isRunning());
+            assertNotNull(statusResponse.getCompletionStatus());
+        });
+
+        {
+            AsyncSearchResponse finishedResponse = getAsyncSearch(response.getId());
+
+            SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
+            assertThat(clusters.getTotal(), equalTo(2));
+            assertThat(clusters.getSuccessful(), equalTo(0));
+            assertThat(clusters.getSkipped(), equalTo(2));
+
+            SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
+            assertNotNull(localClusterSearchInfo);
+            assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.FAILED));
+            assertNull(localClusterSearchInfo.getTotalShards());
+            assertNull(localClusterSearchInfo.getSuccessfulShards());
+            assertNull(localClusterSearchInfo.getSkippedShards());
+            assertNull(localClusterSearchInfo.getFailedShards());
+            assertThat(localClusterSearchInfo.getFailures().size(), equalTo(1));
+            assertNull(localClusterSearchInfo.getTook());
+            assertFalse(localClusterSearchInfo.isTimedOut());
+            ShardSearchFailure localShardSearchFailure = localClusterSearchInfo.getFailures().get(0);
+            assertTrue("should have 'index corrupted' in reason", localShardSearchFailure.reason().contains("index corrupted"));
+
+            SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
+            assertNotNull(remoteClusterSearchInfo);
+            SearchResponse.Cluster.Status expectedStatus = skipUnavailable
+                ? SearchResponse.Cluster.Status.SKIPPED
+                : SearchResponse.Cluster.Status.FAILED;
+            assertThat(remoteClusterSearchInfo.getStatus(), equalTo(expectedStatus));
+            assertNull(remoteClusterSearchInfo.getTotalShards());
+            assertNull(remoteClusterSearchInfo.getSuccessfulShards());
+            assertNull(remoteClusterSearchInfo.getSkippedShards());
+            assertNull(remoteClusterSearchInfo.getFailedShards());
+            assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
+            assertNull(remoteClusterSearchInfo.getTook());
+            assertFalse(remoteClusterSearchInfo.isTimedOut());
+            ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
+            assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
+        }
+        // check that the async_search/status response includes the same cluster details
+        {
+            AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
+            SearchResponse.Clusters clusters = statusResponse.getClusters();
+            assertThat(clusters.getTotal(), equalTo(2));
+            assertThat(clusters.getSuccessful(), equalTo(0));
+            assertThat(clusters.getSkipped(), equalTo(2));
+
+            SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
+            assertNotNull(localClusterSearchInfo);
+            assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.FAILED));
+            assertNull(localClusterSearchInfo.getTotalShards());
+            assertNull(localClusterSearchInfo.getSuccessfulShards());
+            assertNull(localClusterSearchInfo.getSkippedShards());
+            assertNull(localClusterSearchInfo.getFailedShards());
+            assertThat(localClusterSearchInfo.getFailures().size(), equalTo(1));
+            assertNull(localClusterSearchInfo.getTook());
+            assertFalse(localClusterSearchInfo.isTimedOut());
+            ShardSearchFailure localShardSearchFailure = localClusterSearchInfo.getFailures().get(0);
+            assertTrue("should have 'index corrupted' in reason", localShardSearchFailure.reason().contains("index corrupted"));
+
+            SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
+            assertNotNull(remoteClusterSearchInfo);
+            SearchResponse.Cluster.Status expectedStatus = skipUnavailable
+                ? SearchResponse.Cluster.Status.SKIPPED
+                : SearchResponse.Cluster.Status.FAILED;
+            assertThat(remoteClusterSearchInfo.getStatus(), equalTo(expectedStatus));
+            assertNull(remoteClusterSearchInfo.getTotalShards());
+            assertNull(remoteClusterSearchInfo.getSuccessfulShards());
+            assertNull(remoteClusterSearchInfo.getSkippedShards());
+            assertNull(remoteClusterSearchInfo.getFailedShards());
+            assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
+            assertNull(remoteClusterSearchInfo.getTook());
+            assertFalse(remoteClusterSearchInfo.isTimedOut());
+            ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
+            assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
+        }
+    }
+
+    public void testClusterDetailsAfterCCSWithFailuresOnOneShardOnly() throws Exception {
+        Map<String, Object> testClusterInfo = setupTwoClusters();
+        String localIndex = (String) testClusterInfo.get("local.index");
+        String remoteIndex = (String) testClusterInfo.get("remote.index");
+        int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
+        int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
+
+        SearchListenerPlugin.blockQueryPhase();
+
+        SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
+        request.setCcsMinimizeRoundtrips(true);
+        request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
+        request.setKeepOnCompletion(true);
+        // shardId 0 means to throw the Exception only on shard 0; all others should work
+        ThrowingQueryBuilder queryBuilder = new ThrowingQueryBuilder(randomLong(), new IllegalStateException("index corrupted"), 0);
+        request.getSearchRequest().source(new SearchSourceBuilder().query(queryBuilder).size(10));
+
+        AsyncSearchResponse response = submitAsyncSearch(request);
+        assertNotNull(response.getSearchResponse());
+        assertTrue(response.isRunning());
+
+        {
+            SearchResponse.Clusters clusters = response.getSearchResponse().getClusters();
+            assertThat(clusters.getTotal(), equalTo(2));
+            assertTrue("search cluster results should be marked as partial", clusters.hasPartialResults());
+
+            SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
+            assertNotNull(localClusterSearchInfo);
+            assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.RUNNING));
+
+            SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
+            assertNotNull(remoteClusterSearchInfo);
+            assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.RUNNING));
+        }
+
+        SearchListenerPlugin.waitSearchStarted();
+        SearchListenerPlugin.allowQueryPhase();
+
+        assertBusy(() -> {
+            AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
+            assertFalse(statusResponse.isRunning());
+            assertNotNull(statusResponse.getCompletionStatus());
+        });
+
+        {
+            AsyncSearchResponse finishedResponse = getAsyncSearch(response.getId());
+
+            SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
+            assertThat(clusters.getTotal(), equalTo(2));
+            assertThat(clusters.getSuccessful(), equalTo(2));
+            assertThat(clusters.getSkipped(), equalTo(0));
+
+            SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
+            assertNotNull(localClusterSearchInfo);
+            assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.PARTIAL));
+            assertThat(localClusterSearchInfo.getTotalShards(), equalTo(localNumShards));
+            assertThat(localClusterSearchInfo.getSuccessfulShards(), equalTo(localNumShards - 1));
+            assertThat(localClusterSearchInfo.getSkippedShards(), equalTo(0));
+            assertThat(localClusterSearchInfo.getFailedShards(), equalTo(1));
+            assertThat(localClusterSearchInfo.getFailures().size(), equalTo(1));
+            assertThat(localClusterSearchInfo.getTook().millis(), greaterThan(0L));
+            ShardSearchFailure localShardSearchFailure = localClusterSearchInfo.getFailures().get(0);
+            assertTrue("should have 'index corrupted' in reason", localShardSearchFailure.reason().contains("index corrupted"));
+
+            SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
+            assertNotNull(remoteClusterSearchInfo);
+            assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.PARTIAL));
+            assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
+            assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards - 1));
+            assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
+            assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(1));
+            assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
+            assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
+            ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
+            assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
+        }
+        // check that the async_search/status response includes the same cluster details
+        {
+            AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
+            SearchResponse.Clusters clusters = statusResponse.getClusters();
+            assertThat(clusters.getTotal(), equalTo(2));
+            assertThat(clusters.getSuccessful(), equalTo(2));
+            assertThat(clusters.getSkipped(), equalTo(0));
+
+            SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
+            assertNotNull(localClusterSearchInfo);
+            assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.PARTIAL));
+            assertThat(localClusterSearchInfo.getTotalShards(), equalTo(localNumShards));
+            assertThat(localClusterSearchInfo.getSuccessfulShards(), equalTo(localNumShards - 1));
+            assertThat(localClusterSearchInfo.getSkippedShards(), equalTo(0));
+            assertThat(localClusterSearchInfo.getFailedShards(), equalTo(1));
+            assertThat(localClusterSearchInfo.getFailures().size(), equalTo(1));
+            assertThat(localClusterSearchInfo.getTook().millis(), greaterThan(0L));
+            ShardSearchFailure localShardSearchFailure = localClusterSearchInfo.getFailures().get(0);
+            assertTrue("should have 'index corrupted' in reason", localShardSearchFailure.reason().contains("index corrupted"));
+
+            SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
+            assertNotNull(remoteClusterSearchInfo);
+            assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.PARTIAL));
+            assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
+            assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards - 1));
+            assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
+            assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(1));
+            assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
+            assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
+            ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
+            assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
+        }
+    }
+
+    public void testClusterDetailsAfterCCSWithFailuresOnOneClusterOnly() throws Exception {
+        Map<String, Object> testClusterInfo = setupTwoClusters();
+        String localIndex = (String) testClusterInfo.get("local.index");
+        String remoteIndex = (String) testClusterInfo.get("remote.index");
+        int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
+        boolean skipUnavailable = (Boolean) testClusterInfo.get("remote.skip_unavailable");
 
-            return List.of(slowRunningSpec);
+        SearchListenerPlugin.blockQueryPhase();
+
+        SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
+        request.setCcsMinimizeRoundtrips(true);
+        request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
+        request.setKeepOnCompletion(true);
+        // throw Exception of all shards of remoteIndex, but against localIndex
+        ThrowingQueryBuilder queryBuilder = new ThrowingQueryBuilder(
+            randomLong(),
+            new IllegalStateException("index corrupted"),
+            remoteIndex
+        );
+        request.getSearchRequest().source(new SearchSourceBuilder().query(queryBuilder).size(10));
+
+        AsyncSearchResponse response = submitAsyncSearch(request);
+        assertNotNull(response.getSearchResponse());
+        assertTrue(response.isRunning());
+        {
+            SearchResponse.Clusters clusters = response.getSearchResponse().getClusters();
+            assertThat(clusters.getTotal(), equalTo(2));
+            assertTrue("search cluster results should be marked as partial", clusters.hasPartialResults());
+
+            SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
+            assertNotNull(localClusterSearchInfo);
+            assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.RUNNING));
+
+            SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
+            assertNotNull(remoteClusterSearchInfo);
+            assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.RUNNING));
+        }
+
+        SearchListenerPlugin.waitSearchStarted();
+        SearchListenerPlugin.allowQueryPhase();
+
+        assertBusy(() -> {
+            AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
+            assertFalse(statusResponse.isRunning());
+            assertNotNull(statusResponse.getCompletionStatus());
+        });
+
+        {
+            AsyncSearchResponse finishedResponse = getAsyncSearch(response.getId());
+
+            SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
+            assertThat(clusters.getTotal(), equalTo(2));
+            assertThat(clusters.getSuccessful(), equalTo(1));
+            assertThat(clusters.getSkipped(), equalTo(1));
+
+            SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
+            assertNotNull(localClusterSearchInfo);
+            assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
+            assertThat(localClusterSearchInfo.getTotalShards(), equalTo(localNumShards));
+            assertThat(localClusterSearchInfo.getSuccessfulShards(), equalTo(localNumShards));
+            assertThat(localClusterSearchInfo.getSkippedShards(), equalTo(0));
+            assertThat(localClusterSearchInfo.getFailedShards(), equalTo(0));
+            assertThat(localClusterSearchInfo.getFailures().size(), equalTo(0));
+            assertThat(localClusterSearchInfo.getTook().millis(), greaterThan(0L));
+
+            SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
+            assertNotNull(remoteClusterSearchInfo);
+            SearchResponse.Cluster.Status expectedStatus = skipUnavailable
+                ? SearchResponse.Cluster.Status.SKIPPED
+                : SearchResponse.Cluster.Status.FAILED;
+            assertThat(remoteClusterSearchInfo.getStatus(), equalTo(expectedStatus));
+            assertNull(remoteClusterSearchInfo.getTotalShards());
+            assertNull(remoteClusterSearchInfo.getSuccessfulShards());
+            assertNull(remoteClusterSearchInfo.getSkippedShards());
+            assertNull(remoteClusterSearchInfo.getFailedShards());
+            assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
+            assertNull(remoteClusterSearchInfo.getTook());
+            assertFalse(remoteClusterSearchInfo.isTimedOut());
+            ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
+            assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
+        }
+        // check that the async_search/status response includes the same cluster details
+        {
+            AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
+            SearchResponse.Clusters clusters = statusResponse.getClusters();
+            assertThat(clusters.getTotal(), equalTo(2));
+            assertThat(clusters.getSuccessful(), equalTo(1));
+            assertThat(clusters.getSkipped(), equalTo(1));
+
+            SearchResponse.Cluster localClusterSearchInfo = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).get();
+            assertNotNull(localClusterSearchInfo);
+            assertThat(localClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
+            assertThat(localClusterSearchInfo.getTotalShards(), equalTo(localNumShards));
+            assertThat(localClusterSearchInfo.getSuccessfulShards(), equalTo(localNumShards));
+            assertThat(localClusterSearchInfo.getSkippedShards(), equalTo(0));
+            assertThat(localClusterSearchInfo.getFailedShards(), equalTo(0));
+            assertThat(localClusterSearchInfo.getFailures().size(), equalTo(0));
+            assertThat(localClusterSearchInfo.getTook().millis(), greaterThan(0L));
+
+            SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
+            assertNotNull(remoteClusterSearchInfo);
+            SearchResponse.Cluster.Status expectedStatus = skipUnavailable
+                ? SearchResponse.Cluster.Status.SKIPPED
+                : SearchResponse.Cluster.Status.FAILED;
+            assertThat(remoteClusterSearchInfo.getStatus(), equalTo(expectedStatus));
+            assertNull(remoteClusterSearchInfo.getTotalShards());
+            assertNull(remoteClusterSearchInfo.getSuccessfulShards());
+            assertNull(remoteClusterSearchInfo.getSkippedShards());
+            assertNull(remoteClusterSearchInfo.getFailedShards());
+            assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
+            assertNull(remoteClusterSearchInfo.getTook());
+            assertFalse(remoteClusterSearchInfo.isTimedOut());
+            ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
+            assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
+        }
+    }
+
+    public void testRemoteClusterOnlyCCSSuccessfulResult() throws Exception {
+        // for remote-only queries, we can't use the SearchListenerPlugin since that listens for search
+        // stage on the local cluster, so we only test final state of the search response
+        SearchListenerPlugin.negate();
+
+        Map<String, Object> testClusterInfo = setupTwoClusters();
+        String remoteIndex = (String) testClusterInfo.get("remote.index");
+        int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
+
+        // search only the remote cluster
+        SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(REMOTE_CLUSTER + ":" + remoteIndex);
+        request.setCcsMinimizeRoundtrips(true);
+        request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
+        request.setKeepOnCompletion(true);
+        request.getSearchRequest().source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(1000));
+
+        AsyncSearchResponse response = submitAsyncSearch(request);
+        assertNotNull(response.getSearchResponse());
+
+        assertBusy(() -> {
+            AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
+            assertFalse(statusResponse.isRunning());
+            assertNotNull(statusResponse.getCompletionStatus());
+        });
+
+        {
+            AsyncSearchResponse finishedResponse = getAsyncSearch(response.getId());
+
+            SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
+            assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
+            assertThat(clusters.getTotal(), equalTo(1));
+            assertThat(clusters.getSuccessful(), equalTo(1));
+            assertThat(clusters.getSkipped(), equalTo(0));
+
+            assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
+
+            SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
+            assertNotNull(remoteClusterSearchInfo);
+            assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
+            assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
+            assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards));
+            assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
+            assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(0));
+            assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(0));
+            assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
+        }
+
+        // check that the async_search/status response includes the same cluster details
+        {
+            AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
+
+            SearchResponse.Clusters clusters = statusResponse.getClusters();
+            assertFalse("search cluster results should NOT be marked as partial", clusters.hasPartialResults());
+            assertThat(clusters.getTotal(), equalTo(1));
+            assertThat(clusters.getSuccessful(), equalTo(1));
+            assertThat(clusters.getSkipped(), equalTo(0));
+
+            assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
+
+            SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
+            assertNotNull(remoteClusterSearchInfo);
+            assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
+            assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
+            assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards));
+            assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
+            assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(0));
+            assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(0));
+            assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
+        }
+    }
+
+    public void testRemoteClusterOnlyCCSWithFailuresOnOneShardOnly() throws Exception {
+        // for remote-only queries, we can't use the SearchListenerPlugin since that listens for search
+        // stage on the local cluster, so we only test final state of the search response
+        SearchListenerPlugin.negate();
+
+        Map<String, Object> testClusterInfo = setupTwoClusters();
+        String remoteIndex = (String) testClusterInfo.get("remote.index");
+        int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
+
+        SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(REMOTE_CLUSTER + ":" + remoteIndex);
+        request.setCcsMinimizeRoundtrips(true);
+        request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
+        request.setKeepOnCompletion(true);
+        // shardId 0 means to throw the Exception only on shard 0; all others should work
+        ThrowingQueryBuilder queryBuilder = new ThrowingQueryBuilder(randomLong(), new IllegalStateException("index corrupted"), 0);
+        request.getSearchRequest().source(new SearchSourceBuilder().query(queryBuilder).size(10));
+
+        AsyncSearchResponse response = submitAsyncSearch(request);
+        assertNotNull(response.getSearchResponse());
+        assertTrue(response.isRunning());
+
+        assertBusy(() -> {
+            AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
+            assertFalse(statusResponse.isRunning());
+            assertNotNull(statusResponse.getCompletionStatus());
+        });
+
+        {
+            AsyncSearchResponse finishedResponse = getAsyncSearch(response.getId());
+
+            SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
+            assertThat(clusters.getTotal(), equalTo(1));
+            assertThat(clusters.getSuccessful(), equalTo(1));
+            assertThat(clusters.getSkipped(), equalTo(0));
+
+            assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
+
+            SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
+            assertNotNull(remoteClusterSearchInfo);
+            assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.PARTIAL));
+            assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
+            assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards - 1));
+            assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
+            assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(1));
+            assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
+            assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
+            ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
+            assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
+        }
+        // check that the async_search/status response includes the same cluster details
+        {
+            AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
+            SearchResponse.Clusters clusters = statusResponse.getClusters();
+            assertThat(clusters.getTotal(), equalTo(1));
+            assertThat(clusters.getSuccessful(), equalTo(1));
+            assertThat(clusters.getSkipped(), equalTo(0));
+
+            assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
+
+            SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
+            assertNotNull(remoteClusterSearchInfo);
+            assertThat(remoteClusterSearchInfo.getStatus(), equalTo(SearchResponse.Cluster.Status.PARTIAL));
+            assertThat(remoteClusterSearchInfo.getTotalShards(), equalTo(remoteNumShards));
+            assertThat(remoteClusterSearchInfo.getSuccessfulShards(), equalTo(remoteNumShards - 1));
+            assertThat(remoteClusterSearchInfo.getSkippedShards(), equalTo(0));
+            assertThat(remoteClusterSearchInfo.getFailedShards(), equalTo(1));
+            assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
+            assertThat(remoteClusterSearchInfo.getTook().millis(), greaterThan(0L));
+            ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
+            assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
+        }
+    }
+
+    public void testRemoteClusterOnlyCCSWithFailuresOnAllShards() throws Exception {
+        // for remote-only queries, we can't use the SearchListenerPlugin since that listens for search
+        // stage on the local cluster, so we only test final state of the search response
+        SearchListenerPlugin.negate();
+
+        Map<String, Object> testClusterInfo = setupTwoClusters();
+        String remoteIndex = (String) testClusterInfo.get("remote.index");
+        boolean skipUnavailable = (Boolean) testClusterInfo.get("remote.skip_unavailable");
+
+        SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(REMOTE_CLUSTER + ":" + remoteIndex);
+        request.setCcsMinimizeRoundtrips(true);
+        request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
+        request.setKeepOnCompletion(true);
+        // shardId -1 means to throw the Exception on all shards, so should result in complete search failure
+        ThrowingQueryBuilder queryBuilder = new ThrowingQueryBuilder(randomLong(), new IllegalStateException("index corrupted"), -1);
+        request.getSearchRequest().source(new SearchSourceBuilder().query(queryBuilder).size(10));
+
+        AsyncSearchResponse response = submitAsyncSearch(request);
+        assertNotNull(response.getSearchResponse());
+        assertTrue(response.isRunning());
+
+        assertBusy(() -> {
+            AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
+            assertFalse(statusResponse.isRunning());
+            assertNotNull(statusResponse.getCompletionStatus());
+        });
+
+        {
+            AsyncSearchResponse finishedResponse = getAsyncSearch(response.getId());
+
+            SearchResponse.Clusters clusters = finishedResponse.getSearchResponse().getClusters();
+            assertThat(clusters.getTotal(), equalTo(1));
+            assertThat(clusters.getSuccessful(), equalTo(0));
+            assertThat(clusters.getSkipped(), equalTo(1));
+
+            assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
+
+            SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
+            assertNotNull(remoteClusterSearchInfo);
+            SearchResponse.Cluster.Status expectedStatus = skipUnavailable
+                ? SearchResponse.Cluster.Status.SKIPPED
+                : SearchResponse.Cluster.Status.FAILED;
+            assertThat(remoteClusterSearchInfo.getStatus(), equalTo(expectedStatus));
+            assertNull(remoteClusterSearchInfo.getTotalShards());
+            assertNull(remoteClusterSearchInfo.getSuccessfulShards());
+            assertNull(remoteClusterSearchInfo.getSkippedShards());
+            assertNull(remoteClusterSearchInfo.getFailedShards());
+            assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
+            assertNull(remoteClusterSearchInfo.getTook());
+            assertFalse(remoteClusterSearchInfo.isTimedOut());
+            ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
+            assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
+        }
+        // check that the async_search/status response includes the same cluster details
+        {
+            AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
+            SearchResponse.Clusters clusters = statusResponse.getClusters();
+            assertThat(clusters.getTotal(), equalTo(1));
+            assertThat(clusters.getSuccessful(), equalTo(0));
+            assertThat(clusters.getSkipped(), equalTo(1));
+
+            assertNull(clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
+
+            SearchResponse.Cluster remoteClusterSearchInfo = clusters.getCluster(REMOTE_CLUSTER).get();
+            assertNotNull(remoteClusterSearchInfo);
+            SearchResponse.Cluster.Status expectedStatus = skipUnavailable
+                ? SearchResponse.Cluster.Status.SKIPPED
+                : SearchResponse.Cluster.Status.FAILED;
+            assertThat(remoteClusterSearchInfo.getStatus(), equalTo(expectedStatus));
+            assertNull(remoteClusterSearchInfo.getTotalShards());
+            assertNull(remoteClusterSearchInfo.getSuccessfulShards());
+            assertNull(remoteClusterSearchInfo.getSkippedShards());
+            assertNull(remoteClusterSearchInfo.getFailedShards());
+            assertThat(remoteClusterSearchInfo.getFailures().size(), equalTo(1));
+            assertNull(remoteClusterSearchInfo.getTook());
+            assertFalse(remoteClusterSearchInfo.isTimedOut());
+            ShardSearchFailure remoteShardSearchFailure = remoteClusterSearchInfo.getFailures().get(0);
+            assertTrue("should have 'index corrupted' in reason", remoteShardSearchFailure.reason().contains("index corrupted"));
         }
     }
 
     public void testCancelViaTasksAPI() throws Exception {
-        setupTwoClusters();
+        Map<String, Object> testClusterInfo = setupTwoClusters();
+        String localIndex = (String) testClusterInfo.get("local.index");
+        String remoteIndex = (String) testClusterInfo.get("remote.index");
 
         SearchListenerPlugin.blockQueryPhase();
 
-        SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest("demo", REMOTE_CLUSTER + ":prod");
+        SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
         request.setCcsMinimizeRoundtrips(randomBoolean());
         request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
         request.setKeepOnCompletion(true);
@@ -250,11 +939,13 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
     }
 
     public void testCancelViaAsyncSearchDelete() throws Exception {
-        setupTwoClusters();
+        Map<String, Object> testClusterInfo = setupTwoClusters();
+        String localIndex = (String) testClusterInfo.get("local.index");
+        String remoteIndex = (String) testClusterInfo.get("remote.index");
 
         SearchListenerPlugin.blockQueryPhase();
 
-        SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest("demo", REMOTE_CLUSTER + ":prod");
+        SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
         request.setCcsMinimizeRoundtrips(randomBoolean());
         request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
         request.setKeepOnCompletion(true);
@@ -348,7 +1039,9 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
 
     @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/97286")
     public void testCancellationViaTimeoutWithAllowPartialResultsSetToFalse() throws Exception {
-        setupTwoClusters();
+        Map<String, Object> testClusterInfo = setupTwoClusters();
+        String localIndex = (String) testClusterInfo.get("local.index");
+        String remoteIndex = (String) testClusterInfo.get("remote.index");
 
         SearchListenerPlugin.blockQueryPhase();
 
@@ -357,7 +1050,7 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
         SlowRunningQueryBuilder slowRunningQueryBuilder = new SlowRunningQueryBuilder(searchTimeout.millis() * 5);
         SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(slowRunningQueryBuilder).timeout(searchTimeout);
 
-        SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest("demo", REMOTE_CLUSTER + ":prod");
+        SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(localIndex, REMOTE_CLUSTER + ":" + remoteIndex);
         request.setCcsMinimizeRoundtrips(randomBoolean());
         request.getSearchRequest().source(sourceBuilder);
         request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(1));
@@ -457,48 +1150,50 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
         return client().execute(DeleteAsyncResultAction.INSTANCE, new DeleteAsyncResultRequest(id)).get();
     }
 
-    private void setupTwoClusters() throws Exception {
-        assertAcked(client(LOCAL_CLUSTER).admin().indices().prepareCreate("demo"));
-        indexDocs(client(LOCAL_CLUSTER), "demo");
+    private Map<String, Object> setupTwoClusters() {
+        String localIndex = "demo";
+        int numShardsLocal = randomIntBetween(3, 6);
+        Settings localSettings = indexSettings(numShardsLocal, 0).build();
+        assertAcked(client(LOCAL_CLUSTER).admin().indices().prepareCreate(localIndex).setSettings(localSettings));
+        indexDocs(client(LOCAL_CLUSTER), localIndex);
+
+        String remoteIndex = "prod";
+        int numShardsRemote = randomIntBetween(3, 6);
         final InternalTestCluster remoteCluster = cluster(REMOTE_CLUSTER);
-        remoteCluster.ensureAtLeastNumDataNodes(1);
-        final Settings.Builder allocationFilter = Settings.builder();
-        if (randomBoolean()) {
-            remoteCluster.ensureAtLeastNumDataNodes(3);
-            List<String> remoteDataNodes = remoteCluster.clusterService()
-                .state()
-                .nodes()
-                .stream()
-                .filter(DiscoveryNode::canContainData)
-                .map(DiscoveryNode::getName)
-                .toList();
-            assertThat(remoteDataNodes.size(), Matchers.greaterThanOrEqualTo(3));
-            List<String> seedNodes = randomSubsetOf(between(1, remoteDataNodes.size() - 1), remoteDataNodes);
-            disconnectFromRemoteClusters();
-            configureRemoteCluster(REMOTE_CLUSTER, seedNodes);
-            if (randomBoolean()) {
-                // Using proxy connections
-                allocationFilter.put("index.routing.allocation.exclude._name", String.join(",", seedNodes));
-            } else {
-                allocationFilter.put("index.routing.allocation.include._name", String.join(",", seedNodes));
-            }
-        }
+        remoteCluster.ensureAtLeastNumDataNodes(randomIntBetween(1, 3));
+        final Settings.Builder remoteSettings = Settings.builder();
+        remoteSettings.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShardsRemote);
+
         assertAcked(
             client(REMOTE_CLUSTER).admin()
                 .indices()
-                .prepareCreate("prod")
-                .setSettings(Settings.builder().put(allocationFilter.build()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0))
+                .prepareCreate(remoteIndex)
+                .setSettings(Settings.builder().put(remoteSettings.build()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0))
         );
         assertFalse(
             client(REMOTE_CLUSTER).admin()
                 .cluster()
-                .prepareHealth("prod")
+                .prepareHealth(remoteIndex)
                 .setWaitForYellowStatus()
                 .setTimeout(TimeValue.timeValueSeconds(10))
                 .get()
                 .isTimedOut()
         );
-        indexDocs(client(REMOTE_CLUSTER), "prod");
+        indexDocs(client(REMOTE_CLUSTER), remoteIndex);
+
+        String skipUnavailableKey = Strings.format("cluster.remote.%s.skip_unavailable", REMOTE_CLUSTER);
+        Setting<?> skipUnavailableSetting = cluster(REMOTE_CLUSTER).clusterService().getClusterSettings().get(skipUnavailableKey);
+        boolean skipUnavailable = (boolean) cluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).clusterService()
+            .getClusterSettings()
+            .get(skipUnavailableSetting);
+
+        Map<String, Object> clusterInfo = new HashMap<>();
+        clusterInfo.put("local.num_shards", numShardsLocal);
+        clusterInfo.put("local.index", localIndex);
+        clusterInfo.put("remote.num_shards", numShardsRemote);
+        clusterInfo.put("remote.index", remoteIndex);
+        clusterInfo.put("remote.skip_unavailable", skipUnavailable);
+        return clusterInfo;
     }
 
     private int indexDocs(Client client, String index) {
@@ -520,6 +1215,22 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
         private static final AtomicReference<CountDownLatch> queryLatch = new AtomicReference<>();
         private static final AtomicReference<CountDownLatch> failedQueryLatch = new AtomicReference<>();
 
+        /**
+         * For tests that cannot use SearchListenerPlugin, ensure all latches are unset to
+         * avoid test problems around searches of the .async-search index
+         */
+        static void negate() {
+            if (startedLatch.get() != null) {
+                startedLatch.get().countDown();
+            }
+            if (queryLatch.get() != null) {
+                queryLatch.get().countDown();
+            }
+            if (failedQueryLatch.get() != null) {
+                failedQueryLatch.get().countDown();
+            }
+        }
+
         static void reset() {
             startedLatch.set(new CountDownLatch(1));
             failedQueryLatch.set(new CountDownLatch(1));

+ 2 - 2
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java

@@ -433,7 +433,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
                  */
                 reducedAggs = () -> InternalAggregations.topLevelReduce(singletonList(aggregations), aggReduceContextSupplier.get());
             }
-            searchResponse.get().updatePartialResponse(shards.size(), totalHits, reducedAggs, reducePhase, false);
+            searchResponse.get().updatePartialResponse(shards.size(), totalHits, reducedAggs, reducePhase);
         }
 
         /**
@@ -444,7 +444,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
         public void onFinalReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggregations, int reducePhase) {
             // best effort to cancel expired tasks
             checkCancellation();
-            searchResponse.get().updatePartialResponse(shards.size(), totalHits, () -> aggregations, reducePhase, true);
+            searchResponse.get().updatePartialResponse(shards.size(), totalHits, () -> aggregations, reducePhase);
         }
 
         @Override

+ 10 - 24
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java

@@ -6,8 +6,6 @@
  */
 package org.elasticsearch.xpack.search;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
@@ -38,12 +36,10 @@ import static org.elasticsearch.xpack.core.async.AsyncTaskIndexService.restoreRe
  * run concurrently to 1 and ensures that we pause the search progress when an {@link AsyncSearchResponse} is built.
  */
 class MutableSearchResponse {
-
-    private static final Logger logger = LogManager.getLogger(MutableSearchResponse.class);
     private static final TotalHits EMPTY_TOTAL_HITS = new TotalHits(0L, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
     private final int totalShards;
     private final int skippedShards;
-    private Clusters clusters;
+    private final Clusters clusters;
     private final AtomicArray<ShardSearchFailure> queryFailures;
     private final ThreadContext threadContext;
 
@@ -90,16 +86,13 @@ class MutableSearchResponse {
     /**
      * Updates the response with the result of a partial reduction.
      * @param reducedAggs is a strategy for producing the reduced aggs
-     * @param isFinalLocalReduce true if the local cluster search has finished (during CCS with minimize_roundtrips, this can be true
-     *                           even while the overall search is still running on remote clusters)
      */
     @SuppressWarnings("HiddenField")
     synchronized void updatePartialResponse(
         int successfulShards,
         TotalHits totalHits,
         Supplier<InternalAggregations> reducedAggs,
-        int reducePhase,
-        boolean isFinalLocalReduce
+        int reducePhase
     ) {
         failIfFrozen();
         if (reducePhase < this.reducePhase) {
@@ -112,20 +105,6 @@ class MutableSearchResponse {
         this.totalHits = totalHits;
         this.reducedAggsSource = reducedAggs;
         this.reducePhase = reducePhase;
-        if (isFinalLocalReduce && clusters.isCcsMinimizeRoundtrips()) {
-            // currently only ccsMinimizeRoundTrip=true creates Clusters in their initial state (where successful=0)
-            // ccsMinimizeRoundtrips=false creates Clusters in its final state even at the beginning (successful+skipped=total)
-            // so update the clusters object 'successful' count if local cluster search is done AND ccsMinimizeRoundtrips=true
-            Clusters newClusters = new Clusters(
-                clusters.getTotal(),
-                clusters.getSuccessful() + 1,
-                clusters.getSkipped(),
-                clusters.getRemoteClusters(),
-                clusters.isCcsMinimizeRoundtrips()
-            );
-            this.clusters = newClusters;
-            logger.debug("Updating Clusters info to indicate that the local cluster search has completed: {}", newClusters);
-        }
     }
 
     /**
@@ -140,10 +119,17 @@ class MutableSearchResponse {
 
         this.responseHeaders = threadContext.getResponseHeaders();
         this.finalResponse = response;
-        this.isPartial = false;
+        this.isPartial = isPartialResponse(response);
         this.frozen = true;
     }
 
+    private boolean isPartialResponse(SearchResponse response) {
+        if (response.getClusters() == null) {
+            return true;
+        }
+        return response.getClusters().hasPartialResults();
+    }
+
     /**
      * Updates the response with a fatal failure. This method preserves the partial response
      * received from previous updates

+ 503 - 5
x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchResponseTests.java

@@ -7,20 +7,27 @@
 
 package org.elasticsearch.xpack.search;
 
+import org.apache.lucene.index.CorruptIndexException;
 import org.elasticsearch.TransportVersion;
+import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchResponseSections;
 import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.script.ScriptException;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.SearchModule;
+import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.internal.InternalSearchResponse;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.transport.RemoteClusterAware;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentType;
@@ -32,13 +39,17 @@ import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Collections.emptyList;
 import static org.elasticsearch.xpack.core.async.GetAsyncResultRequestTests.randomSearchId;
 
 public class AsyncSearchResponseTests extends ESTestCase {
-    private SearchResponse searchResponse = randomSearchResponse();
+    private SearchResponse searchResponse = randomSearchResponse(randomBoolean());
     private NamedWriteableRegistry namedWriteableRegistry;
 
     @Before
@@ -113,12 +124,18 @@ public class AsyncSearchResponseTests extends ESTestCase {
         };
     }
 
-    static SearchResponse randomSearchResponse() {
+    static SearchResponse randomSearchResponse(boolean ccs) {
         long tookInMillis = randomNonNegativeLong();
         int totalShards = randomIntBetween(1, Integer.MAX_VALUE);
         int successfulShards = randomIntBetween(0, totalShards);
         int skippedShards = randomIntBetween(0, successfulShards);
         InternalSearchResponse internalSearchResponse = InternalSearchResponse.EMPTY_WITH_TOTAL_HITS;
+        SearchResponse.Clusters clusters;
+        if (ccs) {
+            clusters = createCCSClusterObjects(20, 19, true, 10, 1, 2);
+        } else {
+            clusters = SearchResponse.Clusters.EMPTY;
+        }
         return new SearchResponse(
             internalSearchResponse,
             null,
@@ -127,7 +144,7 @@ public class AsyncSearchResponseTests extends ESTestCase {
             skippedShards,
             tookInMillis,
             ShardSearchFailure.EMPTY_ARRAY,
-            SearchResponse.Clusters.EMPTY
+            clusters
         );
     }
 
@@ -201,7 +218,7 @@ public class AsyncSearchResponseTests extends ESTestCase {
             9,
             1,
             took,
-            new ShardSearchFailure[0],
+            ShardSearchFailure.EMPTY_ARRAY,
             SearchResponse.Clusters.EMPTY
         );
 
@@ -290,6 +307,347 @@ public class AsyncSearchResponseTests extends ESTestCase {
         }
     }
 
+    public void testToXContentWithCCSSearchResponseWhileRunning() throws IOException {
+        boolean isRunning = true;
+        long startTimeMillis = 1689352924517L;
+        long expirationTimeMillis = 1689784924517L;
+        long took = 22968L;
+
+        SearchHits hits = SearchHits.EMPTY_WITHOUT_TOTAL_HITS;
+        SearchResponseSections sections = new SearchResponseSections(hits, null, null, false, null, null, 2);
+
+        SearchResponse.Clusters clusters = createCCSClusterObjects(3, 3, true);
+
+        SearchResponse searchResponse = new SearchResponse(sections, null, 10, 9, 1, took, ShardSearchFailure.EMPTY_ARRAY, clusters);
+
+        AsyncSearchResponse asyncSearchResponse = new AsyncSearchResponse(
+            "id",
+            searchResponse,
+            null,
+            true,
+            isRunning,
+            startTimeMillis,
+            expirationTimeMillis
+        );
+
+        try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) {
+            builder.prettyPrint();
+            asyncSearchResponse.toXContent(builder, ToXContent.EMPTY_PARAMS);
+            assertEquals(Strings.format("""
+                {
+                  "id" : "id",
+                  "is_partial" : true,
+                  "is_running" : true,
+                  "start_time_in_millis" : %s,
+                  "expiration_time_in_millis" : %s,
+                  "response" : {
+                    "took" : %s,
+                    "timed_out" : false,
+                    "num_reduce_phases" : 2,
+                    "_shards" : {
+                      "total" : 10,
+                      "successful" : 9,
+                      "skipped" : 1,
+                      "failed" : 0
+                    },
+                    "_clusters" : {
+                      "total" : 3,
+                      "successful" : 0,
+                      "skipped" : 0,
+                      "details" : {
+                        "cluster_1" : {
+                          "status" : "running",
+                          "indices" : "foo,bar*",
+                          "timed_out" : false
+                        },
+                        "cluster_2" : {
+                          "status" : "running",
+                          "indices" : "foo,bar*",
+                          "timed_out" : false
+                        },
+                        "cluster_0" : {
+                          "status" : "running",
+                          "indices" : "foo,bar*",
+                          "timed_out" : false
+                        }
+                      }
+                    },
+                    "hits" : {
+                      "max_score" : 0.0,
+                      "hits" : [ ]
+                    }
+                  }
+                }""", startTimeMillis, expirationTimeMillis, took), Strings.toString(builder));
+        }
+
+        try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) {
+            builder.prettyPrint();
+            builder.humanReadable(true);
+            asyncSearchResponse.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap("human", "true")));
+            assertEquals(
+                Strings.format(
+                    """
+                        {
+                          "id" : "id",
+                          "is_partial" : true,
+                          "is_running" : true,
+                          "start_time" : "%s",
+                          "start_time_in_millis" : %s,
+                          "expiration_time" : "%s",
+                          "expiration_time_in_millis" : %s,
+                          "response" : {
+                            "took" : %s,
+                            "timed_out" : false,
+                            "num_reduce_phases" : 2,
+                            "_shards" : {
+                              "total" : 10,
+                              "successful" : 9,
+                              "skipped" : 1,
+                              "failed" : 0
+                            },
+                            "_clusters" : {
+                              "total" : 3,
+                              "successful" : 0,
+                              "skipped" : 0,
+                              "details" : {
+                                "cluster_1" : {
+                                  "status" : "running",
+                                  "indices" : "foo,bar*",
+                                  "timed_out" : false
+                                },
+                                "cluster_2" : {
+                                  "status" : "running",
+                                  "indices" : "foo,bar*",
+                                  "timed_out" : false
+                                },
+                                "cluster_0" : {
+                                  "status" : "running",
+                                  "indices" : "foo,bar*",
+                                  "timed_out" : false
+                                }
+                              }
+                            },
+                            "hits" : {
+                              "max_score" : 0.0,
+                              "hits" : [ ]
+                            }
+                          }
+                        }""",
+                    XContentElasticsearchExtension.DEFAULT_FORMATTER.format(Instant.ofEpochMilli(startTimeMillis)),
+                    startTimeMillis,
+                    XContentElasticsearchExtension.DEFAULT_FORMATTER.format(Instant.ofEpochMilli(expirationTimeMillis)),
+                    expirationTimeMillis,
+                    took
+                ),
+                Strings.toString(builder)
+            );
+        }
+    }
+
+    // completion_time should be present since search has completed
+    public void testToXContentWithCCSSearchResponseAfterCompletion() throws IOException {
+        boolean isRunning = false;
+        long startTimeMillis = 1689352924517L;
+        long expirationTimeMillis = 1689784924517L;
+        long took = 22968L;
+        long expectedCompletionTime = startTimeMillis + took;
+
+        SearchHits hits = SearchHits.EMPTY_WITHOUT_TOTAL_HITS;
+        SearchResponseSections sections = new SearchResponseSections(hits, null, null, true, null, null, 2);
+        SearchResponse.Clusters clusters = createCCSClusterObjects(4, 3, true);
+
+        AtomicReference<SearchResponse.Cluster> clusterRef = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
+        SearchResponse.Cluster localCluster = clusterRef.get();
+        SearchResponse.Cluster updated = new SearchResponse.Cluster(
+            localCluster.getClusterAlias(),
+            localCluster.getIndexExpression(),
+            SearchResponse.Cluster.Status.SUCCESSFUL,
+            10,
+            10,
+            3,
+            0,
+            Collections.emptyList(),
+            new TimeValue(11111),
+            false
+        );
+        boolean swapped = clusterRef.compareAndSet(localCluster, updated);
+        assertTrue("CAS swap failed for cluster " + updated, swapped);
+
+        clusterRef = clusters.getCluster("cluster_0");
+        SearchResponse.Cluster cluster0 = clusterRef.get();
+        updated = new SearchResponse.Cluster(
+            cluster0.getClusterAlias(),
+            cluster0.getIndexExpression(),
+            SearchResponse.Cluster.Status.SUCCESSFUL,
+            8,
+            8,
+            1,
+            0,
+            Collections.emptyList(),
+            new TimeValue(7777),
+            false
+        );
+        swapped = clusterRef.compareAndSet(cluster0, updated);
+        assertTrue("CAS swap failed for cluster " + updated, swapped);
+
+        clusterRef = clusters.getCluster("cluster_1");
+        SearchResponse.Cluster cluster1 = clusterRef.get();
+        ShardSearchFailure failure1 = new ShardSearchFailure(
+            new NullPointerException("NPE details"),
+            new SearchShardTarget("nodeId0", new ShardId("foo", UUID.randomUUID().toString(), 0), "cluster_1")
+        );
+        ShardSearchFailure failure2 = new ShardSearchFailure(
+            new CorruptIndexException("abc", "123"),
+            new SearchShardTarget("nodeId0", new ShardId("bar1", UUID.randomUUID().toString(), 0), "cluster_1")
+        );
+        updated = new SearchResponse.Cluster(
+            cluster1.getClusterAlias(),
+            cluster1.getIndexExpression(),
+            SearchResponse.Cluster.Status.SKIPPED,
+            2,
+            0,
+            0,
+            2,
+            List.of(failure1, failure2),
+            null,
+            false
+        );
+        swapped = clusterRef.compareAndSet(cluster1, updated);
+        assertTrue("CAS swap failed for cluster " + updated, swapped);
+
+        clusterRef = clusters.getCluster("cluster_2");
+        SearchResponse.Cluster cluster2 = clusterRef.get();
+        updated = new SearchResponse.Cluster(
+            cluster2.getClusterAlias(),
+            cluster2.getIndexExpression(),
+            SearchResponse.Cluster.Status.PARTIAL,
+            8,
+            8,
+            0,
+            0,
+            Collections.emptyList(),
+            new TimeValue(17322),
+            true
+        );
+        swapped = clusterRef.compareAndSet(cluster2, updated);
+        assertTrue("CAS swap failed for cluster " + updated, swapped);
+
+        SearchResponse searchResponse = new SearchResponse(sections, null, 10, 9, 1, took, new ShardSearchFailure[0], clusters);
+
+        AsyncSearchResponse asyncSearchResponse = new AsyncSearchResponse(
+            "id",
+            searchResponse,
+            null,
+            false,
+            isRunning,
+            startTimeMillis,
+            expirationTimeMillis
+        );
+
+        try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) {
+            builder.prettyPrint();
+            asyncSearchResponse.toXContent(builder, ToXContent.EMPTY_PARAMS);
+            assertEquals(Strings.format("""
+                {
+                  "id" : "id",
+                  "is_partial" : false,
+                  "is_running" : false,
+                  "start_time_in_millis" : %s,
+                  "expiration_time_in_millis" : %s,
+                  "completion_time_in_millis" : %s,
+                  "response" : {
+                    "took" : %s,
+                    "timed_out" : true,
+                    "num_reduce_phases" : 2,
+                    "_shards" : {
+                      "total" : 10,
+                      "successful" : 9,
+                      "skipped" : 1,
+                      "failed" : 0
+                    },
+                    "_clusters" : {
+                      "total" : 4,
+                      "successful" : 3,
+                      "skipped" : 1,
+                      "details" : {
+                        "(local)" : {
+                          "status" : "successful",
+                          "indices" : "foo,bar*",
+                          "took" : 11111,
+                          "timed_out" : false,
+                          "_shards" : {
+                            "total" : 10,
+                            "successful" : 10,
+                            "skipped" : 3,
+                            "failed" : 0
+                          }
+                        },
+                        "cluster_1" : {
+                          "status" : "skipped",
+                          "indices" : "foo,bar*",
+                          "timed_out" : false,
+                          "_shards" : {
+                            "total" : 2,
+                            "successful" : 0,
+                            "skipped" : 0,
+                            "failed" : 2
+                          },
+                          "failures" : [
+                            {
+                              "shard" : 0,
+                              "index" : "cluster_1:foo",
+                              "node" : "nodeId0",
+                              "reason" : {
+                                "type" : "null_pointer_exception",
+                                "reason" : "NPE details"
+                              }
+                            },
+                            {
+                              "shard" : 0,
+                              "index" : "cluster_1:bar1",
+                              "node" : "nodeId0",
+                              "reason" : {
+                                "type" : "corrupt_index_exception",
+                                "reason" : "abc (resource=123)"
+                              }
+                            }
+                          ]
+                        },
+                        "cluster_2" : {
+                          "status" : "partial",
+                          "indices" : "foo,bar*",
+                          "took" : 17322,
+                          "timed_out" : true,
+                          "_shards" : {
+                            "total" : 8,
+                            "successful" : 8,
+                            "skipped" : 0,
+                            "failed" : 0
+                          }
+                        },
+                        "cluster_0" : {
+                          "status" : "successful",
+                          "indices" : "foo,bar*",
+                          "took" : 7777,
+                          "timed_out" : false,
+                          "_shards" : {
+                            "total" : 8,
+                            "successful" : 8,
+                            "skipped" : 1,
+                            "failed" : 0
+                          }
+                        }
+                      }
+                    },
+                    "hits" : {
+                      "max_score" : 0.0,
+                      "hits" : [ ]
+                    }
+                  }
+                }""", startTimeMillis, expirationTimeMillis, expectedCompletionTime, took), Strings.toString(builder));
+        }
+    }
+
     // completion_time should NOT be present since search is still running
     public void testToXContentWithSearchResponseWhileRunning() throws IOException {
         boolean isRunning = true;
@@ -306,7 +664,7 @@ public class AsyncSearchResponseTests extends ESTestCase {
             9,
             1,
             took,
-            new ShardSearchFailure[0],
+            ShardSearchFailure.EMPTY_ARRAY,
             SearchResponse.Clusters.EMPTY
         );
 
@@ -389,4 +747,144 @@ public class AsyncSearchResponseTests extends ESTestCase {
             );
         }
     }
+
+    static SearchResponse.Clusters createCCSClusterObjects(int totalClusters, int remoteClusters, boolean ccsMinimizeRoundtrips) {
+        OriginalIndices localIndices = null;
+        if (totalClusters > remoteClusters) {
+            localIndices = new OriginalIndices(new String[] { "foo", "bar*" }, IndicesOptions.lenientExpand());
+        }
+        assert remoteClusters > 0 : "CCS Cluster must have at least one remote cluster";
+        Map<String, OriginalIndices> remoteClusterIndices = new HashMap<>();
+        for (int i = 0; i < remoteClusters; i++) {
+            remoteClusterIndices.put("cluster_" + i, new OriginalIndices(new String[] { "foo", "bar*" }, IndicesOptions.lenientExpand()));
+        }
+
+        return new SearchResponse.Clusters(localIndices, remoteClusterIndices, ccsMinimizeRoundtrips);
+    }
+
+    static SearchResponse.Clusters createCCSClusterObjects(
+        int totalClusters,
+        int remoteClusters,
+        boolean ccsMinimizeRoundtrips,
+        int successfulClusters,
+        int skippedClusters,
+        int partialClusters
+    ) {
+        assert successfulClusters + skippedClusters <= totalClusters : "successful + skipped > totalClusters";
+        assert totalClusters == remoteClusters || totalClusters - remoteClusters == 1
+            : "totalClusters and remoteClusters must be same or total = remote + 1";
+        assert successfulClusters + skippedClusters + partialClusters > 0 : "successful + skipped + partial must be > 0";
+
+        SearchResponse.Clusters clusters = createCCSClusterObjects(totalClusters, remoteClusters, ccsMinimizeRoundtrips);
+
+        int successful = successfulClusters;
+        int skipped = skippedClusters;
+        int partial = partialClusters;
+        if (totalClusters > remoteClusters) {
+            String localAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
+            AtomicReference<SearchResponse.Cluster> localRef = clusters.getCluster(localAlias);
+            SearchResponse.Cluster orig = localRef.get();
+            SearchResponse.Cluster updated;
+            if (successful > 0) {
+                updated = new SearchResponse.Cluster(
+                    localAlias,
+                    localRef.get().getIndexExpression(),
+                    SearchResponse.Cluster.Status.SUCCESSFUL,
+                    5,
+                    5,
+                    0,
+                    0,
+                    Collections.emptyList(),
+                    new TimeValue(1000),
+                    false
+                );
+                successful--;
+            } else if (skipped > 0) {
+                updated = new SearchResponse.Cluster(
+                    localAlias,
+                    localRef.get().getIndexExpression(),
+                    SearchResponse.Cluster.Status.SKIPPED,
+                    5,
+                    0,
+                    0,
+                    5,
+                    Collections.emptyList(),
+                    new TimeValue(1000),
+                    false
+                );
+                skipped--;
+            } else {
+                updated = new SearchResponse.Cluster(
+                    localAlias,
+                    localRef.get().getIndexExpression(),
+                    SearchResponse.Cluster.Status.PARTIAL,
+                    5,
+                    2,
+                    1,
+                    3,
+                    Collections.emptyList(),
+                    new TimeValue(1000),
+                    false
+                );
+                partial--;
+            }
+            boolean swapped = localRef.compareAndSet(orig, updated);
+            assertTrue("CAS swap failed for cluster " + updated, swapped);
+        }
+
+        int numClusters = successful + skipped + partial;
+
+        for (int i = 0; i < numClusters; i++) {
+            String clusterAlias = "cluster_" + i;
+            AtomicReference<SearchResponse.Cluster> clusterRef = clusters.getCluster(clusterAlias);
+            SearchResponse.Cluster orig = clusterRef.get();
+            SearchResponse.Cluster updated;
+            if (successful > 0) {
+                updated = new SearchResponse.Cluster(
+                    clusterAlias,
+                    clusterRef.get().getIndexExpression(),
+                    SearchResponse.Cluster.Status.SUCCESSFUL,
+                    5,
+                    5,
+                    0,
+                    0,
+                    Collections.emptyList(),
+                    new TimeValue(1000),
+                    false
+                );
+                successful--;
+            } else if (skipped > 0) {
+                updated = new SearchResponse.Cluster(
+                    clusterAlias,
+                    clusterRef.get().getIndexExpression(),
+                    SearchResponse.Cluster.Status.SKIPPED,
+                    5,
+                    0,
+                    0,
+                    5,
+                    Collections.emptyList(),
+                    new TimeValue(1000),
+                    false
+                );
+                skipped--;
+            } else {
+                updated = new SearchResponse.Cluster(
+                    clusterAlias,
+                    clusterRef.get().getIndexExpression(),
+                    SearchResponse.Cluster.Status.PARTIAL,
+                    5,
+                    2,
+                    1,
+                    3,
+                    Collections.emptyList(),
+                    new TimeValue(1000),
+                    false
+                );
+                partial--;
+            }
+            boolean swapped = clusterRef.compareAndSet(orig, updated);
+            assertTrue("CAS swap failed for cluster " + updated, swapped);
+        }
+        return clusters;
+    }
 }

+ 100 - 10
x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncStatusResponseTests.java

@@ -47,7 +47,7 @@ public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase<As
         SearchResponse.Clusters clusters = switch (randomIntBetween(0, 3)) {
             case 1 -> SearchResponse.Clusters.EMPTY;
             case 2 -> new SearchResponse.Clusters(1, 1, 0);
-            case 3 -> new SearchResponse.Clusters(4, 1, 0, 3, true);
+            case 3 -> AsyncSearchResponseTests.createCCSClusterObjects(4, 3, true);
             default -> null;  // case 0
         };
         return new AsyncStatusResponse(
@@ -80,7 +80,7 @@ public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase<As
         SearchResponse.Clusters clusters = switch (randomIntBetween(0, 3)) {
             case 1 -> SearchResponse.Clusters.EMPTY;
             case 2 -> new SearchResponse.Clusters(1, 1, 0);
-            case 3 -> new SearchResponse.Clusters(4, 1, 0, 3, true);
+            case 3 -> AsyncSearchResponseTests.createCCSClusterObjects(4, 3, true); // new SearchResponse.Clusters(4, 1, 0, 3, true);
             default -> null;  // case 0
         };
         return new AsyncStatusResponse(
@@ -140,7 +140,7 @@ public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase<As
                       %s
                     }
                     """, args);
-            } else {
+            } else if (clusters.getTotal() == 1) {
                 Object[] args = new Object[] {
                     response.getId(),
                     response.isRunning(),
@@ -180,6 +180,69 @@ public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase<As
                       %s
                     }
                     """, args);
+            } else {
+                Object[] args = new Object[] {
+                    response.getId(),
+                    response.isRunning(),
+                    response.isPartial(),
+                    response.getStartTime(),
+                    response.getExpirationTime(),
+                    completionTimeEntry,
+                    response.getTotalShards(),
+                    response.getSuccessfulShards(),
+                    response.getSkippedShards(),
+                    response.getFailedShards(),
+                    clusters.getTotal(),
+                    clusters.getSuccessful(),
+                    clusters.getSkipped(),
+                    response.getCompletionStatus() == null ? "" : Strings.format("""
+                        ,"completion_status" : %s""", response.getCompletionStatus().getStatus()) };
+
+                expectedJson = Strings.format("""
+                    {
+                      "id" : "%s",
+                      "is_running" : %s,
+                      "is_partial" : %s,
+                      "start_time_in_millis" : %s,
+                      "expiration_time_in_millis" : %s,
+                      %s
+                      "_shards" : {
+                        "total" : %s,
+                        "successful" : %s,
+                        "skipped" : %s,
+                        "failed" : %s
+                      },
+                      "_clusters": {
+                       "total": %s,
+                       "successful": %s,
+                       "skipped": %s,
+                        "details": {
+                          "(local)": {
+                            "status": "running",
+                            "indices": "foo,bar*",
+                            "timed_out": false
+                          },
+                          "cluster_1": {
+                            "status": "running",
+                            "indices": "foo,bar*",
+                            "timed_out": false
+                          },
+                          "cluster_2": {
+                            "status": "running",
+                            "indices": "foo,bar*",
+                            "timed_out": false
+                          },
+                          "cluster_0": {
+                            "status": "running",
+                            "indices": "foo,bar*",
+                            "timed_out": false
+                          }
+                        }
+                      }
+                      %s
+                    }
+                    """, args);
+
             }
             response.toXContent(builder, ToXContent.EMPTY_PARAMS);
             assertEquals(XContentHelper.stripWhitespace(expectedJson), Strings.toString(builder));
@@ -187,10 +250,11 @@ public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase<As
     }
 
     public void testGetStatusFromStoredSearchRandomizedInputs() {
+        boolean ccs = randomBoolean();
         String searchId = randomSearchId();
         AsyncSearchResponse asyncSearchResponse = AsyncSearchResponseTests.randomAsyncSearchResponse(
             searchId,
-            AsyncSearchResponseTests.randomSearchResponse()
+            AsyncSearchResponseTests.randomSearchResponse(ccs)
         );
 
         if (asyncSearchResponse.getSearchResponse() == null
@@ -241,7 +305,7 @@ public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase<As
         int successfulShards = randomIntBetween(0, totalShards);
         int skippedShards = randomIntBetween(0, successfulShards);
         InternalSearchResponse internalSearchResponse = InternalSearchResponse.EMPTY_WITH_TOTAL_HITS;
-        SearchResponse.Clusters clusters = new SearchResponse.Clusters(100, 99, 1, 99, false);
+        SearchResponse.Clusters clusters = new SearchResponse.Clusters(100, 99, 1);
         SearchResponse searchResponse = new SearchResponse(
             internalSearchResponse,
             null,
@@ -268,6 +332,7 @@ public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase<As
         int successfulShards = randomIntBetween(0, totalShards);
         int skippedShards = randomIntBetween(0, successfulShards);
         InternalSearchResponse internalSearchResponse = InternalSearchResponse.EMPTY_WITH_TOTAL_HITS;
+
         SearchResponse searchResponse = new SearchResponse(
             internalSearchResponse,
             null,
@@ -294,7 +359,26 @@ public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase<As
         int successfulShards = randomIntBetween(0, totalShards);
         int skippedShards = randomIntBetween(0, successfulShards);
         InternalSearchResponse internalSearchResponse = InternalSearchResponse.EMPTY_WITH_TOTAL_HITS;
-        SearchResponse.Clusters clusters = new SearchResponse.Clusters(100, 99, 1, 99, false);
+
+        int totalClusters;
+        int successfulClusters;
+        int skippedClusters;
+        SearchResponse.Clusters clusters;
+        if (randomBoolean()) {
+            // local search only
+            totalClusters = 1;
+            successfulClusters = 1;
+            skippedClusters = 0;
+            clusters = new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters);
+        } else {
+            // CCS search
+            totalClusters = 80;
+            int successful = randomInt(60);
+            int partial = randomInt(20);
+            successfulClusters = successful + partial;
+            skippedClusters = totalClusters - successfulClusters;
+            clusters = AsyncSearchResponseTests.createCCSClusterObjects(80, 80, true, successful, skippedClusters, partial);
+        }
         SearchResponse searchResponse = new SearchResponse(
             internalSearchResponse,
             null,
@@ -311,7 +395,9 @@ public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase<As
         assertNotNull(statusFromStoredSearch);
         assertEquals(0, statusFromStoredSearch.getFailedShards());
         assertEquals(statusFromStoredSearch.getCompletionStatus(), RestStatus.OK);
-        assertEquals(100, statusFromStoredSearch.getClusters().getTotal());
+        assertEquals(totalClusters, statusFromStoredSearch.getClusters().getTotal());
+        assertEquals(skippedClusters, statusFromStoredSearch.getClusters().getSkipped());
+        assertEquals(successfulClusters, statusFromStoredSearch.getClusters().getSuccessful());
     }
 
     public void testGetStatusFromStoredSearchWithNonEmptyClustersStillRunning() {
@@ -322,7 +408,11 @@ public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase<As
         int successfulShards = randomIntBetween(0, totalShards);
         int skippedShards = randomIntBetween(0, successfulShards);
         InternalSearchResponse internalSearchResponse = InternalSearchResponse.EMPTY_WITH_TOTAL_HITS;
-        SearchResponse.Clusters clusters = new SearchResponse.Clusters(100, 2, 3, 99, true);
+        int successful = randomInt(10);
+        int partial = randomInt(10);
+        int skipped = randomInt(10);
+        SearchResponse.Clusters clusters = AsyncSearchResponseTests.createCCSClusterObjects(100, 99, true, successful, skipped, partial);
+
         SearchResponse searchResponse = new SearchResponse(
             internalSearchResponse,
             null,
@@ -341,7 +431,7 @@ public class AsyncStatusResponseTests extends AbstractWireSerializingTestCase<As
         assertEquals(0, statusFromStoredSearch.getFailedShards());
         assertNull("completion_status should not be present if still running", statusFromStoredSearch.getCompletionStatus());
         assertEquals(100, statusFromStoredSearch.getClusters().getTotal());
-        assertEquals(2, statusFromStoredSearch.getClusters().getSuccessful());
-        assertEquals(3, statusFromStoredSearch.getClusters().getSkipped());
+        assertEquals(successful + partial, statusFromStoredSearch.getClusters().getSuccessful());
+        assertEquals(skipped, statusFromStoredSearch.getClusters().getSkipped());
     }
 }

+ 30 - 1
x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/ThrowingQueryBuilder.java

@@ -27,15 +27,36 @@ class ThrowingQueryBuilder extends AbstractQueryBuilder<ThrowingQueryBuilder> {
     private final long randomUID;
     private final RuntimeException failure;
     private final int shardId;
+    private final String index;
 
     /**
      * Creates a {@link ThrowingQueryBuilder} with the provided <code>randomUID</code>.
+     *
+     * @param randomUID used solely for identification
+     * @param failure what exception to throw
+     * @param shardId what shardId to throw the exception. If shardId is less than 0, it will throw for all shards.
      */
     ThrowingQueryBuilder(long randomUID, RuntimeException failure, int shardId) {
         super();
         this.randomUID = randomUID;
         this.failure = failure;
         this.shardId = shardId;
+        this.index = null;
+    }
+
+    /**
+     * Creates a {@link ThrowingQueryBuilder} with the provided <code>randomUID</code>.
+     *
+     * @param randomUID used solely for identification
+     * @param failure what exception to throw
+     * @param index what index to throw the exception against (all shards of that index)
+     */
+    ThrowingQueryBuilder(long randomUID, RuntimeException failure, String index) {
+        super();
+        this.randomUID = randomUID;
+        this.failure = failure;
+        this.shardId = Integer.MAX_VALUE;
+        this.index = index;
     }
 
     ThrowingQueryBuilder(StreamInput in) throws IOException {
@@ -43,6 +64,11 @@ class ThrowingQueryBuilder extends AbstractQueryBuilder<ThrowingQueryBuilder> {
         this.randomUID = in.readLong();
         this.failure = in.readException();
         this.shardId = in.readVInt();
+        if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_500_040)) {
+            this.index = in.readOptionalString();
+        } else {
+            this.index = null;
+        }
     }
 
     @Override
@@ -50,6 +76,9 @@ class ThrowingQueryBuilder extends AbstractQueryBuilder<ThrowingQueryBuilder> {
         out.writeLong(randomUID);
         out.writeException(failure);
         out.writeVInt(shardId);
+        if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_500_040)) {
+            out.writeOptionalString(index);
+        }
     }
 
     @Override
@@ -64,7 +93,7 @@ class ThrowingQueryBuilder extends AbstractQueryBuilder<ThrowingQueryBuilder> {
         return new Query() {
             @Override
             public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException {
-                if (context.getShardId() == shardId) {
+                if (context.getShardId() == shardId || shardId < 0 || context.index().getName().equals(index)) {
                     throw failure;
                 }
                 return delegate.createWeight(searcher, scoreMode, boost);

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncStatusResponse.java

@@ -189,7 +189,7 @@ public class AsyncStatusResponse extends ActionResponse implements SearchStatusR
         }
         RestActions.buildBroadcastShardsHeader(builder, params, totalShards, successfulShards, skippedShards, failedShards, null);
         if (clusters != null) {
-            builder = clusters.toXContent(builder, null);
+            builder = clusters.toXContent(builder, params);
         }
         if (isRunning == false) { // completion status information is only available for a completed search
             builder.field("completion_status", completionStatus.getStatus());