瀏覽代碼

RCS2 - Fix remote nodes collection for scroll (#98186)

For RCS 2.0, we added a RemoteClusterNodesAction (#93893) for collecting
remote cluster server nodes. This action is used in two places: 1.
Collecting nodes that can be directly connected in sniff mode 2. In the
method RemoteClusterService#collectNodes

However, in the 2nd use case, we should collect all ndoes from the
remote cluster instead of just those nodes that have remote cluster
server enabled. This is because the method is used in the context of
search instead of building connection. A remote node can be *not*
directly accessible but still hosts searchable data. Search requests
should be sent to all the data nodes and *not* limited to just the
server nodes.

This PR fixes this issue by augmenting RemoteClusterNodesAction to
support retrieving either server nodes or all nodes. The caller can
decide which set of nodes are of interest depending on the context.
Yang Wang 2 年之前
父節點
當前提交
9c07f1df6f
共有 13 個文件被更改,包括 377 次插入74 次删除
  1. 44 22
      server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteClusterNodesAction.java
  2. 1 1
      server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
  3. 1 1
      server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java
  4. 72 2
      server/src/test/java/org/elasticsearch/action/admin/cluster/remote/RemoteClusterNodesActionTests.java
  5. 6 2
      test/test-clusters/src/main/java/org/elasticsearch/test/cluster/ClusterHandle.java
  6. 2 2
      test/test-clusters/src/main/java/org/elasticsearch/test/cluster/DefaultElasticsearchCluster.java
  7. 8 4
      test/test-clusters/src/main/java/org/elasticsearch/test/cluster/local/LocalClusterFactory.java
  8. 2 2
      test/test-clusters/src/main/java/org/elasticsearch/test/cluster/local/LocalClusterHandle.java
  9. 4 0
      test/test-clusters/src/main/java/org/elasticsearch/test/cluster/local/LocalClusterSpec.java
  10. 28 1
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/AbstractRemoteClusterSecurityTestCase.java
  11. 4 1
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityFcActionAuthorizationIT.java
  12. 34 36
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityRestIT.java
  13. 171 0
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityTopologyRestIT.java

+ 44 - 22
server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteClusterNodesAction.java

@@ -19,6 +19,7 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.action.support.nodes.BaseNodeResponse;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -43,13 +44,23 @@ public class RemoteClusterNodesAction extends ActionType<RemoteClusterNodesActio
     }
 
     public static class Request extends ActionRequest {
+        public static final Request ALL_NODES = new Request(false);
+        public static final Request REMOTE_CLUSTER_SERVER_NODES = new Request(true);
+        private final boolean remoteClusterServer;
 
-        public static final Request INSTANCE = new Request();
-
-        public Request() {}
+        private Request(boolean remoteClusterServer) {
+            this.remoteClusterServer = remoteClusterServer;
+        }
 
         public Request(StreamInput in) throws IOException {
             super(in);
+            this.remoteClusterServer = in.readBoolean();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeBoolean(remoteClusterServer);
         }
 
         @Override
@@ -82,7 +93,6 @@ public class RemoteClusterNodesAction extends ActionType<RemoteClusterNodesActio
     }
 
     public static class TransportAction extends HandledTransportAction<Request, Response> {
-
         private final TransportService transportService;
 
         @Inject
@@ -93,27 +103,39 @@ public class RemoteClusterNodesAction extends ActionType<RemoteClusterNodesActio
 
         @Override
         protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
-            final NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
-            nodesInfoRequest.clear();
-            nodesInfoRequest.addMetrics(NodesInfoRequest.Metric.REMOTE_CLUSTER_SERVER.metricName());
             final ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
             try (var ignore = threadContext.stashContext()) {
                 threadContext.markAsSystemContext();
-                transportService.sendRequest(
-                    transportService.getLocalNode(),
-                    NodesInfoAction.NAME,
-                    nodesInfoRequest,
-                    new ActionListenerResponseHandler<>(listener.delegateFailureAndWrap((l, response) -> {
-                        final List<DiscoveryNode> remoteClusterNodes = response.getNodes().stream().map(nodeInfo -> {
-                            final RemoteClusterServerInfo remoteClusterServerInfo = nodeInfo.getInfo(RemoteClusterServerInfo.class);
-                            if (remoteClusterServerInfo == null) {
-                                return null;
-                            }
-                            return nodeInfo.getNode().withTransportAddress(remoteClusterServerInfo.getAddress().publishAddress());
-                        }).filter(Objects::nonNull).toList();
-                        l.onResponse(new Response(remoteClusterNodes));
-                    }), NodesInfoResponse::new, TransportResponseHandler.TRANSPORT_WORKER)
-                );
+                if (request.remoteClusterServer) {
+                    final NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().clear()
+                        .addMetrics(NodesInfoRequest.Metric.REMOTE_CLUSTER_SERVER.metricName());
+                    transportService.sendRequest(
+                        transportService.getLocalNode(),
+                        NodesInfoAction.NAME,
+                        nodesInfoRequest,
+                        new ActionListenerResponseHandler<>(listener.delegateFailureAndWrap((l, response) -> {
+                            final List<DiscoveryNode> remoteClusterNodes = response.getNodes().stream().map(nodeInfo -> {
+                                final RemoteClusterServerInfo remoteClusterServerInfo = nodeInfo.getInfo(RemoteClusterServerInfo.class);
+                                if (remoteClusterServerInfo == null) {
+                                    return null;
+                                }
+                                return nodeInfo.getNode().withTransportAddress(remoteClusterServerInfo.getAddress().publishAddress());
+                            }).filter(Objects::nonNull).toList();
+                            l.onResponse(new Response(remoteClusterNodes));
+                        }), NodesInfoResponse::new, TransportResponseHandler.TRANSPORT_WORKER)
+                    );
+                } else {
+                    final NodesInfoRequest nodesInfoRequest = new NodesInfoRequest().clear();
+                    transportService.sendRequest(
+                        transportService.getLocalNode(),
+                        NodesInfoAction.NAME,
+                        nodesInfoRequest,
+                        new ActionListenerResponseHandler<>(listener.delegateFailureAndWrap((l, response) -> {
+                            final List<DiscoveryNode> nodes = response.getNodes().stream().map(BaseNodeResponse::getNode).toList();
+                            l.onResponse(new Response(nodes));
+                        }), NodesInfoResponse::new, TransportResponseHandler.TRANSPORT_WORKER)
+                    );
+                }
             }
         }
     }

+ 1 - 1
server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

@@ -125,7 +125,7 @@ final class RemoteClusterConnection implements Closeable {
                     transportService.sendRequest(
                         connection,
                         RemoteClusterNodesAction.NAME,
-                        RemoteClusterNodesAction.Request.INSTANCE,
+                        RemoteClusterNodesAction.Request.ALL_NODES,
                         TransportRequestOptions.EMPTY,
                         new ActionListenerResponseHandler<>(contextPreservingActionListener.map(response -> {
                             final Map<String, DiscoveryNode> nodeLookup = response.getNodes()

+ 1 - 1
server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java

@@ -312,7 +312,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
                 // Use different action to collect nodes information depending on the connection model
                 if (REMOTE_CLUSTER_PROFILE.equals(connectionManager.getConnectionProfile().getTransportProfile())) {
                     action = RemoteClusterNodesAction.NAME;
-                    request = RemoteClusterNodesAction.Request.INSTANCE;
+                    request = RemoteClusterNodesAction.Request.REMOTE_CLUSTER_SERVER_NODES;
                     sniffResponseHandler = new RemoteClusterNodesSniffResponseHandler(connection, listener, seedNodesSuppliers);
                 } else {
                     action = ClusterStateAction.NAME;

+ 72 - 2
server/src/test/java/org/elasticsearch/action/admin/cluster/remote/RemoteClusterNodesActionTests.java

@@ -39,6 +39,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -53,7 +54,7 @@ public class RemoteClusterNodesActionTests extends ESTestCase {
         assumeTrue("untrusted remote cluster feature flag must be enabled", TcpTransport.isUntrustedRemoteClusterEnabled());
     }
 
-    public void testDoExecute() {
+    public void testDoExecuteForRemoteServerNodes() {
         final ThreadPool threadPool = mock(ThreadPool.class);
         final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
         when(threadPool.getThreadContext()).thenReturn(threadContext);
@@ -122,7 +123,7 @@ public class RemoteClusterNodesActionTests extends ESTestCase {
         );
 
         final PlainActionFuture<RemoteClusterNodesAction.Response> future = new PlainActionFuture<>();
-        action.doExecute(mock(Task.class), RemoteClusterNodesAction.Request.INSTANCE, future);
+        action.doExecute(mock(Task.class), RemoteClusterNodesAction.Request.REMOTE_CLUSTER_SERVER_NODES, future);
 
         final List<DiscoveryNode> actualNodes = future.actionGet().getNodes();
         assertThat(Set.copyOf(actualNodes), equalTo(expectedRemoteServerNodes));
@@ -132,6 +133,75 @@ public class RemoteClusterNodesActionTests extends ESTestCase {
         );
     }
 
+    public void testDoExecuteForRemoteNodes() {
+        final ThreadPool threadPool = mock(ThreadPool.class);
+        final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
+        when(threadPool.getThreadContext()).thenReturn(threadContext);
+
+        final TransportService transportService = mock(TransportService.class);
+        final DiscoveryNode localNode = mock(DiscoveryNode.class);
+        when(transportService.getLocalNode()).thenReturn(localNode);
+        when(transportService.getThreadPool()).thenReturn(threadPool);
+
+        // Prepare nodesInfo response
+        final int numberOfNodes = randomIntBetween(1, 6);
+        final List<NodeInfo> nodeInfos = new ArrayList<>();
+        final Set<DiscoveryNode> expectedRemoteNodes = new HashSet<>();
+        for (int i = 0; i < numberOfNodes; i++) {
+            final DiscoveryNode node = randomNode(i);
+            expectedRemoteNodes.add(node);
+            nodeInfos.add(
+                new NodeInfo(
+                    Version.CURRENT,
+                    TransportVersion.current(),
+                    null,
+                    node,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    null
+                )
+            );
+        }
+
+        final NodesInfoResponse nodesInfoResponse = new NodesInfoResponse(
+            new ClusterName(randomAlphaOfLengthBetween(3, 8)),
+            nodeInfos,
+            List.of()
+        );
+
+        doAnswer(invocation -> {
+            final NodesInfoRequest nodesInfoRequest = invocation.getArgument(2);
+            assertThat(nodesInfoRequest.requestedMetrics(), empty());
+            final ActionListenerResponseHandler<NodesInfoResponse> handler = invocation.getArgument(3);
+            handler.handleResponse(nodesInfoResponse);
+            return null;
+        }).when(transportService).sendRequest(eq(localNode), eq(NodesInfoAction.NAME), any(NodesInfoRequest.class), any());
+
+        final RemoteClusterNodesAction.TransportAction action = new RemoteClusterNodesAction.TransportAction(
+            transportService,
+            mock(ActionFilters.class)
+        );
+
+        final PlainActionFuture<RemoteClusterNodesAction.Response> future = new PlainActionFuture<>();
+        action.doExecute(mock(Task.class), RemoteClusterNodesAction.Request.ALL_NODES, future);
+
+        final List<DiscoveryNode> actualNodes = future.actionGet().getNodes();
+        assertThat(Set.copyOf(actualNodes), equalTo(expectedRemoteNodes));
+        assertThat(
+            actualNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toUnmodifiableSet()),
+            equalTo(expectedRemoteNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toUnmodifiableSet()))
+        );
+    }
+
     private DiscoveryNode randomNode(final int id) {
         return DiscoveryNodeUtils.builder(Integer.toString(id)).name("node-" + id).roles(Set.of()).build();
     }

+ 6 - 2
test/test-clusters/src/main/java/org/elasticsearch/test/cluster/ClusterHandle.java

@@ -79,7 +79,7 @@ public interface ClusterHandle extends Closeable {
 
     /**
      * Returns a comma-separated list of TCP transport endpoints for cluster. If this method is called on an unstarted cluster, the cluster
-     * will be started. This method is thread-safe and subsequent calls will wait for cluster start and availability.
+     * will be started. This method is thread-safe and subsequent calls will wait for cluster start and availability.\
      *
      * @return cluster node TCP transport endpoints
      */
@@ -96,14 +96,18 @@ public interface ClusterHandle extends Closeable {
     /**
      * Returns a comma-separated list of remote cluster server endpoints for cluster. If this method is called on an unstarted cluster,
      * the cluster will be started. This method is thread-safe and subsequent calls will wait for cluster start and availability.
+     * Note individual node can enable or disable remote cluster server independently. When a node has remote cluster server disabled,
+     * an empty string is returned for that node. Hence, it is possible for this method to return something like "[::1]:63300,,".
      *
      * @return cluster node remote cluster server endpoints
      */
-    String getRemoteClusterServerEndpoint();
+    String getRemoteClusterServerEndpoints();
 
     /**
      * Returns the remote cluster server endpoint for the node at the given index. If this method is called on an unstarted cluster,
      * the cluster will be started. This method is thread-safe and subsequent calls will wait for cluster start and availability.
+     * Note individual node can enable or disable remote cluster server independently. When a node has remote cluster server disabled,
+     * an empty string is returned.
      *
      * @return cluster node remote cluster server endpoints
      */

+ 2 - 2
test/test-clusters/src/main/java/org/elasticsearch/test/cluster/DefaultElasticsearchCluster.java

@@ -115,9 +115,9 @@ public class DefaultElasticsearchCluster<S extends ClusterSpec, H extends Cluste
     }
 
     @Override
-    public String getRemoteClusterServerEndpoint() {
+    public String getRemoteClusterServerEndpoints() {
         checkHandle();
-        return handle.getRemoteClusterServerEndpoint();
+        return handle.getRemoteClusterServerEndpoints();
     }
 
     @Override

+ 8 - 4
test/test-clusters/src/main/java/org/elasticsearch/test/cluster/local/LocalClusterFactory.java

@@ -196,11 +196,15 @@ public class LocalClusterFactory implements ClusterFactory<LocalClusterSpec, Loc
         }
 
         public String getRemoteClusterServerEndpoint() {
-            Path portsFile = workingDir.resolve("logs").resolve("remote_cluster.ports");
-            if (Files.notExists(portsFile)) {
-                waitUntilReady();
+            if (spec.isRemoteClusterServerEnabled()) {
+                Path portsFile = workingDir.resolve("logs").resolve("remote_cluster.ports");
+                if (Files.notExists(portsFile)) {
+                    waitUntilReady();
+                }
+                return readPortsFile(portsFile).get(0);
+            } else {
+                return "";
             }
-            return readPortsFile(portsFile).get(0);
         }
 
         public void deletePortsFiles() {

+ 2 - 2
test/test-clusters/src/main/java/org/elasticsearch/test/cluster/local/LocalClusterHandle.java

@@ -129,14 +129,14 @@ public class LocalClusterHandle implements ClusterHandle {
     }
 
     @Override
-    public String getRemoteClusterServerEndpoint() {
+    public String getRemoteClusterServerEndpoints() {
         start();
         return execute(() -> nodes.parallelStream().map(Node::getRemoteClusterServerEndpoint).collect(Collectors.joining(",")));
     }
 
     @Override
     public String getRemoteClusterServerEndpoint(int index) {
-        return getRemoteClusterServerEndpoint().split(",")[index];
+        return getRemoteClusterServerEndpoints().split(",")[index];
     }
 
     @Override

+ 4 - 0
test/test-clusters/src/main/java/org/elasticsearch/test/cluster/local/LocalClusterSpec.java

@@ -197,6 +197,10 @@ public class LocalClusterSpec implements ClusterSpec {
             return Boolean.parseBoolean(getSetting("xpack.security.enabled", getVersion().onOrAfter("8.0.0") ? "true" : "false"));
         }
 
+        public boolean isRemoteClusterServerEnabled() {
+            return Boolean.parseBoolean(getSetting("remote_cluster_server.enabled", "false"));
+        }
+
         public boolean isMasterEligible() {
             return getSetting("node.roles", "master").contains("master");
         }

+ 28 - 1
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/AbstractRemoteClusterSecurityTestCase.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.xpack.remotecluster;
 
 import org.apache.http.HttpHost;
+import org.apache.http.client.methods.HttpPost;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
@@ -31,7 +32,9 @@ import org.junit.BeforeClass;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Base64;
+import java.util.Locale;
 import java.util.Map;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -111,6 +114,24 @@ public abstract class AbstractRemoteClusterSecurityTestCase extends ESRestTestCa
         }
     }
 
+    protected static String headerFromRandomAuthMethod(final String username, final SecureString password) throws IOException {
+        final boolean useBearerTokenAuth = randomBoolean();
+        if (useBearerTokenAuth) {
+            final Request request = new Request(HttpPost.METHOD_NAME, "/_security/oauth2/token");
+            request.setJsonEntity(String.format(Locale.ROOT, """
+                {
+                  "grant_type":"password",
+                  "username":"%s",
+                  "password":"%s"
+                }
+                """, username, password));
+            final Map<String, Object> responseBody = entityAsMap(adminClient().performRequest(request));
+            return "Bearer " + responseBody.get("access_token");
+        } else {
+            return basicAuthHeaderValue(username, password);
+        }
+    }
+
     @Override
     protected String getTestRestCluster() {
         return queryCluster.getHttpAddress(0);
@@ -187,7 +208,6 @@ public abstract class AbstractRemoteClusterSecurityTestCase extends ESRestTestCa
         updateClusterSettings(builder.build());
 
         // Ensure remote cluster is connected
-        final int numberOfFcNodes = targetFulfillingCluster.getHttpAddresses().split(",").length;
         final Request remoteInfoRequest = new Request("GET", "/_remote/info");
         assertBusy(() -> {
             final Response remoteInfoResponse = adminClient().performRequest(remoteInfoRequest);
@@ -195,6 +215,13 @@ public abstract class AbstractRemoteClusterSecurityTestCase extends ESRestTestCa
             final ObjectPath remoteInfoObjectPath = assertOKAndCreateObjectPath(remoteInfoResponse);
             assertThat(remoteInfoObjectPath.evaluate(clusterAlias + ".connected"), is(true));
             if (false == isProxyMode) {
+                int numberOfFcNodes = (int) Arrays.stream(targetFulfillingCluster.getRemoteClusterServerEndpoints().split(","))
+                    .filter(endpoint -> endpoint.length() > 0)
+                    .count();
+                if (numberOfFcNodes == 0) {
+                    // The cluster is an RCS 1.0 remote cluster
+                    numberOfFcNodes = targetFulfillingCluster.getTransportEndpoints().split(",").length;
+                }
                 assertThat(remoteInfoObjectPath.evaluate(clusterAlias + ".num_nodes_connected"), equalTo(numberOfFcNodes));
             }
             final String credentialsValue = remoteInfoObjectPath.evaluate(clusterAlias + ".cluster_credentials");

+ 4 - 1
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityFcActionAuthorizationIT.java

@@ -288,7 +288,10 @@ public class RemoteClusterSecurityFcActionAuthorizationIT extends ESRestTestCase
 
             final ElasticsearchSecurityException e = expectThrows(
                 ElasticsearchSecurityException.class,
-                () -> remoteClusterClient.execute(RemoteClusterNodesAction.INSTANCE, RemoteClusterNodesAction.Request.INSTANCE).actionGet()
+                () -> remoteClusterClient.execute(
+                    RemoteClusterNodesAction.INSTANCE,
+                    RemoteClusterNodesAction.Request.REMOTE_CLUSTER_SERVER_NODES
+                ).actionGet()
             );
             assertThat(
                 e.getMessage(),

+ 34 - 36
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityRestIT.java

@@ -7,13 +7,11 @@
 
 package org.elasticsearch.xpack.remotecluster;
 
-import org.apache.http.client.methods.HttpPost;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
-import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.search.SearchHit;
@@ -43,31 +41,36 @@ import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 
 public class RemoteClusterSecurityRestIT extends AbstractRemoteClusterSecurityTestCase {
 
     private static final AtomicReference<Map<String, Object>> API_KEY_MAP_REF = new AtomicReference<>();
     private static final AtomicReference<Map<String, Object>> REST_API_KEY_MAP_REF = new AtomicReference<>();
     private static final AtomicBoolean SSL_ENABLED_REF = new AtomicBoolean();
+    private static final AtomicBoolean NODE1_RCS_SERVER_ENABLED = new AtomicBoolean();
+    private static final AtomicBoolean NODE2_RCS_SERVER_ENABLED = new AtomicBoolean();
 
     static {
         fulfillingCluster = ElasticsearchCluster.local()
             .name("fulfilling-cluster")
             .nodes(3)
             .apply(commonClusterConfig)
-            .setting("remote_cluster_server.enabled", "true")
             .setting("remote_cluster.port", "0")
-            .setting("xpack.security.remote_cluster_server.ssl.enabled", String.valueOf(SSL_ENABLED_REF.get()))
+            .setting("xpack.security.remote_cluster_server.ssl.enabled", () -> String.valueOf(SSL_ENABLED_REF.get()))
             .setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key")
             .setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt")
             .setting("xpack.security.authc.token.enabled", "true")
             .keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password")
+            .node(0, spec -> spec.setting("remote_cluster_server.enabled", "true"))
+            .node(1, spec -> spec.setting("remote_cluster_server.enabled", () -> String.valueOf(NODE1_RCS_SERVER_ENABLED.get())))
+            .node(2, spec -> spec.setting("remote_cluster_server.enabled", () -> String.valueOf(NODE2_RCS_SERVER_ENABLED.get())))
             .build();
 
         queryCluster = ElasticsearchCluster.local()
             .name("query-cluster")
             .apply(commonClusterConfig)
-            .setting("xpack.security.remote_cluster_client.ssl.enabled", String.valueOf(SSL_ENABLED_REF.get()))
+            .setting("xpack.security.remote_cluster_client.ssl.enabled", () -> String.valueOf(SSL_ENABLED_REF.get()))
             .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt")
             .setting("xpack.security.authc.token.enabled", "true")
             .keystore("cluster.remote.my_remote_cluster.credentials", () -> {
@@ -114,9 +117,11 @@ public class RemoteClusterSecurityRestIT extends AbstractRemoteClusterSecurityTe
     // Use a RuleChain to ensure that fulfilling cluster is started before query cluster
     // `SSL_ENABLED_REF` is used to control the SSL-enabled setting on the test clusters
     // We set it here, since randomization methods are not available in the static initialize context above
-    public static TestRule clusterRule = RuleChain.outerRule(new RunnableTestRuleAdapter(() -> SSL_ENABLED_REF.set(usually())))
-        .around(fulfillingCluster)
-        .around(queryCluster);
+    public static TestRule clusterRule = RuleChain.outerRule(new RunnableTestRuleAdapter(() -> {
+        SSL_ENABLED_REF.set(usually());
+        NODE1_RCS_SERVER_ENABLED.set(randomBoolean());
+        NODE2_RCS_SERVER_ENABLED.set(randomBoolean());
+    })).around(fulfillingCluster).around(queryCluster);
 
     public void testCrossClusterSearch() throws Exception {
         configureRemoteCluster();
@@ -151,7 +156,8 @@ public class RemoteClusterSecurityRestIT extends AbstractRemoteClusterSecurityTe
                 { "index": { "_index": "shared-metrics" } }
                 { "name": "metric3" }
                 { "index": { "_index": "shared-metrics" } }
-                { "name": "metric4" }\n"""));
+                { "name": "metric4" }
+                """));
             assertOK(performRequestAgainstFulfillingCluster(bulkRequest));
         }
 
@@ -353,24 +359,34 @@ public class RemoteClusterSecurityRestIT extends AbstractRemoteClusterSecurityTe
 
     @SuppressWarnings("unchecked")
     public void testNodesInfo() throws IOException {
-        final Request request = new Request("GET", "/_nodes/transport,remote_cluster_server");
+        final Request request = new Request("GET", "/_nodes/settings,transport,remote_cluster_server");
         final Response response = performRequestAgainstFulfillingCluster(request);
         assertOK(response);
         final Map<String, Object> responseMap = responseAsMap(response);
 
         assertThat(ObjectPath.eval("_nodes.total", responseMap), equalTo(3));
         final Map<String, Object> nodes = ObjectPath.eval("nodes", responseMap);
-        nodes.forEach((k, v) -> {
-            final Map<String, Object> node = (Map<String, Object>) v;
+        int numberOfRemoteClusterServerNodes = 0;
+        for (Map.Entry<String, Object> entry : nodes.entrySet()) {
+            final Map<String, Object> node = (Map<String, Object>) entry.getValue();
             // remote cluster is not reported in transport profiles
             assertThat(ObjectPath.eval("transport.profiles", node), anEmptyMap());
 
-            final List<String> boundAddresses = ObjectPath.eval("remote_cluster_server.bound_address", node);
-            assertThat(boundAddresses, notNullValue());
-            assertThat(boundAddresses, not(empty()));
-            final String publishAddress = ObjectPath.eval("remote_cluster_server.publish_address", node);
-            assertThat(publishAddress, notNullValue());
-        });
+            if (Boolean.parseBoolean(ObjectPath.eval("settings.remote_cluster_server.enabled", node))) {
+                numberOfRemoteClusterServerNodes += 1;
+                final List<String> boundAddresses = ObjectPath.eval("remote_cluster_server.bound_address", node);
+                assertThat(boundAddresses, notNullValue());
+                assertThat(boundAddresses, not(empty()));
+                final String publishAddress = ObjectPath.eval("remote_cluster_server.publish_address", node);
+                assertThat(publishAddress, notNullValue());
+            } else {
+                assertThat(ObjectPath.eval("remote_cluster_server", node), nullValue());
+            }
+        }
+        assertThat(
+            numberOfRemoteClusterServerNodes,
+            equalTo(1 + (NODE1_RCS_SERVER_ENABLED.get() ? 1 : 0) + (NODE2_RCS_SERVER_ENABLED.get() ? 1 : 0))
+        );
     }
 
     private Response performRequestWithRemoteSearchUser(final Request request) throws IOException {
@@ -393,22 +409,4 @@ public class RemoteClusterSecurityRestIT extends AbstractRemoteClusterSecurityTe
         );
         return client().performRequest(request);
     }
-
-    private String headerFromRandomAuthMethod(final String username, final SecureString password) throws IOException {
-        final boolean useBearerTokenAuth = randomBoolean();
-        if (useBearerTokenAuth) {
-            final Request request = new Request(HttpPost.METHOD_NAME, "/_security/oauth2/token");
-            request.setJsonEntity(String.format(Locale.ROOT, """
-                {
-                  "grant_type":"password",
-                  "username":"%s",
-                  "password":"%s"
-                }
-                """, username, password));
-            final Map<String, Object> responseBody = entityAsMap(adminClient().performRequest(request));
-            return "Bearer " + responseBody.get("access_token");
-        } else {
-            return basicAuthHeaderValue(username, password);
-        }
-    }
 }

+ 171 - 0
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityTopologyRestIT.java

@@ -0,0 +1,171 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.remotecluster;
+
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.core.Strings;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.cluster.util.resource.Resource;
+import org.elasticsearch.test.junit.RunnableTestRuleAdapter;
+import org.elasticsearch.test.rest.ObjectPath;
+import org.junit.ClassRule;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TestRule;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+public class RemoteClusterSecurityTopologyRestIT extends AbstractRemoteClusterSecurityTestCase {
+
+    private static final AtomicReference<Map<String, Object>> API_KEY_MAP_REF = new AtomicReference<>();
+    private static final AtomicBoolean NODE1_RCS_SERVER_ENABLED = new AtomicBoolean();
+
+    static {
+        fulfillingCluster = ElasticsearchCluster.local()
+            .name("fulfilling-cluster")
+            .nodes(3)
+            .apply(commonClusterConfig)
+            .setting("remote_cluster.port", "0")
+            .setting("xpack.security.remote_cluster_server.ssl.enabled", "true")
+            .setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key")
+            .setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt")
+            .setting("xpack.security.authc.token.enabled", "true")
+            .keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password")
+            .node(0, spec -> spec.setting("remote_cluster_server.enabled", "true"))
+            .node(1, spec -> spec.setting("remote_cluster_server.enabled", () -> String.valueOf(NODE1_RCS_SERVER_ENABLED.get())))
+            // at least one remote node has server disabled
+            .node(2, spec -> spec.setting("remote_cluster_server.enabled", "false"))
+            .build();
+
+        queryCluster = ElasticsearchCluster.local()
+            .name("query-cluster")
+            .apply(commonClusterConfig)
+            .setting("xpack.security.remote_cluster_client.ssl.enabled", "true")
+            .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt")
+            .setting("xpack.security.authc.token.enabled", "true")
+            .keystore("cluster.remote.my_remote_cluster.credentials", () -> {
+                if (API_KEY_MAP_REF.get() == null) {
+                    final Map<String, Object> apiKeyMap = createCrossClusterAccessApiKey("""
+                        {
+                          "search": [
+                            {
+                                "names": ["index*", "not_found_index", "shared-metrics"]
+                            }
+                          ]
+                        }""");
+                    API_KEY_MAP_REF.set(apiKeyMap);
+                }
+                return (String) API_KEY_MAP_REF.get().get("encoded");
+            })
+            // Define a bogus API key for another remote cluster
+            .keystore("cluster.remote.invalid_remote.credentials", randomEncodedApiKey())
+            // Define remote with a REST API key to observe expected failure
+            .rolesFile(Resource.fromClasspath("roles.yml"))
+            .user(REMOTE_METRIC_USER, PASS.toString(), "read_remote_shared_metrics")
+            .build();
+    }
+
+    @ClassRule
+    public static TestRule clusterRule = RuleChain.outerRule(new RunnableTestRuleAdapter(() -> {
+        NODE1_RCS_SERVER_ENABLED.set(randomBoolean());
+    })).around(fulfillingCluster).around(queryCluster);
+
+    public void testCrossClusterScrollWithSniffModeWhenSomeRemoteNodesAreNotDirectlyAccessible() throws Exception {
+        configureRemoteCluster(false);
+
+        // Fulfilling cluster
+        {
+            // Spread the shards to all nodes
+            final Request createIndexRequest = new Request("PUT", "shared-metrics");
+            createIndexRequest.setJsonEntity("""
+                {
+                  "settings": {
+                    "number_of_shards": 3,
+                    "number_of_replicas": 0
+                  }
+                }""");
+            assertOK(performRequestAgainstFulfillingCluster(createIndexRequest));
+
+            // Index some documents, so we can attempt to search them from the querying cluster
+            final Request bulkRequest = new Request("POST", "/_bulk?refresh=true");
+            bulkRequest.setJsonEntity(Strings.format("""
+                { "index": { "_index": "shared-metrics" } }
+                { "name": "metric1" }
+                { "index": { "_index": "shared-metrics" } }
+                { "name": "metric2" }
+                { "index": { "_index": "shared-metrics" } }
+                { "name": "metric3" }
+                { "index": { "_index": "shared-metrics" } }
+                { "name": "metric4" }
+                { "index": { "_index": "shared-metrics" } }
+                { "name": "metric5" }
+                { "index": { "_index": "shared-metrics" } }
+                { "name": "metric6" }
+                """));
+            assertOK(performRequestAgainstFulfillingCluster(bulkRequest));
+        }
+
+        // Query cluster
+        {
+            final var documentFieldValues = new HashSet<>();
+            final var searchRequest = new Request("GET", "/my_remote_cluster:*/_search?scroll=1h&size=1");
+            final SearchResponse searchResponse = SearchResponse.fromXContent(
+                responseAsParser(performRequestWithRemoteMetricUser(searchRequest))
+            );
+            assertThat(searchResponse.getHits().getTotalHits().value, equalTo(6L));
+            assertThat(Arrays.stream(searchResponse.getHits().getHits()).map(SearchHit::getIndex).toList(), contains("shared-metrics"));
+            documentFieldValues.add(searchResponse.getHits().getHits()[0].getSourceAsMap().get("name"));
+
+            // Scroll should be able to fetch all documents from all nodes even when some nodes are not directly accessible in sniff mode
+            final String scrollId = searchResponse.getScrollId();
+            final Request scrollRequest = new Request("GET", "/_search/scroll");
+            scrollRequest.setJsonEntity(Strings.format("""
+                { "scroll_id": "%s" }
+                """, scrollId));
+            // Fetch all documents
+            for (int i = 0; i < 5; i++) {
+                final SearchResponse scrollResponse = SearchResponse.fromXContent(
+                    responseAsParser(performRequestWithRemoteMetricUser(scrollRequest))
+                );
+                assertThat(scrollResponse.getHits().getTotalHits().value, equalTo(6L));
+                assertThat(Arrays.stream(scrollResponse.getHits().getHits()).map(SearchHit::getIndex).toList(), contains("shared-metrics"));
+                documentFieldValues.add(scrollResponse.getHits().getHits()[0].getSourceAsMap().get("name"));
+            }
+            assertThat(documentFieldValues, containsInAnyOrder("metric1", "metric2", "metric3", "metric4", "metric5", "metric6"));
+
+            // Scroll from all nodes should be freed
+            final Request deleteScrollRequest = new Request("DELETE", "/_search/scroll");
+            deleteScrollRequest.setJsonEntity(Strings.format("""
+                { "scroll_id": "%s" }
+                """, scrollId));
+            final ObjectPath deleteScrollObjectPath = assertOKAndCreateObjectPath(performRequestWithRemoteMetricUser(deleteScrollRequest));
+            assertThat(deleteScrollObjectPath.evaluate("succeeded"), is(true));
+            assertThat(deleteScrollObjectPath.evaluate("num_freed"), equalTo(3));
+        }
+    }
+
+    private Response performRequestWithRemoteMetricUser(final Request request) throws IOException {
+        request.setOptions(
+            RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", headerFromRandomAuthMethod(REMOTE_METRIC_USER, PASS))
+        );
+        return client().performRequest(request);
+    }
+}