Prechádzať zdrojové kódy

Implement runtime skip_unavailable=true (#121240)

* Implement runtime skip_unavailable=true
Stanislav Malyshev pred 8 mesiacmi
rodič
commit
2fbec77015
19 zmenených súborov s 680 pridaniami a 174 odobraniami
  1. 5 0
      docs/changelog/121240.yaml
  2. 3 1
      test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java
  3. 5 0
      x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java
  4. 30 0
      x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationSkipUnFalseIT.java
  5. 2 0
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java
  6. 6 0
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java
  7. 62 0
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java
  8. 7 2
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java
  9. 110 0
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java
  10. 1 1
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlAsyncTestUtils.java
  11. 5 1
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/SimplePauseFieldPlugin.java
  12. 32 7
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java
  13. 2 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
  14. 122 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java
  15. 10 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
  16. 62 12
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java
  17. 6 6
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
  18. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java
  19. 209 141
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java

+ 5 - 0
docs/changelog/121240.yaml

@@ -0,0 +1,5 @@
+pr: 121240
+summary: Implement runtime skip_unavailable=true
+area: ES|QL
+type: enhancement
+issues: []

+ 3 - 1
test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java

@@ -24,12 +24,14 @@ import java.util.Set;
 
 public class FailingFieldPlugin extends Plugin implements ScriptPlugin {
 
+    public static final String FAILING_FIELD_LANG = "failing_field";
+
     @Override
     public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
         return new ScriptEngine() {
             @Override
             public String getType() {
-                return "failing_field";
+                return FAILING_FIELD_LANG;
             }
 
             @Override

+ 5 - 0
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java

@@ -31,6 +31,10 @@ public class Clusters {
     }
 
     public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster) {
+        return localCluster(remoteCluster, true);
+    }
+
+    public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster, Boolean skipUnavailable) {
         return ElasticsearchCluster.local()
             .name(LOCAL_CLUSTER_NAME)
             .distribution(DistributionType.DEFAULT)
@@ -41,6 +45,7 @@ public class Clusters {
             .setting("node.roles", "[data,ingest,master,remote_cluster_client]")
             .setting("cluster.remote.remote_cluster.seeds", () -> "\"" + remoteCluster.getTransportEndpoint(0) + "\"")
             .setting("cluster.remote.connections_per_cluster", "1")
+            .setting("cluster.remote." + REMOTE_CLUSTER_NAME + ".skip_unavailable", skipUnavailable.toString())
             .shared(true)
             .setting("cluster.routing.rebalance.enable", "none")
             .build();

+ 30 - 0
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationSkipUnFalseIT.java

@@ -0,0 +1,30 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.ccq;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+
+import org.elasticsearch.test.TestClustersThreadFilter;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.junit.ClassRule;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TestRule;
+
+// Runs the tests inherited from EsqlRestValidationIT against a local cluster created with skip_unavailable=false for the remote
+@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
+public class EsqlRestValidationSkipUnFalseIT extends EsqlRestValidationIT {
+    static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster, false);
+
+    @ClassRule
+    public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster);
+
+    @Override
+    protected String getTestRestCluster() {
+        return localCluster.getHttpAddresses(); // REST requests go to the skip_unavailable=false local cluster declared above
+    }
+}

+ 2 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java

@@ -18,6 +18,7 @@ import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.AbstractMultiClustersTestCase;
+import org.elasticsearch.test.FailingFieldPlugin;
 import org.elasticsearch.test.XContentTestUtils;
 import org.elasticsearch.transport.RemoteClusterAware;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -63,6 +64,7 @@ public abstract class AbstractCrossClusterTestCase extends AbstractMultiClusters
         plugins.add(CrossClusterAsyncQueryIT.InternalExchangePlugin.class);
         plugins.add(SimplePauseFieldPlugin.class);
         plugins.add(FailingPauseFieldPlugin.class);
+        plugins.add(FailingFieldPlugin.class);
         plugins.add(CrossClusterAsyncQueryIT.CountingPauseFieldPlugin.class);
         return plugins;
     }

+ 6 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java

@@ -510,11 +510,17 @@ public class CrossClusterEnrichUnavailableClustersIT extends AbstractEnrichBased
         assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
         assertTrue(executionInfo.isCrossClusterSearch());
 
+        boolean hasPartials = false;
         for (String clusterAlias : executionInfo.clusterAliases()) {
             EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias);
             assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L));
             assertThat(cluster.getTook().millis(), lessThanOrEqualTo(executionInfo.overallTook().millis()));
+            if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL
+                || cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) {
+                hasPartials = true;
+            }
         }
+        assertThat(executionInfo.isPartial(), equalTo(hasPartials));
     }
 
     private void setSkipUnavailable(String clusterAlias, boolean skip) {

+ 62 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java

@@ -23,9 +23,12 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.TermsQueryBuilder;
+import org.elasticsearch.test.FailingFieldPlugin;
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.XContentTestUtils;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.json.JsonXContent;
 import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 
@@ -433,6 +436,7 @@ public class CrossClusterQueryIT extends AbstractCrossClusterTestCase {
         Set<String> expectedClusterAliases = expected.stream().map(c -> c.clusterAlias()).collect(Collectors.toSet());
         assertThat(executionInfo.clusterAliases(), equalTo(expectedClusterAliases));
 
+        boolean hasSkipped = false;
         for (ExpectedCluster expectedCluster : expected) {
             EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(expectedCluster.clusterAlias());
             String msg = cluster.getClusterAlias();
@@ -451,10 +455,12 @@ public class CrossClusterQueryIT extends AbstractCrossClusterTestCase {
                 assertThat(msg, cluster.getFailures().get(0).getCause(), instanceOf(VerificationException.class));
                 String expectedMsg = "Unknown index [" + expectedCluster.indexExpression() + "]";
                 assertThat(msg, cluster.getFailures().get(0).getCause().getMessage(), containsString(expectedMsg));
+                hasSkipped = true;
             }
             // currently failed shards is always zero - change this once we start allowing partial data for individual shard failures
             assertThat(msg, cluster.getFailedShards(), equalTo(0));
         }
+        assertThat(executionInfo.isPartial(), equalTo(hasSkipped));
     }
 
     public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() throws Exception {
@@ -500,6 +506,7 @@ public class CrossClusterQueryIT extends AbstractCrossClusterTestCase {
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
             assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
             assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
+            assertThat(executionInfo.isPartial(), equalTo(true));
 
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));
 
@@ -556,6 +563,7 @@ public class CrossClusterQueryIT extends AbstractCrossClusterTestCase {
             long overallTookMillis = executionInfo.overallTook().millis();
             assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
             assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
+            assertThat(executionInfo.isPartial(), equalTo(false));
             assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));
 
             EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
@@ -604,6 +612,7 @@ public class CrossClusterQueryIT extends AbstractCrossClusterTestCase {
             assertThat(executionInfo.isCrossClusterSearch(), is(true));
             assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
             assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(executionInfo.isPartial(), equalTo(false));
 
             EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
             assertThat(remoteCluster.getIndexExpression(), equalTo("logs*"));
@@ -799,6 +808,17 @@ public class CrossClusterQueryIT extends AbstractCrossClusterTestCase {
         assertTrue(latch.await(30, TimeUnit.SECONDS));
     }
 
+    // Non-disconnect remote failures still fail the request even if skip_unavailable is true
+    public void testRemoteFailureSkipUnavailableTrue() throws IOException {
+        Map<String, Object> testClusterInfo = setupFailClusters();
+        String localIndex = (String) testClusterInfo.get("local.index");
+        String remote1Index = (String) testClusterInfo.get("remote.index");
+        int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
+        String q = Strings.format("FROM %s,cluster-a:%s*", localIndex, remote1Index);
+        IllegalStateException e = expectThrows(IllegalStateException.class, () -> runQuery(q, false));
+        assertThat(e.getMessage(), containsString("Accessing failing field"));
+    }
+
     private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) {
         try {
             final Map<String, Object> esqlResponseAsMap = XContentTestUtils.convertToMap(resp);
@@ -925,4 +945,46 @@ public class CrossClusterQueryIT extends AbstractCrossClusterTestCase {
 
         return clusterToEmptyIndexMap;
     }
+
+    Map<String, Object> setupFailClusters() throws IOException {
+        int numShardsLocal = randomIntBetween(1, 3);
+        populateLocalIndices(LOCAL_INDEX, numShardsLocal);
+
+        int numShardsRemote = randomIntBetween(1, 3);
+        populateRemoteIndicesFail(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote); // remote index gets the failing runtime field
+
+        Map<String, Object> clusterInfo = new HashMap<>(); // index names and shard counts handed back to the calling test
+        clusterInfo.put("local.num_shards", numShardsLocal);
+        clusterInfo.put("local.index", LOCAL_INDEX);
+        clusterInfo.put("remote.num_shards", numShardsRemote);
+        clusterInfo.put("remote.index", REMOTE_INDEX);
+        setSkipUnavailable(REMOTE_CLUSTER_1, true); // the failure scenario is exercised with skip_unavailable=true on the remote
+        return clusterInfo;
+    }
+
+    void populateRemoteIndicesFail(String clusterAlias, String indexName, int numShards) throws IOException {
+        Client remoteClient = client(clusterAlias);
+        XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
+        mapping.startObject("runtime");
+        {
+            mapping.startObject("fail_me"); // runtime long field whose script uses FailingFieldPlugin's script language
+            {
+                mapping.field("type", "long");
+                mapping.startObject("script").field("source", "").field("lang", FailingFieldPlugin.FAILING_FIELD_LANG).endObject();
+            }
+            mapping.endObject();
+        }
+        mapping.endObject();
+        assertAcked(
+            remoteClient.admin()
+                .indices()
+                .prepareCreate(indexName)
+                .setSettings(Settings.builder().put("index.number_of_shards", numShards))
+                .setMapping(mapping.endObject())
+        );
+
+        remoteClient.prepareIndex(indexName).setSource("id", 0).get(); // index a single document (id=0)
+        remoteClient.admin().indices().prepareRefresh(indexName).get(); // refresh the index after writing the document
+    }
+
 }

+ 7 - 2
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java

@@ -57,6 +57,7 @@ public class CrossClusterQueryUnavailableRemotesIT extends AbstractCrossClusterT
                 long overallTookMillis = executionInfo.overallTook().millis();
                 assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
                 assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
+                assertThat(executionInfo.isPartial(), equalTo(true));
 
                 assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER)));
 
@@ -109,6 +110,7 @@ public class CrossClusterQueryUnavailableRemotesIT extends AbstractCrossClusterT
                 long overallTookMillis = executionInfo.overallTook().millis();
                 assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
                 assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
+                assertThat(executionInfo.isPartial(), equalTo(true));
 
                 assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER)));
 
@@ -161,6 +163,7 @@ public class CrossClusterQueryUnavailableRemotesIT extends AbstractCrossClusterT
                 long overallTookMillis = executionInfo.overallTook().millis();
                 assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
                 assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
+                assertThat(executionInfo.isPartial(), equalTo(true));
 
                 assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER)));
 
@@ -233,6 +236,7 @@ public class CrossClusterQueryUnavailableRemotesIT extends AbstractCrossClusterT
                 long overallTookMillis = executionInfo.overallTook().millis();
                 assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
                 assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
+                assertThat(executionInfo.isPartial(), equalTo(true));
 
                 assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1)));
 
@@ -247,7 +251,7 @@ public class CrossClusterQueryUnavailableRemotesIT extends AbstractCrossClusterT
                 assertThat(remoteCluster.getFailedShards(), equalTo(0));
 
                 // ensure that the _clusters metadata is present only if requested
-                assertClusterMetadataInResponse(resp, responseExpectMeta, numClusters);
+                assertClusterMetadataInResponse(resp, responseExpectMeta, 1);
             }
 
             // close remote cluster 2 so that it is also unavailable
@@ -275,6 +279,7 @@ public class CrossClusterQueryUnavailableRemotesIT extends AbstractCrossClusterT
                 long overallTookMillis = executionInfo.overallTook().millis();
                 assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
                 assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
+                assertThat(executionInfo.isPartial(), equalTo(true));
 
                 assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
 
@@ -299,7 +304,7 @@ public class CrossClusterQueryUnavailableRemotesIT extends AbstractCrossClusterT
                 assertThat(remote2Cluster.getFailedShards(), equalTo(0));
 
                 // ensure that the _clusters metadata is present only if requested
-                assertClusterMetadataInResponse(resp, responseExpectMeta, numClusters);
+                assertClusterMetadataInResponse(resp, responseExpectMeta, 2);
             }
 
         } finally {

+ 110 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.esql.action;
 
+import org.elasticsearch.Build;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
 import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
@@ -75,6 +76,11 @@ public class CrossClustersCancellationIT extends AbstractMultiClustersTestCase {
         SimplePauseFieldPlugin.resetPlugin();
     }
 
+    @Override
+    protected boolean reuseClusters() {
+        return false; // NOTE(review): presumably because a test in this class closes REMOTE_CLUSTER, so clusters can't be shared - confirm
+    }
+
     private void createRemoteIndex(int numDocs) throws Exception {
         XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
         mapping.startObject("runtime");
@@ -96,6 +102,26 @@ public class CrossClustersCancellationIT extends AbstractMultiClustersTestCase {
         bulk.get();
     }
 
+    private void createLocalIndex(int numDocs) throws Exception {
+        XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
+        mapping.startObject("runtime");
+        {
+            mapping.startObject("const"); // runtime field "const" of type long, declared without a script
+            {
+                mapping.field("type", "long");
+            }
+            mapping.endObject();
+        }
+        mapping.endObject();
+        mapping.endObject();
+        client(LOCAL_CLUSTER).admin().indices().prepareCreate("test").setMapping(mapping).get();
+        BulkRequestBuilder bulk = client(LOCAL_CLUSTER).prepareBulk("test").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+        for (int i = 0; i < numDocs; i++) {
+            bulk.add(new IndexRequest().source("const", i)); // document i carries const=i
+        }
+        bulk.get();
+    }
+
     public void testCancel() throws Exception {
         createRemoteIndex(between(10, 100));
         EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
@@ -208,4 +234,88 @@ public class CrossClustersCancellationIT extends AbstractMultiClustersTestCase {
         }
         requestFuture.actionGet(30, TimeUnit.SECONDS).close();
     }
+
+    // Check that cancelling the remote task with skip_unavailable=true fails the request rather than returning partial results
+    public void testCancelSkipUnavailable() throws Exception {
+        createRemoteIndex(between(10, 100));
+        EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
+        request.query("FROM *:test | STATS total=sum(const) | LIMIT 1");
+        request.pragmas(randomPragmas());
+        request.includeCCSMetadata(true);
+        PlainActionFuture<EsqlQueryResponse> requestFuture = new PlainActionFuture<>();
+        client().execute(EsqlQueryAction.INSTANCE, request, requestFuture);
+        assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); // wait up to 30s for the startEmitting latch
+        List<TaskInfo> rootTasks = new ArrayList<>();
+        assertBusy(() -> {
+            List<TaskInfo> tasks = client(REMOTE_CLUSTER).admin()
+                .cluster()
+                .prepareListTasks()
+                .setActions(ComputeService.CLUSTER_ACTION_NAME)
+                .get()
+                .getTasks();
+            assertThat(tasks, hasSize(1));
+            rootTasks.addAll(tasks);
+        });
+        var cancelRequest = new CancelTasksRequest().setTargetTaskId(rootTasks.get(0).taskId()).setReason("remote failed");
+        client(REMOTE_CLUSTER).execute(TransportCancelTasksAction.TYPE, cancelRequest); // cancel the cluster compute task found above
+        try {
+            assertBusy(() -> {
+                List<TaskInfo> drivers = client(REMOTE_CLUSTER).admin()
+                    .cluster()
+                    .prepareListTasks()
+                    .setActions(DriverTaskRunner.ACTION_NAME)
+                    .get()
+                    .getTasks();
+                assertThat(drivers.size(), greaterThanOrEqualTo(1));
+                for (TaskInfo driver : drivers) {
+                    assertTrue(driver.cancelled());
+                }
+            });
+        } finally {
+            SimplePauseFieldPlugin.allowEmitting.countDown(); // always release the latch, even if the assertions above fail
+        }
+
+        Exception error = expectThrows(Exception.class, requestFuture::actionGet);
+        assertThat(error.getMessage(), containsString("remote failed")); // the cancellation reason must show up in the error
+    }
+
+    // Check that closing the remote cluster with skip_unavailable=true produces a partial result instead of a failure
+    public void testCloseSkipUnavailable() throws Exception {
+        // We use delay() here because closing the cluster while inside the pause fields doesn't seem to produce a clean closure
+        assumeTrue("Only snapshot builds have delay()", Build.current().isSnapshot());
+        createRemoteIndex(between(1000, 5000));
+        createLocalIndex(10);
+        EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
+        request.query("""
+            FROM test*,cluster-a:test* METADATA _index
+            | EVAL cluster=MV_FIRST(SPLIT(_index, ":"))
+            | WHERE CASE(cluster == "cluster-a", delay(1ms), true)
+            | STATS total = sum(const) | LIMIT 1
+            """);
+        request.pragmas(randomPragmas());
+        var requestFuture = client().execute(EsqlQueryAction.INSTANCE, request);
+        assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS));
+        SimplePauseFieldPlugin.allowEmitting.countDown();
+        cluster(REMOTE_CLUSTER).close(); // take the remote cluster down while the request is still outstanding
+        try (var resp = requestFuture.actionGet()) {
+            EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+            assertNotNull(executionInfo);
+            assertThat(executionInfo.isPartial(), equalTo(true));
+
+            List<List<Object>> values = getValuesList(resp);
+            assertThat(values.get(0).size(), equalTo(1));
+            // Remote data may or may not have arrived, but all local data must be there: 0 + 1 + ... + 9 = 45 from the 10 local docs
+            assertThat((long) values.get(0).get(0), greaterThanOrEqualTo(45L));
+
+            EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(REMOTE_CLUSTER);
+            EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
+
+            assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+            assertThat(localCluster.getSuccessfulShards(), equalTo(1));
+
+            assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
+            assertThat(cluster.getSuccessfulShards(), equalTo(0));
+            assertThat(cluster.getFailures().size(), equalTo(1));
+        }
+    }
 }

+ 1 - 1
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlAsyncTestUtils.java

@@ -98,7 +98,7 @@ public final class EsqlAsyncTestUtils {
                 }
                 assertThat(clusterInfo.getStatus(), not(equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)));
             }
-        });
+        }, 30, TimeUnit.SECONDS);
     }
 
     public static EsqlQueryResponse runAsyncQuery(Client client, EsqlQueryRequest request) {

+ 5 - 1
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/SimplePauseFieldPlugin.java

@@ -31,6 +31,10 @@ public class SimplePauseFieldPlugin extends AbstractPauseFieldPlugin {
 
     @Override
     public boolean onWait() throws InterruptedException {
-        return allowEmitting.await(30, TimeUnit.SECONDS);
+        try {
+            return allowEmitting.await(30, TimeUnit.SECONDS); // false if the 30s timeout elapses before the latch opens
+        } catch (InterruptedException e) {
+            return true; // an interrupt while waiting is reported like an opened latch; the interrupt status is not restored
+        }
     }
 }

+ 32 - 7
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

@@ -25,9 +25,11 @@ import org.elasticsearch.transport.TransportChannel;
 import org.elasticsearch.transport.TransportRequestHandler;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
 import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.session.Configuration;
+import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -71,34 +73,56 @@ final class ClusterComputeHandler implements TransportRequestHandler<ClusterComp
         ExchangeSourceHandler exchangeSource,
         RemoteCluster cluster,
         Runnable cancelQueryOnFailure,
+        EsqlExecutionInfo executionInfo,
         ActionListener<ComputeResponse> listener
     ) {
         var queryPragmas = configuration.pragmas();
         listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close);
         final var childSessionId = computeService.newChildSession(sessionId);
         final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
+        final String clusterAlias = cluster.clusterAlias();
         try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
             var resp = finalResponse.get();
             return Objects.requireNonNullElseGet(resp, () -> new ComputeResponse(profiles));
         }))) {
+            var openExchangeListener = computeListener.acquireAvoid();
             ExchangeService.openExchange(
                 transportService,
                 cluster.connection,
                 childSessionId,
                 queryPragmas.exchangeBufferSize(),
                 esqlExecutor,
-                computeListener.acquireCompute().delegateFailureAndWrap((l, unused) -> {
-                    var remoteSink = exchangeService.newRemoteSink(rootTask, childSessionId, transportService, cluster.connection);
+                EsqlCCSUtils.skipUnavailableListener(
+                    openExchangeListener,
+                    executionInfo,
+                    clusterAlias,
+                    EsqlExecutionInfo.Cluster.Status.SKIPPED
+                ).delegateFailureAndWrap((l, unused) -> {
+                    var listenerGroup = new RemoteListenerGroup(
+                        transportService,
+                        rootTask,
+                        computeListener,
+                        clusterAlias,
+                        executionInfo,
+                        openExchangeListener
+                    );
+
+                    var remoteSink = exchangeService.newRemoteSink(
+                        listenerGroup.getGroupTask(),
+                        childSessionId,
+                        transportService,
+                        cluster.connection
+                    );
                     exchangeSource.addRemoteSink(
                         remoteSink,
-                        true,
+                        executionInfo.isSkipUnavailable(clusterAlias) == false,
                         () -> {},
                         queryPragmas.concurrentExchangeClients(),
-                        computeListener.acquireAvoid()
+                        listenerGroup.getExchangeRequestListener()
                     );
                     var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
-                    var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan);
-                    final ActionListener<ComputeResponse> clusterListener = l.map(r -> {
+                    var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan);
+                    final ActionListener<ComputeResponse> clusterListener = listenerGroup.getClusterRequestListener().map(r -> {
                         finalResponse.set(r);
                         return r.getProfiles();
                     });
@@ -106,13 +130,14 @@ final class ClusterComputeHandler implements TransportRequestHandler<ClusterComp
                         cluster.connection,
                         ComputeService.CLUSTER_ACTION_NAME,
                         clusterRequest,
-                        rootTask,
+                        listenerGroup.getGroupTask(),
                         TransportRequestOptions.EMPTY,
                         new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
                     );
                 })
             );
         }
+
     }
 
     List<RemoteCluster> getRemoteClusters(

+ 2 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -276,6 +276,7 @@ public class ComputeService {
                         exchangeSource,
                         cluster,
                         cancelQueryOnFailure,
+                        execInfo,
                         computeListener.acquireCompute().map(r -> {
                             updateExecutionInfo(execInfo, cluster.clusterAlias(), r);
                             return r.getProfiles();
@@ -309,11 +310,10 @@ public class ComputeService {
         } else {
             // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator
             // and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response
-            var tookTime = TimeValue.timeValueNanos(System.nanoTime() - executionInfo.getRelativeStartNanos());
             executionInfo.swapCluster(
                 clusterAlias,
                 (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus()))
-                    .setTook(tookTime)
+                    .setTook(executionInfo.tookSoFar())
                     .build()
             );
         }

+ 122 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java

@@ -0,0 +1,122 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.plugin;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.util.concurrent.CountDown;
+import org.elasticsearch.compute.operator.DriverProfile;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.tasks.TaskManager;
+import org.elasticsearch.transport.TransportRequest;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
+import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * Groups the two asynchronous branches of a remote-cluster computation - the exchange sink and the
+ * cluster compute request - under a single cancellable "group" task.
+ * <p>
+ * The group task is registered with the task manager on construction, with the root task's id set as its parent,
+ * so if the root task is cancelled, the group task is cancelled too. Each branch gets its own listener
+ * ({@link #getExchangeRequestListener()} wraps {@code computeListener.acquireAvoid()},
+ * {@link #getClusterRequestListener()} wraps {@code computeListener.acquireCompute()}). When either listener
+ * receives a failure, the group task and its descendants are cancelled, which lets each branch cancel the other.
+ * <p>
+ * Both listeners are wrapped with {@code ActionListener.runAfter}, sharing a countdown of two: once both branches
+ * have completed, the group task is unregistered and the {@code delegate} passed to the constructor is notified
+ * with a {@code null} response.
+ */
+class RemoteListenerGroup {
+    private final CancellableTask groupTask;
+    private final ActionListener<Void> exchangeRequestListener;
+    private final ActionListener<List<DriverProfile>> clusterRequestListener;
+    private final TaskManager taskManager;
+    private final String clusterAlias;
+    private final EsqlExecutionInfo executionInfo;
+    private final TransportService transportService;
+
+    RemoteListenerGroup(
+        TransportService transportService,
+        Task rootTask,
+        ComputeListener computeListener,
+        String clusterAlias,
+        EsqlExecutionInfo executionInfo,
+        ActionListener<Void> delegate
+    ) {
+        this.transportService = transportService;
+        this.taskManager = transportService.getTaskManager();
+        this.clusterAlias = clusterAlias;
+        this.executionInfo = executionInfo;
+        groupTask = createGroupTask(rootTask, () -> rootTask.getDescription() + "[" + clusterAlias + "]");
+        CountDown countDown = new CountDown(2);
+        // The group is done when both the sink and the cluster request are done: unregister the task, then notify the delegate
+        Runnable finishGroup = () -> {
+            if (countDown.countDown()) {
+                taskManager.unregister(groupTask);
+                delegate.onResponse(null);
+            }
+        };
+        // Cancel the group on sink failure (this listener is one of the two participants in the countdown)
+        exchangeRequestListener = createCancellingListener("exchange sink failure", computeListener.acquireAvoid(), finishGroup);
+
+        // Cancel the group on cluster request failure (the other participant in the countdown)
+        clusterRequestListener = createCancellingListener("exchange cluster action failure", computeListener.acquireCompute(), finishGroup);
+    }
+
+    /**
+     * Create a listener that:
+     * 1. Cancels the group task (and its descendants) on failure, using {@code reason} as the cancellation reason
+     * 2. Once the cancellation listener runs, routes the original error through
+     *    {@link EsqlCCSUtils#skipUnavailableListener}: if the error is ignorable for this cluster, the cluster is
+     *    marked as PARTIAL and {@code delegate} is completed with a {@code null} response, otherwise the error
+     *    is propagated to {@code delegate}
+     * 3. Runs {@code finishGroup} after the wrapped listener's response/failure handler returns
+     *
+     * Successful responses are passed to {@code delegate} unchanged.
+     *
+     * @param reason      cancellation reason passed to the task manager
+     * @param delegate    listener that receives the response, or the (possibly suppressed) failure
+     * @param finishGroup callback counting down the group's completion
+     */
+    private <T> ActionListener<T> createCancellingListener(String reason, ActionListener<T> delegate, Runnable finishGroup) {
+        return ActionListener.runAfter(delegate.delegateResponse((inner, e) -> {
+            taskManager.cancelTaskAndDescendants(groupTask, reason, true, ActionListener.running(() -> {
+                EsqlCCSUtils.skipUnavailableListener(delegate, executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL)
+                    .onFailure(e);
+            }));
+        }), finishGroup);
+    }
+
+    public CancellableTask getGroupTask() {
+        return groupTask;
+    }
+
+    public ActionListener<Void> getExchangeRequestListener() {
+        return exchangeRequestListener;
+    }
+
+    public ActionListener<List<DriverProfile>> getClusterRequestListener() {
+        return clusterRequestListener;
+    }
+
+    private CancellableTask createGroupTask(Task parentTask, Supplier<String> description) {
+        return (CancellableTask) taskManager.register(
+            "transport",
+            "esql_compute_group",
+            new ComputeGroupTaskRequest(parentTask.taskInfo(transportService.getLocalNode().getId(), false).taskId(), description)
+        );
+    }
+
+    private static class ComputeGroupTaskRequest extends TransportRequest {
+        private final Supplier<String> parentDescription;
+
+        ComputeGroupTaskRequest(TaskId parentTask, Supplier<String> description) {
+            this.parentDescription = description;
+            setParentTask(parentTask);
+        }
+
+        @Override
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            assert parentTaskId.isSet();
+            return new CancellableTask(id, type, action, "", parentTaskId, headers);
+        }
+
+        @Override
+        public String getDescription() {
+            return "group [" + parentDescription.get() + "]";
+        }
+    }
+}

+ 10 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

@@ -267,6 +267,16 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
             planRunner,
             services,
             ActionListener.wrap(result -> {
+                // If we had any skipped or partial clusters, the result is partial
+                if (executionInfo.getClusters()
+                    .values()
+                    .stream()
+                    .anyMatch(
+                        c -> c.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED
+                            || c.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL
+                    )) {
+                    executionInfo.markAsPartial();
+                }
                 recordCCSTelemetry(task, executionInfo, request, null);
                 listener.onResponse(toResponse(task, request, configuration, result));
             }, ex -> {

+ 62 - 12
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java → x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

@@ -15,6 +15,7 @@ import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.indices.IndicesExpressionGrouper;
 import org.elasticsearch.license.XPackLicenseState;
@@ -25,6 +26,7 @@ import org.elasticsearch.transport.RemoteClusterService;
 import org.elasticsearch.transport.RemoteTransportException;
 import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
+import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo.Cluster;
 import org.elasticsearch.xpack.esql.analysis.Analyzer;
 import org.elasticsearch.xpack.esql.analysis.TableInfo;
 import org.elasticsearch.xpack.esql.index.IndexResolution;
@@ -35,11 +37,12 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
-class EsqlSessionCCSUtils {
+public class EsqlCCSUtils {
 
-    private EsqlSessionCCSUtils() {}
+    private EsqlCCSUtils() {}
 
     static Map<String, FieldCapabilitiesFailure> determineUnavailableRemoteClusters(List<FieldCapabilitiesFailure> failures) {
         Map<String, FieldCapabilitiesFailure> unavailableRemotes = new HashMap<>();
@@ -171,16 +174,7 @@ class EsqlSessionCCSUtils {
                 entry.getValue().getException()
             );
             if (skipUnavailable) {
-                execInfo.swapCluster(
-                    clusterAlias,
-                    (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED)
-                        .setTotalShards(0)
-                        .setSuccessfulShards(0)
-                        .setSkippedShards(0)
-                        .setFailedShards(0)
-                        .setFailures(List.of(new ShardSearchFailure(e)))
-                        .build()
-                );
+                markClusterWithFinalStateAndNoShards(execInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e);
             } else {
                 throw e;
             }
@@ -338,4 +332,60 @@ class EsqlSessionCCSUtils {
             }
         }
     }
+
+    /**
+     * Mark a cluster with a final status (e.g. SKIPPED or PARTIAL) and finalize its metrics.
+     * <p>
+     * "took" is set to the total time taken so far. Each shard counter (total, successful, skipped, failed)
+     * keeps the value already recorded on the cluster, and is set to 0 only if it has not been set yet.
+     * If an exception is supplied, the cluster's failures are set to a single {@link ShardSearchFailure} wrapping it.
+     *
+     * @param executionInfo execution info holding the per-cluster state to update
+     * @param clusterAlias  alias of the cluster to mark
+     * @param status        the final status of the cluster; must not be RUNNING
+     * @param ex            the failure to record for the cluster, or {@code null} if there is none
+     */
+    public static void markClusterWithFinalStateAndNoShards(
+        EsqlExecutionInfo executionInfo,
+        String clusterAlias,
+        Cluster.Status status,
+        @Nullable Exception ex
+    ) {
+        assert status != Cluster.Status.RUNNING : "status must be a final state, not RUNNING";
+        executionInfo.swapCluster(clusterAlias, (k, v) -> {
+            // Default each counter from its own current value; defaulting all of them from the total
+            // would report "total" as successful, skipped and failed at the same time.
+            Cluster.Builder builder = new Cluster.Builder(v).setStatus(status)
+                .setTook(executionInfo.tookSoFar())
+                .setTotalShards(Objects.requireNonNullElse(v.getTotalShards(), 0))
+                .setSuccessfulShards(Objects.requireNonNullElse(v.getSuccessfulShards(), 0))
+                .setSkippedShards(Objects.requireNonNullElse(v.getSkippedShards(), 0))
+                .setFailedShards(Objects.requireNonNullElse(v.getFailedShards(), 0));
+            if (ex != null) {
+                builder.setFailures(List.of(new ShardSearchFailure(ex)));
+            }
+            return builder.build();
+        });
+    }
+
+    /**
+     * Decides whether a runtime failure coming from the given cluster may be ignored.
+     * The error is ignorable only when the cluster is marked to skip unavailable AND the
+     * exception is a remote-unavailable exception; the exception is not inspected otherwise.
+     */
+    public static boolean shouldIgnoreRuntimeError(EsqlExecutionInfo executionInfo, String clusterAlias, Exception e) {
+        final boolean skipUnavailable = executionInfo.isSkipUnavailable(clusterAlias);
+        return skipUnavailable && ExceptionsHelper.isRemoteUnavailableException(e);
+    }
+
+    /**
+     * Decorates a listener so that ignorable failures do not fail it.
+     * Responses pass through untouched. On failure, a non-ignorable error is forwarded as-is; an ignorable
+     * one marks the cluster with the supplied final status and completes the listener with a {@code null} response.
+     */
+    public static <T> ActionListener<T> skipUnavailableListener(
+        ActionListener<T> delegate,
+        EsqlExecutionInfo executionInfo,
+        String clusterAlias,
+        EsqlExecutionInfo.Cluster.Status status
+    ) {
+        return delegate.delegateResponse((listener, failure) -> {
+            if (shouldIgnoreRuntimeError(executionInfo, clusterAlias, failure) == false) {
+                // not an ignorable error: propagate
+                listener.onFailure(failure);
+                return;
+            }
+            markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, status, failure);
+            listener.onResponse(null);
+        });
+    }
 }

+ 6 - 6
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

@@ -161,7 +161,7 @@ public class EsqlSession {
             parse(request.query(), request.params()),
             executionInfo,
             request.filter(),
-            new EsqlSessionCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
+            new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
                 @Override
                 public void onResponse(LogicalPlan analyzedPlan) {
                     preMapper.preMapper(
@@ -188,7 +188,7 @@ public class EsqlSession {
     ) {
         PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
         // TODO: this could be snuck into the underlying listener
-        EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
+        EsqlCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
         // execute any potential subplans
         executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener);
     }
@@ -315,7 +315,7 @@ public class EsqlSession {
             .collect(Collectors.toSet());
         final List<TableInfo> indices = preAnalysis.indices;
 
-        EsqlSessionCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, verifier.licenseState());
+        EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, verifier.licenseState());
 
         final Set<String> targetClusters = enrichPolicyResolver.groupIndicesPerCluster(
             indices.stream()
@@ -430,7 +430,7 @@ public class EsqlSession {
             }
             // if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search
             // based only on available clusters (which could now be an empty list)
-            String indexExpressionToResolve = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
+            String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
             if (indexExpressionToResolve.isEmpty()) {
                 // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
                 listener.onResponse(
@@ -464,8 +464,8 @@ public class EsqlSession {
         ActionListener<PreAnalysisResult> l
     ) {
         IndexResolution indexResolution = result.indices;
-        EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
-        EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters());
+        EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
+        EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters());
         if (executionInfo.isCrossClusterSearch()
             && executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) {
             // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java

@@ -140,7 +140,7 @@ public class IndexResolver {
             fields.put(name, field);
         }
 
-        Map<String, FieldCapabilitiesFailure> unavailableRemotes = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(
+        Map<String, FieldCapabilitiesFailure> unavailableRemotes = EsqlCCSUtils.determineUnavailableRemoteClusters(
             fieldCapsResponse.getFailures()
         );
 

+ 209 - 141
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java → x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java

@@ -8,7 +8,9 @@
 package org.elasticsearch.xpack.esql.session;
 
 import org.apache.lucene.index.CorruptIndexException;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
 import org.elasticsearch.action.search.ShardSearchFailure;
@@ -20,6 +22,7 @@ import org.elasticsearch.license.License;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.license.internal.XPackLicenseStatus;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.transport.ConnectTransportException;
 import org.elasticsearch.transport.NoSeedNodeLeftException;
@@ -47,26 +50,30 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
-import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.checkForCcsLicense;
+import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.checkForCcsLicense;
+import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.shouldIgnoreRuntimeError;
+import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.skipUnavailableListener;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
 
-public class EsqlSessionCCSUtilsTests extends ESTestCase {
+public class EsqlCCSUtilsTests extends ESTestCase {
+
+    private final String LOCAL_CLUSTER_ALIAS = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
+    private final String REMOTE1_ALIAS = "remote1";
+    private final String REMOTE2_ALIAS = "remote2";
 
     public void testCreateIndexExpressionFromAvailableClusters() {
-        final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
-        final String remote1Alias = "remote1";
-        final String remote2Alias = "remote2";
 
         // no clusters marked as skipped
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
-            executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
-            executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true));
-            executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true));
+            executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
+            executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true));
+            executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true));
 
-            String indexExpr = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
+            String indexExpr = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
             List<String> list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList();
             assertThat(list.size(), equalTo(5));
             assertThat(
@@ -78,19 +85,19 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
         // one cluster marked as skipped, so not present in revised index expression
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
-            executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
-            executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*,foo", true));
+            executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
+            executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true));
             executionInfo.swapCluster(
-                remote2Alias,
+                REMOTE2_ALIAS,
                 (k, v) -> new EsqlExecutionInfo.Cluster(
-                    remote2Alias,
+                    REMOTE2_ALIAS,
                     "mylogs1,mylogs2,logs*",
                     true,
                     EsqlExecutionInfo.Cluster.Status.SKIPPED
                 )
             );
 
-            String indexExpr = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
+            String indexExpr = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
             List<String> list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList();
             assertThat(list.size(), equalTo(3));
             assertThat(new HashSet<>(list), equalTo(Strings.commaDelimitedListToSet("logs*,remote1:*,remote1:foo")));
@@ -99,73 +106,70 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
         // two clusters marked as skipped, so only local cluster present in revised index expression
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
-            executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
+            executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
             executionInfo.swapCluster(
-                remote1Alias,
-                (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED)
+                REMOTE1_ALIAS,
+                (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED)
             );
             executionInfo.swapCluster(
-                remote2Alias,
+                REMOTE2_ALIAS,
                 (k, v) -> new EsqlExecutionInfo.Cluster(
-                    remote2Alias,
+                    REMOTE2_ALIAS,
                     "mylogs1,mylogs2,logs*",
                     true,
                     EsqlExecutionInfo.Cluster.Status.SKIPPED
                 )
             );
 
-            assertThat(EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("logs*"));
+            assertThat(EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("logs*"));
         }
 
         // only remotes present and all marked as skipped, so in revised index expression should be empty string
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
             executionInfo.swapCluster(
-                remote1Alias,
-                (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED)
+                REMOTE1_ALIAS,
+                (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED)
             );
             executionInfo.swapCluster(
-                remote2Alias,
+                REMOTE2_ALIAS,
                 (k, v) -> new EsqlExecutionInfo.Cluster(
-                    remote2Alias,
+                    REMOTE2_ALIAS,
                     "mylogs1,mylogs2,logs*",
                     true,
                     EsqlExecutionInfo.Cluster.Status.SKIPPED
                 )
             );
 
-            assertThat(EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo(""));
+            assertThat(EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo(""));
         }
     }
 
     public void testUpdateExecutionInfoWithUnavailableClusters() {
-        final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
-        final String remote1Alias = "remote1";
-        final String remote2Alias = "remote2";
 
         // skip_unavailable=true clusters are unavailable, both marked as SKIPPED
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
-            executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
-            executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true));
-            executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true));
+            executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
+            executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true));
+            executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true));
 
             var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
-            var unvailableClusters = Map.of(remote1Alias, failure, remote2Alias, failure);
-            EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unvailableClusters);
+            var unvailableClusters = Map.of(REMOTE1_ALIAS, failure, REMOTE2_ALIAS, failure);
+            EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unvailableClusters);
 
-            assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias)));
+            assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER_ALIAS, REMOTE1_ALIAS, REMOTE2_ALIAS)));
             assertNull(executionInfo.overallTook());
 
-            EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias);
+            EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS);
             assertThat(localCluster.getIndexExpression(), equalTo("logs*"));
             assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING);
 
-            EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias);
+            EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS);
             assertThat(remote1Cluster.getIndexExpression(), equalTo("*"));
             assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED);
 
-            EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias);
+            EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS);
             assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*"));
             assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED);
         }
@@ -173,14 +177,17 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
         // skip_unavailable=false cluster is unavailable, throws Exception
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
-            executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
-            executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true));
-            executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false));
+            executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
+            executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true));
+            executionInfo.swapCluster(
+                REMOTE2_ALIAS,
+                (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false)
+            );
 
             var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
             RemoteTransportException e = expectThrows(
                 RemoteTransportException.class,
-                () -> EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(remote2Alias, failure))
+                () -> EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(REMOTE2_ALIAS, failure))
             );
             assertThat(e.status().getStatus(), equalTo(500));
             assertThat(
@@ -193,42 +200,42 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
         // all clusters available, no Clusters in ExecutionInfo should be modified
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
-            executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
-            executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true));
-            executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false));
+            executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
+            executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true));
+            executionInfo.swapCluster(
+                REMOTE2_ALIAS,
+                (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false)
+            );
 
-            EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of());
+            EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of());
 
-            assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias)));
+            assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER_ALIAS, REMOTE1_ALIAS, REMOTE2_ALIAS)));
             assertNull(executionInfo.overallTook());
 
-            EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias);
+            EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS);
             assertThat(localCluster.getIndexExpression(), equalTo("logs*"));
             assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING);
 
-            EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias);
+            EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS);
             assertThat(remote1Cluster.getIndexExpression(), equalTo("*"));
             assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING);
 
-            EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias);
+            EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS);
             assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*"));
             assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING);
         }
     }
 
     public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
-        final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
-        final String remote1Alias = "remote1";
-        final String remote2Alias = "remote2";
 
         // all clusters had matching indices from field-caps call, so no updates to EsqlExecutionInfo should happen
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
-            executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
-            executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", randomBoolean()));
+            executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
+            executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", randomBoolean()));
             executionInfo.swapCluster(
-                remote2Alias,
-                (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", randomBoolean())
+                REMOTE2_ALIAS,
+                (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", randomBoolean())
             );
 
             EsIndex esIndex = new EsIndex(
@@ -251,17 +258,17 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
 
             IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Map.of());
 
-            EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
+            EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
 
-            EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias);
+            EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS);
             assertThat(localCluster.getIndexExpression(), equalTo("logs*"));
             assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING);
 
-            EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias);
+            EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS);
             assertThat(remote1Cluster.getIndexExpression(), equalTo("*"));
             assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING);
 
-            EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias);
+            EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS);
             assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*"));
             assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING);
         }
@@ -270,11 +277,11 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
         // marked as SKIPPED with 0 total shards, 0 took time, etc.
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
-            executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
-            executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", randomBoolean()));
+            executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
+            executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", randomBoolean()));
             executionInfo.swapCluster(
-                remote2Alias,
-                (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", randomBoolean())
+                REMOTE2_ALIAS,
+                (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", randomBoolean())
             );
 
             EsIndex esIndex = new EsIndex(
@@ -295,13 +302,13 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
             Map<String, FieldCapabilitiesFailure> unavailableClusters = Map.of();
             IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters);
 
-            EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
+            EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
 
-            EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias);
+            EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS);
             assertThat(localCluster.getIndexExpression(), equalTo("logs*"));
             assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING);
 
-            EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias);
+            EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS);
             assertThat(remote1Cluster.getIndexExpression(), equalTo("*"));
             assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
             assertThat(remote1Cluster.getTook().millis(), equalTo(0L));
@@ -310,7 +317,7 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
             assertThat(remote1Cluster.getSkippedShards(), equalTo(0));
             assertThat(remote1Cluster.getFailedShards(), equalTo(0));
 
-            EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias);
+            EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS);
             assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*"));
             assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING);
         }
@@ -320,11 +327,11 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
         // marked as SKIPPED
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
-            executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
-            executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", randomBoolean()));
+            executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
+            executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", randomBoolean()));
             executionInfo.swapCluster(
-                remote2Alias,
-                (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1*,mylogs2*,logs*", randomBoolean())
+                REMOTE2_ALIAS,
+                (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1*,mylogs2*,logs*", randomBoolean())
             );
 
             EsIndex esIndex = new EsIndex(
@@ -334,22 +341,22 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
             );
             // remote1 is unavailable
             var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
-            Map<String, FieldCapabilitiesFailure> unavailableClusters = Map.of(remote1Alias, failure);
+            Map<String, FieldCapabilitiesFailure> unavailableClusters = Map.of(REMOTE1_ALIAS, failure);
             IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters);
 
-            EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
+            EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
 
-            EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias);
+            EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS);
             assertThat(localCluster.getIndexExpression(), equalTo("logs*"));
             assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING);
 
-            EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias);
+            EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS);
             assertThat(remote1Cluster.getIndexExpression(), equalTo("*"));
             // since remote1 is in the unavailable Map (passed to IndexResolution.valid), its status will not be changed
             // by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters)
             assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
 
-            EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias);
+            EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS);
             assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1*,mylogs2*,logs*"));
             assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
             assertThat(remote2Cluster.getTook().millis(), equalTo(0L));
@@ -363,11 +370,11 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
         // but had no matching indices and since a concrete index was requested, a VerificationException is thrown
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
-            executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*"));
-            executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", randomBoolean()));
+            executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*"));
+            executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", randomBoolean()));
             executionInfo.swapCluster(
-                remote2Alias,
-                (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", randomBoolean())
+                REMOTE2_ALIAS,
+                (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", randomBoolean())
             );
 
             EsIndex esIndex = new EsIndex(
@@ -377,11 +384,11 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
             );
 
             var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
-            Map<String, FieldCapabilitiesFailure> unavailableClusters = Map.of(remote1Alias, failure);
+            Map<String, FieldCapabilitiesFailure> unavailableClusters = Map.of(REMOTE1_ALIAS, failure);
             IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters);
             VerificationException ve = expectThrows(
                 VerificationException.class,
-                () -> EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution)
+                () -> EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution)
             );
             assertThat(ve.getDetailedMessage(), containsString("Unknown index [remote2:mylogs1,mylogs2,logs*]"));
         }
@@ -390,13 +397,13 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
         // (the EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters() method handles that case, not the one tested here)
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
-            executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*"));
-            executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", randomBoolean()));
+            executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*"));
+            executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", randomBoolean()));
             // remote2 is already marked as SKIPPED (simulating failed enrich policy lookup due to unavailable cluster)
             executionInfo.swapCluster(
-                remote2Alias,
+                REMOTE2_ALIAS,
                 (k, v) -> new EsqlExecutionInfo.Cluster(
-                    remote2Alias,
+                    REMOTE2_ALIAS,
                     "mylogs1*,mylogs2*,logs*",
                     randomBoolean(),
                     EsqlExecutionInfo.Cluster.Status.SKIPPED
@@ -411,22 +418,22 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
 
             // remote1 is unavailable
             var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
-            Map<String, FieldCapabilitiesFailure> unavailableClusters = Map.of(remote1Alias, failure);
+            Map<String, FieldCapabilitiesFailure> unavailableClusters = Map.of(REMOTE1_ALIAS, failure);
             IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters);
 
-            EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
+            EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
 
-            EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias);
+            EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS);
             assertThat(localCluster.getIndexExpression(), equalTo("logs*"));
             assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING);
 
-            EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias);
+            EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS);
             assertThat(remote1Cluster.getIndexExpression(), equalTo("*"));
             // since remote1 is in the unavailable Map (passed to IndexResolution.valid), its status will not be changed
             // by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters)
             assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
 
-            EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias);
+            EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS);
             assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1*,mylogs2*,logs*"));
             assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
         }
@@ -444,7 +451,7 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
                 )
             );
 
-            Map<String, FieldCapabilitiesFailure> unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures);
+            Map<String, FieldCapabilitiesFailure> unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures);
             assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote1", "remote2")));
         }
 
@@ -454,7 +461,7 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
             failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSuchRemoteClusterException("remote2")));
             failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSeedNodeLeftException("no seed node")));
 
-            Map<String, FieldCapabilitiesFailure> unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures);
+            Map<String, FieldCapabilitiesFailure> unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures);
             assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2")));
         }
 
@@ -468,7 +475,7 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
                     new IllegalStateException("Unable to open any connections")
                 )
             );
-            Map<String, FieldCapabilitiesFailure> unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures);
+            Map<String, FieldCapabilitiesFailure> unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures);
             assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2")));
         }
 
@@ -476,29 +483,28 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
         {
             List<FieldCapabilitiesFailure> failures = new ArrayList<>();
             failures.add(new FieldCapabilitiesFailure(new String[] { "remote1:mylogs1" }, new RuntimeException("foo")));
-            Map<String, FieldCapabilitiesFailure> unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures);
+            Map<String, FieldCapabilitiesFailure> unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures);
             assertThat(unavailableClusters.keySet(), equalTo(Set.of()));
         }
 
         // empty failures list
         {
             List<FieldCapabilitiesFailure> failures = new ArrayList<>();
-            Map<String, FieldCapabilitiesFailure> unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures);
+            Map<String, FieldCapabilitiesFailure> unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures);
             assertThat(unavailableClusters.keySet(), equalTo(Set.of()));
         }
     }
 
     public void testUpdateExecutionInfoAtEndOfPlanning() {
-        String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
-        String remote1Alias = "remote1";
-        String remote2Alias = "remote2";
+        String REMOTE1_ALIAS = "remote1";
+        String REMOTE2_ALIAS = "remote2";
         EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
-        executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false));
+        executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
         executionInfo.swapCluster(
-            remote1Alias,
-            (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true, EsqlExecutionInfo.Cluster.Status.SKIPPED)
+            REMOTE1_ALIAS,
+            (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true, EsqlExecutionInfo.Cluster.Status.SKIPPED)
         );
-        executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false));
+        executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false));
 
         assertNull(executionInfo.planningTookTime());
         assertNull(executionInfo.overallTook());
@@ -506,7 +512,7 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
             Thread.sleep(1);
         } catch (InterruptedException e) {}
 
-        EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
+        EsqlCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
 
         assertThat(executionInfo.planningTookTime().millis(), greaterThanOrEqualTo(0L));
         assertNull(executionInfo.overallTook());
@@ -517,7 +523,7 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
         assertNull(localCluster.getTotalShards());
         assertNull(localCluster.getTook());
 
-        EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias);
+        EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS);
         assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
         assertThat(remote1Cluster.getTotalShards(), equalTo(0));
         assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0));
@@ -526,7 +532,7 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
         assertThat(remote1Cluster.getTook().millis(), greaterThanOrEqualTo(0L));
         assertThat(remote1Cluster.getTook().millis(), equalTo(executionInfo.planningTookTime().millis()));
 
-        EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias);
+        EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS);
         assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
         assertNull(remote2Cluster.getTotalShards());
         assertNull(remote2Cluster.getTook());
@@ -534,7 +540,10 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
 
     private void assertClusterStatusAndShardCounts(EsqlExecutionInfo.Cluster cluster, EsqlExecutionInfo.Cluster.Status status) {
         assertThat(cluster.getStatus(), equalTo(status));
-        assertNull(cluster.getTook());
+        if (cluster.getTook() != null) {
+            // It is also ok if it's null in some tests
+            assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L));
+        }
         if (status == EsqlExecutionInfo.Cluster.Status.RUNNING) {
             assertNull(cluster.getTotalShards());
             assertNull(cluster.getSuccessfulShards());
@@ -545,6 +554,11 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
             assertThat(cluster.getSuccessfulShards(), equalTo(0));
             assertThat(cluster.getSkippedShards(), equalTo(0));
             assertThat(cluster.getFailedShards(), equalTo(0));
+        } else if (status == EsqlExecutionInfo.Cluster.Status.PARTIAL) {
+            assertThat(cluster.getTotalShards(), equalTo(0));
+            assertThat(cluster.getSuccessfulShards(), equalTo(0));
+            assertThat(cluster.getSkippedShards(), equalTo(0));
+            assertThat(cluster.getFailedShards(), equalTo(0));
         } else {
             fail("Unexpected status: " + status);
         }
@@ -560,35 +574,32 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
     }
 
     public void testReturnSuccessWithEmptyResult() {
-        String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
-        String remote1Alias = "remote1";
-        String remote2Alias = "remote2";
         String remote3Alias = "remote3";
         NoClustersToSearchException noClustersException = new NoClustersToSearchException();
         Predicate<String> skipUnPredicate = s -> {
-            if (s.equals("remote2") || s.equals("remote3")) {
+            if (s.equals(REMOTE2_ALIAS) || s.equals("remote3")) {
                 return true;
             }
             return false;
         };
 
-        EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false);
-        EsqlExecutionInfo.Cluster remote1 = new EsqlExecutionInfo.Cluster(remote1Alias, "logs*", false);
-        EsqlExecutionInfo.Cluster remote2 = new EsqlExecutionInfo.Cluster(remote2Alias, "logs*", true);
+        EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false);
+        EsqlExecutionInfo.Cluster remote1 = new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "logs*", false);
+        EsqlExecutionInfo.Cluster remote2 = new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "logs*", true);
         EsqlExecutionInfo.Cluster remote3 = new EsqlExecutionInfo.Cluster(remote3Alias, "logs*", true);
 
         // not a cross-cluster cluster search, so do not return empty result
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean());
-            executionInfo.swapCluster(localClusterAlias, (k, v) -> localCluster);
-            assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException));
+            executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> localCluster);
+            assertFalse(EsqlCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException));
         }
 
         // local cluster is present, so do not return empty result
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean());
-            executionInfo.swapCluster(localClusterAlias, (k, v) -> localCluster);
-            executionInfo.swapCluster(remote1Alias, (k, v) -> remote1);
+            executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> localCluster);
+            executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> remote1);
             // TODO: this logic will be added in the follow-on PR that handles missing indices
             // assertFalse(EsqlCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException));
         }
@@ -596,16 +607,16 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
         // remote-only, one cluster is skip_unavailable=false, so do not return empty result
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean());
-            executionInfo.swapCluster(remote1Alias, (k, v) -> remote1);
-            executionInfo.swapCluster(remote2Alias, (k, v) -> remote2);
-            assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException));
+            executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> remote1);
+            executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> remote2);
+            assertFalse(EsqlCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException));
         }
 
         // remote-only, all clusters are skip_unavailable=true, so should return empty result with
         // NoSuchClustersException or "remote unavailable" type exception
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean());
-            executionInfo.swapCluster(remote2Alias, (k, v) -> remote2);
+            executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> remote2);
             executionInfo.swapCluster(remote3Alias, (k, v) -> remote3);
             Exception e = randomFrom(
                 new NoSuchRemoteClusterException("foo"),
@@ -613,23 +624,22 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
                 new NoSeedNodeLeftException("foo"),
                 new IllegalStateException("unknown host")
             );
-            assertTrue(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, e));
+            assertTrue(EsqlCCSUtils.returnSuccessWithEmptyResult(executionInfo, e));
         }
 
         // remote-only, all clusters are skip_unavailable=true, but exception is not "remote unavailable" so return false
         // Note: this functionality may change in follow-on PRs, so remove this test in that case
         {
             EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean());
-            executionInfo.swapCluster(remote2Alias, (k, v) -> remote2);
+            executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> remote2);
             executionInfo.swapCluster(remote3Alias, (k, v) -> remote3);
-            assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, new NullPointerException()));
+            assertFalse(EsqlCCSUtils.returnSuccessWithEmptyResult(executionInfo, new NullPointerException()));
         }
     }
 
     public void testUpdateExecutionInfoToReturnEmptyResult() {
-        String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
-        String remote1Alias = "remote1";
-        String remote2Alias = "remote2";
+        String REMOTE1_ALIAS = "remote1";
+        String REMOTE2_ALIAS = "remote2";
         String remote3Alias = "remote3";
         ConnectTransportException transportEx = new ConnectTransportException(null, "foo");
         Predicate<String> skipUnPredicate = s -> {
@@ -639,9 +649,9 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
             return false;
         };
 
-        EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false);
-        EsqlExecutionInfo.Cluster remote1 = new EsqlExecutionInfo.Cluster(remote1Alias, "logs*", true);
-        EsqlExecutionInfo.Cluster remote2 = new EsqlExecutionInfo.Cluster(remote2Alias, "logs*", true);
+        EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false);
+        EsqlExecutionInfo.Cluster remote1 = new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "logs*", true);
+        EsqlExecutionInfo.Cluster remote2 = new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "logs*", true);
         EsqlExecutionInfo.Cluster remote3 = new EsqlExecutionInfo.Cluster(remote3Alias, "logs*", true);
 
         EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean());
@@ -652,13 +662,13 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
 
         assertNull(executionInfo.overallTook());
 
-        EsqlSessionCCSUtils.updateExecutionInfoToReturnEmptyResult(executionInfo, transportEx);
+        EsqlCCSUtils.updateExecutionInfoToReturnEmptyResult(executionInfo, transportEx);
 
         assertNotNull(executionInfo.overallTook());
-        assertThat(executionInfo.getCluster(localClusterAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
-        assertThat(executionInfo.getCluster(localClusterAlias).getFailures().size(), equalTo(0));
+        assertThat(executionInfo.getCluster(LOCAL_CLUSTER_ALIAS).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+        assertThat(executionInfo.getCluster(LOCAL_CLUSTER_ALIAS).getFailures().size(), equalTo(0));
 
-        for (String remoteAlias : Set.of(remote1Alias, remote2Alias, remote3Alias)) {
+        for (String remoteAlias : Set.of(REMOTE1_ALIAS, REMOTE2_ALIAS, remote3Alias)) {
             assertThat(executionInfo.getCluster(remoteAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
             List<ShardSearchFailure> remoteFailures = executionInfo.getCluster(remoteAlias).getFailures();
             assertThat(remoteFailures.size(), equalTo(1));
@@ -667,11 +677,11 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
     }
 
     public void testConcreteIndexRequested() {
-        assertThat(EsqlSessionCCSUtils.concreteIndexRequested("logs*"), equalTo(false));
-        assertThat(EsqlSessionCCSUtils.concreteIndexRequested("mylogs1,mylogs2,logs*"), equalTo(true));
-        assertThat(EsqlSessionCCSUtils.concreteIndexRequested("x*,logs"), equalTo(true));
-        assertThat(EsqlSessionCCSUtils.concreteIndexRequested("logs,metrics"), equalTo(true));
-        assertThat(EsqlSessionCCSUtils.concreteIndexRequested("*"), equalTo(false));
+        assertThat(EsqlCCSUtils.concreteIndexRequested("logs*"), equalTo(false)); // single wildcard pattern only
+        assertThat(EsqlCCSUtils.concreteIndexRequested("mylogs1,mylogs2,logs*"), equalTo(true)); // plain names followed by a wildcard
+        assertThat(EsqlCCSUtils.concreteIndexRequested("x*,logs"), equalTo(true)); // wildcard first, then a plain name
+        assertThat(EsqlCCSUtils.concreteIndexRequested("logs,metrics"), equalTo(true)); // plain names only
+        assertThat(EsqlCCSUtils.concreteIndexRequested("*"), equalTo(false)); // bare match-all wildcard
     }
 
     public void testCheckForCcsLicense() {
@@ -758,6 +768,64 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
         }
     }
 
+    public void testShouldIgnoreRuntimeError() {
+        Predicate<String> skipUnPredicate = s -> s.equals(REMOTE1_ALIAS);
+
+        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, true);
+        executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
+        executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true));
+        executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false));
+
+        // remote1: skip_unavailable=true, so should ignore connect errors, but not others
+        assertThat(
+            shouldIgnoreRuntimeError(executionInfo, REMOTE1_ALIAS, new IllegalStateException("Unable to open any connections")),
+            is(true)
+        );
+        assertThat(shouldIgnoreRuntimeError(executionInfo, REMOTE1_ALIAS, new TaskCancelledException("task cancelled")), is(false));
+        assertThat(shouldIgnoreRuntimeError(executionInfo, REMOTE1_ALIAS, new ElasticsearchException("something is wrong")), is(false));
+        // remote2: skip_unavailable=false, so should not ignore any errors
+        assertThat(
+            shouldIgnoreRuntimeError(executionInfo, REMOTE2_ALIAS, new IllegalStateException("Unable to open any connections")),
+            is(false)
+        );
+        assertThat(shouldIgnoreRuntimeError(executionInfo, REMOTE2_ALIAS, new TaskCancelledException("task cancelled")), is(false));
+        // same for local
+        assertThat(
+            shouldIgnoreRuntimeError(executionInfo, LOCAL_CLUSTER_ALIAS, new IllegalStateException("Unable to open any connections")),
+            is(false)
+        );
+        assertThat(shouldIgnoreRuntimeError(executionInfo, LOCAL_CLUSTER_ALIAS, new TaskCancelledException("task cancelled")), is(false));
+    }
+
+    public void testSkipUnavailableListener() {
+        Predicate<String> skipUnPredicate = s -> s.equals(REMOTE1_ALIAS);
+
+        EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, true);
+        executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
+        executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true));
+        executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false));
+
+        ActionListener<Void> expectResult = ActionListener.wrap(unused -> {}, (e) -> fail("Listener should not have failed"));
+        ActionListener<Void> expectFailure = ActionListener.wrap(unused -> fail("Listener should have failed"), (e) -> {});
+
+        // skip_unavailable=true but not a connect exception, so should fail
+        skipUnavailableListener(expectFailure, executionInfo, REMOTE1_ALIAS, EsqlExecutionInfo.Cluster.Status.PARTIAL).onFailure(
+            new ElasticsearchException("something is wrong")
+        );
+        assertThat(executionInfo.getCluster(REMOTE1_ALIAS).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
+
+        // skip_unavailable=true and a connect exception, so should not fail
+        skipUnavailableListener(expectResult, executionInfo, REMOTE1_ALIAS, EsqlExecutionInfo.Cluster.Status.PARTIAL).onFailure(
+            new IllegalStateException("Unable to open any connections")
+        );
+        assertThat(executionInfo.getCluster(REMOTE1_ALIAS).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
+        // skip_unavailable=false, so should fail
+        skipUnavailableListener(expectFailure, executionInfo, REMOTE2_ALIAS, EsqlExecutionInfo.Cluster.Status.PARTIAL).onFailure(
+            new IllegalStateException("Unable to open any connections")
+        );
+
+    }
+
     private XPackLicenseStatus activeLicenseStatus(License.OperationMode operationMode) {
         return new XPackLicenseStatus(operationMode, true, null);
     }