Browse Source

Fix BWC for ES|QL cluster request (#117865) (#117901)

We identified a BWC bug in the cluster computer request. Specifically, 
the indices options were not properly selected for requests from an
older querying cluster. This caused the search_shards API on the remote
cluster to use restricted indices options, leading to failures when
resolving wildcard index patterns.

Our tests didn't catch this issue because the current BWC tests for 
cross-cluster queries only cover one direction: the querying cluster on
the current version and the remote cluster on a compatible version.

This PR fixes the issue and expands BWC tests to support both 
directions: the querying cluster on the current version with the remote
cluster on a compatible version, and vice versa.
Nhat Nguyen 10 months ago
parent
commit
9b5926043e

+ 5 - 0
docs/changelog/117865.yaml

@@ -0,0 +1,5 @@
+pr: 117865
+summary: Fix BWC for ES|QL cluster request
+area: ES|QL
+type: bug
+issues: []

+ 15 - 2
x-pack/plugin/esql/qa/server/multi-clusters/build.gradle

@@ -24,9 +24,22 @@ def supportedVersion = bwcVersion -> {
 }
 
 buildParams.bwcVersions.withWireCompatible(supportedVersion) { bwcVersion, baseName ->
-  tasks.register(bwcTaskName(bwcVersion), StandaloneRestIntegTestTask) {
+  tasks.register("${baseName}#newToOld", StandaloneRestIntegTestTask) {
+    usesBwcDistribution(bwcVersion)
+    systemProperty("tests.version.remote_cluster", bwcVersion)
+    maxParallelForks = 1
+  }
+
+  tasks.register("${baseName}#oldToNew", StandaloneRestIntegTestTask) {
     usesBwcDistribution(bwcVersion)
-    systemProperty("tests.old_cluster_version", bwcVersion)
+    systemProperty("tests.version.local_cluster", bwcVersion)
+    maxParallelForks = 1
+  }
+
+  // TODO: avoid running tests twice with the current version
+  tasks.register(bwcTaskName(bwcVersion), StandaloneRestIntegTestTask) {
+    dependsOn tasks.named("${baseName}#oldToNew")
+    dependsOn tasks.named("${baseName}#newToOld")
     maxParallelForks = 1
   }
 }

+ 15 - 4
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java

@@ -20,7 +20,7 @@ public class Clusters {
         return ElasticsearchCluster.local()
             .name(REMOTE_CLUSTER_NAME)
             .distribution(DistributionType.DEFAULT)
-            .version(Version.fromString(System.getProperty("tests.old_cluster_version")))
+            .version(distributionVersion("tests.version.remote_cluster"))
             .nodes(2)
             .setting("node.roles", "[data,ingest,master]")
             .setting("xpack.security.enabled", "false")
@@ -34,7 +34,7 @@ public class Clusters {
         return ElasticsearchCluster.local()
             .name(LOCAL_CLUSTER_NAME)
             .distribution(DistributionType.DEFAULT)
-            .version(Version.CURRENT)
+            .version(distributionVersion("tests.version.local_cluster"))
             .nodes(2)
             .setting("xpack.security.enabled", "false")
             .setting("xpack.license.self_generated.type", "trial")
@@ -46,7 +46,18 @@ public class Clusters {
             .build();
     }
 
-    public static org.elasticsearch.Version oldVersion() {
-        return org.elasticsearch.Version.fromString(System.getProperty("tests.old_cluster_version"));
+    public static org.elasticsearch.Version localClusterVersion() {
+        String prop = System.getProperty("tests.version.local_cluster");
+        return prop != null ? org.elasticsearch.Version.fromString(prop) : org.elasticsearch.Version.CURRENT;
+    }
+
+    public static org.elasticsearch.Version remoteClusterVersion() {
+        String prop = System.getProperty("tests.version.remote_cluster");
+        return prop != null ? org.elasticsearch.Version.fromString(prop) : org.elasticsearch.Version.CURRENT;
+    }
+
+    private static Version distributionVersion(String key) {
+        final String val = System.getProperty(key);
+        return val != null ? Version.fromString(val) : Version.CURRENT;
     }
 }

+ 7 - 0
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationIT.java

@@ -10,12 +10,14 @@ package org.elasticsearch.xpack.esql.ccq;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 
 import org.apache.http.HttpHost;
+import org.elasticsearch.Version;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.test.TestClustersThreadFilter;
 import org.elasticsearch.test.cluster.ElasticsearchCluster;
 import org.elasticsearch.xpack.esql.qa.rest.EsqlRestValidationTestCase;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.rules.RuleChain;
 import org.junit.rules.TestRule;
@@ -78,4 +80,9 @@ public class EsqlRestValidationIT extends EsqlRestValidationTestCase {
         }
         return remoteClient;
     }
+
+    @Before
+    public void skipTestOnOldVersions() {
+        assumeTrue("skip on old versions", Clusters.localClusterVersion().equals(Version.V_8_16_0));
+    }
 }

+ 3 - 4
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

@@ -12,6 +12,7 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHost;
+import org.elasticsearch.Version;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
@@ -118,10 +119,8 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
         // Do not run tests including "METADATA _index" unless marked with metadata_fields_remote_test,
         // because they may produce inconsistent results with multiple clusters.
         assumeFalse("can't test with _index metadata", (remoteMetadata == false) && hasIndexMetadata(testCase.query));
-        assumeTrue(
-            "Test " + testName + " is skipped on " + Clusters.oldVersion(),
-            isEnabled(testName, instructions, Clusters.oldVersion())
-        );
+        Version oldVersion = Version.min(Clusters.localClusterVersion(), Clusters.remoteClusterVersion());
+        assumeTrue("Test " + testName + " is skipped on " + oldVersion, isEnabled(testName, instructions, oldVersion));
         assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName()));
         assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
         assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));

+ 75 - 29
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java

@@ -10,6 +10,7 @@ package org.elasticsearch.xpack.esql.ccq;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 
 import org.apache.http.HttpHost;
+import org.elasticsearch.Version;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.common.Strings;
@@ -29,7 +30,6 @@ import org.junit.rules.TestRule;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -127,10 +127,12 @@ public class MultiClustersIT extends ESRestTestCase {
     }
 
     private Map<String, Object> run(String query, boolean includeCCSMetadata) throws IOException {
-        Map<String, Object> resp = runEsql(
-            new RestEsqlTestCase.RequestObjectBuilder().query(query).includeCCSMetadata(includeCCSMetadata).build()
-        );
-        logger.info("--> query {} response {}", query, resp);
+        var queryBuilder = new RestEsqlTestCase.RequestObjectBuilder().query(query);
+        if (includeCCSMetadata) {
+            queryBuilder.includeCCSMetadata(true);
+        }
+        Map<String, Object> resp = runEsql(queryBuilder.build());
+        logger.info("--> query {} response {}", queryBuilder, resp);
         return resp;
     }
 
@@ -156,7 +158,7 @@ public class MultiClustersIT extends ESRestTestCase {
 
     public void testCount() throws Exception {
         {
-            boolean includeCCSMetadata = randomBoolean();
+            boolean includeCCSMetadata = includeCCSMetadata();
             Map<String, Object> result = run("FROM test-local-index,*:test-remote-index | STATS c = COUNT(*)", includeCCSMetadata);
             var columns = List.of(Map.of("name", "c", "type", "long"));
             var values = List.of(List.of(localDocs.size() + remoteDocs.size()));
@@ -165,13 +167,16 @@ public class MultiClustersIT extends ESRestTestCase {
             if (includeCCSMetadata) {
                 mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
             }
-            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
+            if (ccsMetadataAvailable()) {
+                mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
+            }
+            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
             if (includeCCSMetadata) {
                 assertClusterDetailsMap(result, false);
             }
         }
         {
-            boolean includeCCSMetadata = randomBoolean();
+            boolean includeCCSMetadata = includeCCSMetadata();
             Map<String, Object> result = run("FROM *:test-remote-index | STATS c = COUNT(*)", includeCCSMetadata);
             var columns = List.of(Map.of("name", "c", "type", "long"));
             var values = List.of(List.of(remoteDocs.size()));
@@ -180,7 +185,10 @@ public class MultiClustersIT extends ESRestTestCase {
             if (includeCCSMetadata) {
                 mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
             }
-            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
+            if (ccsMetadataAvailable()) {
+                mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
+            }
+            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
             if (includeCCSMetadata) {
                 assertClusterDetailsMap(result, true);
             }
@@ -189,7 +197,7 @@ public class MultiClustersIT extends ESRestTestCase {
 
     public void testUngroupedAggs() throws Exception {
         {
-            boolean includeCCSMetadata = randomBoolean();
+            boolean includeCCSMetadata = includeCCSMetadata();
             Map<String, Object> result = run("FROM test-local-index,*:test-remote-index | STATS total = SUM(data)", includeCCSMetadata);
             var columns = List.of(Map.of("name", "total", "type", "long"));
             long sum = Stream.concat(localDocs.stream(), remoteDocs.stream()).mapToLong(d -> d.data).sum();
@@ -200,13 +208,16 @@ public class MultiClustersIT extends ESRestTestCase {
             if (includeCCSMetadata) {
                 mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
             }
-            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
+            if (ccsMetadataAvailable()) {
+                mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
+            }
+            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
             if (includeCCSMetadata) {
                 assertClusterDetailsMap(result, false);
             }
         }
         {
-            boolean includeCCSMetadata = randomBoolean();
+            boolean includeCCSMetadata = includeCCSMetadata();
             Map<String, Object> result = run("FROM *:test-remote-index | STATS total = SUM(data)", includeCCSMetadata);
             var columns = List.of(Map.of("name", "total", "type", "long"));
             long sum = remoteDocs.stream().mapToLong(d -> d.data).sum();
@@ -216,12 +227,16 @@ public class MultiClustersIT extends ESRestTestCase {
             if (includeCCSMetadata) {
                 mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
             }
-            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
+            if (ccsMetadataAvailable()) {
+                mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
+            }
+            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
             if (includeCCSMetadata) {
                 assertClusterDetailsMap(result, true);
             }
         }
         {
+            assumeTrue("requires ccs metadata", ccsMetadataAvailable());
             Map<String, Object> result = runWithColumnarAndIncludeCCSMetadata("FROM *:test-remote-index | STATS total = SUM(data)");
             var columns = List.of(Map.of("name", "total", "type", "long"));
             long sum = remoteDocs.stream().mapToLong(d -> d.data).sum();
@@ -293,7 +308,7 @@ public class MultiClustersIT extends ESRestTestCase {
 
     public void testGroupedAggs() throws Exception {
         {
-            boolean includeCCSMetadata = randomBoolean();
+            boolean includeCCSMetadata = includeCCSMetadata();
             Map<String, Object> result = run(
                 "FROM test-local-index,*:test-remote-index | STATS total = SUM(data) BY color | SORT color",
                 includeCCSMetadata
@@ -311,13 +326,16 @@ public class MultiClustersIT extends ESRestTestCase {
             if (includeCCSMetadata) {
                 mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
             }
-            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
+            if (ccsMetadataAvailable()) {
+                mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
+            }
+            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
             if (includeCCSMetadata) {
                 assertClusterDetailsMap(result, false);
             }
         }
         {
-            boolean includeCCSMetadata = randomBoolean();
+            boolean includeCCSMetadata = includeCCSMetadata();
             Map<String, Object> result = run(
                 "FROM *:test-remote-index | STATS total = SUM(data) by color | SORT color",
                 includeCCSMetadata
@@ -336,29 +354,57 @@ public class MultiClustersIT extends ESRestTestCase {
             if (includeCCSMetadata) {
                 mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
             }
-            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
+            if (ccsMetadataAvailable()) {
+                mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
+            }
+            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
             if (includeCCSMetadata) {
                 assertClusterDetailsMap(result, true);
             }
         }
     }
 
+    public void testIndexPattern() throws Exception {
+        {
+            String indexPattern = randomFrom(
+                "test-local-index,*:test-remote-index",
+                "test-local-index,*:test-remote-*",
+                "test-local-index,*:test-*",
+                "test-*,*:test-remote-index"
+            );
+            Map<String, Object> result = run("FROM " + indexPattern + " | STATS c = COUNT(*)", false);
+            var columns = List.of(Map.of("name", "c", "type", "long"));
+            var values = List.of(List.of(localDocs.size() + remoteDocs.size()));
+            MapMatcher mapMatcher = matchesMap();
+            if (ccsMetadataAvailable()) {
+                mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
+            }
+            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
+        }
+        {
+            String indexPattern = randomFrom("*:test-remote-index", "*:test-remote-*", "*:test-*");
+            Map<String, Object> result = run("FROM " + indexPattern + " | STATS c = COUNT(*)", false);
+            var columns = List.of(Map.of("name", "c", "type", "long"));
+            var values = List.of(List.of(remoteDocs.size()));
+
+            MapMatcher mapMatcher = matchesMap();
+            if (ccsMetadataAvailable()) {
+                mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
+            }
+            assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
+        }
+    }
+
     private RestClient remoteClusterClient() throws IOException {
         var clusterHosts = parseClusterHosts(remoteCluster.getHttpAddresses());
         return buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));
     }
 
-    private TestFeatureService remoteFeaturesService() throws IOException {
-        if (remoteFeaturesService == null) {
-            try (RestClient remoteClient = remoteClusterClient()) {
-                var remoteNodeVersions = readVersionsFromNodesInfo(remoteClient);
-                var semanticNodeVersions = remoteNodeVersions.stream()
-                    .map(ESRestTestCase::parseLegacyVersion)
-                    .flatMap(Optional::stream)
-                    .collect(Collectors.toSet());
-                remoteFeaturesService = createTestFeatureService(getClusterStateFeatures(remoteClient), semanticNodeVersions);
-            }
-        }
-        return remoteFeaturesService;
+    private static boolean ccsMetadataAvailable() {
+        return Clusters.localClusterVersion().onOrAfter(Version.V_8_16_0);
+    }
+
+    private static boolean includeCCSMetadata() {
+        return ccsMetadataAvailable() && randomBoolean();
     }
 }

+ 0 - 1
x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

@@ -76,7 +76,6 @@ public class RestEsqlIT extends RestEsqlTestCase {
         indexTimestampData(1);
 
         RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | stats avg(value)");
-        requestObjectBuilder().includeCCSMetadata(randomBoolean());
         if (Build.current().isSnapshot()) {
             builder.pragmas(Settings.builder().put("data_partitioning", "shard").build());
         }

+ 19 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteClusterPlan.java

@@ -9,12 +9,14 @@ package org.elasticsearch.xpack.esql.plugin;
 
 import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.OriginalIndices;
-import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
 import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
 
 record RemoteClusterPlan(PhysicalPlan plan, String[] targetIndices, OriginalIndices originalIndices) {
     static RemoteClusterPlan from(PlanStreamInput planIn) throws IOException {
@@ -24,7 +26,8 @@ record RemoteClusterPlan(PhysicalPlan plan, String[] targetIndices, OriginalIndi
         if (planIn.getTransportVersion().onOrAfter(TransportVersions.ESQL_ORIGINAL_INDICES)) {
             originalIndices = OriginalIndices.readOriginalIndices(planIn);
         } else {
-            originalIndices = new OriginalIndices(planIn.readStringArray(), IndicesOptions.strictSingleIndexNoExpandForbidClosed());
+            // fallback to the previous behavior
+            originalIndices = new OriginalIndices(planIn.readStringArray(), SearchRequest.DEFAULT_INDICES_OPTIONS);
         }
         return new RemoteClusterPlan(plan, targetIndices, originalIndices);
     }
@@ -38,4 +41,18 @@ record RemoteClusterPlan(PhysicalPlan plan, String[] targetIndices, OriginalIndi
             out.writeStringArray(originalIndices.indices());
         }
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) return false;
+        RemoteClusterPlan that = (RemoteClusterPlan) o;
+        return Objects.equals(plan, that.plan)
+            && Objects.deepEquals(targetIndices, that.targetIndices)
+            && Objects.equals(originalIndices, that.originalIndices);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(plan, Arrays.hashCode(targetIndices), originalIndices);
+    }
 }

+ 206 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ClusterRequestTests.java

@@ -0,0 +1,206 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.plugin;
+
+import org.elasticsearch.TransportVersions;
+import org.elasticsearch.action.OriginalIndices;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexMode;
+import org.elasticsearch.search.SearchModule;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.test.TransportVersionUtils;
+import org.elasticsearch.xpack.esql.ConfigurationTestUtils;
+import org.elasticsearch.xpack.esql.EsqlTestUtils;
+import org.elasticsearch.xpack.esql.analysis.Analyzer;
+import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
+import org.elasticsearch.xpack.esql.core.type.EsField;
+import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
+import org.elasticsearch.xpack.esql.index.EsIndex;
+import org.elasticsearch.xpack.esql.index.IndexResolution;
+import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
+import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
+import org.elasticsearch.xpack.esql.parser.EsqlParser;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.xpack.esql.ConfigurationTestUtils.randomConfiguration;
+import static org.elasticsearch.xpack.esql.ConfigurationTestUtils.randomTables;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyPolicyResolution;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
+import static org.hamcrest.Matchers.equalTo;
+
+public class ClusterRequestTests extends AbstractWireSerializingTestCase<ClusterComputeRequest> {
+
+    @Override
+    protected Writeable.Reader<ClusterComputeRequest> instanceReader() {
+        return ClusterComputeRequest::new;
+    }
+
+    @Override
+    protected NamedWriteableRegistry getNamedWriteableRegistry() {
+        List<NamedWriteableRegistry.Entry> writeables = new ArrayList<>();
+        writeables.addAll(new SearchModule(Settings.EMPTY, List.of()).getNamedWriteables());
+        writeables.addAll(new EsqlPlugin().getNamedWriteables());
+        return new NamedWriteableRegistry(writeables);
+    }
+
+    @Override
+    protected ClusterComputeRequest createTestInstance() {
+        var sessionId = randomAlphaOfLength(10);
+        String query = randomQuery();
+        PhysicalPlan physicalPlan = DataNodeRequestTests.mapAndMaybeOptimize(parse(query));
+        OriginalIndices originalIndices = new OriginalIndices(
+            generateRandomStringArray(10, 10, false, false),
+            IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())
+        );
+        String[] targetIndices = generateRandomStringArray(10, 10, false, false);
+        ClusterComputeRequest request = new ClusterComputeRequest(
+            randomAlphaOfLength(10),
+            sessionId,
+            randomConfiguration(query, randomTables()),
+            new RemoteClusterPlan(physicalPlan, targetIndices, originalIndices)
+        );
+        request.setParentTask(randomAlphaOfLength(10), randomNonNegativeLong());
+        return request;
+    }
+
+    @Override
+    protected ClusterComputeRequest mutateInstance(ClusterComputeRequest in) throws IOException {
+        return switch (between(0, 4)) {
+            case 0 -> {
+                var request = new ClusterComputeRequest(
+                    randomValueOtherThan(in.clusterAlias(), () -> randomAlphaOfLength(10)),
+                    in.sessionId(),
+                    in.configuration(),
+                    in.remoteClusterPlan()
+                );
+                request.setParentTask(in.getParentTask());
+                yield request;
+            }
+            case 1 -> {
+                var request = new ClusterComputeRequest(
+                    in.clusterAlias(),
+                    randomValueOtherThan(in.sessionId(), () -> randomAlphaOfLength(10)),
+                    in.configuration(),
+                    in.remoteClusterPlan()
+                );
+                request.setParentTask(in.getParentTask());
+                yield request;
+            }
+            case 2 -> {
+                var request = new ClusterComputeRequest(
+                    in.clusterAlias(),
+                    in.sessionId(),
+                    randomValueOtherThan(in.configuration(), ConfigurationTestUtils::randomConfiguration),
+                    in.remoteClusterPlan()
+                );
+                request.setParentTask(in.getParentTask());
+                yield request;
+            }
+            case 3 -> {
+                RemoteClusterPlan plan = in.remoteClusterPlan();
+                var request = new ClusterComputeRequest(
+                    in.clusterAlias(),
+                    in.sessionId(),
+                    in.configuration(),
+                    new RemoteClusterPlan(
+                        plan.plan(),
+                        randomValueOtherThan(plan.targetIndices(), () -> generateRandomStringArray(10, 10, false, false)),
+                        plan.originalIndices()
+                    )
+                );
+                request.setParentTask(in.getParentTask());
+                yield request;
+            }
+            case 4 -> {
+                RemoteClusterPlan plan = in.remoteClusterPlan();
+                var request = new ClusterComputeRequest(
+                    in.clusterAlias(),
+                    in.sessionId(),
+                    in.configuration(),
+                    new RemoteClusterPlan(
+                        plan.plan(),
+                        plan.targetIndices(),
+                        new OriginalIndices(
+                            plan.originalIndices().indices(),
+                            randomValueOtherThan(
+                                plan.originalIndices().indicesOptions(),
+                                () -> IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())
+                            )
+                        )
+                    )
+                );
+                request.setParentTask(in.getParentTask());
+                yield request;
+            }
+            default -> throw new AssertionError("invalid value");
+        };
+    }
+
+    public void testFallbackIndicesOptions() throws Exception {
+        ClusterComputeRequest request = createTestInstance();
+        var version = TransportVersionUtils.randomVersionBetween(
+            random(),
+            TransportVersions.V_8_14_0,
+            TransportVersions.ESQL_ORIGINAL_INDICES
+        );
+        ClusterComputeRequest cloned = copyInstance(request, version);
+        assertThat(cloned.clusterAlias(), equalTo(request.clusterAlias()));
+        assertThat(cloned.sessionId(), equalTo(request.sessionId()));
+        assertThat(cloned.configuration(), equalTo(request.configuration()));
+        RemoteClusterPlan plan = cloned.remoteClusterPlan();
+        assertThat(plan.plan(), equalTo(request.remoteClusterPlan().plan()));
+        assertThat(plan.targetIndices(), equalTo(request.remoteClusterPlan().targetIndices()));
+        OriginalIndices originalIndices = plan.originalIndices();
+        assertThat(originalIndices.indices(), equalTo(request.remoteClusterPlan().originalIndices().indices()));
+        assertThat(originalIndices.indicesOptions(), equalTo(SearchRequest.DEFAULT_INDICES_OPTIONS));
+    }
+
+    private static String randomQuery() {
+        return randomFrom("""
+            from test
+            | where round(emp_no) > 10
+            | limit 10
+            """, """
+            from test
+            | sort last_name
+            | limit 10
+            | where round(emp_no) > 10
+            | eval c = first_name
+            """);
+    }
+
+    static LogicalPlan parse(String query) {
+        Map<String, EsField> mapping = loadMapping("mapping-basic.json");
+        EsIndex test = new EsIndex("test", mapping, Map.of("test", IndexMode.STANDARD));
+        IndexResolution getIndexResult = IndexResolution.valid(test);
+        var logicalOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(TEST_CFG));
+        var analyzer = new Analyzer(
+            new AnalyzerContext(EsqlTestUtils.TEST_CFG, new EsqlFunctionRegistry(), getIndexResult, emptyPolicyResolution()),
+            TEST_VERIFIER
+        );
+        return logicalOptimizer.optimize(analyzer.analyze(new EsqlParser().createStatement(query)));
+    }
+
+    @Override
+    protected List<String> filteredWarnings() {
+        return withDefaultLimitWarning(super.filteredWarnings());
+    }
+}