Pārlūkot izejas kodu

Hot-reloadable remote cluster credentials (with blocking call fix) (#103215)

Brings back https://github.com/elastic/elasticsearch/pull/102798, with a
tweak to avoid tripping on a blocking operation.

Only change compared to the original PR is
https://github.com/elastic/elasticsearch/pull/103215/commits/4072fac9549ce8874a54b60bfe72d63f6ea17c03
Nikolaj Volgushev 1 gadu atpakaļ
vecāks
revīzija
d920f4f291
25 mainītis faili ar 1069 papildinājumiem un 260 dzēšanām
  1. 5 0
      docs/changelog/102798.yaml
  2. 1 1
      server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java
  3. 19 6
      server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
  4. 52 0
      server/src/main/java/org/elasticsearch/transport/RemoteClusterCredentialsManager.java
  5. 12 15
      server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java
  6. 43 18
      server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java
  7. 5 1
      server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java
  8. 45 9
      server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java
  9. 62 7
      server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java
  10. 43 0
      server/src/test/java/org/elasticsearch/transport/RemoteClusterCredentialsManagerTests.java
  11. 30 2
      server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java
  12. 15 3
      server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java
  13. 55 11
      server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java
  14. 23 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/ActionTypes.java
  15. 3 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java
  16. 1 0
      x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java
  17. 314 0
      x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/ReloadRemoteClusterCredentialsIT.java
  18. 83 25
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java
  19. 77 0
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/settings/TransportReloadRemoteClusterCredentialsAction.java
  20. 0 51
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/RemoteClusterCredentialsResolver.java
  21. 26 19
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java
  22. 13 1
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/LocalStateSecurity.java
  23. 117 0
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java
  24. 0 38
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/RemoteClusterCredentialsResolverTests.java
  25. 25 52
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java

+ 5 - 0
docs/changelog/102798.yaml

@@ -0,0 +1,5 @@
+pr: 102798
+summary: Hot-reloadable remote cluster credentials
+area: Security
+type: enhancement
+issues: []

+ 1 - 1
server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java

@@ -179,7 +179,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
                 RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(
                     newConnection,
                     clusterAlias,
-                    actualProfile.getTransportProfile()
+                    connectionManager.getCredentialsManager()
                 ),
                 actualProfile.getHandshakeTimeout(),
                 cn -> true,

+ 19 - 6
server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

@@ -57,15 +57,28 @@ final class RemoteClusterConnection implements Closeable {
      * @param settings the nodes settings object
      * @param clusterAlias the configured alias of the cluster to connect to
      * @param transportService the local nodes transport service
-     * @param credentialsProtected Whether the remote cluster is protected by a credentials, i.e. it has a credentials configured
-     *                             via secure setting. This means the remote cluster uses the new configurable access RCS model
-     *                             (as opposed to the basic model).
+     * @param credentialsManager object to lookup remote cluster credentials by cluster alias. If a cluster is protected by a credential,
+     *                           i.e. it has a credential configured via secure setting.
+     *                           This means the remote cluster uses the advances RCS model (as opposed to the basic model).
      */
-    RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService, boolean credentialsProtected) {
+    RemoteClusterConnection(
+        Settings settings,
+        String clusterAlias,
+        TransportService transportService,
+        RemoteClusterCredentialsManager credentialsManager
+    ) {
         this.transportService = transportService;
         this.clusterAlias = clusterAlias;
-        ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, settings, credentialsProtected);
-        this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, createConnectionManager(profile, transportService));
+        ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(
+            clusterAlias,
+            settings,
+            credentialsManager.hasCredentials(clusterAlias)
+        );
+        this.remoteConnectionManager = new RemoteConnectionManager(
+            clusterAlias,
+            credentialsManager,
+            createConnectionManager(profile, transportService)
+        );
         this.connectionStrategy = RemoteConnectionStrategy.buildStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
         // we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
         this.remoteConnectionManager.addListener(transportService);

+ 52 - 0
server/src/main/java/org/elasticsearch/transport/RemoteClusterCredentialsManager.java

@@ -0,0 +1,52 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.transport;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.SecureString;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.Nullable;
+
+import java.util.Map;
+
+import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS;
+
+public class RemoteClusterCredentialsManager {
+
+    private static final Logger logger = LogManager.getLogger(RemoteClusterCredentialsManager.class);
+
+    private volatile Map<String, SecureString> clusterCredentials;
+
+    public RemoteClusterCredentialsManager(Settings settings) {
+        updateClusterCredentials(settings);
+    }
+
+    public void updateClusterCredentials(Settings settings) {
+        clusterCredentials = REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings);
+        logger.debug(
+            () -> Strings.format(
+                "Updated remote cluster credentials for clusters: [%s]",
+                Strings.collectionToCommaDelimitedString(clusterCredentials.keySet())
+            )
+        );
+    }
+
+    @Nullable
+    public SecureString resolveCredentials(String clusterAlias) {
+        return clusterCredentials.get(clusterAlias);
+    }
+
+    public boolean hasCredentials(String clusterAlias) {
+        return clusterCredentials.containsKey(clusterAlias);
+    }
+
+    public static final RemoteClusterCredentialsManager EMPTY = new RemoteClusterCredentialsManager(Settings.EMPTY);
+}

+ 12 - 15
server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

@@ -147,15 +147,14 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
 
     private final TransportService transportService;
     private final Map<String, RemoteClusterConnection> remoteClusters = ConcurrentCollections.newConcurrentMap();
-    private final Set<String> credentialsProtectedRemoteClusters;
+    private final RemoteClusterCredentialsManager remoteClusterCredentialsManager;
 
     RemoteClusterService(Settings settings, TransportService transportService) {
         super(settings);
         this.enabled = DiscoveryNode.isRemoteClusterClient(settings);
         this.remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings);
         this.transportService = transportService;
-        this.credentialsProtectedRemoteClusters = REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings).keySet();
-
+        this.remoteClusterCredentialsManager = new RemoteClusterCredentialsManager(settings);
         if (remoteClusterServerEnabled) {
             registerRemoteClusterHandshakeRequestHandler(transportService);
         }
@@ -305,6 +304,14 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
         }
     }
 
+    public void updateRemoteClusterCredentials(Settings settings) {
+        remoteClusterCredentialsManager.updateClusterCredentials(settings);
+    }
+
+    public RemoteClusterCredentialsManager getRemoteClusterCredentialsManager() {
+        return remoteClusterCredentialsManager;
+    }
+
     @Override
     protected void updateRemoteCluster(String clusterAlias, Settings settings) {
         CountDownLatch latch = new CountDownLatch(1);
@@ -363,12 +370,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
         if (remote == null) {
             // this is a new cluster we have to add a new representation
             Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
-            remote = new RemoteClusterConnection(
-                finalSettings,
-                clusterAlias,
-                transportService,
-                credentialsProtectedRemoteClusters.contains(clusterAlias)
-            );
+            remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager);
             remoteClusters.put(clusterAlias, remote);
             remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.CONNECTED));
         } else if (remote.shouldRebuildConnection(newSettings)) {
@@ -380,12 +382,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
             }
             remoteClusters.remove(clusterAlias);
             Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
-            remote = new RemoteClusterConnection(
-                finalSettings,
-                clusterAlias,
-                transportService,
-                credentialsProtectedRemoteClusters.contains(clusterAlias)
-            );
+            remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager);
             remoteClusters.put(clusterAlias, remote);
             remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.RECONNECTED));
         } else {

+ 43 - 18
server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java

@@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
@@ -25,18 +26,19 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE;
 import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME;
 
 public class RemoteConnectionManager implements ConnectionManager {
 
     private final String clusterAlias;
+    private final RemoteClusterCredentialsManager credentialsManager;
     private final ConnectionManager delegate;
     private final AtomicLong counter = new AtomicLong();
     private volatile List<DiscoveryNode> connectedNodes = Collections.emptyList();
 
-    RemoteConnectionManager(String clusterAlias, ConnectionManager delegate) {
+    RemoteConnectionManager(String clusterAlias, RemoteClusterCredentialsManager credentialsManager, ConnectionManager delegate) {
         this.clusterAlias = clusterAlias;
+        this.credentialsManager = credentialsManager;
         this.delegate = delegate;
         this.delegate.addListener(new TransportConnectionListener() {
             @Override
@@ -51,6 +53,10 @@ public class RemoteConnectionManager implements ConnectionManager {
         });
     }
 
+    public RemoteClusterCredentialsManager getCredentialsManager() {
+        return credentialsManager;
+    }
+
     /**
      * Remote cluster connections have a different lifecycle from intra-cluster connections. Use {@link #connectToRemoteClusterNode}
      * instead of this method.
@@ -95,13 +101,7 @@ public class RemoteConnectionManager implements ConnectionManager {
             node,
             profile,
             listener.delegateFailureAndWrap(
-                (l, connection) -> l.onResponse(
-                    new InternalRemoteConnection(
-                        connection,
-                        clusterAlias,
-                        profile != null ? profile.getTransportProfile() : getConnectionProfile().getTransportProfile()
-                    )
-                )
+                (l, connection) -> l.onResponse(wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, credentialsManager))
             )
         );
     }
@@ -182,16 +182,35 @@ public class RemoteConnectionManager implements ConnectionManager {
      * @return a cluster alias if the connection target a node in the remote cluster, otherwise an empty result
      */
     public static Optional<String> resolveRemoteClusterAlias(Transport.Connection connection) {
+        return resolveRemoteClusterAliasWithCredentials(connection).map(RemoteClusterAliasWithCredentials::clusterAlias);
+    }
+
+    public record RemoteClusterAliasWithCredentials(String clusterAlias, @Nullable SecureString credentials) {
+        @Override
+        public String toString() {
+            return "RemoteClusterAliasWithCredentials{clusterAlias='" + clusterAlias + "', credentials='::es_redacted::'}";
+        }
+    }
+
+    /**
+     * This method returns information (alias and credentials) for remote cluster for the given transport connection.
+     * Either or both of alias and credentials can be null depending on the connection.
+     *
+     * @param connection the transport connection for which to resolve a remote cluster alias
+     */
+    public static Optional<RemoteClusterAliasWithCredentials> resolveRemoteClusterAliasWithCredentials(Transport.Connection connection) {
         Transport.Connection unwrapped = TransportService.unwrapConnection(connection);
         if (unwrapped instanceof InternalRemoteConnection remoteConnection) {
-            return Optional.of(remoteConnection.getClusterAlias());
+            return Optional.of(
+                new RemoteClusterAliasWithCredentials(remoteConnection.getClusterAlias(), remoteConnection.getClusterCredentials())
+            );
         }
         return Optional.empty();
     }
 
     private Transport.Connection getConnectionInternal(DiscoveryNode node) throws NodeNotConnectedException {
         Transport.Connection connection = delegate.getConnection(node);
-        return new InternalRemoteConnection(connection, clusterAlias, getConnectionProfile().getTransportProfile());
+        return wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, credentialsManager);
     }
 
     private synchronized void addConnectedNode(DiscoveryNode addedNode) {
@@ -297,21 +316,27 @@ public class RemoteConnectionManager implements ConnectionManager {
         private static final Logger logger = LogManager.getLogger(InternalRemoteConnection.class);
         private final Transport.Connection connection;
         private final String clusterAlias;
-        private final boolean isRemoteClusterProfile;
+        @Nullable
+        private final SecureString clusterCredentials;
 
-        InternalRemoteConnection(Transport.Connection connection, String clusterAlias, String transportProfile) {
+        private InternalRemoteConnection(Transport.Connection connection, String clusterAlias, @Nullable SecureString clusterCredentials) {
             assert false == connection instanceof InternalRemoteConnection : "should not double wrap";
             assert false == connection instanceof ProxyConnection
                 : "proxy connection should wrap internal remote connection, not the other way around";
-            this.clusterAlias = Objects.requireNonNull(clusterAlias);
             this.connection = Objects.requireNonNull(connection);
-            this.isRemoteClusterProfile = REMOTE_CLUSTER_PROFILE.equals(Objects.requireNonNull(transportProfile));
+            this.clusterAlias = Objects.requireNonNull(clusterAlias);
+            this.clusterCredentials = clusterCredentials;
         }
 
         public String getClusterAlias() {
             return clusterAlias;
         }
 
+        @Nullable
+        public SecureString getClusterCredentials() {
+            return clusterCredentials;
+        }
+
         @Override
         public DiscoveryNode getNode() {
             return connection.getNode();
@@ -321,7 +346,7 @@ public class RemoteConnectionManager implements ConnectionManager {
         public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
             throws IOException, TransportException {
             final String effectiveAction;
-            if (isRemoteClusterProfile && TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
+            if (clusterCredentials != null && TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
                 logger.trace("sending remote cluster specific handshake to node [{}] of remote cluster [{}]", getNode(), clusterAlias);
                 effectiveAction = REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME;
             } else {
@@ -389,8 +414,8 @@ public class RemoteConnectionManager implements ConnectionManager {
     static InternalRemoteConnection wrapConnectionWithRemoteClusterInfo(
         Transport.Connection connection,
         String clusterAlias,
-        String transportProfile
+        RemoteClusterCredentialsManager credentialsManager
     ) {
-        return new InternalRemoteConnection(connection, clusterAlias, transportProfile);
+        return new InternalRemoteConnection(connection, clusterAlias, credentialsManager.resolveCredentials(clusterAlias));
     }
 }

+ 5 - 1
server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java

@@ -357,7 +357,11 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
                 : "transport profile must be consistent between the connection manager and the actual profile";
             transportService.connectionValidator(node)
                 .validate(
-                    RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, profile.getTransportProfile()),
+                    RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(
+                        connection,
+                        clusterAlias,
+                        connectionManager.getCredentialsManager()
+                    ),
                     profile,
                     listener
                 );

+ 45 - 9
server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java

@@ -130,7 +130,11 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 );
                 int numOfConnections = randomIntBetween(4, 8);
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(
                         clusterAlias,
                         localService,
@@ -188,7 +192,11 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 AtomicBoolean useAddress1 = new AtomicBoolean(true);
 
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(
                         clusterAlias,
                         localService,
@@ -263,7 +271,11 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 );
                 int numOfConnections = randomIntBetween(4, 8);
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(
                         clusterAlias,
                         localService,
@@ -328,7 +340,11 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 );
                 int numOfConnections = randomIntBetween(4, 8);
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(
                         clusterAlias,
                         localService,
@@ -388,7 +404,11 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 AtomicBoolean useAddress1 = new AtomicBoolean(true);
 
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(
                         clusterAlias,
                         localService,
@@ -459,7 +479,11 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 );
                 int numOfConnections = randomIntBetween(4, 8);
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(
                         clusterAlias,
                         localService,
@@ -511,7 +535,11 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 });
 
                 try (
-                    var remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    var remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     var strategy = new ProxyConnectionStrategy(
                         clusterAlias,
                         localService,
@@ -554,7 +582,11 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 );
                 int numOfConnections = randomIntBetween(4, 8);
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(
                         clusterAlias,
                         localService,
@@ -672,7 +704,11 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 );
                 int numOfConnections = randomIntBetween(4, 8);
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(
                         clusterAlias,
                         localService,

+ 62 - 7
server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

@@ -62,6 +62,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
@@ -252,7 +253,14 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 AtomicReference<Exception> exceptionReference = new AtomicReference<>();
                 String clusterAlias = "test-cluster";
                 Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode));
-                try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, randomBoolean())) {
+                try (
+                    RemoteClusterConnection connection = new RemoteClusterConnection(
+                        settings,
+                        clusterAlias,
+                        service,
+                        randomFrom(RemoteClusterCredentialsManager.EMPTY, buildCredentialsManager(clusterAlias))
+                    )
+                ) {
                     ActionListener<Void> listener = ActionListener.wrap(x -> {
                         listenerCalled.countDown();
                         fail("expected exception");
@@ -322,7 +330,14 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.acceptIncomingRequests();
                 String clusterAlias = "test-cluster";
                 Settings settings = buildRandomSettings(clusterAlias, seedNodes);
-                try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, false)) {
+                try (
+                    RemoteClusterConnection connection = new RemoteClusterConnection(
+                        settings,
+                        clusterAlias,
+                        service,
+                        RemoteClusterCredentialsManager.EMPTY
+                    )
+                ) {
                     int numThreads = randomIntBetween(4, 10);
                     Thread[] threads = new Thread[numThreads];
                     CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
@@ -470,7 +485,12 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                     settings = Settings.builder().put(settings).setSecureSettings(secureSettings).build();
                 }
                 try (
-                    RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, hasClusterCredentials)
+                    RemoteClusterConnection connection = new RemoteClusterConnection(
+                        settings,
+                        clusterAlias,
+                        service,
+                        hasClusterCredentials ? buildCredentialsManager(clusterAlias) : RemoteClusterCredentialsManager.EMPTY
+                    )
                 ) {
                     // test no nodes connected
                     RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo());
@@ -662,7 +682,12 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 }
 
                 try (
-                    RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, hasClusterCredentials)
+                    RemoteClusterConnection connection = new RemoteClusterConnection(
+                        settings,
+                        clusterAlias,
+                        service,
+                        hasClusterCredentials ? buildCredentialsManager(clusterAlias) : RemoteClusterCredentialsManager.EMPTY
+                    )
                 ) {
                     CountDownLatch responseLatch = new CountDownLatch(1);
                     AtomicReference<Function<String, DiscoveryNode>> reference = new AtomicReference<>();
@@ -713,7 +738,14 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 String clusterAlias = "test-cluster";
                 Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode));
 
-                try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, false)) {
+                try (
+                    RemoteClusterConnection connection = new RemoteClusterConnection(
+                        settings,
+                        clusterAlias,
+                        service,
+                        RemoteClusterCredentialsManager.EMPTY
+                    )
+                ) {
                     PlainActionFuture<Void> plainActionFuture = new PlainActionFuture<>();
                     connection.ensureConnected(plainActionFuture);
                     plainActionFuture.get(10, TimeUnit.SECONDS);
@@ -779,7 +811,14 @@ public class RemoteClusterConnectionTests extends ESTestCase {
 
                 String clusterAlias = "test-cluster";
                 Settings settings = buildRandomSettings(clusterAlias, seedNodes);
-                try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, randomBoolean())) {
+                try (
+                    RemoteClusterConnection connection = new RemoteClusterConnection(
+                        settings,
+                        clusterAlias,
+                        service,
+                        randomFrom(RemoteClusterCredentialsManager.EMPTY, buildCredentialsManager(clusterAlias))
+                    )
+                ) {
                     final int numGetThreads = randomIntBetween(4, 10);
                     final Thread[] getThreads = new Thread[numGetThreads];
                     final int numModifyingThreads = randomIntBetween(4, 10);
@@ -873,7 +912,14 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                 service.acceptIncomingRequests();
                 String clusterAlias = "test-cluster";
                 Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode));
-                try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, false)) {
+                try (
+                    RemoteClusterConnection connection = new RemoteClusterConnection(
+                        settings,
+                        clusterAlias,
+                        service,
+                        RemoteClusterCredentialsManager.EMPTY
+                    )
+                ) {
                     PlainActionFuture.get(fut -> connection.ensureConnected(fut.map(x -> null)));
                     for (int i = 0; i < 10; i++) {
                         // always a direct connection as the remote node is already connected
@@ -921,4 +967,13 @@ public class RemoteClusterConnectionTests extends ESTestCase {
         );
         return builder.build();
     }
+
+    private static RemoteClusterCredentialsManager buildCredentialsManager(String clusterAlias) {
+        Objects.requireNonNull(clusterAlias);
+        final Settings.Builder builder = Settings.builder();
+        final MockSecureSettings secureSettings = new MockSecureSettings();
+        secureSettings.setString("cluster.remote." + clusterAlias + ".credentials", randomAlphaOfLength(20));
+        builder.setSecureSettings(secureSettings);
+        return new RemoteClusterCredentialsManager(builder.build());
+    }
 }

+ 43 - 0
server/src/test/java/org/elasticsearch/transport/RemoteClusterCredentialsManagerTests.java

@@ -0,0 +1,43 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.transport;
+
+import org.elasticsearch.common.settings.MockSecureSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+public class RemoteClusterCredentialsManagerTests extends ESTestCase {
+    public void testResolveRemoteClusterCredentials() {
+        final String clusterAlias = randomAlphaOfLength(9);
+        final String otherClusterAlias = randomAlphaOfLength(10);
+
+        final String secret = randomAlphaOfLength(20);
+        final Settings settings = buildSettingsWithCredentials(clusterAlias, secret);
+        RemoteClusterCredentialsManager credentialsManager = new RemoteClusterCredentialsManager(settings);
+        assertThat(credentialsManager.resolveCredentials(clusterAlias).toString(), equalTo(secret));
+        assertThat(credentialsManager.hasCredentials(otherClusterAlias), is(false));
+
+        final String updatedSecret = randomAlphaOfLength(21);
+        credentialsManager.updateClusterCredentials(buildSettingsWithCredentials(clusterAlias, updatedSecret));
+        assertThat(credentialsManager.resolveCredentials(clusterAlias).toString(), equalTo(updatedSecret));
+
+        credentialsManager.updateClusterCredentials(Settings.EMPTY);
+        assertThat(credentialsManager.hasCredentials(clusterAlias), is(false));
+    }
+
+    private Settings buildSettingsWithCredentials(String clusterAlias, String secret) {
+        final Settings.Builder builder = Settings.builder();
+        final MockSecureSettings secureSettings = new MockSecureSettings();
+        secureSettings.setString("cluster.remote." + clusterAlias + ".credentials", secret);
+        return builder.setSecureSettings(secureSettings).build();
+    }
+}

+ 30 - 2
server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java

@@ -13,6 +13,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -23,17 +24,20 @@ import org.mockito.Mockito;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class RemoteConnectionManagerTests extends ESTestCase {
 
@@ -49,6 +53,7 @@ public class RemoteConnectionManagerTests extends ESTestCase {
         transport = mock(Transport.class);
         remoteConnectionManager = new RemoteConnectionManager(
             "remote-cluster",
+            RemoteClusterCredentialsManager.EMPTY,
             new ClusterConnectionManager(Settings.EMPTY, transport, new ThreadContext(Settings.EMPTY))
         );
 
@@ -120,10 +125,13 @@ public class RemoteConnectionManagerTests extends ESTestCase {
 
     public void testRewriteHandshakeAction() throws IOException {
         final Transport.Connection connection = mock(Transport.Connection.class);
+        final String clusterAlias = randomAlphaOfLengthBetween(3, 8);
+        final RemoteClusterCredentialsManager credentialsResolver = mock(RemoteClusterCredentialsManager.class);
+        when(credentialsResolver.resolveCredentials(clusterAlias)).thenReturn(new SecureString(randomAlphaOfLength(42)));
         final Transport.Connection wrappedConnection = RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(
             connection,
-            randomAlphaOfLengthBetween(3, 8),
-            RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE
+            clusterAlias,
+            credentialsResolver
         );
         final long requestId = randomLong();
         final TransportRequest request = mock(TransportRequest.class);
@@ -142,6 +150,26 @@ public class RemoteConnectionManagerTests extends ESTestCase {
         verify(connection).sendRequest(requestId, anotherAction, request, options);
     }
 
+    public void testWrapAndResolveConnectionRoundTrip() {
+        final Transport.Connection connection = mock(Transport.Connection.class);
+        final String clusterAlias = randomAlphaOfLengthBetween(3, 8);
+        final RemoteClusterCredentialsManager credentialsResolver = mock(RemoteClusterCredentialsManager.class);
+        final SecureString credentials = new SecureString(randomAlphaOfLength(42));
+        // second credential will never be resolved
+        when(credentialsResolver.resolveCredentials(clusterAlias)).thenReturn(credentials, (SecureString) null);
+        final Transport.Connection wrappedConnection = RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(
+            connection,
+            clusterAlias,
+            credentialsResolver
+        );
+
+        final Optional<RemoteConnectionManager.RemoteClusterAliasWithCredentials> actual = RemoteConnectionManager
+            .resolveRemoteClusterAliasWithCredentials(wrappedConnection);
+
+        assertThat(actual.isPresent(), is(true));
+        assertThat(actual.get(), equalTo(new RemoteConnectionManager.RemoteClusterAliasWithCredentials(clusterAlias, credentials)));
+    }
+
     private static class TestRemoteConnection extends CloseableConnection {
 
         private final DiscoveryNode node;

+ 15 - 3
server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java

@@ -26,7 +26,11 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
             mock(Transport.class),
             threadContext
         );
-        RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager);
+        RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+            "cluster-alias",
+            RemoteClusterCredentialsManager.EMPTY,
+            connectionManager
+        );
         FakeConnectionStrategy first = new FakeConnectionStrategy(
             "cluster-alias",
             mock(TransportService.class),
@@ -46,7 +50,11 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
             mock(Transport.class),
             threadContext
         );
-        RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager);
+        RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+            "cluster-alias",
+            RemoteClusterCredentialsManager.EMPTY,
+            connectionManager
+        );
         FakeConnectionStrategy first = new FakeConnectionStrategy(
             "cluster-alias",
             mock(TransportService.class),
@@ -69,7 +77,11 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
         assertEquals(TimeValue.MINUS_ONE, connectionManager.getConnectionProfile().getPingInterval());
         assertEquals(Compression.Enabled.INDEXING_DATA, connectionManager.getConnectionProfile().getCompressionEnabled());
         assertEquals(Compression.Scheme.LZ4, connectionManager.getConnectionProfile().getCompressionScheme());
-        RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager);
+        RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+            "cluster-alias",
+            RemoteClusterCredentialsManager.EMPTY,
+            connectionManager
+        );
         FakeConnectionStrategy first = new FakeConnectionStrategy(
             "cluster-alias",
             mock(TransportService.class),

+ 55 - 11
server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java

@@ -192,7 +192,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                     threadPool.getThreadContext()
                 );
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        hasClusterCredentials ? new RemoteClusterCredentialsManager(clientSettings) : RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
                         clusterAlias,
                         localService,
@@ -262,7 +266,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                     threadPool.getThreadContext()
                 );
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
                         clusterAlias,
                         localService,
@@ -336,7 +344,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                     threadPool.getThreadContext()
                 );
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
                         clusterAlias,
                         localService,
@@ -424,7 +436,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                     threadPool.getThreadContext()
                 );
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
                         clusterAlias,
                         localService,
@@ -486,7 +502,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                     threadPool.getThreadContext()
                 );
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
                         clusterAlias,
                         localService,
@@ -549,7 +569,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                     threadPool.getThreadContext()
                 );
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
                         clusterAlias,
                         localService,
@@ -617,7 +641,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                     threadPool.getThreadContext()
                 );
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
                         clusterAlias,
                         localService,
@@ -694,7 +722,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                     threadPool.getThreadContext()
                 );
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
                         clusterAlias,
                         localService,
@@ -783,7 +815,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                     threadPool.getThreadContext()
                 );
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
                         clusterAlias,
                         localService,
@@ -895,7 +931,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                     threadPool.getThreadContext()
                 );
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
                         clusterAlias,
                         localService,
@@ -964,7 +1004,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                     threadPool.getThreadContext()
                 );
                 try (
-                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
+                    RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(
+                        clusterAlias,
+                        RemoteClusterCredentialsManager.EMPTY,
+                        connectionManager
+                    );
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
                         clusterAlias,
                         localService,

+ 23 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/ActionTypes.java

@@ -0,0 +1,23 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.security.action;
+
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+
+/**
+ * A collection of actions types for the Security plugin that need to be available in xpack.core.security and thus cannot be stored
+ * directly with their transport action implementation.
+ */
+public final class ActionTypes {
+    private ActionTypes() {};
+
+    public static final ActionType<ActionResponse.Empty> RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION = ActionType.localOnly(
+        "cluster:admin/xpack/security/remote_cluster_credentials/reload"
+    );
+}

+ 3 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java

@@ -12,6 +12,7 @@ import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
 import org.elasticsearch.index.seqno.RetentionLeaseSyncAction;
 import org.elasticsearch.persistent.CompletionPersistentTaskAction;
 import org.elasticsearch.transport.TransportActionProxy;
+import org.elasticsearch.xpack.core.security.action.ActionTypes;
 import org.elasticsearch.xpack.core.security.support.StringMatcher;
 
 import java.util.Collections;
@@ -43,7 +44,8 @@ public final class SystemPrivilege extends Privilege {
         "indices:data/read/*", // needed for SystemIndexMigrator
         "indices:admin/refresh", // needed for SystemIndexMigrator
         "indices:admin/aliases", // needed for SystemIndexMigrator
-        TransportSearchShardsAction.TYPE.name() // added so this API can be called with the system user by other APIs
+        TransportSearchShardsAction.TYPE.name(), // added so this API can be called with the system user by other APIs
+        ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION.name() // needed for Security plugin reload of remote cluster credentials
     );
 
     private static final Predicate<String> PREDICATE = (action) -> {

+ 1 - 0
x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

@@ -255,6 +255,7 @@ public class Constants {
         "cluster:admin/xpack/security/profile/suggest",
         "cluster:admin/xpack/security/profile/set_enabled",
         "cluster:admin/xpack/security/realm/cache/clear",
+        "cluster:admin/xpack/security/remote_cluster_credentials/reload",
         "cluster:admin/xpack/security/role/delete",
         "cluster:admin/xpack/security/role/get",
         "cluster:admin/xpack/security/role/put",

+ 314 - 0
x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/ReloadRemoteClusterCredentialsIT.java

@@ -0,0 +1,314 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.security;
+
+import org.apache.lucene.search.TotalHits;
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.node.reload.NodesReloadSecureSettingsResponse;
+import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
+import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchShardsRequest;
+import org.elasticsearch.action.search.SearchShardsResponse;
+import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.action.search.TransportSearchAction;
+import org.elasticsearch.action.search.TransportSearchShardsAction;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.VersionInformation;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.KeyStoreWrapper;
+import org.elasticsearch.common.settings.SecureString;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.internal.InternalSearchResponse;
+import org.elasticsearch.test.SecuritySingleNodeTestCase;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.RemoteClusterCredentialsManager;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.security.authc.ApiKeyService;
+import org.elasticsearch.xpack.security.authc.CrossClusterAccessHeaders;
+import org.junit.BeforeClass;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+public class ReloadRemoteClusterCredentialsIT extends SecuritySingleNodeTestCase {
+    private static final String CLUSTER_ALIAS = "my_remote_cluster";
+
+    @BeforeClass
+    public static void disableInFips() {
+        assumeFalse(
+            "Cannot run in FIPS mode since the keystore will be password protected and sending a password in the reload"
+                + "settings api call, require TLS to be configured for the transport layer",
+            inFipsJvm()
+        );
+    }
+
+    @Override
+    public String configRoles() {
+        return org.elasticsearch.core.Strings.format("""
+            user:
+              cluster: [ "ALL" ]
+              indices:
+                - names: '*'
+                  privileges: [ "ALL" ]
+              remote_indices:
+                - names: '*'
+                  privileges: [ "ALL" ]
+                  clusters: ["*"]
+            """);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            clearRemoteCluster();
+            super.tearDown();
+        } finally {
+            ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
+        }
+    }
+
+    private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
+
+    public void testReloadRemoteClusterCredentials() throws Exception {
+        final String credentials = randomAlphaOfLength(42);
+        writeCredentialsToKeyStore(credentials);
+        final RemoteClusterCredentialsManager clusterCredentialsManager = getInstanceFromNode(TransportService.class)
+            .getRemoteClusterService()
+            .getRemoteClusterCredentialsManager();
+        // Until we reload, credentials written to keystore are not loaded into the credentials manager
+        assertThat(clusterCredentialsManager.hasCredentials(CLUSTER_ALIAS), is(false));
+        reloadSecureSettings();
+        assertThat(clusterCredentialsManager.resolveCredentials(CLUSTER_ALIAS), equalTo(credentials));
+
+        // Check that credentials get used for a remote connection, once we configure it
+        final BlockingQueue<Map<String, String>> capturedHeaders = ConcurrentCollections.newBlockingQueue();
+        try (MockTransportService remoteTransport = startTransport("remoteNodeA", threadPool, capturedHeaders)) {
+            final TransportAddress remoteAddress = remoteTransport.getOriginalTransport()
+                .profileBoundAddresses()
+                .get("_remote_cluster")
+                .publishAddress();
+
+            configureRemoteCluster(remoteAddress);
+
+            // Run search to trigger header capturing on the receiving side
+            client().search(new SearchRequest(CLUSTER_ALIAS + ":index-a")).get();
+
+            assertHeadersContainCredentialsThenClear(credentials, capturedHeaders);
+
+            // Update credentials and ensure they are used
+            final String updatedCredentials = randomAlphaOfLength(41);
+            writeCredentialsToKeyStore(updatedCredentials);
+            reloadSecureSettings();
+
+            client().search(new SearchRequest(CLUSTER_ALIAS + ":index-a")).get();
+
+            assertHeadersContainCredentialsThenClear(updatedCredentials, capturedHeaders);
+        }
+    }
+
+    private void assertHeadersContainCredentialsThenClear(String credentials, BlockingQueue<Map<String, String>> capturedHeaders) {
+        assertThat(capturedHeaders, is(not(empty())));
+        for (Map<String, String> actualHeaders : capturedHeaders) {
+            assertThat(actualHeaders, hasKey(CrossClusterAccessHeaders.CROSS_CLUSTER_ACCESS_CREDENTIALS_HEADER_KEY));
+            assertThat(
+                actualHeaders.get(CrossClusterAccessHeaders.CROSS_CLUSTER_ACCESS_CREDENTIALS_HEADER_KEY),
+                equalTo(ApiKeyService.withApiKeyPrefix(credentials))
+            );
+        }
+        capturedHeaders.clear();
+        assertThat(capturedHeaders, is(empty()));
+    }
+
+    private void clearRemoteCluster() throws InterruptedException, ExecutionException {
+        final var builder = Settings.builder()
+            .putNull("cluster.remote." + CLUSTER_ALIAS + ".mode")
+            .putNull("cluster.remote." + CLUSTER_ALIAS + ".seeds")
+            .putNull("cluster.remote." + CLUSTER_ALIAS + ".proxy_address");
+        clusterAdmin().updateSettings(new ClusterUpdateSettingsRequest().persistentSettings(builder)).get();
+    }
+
+    @Override
+    protected Settings nodeSettings() {
+        return Settings.builder().put(super.nodeSettings()).put("xpack.security.remote_cluster_client.ssl.enabled", false).build();
+    }
+
+    private void configureRemoteCluster(TransportAddress remoteAddress) throws InterruptedException, ExecutionException {
+        final Settings.Builder builder = Settings.builder();
+        if (randomBoolean()) {
+            builder.put("cluster.remote." + CLUSTER_ALIAS + ".mode", "sniff")
+                .put("cluster.remote." + CLUSTER_ALIAS + ".seeds", remoteAddress.toString())
+                .putNull("cluster.remote." + CLUSTER_ALIAS + ".proxy_address");
+        } else {
+            builder.put("cluster.remote." + CLUSTER_ALIAS + ".mode", "proxy")
+                .put("cluster.remote." + CLUSTER_ALIAS + ".proxy_address", remoteAddress.toString())
+                .putNull("cluster.remote." + CLUSTER_ALIAS + ".seeds");
+        }
+        clusterAdmin().updateSettings(new ClusterUpdateSettingsRequest().persistentSettings(builder)).get();
+    }
+
+    private void writeCredentialsToKeyStore(String credentials) throws Exception {
+        final Environment environment = getInstanceFromNode(Environment.class);
+        final KeyStoreWrapper keyStoreWrapper = KeyStoreWrapper.create();
+        keyStoreWrapper.setString("cluster.remote." + CLUSTER_ALIAS + ".credentials", credentials.toCharArray());
+        keyStoreWrapper.save(environment.configFile(), new char[0], false);
+    }
+
+    public static MockTransportService startTransport(
+        final String nodeName,
+        final ThreadPool threadPool,
+        final BlockingQueue<Map<String, String>> capturedHeaders
+    ) {
+        boolean success = false;
+        final Settings settings = Settings.builder()
+            .put("node.name", nodeName)
+            .put("remote_cluster_server.enabled", "true")
+            .put("remote_cluster.port", "0")
+            .put("xpack.security.remote_cluster_server.ssl.enabled", "false")
+            .build();
+        final MockTransportService service = MockTransportService.createNewService(
+            settings,
+            VersionInformation.CURRENT,
+            TransportVersion.current(),
+            threadPool,
+            null
+        );
+        try {
+            service.registerRequestHandler(
+                ClusterStateAction.NAME,
+                EsExecutors.DIRECT_EXECUTOR_SERVICE,
+                ClusterStateRequest::new,
+                (request, channel, task) -> {
+                    capturedHeaders.add(Map.copyOf(threadPool.getThreadContext().getHeaders()));
+                    channel.sendResponse(
+                        new ClusterStateResponse(ClusterName.DEFAULT, ClusterState.builder(ClusterName.DEFAULT).build(), false)
+                    );
+                }
+            );
+            service.registerRequestHandler(
+                RemoteClusterNodesAction.TYPE.name(),
+                EsExecutors.DIRECT_EXECUTOR_SERVICE,
+                RemoteClusterNodesAction.Request::new,
+                (request, channel, task) -> {
+                    capturedHeaders.add(Map.copyOf(threadPool.getThreadContext().getHeaders()));
+                    channel.sendResponse(new RemoteClusterNodesAction.Response(List.of()));
+                }
+            );
+            service.registerRequestHandler(
+                TransportSearchShardsAction.TYPE.name(),
+                EsExecutors.DIRECT_EXECUTOR_SERVICE,
+                SearchShardsRequest::new,
+                (request, channel, task) -> {
+                    capturedHeaders.add(Map.copyOf(threadPool.getThreadContext().getHeaders()));
+                    channel.sendResponse(new SearchShardsResponse(List.of(), List.of(), Collections.emptyMap()));
+                }
+            );
+            service.registerRequestHandler(
+                TransportSearchAction.TYPE.name(),
+                EsExecutors.DIRECT_EXECUTOR_SERVICE,
+                SearchRequest::new,
+                (request, channel, task) -> {
+                    capturedHeaders.add(Map.copyOf(threadPool.getThreadContext().getHeaders()));
+                    channel.sendResponse(
+                        new SearchResponse(
+                            new InternalSearchResponse(
+                                new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN),
+                                InternalAggregations.EMPTY,
+                                null,
+                                null,
+                                false,
+                                null,
+                                1
+                            ),
+                            null,
+                            1,
+                            1,
+                            0,
+                            100,
+                            ShardSearchFailure.EMPTY_ARRAY,
+                            SearchResponse.Clusters.EMPTY
+                        )
+                    );
+                }
+            );
+            service.start();
+            service.acceptIncomingRequests();
+            success = true;
+            return service;
+        } finally {
+            if (success == false) {
+                service.close();
+            }
+        }
+    }
+
+    private void reloadSecureSettings() throws InterruptedException {
+        final AtomicReference<AssertionError> reloadSettingsError = new AtomicReference<>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        final SecureString emptyPassword = randomBoolean() ? new SecureString(new char[0]) : null;
+        clusterAdmin().prepareReloadSecureSettings()
+            .setSecureStorePassword(emptyPassword)
+            .setNodesIds(Strings.EMPTY_ARRAY)
+            .execute(new ActionListener<>() {
+                @Override
+                public void onResponse(NodesReloadSecureSettingsResponse nodesReloadResponse) {
+                    try {
+                        assertThat(nodesReloadResponse, notNullValue());
+                        final Map<String, NodesReloadSecureSettingsResponse.NodeResponse> nodesMap = nodesReloadResponse.getNodesMap();
+                        assertThat(nodesMap.size(), equalTo(1));
+                        for (final NodesReloadSecureSettingsResponse.NodeResponse nodeResponse : nodesReloadResponse.getNodes()) {
+                            assertThat(nodeResponse.reloadException(), nullValue());
+                        }
+                    } catch (final AssertionError e) {
+                        reloadSettingsError.set(e);
+                    } finally {
+                        latch.countDown();
+                    }
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    reloadSettingsError.set(new AssertionError("Nodes request failed", e));
+                    latch.countDown();
+                }
+            });
+        latch.await();
+        if (reloadSettingsError.get() != null) {
+            throw reloadSettingsError.get();
+        }
+    }
+}

+ 83 - 25
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java

@@ -13,6 +13,7 @@ import io.netty.handler.codec.http.HttpUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.TransportVersion;
@@ -21,6 +22,7 @@ import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.support.ActionFilter;
 import org.elasticsearch.action.support.DestructiveOperations;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.bootstrap.BootstrapCheck;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterState;
@@ -110,6 +112,7 @@ import org.elasticsearch.xpack.core.security.SecurityContext;
 import org.elasticsearch.xpack.core.security.SecurityExtension;
 import org.elasticsearch.xpack.core.security.SecurityField;
 import org.elasticsearch.xpack.core.security.SecuritySettings;
+import org.elasticsearch.xpack.core.security.action.ActionTypes;
 import org.elasticsearch.xpack.core.security.action.ClearSecurityCacheAction;
 import org.elasticsearch.xpack.core.security.action.DelegatePkiAuthenticationAction;
 import org.elasticsearch.xpack.core.security.action.apikey.BulkUpdateApiKeyAction;
@@ -244,6 +247,7 @@ import org.elasticsearch.xpack.security.action.service.TransportGetServiceAccoun
 import org.elasticsearch.xpack.security.action.service.TransportGetServiceAccountCredentialsAction;
 import org.elasticsearch.xpack.security.action.service.TransportGetServiceAccountNodesCredentialsAction;
 import org.elasticsearch.xpack.security.action.settings.TransportGetSecuritySettingsAction;
+import org.elasticsearch.xpack.security.action.settings.TransportReloadRemoteClusterCredentialsAction;
 import org.elasticsearch.xpack.security.action.settings.TransportUpdateSecuritySettingsAction;
 import org.elasticsearch.xpack.security.action.token.TransportCreateTokenAction;
 import org.elasticsearch.xpack.security.action.token.TransportInvalidateTokenAction;
@@ -364,7 +368,6 @@ import org.elasticsearch.xpack.security.rest.action.user.RestSetEnabledAction;
 import org.elasticsearch.xpack.security.support.CacheInvalidatorRegistry;
 import org.elasticsearch.xpack.security.support.ExtensionComponents;
 import org.elasticsearch.xpack.security.support.SecuritySystemIndices;
-import org.elasticsearch.xpack.security.transport.RemoteClusterCredentialsResolver;
 import org.elasticsearch.xpack.security.transport.SecurityHttpSettings;
 import org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor;
 import org.elasticsearch.xpack.security.transport.filter.IPFilter;
@@ -386,6 +389,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -554,6 +558,7 @@ public class Security extends Plugin
     private final SetOnce<ReservedRoleMappingAction> reservedRoleMappingAction = new SetOnce<>();
     private final SetOnce<WorkflowService> workflowService = new SetOnce<>();
     private final SetOnce<Realms> realms = new SetOnce<>();
+    private final SetOnce<Client> client = new SetOnce<>();
 
     public Security(Settings settings) {
         this(settings, Collections.emptyList());
@@ -573,25 +578,30 @@ public class Security extends Plugin
             runStartupChecks(settings);
             Automatons.updateConfiguration(settings);
         } else {
-            final List<String> remoteClusterCredentialsSettingKeys = RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS.getAllConcreteSettings(
-                settings
-            ).map(Setting::getKey).sorted().toList();
-            if (false == remoteClusterCredentialsSettingKeys.isEmpty()) {
-                throw new IllegalArgumentException(
-                    format(
-                        "Found [%s] remote clusters with credentials [%s]. Security [%s] must be enabled to connect to them. "
-                            + "Please either enable security or remove these settings from the keystore.",
-                        remoteClusterCredentialsSettingKeys.size(),
-                        Strings.collectionToCommaDelimitedString(remoteClusterCredentialsSettingKeys),
-                        XPackSettings.SECURITY_ENABLED.getKey()
-                    )
-                );
-            }
+            ensureNoRemoteClusterCredentialsOnDisabledSecurity(settings);
             this.bootstrapChecks.set(Collections.emptyList());
         }
         this.securityExtensions.addAll(extensions);
     }
 
+    private void ensureNoRemoteClusterCredentialsOnDisabledSecurity(Settings settings) {
+        assert false == enabled;
+        final List<String> remoteClusterCredentialsSettingKeys = RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS.getAllConcreteSettings(
+            settings
+        ).map(Setting::getKey).sorted().toList();
+        if (false == remoteClusterCredentialsSettingKeys.isEmpty()) {
+            throw new IllegalArgumentException(
+                format(
+                    "Found [%s] remote clusters with credentials [%s]. Security [%s] must be enabled to connect to them. "
+                        + "Please either enable security or remove these settings from the keystore.",
+                    remoteClusterCredentialsSettingKeys.size(),
+                    Strings.collectionToCommaDelimitedString(remoteClusterCredentialsSettingKeys),
+                    XPackSettings.SECURITY_ENABLED.getKey()
+                )
+            );
+        }
+    }
+
     private static void runStartupChecks(Settings settings) {
         validateRealmSettings(settings);
         if (XPackSettings.FIPS_MODE_ENABLED.get(settings)) {
@@ -616,6 +626,14 @@ public class Security extends Plugin
         return XPackPlugin.getSharedLicenseState();
     }
 
+    protected Client getClient() {
+        return client.get();
+    }
+
+    protected Realms getRealms() {
+        return realms.get();
+    }
+
     @Override
     public Collection<?> createComponents(PluginServices services) {
         try {
@@ -654,6 +672,8 @@ public class Security extends Plugin
             return Collections.singletonList(new SecurityUsageServices(null, null, null, null, null, null));
         }
 
+        this.client.set(client);
+
         // The settings in `environment` may have additional values over what was provided during construction
         // See Plugin#additionalSettings()
         this.settings = environment.settings();
@@ -980,8 +1000,6 @@ public class Security extends Plugin
         ipFilter.set(new IPFilter(settings, auditTrailService, clusterService.getClusterSettings(), getLicenseState()));
         components.add(ipFilter.get());
 
-        final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver = new RemoteClusterCredentialsResolver(settings);
-
         DestructiveOperations destructiveOperations = new DestructiveOperations(settings, clusterService.getClusterSettings());
         crossClusterAccessAuthcService.set(new CrossClusterAccessAuthenticationService(clusterService, apiKeyService, authcService.get()));
         components.add(crossClusterAccessAuthcService.get());
@@ -995,7 +1013,6 @@ public class Security extends Plugin
                 securityContext.get(),
                 destructiveOperations,
                 crossClusterAccessAuthcService.get(),
-                remoteClusterCredentialsResolver,
                 getLicenseState()
             )
         );
@@ -1348,6 +1365,7 @@ public class Security extends Plugin
             new ActionHandler<>(SetProfileEnabledAction.INSTANCE, TransportSetProfileEnabledAction.class),
             new ActionHandler<>(GetSecuritySettingsAction.INSTANCE, TransportGetSecuritySettingsAction.class),
             new ActionHandler<>(UpdateSecuritySettingsAction.INSTANCE, TransportUpdateSecuritySettingsAction.class),
+            new ActionHandler<>(ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION, TransportReloadRemoteClusterCredentialsAction.class),
             usageAction,
             infoAction
         ).filter(Objects::nonNull).toList();
@@ -1887,16 +1905,56 @@ public class Security extends Plugin
     @Override
     public void reload(Settings settings) throws Exception {
         if (enabled) {
-            realms.get().stream().filter(r -> JwtRealmSettings.TYPE.equals(r.realmRef().getType())).forEach(realm -> {
-                if (realm instanceof JwtRealm jwtRealm) {
-                    jwtRealm.rotateClientSecret(
-                        CLIENT_AUTHENTICATION_SHARED_SECRET.getConcreteSettingForNamespace(realm.realmRef().getName()).get(settings)
-                    );
-                }
-            });
+            final List<Exception> reloadExceptions = new ArrayList<>();
+            try {
+                reloadRemoteClusterCredentials(settings);
+            } catch (Exception ex) {
+                reloadExceptions.add(ex);
+            }
+
+            try {
+                reloadSharedSecretsForJwtRealms(settings);
+            } catch (Exception ex) {
+                reloadExceptions.add(ex);
+            }
+
+            if (false == reloadExceptions.isEmpty()) {
+                final var combinedException = new ElasticsearchException(
+                    "secure settings reload failed for one or more security components"
+                );
+                reloadExceptions.forEach(combinedException::addSuppressed);
+                throw combinedException;
+            }
+        } else {
+            ensureNoRemoteClusterCredentialsOnDisabledSecurity(settings);
         }
     }
 
+    private void reloadSharedSecretsForJwtRealms(Settings settingsWithKeystore) {
+        getRealms().stream().filter(r -> JwtRealmSettings.TYPE.equals(r.realmRef().getType())).forEach(realm -> {
+            if (realm instanceof JwtRealm jwtRealm) {
+                jwtRealm.rotateClientSecret(
+                    CLIENT_AUTHENTICATION_SHARED_SECRET.getConcreteSettingForNamespace(realm.realmRef().getName()).get(settingsWithKeystore)
+                );
+            }
+        });
+    }
+
+    /**
+     * This method uses a transport action internally to access classes that are injectable but not part of the plugin contract.
+     * See {@link TransportReloadRemoteClusterCredentialsAction} for more context.
+     */
+    private void reloadRemoteClusterCredentials(Settings settingsWithKeystore) {
+        final PlainActionFuture<ActionResponse.Empty> future = new PlainActionFuture<>();
+        getClient().execute(
+            ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION,
+            new TransportReloadRemoteClusterCredentialsAction.Request(settingsWithKeystore),
+            future
+        );
+        assert future.isDone() : "expecting local-only action call to return immediately on invocation";
+        future.actionGet(0, TimeUnit.NANOSECONDS);
+    }
+
     static final class ValidateLicenseForFIPS implements BiConsumer<DiscoveryNode, ClusterState> {
         private final boolean inFipsMode;
         private final LicenseService licenseService;

+ 77 - 0
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/settings/TransportReloadRemoteClusterCredentialsAction.java

@@ -0,0 +1,77 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.security.action.settings;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.TransportAction;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.transport.RemoteClusterService;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.security.action.ActionTypes;
+import org.elasticsearch.xpack.security.Security;
+
+import java.io.IOException;
+
+/**
+ * This is a local-only action which updates remote cluster credentials for remote cluster connections, from keystore settings reloaded via
+ * a call to {@link org.elasticsearch.rest.action.admin.cluster.RestReloadSecureSettingsAction}.
+ *
+ * It's invoked as part of the {@link Security#reload(Settings)} call.
+ *
+ * This action is largely an implementation detail to work around the fact that Security is a plugin without direct access to many core
+ * classes, including the {@link RemoteClusterService} which is required for a credentials reload. A transport action gives us access to
+ * the {@link RemoteClusterService} which is injectable but not part of the plugin contract.
+ */
+public class TransportReloadRemoteClusterCredentialsAction extends TransportAction<
+    TransportReloadRemoteClusterCredentialsAction.Request,
+    ActionResponse.Empty> {
+
+    private final RemoteClusterService remoteClusterService;
+
+    @Inject
+    public TransportReloadRemoteClusterCredentialsAction(TransportService transportService, ActionFilters actionFilters) {
+        super(ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION.name(), actionFilters, transportService.getTaskManager());
+        this.remoteClusterService = transportService.getRemoteClusterService();
+    }
+
+    @Override
+    protected void doExecute(Task task, Request request, ActionListener<ActionResponse.Empty> listener) {
+        // We avoid stashing and marking context as system to keep the action as minimal as possible (i.e., avoid copying context)
+        remoteClusterService.updateRemoteClusterCredentials(request.getSettings());
+        listener.onResponse(ActionResponse.Empty.INSTANCE);
+    }
+
+    public static class Request extends ActionRequest {
+        private final Settings settings;
+
+        public Request(Settings settings) {
+            this.settings = settings;
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        public Settings getSettings() {
+            return settings;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            localOnly();
+        }
+    }
+}

+ 0 - 51
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/RemoteClusterCredentialsResolver.java

@@ -1,51 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.xpack.security.transport;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.settings.SecureString;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.xpack.security.authc.ApiKeyService;
-
-import java.util.Map;
-import java.util.Optional;
-
-import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS;
-
-public class RemoteClusterCredentialsResolver {
-
-    private static final Logger logger = LogManager.getLogger(RemoteClusterCredentialsResolver.class);
-
-    private final Map<String, SecureString> clusterCredentials;
-
-    public RemoteClusterCredentialsResolver(final Settings settings) {
-        this.clusterCredentials = REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings);
-        logger.debug(
-            "Read cluster credentials for remote clusters [{}]",
-            Strings.collectionToCommaDelimitedString(clusterCredentials.keySet())
-        );
-    }
-
-    public Optional<RemoteClusterCredentials> resolve(final String clusterAlias) {
-        final SecureString apiKey = clusterCredentials.get(clusterAlias);
-        if (apiKey == null) {
-            return Optional.empty();
-        } else {
-            return Optional.of(new RemoteClusterCredentials(clusterAlias, ApiKeyService.withApiKeyPrefix(apiKey.toString())));
-        }
-    }
-
-    record RemoteClusterCredentials(String clusterAlias, String credentials) {
-        @Override
-        public String toString() {
-            return "RemoteClusterCredentials{clusterAlias='" + clusterAlias + "', credentials='::es_redacted::'}";
-        }
-    }
-}

+ 26 - 19
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java

@@ -12,6 +12,7 @@ import org.elasticsearch.TransportVersion;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
 import org.elasticsearch.action.support.DestructiveOperations;
+import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.ssl.SslConfiguration;
 import org.elasticsearch.common.util.Maps;
@@ -24,6 +25,7 @@ import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.RemoteConnectionManager;
+import org.elasticsearch.transport.RemoteConnectionManager.RemoteClusterAliasWithCredentials;
 import org.elasticsearch.transport.SendRequestTransportException;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportChannel;
@@ -46,6 +48,7 @@ import org.elasticsearch.xpack.core.security.user.User;
 import org.elasticsearch.xpack.core.ssl.SSLService;
 import org.elasticsearch.xpack.security.Security;
 import org.elasticsearch.xpack.security.audit.AuditUtil;
+import org.elasticsearch.xpack.security.authc.ApiKeyService;
 import org.elasticsearch.xpack.security.authc.AuthenticationService;
 import org.elasticsearch.xpack.security.authc.CrossClusterAccessAuthenticationService;
 import org.elasticsearch.xpack.security.authc.CrossClusterAccessHeaders;
@@ -63,7 +66,6 @@ import static org.elasticsearch.core.Strings.format;
 import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE;
 import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED;
 import static org.elasticsearch.transport.RemoteClusterPortSettings.TRANSPORT_VERSION_ADVANCED_REMOTE_CLUSTER_SECURITY;
-import static org.elasticsearch.xpack.security.transport.RemoteClusterCredentialsResolver.RemoteClusterCredentials;
 
 public class SecurityServerTransportInterceptor implements TransportInterceptor {
 
@@ -85,8 +87,7 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
     private final Settings settings;
     private final SecurityContext securityContext;
     private final CrossClusterAccessAuthenticationService crossClusterAccessAuthcService;
-    private final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver;
-    private final Function<Transport.Connection, Optional<String>> remoteClusterAliasResolver;
+    private final Function<Transport.Connection, Optional<RemoteClusterAliasWithCredentials>> remoteClusterCredentialsResolver;
     private final XPackLicenseState licenseState;
 
     public SecurityServerTransportInterceptor(
@@ -98,7 +99,6 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
         SecurityContext securityContext,
         DestructiveOperations destructiveOperations,
         CrossClusterAccessAuthenticationService crossClusterAccessAuthcService,
-        RemoteClusterCredentialsResolver remoteClusterCredentialsResolver,
         XPackLicenseState licenseState
     ) {
         this(
@@ -110,9 +110,8 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
             securityContext,
             destructiveOperations,
             crossClusterAccessAuthcService,
-            remoteClusterCredentialsResolver,
             licenseState,
-            RemoteConnectionManager::resolveRemoteClusterAlias
+            RemoteConnectionManager::resolveRemoteClusterAliasWithCredentials
         );
     }
 
@@ -125,10 +124,9 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
         SecurityContext securityContext,
         DestructiveOperations destructiveOperations,
         CrossClusterAccessAuthenticationService crossClusterAccessAuthcService,
-        RemoteClusterCredentialsResolver remoteClusterCredentialsResolver,
         XPackLicenseState licenseState,
         // Inject for simplified testing
-        Function<Transport.Connection, Optional<String>> remoteClusterAliasResolver
+        Function<Transport.Connection, Optional<RemoteClusterAliasWithCredentials>> remoteClusterCredentialsResolver
     ) {
         this.settings = settings;
         this.threadPool = threadPool;
@@ -139,7 +137,6 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
         this.crossClusterAccessAuthcService = crossClusterAccessAuthcService;
         this.licenseState = licenseState;
         this.remoteClusterCredentialsResolver = remoteClusterCredentialsResolver;
-        this.remoteClusterAliasResolver = remoteClusterAliasResolver;
         this.profileFilters = initializeProfileFilters(destructiveOperations);
     }
 
@@ -159,7 +156,8 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
                 TransportResponseHandler<T> handler
             ) {
                 assertNoCrossClusterAccessHeadersInContext();
-                final Optional<String> remoteClusterAlias = remoteClusterAliasResolver.apply(connection);
+                final Optional<String> remoteClusterAlias = remoteClusterCredentialsResolver.apply(connection)
+                    .map(RemoteClusterAliasWithCredentials::clusterAlias);
                 if (PreAuthorizationUtils.shouldRemoveParentAuthorizationFromThreadContext(remoteClusterAlias, action, securityContext)) {
                     securityContext.executeAfterRemovingParentAuthorization(original -> {
                         sendRequestInner(
@@ -278,22 +276,23 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
              * Returns cluster credentials if the connection is remote, and cluster credentials are set up for the target cluster.
              */
             private Optional<RemoteClusterCredentials> getRemoteClusterCredentials(Transport.Connection connection) {
-                final Optional<String> optionalRemoteClusterAlias = remoteClusterAliasResolver.apply(connection);
-                if (optionalRemoteClusterAlias.isEmpty()) {
+                final Optional<RemoteClusterAliasWithCredentials> remoteClusterAliasWithCredentials = remoteClusterCredentialsResolver
+                    .apply(connection);
+                if (remoteClusterAliasWithCredentials.isEmpty()) {
                     logger.trace("Connection is not remote");
                     return Optional.empty();
                 }
 
-                final String remoteClusterAlias = optionalRemoteClusterAlias.get();
-                final Optional<RemoteClusterCredentials> remoteClusterCredentials = remoteClusterCredentialsResolver.resolve(
-                    remoteClusterAlias
-                );
-                if (remoteClusterCredentials.isEmpty()) {
+                final String remoteClusterAlias = remoteClusterAliasWithCredentials.get().clusterAlias();
+                final SecureString remoteClusterCredentials = remoteClusterAliasWithCredentials.get().credentials();
+                if (remoteClusterCredentials == null) {
                     logger.trace("No cluster credentials are configured for remote cluster [{}]", remoteClusterAlias);
                     return Optional.empty();
                 }
 
-                return remoteClusterCredentials;
+                return Optional.of(
+                    new RemoteClusterCredentials(remoteClusterAlias, ApiKeyService.withApiKeyPrefix(remoteClusterCredentials.toString()))
+                );
             }
 
             private <T extends TransportResponse> void sendWithCrossClusterAccessHeaders(
@@ -442,7 +441,7 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
             throw new IllegalStateException("there should always be a user when sending a message for action [" + action + "]");
         }
 
-        assert securityContext.getParentAuthorization() == null || remoteClusterAliasResolver.apply(connection).isPresent() == false
+        assert securityContext.getParentAuthorization() == null || remoteClusterCredentialsResolver.apply(connection).isEmpty()
             : "parent authorization header should not be set for remote cluster requests";
 
         try {
@@ -663,4 +662,12 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor
             }
         }
     }
+
+    record RemoteClusterCredentials(String clusterAlias, String credentials) {
+
+        @Override
+        public String toString() {
+            return "RemoteClusterCredentials{clusterAlias='" + clusterAlias + "', credentials='::es_redacted::'}";
+        }
+    }
 }

+ 13 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/LocalStateSecurity.java

@@ -16,6 +16,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.license.LicenseService;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.ReloadablePlugin;
 import org.elasticsearch.protocol.xpack.XPackInfoRequest;
 import org.elasticsearch.protocol.xpack.XPackInfoResponse;
 import org.elasticsearch.protocol.xpack.XPackUsageRequest;
@@ -36,7 +37,7 @@ import java.nio.file.Path;
 import java.util.Collections;
 import java.util.List;
 
-public class LocalStateSecurity extends LocalStateCompositeXPackPlugin {
+public class LocalStateSecurity extends LocalStateCompositeXPackPlugin implements ReloadablePlugin {
 
     public static class SecurityTransportXPackUsageAction extends TransportXPackUsageAction {
         @Inject
@@ -130,4 +131,15 @@ public class LocalStateSecurity extends LocalStateCompositeXPackPlugin {
     public List<Plugin> plugins() {
         return plugins;
     }
+
+    @Override
+    public void reload(Settings settings) throws Exception {
+        plugins.stream().filter(p -> p instanceof ReloadablePlugin).forEach(p -> {
+            try {
+                ((ReloadablePlugin) p).reload(settings);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
 }

+ 117 - 0
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java

@@ -9,10 +9,13 @@ package org.elasticsearch.xpack.security;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionModule;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
@@ -72,6 +75,7 @@ import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.security.SecurityContext;
 import org.elasticsearch.xpack.core.security.SecurityExtension;
 import org.elasticsearch.xpack.core.security.SecurityField;
+import org.elasticsearch.xpack.core.security.action.ActionTypes;
 import org.elasticsearch.xpack.core.security.authc.Authentication;
 import org.elasticsearch.xpack.core.security.authc.AuthenticationTestHelper;
 import org.elasticsearch.xpack.core.security.authc.Realm;
@@ -116,6 +120,7 @@ import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.util.Collections.emptyMap;
 import static org.elasticsearch.xpack.core.security.authc.RealmSettings.getFullSettingKey;
@@ -133,7 +138,10 @@ import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class SecurityTests extends ESTestCase {
@@ -877,6 +885,23 @@ public class SecurityTests extends ESTestCase {
                     + "Please either enable security or remove these settings from the keystore."
             )
         );
+
+        // Security off, remote cluster with credentials on reload call
+        final MockSecureSettings secureSettings5 = new MockSecureSettings();
+        secureSettings5.setString("cluster.remote.my1.credentials", randomAlphaOfLength(20));
+        secureSettings5.setString("cluster.remote.my2.credentials", randomAlphaOfLength(20));
+        final Settings.Builder builder5 = Settings.builder().setSecureSettings(secureSettings5);
+        // Use builder with security disabled to construct valid Security instance
+        final var security = new Security(builder2.build());
+        final IllegalArgumentException e5 = expectThrows(IllegalArgumentException.class, () -> security.reload(builder5.build()));
+        assertThat(
+            e5.getMessage(),
+            containsString(
+                "Found [2] remote clusters with credentials [cluster.remote.my1.credentials,cluster.remote.my2.credentials]. "
+                    + "Security [xpack.security.enabled] must be enabled to connect to them. "
+                    + "Please either enable security or remove these settings from the keystore."
+            )
+        );
     }
 
     public void testLoadExtensions() throws Exception {
@@ -905,6 +930,98 @@ public class SecurityTests extends ESTestCase {
         assertThat(registry, instanceOf(DummyOperatorOnlyRegistry.class));
     }
 
+    public void testReload() throws Exception {
+        final Settings settings = Settings.builder().put("xpack.security.enabled", true).put("path.home", createTempDir()).build();
+
+        final PlainActionFuture<ActionResponse.Empty> value = new PlainActionFuture<>();
+        final Client mockedClient = mock(Client.class);
+
+        final Realms mockedRealms = mock(Realms.class);
+        when(mockedRealms.stream()).thenReturn(Stream.of());
+
+        doAnswer((inv) -> {
+            @SuppressWarnings("unchecked")
+            ActionListener<ActionResponse.Empty> listener = (ActionListener<ActionResponse.Empty>) inv.getArguments()[2];
+            listener.onResponse(ActionResponse.Empty.INSTANCE);
+            return null;
+        }).when(mockedClient).execute(eq(ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION), any(), any());
+
+        security = new Security(settings, Collections.emptyList()) {
+            @Override
+            protected Client getClient() {
+                return mockedClient;
+            }
+
+            @Override
+            protected Realms getRealms() {
+                return mockedRealms;
+            }
+        };
+
+        final Settings inputSettings = Settings.EMPTY;
+        security.reload(inputSettings);
+
+        verify(mockedClient).execute(eq(ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION), any(), any());
+        verify(mockedRealms).stream();
+    }
+
+    public void testReloadWithFailures() {
+        final Settings settings = Settings.builder().put("xpack.security.enabled", true).put("path.home", createTempDir()).build();
+
+        final boolean failRemoteClusterCredentialsReload = randomBoolean();
+        final Client mockedClient = mock(Client.class);
+        if (failRemoteClusterCredentialsReload) {
+            doAnswer((inv) -> {
+                @SuppressWarnings("unchecked")
+                ActionListener<ActionResponse.Empty> listener = (ActionListener<ActionResponse.Empty>) inv.getArguments()[2];
+                listener.onFailure(new RuntimeException("failed remote cluster credentials reload"));
+                return null;
+            }).when(mockedClient).execute(eq(ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION), any(), any());
+        } else {
+            doAnswer((inv) -> {
+                @SuppressWarnings("unchecked")
+                ActionListener<ActionResponse.Empty> listener = (ActionListener<ActionResponse.Empty>) inv.getArguments()[2];
+                listener.onResponse(ActionResponse.Empty.INSTANCE);
+                return null;
+            }).when(mockedClient).execute(eq(ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION), any(), any());
+        }
+
+        final Realms mockedRealms = mock(Realms.class);
+        final boolean failRealmsReload = (false == failRemoteClusterCredentialsReload) || randomBoolean();
+        if (failRealmsReload) {
+            when(mockedRealms.stream()).thenThrow(new RuntimeException("failed jwt realms reload"));
+        } else {
+            when(mockedRealms.stream()).thenReturn(Stream.of());
+        }
+        security = new Security(settings, Collections.emptyList()) {
+            @Override
+            protected Client getClient() {
+                return mockedClient;
+            }
+
+            @Override
+            protected Realms getRealms() {
+                return mockedRealms;
+            }
+        };
+
+        final Settings inputSettings = Settings.EMPTY;
+        final var exception = expectThrows(ElasticsearchException.class, () -> security.reload(inputSettings));
+
+        assertThat(exception.getMessage(), containsString("secure settings reload failed for one or more security component"));
+        if (failRemoteClusterCredentialsReload) {
+            assertThat(exception.getSuppressed()[0].getMessage(), containsString("failed remote cluster credentials reload"));
+            if (failRealmsReload) {
+                assertThat(exception.getSuppressed()[1].getMessage(), containsString("failed jwt realms reload"));
+            }
+        } else {
+            assertThat(exception.getSuppressed()[0].getMessage(), containsString("failed jwt realms reload"));
+        }
+        // Verify both called despite failure
+        verify(mockedClient).execute(eq(ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION), any(), any());
+        verify(mockedRealms).stream();
+    }
+
     public void testLoadNoExtensions() throws Exception {
         Settings settings = Settings.builder()
             .put("xpack.security.enabled", true)

+ 0 - 38
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/RemoteClusterCredentialsResolverTests.java

@@ -1,38 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-package org.elasticsearch.xpack.security.transport;
-
-import org.elasticsearch.common.settings.MockSecureSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.xpack.security.authc.ApiKeyService;
-
-import java.util.Optional;
-
-import static org.elasticsearch.xpack.security.transport.RemoteClusterCredentialsResolver.RemoteClusterCredentials;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-
-public class RemoteClusterCredentialsResolverTests extends ESTestCase {
-
-    public void testResolveRemoteClusterCredentials() {
-        final String clusterNameA = "clusterA";
-        final String clusterDoesNotExist = randomAlphaOfLength(10);
-        final Settings.Builder builder = Settings.builder();
-
-        final String secret = randomAlphaOfLength(20);
-        final MockSecureSettings secureSettings = new MockSecureSettings();
-        secureSettings.setString("cluster.remote." + clusterNameA + ".credentials", secret);
-        final Settings settings = builder.setSecureSettings(secureSettings).build();
-        RemoteClusterCredentialsResolver remoteClusterAuthorizationResolver = new RemoteClusterCredentialsResolver(settings);
-        final Optional<RemoteClusterCredentials> remoteClusterCredentials = remoteClusterAuthorizationResolver.resolve(clusterNameA);
-        assertThat(remoteClusterCredentials.isPresent(), is(true));
-        assertThat(remoteClusterCredentials.get().clusterAlias(), equalTo(clusterNameA));
-        assertThat(remoteClusterCredentials.get().credentials(), equalTo(ApiKeyService.withApiKeyPrefix(secret)));
-        assertThat(remoteClusterAuthorizationResolver.resolve(clusterDoesNotExist), is(Optional.empty()));
-    }
-}

+ 25 - 52
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java

@@ -18,6 +18,7 @@ import org.elasticsearch.action.support.DestructiveOperations;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.ssl.SslClientAuthenticationMode;
 import org.elasticsearch.common.ssl.SslConfiguration;
@@ -33,6 +34,7 @@ import org.elasticsearch.test.TransportVersionUtils;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.RemoteClusterPortSettings;
+import org.elasticsearch.transport.RemoteConnectionManager.RemoteClusterAliasWithCredentials;
 import org.elasticsearch.transport.SendRequestTransportException;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.Transport.Connection;
@@ -77,6 +79,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.DATA_STREAM_LIFECYCLE_ORIGIN;
 import static org.elasticsearch.test.ActionListenerUtils.anyActionListener;
@@ -87,7 +90,6 @@ import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
 import static org.elasticsearch.xpack.core.security.authc.CrossClusterAccessSubjectInfo.CROSS_CLUSTER_ACCESS_SUBJECT_INFO_HEADER_KEY;
 import static org.elasticsearch.xpack.core.security.authz.RoleDescriptorTests.randomUniquelyNamedRoleDescriptors;
 import static org.elasticsearch.xpack.security.authc.CrossClusterAccessHeaders.CROSS_CLUSTER_ACCESS_CREDENTIALS_HEADER_KEY;
-import static org.elasticsearch.xpack.security.transport.RemoteClusterCredentialsResolver.RemoteClusterCredentials;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
@@ -153,7 +155,6 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
                 new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))
             ),
             mock(CrossClusterAccessAuthenticationService.class),
-            new RemoteClusterCredentialsResolver(settings),
             mockLicenseState
         );
         ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener
@@ -205,7 +206,6 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
                 new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))
             ),
             mock(CrossClusterAccessAuthenticationService.class),
-            new RemoteClusterCredentialsResolver(settings),
             mockLicenseState
         );
         ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener
@@ -250,7 +250,6 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
                 new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))
             ),
             mock(CrossClusterAccessAuthenticationService.class),
-            new RemoteClusterCredentialsResolver(settings),
             mockLicenseState
         ) {
             @Override
@@ -313,7 +312,6 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
                 new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))
             ),
             mock(CrossClusterAccessAuthenticationService.class),
-            new RemoteClusterCredentialsResolver(settings),
             mockLicenseState
         );
         ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener
@@ -382,7 +380,6 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
                 new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))
             ),
             mock(CrossClusterAccessAuthenticationService.class),
-            new RemoteClusterCredentialsResolver(settings),
             mockLicenseState
         );
         ClusterServiceUtils.setState(clusterService, clusterService.state()); // force state update to trigger listener
@@ -449,7 +446,6 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
                 new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))
             ),
             mock(CrossClusterAccessAuthenticationService.class),
-            new RemoteClusterCredentialsResolver(settings),
             mockLicenseState
         );
 
@@ -604,7 +600,6 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
 
         AuthenticationTestHelper.builder().build().writeToContext(threadContext);
         final String remoteClusterAlias = randomAlphaOfLengthBetween(5, 10);
-        final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver = mockRemoteClusterCredentialsResolver(remoteClusterAlias);
 
         final SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(
             settings,
@@ -618,9 +613,8 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
                 new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))
             ),
             mock(CrossClusterAccessAuthenticationService.class),
-            remoteClusterCredentialsResolver,
             unsupportedLicenseState,
-            ignored -> Optional.of(remoteClusterAlias)
+            mockRemoteClusterCredentialsResolver(remoteClusterAlias)
         );
 
         final AsyncSender sender = interceptor.interceptSender(mock(AsyncSender.class, ignored -> {
@@ -661,18 +655,16 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
             actualException.get().getCause().getMessage(),
             equalTo("current license is non-compliant for [" + Security.ADVANCED_REMOTE_CLUSTER_SECURITY_FEATURE.getName() + "]")
         );
-        verify(remoteClusterCredentialsResolver, times(1)).resolve(eq(remoteClusterAlias));
         assertThat(securityContext.getThreadContext().getHeader(CROSS_CLUSTER_ACCESS_SUBJECT_INFO_HEADER_KEY), nullValue());
         assertThat(securityContext.getThreadContext().getHeader(CROSS_CLUSTER_ACCESS_CREDENTIALS_HEADER_KEY), nullValue());
     }
 
-    private RemoteClusterCredentialsResolver mockRemoteClusterCredentialsResolver(String remoteClusterAlias) {
-        final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver = mock(RemoteClusterCredentialsResolver.class);
-        final String remoteClusterCredential = ApiKeyService.withApiKeyPrefix(randomAlphaOfLengthBetween(10, 42));
-        when(remoteClusterCredentialsResolver.resolve(any())).thenReturn(
-            Optional.of(new RemoteClusterCredentials(remoteClusterAlias, remoteClusterCredential))
+    private Function<Connection, Optional<RemoteClusterAliasWithCredentials>> mockRemoteClusterCredentialsResolver(
+        String remoteClusterAlias
+    ) {
+        return connection -> Optional.of(
+            new RemoteClusterAliasWithCredentials(remoteClusterAlias, new SecureString(randomAlphaOfLengthBetween(10, 42).toCharArray()))
         );
-        return remoteClusterCredentialsResolver;
     }
 
     public void testSendWithCrossClusterAccessHeadersForSystemUserRegularAction() throws Exception {
@@ -736,12 +728,9 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
     ) throws IOException {
         authentication.writeToContext(threadContext);
         final String expectedRequestId = AuditUtil.getOrGenerateRequestId(threadContext);
-        final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver = mock(RemoteClusterCredentialsResolver.class);
         final String remoteClusterAlias = randomAlphaOfLengthBetween(5, 10);
-        final String remoteClusterCredential = ApiKeyService.withApiKeyPrefix(randomAlphaOfLengthBetween(10, 42));
-        when(remoteClusterCredentialsResolver.resolve(any())).thenReturn(
-            Optional.of(new RemoteClusterCredentials(remoteClusterAlias, remoteClusterCredential))
-        );
+        final String encodedApiKey = randomAlphaOfLengthBetween(10, 42);
+        final String remoteClusterCredential = ApiKeyService.withApiKeyPrefix(encodedApiKey);
         final AuthorizationService authzService = mock(AuthorizationService.class);
         // We capture the listener so that we can complete the full flow, by calling onResponse further down
         @SuppressWarnings("unchecked")
@@ -760,9 +749,8 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
                 new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))
             ),
             mock(CrossClusterAccessAuthenticationService.class),
-            remoteClusterCredentialsResolver,
             mockLicenseState,
-            ignored -> Optional.of(remoteClusterAlias)
+            ignored -> Optional.of(new RemoteClusterAliasWithCredentials(remoteClusterAlias, new SecureString(encodedApiKey.toCharArray())))
         );
 
         final AtomicBoolean calledWrappedSender = new AtomicBoolean(false);
@@ -861,7 +849,6 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
         }
         assertThat(sentCredential.get(), equalTo(remoteClusterCredential));
         verify(securityContext, never()).executeAsInternalUser(any(), any(), anyConsumer());
-        verify(remoteClusterCredentialsResolver, times(1)).resolve(eq(remoteClusterAlias));
         assertThat(securityContext.getThreadContext().getHeader(CROSS_CLUSTER_ACCESS_SUBJECT_INFO_HEADER_KEY), nullValue());
         assertThat(securityContext.getThreadContext().getHeader(CROSS_CLUSTER_ACCESS_CREDENTIALS_HEADER_KEY), nullValue());
         assertThat(AuditUtil.extractRequestId(securityContext.getThreadContext()), equalTo(expectedRequestId));
@@ -874,15 +861,9 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
         if (false == (notRemoteConnection || noCredential)) {
             noCredential = true;
         }
+        final boolean finalNoCredential = noCredential;
         final String remoteClusterAlias = randomAlphaOfLengthBetween(5, 10);
-        final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver = mock(RemoteClusterCredentialsResolver.class);
-        when(remoteClusterCredentialsResolver.resolve(any())).thenReturn(
-            noCredential
-                ? Optional.empty()
-                : Optional.of(
-                    new RemoteClusterCredentials(remoteClusterAlias, ApiKeyService.withApiKeyPrefix(randomAlphaOfLengthBetween(10, 42)))
-                )
-        );
+        final String encodedApiKey = randomAlphaOfLengthBetween(10, 42);
         final AuthenticationTestHelper.AuthenticationTestBuilder builder = AuthenticationTestHelper.builder();
         final Authentication authentication = randomFrom(
             builder.apiKey().build(),
@@ -904,9 +885,12 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
                 new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))
             ),
             mock(CrossClusterAccessAuthenticationService.class),
-            remoteClusterCredentialsResolver,
             mockLicenseState,
-            ignored -> notRemoteConnection ? Optional.empty() : Optional.of(remoteClusterAlias)
+            ignored -> notRemoteConnection
+                ? Optional.empty()
+                : (finalNoCredential
+                    ? Optional.of(new RemoteClusterAliasWithCredentials(remoteClusterAlias, null))
+                    : Optional.of(new RemoteClusterAliasWithCredentials(remoteClusterAlias, new SecureString(encodedApiKey.toCharArray()))))
         );
 
         final AtomicBoolean calledWrappedSender = new AtomicBoolean(false);
@@ -944,12 +928,9 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
             .realm()
             .build();
         authentication.writeToContext(threadContext);
-        final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver = mock(RemoteClusterCredentialsResolver.class);
         final String remoteClusterAlias = randomAlphaOfLengthBetween(5, 10);
-        final String remoteClusterCredential = ApiKeyService.withApiKeyPrefix(randomAlphaOfLengthBetween(10, 42));
-        when(remoteClusterCredentialsResolver.resolve(any())).thenReturn(
-            Optional.of(new RemoteClusterCredentials(remoteClusterAlias, remoteClusterCredential))
-        );
+        final String encodedApiKey = randomAlphaOfLengthBetween(10, 42);
+        final String remoteClusterCredential = ApiKeyService.withApiKeyPrefix(encodedApiKey);
 
         final SecurityServerTransportInterceptor interceptor = new SecurityServerTransportInterceptor(
             settings,
@@ -963,9 +944,8 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
                 new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))
             ),
             mock(CrossClusterAccessAuthenticationService.class),
-            remoteClusterCredentialsResolver,
             mockLicenseState,
-            ignored -> Optional.of(remoteClusterAlias)
+            ignored -> Optional.of(new RemoteClusterAliasWithCredentials(remoteClusterAlias, new SecureString(encodedApiKey.toCharArray())))
         );
 
         final AsyncSender sender = interceptor.interceptSender(new AsyncSender() {
@@ -1029,7 +1009,6 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
                     + "] does not support receiving them"
             )
         );
-        verify(remoteClusterCredentialsResolver, times(1)).resolve(eq(remoteClusterAlias));
         assertThat(securityContext.getThreadContext().getHeader(CROSS_CLUSTER_ACCESS_SUBJECT_INFO_HEADER_KEY), nullValue());
         assertThat(securityContext.getThreadContext().getHeader(CROSS_CLUSTER_ACCESS_CREDENTIALS_HEADER_KEY), nullValue());
     }
@@ -1040,12 +1019,9 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
             .realm()
             .build();
         authentication.writeToContext(threadContext);
-        final RemoteClusterCredentialsResolver remoteClusterCredentialsResolver = mock(RemoteClusterCredentialsResolver.class);
         final String remoteClusterAlias = randomAlphaOfLengthBetween(5, 10);
-        final String remoteClusterCredential = ApiKeyService.withApiKeyPrefix(randomAlphaOfLengthBetween(10, 42));
-        when(remoteClusterCredentialsResolver.resolve(any())).thenReturn(
-            Optional.of(new RemoteClusterCredentials(remoteClusterAlias, remoteClusterCredential))
-        );
+        final String encodedApiKey = randomAlphaOfLengthBetween(10, 42);
+        final String remoteClusterCredential = ApiKeyService.withApiKeyPrefix(encodedApiKey);
         final AuthorizationService authzService = mock(AuthorizationService.class);
 
         doAnswer(invocation -> {
@@ -1067,9 +1043,8 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
                 new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))
             ),
             mock(CrossClusterAccessAuthenticationService.class),
-            remoteClusterCredentialsResolver,
             mockLicenseState,
-            ignored -> Optional.of(remoteClusterAlias)
+            ignored -> Optional.of(new RemoteClusterAliasWithCredentials(remoteClusterAlias, new SecureString(encodedApiKey.toCharArray())))
         );
 
         final AsyncSender sender = interceptor.interceptSender(new AsyncSender() {
@@ -1171,7 +1146,6 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
                 new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))
             ),
             mock(CrossClusterAccessAuthenticationService.class),
-            new RemoteClusterCredentialsResolver(settings),
             mockLicenseState
         );
 
@@ -1225,7 +1199,6 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
                 new ClusterSettings(Settings.EMPTY, Collections.singleton(DestructiveOperations.REQUIRES_NAME_SETTING))
             ),
             mock(CrossClusterAccessAuthenticationService.class),
-            new RemoteClusterCredentialsResolver(settings),
             mockLicenseState
         );