Browse Source

ESQL: async search responses have CCS metadata while searches are running (#117265) (#117664)

ES|QL async search responses now include CCS metadata while the query is still running.
The CCS metadata will be present only if a remote cluster is queried and the user requested
it with the `include_ccs_metadata: true` setting on the original request to `POST /_query/async`.
The setting cannot be modified in the query to `GET /_query/async/:id`.

The core change is that the EsqlExecutionInfo object is set on the EsqlQueryTask, which is used
for async ES|QL queries, so that calls to `GET /_query/async/:id` have access to the same
EsqlExecutionInfo object that is being updated as the planning and query progress.

Secondly, the overall `took` time is now always present on ES|QL responses, even for
async queries that are still running. While the query is running, `took` shows a
"took-so-far" value that changes on each refresh until the query has finished. It is
present regardless of the `include_ccs_metadata` setting.

Example response showing in progress state of the query:

```
GET _query/async/FlhaeTBxUU0yU2xhVzM2TlRLY3F1eXcceWlSWWZlRDhUVTJEUGFfZUROaDdtUTo0MDQwNA
```

```json
{
  "id": "FlhaeTBxUU0yU2xhVzM2TlRLY3F1eXcceWlSWWZlRDhUVTJEUGFfZUROaDdtUTo0MDQwNA==",
  "is_running": true,
  "took": 2032,
  "columns": [],
  "values": [],
  "_clusters": {
    "total": 3,
    "successful": 1,
    "running": 2,
    "skipped": 0,
    "partial": 0,
    "failed": 0,
    "details": {
      "(local)": {
        "status": "running",
        "indices": "web_traffic",
        "_shards": {
          "total": 2,
          "skipped": 0
        }
      },
      "remote1": {
        "status": "running",
        "indices": "web_traffic"
      },
      "remote2": {
        "status": "successful",
        "indices": "web_traffic",
        "took": 180,
        "_shards": {
          "total": 2,
          "successful": 2,
          "skipped": 0,
          "failed": 0
        }
      }
    }
  }
}
```
Michael Peterson 10 months ago
parent
commit
1a330e584a

+ 5 - 0
docs/changelog/117265.yaml

@@ -0,0 +1,5 @@
+pr: 117265
+summary: Async search responses have CCS metadata while searches are running
+area: ES|QL
+type: enhancement
+issues: []

+ 522 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java

@@ -0,0 +1,522 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.compute.operator.exchange.ExchangeService;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.index.mapper.OnScriptError;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.ScriptPlugin;
+import org.elasticsearch.script.LongFieldScript;
+import org.elasticsearch.script.ScriptContext;
+import org.elasticsearch.script.ScriptEngine;
+import org.elasticsearch.search.lookup.SearchLookup;
+import org.elasticsearch.test.AbstractMultiClustersTestCase;
+import org.elasticsearch.test.XContentTestUtils;
+import org.elasticsearch.transport.RemoteClusterAware;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.json.JsonXContent;
+import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
+import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
+import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;
+import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.elasticsearch.core.TimeValue.timeValueMillis;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
+
+public class CrossClusterAsyncQueryIT extends AbstractMultiClustersTestCase {
+
+    // Aliases of the two remote clusters configured for these tests.
+    private static final String REMOTE_CLUSTER_1 = "cluster-a";
+    private static final String REMOTE_CLUSTER_2 = "remote-b";
+    // Index names; never reassigned, so they are declared final like the other constants.
+    private static final String LOCAL_INDEX = "logs-1";
+    private static final String REMOTE_INDEX = "logs-2";
+    // Index whose runtime field is backed by the "pause" script engine (see PauseFieldPlugin).
+    private static final String INDEX_WITH_RUNTIME_MAPPING = "blocking";
+
+    @Override
+    protected Collection<String> remoteClusterAlias() {
+        return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2);
+    }
+
+    @Override
+    protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
+        return Map.of(REMOTE_CLUSTER_1, randomBoolean(), REMOTE_CLUSTER_2, randomBoolean());
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
+        List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
+        plugins.add(EsqlPlugin.class);
+        plugins.add(EsqlAsyncActionIT.LocalStateEsqlAsync.class); // allows the async_search DELETE action
+        plugins.add(InternalExchangePlugin.class);
+        plugins.add(PauseFieldPlugin.class);
+        return plugins;
+    }
+
+    /**
+     * Test plugin that declares the {@link ExchangeService#INACTIVE_SINKS_INTERVAL_SETTING} key
+     * as a node-scoped time setting (created with a 30 second value).
+     */
+    public static class InternalExchangePlugin extends Plugin {
+        @Override
+        public List<Setting<?>> getSettings() {
+            final Setting<TimeValue> inactiveSinksInterval = Setting.timeSetting(
+                ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING,
+                TimeValue.timeValueSeconds(30),
+                Setting.Property.NodeScope
+            );
+            return List.of(inactiveSinksInterval);
+        }
+    }
+
+    /**
+     * Installs fresh single-count latches before each test so that state from a previous
+     * test cannot leak into the next one.
+     */
+    @Before
+    public void resetPlugin() {
+        PauseFieldPlugin.startEmitting = new CountDownLatch(1);
+        PauseFieldPlugin.allowEmitting = new CountDownLatch(1);
+    }
+
+    /**
+     * Script plugin providing a "pause" script engine for long runtime fields.
+     * <p>
+     * Each script execution counts down {@link #startEmitting} (signalling that evaluation has begun),
+     * then blocks for up to 30 seconds on {@link #allowEmitting} before emitting the value {@code 1}.
+     * Tests use the two latches to hold a query in the "running" state for as long as they need.
+     */
+    public static class PauseFieldPlugin extends Plugin implements ScriptPlugin {
+        // Both latches are replaced before every test by resetPlugin(), hence not final.
+        public static CountDownLatch startEmitting = new CountDownLatch(1);
+        public static CountDownLatch allowEmitting = new CountDownLatch(1);
+
+        @Override
+        public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
+            return new ScriptEngine() {
+                @Override
+                public String getType() {
+                    return "pause";
+                }
+
+                @Override
+                @SuppressWarnings("unchecked")
+                public <FactoryType> FactoryType compile(
+                    String name,
+                    String code,
+                    ScriptContext<FactoryType> context,
+                    Map<String, String> params
+                ) {
+                    // Only the long-field script context is supported; anything else is rejected below.
+                    if (context == LongFieldScript.CONTEXT) {
+                        return (FactoryType) new LongFieldScript.Factory() {
+                            @Override
+                            public LongFieldScript.LeafFactory newFactory(
+                                String fieldName,
+                                Map<String, Object> params,
+                                SearchLookup searchLookup,
+                                OnScriptError onScriptError
+                            ) {
+                                return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) {
+                                    @Override
+                                    public void execute() {
+                                        // Tell the test that evaluation has started ...
+                                        startEmitting.countDown();
+                                        try {
+                                            // ... and block until the test releases us (or 30s elapse).
+                                            assertTrue(allowEmitting.await(30, TimeUnit.SECONDS));
+                                        } catch (InterruptedException e) {
+                                            // Restore the interrupt flag so the owning thread can still observe it.
+                                            Thread.currentThread().interrupt();
+                                            throw new AssertionError(e);
+                                        }
+                                        emit(1);
+                                    }
+                                };
+                            }
+                        };
+                    }
+                    throw new IllegalStateException("unsupported type " + context);
+                }
+
+                @Override
+                public Set<ScriptContext<?>> getSupportedContexts() {
+                    return Set.of(LongFieldScript.CONTEXT);
+                }
+            };
+        }
+    }
+
+    /**
+     * Runs an async query across the local cluster and both remotes, where the query against
+     * 'remote-b:blocking' is held open by {@link PauseFieldPlugin}.
+     * Includes testing for CCS metadata in the GET /_query/async/:id response while the search is still running,
+     * and verifies the final per-cluster metadata once the blocked remote has been released.
+     */
+    public void testSuccessfulPathways() throws Exception {
+        Map<String, Object> testClusterInfo = setupClusters(3);
+        int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
+        int remote1NumShards = (Integer) testClusterInfo.get("remote1.num_shards");
+        int remote2NumShards = (Integer) testClusterInfo.get("remote2.blocking_index.num_shards");
+
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        boolean responseExpectMeta = includeCCSMetadata.v2();
+
+        AtomicReference<String> asyncExecutionId = new AtomicReference<>();
+
+        String q = "FROM logs-*,cluster-a:logs-*,remote-b:blocking | STATS total=sum(const) | LIMIT 10";
+        try (EsqlQueryResponse resp = runAsyncQuery(q, requestIncludeMeta, null, TimeValue.timeValueMillis(100))) {
+            assertTrue(resp.isRunning());
+            assertNotNull("async execution id is null", resp.asyncExecutionId());
+            asyncExecutionId.set(resp.asyncExecutionId().get());
+            // executionInfo may or may not be set on the initial response when there is a relatively low wait_for_completion_timeout
+            // so we do not check for it here
+        }
+
+        // wait until we know that the query against 'remote-b:blocking' has started;
+        // fail right here on timeout instead of continuing with a misleading failure further down
+        assertTrue(
+            "timed out waiting for the query against 'remote-b:blocking' to start",
+            PauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)
+        );
+
+        // wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on it)
+        assertBusy(() -> {
+            try (EsqlQueryResponse asyncResponse = getAsyncResponse(asyncExecutionId.get())) {
+                EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
+                assertNotNull(executionInfo);
+                EsqlExecutionInfo.Cluster clusterA = executionInfo.getCluster(REMOTE_CLUSTER_1);
+                assertThat(clusterA.getStatus(), not(equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)));
+            }
+        });
+
+        /* at this point:
+         *  the query against cluster-a should be finished
+         *  the query against remote-b should be running (blocked on the PauseFieldPlugin.allowEmitting CountDown)
+         *  the query against the local cluster should be running because it has a STATS clause that needs to wait on remote-b
+         */
+        try (EsqlQueryResponse asyncResponse = getAsyncResponse(asyncExecutionId.get())) {
+            EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
+            assertNotNull(executionInfo);
+            assertThat(asyncResponse.isRunning(), is(true));
+            assertThat(
+                executionInfo.clusterAliases(),
+                equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY))
+            );
+            assertThat(executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING), equalTo(2));
+            assertThat(executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL), equalTo(1));
+
+            EsqlExecutionInfo.Cluster clusterA = executionInfo.getCluster(REMOTE_CLUSTER_1);
+            assertThat(clusterA.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+            assertThat(clusterA.getTotalShards(), greaterThanOrEqualTo(1));
+            assertThat(clusterA.getSuccessfulShards(), equalTo(clusterA.getTotalShards()));
+            assertThat(clusterA.getSkippedShards(), equalTo(0));
+            assertThat(clusterA.getFailedShards(), equalTo(0));
+            assertThat(clusterA.getFailures().size(), equalTo(0));
+            assertThat(clusterA.getTook().millis(), greaterThanOrEqualTo(0L));
+
+            EsqlExecutionInfo.Cluster local = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
+            // should still be RUNNING since the local cluster has to do a STATS on the coordinator, waiting on remoteB
+            assertThat(local.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
+            // NOTE(review): this repeats the clusterA check above; it may have been meant to inspect 'local' - confirm
+            assertThat(clusterA.getTotalShards(), greaterThanOrEqualTo(1));
+
+            EsqlExecutionInfo.Cluster remoteB = executionInfo.getCluster(REMOTE_CLUSTER_2);
+            // should still be RUNNING since we haven't released the countdown lock to proceed
+            assertThat(remoteB.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
+            assertNull(remoteB.getSuccessfulShards());  // should not be filled in until query is finished
+
+            assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3);
+        }
+
+        // allow remoteB query to proceed
+        PauseFieldPlugin.allowEmitting.countDown();
+
+        // wait until both remoteB and local queries have finished
+        assertBusy(() -> {
+            try (EsqlQueryResponse asyncResponse = getAsyncResponse(asyncExecutionId.get())) {
+                EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
+                assertNotNull(executionInfo);
+                EsqlExecutionInfo.Cluster remoteB = executionInfo.getCluster(REMOTE_CLUSTER_2);
+                assertThat(remoteB.getStatus(), not(equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)));
+                EsqlExecutionInfo.Cluster local = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
+                assertThat(local.getStatus(), not(equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)));
+                assertThat(asyncResponse.isRunning(), is(false));
+            }
+        });
+
+        // the query has finished: verify the final metadata of every cluster, then clean up the stored async result
+        try (EsqlQueryResponse asyncResponse = getAsyncResponse(asyncExecutionId.get())) {
+            EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo();
+            assertNotNull(executionInfo);
+            assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(1L));
+
+            EsqlExecutionInfo.Cluster clusterA = executionInfo.getCluster(REMOTE_CLUSTER_1);
+            assertThat(clusterA.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+            assertThat(clusterA.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(clusterA.getTotalShards(), equalTo(remote1NumShards));
+            assertThat(clusterA.getSuccessfulShards(), equalTo(remote1NumShards));
+            assertThat(clusterA.getSkippedShards(), equalTo(0));
+            assertThat(clusterA.getFailedShards(), equalTo(0));
+            assertThat(clusterA.getFailures().size(), equalTo(0));
+
+            EsqlExecutionInfo.Cluster remoteB = executionInfo.getCluster(REMOTE_CLUSTER_2);
+            assertThat(remoteB.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(remoteB.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+            assertThat(remoteB.getTotalShards(), equalTo(remote2NumShards));
+            assertThat(remoteB.getSuccessfulShards(), equalTo(remote2NumShards));
+            assertThat(remoteB.getSkippedShards(), equalTo(0));
+            assertThat(remoteB.getFailedShards(), equalTo(0));
+            assertThat(remoteB.getFailures().size(), equalTo(0));
+
+            EsqlExecutionInfo.Cluster local = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
+            assertThat(local.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(local.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+            assertThat(local.getTotalShards(), equalTo(localNumShards));
+            assertThat(local.getSuccessfulShards(), equalTo(localNumShards));
+            assertThat(local.getSkippedShards(), equalTo(0));
+            assertThat(local.getFailedShards(), equalTo(0));
+            assertThat(local.getFailures().size(), equalTo(0));
+        } finally {
+            AcknowledgedResponse acknowledgedResponse = deleteAsyncId(asyncExecutionId.get());
+            assertThat(acknowledgedResponse.isAcknowledged(), is(true));
+        }
+    }
+
+    public void testAsyncQueriesWithLimit0() throws IOException {
+        setupClusters(3);
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        boolean responseExpectMeta = includeCCSMetadata.v2();
+
+        final TimeValue waitForCompletion = TimeValue.timeValueNanos(randomFrom(1L, Long.MAX_VALUE));
+        String asyncExecutionId = null;
+        try (EsqlQueryResponse resp = runAsyncQuery("FROM logs*,*:logs* | LIMIT 0", requestIncludeMeta, null, waitForCompletion)) {
+            EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+            if (resp.isRunning()) {
+                asyncExecutionId = resp.asyncExecutionId().get();
+                assertThat(resp.columns().size(), equalTo(0));
+                assertThat(resp.values().hasNext(), is(false));  // values should be empty list
+
+            } else {
+                assertThat(resp.columns().size(), equalTo(4));
+                assertThat(resp.columns().contains(new ColumnInfoImpl("const", "long")), is(true));
+                assertThat(resp.columns().contains(new ColumnInfoImpl("id", "keyword")), is(true));
+                assertThat(resp.columns().contains(new ColumnInfoImpl("tag", "keyword")), is(true));
+                assertThat(resp.columns().contains(new ColumnInfoImpl("v", "long")), is(true));
+                assertThat(resp.values().hasNext(), is(false));  // values should be empty list
+
+                assertNotNull(executionInfo);
+                assertThat(executionInfo.isCrossClusterSearch(), is(true));
+                long overallTookMillis = executionInfo.overallTook().millis();
+                assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
+                assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
+                assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
+
+                EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
+                assertThat(remoteCluster.getIndexExpression(), equalTo("logs*"));
+                assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+                assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+                assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
+                assertThat(remoteCluster.getTotalShards(), equalTo(0));
+                assertThat(remoteCluster.getSuccessfulShards(), equalTo(0));
+                assertThat(remoteCluster.getSkippedShards(), equalTo(0));
+                assertThat(remoteCluster.getFailedShards(), equalTo(0));
+
+                EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
+                assertThat(remote2Cluster.getIndexExpression(), equalTo("logs*"));
+                assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+                assertThat(remote2Cluster.getTook().millis(), greaterThanOrEqualTo(0L));
+                assertThat(remote2Cluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
+                assertThat(remote2Cluster.getTotalShards(), equalTo(0));
+                assertThat(remote2Cluster.getSuccessfulShards(), equalTo(0));
+                assertThat(remote2Cluster.getSkippedShards(), equalTo(0));
+                assertThat(remote2Cluster.getFailedShards(), equalTo(0));
+
+                EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
+                assertThat(localCluster.getIndexExpression(), equalTo("logs*"));
+                assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+                assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+                assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
+                assertThat(remote2Cluster.getTotalShards(), equalTo(0));
+                assertThat(remote2Cluster.getSuccessfulShards(), equalTo(0));
+                assertThat(remote2Cluster.getSkippedShards(), equalTo(0));
+                assertThat(remote2Cluster.getFailedShards(), equalTo(0));
+
+                assertClusterMetadataInResponse(resp, responseExpectMeta, 3);
+            }
+        } finally {
+            if (asyncExecutionId != null) {
+                AcknowledgedResponse acknowledgedResponse = deleteAsyncId(asyncExecutionId);
+                assertThat(acknowledgedResponse.isAcknowledged(), is(true));
+            }
+        }
+    }
+
+    protected EsqlQueryResponse runAsyncQuery(String query, Boolean ccsMetadata, QueryBuilder filter, TimeValue waitCompletionTime) {
+        EsqlQueryRequest request = EsqlQueryRequest.asyncEsqlQueryRequest();
+        request.query(query);
+        request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
+        request.profile(randomInt(5) == 2);
+        request.columnar(randomBoolean());
+        if (ccsMetadata != null) {
+            request.includeCCSMetadata(ccsMetadata);
+        }
+        request.waitForCompletionTimeout(waitCompletionTime);
+        request.keepOnCompletion(false);
+        if (filter != null) {
+            request.filter(filter);
+        }
+        return runAsyncQuery(request);
+    }
+
+    /**
+     * Executes {@code request} on the local cluster and waits up to 30 seconds for the response.
+     * A timeout is reported as an {@link AssertionError}.
+     */
+    protected EsqlQueryResponse runAsyncQuery(EsqlQueryRequest request) {
+        try {
+            var pendingResponse = client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request);
+            return pendingResponse.actionGet(30, TimeUnit.SECONDS);
+        } catch (ElasticsearchTimeoutException timeout) {
+            throw new AssertionError("timeout waiting for query response", timeout);
+        }
+    }
+
+    /**
+     * Deletes the stored async result with the given id, waiting up to 30 seconds for the response.
+     * A timeout is reported as an {@link AssertionError}.
+     */
+    AcknowledgedResponse deleteAsyncId(String id) {
+        try {
+            var pendingDelete = client().execute(TransportDeleteAsyncResultAction.TYPE, new DeleteAsyncResultRequest(id));
+            return pendingDelete.actionGet(30, TimeUnit.SECONDS);
+        } catch (ElasticsearchTimeoutException timeout) {
+            throw new AssertionError("timeout waiting for DELETE response", timeout);
+        }
+    }
+
+    /**
+     * Fetches the current async result for {@code id} using a 1ms wait_for_completion_timeout,
+     * waiting up to 30 seconds for the response itself. A timeout is reported as an {@link AssertionError}.
+     */
+    EsqlQueryResponse getAsyncResponse(String id) {
+        try {
+            GetAsyncResultRequest fetchRequest = new GetAsyncResultRequest(id);
+            fetchRequest = fetchRequest.setWaitForCompletionTimeout(timeValueMillis(1));
+            var pendingResult = client().execute(EsqlAsyncGetResultAction.INSTANCE, fetchRequest);
+            return pendingResult.actionGet(30, TimeUnit.SECONDS);
+        } catch (ElasticsearchTimeoutException timeout) {
+            throw new AssertionError("timeout waiting for GET async result", timeout);
+        }
+    }
+
+    /**
+     * Converts the response to a map and checks the presence (or absence) of the {@code _clusters} section.
+     *
+     * @param resp               response to inspect
+     * @param responseExpectMeta whether {@code _clusters} is expected to be present
+     * @param numClusters        expected value of {@code _clusters.total} when the section is present
+     */
+    private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta, int numClusters) {
+        final Map<String, Object> esqlResponseAsMap;
+        try {
+            esqlResponseAsMap = XContentTestUtils.convertToMap(resp);
+        } catch (IOException e) {
+            // keep the original exception as the cause so its stack trace is not lost
+            throw new AssertionError("Could not convert ESQLQueryResponse to Map: " + e, e);
+        }
+
+        final Object clusters = esqlResponseAsMap.get("_clusters");
+        if (responseExpectMeta) {
+            assertNotNull(clusters);
+            // test a few entries to ensure it looks correct (other tests do a full analysis of the metadata in the response)
+            @SuppressWarnings("unchecked")
+            Map<String, Object> inner = (Map<String, Object>) clusters;
+            assertTrue(inner.containsKey("total"));
+            assertThat((int) inner.get("total"), equalTo(numClusters));
+            assertTrue(inner.containsKey("details"));
+        } else {
+            assertNull(clusters);
+        }
+    }
+
+    /**
+     * Randomly picks how include_ccs_metadata is sent on the request and what to expect in the response.
+     *
+     * @return a tuple where v1 is the value to send to runQuery (can be null; null means use default value)
+     *         and v2 is whether to expect CCS Metadata in the response (never null)
+     */
+    public static Tuple<Boolean, Boolean> randomIncludeCCSMetadata() {
+        final int choice = randomIntBetween(1, 3);
+        return switch (choice) {
+            case 1 -> new Tuple<>(Boolean.TRUE, Boolean.TRUE);
+            case 2 -> new Tuple<>(Boolean.FALSE, Boolean.FALSE);
+            case 3 -> new Tuple<>(null, Boolean.FALSE);
+            default -> throw new AssertionError("should not get here");
+        };
+    }
+
+    Map<String, Object> setupClusters(int numClusters) throws IOException {
+        assert numClusters == 2 || numClusters == 3 : "2 or 3 clusters supported not: " + numClusters;
+        int numShardsLocal = randomIntBetween(1, 5);
+        populateLocalIndices(LOCAL_INDEX, numShardsLocal);
+
+        int numShardsRemote = randomIntBetween(1, 5);
+        populateRemoteIndices(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote);
+
+        Map<String, Object> clusterInfo = new HashMap<>();
+        clusterInfo.put("local.num_shards", numShardsLocal);
+        clusterInfo.put("local.index", LOCAL_INDEX);
+        clusterInfo.put("remote1.num_shards", numShardsRemote);
+        clusterInfo.put("remote1.index", REMOTE_INDEX);
+
+        if (numClusters == 3) {
+            int numShardsRemote2 = randomIntBetween(1, 5);
+            populateRemoteIndices(REMOTE_CLUSTER_2, REMOTE_INDEX, numShardsRemote2);
+            populateRemoteIndicesWithRuntimeMapping(REMOTE_CLUSTER_2);
+            clusterInfo.put("remote2.index", REMOTE_INDEX);
+            clusterInfo.put("remote2.num_shards", numShardsRemote2);
+            clusterInfo.put("remote2.blocking_index", INDEX_WITH_RUNTIME_MAPPING);
+            clusterInfo.put("remote2.blocking_index.num_shards", 1);
+        }
+
+        String skipUnavailableKey = Strings.format("cluster.remote.%s.skip_unavailable", REMOTE_CLUSTER_1);
+        Setting<?> skipUnavailableSetting = cluster(REMOTE_CLUSTER_1).clusterService().getClusterSettings().get(skipUnavailableKey);
+        boolean skipUnavailable = (boolean) cluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).clusterService()
+            .getClusterSettings()
+            .get(skipUnavailableSetting);
+        clusterInfo.put("remote.skip_unavailable", skipUnavailable);
+
+        return clusterInfo;
+    }
+
+    void populateLocalIndices(String indexName, int numShards) {
+        Client localClient = client(LOCAL_CLUSTER);
+        assertAcked(
+            localClient.admin()
+                .indices()
+                .prepareCreate(indexName)
+                .setSettings(Settings.builder().put("index.number_of_shards", numShards))
+                .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long", "const", "type=long")
+        );
+        for (int i = 0; i < 10; i++) {
+            localClient.prepareIndex(indexName).setSource("id", "local-" + i, "tag", "local", "v", i).get();
+        }
+        localClient.admin().indices().prepareRefresh(indexName).get();
+    }
+
+    void populateRemoteIndicesWithRuntimeMapping(String clusterAlias) throws IOException {
+        XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
+        mapping.startObject("runtime");
+        {
+            mapping.startObject("const");
+            {
+                mapping.field("type", "long");
+                mapping.startObject("script").field("source", "").field("lang", "pause").endObject();
+            }
+            mapping.endObject();
+        }
+        mapping.endObject();
+        mapping.endObject();
+        client(clusterAlias).admin().indices().prepareCreate(INDEX_WITH_RUNTIME_MAPPING).setMapping(mapping).get();
+        BulkRequestBuilder bulk = client(clusterAlias).prepareBulk(INDEX_WITH_RUNTIME_MAPPING)
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+        for (int i = 0; i < 10; i++) {
+            bulk.add(new IndexRequest().source("foo", i));
+        }
+        bulk.get();
+    }
+
+    void populateRemoteIndices(String clusterAlias, String indexName, int numShards) throws IOException {
+        Client remoteClient = client(clusterAlias);
+        assertAcked(
+            remoteClient.admin()
+                .indices()
+                .prepareCreate(indexName)
+                .setSettings(Settings.builder().put("index.number_of_shards", numShards))
+                .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long")
+        );
+        for (int i = 0; i < 10; i++) {
+            remoteClient.prepareIndex(indexName).setSource("id", "remote-" + i, "tag", "remote", "v", i * i).get();
+        }
+        remoteClient.admin().indices().prepareRefresh(indexName).get();
+    }
+}

+ 4 - 5
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java

@@ -61,6 +61,10 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
 public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
     private static final String REMOTE_CLUSTER_1 = "cluster-a";
     private static final String REMOTE_CLUSTER_2 = "remote-b";
+    private static String LOCAL_INDEX = "logs-1";
+    private static String IDX_ALIAS = "alias1";
+    private static String FILTERED_IDX_ALIAS = "alias-filtered-1";
+    private static String REMOTE_INDEX = "logs-2";
 
     @Override
     protected Collection<String> remoteClusterAlias() {
@@ -1278,11 +1282,6 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
         return setupClusters(2);
     }
 
-    private static String LOCAL_INDEX = "logs-1";
-    private static String IDX_ALIAS = "alias1";
-    private static String FILTERED_IDX_ALIAS = "alias-filtered-1";
-    private static String REMOTE_INDEX = "logs-2";
-
     Map<String, Object> setupClusters(int numClusters) {
         assert numClusters == 2 || numClusters == 3 : "2 or 3 clusters supported not: " + numClusters;
         int numShardsLocal = randomIntBetween(1, 5);

+ 12 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

@@ -169,6 +169,17 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
         return overallTook;
     }
 
+    /**
+     * Time elapsed since the query started, measured against the current {@link System#nanoTime()}.
+     * Returns a zero duration when no start time has been recorded.
+     */
+    public TimeValue tookSoFar() {
+        if (relativeStartNanos != null) {
+            final long elapsedNanos = System.nanoTime() - relativeStartNanos;
+            return new TimeValue(elapsedNanos, TimeUnit.NANOSECONDS);
+        }
+        return new TimeValue(0);
+    }
+
     /**
      * Returns the key set of {@code clusterInfo}, i.e. the aliases of the clusters tracked by this object.
      */
     public Set<String> clusterAliases() {
         return clusterInfo.keySet();
     }
@@ -478,7 +489,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
             {
                 builder.field(STATUS_FIELD.getPreferredName(), getStatus().toString());
                 builder.field(INDICES_FIELD.getPreferredName(), indexExpression);
-                if (took != null) {
+                if (took != null && status != Status.RUNNING) {
                     builder.field(TOOK.getPreferredName(), took.millis());
                 }
                 if (totalShards != null) {

+ 5 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

@@ -196,8 +196,11 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
                 }
                 b.field("is_running", isRunning);
             }
-            if (executionInfo != null && executionInfo.overallTook() != null) {
-                b.field("took", executionInfo.overallTook().millis());
+            if (executionInfo != null) {
+                long tookInMillis = executionInfo.overallTook() == null
+                    ? executionInfo.tookSoFar().millis()
+                    : executionInfo.overallTook().millis();
+                b.field("took", tookInMillis);
             }
             if (dropNullColumns) {
                 b.append(ResponseXContentUtils.allColumns(columns, "all_columns"))

+ 12 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java

@@ -17,6 +17,8 @@ import java.util.Map;
 
 public class EsqlQueryTask extends StoredAsyncTask<EsqlQueryResponse> {
 
+    private volatile EsqlExecutionInfo executionInfo; // volatile: read by GET _query/async, possibly from another thread
+
     public EsqlQueryTask(
         long id,
         String type,
@@ -29,10 +31,19 @@ public class EsqlQueryTask extends StoredAsyncTask<EsqlQueryResponse> {
         TimeValue keepAlive
     ) {
         super(id, type, action, description, parentTaskId, headers, originHeaders, asyncExecutionId, keepAlive);
+        this.executionInfo = null;
+    }
+
+    public void setExecutionInfo(EsqlExecutionInfo executionInfo) {
+        this.executionInfo = executionInfo; // returned by executionInfo() and passed into the response built by getCurrentResult()
+    }
+
+    public EsqlExecutionInfo executionInfo() {
+        return executionInfo; // stays null until setExecutionInfo stores a non-null value (the constructor assigns null)
     }
 
     @Override
     public EsqlQueryResponse getCurrentResult() {
-        return new EsqlQueryResponse(List.of(), List.of(), null, false, getExecutionId().getEncoded(), true, true, null);
+        return new EsqlQueryResponse(List.of(), List.of(), null, false, getExecutionId().getEncoded(), true, true, executionInfo);
     }
 }

+ 22 - 7
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java

@@ -112,6 +112,7 @@ final class ComputeListener implements Releasable {
             if (runningOnRemoteCluster()) {
                 // for remote executions - this ComputeResponse is created on the remote cluster/node and will be serialized and
                 // received by the acquireCompute method callback on the coordinating cluster
+                setFinalStatusAndShardCounts(clusterAlias, executionInfo);
                 EsqlExecutionInfo.Cluster cluster = esqlExecutionInfo.getCluster(clusterAlias);
                 result = new ComputeResponse(
                     collectedProfiles.isEmpty() ? List.of() : collectedProfiles.stream().toList(),
@@ -126,19 +127,33 @@ final class ComputeListener implements Releasable {
                 if (coordinatingClusterIsSearchedInCCS()) {
                     // if not already marked as SKIPPED, mark the local cluster as finished once the coordinator and all
                     // data nodes have finished processing
-                    executionInfo.swapCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, (k, v) -> {
-                        if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
-                            return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL).build();
-                        } else {
-                            return v;
-                        }
-                    });
+                    setFinalStatusAndShardCounts(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, executionInfo);
                 }
             }
             delegate.onResponse(result);
         }, e -> delegate.onFailure(failureCollector.getFailure())));
     }
 
+    private static void setFinalStatusAndShardCounts(String clusterAlias, EsqlExecutionInfo executionInfo) {
+        executionInfo.swapCluster(clusterAlias, (alias, prev) -> {
+            // TODO: once PARTIAL status is supported (partial results work to come), modify this code as needed
+            if (prev.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) {
+                return prev; // a SKIPPED cluster entry is kept exactly as it is
+            }
+            assert prev.getTotalShards() != null && prev.getSkippedShards() != null : "Null total or skipped shard count: " + prev;
+            /*
+             * Total and skipped shard counts were already set earlier in execution (after can-match).
+             * Until ES|QL supports shard-level partial results, the successful shard count is simply
+             * the total shard count and the failed shard count is zero.
+             */
+            EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(prev);
+            builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
+            builder.setSuccessfulShards(prev.getTotalShards());
+            builder.setFailedShards(0);
+            return builder.build();
+        });
+    }
+
     /**
      * @return true if the "local" querying/coordinator cluster is being searched in a cross-cluster search
      */

+ 19 - 7
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -178,6 +178,7 @@ public class ComputeService {
                 null
             );
             String local = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
+            updateShardCountForCoordinatorOnlyQuery(execInfo);
             try (var computeListener = ComputeListener.create(local, transportService, rootTask, execInfo, listener.map(r -> {
                 updateExecutionInfoAfterCoordinatorOnlyQuery(execInfo);
                 return new Result(physicalPlan.output(), collectedPages, r.getProfiles(), execInfo);
@@ -260,6 +261,22 @@ public class ComputeService {
         }
     }
 
+    /**
+     * For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries).
+     * For a cross-cluster search, sets the total, successful, skipped and failed shard counts of every cluster to 0.
+     */
+    private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
+        if (execInfo.isCrossClusterSearch() == false) {
+            return;
+        }
+        for (String clusterAlias : execInfo.clusterAliases()) {
+            execInfo.swapCluster(clusterAlias, (alias, current) -> {
+                var zeroed = new EsqlExecutionInfo.Cluster.Builder(current).setTotalShards(0).setSuccessfulShards(0);
+                return zeroed.setSkippedShards(0).setFailedShards(0).build();
+            });
+        }
+    }
+
     // For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries)
     private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
         execInfo.markEndQuery();  // TODO: revisit this time recording model as part of INLINESTATS improvements
@@ -267,11 +284,7 @@ public class ComputeService {
             assert execInfo.planningTookTime() != null : "Planning took time should be set on EsqlExecutionInfo but is null";
             for (String clusterAlias : execInfo.clusterAliases()) {
                 execInfo.swapCluster(clusterAlias, (k, v) -> {
-                    var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.overallTook())
-                        .setTotalShards(0)
-                        .setSuccessfulShards(0)
-                        .setSkippedShards(0)
-                        .setFailedShards(0);
+                    var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.overallTook());
                     if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
                         builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
                     }
@@ -324,9 +337,8 @@ public class ComputeService {
                 executionInfo.swapCluster(
                     clusterAlias,
                     (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(dataNodeResult.totalShards())
-                        .setSuccessfulShards(dataNodeResult.totalShards())
+                        // do not set successful or failed shard count here - do it when search is done
                         .setSkippedShards(dataNodeResult.skippedShards())
-                        .setFailedShards(0)
                         .build()
                 );
 

+ 18 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

@@ -151,6 +151,8 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
 
     @Override
     public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener<EsqlQueryResponse> listener) {
+        // set EsqlExecutionInfo on async-search task so that it is accessible to GET _query/async while the query is still running
+        task.setExecutionInfo(createEsqlExecutionInfo(request));
         ActionListener.run(listener, l -> innerExecute(task, request, l));
     }
 
@@ -170,10 +172,9 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
             System.nanoTime()
         );
         String sessionId = sessionID(task);
-        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(
-            clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias),
-            request.includeCCSMetadata()
-        );
+        // async-query uses EsqlQueryTask, so pull the EsqlExecutionInfo out of the task
+        // sync query uses CancellableTask which does not have EsqlExecutionInfo, so create one
+        EsqlExecutionInfo executionInfo = getOrCreateExecutionInfo(task, request);
         PlanRunner planRunner = (plan, resultListener) -> computeService.execute(
             sessionId,
             (CancellableTask) task,
@@ -194,6 +195,18 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
         );
     }
 
+    /** Reuses the EsqlExecutionInfo already set on an EsqlQueryTask; for any other task, or when none is set, creates a new one. */
+    private EsqlExecutionInfo getOrCreateExecutionInfo(Task task, EsqlQueryRequest request) {
+        EsqlExecutionInfo fromTask = task instanceof EsqlQueryTask asyncTask ? asyncTask.executionInfo() : null;
+        return fromTask != null ? fromTask : createEsqlExecutionInfo(request);
+    }
+
+    /** Builds a new EsqlExecutionInfo from the remote cluster service's isSkipUnavailable lookup and the request's CCS metadata flag. */
+    private EsqlExecutionInfo createEsqlExecutionInfo(EsqlQueryRequest request) {
+        var includeCcsMetadata = request.includeCCSMetadata();
+        return new EsqlExecutionInfo(alias -> remoteClusterService.isSkipUnavailable(alias), includeCcsMetadata);
+    }
+
     private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Configuration configuration, Result result) {
         List<ColumnInfoImpl> columns = result.schema().stream().map(c -> new ColumnInfoImpl(c.name(), c.dataType().outputType())).toList();
         EsqlQueryResponse.Profile profile = configuration.profile() ? new EsqlQueryResponse.Profile(result.profiles()) : null;
@@ -269,7 +282,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
             asyncExecutionId,
             true, // is_running
             true, // isAsync
-            null
+            task.executionInfo()
         );
     }
 

+ 1 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

@@ -145,6 +145,7 @@ public class EsqlSession {
      * Execute an ESQL request.
      */
     public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, PlanRunner planRunner, ActionListener<Result> listener) {
+        assert executionInfo != null : "Null EsqlExecutionInfo";
         LOGGER.debug("ESQL query:\n{}", request.query());
         analyzedPlan(
             parse(request.query(), request.params()),

+ 2 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java

@@ -519,14 +519,15 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
     }
 
     public void testChunkResponseSizeColumnar() {
-        int sizeClusterDetails = 14;
         try (EsqlQueryResponse resp = randomResponse(true, null)) {
+            int sizeClusterDetails = 14;
             int columnCount = resp.pages().get(0).getBlockCount();
             int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2;
             assertChunkCount(resp, r -> 5 + sizeClusterDetails + bodySize);
         }
 
         try (EsqlQueryResponse resp = randomResponseAsync(true, null, true)) {
+            int sizeClusterDetails = resp.isRunning() ? 13 : 14;  // overall took time not present when is_running=true
             int columnCount = resp.pages().get(0).getBlockCount();
             int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2;
             assertChunkCount(resp, r -> 7 + sizeClusterDetails + bodySize); // is_running

+ 12 - 4
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java

@@ -353,10 +353,7 @@ public class ComputeListenerTests extends ESTestCase {
         assertThat(response.getTook().millis(), greaterThanOrEqualTo(0L));
         assertThat(executionInfo.getCluster(remoteAlias).getTook().millis(), greaterThanOrEqualTo(0L));
         assertThat(executionInfo.getCluster(remoteAlias).getTook(), equalTo(response.getTook()));
-
-        // the status in the (remote) executionInfo will still be RUNNING, since the SUCCESSFUL status gets set on the querying
-        // cluster executionInfo in the acquireCompute CCS listener, NOT present in this test - see testCollectComputeResultsInCCSListener
-        assertThat(executionInfo.getCluster(remoteAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
+        assertThat(executionInfo.getCluster(remoteAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
 
         Mockito.verifyNoInteractions(transportService.getTaskManager());
     }
@@ -376,6 +373,17 @@ public class ComputeListenerTests extends ESTestCase {
         // fully filled in for cross-cluster searches
         executionInfo.swapCluster(localCluster, (k, v) -> new EsqlExecutionInfo.Cluster(localCluster, "logs*", false));
         executionInfo.swapCluster("my_remote", (k, v) -> new EsqlExecutionInfo.Cluster("my_remote", "my_remote:logs*", false));
+
+        // before acquire-compute, can-match (SearchShards) runs filling in total shards and skipped shards, so simulate that here
+        executionInfo.swapCluster(
+            localCluster,
+            (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(10).setSkippedShards(1).build()
+        );
+        executionInfo.swapCluster(
+            "my_remote",
+            (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(10).setSkippedShards(1).build()
+        );
+
         try (
             ComputeListener computeListener = ComputeListener.create(
                 // whereRunning=localCluster simulates running on the querying cluster