瀏覽代碼

CCS metadata is opt-in in ESQL JSON responses (#114437) (#114688)

Since Kibana only needs CCS metadata in ESQL responses from certain well-defined locations,
we are making CCS metadata opt-in. This feature is patterned after ESQL profiling, where
you specify "profile": true in the ESQL body and if you asked for it will be present in the response
always (it will be written to the .async-search index and you can’t turn it off in later async-search
requests against this particular query ID) and if you didn’t ask for it at the beginning it will never
be present (it will NOT be written to the .async-search index when it is persisted).

The new option is "include_ccs_metadata": true/false.

(cherry picked from commit fd9d7335c8addb34ecc7b6b2b72a882ec63721f4)
Michael Peterson 1 年之前
父節點
當前提交
915d7cc487
共有 22 個文件被更改,包括 415 次插入137 次删除
  1. 10 10
      docs/reference/esql/esql-across-clusters.asciidoc
  2. 13 0
      docs/reference/esql/esql-query-api.asciidoc
  3. 2 0
      server/src/main/java/org/elasticsearch/TransportVersions.java
  4. 81 50
      x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java
  5. 1 0
      x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java
  6. 9 0
      x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java
  7. 58 13
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java
  8. 125 32
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java
  9. 22 4
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java
  10. 9 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java
  11. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java
  12. 2 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RequestXContent.java
  13. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
  14. 17 3
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlMediaTypeParser.java
  15. 4 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
  16. 1 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
  17. 3 3
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java
  18. 3 3
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java
  19. 7 7
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java
  20. 38 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/EsqlMediaTypeParserTests.java
  21. 6 6
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java
  22. 2 2
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/PlanExecutorMetricsTests.java

+ 10 - 10
docs/reference/esql/esql-across-clusters.asciidoc

@@ -188,9 +188,10 @@ FROM *:my-index-000001
 [[ccq-cluster-details]]
 ==== Cross-cluster metadata
 
-ES|QL {ccs} responses include metadata about the search on each cluster when the response format is JSON.
+Using the `"include_ccs_metadata": true` option, users can request that
+ES|QL {ccs} responses include metadata about the search on each cluster (when the response format is JSON).
 Here we show an example using the async search endpoint. {ccs-cap} metadata is also present in the synchronous
-search endpoint.
+search endpoint response when requested.
 
 [source,console]
 ----
@@ -200,7 +201,8 @@ POST /_query/async?format=json
     FROM my-index-000001,cluster_one:my-index-000001,cluster_two:my-index*
     | STATS COUNT(http.response.status_code) BY user.id
     | LIMIT 2
-  """
+  """,
+  "include_ccs_metadata": true
 }
 ----
 // TEST[setup:my_index]
@@ -238,7 +240,7 @@ Which returns:
       "(local)": { <4>
         "status": "successful",
         "indices": "blogs",
-        "took": 36,  <5>
+        "took": 41,  <5>
         "_shards": { <6>
           "total": 13,
           "successful": 13,
@@ -260,7 +262,7 @@ Which returns:
       "cluster_two": {
         "status": "successful",
         "indices": "cluster_two:my-index*",
-        "took": 41,
+        "took": 40,
         "_shards": {
           "total": 18,
           "successful": 18,
@@ -286,7 +288,7 @@ it is identified as "(local)".
 <5> How long (in milliseconds) the search took on each cluster. This can be useful to determine
 which clusters have slower response times than others.
 <6> The shard details for the search on that cluster, including a count of shards that were
-skipped due to the can-match phase. Shards are skipped when they cannot have any matching data
+skipped due to the can-match phase results. Shards are skipped when they cannot have any matching data
 and therefore are not included in the full ES|QL query.
 
 
@@ -294,9 +296,6 @@ The cross-cluster metadata can be used to determine whether any data came back f
 For instance, in the query below, the wildcard expression for `cluster-two` did not resolve
 to a concrete index (or indices). The cluster is, therefore, marked as 'skipped' and the total
 number of shards searched is set to zero.
-Since the other cluster did have a matching index, the search did not return an error, but
-instead returned all the matching data it could find.
-
 
 [source,console]
 ----
@@ -306,7 +305,8 @@ POST /_query/async?format=json
     FROM cluster_one:my-index*,cluster_two:logs*
     | STATS COUNT(http.response.status_code) BY user.id
     | LIMIT 2
-  """
+  """,
+  "include_ccs_metadata": true
 }
 ----
 // TEST[continued]

+ 13 - 0
docs/reference/esql/esql-query-api.asciidoc

@@ -67,6 +67,11 @@ precedence.
 `false`. The API only supports this parameter for CBOR, JSON, SMILE, and YAML
 responses. See <<esql-rest-columnar>>.
 
+`include_ccs_metadata`::
+(Optional, boolean) If `true`, cross-cluster searches will include metadata about the query
+on each cluster. Defaults to `false`. The API only supports this parameter for CBOR, JSON, SMILE,
+and YAML responses. See <<ccq-cluster-details>>.
+
 `locale`::
 (Optional, string) Returns results (especially dates) formatted per the conventions of the locale.
 For syntax, refer to <<esql-locale-param>>.
@@ -85,6 +90,7 @@ https://en.wikipedia.org/wiki/Query_plan[EXPLAIN PLAN].
 `query`::
 (Required, string) {esql} query to run. For syntax, refer to <<esql-syntax>>.
 
+
 ifeval::["{release-state}"=="unreleased"]
 `table`::
 (Optional, object) Named "table" parameters that can be referenced by the <<esql-lookup>> command.
@@ -108,6 +114,13 @@ returned if `drop_null_columns` is sent with the request.
 (array of arrays)
 Values for the search results.
 
+`_clusters`::
+(object)
+Metadata about clusters involved in the execution of a cross-cluster query. Only returned (1) for
+cross-cluster searches and (2) when `include_ccs_metadata` is sent in the body and set to `true`
+and (3) when `format` of the response is set to JSON (the default), CBOR, SMILE, or YAML.
+See <<ccq-cluster-details>> for more information.
+
 `profile`::
 (object)
 Profile describing the execution of the query. Only returned if `profile` was sent in the body.

+ 2 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -240,6 +240,8 @@ public class TransportVersions {
     public static final TransportVersion SIMULATE_INDEX_TEMPLATES_SUBSTITUTIONS = def(8_764_00_0);
     public static final TransportVersion RETRIEVERS_TELEMETRY_ADDED = def(8_765_00_0);
     public static final TransportVersion ESQL_CACHED_STRING_SERIALIZATION = def(8_766_00_0);
+    public static final TransportVersion CHUNK_SENTENCE_OVERLAP_SETTING_ADDE1D = def(8_767_00_0);
+    public static final TransportVersion OPT_IN_ESQL_CCS_EXECUTION_INFO = def(8_768_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 81 - 50
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java

@@ -127,8 +127,18 @@ public class MultiClustersIT extends ESRestTestCase {
         refresh(client, index);
     }
 
-    private Map<String, Object> run(String query) throws IOException {
-        Map<String, Object> resp = runEsql(new RestEsqlTestCase.RequestObjectBuilder().query(query).build());
+    private Map<String, Object> run(String query, boolean includeCCSMetadata) throws IOException {
+        Map<String, Object> resp = runEsql(
+            new RestEsqlTestCase.RequestObjectBuilder().query(query).includeCCSMetadata(includeCCSMetadata).build()
+        );
+        logger.info("--> query {} response {}", query, resp);
+        return resp;
+    }
+
+    private Map<String, Object> runWithColumnarAndIncludeCCSMetadata(String query) throws IOException {
+        Map<String, Object> resp = runEsql(
+            new RestEsqlTestCase.RequestObjectBuilder().query(query).includeCCSMetadata(true).columnar(true).build()
+        );
         logger.info("--> query {} response {}", query, resp);
         return resp;
     }
@@ -147,62 +157,77 @@ public class MultiClustersIT extends ESRestTestCase {
 
     public void testCount() throws Exception {
         {
-            Map<String, Object> result = run("FROM test-local-index,*:test-remote-index | STATS c = COUNT(*)");
+            boolean includeCCSMetadata = randomBoolean();
+            Map<String, Object> result = run("FROM test-local-index,*:test-remote-index | STATS c = COUNT(*)", includeCCSMetadata);
             var columns = List.of(Map.of("name", "c", "type", "long"));
             var values = List.of(List.of(localDocs.size() + remoteDocs.size()));
 
             MapMatcher mapMatcher = matchesMap();
-            assertMap(
-                result,
-                mapMatcher.entry("columns", columns)
-                    .entry("values", values)
-                    .entry("took", greaterThanOrEqualTo(0))
-                    .entry("_clusters", any(Map.class))
-            );
-            assertClusterDetailsMap(result, false);
+            if (includeCCSMetadata) {
+                mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
+            }
+            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
+            if (includeCCSMetadata) {
+                assertClusterDetailsMap(result, false);
+            }
         }
         {
-            Map<String, Object> result = run("FROM *:test-remote-index | STATS c = COUNT(*)");
+            boolean includeCCSMetadata = randomBoolean();
+            Map<String, Object> result = run("FROM *:test-remote-index | STATS c = COUNT(*)", includeCCSMetadata);
             var columns = List.of(Map.of("name", "c", "type", "long"));
             var values = List.of(List.of(remoteDocs.size()));
 
             MapMatcher mapMatcher = matchesMap();
-            assertMap(
-                result,
-                mapMatcher.entry("columns", columns)
-                    .entry("values", values)
-                    .entry("took", greaterThanOrEqualTo(0))
-                    .entry("_clusters", any(Map.class))
-            );
-            assertClusterDetailsMap(result, true);
+            if (includeCCSMetadata) {
+                mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
+            }
+            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
+            if (includeCCSMetadata) {
+                assertClusterDetailsMap(result, true);
+            }
         }
     }
 
     public void testUngroupedAggs() throws Exception {
         {
-            Map<String, Object> result = run("FROM test-local-index,*:test-remote-index | STATS total = SUM(data)");
+            boolean includeCCSMetadata = randomBoolean();
+            Map<String, Object> result = run("FROM test-local-index,*:test-remote-index | STATS total = SUM(data)", includeCCSMetadata);
             var columns = List.of(Map.of("name", "total", "type", "long"));
             long sum = Stream.concat(localDocs.stream(), remoteDocs.stream()).mapToLong(d -> d.data).sum();
             var values = List.of(List.of(Math.toIntExact(sum)));
 
             // check all sections of map except _cluster/details
             MapMatcher mapMatcher = matchesMap();
-            assertMap(
-                result,
-                mapMatcher.entry("columns", columns)
-                    .entry("values", values)
-                    .entry("took", greaterThanOrEqualTo(0))
-                    .entry("_clusters", any(Map.class))
-            );
-            assertClusterDetailsMap(result, false);
+            if (includeCCSMetadata) {
+                mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
+            }
+            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
+            if (includeCCSMetadata) {
+                assertClusterDetailsMap(result, false);
+            }
         }
         {
-            Map<String, Object> result = run("FROM *:test-remote-index | STATS total = SUM(data)");
+            boolean includeCCSMetadata = randomBoolean();
+            Map<String, Object> result = run("FROM *:test-remote-index | STATS total = SUM(data)", includeCCSMetadata);
+            var columns = List.of(Map.of("name", "total", "type", "long"));
+            long sum = remoteDocs.stream().mapToLong(d -> d.data).sum();
+            var values = List.of(List.of(Math.toIntExact(sum)));
+
+            MapMatcher mapMatcher = matchesMap();
+            if (includeCCSMetadata) {
+                mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
+            }
+            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
+            if (includeCCSMetadata) {
+                assertClusterDetailsMap(result, true);
+            }
+        }
+        {
+            Map<String, Object> result = runWithColumnarAndIncludeCCSMetadata("FROM *:test-remote-index | STATS total = SUM(data)");
             var columns = List.of(Map.of("name", "total", "type", "long"));
             long sum = remoteDocs.stream().mapToLong(d -> d.data).sum();
             var values = List.of(List.of(Math.toIntExact(sum)));
 
-            // check all sections of map except _cluster/details
             MapMatcher mapMatcher = matchesMap();
             assertMap(
                 result,
@@ -269,7 +294,11 @@ public class MultiClustersIT extends ESRestTestCase {
 
     public void testGroupedAggs() throws Exception {
         {
-            Map<String, Object> result = run("FROM test-local-index,*:test-remote-index | STATS total = SUM(data) BY color | SORT color");
+            boolean includeCCSMetadata = randomBoolean();
+            Map<String, Object> result = run(
+                "FROM test-local-index,*:test-remote-index | STATS total = SUM(data) BY color | SORT color",
+                includeCCSMetadata
+            );
             var columns = List.of(Map.of("name", "total", "type", "long"), Map.of("name", "color", "type", "keyword"));
             var values = Stream.concat(localDocs.stream(), remoteDocs.stream())
                 .collect(Collectors.toMap(d -> d.color, Doc::data, Long::sum))
@@ -280,17 +309,20 @@ public class MultiClustersIT extends ESRestTestCase {
                 .toList();
 
             MapMatcher mapMatcher = matchesMap();
-            assertMap(
-                result,
-                mapMatcher.entry("columns", columns)
-                    .entry("values", values)
-                    .entry("took", greaterThanOrEqualTo(0))
-                    .entry("_clusters", any(Map.class))
-            );
-            assertClusterDetailsMap(result, false);
+            if (includeCCSMetadata) {
+                mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
+            }
+            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
+            if (includeCCSMetadata) {
+                assertClusterDetailsMap(result, false);
+            }
         }
         {
-            Map<String, Object> result = run("FROM *:test-remote-index | STATS total = SUM(data) by color | SORT color");
+            boolean includeCCSMetadata = randomBoolean();
+            Map<String, Object> result = run(
+                "FROM *:test-remote-index | STATS total = SUM(data) by color | SORT color",
+                includeCCSMetadata
+            );
             var columns = List.of(Map.of("name", "total", "type", "long"), Map.of("name", "color", "type", "keyword"));
             var values = remoteDocs.stream()
                 .collect(Collectors.toMap(d -> d.color, Doc::data, Long::sum))
@@ -300,16 +332,15 @@ public class MultiClustersIT extends ESRestTestCase {
                 .map(e -> List.of(Math.toIntExact(e.getValue()), e.getKey()))
                 .toList();
 
-            // check all sections of map except _cluster/details
+            // check all sections of map except _clusters/details
             MapMatcher mapMatcher = matchesMap();
-            assertMap(
-                result,
-                mapMatcher.entry("columns", columns)
-                    .entry("values", values)
-                    .entry("took", greaterThanOrEqualTo(0))
-                    .entry("_clusters", any(Map.class))
-            );
-            assertClusterDetailsMap(result, true);
+            if (includeCCSMetadata) {
+                mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
+            }
+            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
+            if (includeCCSMetadata) {
+                assertClusterDetailsMap(result, true);
+            }
         }
     }
 

+ 1 - 0
x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

@@ -76,6 +76,7 @@ public class RestEsqlIT extends RestEsqlTestCase {
         indexTimestampData(1);
 
         RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | stats avg(value)");
+        requestObjectBuilder().includeCCSMetadata(randomBoolean());
         if (Build.current().isSnapshot()) {
             builder.pragmas(Settings.builder().put("data_partitioning", "shard").build());
         }

+ 9 - 0
x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

@@ -128,6 +128,7 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         private Boolean keepOnCompletion = null;
 
         private Boolean profile = null;
+        private Boolean includeCCSMetadata = null;
 
         private CheckedConsumer<XContentBuilder, IOException> filter;
 
@@ -197,6 +198,11 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
             return this;
         }
 
+        public RequestObjectBuilder includeCCSMetadata(boolean includeCCSMetadata) {
+            this.includeCCSMetadata = includeCCSMetadata;
+            return this;
+        }
+
         public RequestObjectBuilder filter(CheckedConsumer<XContentBuilder, IOException> filter) {
             this.filter = filter;
             return this;
@@ -220,6 +226,9 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
                 if (profile != null) {
                     builder.field("profile", profile);
                 }
+                if (includeCCSMetadata != null) {
+                    builder.field("include_ccs_metadata", includeCCSMetadata);
+                }
                 if (filter != null) {
                     builder.startObject("filter");
                     filter.accept(builder);

+ 58 - 13
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java

@@ -14,6 +14,7 @@ import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.CollectionUtils;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.ingest.common.IngestCommonPlugin;
 import org.elasticsearch.injection.guice.Inject;
 import org.elasticsearch.license.LicenseService;
@@ -220,7 +221,7 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
     public void testWithHostsPolicy() {
         for (var mode : Enrich.Mode.values()) {
             String query = "FROM events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os";
-            try (EsqlQueryResponse resp = runQuery(query)) {
+            try (EsqlQueryResponse resp = runQuery(query, null)) {
                 List<List<Object>> rows = getValuesList(resp);
                 assertThat(
                     rows,
@@ -237,9 +238,14 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
                 assertFalse(resp.getExecutionInfo().isCrossClusterSearch());
             }
         }
+
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        boolean responseExpectMeta = includeCCSMetadata.v2();
+
         for (var mode : Enrich.Mode.values()) {
             String query = "FROM *:events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os";
-            try (EsqlQueryResponse resp = runQuery(query)) {
+            try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
                 List<List<Object>> rows = getValuesList(resp);
                 assertThat(
                     rows,
@@ -255,6 +261,7 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
                     )
                 );
                 EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+                assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
                 assertThat(executionInfo.clusterAliases(), equalTo(Set.of("c1", "c2")));
                 assertCCSExecutionInfoDetails(executionInfo);
             }
@@ -262,7 +269,7 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
 
         for (var mode : Enrich.Mode.values()) {
             String query = "FROM *:events,events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os";
-            try (EsqlQueryResponse resp = runQuery(query)) {
+            try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
                 List<List<Object>> rows = getValuesList(resp);
                 assertThat(
                     rows,
@@ -278,6 +285,7 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
                     )
                 );
                 EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+                assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
                 assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
                 assertCCSExecutionInfoDetails(executionInfo);
             }
@@ -285,6 +293,10 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
     }
 
     public void testEnrichHostsAggThenEnrichVendorCoordinator() {
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        boolean responseExpectMeta = includeCCSMetadata.v2();
+
         for (var hostMode : Enrich.Mode.values()) {
             String query = String.format(Locale.ROOT, """
                 FROM *:events,events
@@ -295,7 +307,7 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
                 | stats c = SUM(c) by vendor
                 | sort vendor
                 """, enrichHosts(hostMode), enrichVendors(Enrich.Mode.COORDINATOR));
-            try (EsqlQueryResponse resp = runQuery(query)) {
+            try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
                 assertThat(
                     getValuesList(resp),
                     equalTo(
@@ -309,6 +321,7 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
                     )
                 );
                 EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+                assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
                 assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
                 assertCCSExecutionInfoDetails(executionInfo);
             }
@@ -316,6 +329,10 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
     }
 
     public void testEnrichTwiceThenAggs() {
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        boolean responseExpectMeta = includeCCSMetadata.v2();
+
         for (var hostMode : Enrich.Mode.values()) {
             String query = String.format(Locale.ROOT, """
                 FROM *:events,events
@@ -325,7 +342,7 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
                 | stats c = COUNT(*) by vendor
                 | sort vendor
                 """, enrichHosts(hostMode), enrichVendors(Enrich.Mode.COORDINATOR));
-            try (EsqlQueryResponse resp = runQuery(query)) {
+            try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
                 assertThat(
                     getValuesList(resp),
                     equalTo(
@@ -339,6 +356,7 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
                     )
                 );
                 EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+                assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
                 assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
                 assertCCSExecutionInfoDetails(executionInfo);
             }
@@ -346,6 +364,10 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
     }
 
     public void testEnrichCoordinatorThenAny() {
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        boolean responseExpectMeta = includeCCSMetadata.v2();
+
         String query = String.format(Locale.ROOT, """
             FROM *:events,events
             | eval ip= TO_STR(host)
@@ -354,7 +376,7 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
             | stats c = COUNT(*) by vendor
             | sort vendor
             """, enrichHosts(Enrich.Mode.COORDINATOR), enrichVendors(Enrich.Mode.ANY));
-        try (EsqlQueryResponse resp = runQuery(query)) {
+        try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
             assertThat(
                 getValuesList(resp),
                 equalTo(
@@ -368,12 +390,17 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
                 )
             );
             EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+            assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
             assertCCSExecutionInfoDetails(executionInfo);
         }
     }
 
     public void testEnrichCoordinatorWithVendor() {
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        boolean responseExpectMeta = includeCCSMetadata.v2();
+
         for (Enrich.Mode hostMode : Enrich.Mode.values()) {
             String query = String.format(Locale.ROOT, """
                 FROM *:events,events
@@ -383,7 +410,7 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
                 | stats c = COUNT(*) by vendor
                 | sort vendor
                 """, enrichHosts(hostMode), enrichVendors(Enrich.Mode.COORDINATOR));
-            try (EsqlQueryResponse resp = runQuery(query)) {
+            try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
                 assertThat(
                     getValuesList(resp),
                     equalTo(
@@ -397,6 +424,7 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
                     )
                 );
                 EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+                assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
                 assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
                 assertCCSExecutionInfoDetails(executionInfo);
             }
@@ -405,6 +433,10 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
     }
 
     public void testEnrichRemoteWithVendor() {
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        boolean responseExpectMeta = includeCCSMetadata.v2();
+
         for (Enrich.Mode hostMode : List.of(Enrich.Mode.ANY, Enrich.Mode.REMOTE)) {
             var query = String.format(Locale.ROOT, """
                 FROM *:events,events
@@ -414,7 +446,7 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
                 | stats c = COUNT(*) by vendor
                 | sort vendor
                 """, enrichHosts(hostMode), enrichVendors(Enrich.Mode.REMOTE));
-            try (EsqlQueryResponse resp = runQuery(query)) {
+            try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
                 assertThat(
                     getValuesList(resp),
                     equalTo(
@@ -430,6 +462,7 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
                     )
                 );
                 EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+                assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
                 assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
                 assertCCSExecutionInfoDetails(executionInfo);
             }
@@ -444,7 +477,7 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
             | LIMIT 5
             | %s
             """, enrichHosts(Enrich.Mode.REMOTE));
-        var error = expectThrows(VerificationException.class, () -> runQuery(query).close());
+        var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
         assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after LIMIT"));
     }
 
@@ -455,7 +488,7 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
             | eval ip= TO_STR(host)
             | %s
             """, enrichHosts(Enrich.Mode.REMOTE));
-        var error = expectThrows(VerificationException.class, () -> runQuery(query).close());
+        var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
         assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after LIMIT"));
     }
 
@@ -468,7 +501,7 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
             | %s
             | sort vendor
             """, enrichHosts(Enrich.Mode.ANY), enrichVendors(Enrich.Mode.REMOTE));
-        var error = expectThrows(VerificationException.class, () -> runQuery(query).close());
+        var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
         assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after STATS"));
     }
 
@@ -480,20 +513,23 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
             | %s
             | sort vendor
             """, enrichHosts(Enrich.Mode.COORDINATOR), enrichVendors(Enrich.Mode.REMOTE));
-        var error = expectThrows(VerificationException.class, () -> runQuery(query).close());
+        var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
         assertThat(
             error.getMessage(),
             containsString("ENRICH with remote policy can't be executed after another ENRICH with coordinator policy")
         );
     }
 
-    protected EsqlQueryResponse runQuery(String query) {
+    protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) {
         EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
         request.query(query);
         request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
         if (randomBoolean()) {
             request.profile(true);
         }
+        if (ccsMetadataInResponse != null) {
+            request.includeCCSMetadata(ccsMetadataInResponse);
+        }
         return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS);
     }
 
@@ -516,6 +552,15 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
         }
     }
 
+    public static Tuple<Boolean, Boolean> randomIncludeCCSMetadata() {
+        return switch (randomIntBetween(1, 3)) {
+            case 1 -> new Tuple<>(Boolean.TRUE, Boolean.TRUE);
+            case 2 -> new Tuple<>(Boolean.FALSE, Boolean.FALSE);
+            case 3 -> new Tuple<>(null, Boolean.FALSE);
+            default -> throw new AssertionError("should not get here");
+        };
+    }
+
     public static class LocalStateEnrich extends LocalStateCompositeXPackPlugin {
 
         public LocalStateEnrich(final Settings settings, final Path configPath) throws Exception {

+ 125 - 32
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java

@@ -10,7 +10,6 @@ package org.elasticsearch.xpack.esql.action;
 import org.elasticsearch.Build;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.Priority;
@@ -21,13 +20,16 @@ import org.elasticsearch.compute.lucene.DataPartitioning;
 import org.elasticsearch.compute.operator.DriverProfile;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.AbstractMultiClustersTestCase;
 import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.test.XContentTestUtils;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -79,12 +81,15 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
         }
     }
 
-    public void testSimple() {
+    public void testSuccessfulPathways() {
         Map<String, Object> testClusterInfo = setupTwoClusters();
         int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
         int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
 
-        try (EsqlQueryResponse resp = runQuery("from logs-*,*:logs-* | stats sum (v)")) {
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        boolean responseExpectMeta = includeCCSMetadata.v2();
+        try (EsqlQueryResponse resp = runQuery("from logs-*,*:logs-* | stats sum (v)", requestIncludeMeta)) {
             List<List<Object>> values = getValuesList(resp);
             assertThat(values, hasSize(1));
             assertThat(values.get(0), equalTo(List.of(330L)));
@@ -93,6 +98,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
             assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
 
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
 
@@ -113,9 +119,12 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertThat(localCluster.getSuccessfulShards(), equalTo(localNumShards));
             assertThat(localCluster.getSkippedShards(), equalTo(0));
             assertThat(localCluster.getFailedShards(), equalTo(0));
+
+            // ensure that the _clusters metadata is present only if requested
+            assertClusterMetadataInResponse(resp, responseExpectMeta);
         }
 
-        try (EsqlQueryResponse resp = runQuery("from logs-*,*:logs-* | stats count(*) by tag | sort tag | keep tag")) {
+        try (EsqlQueryResponse resp = runQuery("from logs-*,*:logs-* | stats count(*) by tag | sort tag | keep tag", requestIncludeMeta)) {
             List<List<Object>> values = getValuesList(resp);
             assertThat(values, hasSize(2));
             assertThat(values.get(0), equalTo(List.of("local")));
@@ -125,6 +134,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
             assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
 
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
 
@@ -145,6 +155,9 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertThat(localCluster.getSuccessfulShards(), equalTo(localNumShards));
             assertThat(localCluster.getSkippedShards(), equalTo(0));
             assertThat(localCluster.getFailedShards(), equalTo(0));
+
+            // ensure that the _clusters metadata is present only if requested
+            assertClusterMetadataInResponse(resp, responseExpectMeta);
         }
     }
 
@@ -153,9 +166,13 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
         int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
         int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
 
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        boolean responseExpectMeta = includeCCSMetadata.v2();
+
         // since a valid local index was specified, the invalid index on cluster-a does not throw an exception,
         // but instead is simply ignored - ensure this is captured in the EsqlExecutionInfo
-        try (EsqlQueryResponse resp = runQuery("from logs-*,cluster-a:no_such_index | stats sum (v)")) {
+        try (EsqlQueryResponse resp = runQuery("from logs-*,cluster-a:no_such_index | stats sum (v)", requestIncludeMeta)) {
             EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
             List<List<Object>> values = getValuesList(resp);
             assertThat(values, hasSize(1));
@@ -164,6 +181,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
             assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
 
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
 
@@ -188,7 +206,12 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
 
         // since the remote cluster has a valid index expression, the missing local index is ignored
         // make this is captured in the EsqlExecutionInfo
-        try (EsqlQueryResponse resp = runQuery("from no_such_index,*:logs-* | stats count(*) by tag | sort tag | keep tag")) {
+        try (
+            EsqlQueryResponse resp = runQuery(
+                "from no_such_index,*:logs-* | stats count(*) by tag | sort tag | keep tag",
+                requestIncludeMeta
+            )
+        ) {
             List<List<Object>> values = getValuesList(resp);
             assertThat(values, hasSize(1));
             assertThat(values.get(0), equalTo(List.of("remote")));
@@ -197,6 +220,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
             assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
 
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
 
@@ -223,7 +247,8 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
         // in the index expression of the EsqlExecutionInfo and with an indication that zero shards were searched
         try (
             EsqlQueryResponse resp = runQuery(
-                "FROM no_such_index*,*:no_such_index1,*:no_such_index2,logs-1 | STATS COUNT(*) by tag | SORT tag | KEEP tag"
+                "FROM no_such_index*,*:no_such_index1,*:no_such_index2,logs-1 | STATS COUNT(*) by tag | SORT tag | KEEP tag",
+                requestIncludeMeta
             )
         ) {
             List<List<Object>> values = getValuesList(resp);
@@ -234,6 +259,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
             assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
 
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
 
@@ -257,7 +283,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
         }
 
         // wildcard on remote cluster that matches nothing - should be present in EsqlExecutionInfo marked as SKIPPED, no shards searched
-        try (EsqlQueryResponse resp = runQuery("from cluster-a:no_such_index*,logs-* | stats sum (v)")) {
+        try (EsqlQueryResponse resp = runQuery("from cluster-a:no_such_index*,logs-* | stats sum (v)", requestIncludeMeta)) {
             EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
             List<List<Object>> values = getValuesList(resp);
             assertThat(values, hasSize(1));
@@ -266,6 +292,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
             assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
 
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
 
@@ -293,8 +320,12 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
         Map<String, Object> testClusterInfo = setupTwoClusters();
         int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
 
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        boolean responseExpectMeta = includeCCSMetadata.v2();
+
         // a query which matches no remote cluster is not a cross cluster search
-        try (EsqlQueryResponse resp = runQuery("from logs-*,x*:no_such_index* | stats sum (v)")) {
+        try (EsqlQueryResponse resp = runQuery("from logs-*,x*:no_such_index* | stats sum (v)", requestIncludeMeta)) {
             EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
             List<List<Object>> values = getValuesList(resp);
             assertThat(values, hasSize(1));
@@ -303,12 +334,18 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertNotNull(executionInfo);
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER)));
             assertThat(executionInfo.isCrossClusterSearch(), is(false));
+            assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
             // since this not a CCS, only the overall took time in the EsqlExecutionInfo matters
             assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
         }
 
         // cluster-foo* matches nothing and so should not be present in the EsqlExecutionInfo
-        try (EsqlQueryResponse resp = runQuery("from logs-*,no_such_index*,cluster-a:no_such_index*,cluster-foo*:* | stats sum (v)")) {
+        try (
+            EsqlQueryResponse resp = runQuery(
+                "from logs-*,no_such_index*,cluster-a:no_such_index*,cluster-foo*:* | stats sum (v)",
+                requestIncludeMeta
+            )
+        ) {
             EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
             List<List<Object>> values = getValuesList(resp);
             assertThat(values, hasSize(1));
@@ -317,6 +354,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
             assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
 
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
 
@@ -349,8 +387,12 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
     public void testCCSExecutionOnSearchesWithLimit0() {
         setupTwoClusters();
 
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        boolean responseExpectMeta = includeCCSMetadata.v2();
+
         // Ensure non-cross cluster queries have overall took time
-        try (EsqlQueryResponse resp = runQuery("FROM logs* | LIMIT 0")) {
+        try (EsqlQueryResponse resp = runQuery("FROM logs* | LIMIT 0", requestIncludeMeta)) {
             EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(false));
@@ -358,12 +400,13 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
         }
 
         // ensure cross-cluster searches have overall took time and correct per-cluster details in EsqlExecutionInfo
-        try (EsqlQueryResponse resp = runQuery("FROM logs*,cluster-a:* | LIMIT 0")) {
+        try (EsqlQueryResponse resp = runQuery("FROM logs*,cluster-a:* | LIMIT 0", requestIncludeMeta)) {
             EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
             long overallTookMillis = executionInfo.overallTook().millis();
             assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
+            assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
 
             EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER);
@@ -387,12 +430,13 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertNull(localCluster.getFailedShards());
         }
 
-        try (EsqlQueryResponse resp = runQuery("FROM logs*,cluster-a:nomatch* | LIMIT 0")) {
+        try (EsqlQueryResponse resp = runQuery("FROM logs*,cluster-a:nomatch* | LIMIT 0", requestIncludeMeta)) {
             EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
             long overallTookMillis = executionInfo.overallTook().millis();
             assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
+            assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
 
             EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER);
@@ -415,12 +459,13 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertNull(localCluster.getFailedShards());
         }
 
-        try (EsqlQueryResponse resp = runQuery("FROM nomatch*,cluster-a:* | LIMIT 0")) {
+        try (EsqlQueryResponse resp = runQuery("FROM nomatch*,cluster-a:* | LIMIT 0", requestIncludeMeta)) {
             EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
             long overallTookMillis = executionInfo.overallTook().millis();
             assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
+            assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
 
             EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER);
@@ -447,7 +492,16 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
         int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
         int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
 
-        try (EsqlQueryResponse resp = runQuery("FROM logs*,*:logs* METADATA _index | stats sum(v) by _index | sort _index")) {
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        boolean responseExpectMeta = includeCCSMetadata.v2();
+
+        try (
+            EsqlQueryResponse resp = runQuery(
+                "FROM logs*,*:logs* METADATA _index | stats sum(v) by _index | sort _index",
+                requestIncludeMeta
+            )
+        ) {
             List<List<Object>> values = getValuesList(resp);
             assertThat(values.get(0), equalTo(List.of(285L, "cluster-a:logs-2")));
             assertThat(values.get(1), equalTo(List.of(45L, "logs-1")));
@@ -455,6 +509,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
+            assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
             assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
 
             EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER);
@@ -477,18 +532,6 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
         }
     }
 
-    void waitForNoInitializingShards(Client client, TimeValue timeout, String... indices) {
-        ClusterHealthResponse resp = client.admin()
-            .cluster()
-            .prepareHealth(TEST_REQUEST_TIMEOUT, indices)
-            .setWaitForEvents(Priority.LANGUID)
-            .setWaitForNoRelocatingShards(true)
-            .setWaitForNoInitializingShards(true)
-            .setTimeout(timeout)
-            .get();
-        assertFalse(Strings.toString(resp, true, true), resp.isTimedOut());
-    }
-
     public void testProfile() {
         Map<String, Object> testClusterInfo = setupTwoClusters();
         int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
@@ -529,6 +572,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
                 EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER);
                 assertNull(remoteCluster);
                 assertThat(executionInfo.isCrossClusterSearch(), is(false));
+                assertThat(executionInfo.includeCCSMetadata(), is(false));
                 // since this not a CCS, only the overall took time in the EsqlExecutionInfo matters
                 assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
             }
@@ -550,6 +594,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
                 EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
                 assertNotNull(executionInfo);
                 assertThat(executionInfo.isCrossClusterSearch(), is(true));
+                assertThat(executionInfo.includeCCSMetadata(), is(false));
                 assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
 
                 EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER);
@@ -582,6 +627,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
                 EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
                 assertNotNull(executionInfo);
                 assertThat(executionInfo.isCrossClusterSearch(), is(true));
+                assertThat(executionInfo.includeCCSMetadata(), is(false));
                 assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
 
                 EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER);
@@ -608,14 +654,11 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
 
     public void testWarnings() throws Exception {
         Map<String, Object> testClusterInfo = setupTwoClusters();
-        String localIndex = (String) testClusterInfo.get("local.index");
-        String remoteIndex = (String) testClusterInfo.get("remote.index");
         int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
         int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
 
         EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
         request.query("FROM logs*,*:logs* | EVAL ip = to_ip(id) | STATS total = sum(v) by ip | LIMIT 10");
-        PlainActionFuture<EsqlQueryResponse> future = new PlainActionFuture<>();
         InternalTestCluster cluster = cluster(LOCAL_CLUSTER);
         String node = randomFrom(cluster.getNodeNames());
         CountDownLatch latch = new CountDownLatch(1);
@@ -634,6 +677,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
+            assertThat(executionInfo.includeCCSMetadata(), is(false));
             assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
 
             EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER);
@@ -662,11 +706,34 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
         assertTrue(latch.await(30, TimeUnit.SECONDS));
     }
 
-    protected EsqlQueryResponse runQuery(String query) {
+    private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) {
+        try {
+            final Map<String, Object> esqlResponseAsMap = XContentTestUtils.convertToMap(resp);
+            final Object clusters = esqlResponseAsMap.get("_clusters");
+            if (responseExpectMeta) {
+                assertNotNull(clusters);
+                // test a few entries to ensure it looks correct (other tests do a full analysis of the metadata in the response)
+                @SuppressWarnings("unchecked")
+                Map<String, Object> inner = (Map<String, Object>) clusters;
+                assertTrue(inner.containsKey("total"));
+                assertTrue(inner.containsKey("details"));
+            } else {
+                assertNull(clusters);
+            }
+        } catch (IOException e) {
+            fail("Could not convert ESQL response to Map: " + e);
+        }
+    }
+
+    protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) {
         EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
         request.query(query);
         request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
-        request.profile(true);
+        request.profile(randomInt(5) == 2);
+        request.columnar(randomBoolean());
+        if (ccsMetadataInResponse != null) {
+            request.includeCCSMetadata(ccsMetadataInResponse);
+        }
         return runQuery(request);
     }
 
@@ -674,6 +741,32 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
         return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS);
     }
 
+    /**
+     * v1: value to send to runQuery (can be null; null means use default value)
+     * v2: whether to expect CCS Metadata in the response (cannot be null)
+     * @return
+     */
+    public static Tuple<Boolean, Boolean> randomIncludeCCSMetadata() {
+        return switch (randomIntBetween(1, 3)) {
+            case 1 -> new Tuple<>(Boolean.TRUE, Boolean.TRUE);
+            case 2 -> new Tuple<>(Boolean.FALSE, Boolean.FALSE);
+            case 3 -> new Tuple<>(null, Boolean.FALSE);
+            default -> throw new AssertionError("should not get here");
+        };
+    }
+
+    void waitForNoInitializingShards(Client client, TimeValue timeout, String... indices) {
+        ClusterHealthResponse resp = client.admin()
+            .cluster()
+            .prepareHealth(TEST_REQUEST_TIMEOUT, indices)
+            .setWaitForEvents(Priority.LANGUID)
+            .setWaitForNoRelocatingShards(true)
+            .setWaitForNoInitializingShards(true)
+            .setTimeout(timeout)
+            .get();
+        assertFalse(Strings.toString(resp, true, true), resp.isTimedOut());
+    }
+
     Map<String, Object> setupTwoClusters() {
         String localIndex = "logs-1";
         int numShardsLocal = randomIntBetween(1, 5);

+ 22 - 4
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.esql.action;
 
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -63,24 +64,29 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
     private final transient Predicate<String> skipUnavailablePredicate;
     private TimeValue overallTook;
 
-    public EsqlExecutionInfo() {
-        this(Predicates.always());  // default all clusters to skip_unavailable=true
+    // whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present)
+    private final boolean includeCCSMetadata;
+
+    public EsqlExecutionInfo(boolean includeCCSMetadata) {
+        this(Predicates.always(), includeCCSMetadata);  // default all clusters to skip_unavailable=true
     }
 
     /**
      * @param skipUnavailablePredicate provide lookup for whether a given cluster has skip_unavailable set to true or false
      */
-    public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate) {
+    public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate, boolean includeCCSMetadata) {
         this.clusterInfo = ConcurrentCollections.newConcurrentMap();
         this.skipUnavailablePredicate = skipUnavailablePredicate;
+        this.includeCCSMetadata = includeCCSMetadata;
     }
 
     /**
      * For testing use with fromXContent parsing only
      * @param clusterInfo
      */
-    EsqlExecutionInfo(ConcurrentMap<String, Cluster> clusterInfo) {
+    EsqlExecutionInfo(ConcurrentMap<String, Cluster> clusterInfo, boolean includeCCSMetadata) {
         this.clusterInfo = clusterInfo;
+        this.includeCCSMetadata = includeCCSMetadata;
         this.skipUnavailablePredicate = Predicates.always();
     }
 
@@ -94,6 +100,11 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
             clusterList.forEach(c -> m.put(c.getClusterAlias(), c));
             this.clusterInfo = m;
         }
+        if (in.getTransportVersion().onOrAfter(TransportVersions.OPT_IN_ESQL_CCS_EXECUTION_INFO)) {
+            this.includeCCSMetadata = in.readBoolean();
+        } else {
+            this.includeCCSMetadata = false;
+        }
         this.skipUnavailablePredicate = Predicates.always();
     }
 
@@ -105,6 +116,13 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
         } else {
             out.writeCollection(Collections.emptyList());
         }
+        if (out.getTransportVersion().onOrAfter(TransportVersions.OPT_IN_ESQL_CCS_EXECUTION_INFO)) {
+            out.writeBoolean(includeCCSMetadata);
+        }
+    }
+
+    public boolean includeCCSMetadata() {
+        return includeCCSMetadata;
     }
 
     public void overallTook(TimeValue took) {

+ 9 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java

@@ -42,6 +42,7 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E
     private String query;
     private boolean columnar;
     private boolean profile;
+    private boolean includeCCSMetadata;
     private Locale locale;
     private QueryBuilder filter;
     private QueryPragmas pragmas = new QueryPragmas(Settings.EMPTY);
@@ -128,6 +129,14 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E
         this.profile = profile;
     }
 
+    public void includeCCSMetadata(boolean include) {
+        this.includeCCSMetadata = include;
+    }
+
+    public boolean includeCCSMetadata() {
+        return includeCCSMetadata;
+    }
+
     /**
      * Is profiling enabled?
      */

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

@@ -206,7 +206,7 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
                 b.append(ResponseXContentUtils.allColumns(columns, "columns"));
             }
             b.array("values", ResponseXContentUtils.columnValues(this.columns, this.pages, columnar, nullColumns));
-            if (executionInfo != null && executionInfo.isCrossClusterSearch()) {
+            if (executionInfo != null && executionInfo.isCrossClusterSearch() && executionInfo.includeCCSMetadata()) {
                 b.field("_clusters", executionInfo);
             }
             if (profile != null) {

+ 2 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RequestXContent.java

@@ -80,6 +80,7 @@ final class RequestXContent {
     private static final ParseField LOCALE_FIELD = new ParseField("locale");
     private static final ParseField PROFILE_FIELD = new ParseField("profile");
     private static final ParseField ACCEPT_PRAGMA_RISKS = new ParseField("accept_pragma_risks");
+    private static final ParseField INCLUDE_CCS_METADATA_FIELD = new ParseField("include_ccs_metadata");
     static final ParseField TABLES_FIELD = new ParseField("tables");
 
     static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField("wait_for_completion_timeout");
@@ -117,6 +118,7 @@ final class RequestXContent {
         parser.declareBoolean(EsqlQueryRequest::columnar, COLUMNAR_FIELD);
         parser.declareObject(EsqlQueryRequest::filter, (p, c) -> AbstractQueryBuilder.parseTopLevelQuery(p), FILTER_FIELD);
         parser.declareBoolean(EsqlQueryRequest::acceptedPragmaRisks, ACCEPT_PRAGMA_RISKS);
+        parser.declareBoolean(EsqlQueryRequest::includeCCSMetadata, INCLUDE_CCS_METADATA_FIELD);
         parser.declareObject(
             EsqlQueryRequest::pragmas,
             (p, c) -> new QueryPragmas(Settings.builder().loadFromMap(p.map()).build()),

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -834,7 +834,7 @@ public class ComputeService {
              * execution metadata for ES|QL processing local to this cluster. The execution info will be copied into the
              * ComputeResponse that is sent back to the primary coordinating cluster.
              */
-            EsqlExecutionInfo execInfo = new EsqlExecutionInfo();
+            EsqlExecutionInfo execInfo = new EsqlExecutionInfo(true);
             execInfo.swapCluster(clusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(clusterAlias, Arrays.toString(request.indices())));
             CancellableTask cancellable = (CancellableTask) task;
             long start = request.configuration().getQueryStartTimeNanos();

+ 17 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlMediaTypeParser.java

@@ -37,10 +37,15 @@ public class EsqlMediaTypeParser {
      * format. If there is a {@code format} parameter we use that. If there
      * isn't but there is a {@code Accept} header then we use that. If there
      * isn't then we use the {@code Content-Type} header which is required.
+     *
+     * Also validates certain parameter combinations and throws IllegalArgumentException if invalid
+     * combinations are detected.
      */
     public static MediaType getResponseMediaType(RestRequest request, EsqlQueryRequest esqlRequest) {
         var mediaType = request.hasParam(URL_PARAM_FORMAT) ? mediaTypeFromParams(request) : mediaTypeFromHeaders(request);
-        return validateColumnarRequest(esqlRequest.columnar(), mediaType, request);
+        validateColumnarRequest(esqlRequest.columnar(), mediaType);
+        validateIncludeCCSMetadata(esqlRequest.includeCCSMetadata(), mediaType);
+        return checkNonNullMediaType(mediaType, request);
     }
 
     private static MediaType mediaTypeFromHeaders(RestRequest request) {
@@ -53,7 +58,7 @@ public class EsqlMediaTypeParser {
         return MEDIA_TYPE_REGISTRY.queryParamToMediaType(request.param(URL_PARAM_FORMAT));
     }
 
-    private static MediaType validateColumnarRequest(boolean requestIsColumnar, MediaType fromMediaType, RestRequest request) {
+    private static void validateColumnarRequest(boolean requestIsColumnar, MediaType fromMediaType) {
         if (requestIsColumnar && fromMediaType instanceof TextFormat) {
             throw new IllegalArgumentException(
                 "Invalid use of [columnar] argument: cannot be used in combination with "
@@ -61,7 +66,16 @@ public class EsqlMediaTypeParser {
                     + " formats"
             );
         }
-        return checkNonNullMediaType(fromMediaType, request);
+    }
+
+    private static void validateIncludeCCSMetadata(boolean includeCCSMetadata, MediaType fromMediaType) {
+        if (includeCCSMetadata && fromMediaType instanceof TextFormat) {
+            throw new IllegalArgumentException(
+                "Invalid use of [include_ccs_metadata] argument: cannot be used in combination with "
+                    + Arrays.stream(TextFormat.values()).map(MediaType::queryParameter).toList()
+                    + " formats"
+            );
+        }
     }
 
     private static MediaType checkNonNullMediaType(MediaType mediaType, RestRequest request) {

+ 4 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

@@ -167,7 +167,10 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
             System.nanoTime()
         );
         String sessionId = sessionID(task);
-        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias));
+        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(
+            clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias),
+            request.includeCCSMetadata()
+        );
         BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase = (physicalPlan, resultListener) -> computeService.execute(
             sessionId,
             (CancellableTask) task,

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

@@ -433,7 +433,7 @@ public class CsvTests extends ESTestCase {
 
         session.executeOptimizedPlan(
             new EsqlQueryRequest(),
-            new EsqlExecutionInfo(),
+            new EsqlExecutionInfo(randomBoolean()),
             runPhase(bigArrays, physicalOperationProviders),
             session.optimizedPlan(analyzed),
             listener.delegateFailureAndWrap(

+ 3 - 3
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java

@@ -134,7 +134,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
     }
 
     EsqlExecutionInfo createExecutionInfo() {
-        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
+        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
         executionInfo.overallTook(new TimeValue(5000));
         executionInfo.swapCluster(
             "",
@@ -426,9 +426,9 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
                 }
             }
             if (clusterInfoMap.isEmpty()) {
-                return new EsqlExecutionInfo();
+                return new EsqlExecutionInfo(true);
             } else {
-                return new EsqlExecutionInfo(clusterInfoMap);
+                return new EsqlExecutionInfo(clusterInfoMap, true);
             }
         }
 

+ 3 - 3
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java

@@ -82,7 +82,7 @@ public class TextFormatterTests extends ESTestCase {
         null,
         randomBoolean(),
         randomBoolean(),
-        new EsqlExecutionInfo()
+        new EsqlExecutionInfo(randomBoolean())
     );
 
     TextFormatter formatter = new TextFormatter(esqlResponse);
@@ -157,7 +157,7 @@ public class TextFormatterTests extends ESTestCase {
             null,
             randomBoolean(),
             randomBoolean(),
-            new EsqlExecutionInfo()
+            new EsqlExecutionInfo(randomBoolean())
         );
 
         String[] result = getTextBodyContent(new TextFormatter(response).format(false)).split("\n");
@@ -198,7 +198,7 @@ public class TextFormatterTests extends ESTestCase {
                         null,
                         randomBoolean(),
                         randomBoolean(),
-                        new EsqlExecutionInfo()
+                        new EsqlExecutionInfo(randomBoolean())
                     )
                 ).format(false)
             )

+ 7 - 7
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java

@@ -125,7 +125,7 @@ public class ComputeListenerTests extends ESTestCase {
 
     public void testEmpty() {
         PlainActionFuture<ComputeResponse> results = new PlainActionFuture<>();
-        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
+        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(randomBoolean());
         try (
             ComputeListener ignored = ComputeListener.create(
                 RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
@@ -145,7 +145,7 @@ public class ComputeListenerTests extends ESTestCase {
     public void testCollectComputeResults() {
         PlainActionFuture<ComputeResponse> future = new PlainActionFuture<>();
         List<DriverProfile> allProfiles = new ArrayList<>();
-        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
+        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(randomBoolean());
         try (
             ComputeListener computeListener = ComputeListener.create(
                 RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
@@ -194,7 +194,7 @@ public class ComputeListenerTests extends ESTestCase {
         PlainActionFuture<ComputeResponse> future = new PlainActionFuture<>();
         List<DriverProfile> allProfiles = new ArrayList<>();
         String remoteAlias = "rc1";
-        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
+        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
         executionInfo.swapCluster(remoteAlias, (k, v) -> new EsqlExecutionInfo.Cluster(remoteAlias, "logs*", false));
         try (
             ComputeListener computeListener = ComputeListener.create(
@@ -248,7 +248,7 @@ public class ComputeListenerTests extends ESTestCase {
     public void testAcquireComputeRunningOnRemoteClusterFillsInTookTime() {
         PlainActionFuture<ComputeResponse> future = new PlainActionFuture<>();
         List<DriverProfile> allProfiles = new ArrayList<>();
-        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
+        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
         String remoteAlias = "rc1";
         executionInfo.swapCluster(
             remoteAlias,
@@ -318,7 +318,7 @@ public class ComputeListenerTests extends ESTestCase {
     public void testAcquireComputeRunningOnQueryingClusterFillsInTookTime() {
         PlainActionFuture<ComputeResponse> future = new PlainActionFuture<>();
         List<DriverProfile> allProfiles = new ArrayList<>();
-        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
+        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
         String localCluster = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
         // we need a remote cluster in the ExecutionInfo in order to simulate a CCS, since ExecutionInfo is only
         // fully filled in for cross-cluster searches
@@ -372,7 +372,7 @@ public class ComputeListenerTests extends ESTestCase {
         int failedTasks = between(1, 100);
         PlainActionFuture<ComputeResponse> rootListener = new PlainActionFuture<>();
         CancellableTask rootTask = newTask();
-        EsqlExecutionInfo execInfo = new EsqlExecutionInfo();
+        EsqlExecutionInfo execInfo = new EsqlExecutionInfo(randomBoolean());
         try (
             ComputeListener computeListener = ComputeListener.create(
                 RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
@@ -436,7 +436,7 @@ public class ComputeListenerTests extends ESTestCase {
             }
         };
         CountDownLatch latch = new CountDownLatch(1);
-        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
+        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(randomBoolean());
         try (
             ComputeListener computeListener = ComputeListener.create(
                 RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,

+ 38 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/EsqlMediaTypeParserTests.java

@@ -80,6 +80,18 @@ public class EsqlMediaTypeParserTests extends ESTestCase {
         assertEquals(e.getMessage(), "Invalid use of [columnar] argument: cannot be used in combination with [txt, csv, tsv] formats");
     }
 
+    public void testIncludeCCSMetadataWithAcceptText() {
+        var accept = randomFrom("text/plain", "text/csv", "text/tab-separated-values");
+        IllegalArgumentException e = expectThrows(
+            IllegalArgumentException.class,
+            () -> getResponseMediaType(reqWithAccept(accept), createTestInstance(false, true))
+        );
+        assertEquals(
+            "Invalid use of [include_ccs_metadata] argument: cannot be used in combination with [txt, csv, tsv] formats",
+            e.getMessage()
+        );
+    }
+
     public void testColumnarWithParamText() {
         IllegalArgumentException e = expectThrows(
             IllegalArgumentException.class,
@@ -88,6 +100,26 @@ public class EsqlMediaTypeParserTests extends ESTestCase {
         assertEquals(e.getMessage(), "Invalid use of [columnar] argument: cannot be used in combination with [txt, csv, tsv] formats");
     }
 
+    public void testIncludeCCSMetadataWithNonJSONMediaTypesInParams() {
+        {
+            RestRequest restRequest = reqWithParams(Map.of("format", randomFrom("txt", "csv", "tsv")));
+            IllegalArgumentException e = expectThrows(
+                IllegalArgumentException.class,
+                () -> getResponseMediaType(restRequest, createTestInstance(false, true))
+            );
+            assertEquals(
+                "Invalid use of [include_ccs_metadata] argument: cannot be used in combination with [txt, csv, tsv] formats",
+                e.getMessage()
+            );
+        }
+        {
+            // check that no exception is thrown for the XContent types
+            RestRequest restRequest = reqWithParams(Map.of("format", randomFrom("SMILE", "YAML", "CBOR", "JSON")));
+            MediaType responseMediaType = getResponseMediaType(restRequest, createTestInstance(true, true));
+            assertNotNull(responseMediaType);
+        }
+    }
+
     public void testNoFormat() {
         IllegalArgumentException e = expectThrows(
             IllegalArgumentException.class,
@@ -113,4 +145,10 @@ public class EsqlMediaTypeParserTests extends ESTestCase {
         request.columnar(columnar);
         return request;
     }
+
+    protected EsqlQueryRequest createTestInstance(boolean columnar, boolean includeCCSMetadata) {
+        var request = createTestInstance(columnar);
+        request.includeCCSMetadata(includeCCSMetadata);
+        return request;
+    }
 }

+ 6 - 6
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java

@@ -30,7 +30,7 @@ public class EsqlSessionTests extends ESTestCase {
             final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
             final String remote1Alias = "remote1";
             final String remote2Alias = "remote2";
-            EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
+            EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
             executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
             executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true));
             executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true));
@@ -59,7 +59,7 @@ public class EsqlSessionTests extends ESTestCase {
             final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
             final String remote1Alias = "remote1";
             final String remote2Alias = "remote2";
-            EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
+            EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
             executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
             executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true));
             executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false));
@@ -87,7 +87,7 @@ public class EsqlSessionTests extends ESTestCase {
             final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
             final String remote1Alias = "remote1";
             final String remote2Alias = "remote2";
-            EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
+            EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
             executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
             executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true));
             executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false));
@@ -117,7 +117,7 @@ public class EsqlSessionTests extends ESTestCase {
             final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
             final String remote1Alias = "remote1";
             final String remote2Alias = "remote2";
-            EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
+            EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
             executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
             executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true));
             executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false));
@@ -160,7 +160,7 @@ public class EsqlSessionTests extends ESTestCase {
             final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
             final String remote1Alias = "remote1";
             final String remote2Alias = "remote2";
-            EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
+            EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
             executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
             executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true));
             executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false));
@@ -206,7 +206,7 @@ public class EsqlSessionTests extends ESTestCase {
             final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
             final String remote1Alias = "remote1";
             final String remote2Alias = "remote2";
-            EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
+            EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
             executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
             executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true));
             executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false));

+ 2 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/PlanExecutorMetricsTests.java

@@ -120,7 +120,7 @@ public class PlanExecutorMetricsTests extends ESTestCase {
             randomAlphaOfLength(10),
             EsqlTestUtils.TEST_CFG,
             enrichResolver,
-            new EsqlExecutionInfo(),
+            new EsqlExecutionInfo(randomBoolean()),
             groupIndicesByCluster,
             runPhase,
             new ActionListener<>() {
@@ -149,7 +149,7 @@ public class PlanExecutorMetricsTests extends ESTestCase {
             randomAlphaOfLength(10),
             EsqlTestUtils.TEST_CFG,
             enrichResolver,
-            new EsqlExecutionInfo(),
+            new EsqlExecutionInfo(randomBoolean()),
             groupIndicesByCluster,
             runPhase,
             new ActionListener<>() {