浏览代码

Backport bugfix esql per cluster took time (#115168)

* ES|QL per-cluster took time is incorrectly calculated and causes fatal exceptions (#115017)

The model for calculating per-cluster `took` times from remote clusters in https://github.com/elastic/elasticsearch/pull/112595 was flawed. 
It attempted to use Java's System.nanoTime between the local and remote clusters,
which is not safe. This results in per-cluster took times that have arbitrary (invalid) values
including negative values which cause exceptions to be thrown by the `TimeValue` constructor.
(Note: the overall took time calculation was done correctly, so it was the remote per-cluster
took times that were flawed.)

In this PR, I've done a redesign to address this. A key decision of this re-design was whether
to always calculate took times only on the querying cluster (bypassing this whole problem) or
to continue to allow the remote clusters to calculate their own took times for the remote processing
and report that back to the querying cluster via the `ComputeResponse`.

I decided in favor of having remote clusters compute their own took times for the remote processing
and to additionally track "planning" time (encompassing field-caps and policy enrich remote calls), so
that total per-cluster took time is a combination of the two. In _search, remote cluster took times are
calculated entirely on the remote cluster, so network time is not included in the per-cluster took times.
This has been helpful in diagnosing issues on user environments because if you see an overall took time
that is significantly larger than the per cluster took times, that may indicate a network issue, which has
happened in diagnosing cross-cluster issues in _search.

I moved relative time tracking into `EsqlExecutionInfo`. 

The "planning time" marker is currently only used in cross-cluster searches, so it will conflict with
the INLINESTATS 2 phase model (where planning can be done twice). We will improve this design
to handle a 2 phase model in a later ticket, as part of the INLINESTATS work. I tested the 
current overall took time calculation model with local-only INLINESTATS queries and they
work correctly.

I also fixed another secondary bug in this PR. If the remote cluster is an older version that does
not return took time (and shard info) in the ComputeResponse, the per-cluster took time is then
calculated on the querying cluster as a fallback.

Finally, I fixed some minor inconsistencies about whether the `_shards` info is shown in the response.
The rule now is that `_shards` is always shown with 0 shards for SKIPPED clusters, with actual
counts for SUCCESSFUL clusters and for remotes running an older version that doesn't report
shard stats, the `_shards` field is left out of the XContent response.

Fixes https://github.com/elastic/elasticsearch/issues/115022

* Update execution info at end of planning before kicking off execution phase (#115127)

The revised took time model bug fix #115017
introduced a new bug that allows a race condition between updating the execution info with
"end of planning" timestamp and using that timestamp during execution.

This one line fix reverses the order to ensure the planning phase execution update occurs
before starting the ESQL query execution phase.
Michael Peterson 1 年之前
父节点
当前提交
ade7f7c071

+ 48 - 26
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java

@@ -97,7 +97,8 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
-            assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
+            long overallTookMillis = executionInfo.overallTook().millis();
+            assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
             assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
 
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
@@ -106,6 +107,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
             assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
             assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
             assertThat(remoteCluster.getTotalShards(), equalTo(remoteNumShards));
             assertThat(remoteCluster.getSuccessfulShards(), equalTo(remoteNumShards));
             assertThat(remoteCluster.getSkippedShards(), equalTo(0));
@@ -115,6 +117,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertThat(localCluster.getIndexExpression(), equalTo("logs-*"));
             assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
             assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
             assertThat(localCluster.getTotalShards(), equalTo(localNumShards));
             assertThat(localCluster.getSuccessfulShards(), equalTo(localNumShards));
             assertThat(localCluster.getSkippedShards(), equalTo(0));
@@ -133,7 +136,8 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
-            assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
+            long overallTookMillis = executionInfo.overallTook().millis();
+            assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
             assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
 
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
@@ -142,6 +146,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
             assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
             assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
             assertThat(remoteCluster.getTotalShards(), equalTo(remoteNumShards));
             assertThat(remoteCluster.getSuccessfulShards(), equalTo(remoteNumShards));
             assertThat(remoteCluster.getSkippedShards(), equalTo(0));
@@ -151,6 +156,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertThat(localCluster.getIndexExpression(), equalTo("logs-*"));
             assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
             assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
             assertThat(localCluster.getTotalShards(), equalTo(localNumShards));
             assertThat(localCluster.getSuccessfulShards(), equalTo(localNumShards));
             assertThat(localCluster.getSkippedShards(), equalTo(0));
@@ -180,7 +186,8 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
 
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
-            assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
+            long overallTookMillis = executionInfo.overallTook().millis();
+            assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
             assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
 
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
@@ -189,6 +196,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertThat(remoteCluster.getIndexExpression(), equalTo("no_such_index"));
             assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
             assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
             assertThat(remoteCluster.getTotalShards(), equalTo(0));  // 0 since no matching index, thus no shards to search
             assertThat(remoteCluster.getSuccessfulShards(), equalTo(0));
             assertThat(remoteCluster.getSkippedShards(), equalTo(0));
@@ -198,6 +206,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertThat(localCluster.getIndexExpression(), equalTo("logs-*"));
             assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
             assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
             assertThat(localCluster.getTotalShards(), equalTo(localNumShards));
             assertThat(localCluster.getSuccessfulShards(), equalTo(localNumShards));
             assertThat(localCluster.getSkippedShards(), equalTo(0));
@@ -219,7 +228,8 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
-            assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
+            long overallTookMillis = executionInfo.overallTook().millis();
+            assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
             assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
 
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
@@ -228,6 +238,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
             assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
             assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
             assertThat(remoteCluster.getTotalShards(), equalTo(remoteNumShards));
             assertThat(remoteCluster.getSuccessfulShards(), equalTo(remoteNumShards));
             assertThat(remoteCluster.getSkippedShards(), equalTo(0));
@@ -235,8 +246,9 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
 
             EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
             assertThat(localCluster.getIndexExpression(), equalTo("no_such_index"));
-            assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+            assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
             assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
             assertThat(localCluster.getTotalShards(), equalTo(0));
             assertThat(localCluster.getSuccessfulShards(), equalTo(0));
             assertThat(localCluster.getSkippedShards(), equalTo(0));
@@ -258,7 +270,8 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
-            assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
+            long overallTookMillis = executionInfo.overallTook().millis();
+            assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
             assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
 
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
@@ -267,6 +280,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertThat(remoteCluster.getIndexExpression(), equalTo("no_such_index1,no_such_index2"));
             assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
             assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
             assertThat(remoteCluster.getTotalShards(), equalTo(0));
             assertThat(remoteCluster.getSuccessfulShards(), equalTo(0));
             assertThat(remoteCluster.getSkippedShards(), equalTo(0));
@@ -276,6 +290,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertThat(localCluster.getIndexExpression(), equalTo("no_such_index*,logs-1"));
             assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
             assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
             assertThat(localCluster.getTotalShards(), equalTo(localNumShards));
             assertThat(localCluster.getSuccessfulShards(), equalTo(localNumShards));
             assertThat(localCluster.getSkippedShards(), equalTo(0));
@@ -291,7 +306,8 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
 
             assertNotNull(executionInfo);
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
-            assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
+            long overallTookMillis = executionInfo.overallTook().millis();
+            assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
             assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
 
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
@@ -300,6 +316,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertThat(remoteCluster.getIndexExpression(), equalTo("no_such_index*"));
             assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
             assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
             assertThat(remoteCluster.getTotalShards(), equalTo(0));
             assertThat(remoteCluster.getSuccessfulShards(), equalTo(0));
             assertThat(remoteCluster.getSkippedShards(), equalTo(0));
@@ -309,6 +326,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertThat(localCluster.getIndexExpression(), equalTo("logs-*"));
             assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
             assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
             assertThat(localCluster.getTotalShards(), equalTo(localNumShards));
             assertThat(localCluster.getSuccessfulShards(), equalTo(localNumShards));
             assertThat(localCluster.getSkippedShards(), equalTo(0));
@@ -414,20 +432,20 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
             assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L));
             assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
-            assertNull(remoteCluster.getTotalShards());
-            assertNull(remoteCluster.getSuccessfulShards());
-            assertNull(remoteCluster.getSkippedShards());
-            assertNull(remoteCluster.getFailedShards());
+            assertThat(remoteCluster.getTotalShards(), equalTo(0));
+            assertThat(remoteCluster.getSuccessfulShards(), equalTo(0));
+            assertThat(remoteCluster.getSkippedShards(), equalTo(0));
+            assertThat(remoteCluster.getFailedShards(), equalTo(0));
 
             EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
             assertThat(localCluster.getIndexExpression(), equalTo("logs*"));
             assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
             assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
             assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
-            assertNull(localCluster.getTotalShards());
-            assertNull(localCluster.getSuccessfulShards());
-            assertNull(localCluster.getSkippedShards());
-            assertNull(localCluster.getFailedShards());
+            assertThat(remoteCluster.getTotalShards(), equalTo(0));
+            assertThat(remoteCluster.getSuccessfulShards(), equalTo(0));
+            assertThat(remoteCluster.getSkippedShards(), equalTo(0));
+            assertThat(remoteCluster.getFailedShards(), equalTo(0));
         }
 
         try (EsqlQueryResponse resp = runQuery("FROM logs*,cluster-a:nomatch* | LIMIT 0", requestIncludeMeta)) {
@@ -442,7 +460,8 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER);
             assertThat(remoteCluster.getIndexExpression(), equalTo("nomatch*"));
             assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
-            assertThat(remoteCluster.getTook().millis(), equalTo(0L));
+            assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
             assertThat(remoteCluster.getTotalShards(), equalTo(0));
             assertThat(remoteCluster.getSuccessfulShards(), equalTo(0));
             assertThat(remoteCluster.getSkippedShards(), equalTo(0));
@@ -453,10 +472,10 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
             assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
             assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
-            assertNull(localCluster.getTotalShards());
-            assertNull(localCluster.getSuccessfulShards());
-            assertNull(localCluster.getSkippedShards());
-            assertNull(localCluster.getFailedShards());
+            assertThat(localCluster.getTotalShards(), equalTo(0));
+            assertThat(localCluster.getSuccessfulShards(), equalTo(0));
+            assertThat(localCluster.getSkippedShards(), equalTo(0));
+            assertThat(localCluster.getFailedShards(), equalTo(0));
         }
 
         try (EsqlQueryResponse resp = runQuery("FROM nomatch*,cluster-a:* | LIMIT 0", requestIncludeMeta)) {
@@ -473,17 +492,20 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
             assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
             assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L));
             assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
-            assertNull(remoteCluster.getTotalShards());
-            assertNull(remoteCluster.getSuccessfulShards());
-            assertNull(remoteCluster.getSkippedShards());
-            assertNull(remoteCluster.getFailedShards());
+            assertThat(remoteCluster.getTotalShards(), equalTo(0));
+            assertThat(remoteCluster.getSuccessfulShards(), equalTo(0));
+            assertThat(remoteCluster.getSkippedShards(), equalTo(0));
+            assertThat(remoteCluster.getFailedShards(), equalTo(0));
 
             EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
             assertThat(localCluster.getIndexExpression(), equalTo("nomatch*"));
-            // TODO: in https://github.com/elastic/elasticsearch/issues/112886, this will be changed to be SKIPPED
-            assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+            assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
             assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
             assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
+            assertThat(remoteCluster.getTotalShards(), equalTo(0));
+            assertThat(remoteCluster.getSuccessfulShards(), equalTo(0));
+            assertThat(remoteCluster.getSkippedShards(), equalTo(0));
+            assertThat(remoteCluster.getFailedShards(), equalTo(0));
         }
     }
 

+ 42 - 7
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.function.Predicate;
 
@@ -55,29 +56,33 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
     public static final ParseField DETAILS_FIELD = new ParseField("details");
     public static final ParseField TOOK = new ParseField("took");
 
-    // map key is clusterAlias on the primary querying cluster of a CCS minimize_roundtrips=true query
-    // the Map itself is immutable after construction - all Clusters will be accounted for at the start of the search
-    // updates to the Cluster occur with the updateCluster method that given the key to map transforms an
+    // Map key is clusterAlias on the primary querying cluster of a CCS minimize_roundtrips=true query
+    // The Map itself is immutable after construction - all Clusters will be accounted for at the start of the search.
+    // Updates to the Cluster occur with the updateCluster method that given the key to map transforms an
     // old Cluster Object to a new Cluster Object with the remapping function.
     public final Map<String, Cluster> clusterInfo;
-    // not Writeable since it is only needed on the primary CCS coordinator
-    private final transient Predicate<String> skipUnavailablePredicate;
     private TimeValue overallTook;
-
     // whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present)
     private final boolean includeCCSMetadata;
 
+    // fields that are not Writeable since they are only needed on the primary CCS coordinator
+    private final transient Predicate<String> skipUnavailablePredicate;
+    private final transient Long relativeStartNanos;  // start time for an ESQL query for calculating took times
+    private transient TimeValue planningTookTime;  // time elapsed since start of query to calling ComputeService.execute
+
     public EsqlExecutionInfo(boolean includeCCSMetadata) {
         this(Predicates.always(), includeCCSMetadata);  // default all clusters to skip_unavailable=true
     }
 
     /**
      * @param skipUnavailablePredicate provide lookup for whether a given cluster has skip_unavailable set to true or false
+     * @param includeCCSMetadata (user defined setting) whether to include the CCS metadata in the HTTP response
      */
     public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate, boolean includeCCSMetadata) {
         this.clusterInfo = ConcurrentCollections.newConcurrentMap();
         this.skipUnavailablePredicate = skipUnavailablePredicate;
         this.includeCCSMetadata = includeCCSMetadata;
+        this.relativeStartNanos = System.nanoTime();
     }
 
     /**
@@ -88,6 +93,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
         this.clusterInfo = clusterInfo;
         this.includeCCSMetadata = includeCCSMetadata;
         this.skipUnavailablePredicate = Predicates.always();
+        this.relativeStartNanos = null;
     }
 
     public EsqlExecutionInfo(StreamInput in) throws IOException {
@@ -106,6 +112,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
             this.includeCCSMetadata = false;
         }
         this.skipUnavailablePredicate = Predicates.always();
+        this.relativeStartNanos = null;
     }
 
     @Override
@@ -125,7 +132,35 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
         return includeCCSMetadata;
     }
 
-    public void overallTook(TimeValue took) {
+    public Long getRelativeStartNanos() {
+        return relativeStartNanos;
+    }
+
+    /**
+     * Call when ES|QL "planning" phase is complete and query execution (in ComputeService) is about to start.
+     * Note this is currently only built for a single phase planning/execution model. When INLINESTATS
+     * moves towards GA we may need to revisit this model. Currently, it should never be called more than once.
+     */
+    public void markEndPlanning() {
+        assert planningTookTime == null : "markEndPlanning should only be called once";
+        assert relativeStartNanos != null : "Relative start time must be set when markEndPlanning is called";
+        planningTookTime = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS);
+    }
+
+    public TimeValue planningTookTime() {
+        return planningTookTime;
+    }
+
+    /**
+     * Call when ES|QL execution is complete in order to set the overall took time for an ES|QL query.
+     */
+    public void markEndQuery() {
+        assert relativeStartNanos != null : "Relative start time must be set when markEndQuery is called";
+        overallTook = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS);
+    }
+
+    // for testing only - use markEndQuery in production code
+    void overallTook(TimeValue took) {
         this.overallTook = took;
     }
 

+ 55 - 29
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java

@@ -47,7 +47,6 @@ final class ComputeListener implements Releasable {
     private final List<DriverProfile> collectedProfiles;
     private final ResponseHeadersCollector responseHeaders;
     private final EsqlExecutionInfo esqlExecutionInfo;
-    private final long queryStartTimeNanos;
     // clusterAlias indicating where this ComputeListener is running
     // used by the top level ComputeListener in ComputeService on both local and remote clusters
     private final String whereRunning;
@@ -61,7 +60,7 @@ final class ComputeListener implements Releasable {
         CancellableTask task,
         ActionListener<ComputeResponse> delegate
     ) {
-        return new ComputeListener(transportService, task, null, null, -1, delegate);
+        return new ComputeListener(transportService, task, null, null, delegate);
     }
 
     /**
@@ -75,7 +74,6 @@ final class ComputeListener implements Releasable {
      * @param transportService
      * @param task
      * @param executionInfo {@link EsqlExecutionInfo} to capture execution metadata
-     * @param queryStartTimeNanos Start time of the ES|QL query (stored in {@link org.elasticsearch.xpack.esql.session.Configuration})
      * @param delegate
      */
     public static ComputeListener create(
@@ -83,10 +81,9 @@ final class ComputeListener implements Releasable {
         TransportService transportService,
         CancellableTask task,
         EsqlExecutionInfo executionInfo,
-        long queryStartTimeNanos,
         ActionListener<ComputeResponse> delegate
     ) {
-        return new ComputeListener(transportService, task, clusterAlias, executionInfo, queryStartTimeNanos, delegate);
+        return new ComputeListener(transportService, task, clusterAlias, executionInfo, delegate);
     }
 
     private ComputeListener(
@@ -94,7 +91,6 @@ final class ComputeListener implements Releasable {
         CancellableTask task,
         String clusterAlias,
         EsqlExecutionInfo executionInfo,
-        long queryStartTimeNanos,
         ActionListener<ComputeResponse> delegate
     ) {
         this.transportService = transportService;
@@ -102,7 +98,6 @@ final class ComputeListener implements Releasable {
         this.responseHeaders = new ResponseHeadersCollector(transportService.getThreadPool().getThreadContext());
         this.collectedProfiles = Collections.synchronizedList(new ArrayList<>());
         this.esqlExecutionInfo = executionInfo;
-        this.queryStartTimeNanos = queryStartTimeNanos;
         this.whereRunning = clusterAlias;
         // for the DataNodeHandler ComputeListener, clusterAlias and executionInfo will be null
         // for the top level ComputeListener in ComputeService both will be non-null
@@ -129,11 +124,15 @@ final class ComputeListener implements Releasable {
             } else {
                 result = new ComputeResponse(collectedProfiles.isEmpty() ? List.of() : collectedProfiles.stream().toList());
                 if (coordinatingClusterIsSearchedInCCS()) {
-                    // mark local cluster as finished once the coordinator and all data nodes have finished processing
-                    executionInfo.swapCluster(
-                        RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
-                        (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL).build()
-                    );
+                    // if not already marked as SKIPPED, mark the local cluster as finished once the coordinator and all
+                    // data nodes have finished processing
+                    executionInfo.swapCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, (k, v) -> {
+                        if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
+                            return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL).build();
+                        } else {
+                            return v;
+                        }
+                    });
                 }
             }
             delegate.onResponse(result);
@@ -196,8 +195,8 @@ final class ComputeListener implements Releasable {
      *                            info to be gathered (namely, the DataNodeRequestHandler ComputeListener) should pass in null.
      */
     ActionListener<ComputeResponse> acquireCompute(@Nullable String computeClusterAlias) {
-        assert computeClusterAlias == null || (esqlExecutionInfo != null && queryStartTimeNanos > 0)
-            : "When clusterAlias is provided to acquireCompute, executionInfo must be non-null and queryStartTimeNanos must be positive";
+        assert computeClusterAlias == null || (esqlExecutionInfo != null && esqlExecutionInfo.getRelativeStartNanos() != null)
+            : "When clusterAlias is provided to acquireCompute, executionInfo and relativeStartTimeNanos must be non-null";
 
         return acquireAvoid().map(resp -> {
             responseHeaders.collect();
@@ -209,24 +208,17 @@ final class ComputeListener implements Releasable {
                 return null;
             }
             if (isCCSListener(computeClusterAlias)) {
-                // this is the callback for the listener to the CCS compute
-                esqlExecutionInfo.swapCluster(
-                    computeClusterAlias,
-                    (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v)
-                        // for now ESQL doesn't return partial results, so set status to SUCCESSFUL
-                        .setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
-                        .setTook(resp.getTook())
-                        .setTotalShards(resp.getTotalShards())
-                        .setSuccessfulShards(resp.getSuccessfulShards())
-                        .setSkippedShards(resp.getSkippedShards())
-                        .setFailedShards(resp.getFailedShards())
-                        .build()
-                );
+                // this is the callback for the listener on the primary coordinator that receives a remote ComputeResponse
+                updateExecutionInfoWithRemoteResponse(computeClusterAlias, resp);
+
             } else if (shouldRecordTookTime()) {
+                Long relativeStartNanos = esqlExecutionInfo.getRelativeStartNanos();
                 // handler for this cluster's data node and coordinator completion (runs on "local" and remote clusters)
-                TimeValue tookTime = new TimeValue(System.nanoTime() - queryStartTimeNanos, TimeUnit.NANOSECONDS);
+                assert relativeStartNanos != null : "queryStartTimeNanos not set properly";
+                TimeValue tookTime = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS);
                 esqlExecutionInfo.swapCluster(computeClusterAlias, (k, v) -> {
-                    if (v.getTook() == null || v.getTook().nanos() < tookTime.nanos()) {
+                    if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED
+                        && (v.getTook() == null || v.getTook().nanos() < tookTime.nanos())) {
                         return new EsqlExecutionInfo.Cluster.Builder(v).setTook(tookTime).build();
                     } else {
                         return v;
@@ -237,6 +229,40 @@ final class ComputeListener implements Releasable {
         });
     }
 
+    private void updateExecutionInfoWithRemoteResponse(String computeClusterAlias, ComputeResponse resp) {
+        TimeValue tookOnCluster;
+        if (resp.getTook() != null) {
+            TimeValue remoteExecutionTime = resp.getTook();
+            TimeValue planningTookTime = esqlExecutionInfo.planningTookTime();
+            tookOnCluster = new TimeValue(planningTookTime.nanos() + remoteExecutionTime.nanos(), TimeUnit.NANOSECONDS);
+            esqlExecutionInfo.swapCluster(
+                computeClusterAlias,
+                (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v)
+                    // for now ESQL doesn't return partial results, so set status to SUCCESSFUL
+                    .setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
+                    .setTook(tookOnCluster)
+                    .setTotalShards(resp.getTotalShards())
+                    .setSuccessfulShards(resp.getSuccessfulShards())
+                    .setSkippedShards(resp.getSkippedShards())
+                    .setFailedShards(resp.getFailedShards())
+                    .build()
+            );
+        } else {
+            // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator
+            // and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response
+            long remoteTook = System.nanoTime() - esqlExecutionInfo.getRelativeStartNanos();
+            tookOnCluster = new TimeValue(remoteTook, TimeUnit.NANOSECONDS);
+            esqlExecutionInfo.swapCluster(
+                computeClusterAlias,
+                (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v)
+                    // for now ESQL doesn't return partial results, so set status to SUCCESSFUL
+                    .setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
+                    .setTook(tookOnCluster)
+                    .build()
+            );
+        }
+    }
+
     /**
      * Use this method when no execution metadata needs to be added to {@link EsqlExecutionInfo}
      */

+ 23 - 35
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -32,7 +32,6 @@ import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.query.QueryBuilder;
@@ -81,7 +80,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;
@@ -173,19 +171,10 @@ public class ComputeService {
                 null
             );
             String local = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
-            try (
-                var computeListener = ComputeListener.create(
-                    local,
-                    transportService,
-                    rootTask,
-                    execInfo,
-                    configuration.getQueryStartTimeNanos(),
-                    listener.map(r -> {
-                        updateExecutionInfoAfterCoordinatorOnlyQuery(configuration.getQueryStartTimeNanos(), execInfo);
-                        return new Result(physicalPlan.output(), collectedPages, r.getProfiles(), execInfo);
-                    })
-                )
-            ) {
+            try (var computeListener = ComputeListener.create(local, transportService, rootTask, execInfo, listener.map(r -> {
+                updateExecutionInfoAfterCoordinatorOnlyQuery(execInfo);
+                return new Result(physicalPlan.output(), collectedPages, r.getProfiles(), execInfo);
+            }))) {
                 runCompute(rootTask, computeContext, coordinatorPlan, computeListener.acquireCompute(local));
                 return;
             }
@@ -205,7 +194,6 @@ public class ComputeService {
             queryPragmas.exchangeBufferSize(),
             transportService.getThreadPool().executor(ThreadPool.Names.SEARCH)
         );
-        long start = configuration.getQueryStartTimeNanos();
         String local = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
         /*
          * Grab the output attributes here, so we can pass them to
@@ -216,9 +204,8 @@ public class ComputeService {
         try (
             Releasable ignored = exchangeSource.addEmptySink();
             // this is the top level ComputeListener called once at the end (e.g., once all clusters have finished for a CCS)
-            var computeListener = ComputeListener.create(local, transportService, rootTask, execInfo, start, listener.map(r -> {
-                long tookTimeNanos = System.nanoTime() - configuration.getQueryStartTimeNanos();
-                execInfo.overallTook(new TimeValue(tookTimeNanos, TimeUnit.NANOSECONDS));
+            var computeListener = ComputeListener.create(local, transportService, rootTask, execInfo, listener.map(r -> {
+                execInfo.markEndQuery();  // TODO: revisit this time recording model as part of INLINESTATS improvements
                 return new Result(outputAttributes, collectedPages, r.getProfiles(), execInfo);
             }))
         ) {
@@ -258,22 +245,24 @@ public class ComputeService {
         }
     }
 
-    private static void updateExecutionInfoAfterCoordinatorOnlyQuery(long queryStartNanos, EsqlExecutionInfo execInfo) {
-        long tookTimeNanos = System.nanoTime() - queryStartNanos;
-        execInfo.overallTook(new TimeValue(tookTimeNanos, TimeUnit.NANOSECONDS));
+    // For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries)
+    private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
+        execInfo.markEndQuery();  // TODO: revisit this time recording model as part of INLINESTATS improvements
         if (execInfo.isCrossClusterSearch()) {
+            assert execInfo.planningTookTime() != null : "Planning took time should be set on EsqlExecutionInfo but is null";
             for (String clusterAlias : execInfo.clusterAliases()) {
-                // The local cluster 'took' time gets updated as part of the acquireCompute(local) call in the coordinator, so
-                // here we only need to update status for remote clusters since there are no remote ComputeListeners in this case.
-                // This happens in cross cluster searches that use LIMIT 0, e.g, FROM logs*,remote*:logs* | LIMIT 0.
-                if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
-                    execInfo.swapCluster(clusterAlias, (k, v) -> {
-                        if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
-                            return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL).build();
-                        } else {
-                            return v;
-                        }
-                    });
+                // took time and shard counts for SKIPPED clusters were added at end of planning, so only update other cases here
+                if (execInfo.getCluster(clusterAlias).getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
+                    execInfo.swapCluster(
+                        clusterAlias,
+                        (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.overallTook())
+                            .setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
+                            .setTotalShards(0)
+                            .setSuccessfulShards(0)
+                            .setSkippedShards(0)
+                            .setFailedShards(0)
+                            .build()
+                    );
                 }
             }
         }
@@ -837,8 +826,7 @@ public class ComputeService {
             EsqlExecutionInfo execInfo = new EsqlExecutionInfo(true);
             execInfo.swapCluster(clusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(clusterAlias, Arrays.toString(request.indices())));
             CancellableTask cancellable = (CancellableTask) task;
-            long start = request.configuration().getQueryStartTimeNanos();
-            try (var computeListener = ComputeListener.create(clusterAlias, transportService, cancellable, execInfo, start, listener)) {
+            try (var computeListener = ComputeListener.create(clusterAlias, transportService, cancellable, execInfo, listener)) {
                 runComputeOnRemoteCluster(
                     clusterAlias,
                     request.sessionId(),

+ 63 - 64
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

@@ -72,7 +72,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Predicate;
@@ -163,6 +162,7 @@ public class EsqlSession {
     ) {
         LogicalPlan firstPhase = Phased.extractFirstPhase(optimizedPlan);
         if (firstPhase == null) {
+            updateExecutionInfoAtEndOfPlanning(executionInfo);
             runPhase.accept(logicalPlanToPhysicalPlan(optimizedPlan, request), listener);
         } else {
             executePhased(new ArrayList<>(), optimizedPlan, request, executionInfo, firstPhase, runPhase, listener);
@@ -246,7 +246,6 @@ public class EsqlSession {
                 if (indexResolution.isValid()) {
                     updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
                     updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters());
-                    updateTookTimeForRemoteClusters(executionInfo);
                     Set<String> newClusters = enrichPolicyResolver.groupIndicesPerCluster(
                         indexResolution.get().concreteIndices().toArray(String[]::new)
                     ).keySet();
@@ -267,68 +266,6 @@ public class EsqlSession {
         }));
     }
 
-    // visible for testing
-    static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo executionInfo, Set<String> unavailableClusters) {
-        for (String clusterAlias : unavailableClusters) {
-            executionInfo.swapCluster(
-                clusterAlias,
-                (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED).build()
-            );
-            // TODO: follow-on PR will set SKIPPED status when skip_unavailable=true and throw an exception when skip_un=false
-        }
-    }
-
-    // visible for testing
-    static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) {
-        Set<String> clustersWithResolvedIndices = new HashSet<>();
-        // determine missing clusters
-        for (String indexName : indexResolution.get().indexNameWithModes().keySet()) {
-            clustersWithResolvedIndices.add(RemoteClusterAware.parseClusterAlias(indexName));
-        }
-        Set<String> clustersRequested = executionInfo.clusterAliases();
-        Set<String> clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices);
-        clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters());
-        /*
-         * These are clusters in the original request that are not present in the field-caps response. They were
-         * specified with an index or indices that do not exist, so the search on that cluster is done.
-         * Mark it as SKIPPED with 0 shards searched and took=0.
-         */
-        for (String c : clustersWithNoMatchingIndices) {
-            executionInfo.swapCluster(
-                c,
-                (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED)
-                    .setTook(new TimeValue(0))
-                    .setTotalShards(0)
-                    .setSuccessfulShards(0)
-                    .setSkippedShards(0)
-                    .setFailedShards(0)
-                    .build()
-            );
-        }
-    }
-
-    private void updateTookTimeForRemoteClusters(EsqlExecutionInfo executionInfo) {
-        if (executionInfo.isCrossClusterSearch()) {
-            for (String clusterAlias : executionInfo.clusterAliases()) {
-                if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
-                    executionInfo.swapCluster(clusterAlias, (k, v) -> {
-                        if (v.getTook() == null && v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
-                            // set took time in case we are finished with the remote cluster (e.g., FROM foo | LIMIT 0).
-                            // this will be overwritten later if ES|QL operations happen on the remote cluster (the typical scenario)
-                            TimeValue took = new TimeValue(
-                                System.nanoTime() - configuration.getQueryStartTimeNanos(),
-                                TimeUnit.NANOSECONDS
-                            );
-                            return new EsqlExecutionInfo.Cluster.Builder(v).setTook(took).build();
-                        } else {
-                            return v;
-                        }
-                    });
-                }
-            }
-        }
-    }
-
     private void preAnalyzeIndices(
         LogicalPlan parsed,
         EsqlExecutionInfo executionInfo,
@@ -508,4 +445,66 @@ public class EsqlSession {
         LOGGER.debug("Optimized physical plan:\n{}", plan);
         return plan;
     }
+
+    // visible for testing
+    static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo executionInfo, Set<String> unavailableClusters) {
+        for (String clusterAlias : unavailableClusters) {
+            executionInfo.swapCluster(
+                clusterAlias,
+                (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED).build()
+            );
+            // TODO: follow-on PR will set SKIPPED status when skip_unavailable=true and throw an exception when skip_un=false
+        }
+    }
+
+    // visible for testing
+    static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) {
+        Set<String> clustersWithResolvedIndices = new HashSet<>();
+        // determine missing clusters
+        for (String indexName : indexResolution.get().indexNameWithModes().keySet()) {
+            clustersWithResolvedIndices.add(RemoteClusterAware.parseClusterAlias(indexName));
+        }
+        Set<String> clustersRequested = executionInfo.clusterAliases();
+        Set<String> clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices);
+        clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters());
+        /*
+         * These are clusters in the original request that are not present in the field-caps response. They were
+         * specified with an index or indices that do not exist, so the search on that cluster is done.
+         * Mark it as SKIPPED with 0 shards searched and took=0.
+         */
+        for (String c : clustersWithNoMatchingIndices) {
+            executionInfo.swapCluster(
+                c,
+                (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED)
+                    .setTook(new TimeValue(0))
+                    .setTotalShards(0)
+                    .setSuccessfulShards(0)
+                    .setSkippedShards(0)
+                    .setFailedShards(0)
+                    .build()
+            );
+        }
+    }
+
+    // visible for testing
+    static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) {
+        // TODO: this logic assumes a single phase execution model, so it may need to altered once INLINESTATS is made CCS compatible
+        if (execInfo.isCrossClusterSearch()) {
+            execInfo.markEndPlanning();
+            for (String clusterAlias : execInfo.clusterAliases()) {
+                EsqlExecutionInfo.Cluster cluster = execInfo.getCluster(clusterAlias);
+                if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) {
+                    execInfo.swapCluster(
+                        clusterAlias,
+                        (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.planningTookTime())
+                            .setTotalShards(0)
+                            .setSuccessfulShards(0)
+                            .setSkippedShards(0)
+                            .setFailedShards(0)
+                            .build()
+                    );
+                }
+            }
+        }
+    }
 }

+ 55 - 7
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java

@@ -132,7 +132,6 @@ public class ComputeListenerTests extends ESTestCase {
                 transportService,
                 newTask(),
                 executionInfo,
-                System.nanoTime(),
                 results
             )
         ) {
@@ -152,7 +151,6 @@ public class ComputeListenerTests extends ESTestCase {
                 transportService,
                 newTask(),
                 executionInfo,
-                System.nanoTime(),
                 future
             )
         ) {
@@ -196,6 +194,7 @@ public class ComputeListenerTests extends ESTestCase {
         String remoteAlias = "rc1";
         EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
         executionInfo.swapCluster(remoteAlias, (k, v) -> new EsqlExecutionInfo.Cluster(remoteAlias, "logs*", false));
+        executionInfo.markEndPlanning();  // set planning took time, so it can be used to calculate per-cluster took time
         try (
             ComputeListener computeListener = ComputeListener.create(
                 // 'whereRunning' for this test is the local cluster, waiting for a response from the remote cluster
@@ -203,7 +202,6 @@ public class ComputeListenerTests extends ESTestCase {
                 transportService,
                 newTask(),
                 executionInfo,
-                System.nanoTime(),
                 future
             )
         ) {
@@ -239,6 +237,60 @@ public class ComputeListenerTests extends ESTestCase {
         Mockito.verifyNoInteractions(transportService.getTaskManager());
     }
 
+    /**
+     * Tests the acquireCompute functionality running on the querying ("local") cluster, that is waiting upon
+     * a ComputeResponse from a remote cluster where we simulate connecting to a remote cluster running a version
+     * of ESQL that does not record and return CCS metadata. Ensure that the local cluster {@link EsqlExecutionInfo}
+     * is properly updated with took time and shard info is left unset.
+     */
+    public void testAcquireComputeCCSListenerWithComputeResponseFromOlderCluster() {
+        PlainActionFuture<ComputeResponse> future = new PlainActionFuture<>();
+        List<DriverProfile> allProfiles = new ArrayList<>();
+        String remoteAlias = "rc1";
+        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
+        executionInfo.swapCluster(remoteAlias, (k, v) -> new EsqlExecutionInfo.Cluster(remoteAlias, "logs*", false));
+        executionInfo.markEndPlanning();  // set planning took time, so it can be used to calculate per-cluster took time
+        try (
+            ComputeListener computeListener = ComputeListener.create(
+                // 'whereRunning' for this test is the local cluster, waiting for a response from the remote cluster
+                RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
+                transportService,
+                newTask(),
+                executionInfo,
+                future
+            )
+        ) {
+            int tasks = randomIntBetween(1, 5);
+            for (int t = 0; t < tasks; t++) {
+                ComputeResponse resp = randomResponse(false); // older clusters will not return CCS metadata in response
+                allProfiles.addAll(resp.getProfiles());
+                // Use remoteAlias here to indicate what remote cluster alias the listener is waiting to hear back from
+                ActionListener<ComputeResponse> subListener = computeListener.acquireCompute(remoteAlias);
+                threadPool.schedule(
+                    ActionRunnable.wrap(subListener, l -> l.onResponse(resp)),
+                    TimeValue.timeValueNanos(between(0, 100)),
+                    threadPool.generic()
+                );
+            }
+        }
+        ComputeResponse response = future.actionGet(10, TimeUnit.SECONDS);
+        assertThat(
+            response.getProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)),
+            equalTo(allProfiles.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)))
+        );
+
+        assertTrue(executionInfo.isCrossClusterSearch());
+        EsqlExecutionInfo.Cluster rc1Cluster = executionInfo.getCluster(remoteAlias);
+        assertThat(rc1Cluster.getTook().millis(), greaterThanOrEqualTo(0L));
+        assertNull(rc1Cluster.getTotalShards());
+        assertNull(rc1Cluster.getSuccessfulShards());
+        assertNull(rc1Cluster.getSkippedShards());
+        assertNull(rc1Cluster.getFailedShards());
+        assertThat(rc1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+
+        Mockito.verifyNoInteractions(transportService.getTaskManager());
+    }
+
     /**
      * Run an acquireCompute cycle on the RemoteCluster.
      * AcquireCompute will fill in the took time on the EsqlExecutionInfo (the shard info is filled in before this,
@@ -271,7 +323,6 @@ public class ComputeListenerTests extends ESTestCase {
                 transportService,
                 newTask(),
                 executionInfo,
-                System.nanoTime(),
                 future
             )
         ) {
@@ -331,7 +382,6 @@ public class ComputeListenerTests extends ESTestCase {
                 transportService,
                 newTask(),
                 executionInfo,
-                System.nanoTime(),
                 future
             )
         ) {
@@ -379,7 +429,6 @@ public class ComputeListenerTests extends ESTestCase {
                 transportService,
                 rootTask,
                 execInfo,
-                System.nanoTime(),
                 rootListener
             )
         ) {
@@ -443,7 +492,6 @@ public class ComputeListenerTests extends ESTestCase {
                 transportService,
                 newTask(),
                 executionInfo,
-                System.nanoTime(),
                 ActionListener.runAfter(rootListener, latch::countDown)
             )
         ) {

+ 45 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java

@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 
 public class EsqlSessionTests extends ESTestCase {
 
@@ -243,6 +244,50 @@ public class EsqlSessionTests extends ESTestCase {
         }
     }
 
+    public void testUpdateExecutionInfoAtEndOfPlanning() {
+        String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
+        String remote1Alias = "remote1";
+        String remote2Alias = "remote2";
+        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
+        executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
+        executionInfo.swapCluster(
+            remote1Alias,
+            (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true, EsqlExecutionInfo.Cluster.Status.SKIPPED)
+        );
+        executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false));
+
+        assertNull(executionInfo.planningTookTime());
+        assertNull(executionInfo.overallTook());
+        try {
+            Thread.sleep(1);
+        } catch (InterruptedException e) {}
+
+        EsqlSession.updateExecutionInfoAtEndOfPlanning(executionInfo);
+
+        assertThat(executionInfo.planningTookTime().millis(), greaterThanOrEqualTo(0L));
+        assertNull(executionInfo.overallTook());
+
+        // only remote1 should be altered, since it is the only one marked as SKIPPED when passed into updateExecutionInfoAtEndOfPlanning
+        EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
+        assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
+        assertNull(localCluster.getTotalShards());
+        assertNull(localCluster.getTook());
+
+        EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias);
+        assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
+        assertThat(remote1Cluster.getTotalShards(), equalTo(0));
+        assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0));
+        assertThat(remote1Cluster.getSkippedShards(), equalTo(0));
+        assertThat(remote1Cluster.getFailedShards(), equalTo(0));
+        assertThat(remote1Cluster.getTook().millis(), greaterThanOrEqualTo(0L));
+        assertThat(remote1Cluster.getTook().millis(), equalTo(executionInfo.planningTookTime().millis()));
+
+        EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias);
+        assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
+        assertNull(remote2Cluster.getTotalShards());
+        assertNull(remote2Cluster.getTook());
+    }
+
     private void assertClusterStatusAndHasNullCounts(EsqlExecutionInfo.Cluster cluster, EsqlExecutionInfo.Cluster.Status status) {
         assertThat(cluster.getStatus(), equalTo(status));
         assertNull(cluster.getTook());