Browse Source

Fix CCS exchange when multi cluster aliases point to same cluster (#117297) (#117389)

[esql] > Unexpected error from Elasticsearch: illegal_state_exception - sink exchanger for id [ruxoDDxXTGW55oIPHoCT-g:964613010] already exists.

This issue occurs when two or more clusterAliases point to the same 
physical remote cluster. The exchange service assumes the destination is
unique, which is not true in this topology. This PR addresses the
problem by appending a suffix using a monotonic increasing number,
ensuring that different exchanges are created in such cases.

Another issue arising from this behavior is that data on a remote 
cluster is processed multiple times, leading to incorrect results. I can
work on the fix for this once we agree that this is an issue.
Nhat Nguyen 10 months ago
parent
commit
3fbf04eb56

+ 5 - 0
docs/changelog/117297.yaml

@@ -0,0 +1,5 @@
+pr: 117297
+summary: Fix CCS exchange when multi cluster aliases point to same cluster
+area: ES|QL
+type: bug
+issues: []

+ 20 - 9
test/framework/src/main/java/org/elasticsearch/test/AbstractMultiClustersTestCase.java

@@ -17,6 +17,7 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResp
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.plugins.Plugin;
@@ -44,6 +45,7 @@ import java.util.stream.Collectors;
 
 import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING;
 import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.not;
@@ -149,19 +151,23 @@ public abstract class AbstractMultiClustersTestCase extends ESTestCase {
     }
 
     protected void disconnectFromRemoteClusters() throws Exception {
-        Settings.Builder settings = Settings.builder();
         final Set<String> clusterAliases = clusterGroup.clusterAliases();
         for (String clusterAlias : clusterAliases) {
             if (clusterAlias.equals(LOCAL_CLUSTER) == false) {
-                settings.putNull("cluster.remote." + clusterAlias + ".seeds");
-                settings.putNull("cluster.remote." + clusterAlias + ".mode");
-                settings.putNull("cluster.remote." + clusterAlias + ".proxy_address");
+                removeRemoteCluster(clusterAlias);
             }
         }
+    }
+
+    protected void removeRemoteCluster(String clusterAlias) throws Exception {
+        Settings.Builder settings = Settings.builder();
+        settings.putNull("cluster.remote." + clusterAlias + ".seeds");
+        settings.putNull("cluster.remote." + clusterAlias + ".mode");
+        settings.putNull("cluster.remote." + clusterAlias + ".proxy_address");
         client().admin().cluster().prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).setPersistentSettings(settings).get();
         assertBusy(() -> {
             for (TransportService transportService : cluster(LOCAL_CLUSTER).getInstances(TransportService.class)) {
-                assertThat(transportService.getRemoteClusterService().getRegisteredRemoteClusterNames(), empty());
+                assertThat(transportService.getRemoteClusterService().getRegisteredRemoteClusterNames(), not(contains(clusterAlias)));
             }
         });
     }
@@ -178,12 +184,17 @@ public abstract class AbstractMultiClustersTestCase extends ESTestCase {
     }
 
     protected void configureRemoteCluster(String clusterAlias, Collection<String> seedNodes) throws Exception {
-        final String remoteClusterSettingPrefix = "cluster.remote." + clusterAlias + ".";
-        Settings.Builder settings = Settings.builder();
-        final List<String> seedAddresses = seedNodes.stream().map(node -> {
+        final var seedAddresses = seedNodes.stream().map(node -> {
             final TransportService transportService = cluster(clusterAlias).getInstance(TransportService.class, node);
-            return transportService.boundAddress().publishAddress().toString();
+            return transportService.boundAddress().publishAddress();
         }).toList();
+        configureRemoteClusterWithSeedAddresses(clusterAlias, seedAddresses);
+    }
+
+    protected void configureRemoteClusterWithSeedAddresses(String clusterAlias, Collection<TransportAddress> seedNodes) throws Exception {
+        final String remoteClusterSettingPrefix = "cluster.remote." + clusterAlias + ".";
+        Settings.Builder settings = Settings.builder();
+        final List<String> seedAddresses = seedNodes.stream().map(TransportAddress::toString).toList();
         boolean skipUnavailable = skipUnavailableForRemoteClusters().containsKey(clusterAlias)
             ? skipUnavailableForRemoteClusters().get(clusterAlias)
             : DEFAULT_SKIP_UNAVAILABLE;

+ 5 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

@@ -40,6 +40,7 @@ import org.elasticsearch.transport.Transports;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -339,6 +340,10 @@ public final class ExchangeService extends AbstractLifecycleComponent {
         return sinks.isEmpty();
     }
 
+    public Set<String> sinkKeys() {
+        return sinks.keySet();
+    }
+
     @Override
     protected void doStart() {
 

+ 46 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.esql.action;
 
+import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
 import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -15,6 +16,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.compute.operator.DriverTaskRunner;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.core.TimeValue;
@@ -27,8 +29,10 @@ import org.elasticsearch.script.ScriptEngine;
 import org.elasticsearch.search.lookup.SearchLookup;
 import org.elasticsearch.tasks.TaskInfo;
 import org.elasticsearch.test.AbstractMultiClustersTestCase;
+import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.json.JsonXContent;
+import org.elasticsearch.xpack.esql.plugin.ComputeService;
 import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 import org.junit.Before;
 
@@ -40,8 +44,10 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
 import static org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase.randomPragmas;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
 
@@ -189,4 +195,44 @@ public class CrossClustersCancellationIT extends AbstractMultiClustersTestCase {
         Exception error = expectThrows(Exception.class, requestFuture::actionGet);
         assertThat(error.getMessage(), containsString("proxy timeout"));
     }
+
+    public void testSameRemoteClusters() throws Exception {
+        TransportAddress address = cluster(REMOTE_CLUSTER).getInstance(TransportService.class).getLocalNode().getAddress();
+        int moreClusters = between(1, 5);
+        for (int i = 0; i < moreClusters; i++) {
+            String clusterAlias = REMOTE_CLUSTER + "-" + i;
+            configureRemoteClusterWithSeedAddresses(clusterAlias, List.of(address));
+        }
+        int numDocs = between(10, 100);
+        createRemoteIndex(numDocs);
+        EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
+        request.query("FROM *:test | STATS total=sum(const) | LIMIT 1");
+        request.pragmas(randomPragmas());
+        ActionFuture<EsqlQueryResponse> future = client().execute(EsqlQueryAction.INSTANCE, request);
+        try {
+            try {
+                assertBusy(() -> {
+                    List<TaskInfo> tasks = client(REMOTE_CLUSTER).admin()
+                        .cluster()
+                        .prepareListTasks()
+                        .setActions(ComputeService.CLUSTER_ACTION_NAME)
+                        .get()
+                        .getTasks();
+                    assertThat(tasks, hasSize(moreClusters + 1));
+                });
+            } finally {
+                PauseFieldPlugin.allowEmitting.countDown();
+            }
+            try (EsqlQueryResponse resp = future.actionGet(30, TimeUnit.SECONDS)) {
+                // TODO: This produces incorrect results because data on the remote cluster is processed multiple times.
+                long expectedCount = numDocs * (moreClusters + 1L);
+                assertThat(getValuesList(resp), equalTo(List.of(List.of(expectedCount))));
+            }
+        } finally {
+            for (int i = 0; i < moreClusters; i++) {
+                String clusterAlias = REMOTE_CLUSTER + "-" + i;
+                removeRemoteCluster(clusterAlias);
+            }
+        }
+    }
 }

+ 2 - 1
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java

@@ -401,7 +401,8 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
                 });
                 sessionId = foundTasks.get(0).taskId().toString();
                 assertTrue(fetchingStarted.await(1, TimeUnit.MINUTES));
-                ExchangeSinkHandler exchangeSink = exchangeService.getSinkHandler(sessionId);
+                String exchangeId = exchangeService.sinkKeys().stream().filter(s -> s.startsWith(sessionId)).findFirst().get();
+                ExchangeSinkHandler exchangeSink = exchangeService.getSinkHandler(exchangeId);
                 waitedForPages = randomBoolean();
                 if (waitedForPages) {
                     // do not fail exchange requests until we have some pages

+ 15 - 7
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -82,6 +82,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;
 
@@ -101,6 +102,7 @@ public class ComputeService {
     private final EnrichLookupService enrichLookupService;
     private final LookupFromIndexService lookupFromIndexService;
     private final ClusterService clusterService;
+    private final AtomicLong childSessionIdGenerator = new AtomicLong();
 
     public ComputeService(
         SearchService searchService,
@@ -167,7 +169,7 @@ public class ComputeService {
                 return;
             }
             var computeContext = new ComputeContext(
-                sessionId,
+                newChildSession(sessionId),
                 RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
                 List.of(),
                 configuration,
@@ -330,14 +332,15 @@ public class ComputeService {
                 // the new remote exchange sink, and initialize the computation on the target node via data-node-request.
                 for (DataNode node : dataNodeResult.dataNodes()) {
                     var queryPragmas = configuration.pragmas();
+                    var childSessionId = newChildSession(sessionId);
                     ExchangeService.openExchange(
                         transportService,
                         node.connection,
-                        sessionId,
+                        childSessionId,
                         queryPragmas.exchangeBufferSize(),
                         esqlExecutor,
                         refs.acquire().delegateFailureAndWrap((l, unused) -> {
-                            var remoteSink = exchangeService.newRemoteSink(parentTask, sessionId, transportService, node.connection);
+                            var remoteSink = exchangeService.newRemoteSink(parentTask, childSessionId, transportService, node.connection);
                             exchangeSource.addRemoteSink(remoteSink, queryPragmas.concurrentExchangeClients());
                             ActionListener<ComputeResponse> computeResponseListener = computeListener.acquireCompute(clusterAlias);
                             var dataNodeListener = ActionListener.runBefore(computeResponseListener, () -> l.onResponse(null));
@@ -345,7 +348,7 @@ public class ComputeService {
                                 node.connection,
                                 DATA_ACTION_NAME,
                                 new DataNodeRequest(
-                                    sessionId,
+                                    childSessionId,
                                     configuration,
                                     clusterAlias,
                                     node.shardIds,
@@ -378,17 +381,18 @@ public class ComputeService {
         var linkExchangeListeners = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink());
         try (RefCountingListener refs = new RefCountingListener(linkExchangeListeners)) {
             for (RemoteCluster cluster : clusters) {
+                final var childSessionId = newChildSession(sessionId);
                 ExchangeService.openExchange(
                     transportService,
                     cluster.connection,
-                    sessionId,
+                    childSessionId,
                     queryPragmas.exchangeBufferSize(),
                     esqlExecutor,
                     refs.acquire().delegateFailureAndWrap((l, unused) -> {
-                        var remoteSink = exchangeService.newRemoteSink(rootTask, sessionId, transportService, cluster.connection);
+                        var remoteSink = exchangeService.newRemoteSink(rootTask, childSessionId, transportService, cluster.connection);
                         exchangeSource.addRemoteSink(remoteSink, queryPragmas.concurrentExchangeClients());
                         var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
-                        var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, sessionId, configuration, remotePlan);
+                        var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan);
                         var clusterListener = ActionListener.runBefore(
                             computeListener.acquireCompute(cluster.clusterAlias()),
                             () -> l.onResponse(null)
@@ -912,4 +916,8 @@ public class ComputeService {
             return searchContexts.stream().map(ctx -> ctx.getSearchExecutionContext()).toList();
         }
     }
+
+    private String newChildSession(String session) {
+        return session + "/" + childSessionIdGenerator.incrementAndGet();
+    }
 }