Browse Source

[8.x] Add a monitor_stats privilege and allow that privilege for remote cluster privileges (#114964) (#116517)

* Add a monitor_stats privilege and allow that privilege for remote cluster privileges (#114964)

This commit does the following:
   * Add a new monitor_stats privilege
   * Ensure that monitor_stats can be set in the remote_cluster privileges
   * Give's Kibana the ability to remotely call monitor_stats via RCS 2.0

Since this is the first case where there is more than 1 remote_cluster privilege,
the following framework concern has been added:
    * Ensure that when sending to elder RCS 2.0 clusters that we don't send the new privilege
        previous only supported all or nothing remote_cluster blocks
    * Ensure that we when sending API key role descriptors that contains remote_cluster,
       we don't send the new privileges for RCS 1.0/2.0 if it not new enough
    * Fix and extend the BWC tests for RCS 1.0 and RCS 2.0

(cherry picked from commit af99654dac95e55bc44b9bfe71393f5d8b48b0ef)

* adjust bwc for 8.x branch
Jake Landis 11 months ago
parent
commit
8adb2c4043
34 changed files with 1156 additions and 204 deletions
  1. 6 0
      docs/changelog/114964.yaml
  2. 1 1
      docs/reference/rest-api/security/bulk-create-roles.asciidoc
  3. 3 1
      docs/reference/rest-api/security/get-builtin-privileges.asciidoc
  4. 1 0
      server/src/main/java/org/elasticsearch/TransportVersions.java
  5. 1 0
      x-pack/plugin/build.gradle
  6. 1 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/user/GetUserPrivilegesResponse.java
  7. 102 9
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/Authentication.java
  8. 22 12
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/RoleDescriptor.java
  9. 34 5
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/RemoteClusterPermissionGroup.java
  10. 111 21
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/RemoteClusterPermissions.java
  11. 1 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/Role.java
  12. 2 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/SimpleRole.java
  13. 10 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java
  14. 12 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/KibanaOwnedReservedRoleDescriptors.java
  15. 42 3
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/CrossClusterApiKeyRoleDescriptorBuilderTests.java
  16. 1 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/role/PutRoleRequestTests.java
  17. 84 2
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authc/AuthenticationTests.java
  18. 28 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/RoleDescriptorTests.java
  19. 13 2
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/permission/RemoteClusterPermissionGroupTests.java
  20. 118 23
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/permission/RemoteClusterPermissionsTests.java
  21. 1 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java
  22. 5 3
      x-pack/plugin/security/qa/multi-cluster/build.gradle
  23. 89 79
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/AbstractRemoteClusterSecurityBWCRestIT.java
  24. 69 0
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityBWCToRCS1ClusterRestIT.java
  25. 90 0
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityBWCToRCS2ClusterRestIT.java
  26. 266 0
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityRestStatsIT.java
  27. 2 2
      x-pack/plugin/security/qa/security-trial/src/javaRestTest/java/org/elasticsearch/xpack/security/apikey/ApiKeyRestIT.java
  28. 1 1
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStore.java
  29. 1 1
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/RoleDescriptorStore.java
  30. 29 21
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java
  31. 5 5
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java
  32. 2 1
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/FileRolesStoreTests.java
  33. 2 2
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/rest/action/user/RestGetUserPrivilegesActionTests.java
  34. 1 1
      x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/privileges/11_builtin.yml

+ 6 - 0
docs/changelog/114964.yaml

@@ -0,0 +1,6 @@
+pr: 114964
+summary: Add a `monitor_stats` privilege and allow that privilege for remote cluster
+  privileges
+area: Authorization
+type: enhancement
+issues: []

+ 1 - 1
docs/reference/rest-api/security/bulk-create-roles.asciidoc

@@ -327,7 +327,7 @@ The result would then have the `errors` field set to `true` and hold the error f
         "details": {
             "my_admin_role": { <4>
                 "type": "action_request_validation_exception",
-                "reason": "Validation Failed: 1: unknown cluster privilege [bad_cluster_privilege]. a privilege must be either one of the predefined cluster privilege names [manage_own_api_key,manage_data_stream_global_retention,monitor_data_stream_global_retention,none,cancel_task,cross_cluster_replication,cross_cluster_search,delegate_pki,grant_api_key,manage_autoscaling,manage_index_templates,manage_logstash_pipelines,manage_oidc,manage_saml,manage_search_application,manage_search_query_rules,manage_search_synonyms,manage_service_account,manage_token,manage_user_profile,monitor_connector,monitor_enrich,monitor_inference,monitor_ml,monitor_rollup,monitor_snapshot,monitor_text_structure,monitor_watcher,post_behavioral_analytics_event,read_ccr,read_connector_secrets,read_fleet_secrets,read_ilm,read_pipeline,read_security,read_slm,transport_client,write_connector_secrets,write_fleet_secrets,create_snapshot,manage_behavioral_analytics,manage_ccr,manage_connector,manage_enrich,manage_ilm,manage_inference,manage_ml,manage_rollup,manage_slm,manage_watcher,monitor_data_frame_transforms,monitor_transform,manage_api_key,manage_ingest_pipelines,manage_pipeline,manage_data_frame_transforms,manage_transform,manage_security,monitor,manage,all] or a pattern over one of the available cluster actions;"
+                "reason": "Validation Failed: 1: unknown cluster privilege [bad_cluster_privilege]. a privilege must be either one of the predefined cluster privilege names [manage_own_api_key,manage_data_stream_global_retention,monitor_data_stream_global_retention,none,cancel_task,cross_cluster_replication,cross_cluster_search,delegate_pki,grant_api_key,manage_autoscaling,manage_index_templates,manage_logstash_pipelines,manage_oidc,manage_saml,manage_search_application,manage_search_query_rules,manage_search_synonyms,manage_service_account,manage_token,manage_user_profile,monitor_connector,monitor_enrich,monitor_inference,monitor_ml,monitor_rollup,monitor_snapshot,monitor_stats,monitor_text_structure,monitor_watcher,post_behavioral_analytics_event,read_ccr,read_connector_secrets,read_fleet_secrets,read_ilm,read_pipeline,read_security,read_slm,transport_client,write_connector_secrets,write_fleet_secrets,create_snapshot,manage_behavioral_analytics,manage_ccr,manage_connector,manage_enrich,manage_ilm,manage_inference,manage_ml,manage_rollup,manage_slm,manage_watcher,monitor_data_frame_transforms,monitor_transform,manage_api_key,manage_ingest_pipelines,manage_pipeline,manage_data_frame_transforms,manage_transform,manage_security,monitor,manage,all] or a pattern over one of the available cluster actions;"
             }
         }
     }

+ 3 - 1
docs/reference/rest-api/security/get-builtin-privileges.asciidoc

@@ -111,6 +111,7 @@ A successful call returns an object with "cluster", "index", and "remote_cluster
     "monitor_ml",
     "monitor_rollup",
     "monitor_snapshot",
+    "monitor_stats",
     "monitor_text_structure",
     "monitor_transform",
     "monitor_watcher",
@@ -152,7 +153,8 @@ A successful call returns an object with "cluster", "index", and "remote_cluster
     "write"
   ],
   "remote_cluster" : [
-    "monitor_enrich"
+    "monitor_enrich",
+    "monitor_stats"
   ]
 }
 --------------------------------------------------

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -189,6 +189,7 @@ public class TransportVersions {
     public static final TransportVersion LOGSDB_TELEMETRY = def(8_784_00_0);
     public static final TransportVersion LOGSDB_TELEMETRY_STATS = def(8_785_00_0);
     public static final TransportVersion KQL_QUERY_ADDED = def(8_786_00_0);
+    public static final TransportVersion ROLE_MONITOR_STATS = def(8_787_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 1 - 0
x-pack/plugin/build.gradle

@@ -207,5 +207,6 @@ tasks.named("yamlRestTestV7CompatTransform").configure({ task ->
   task.skipTest("esql/60_usage/Basic ESQL usage output (telemetry) non-snapshot version", "The number of functions is constantly increasing")
   task.skipTest("esql/80_text/reverse text", "The output type changed from TEXT to KEYWORD.")
   task.skipTest("esql/80_text/values function", "The output type changed from TEXT to KEYWORD.")
+  task.skipTest("privileges/11_builtin/Test get builtin privileges" ,"unnecessary to test compatibility")
 })
 

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/user/GetUserPrivilegesResponse.java

@@ -115,7 +115,7 @@ public final class GetUserPrivilegesResponse extends ActionResponse {
     }
 
     public boolean hasRemoteClusterPrivileges() {
-        return remoteClusterPermissions.hasPrivileges();
+        return remoteClusterPermissions.hasAnyPrivileges();
     }
 
     @Override

+ 102 - 9
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/Authentication.java

@@ -36,6 +36,7 @@ import org.elasticsearch.xpack.core.security.authc.file.FileRealmSettings;
 import org.elasticsearch.xpack.core.security.authc.service.ServiceAccountSettings;
 import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer;
 import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
+import org.elasticsearch.xpack.core.security.authz.permission.RemoteClusterPermissions;
 import org.elasticsearch.xpack.core.security.user.AnonymousUser;
 import org.elasticsearch.xpack.core.security.user.InternalUser;
 import org.elasticsearch.xpack.core.security.user.InternalUsers;
@@ -76,6 +77,7 @@ import static org.elasticsearch.xpack.core.security.authc.AuthenticationField.CR
 import static org.elasticsearch.xpack.core.security.authc.AuthenticationField.FALLBACK_REALM_NAME;
 import static org.elasticsearch.xpack.core.security.authc.AuthenticationField.FALLBACK_REALM_TYPE;
 import static org.elasticsearch.xpack.core.security.authc.RealmDomain.REALM_DOMAIN_PARSER;
+import static org.elasticsearch.xpack.core.security.authz.RoleDescriptor.Fields.REMOTE_CLUSTER;
 import static org.elasticsearch.xpack.core.security.authz.permission.RemoteClusterPermissions.ROLE_REMOTE_CLUSTER_PRIVS;
 
 /**
@@ -233,8 +235,8 @@ public final class Authentication implements ToXContentObject {
                     + "]"
             );
         }
-
         final Map<String, Object> newMetadata = maybeRewriteMetadata(olderVersion, this);
+
         final Authentication newAuthentication;
         if (isRunAs()) {
             // The lookup user for run-as currently doesn't have authentication metadata associated with them because
@@ -272,12 +274,23 @@ public final class Authentication implements ToXContentObject {
     }
 
     private static Map<String, Object> maybeRewriteMetadata(TransportVersion olderVersion, Authentication authentication) {
-        if (authentication.isAuthenticatedAsApiKey()) {
-            return maybeRewriteMetadataForApiKeyRoleDescriptors(olderVersion, authentication);
-        } else if (authentication.isCrossClusterAccess()) {
-            return maybeRewriteMetadataForCrossClusterAccessAuthentication(olderVersion, authentication);
-        } else {
-            return authentication.getAuthenticatingSubject().getMetadata();
+        try {
+            if (authentication.isAuthenticatedAsApiKey()) {
+                return maybeRewriteMetadataForApiKeyRoleDescriptors(olderVersion, authentication);
+            } else if (authentication.isCrossClusterAccess()) {
+                return maybeRewriteMetadataForCrossClusterAccessAuthentication(olderVersion, authentication);
+            } else {
+                return authentication.getAuthenticatingSubject().getMetadata();
+            }
+        } catch (Exception e) {
+            // CCS workflows may swallow the exception message making this difficult to troubleshoot, so we explicitly log and re-throw
+            // here. It may result in duplicate logs, so we only log the message at warn level.
+            if (logger.isDebugEnabled()) {
+                logger.debug("Un-expected exception thrown while rewriting metadata. This is likely a bug.", e);
+            } else {
+                logger.warn("Un-expected exception thrown while rewriting metadata. This is likely a bug [" + e.getMessage() + "]");
+            }
+            throw e;
         }
     }
 
@@ -1323,6 +1336,7 @@ public final class Authentication implements ToXContentObject {
 
             if (authentication.getEffectiveSubject().getTransportVersion().onOrAfter(ROLE_REMOTE_CLUSTER_PRIVS)
                 && streamVersion.before(ROLE_REMOTE_CLUSTER_PRIVS)) {
+                // the authentication understands the remote_cluster field but the stream does not
                 metadata = new HashMap<>(metadata);
                 metadata.put(
                     AuthenticationField.API_KEY_ROLE_DESCRIPTORS_KEY,
@@ -1336,7 +1350,26 @@ public final class Authentication implements ToXContentObject {
                         (BytesReference) metadata.get(AuthenticationField.API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY)
                     )
                 );
-            }
+            } else if (authentication.getEffectiveSubject().getTransportVersion().onOrAfter(ROLE_REMOTE_CLUSTER_PRIVS)
+                && streamVersion.onOrAfter(ROLE_REMOTE_CLUSTER_PRIVS)) {
+                    // both the authentication object and the stream understand the remote_cluster field
+                    // check each individual permission and remove as needed
+                    metadata = new HashMap<>(metadata);
+                    metadata.put(
+                        AuthenticationField.API_KEY_ROLE_DESCRIPTORS_KEY,
+                        maybeRemoveRemoteClusterPrivilegesFromRoleDescriptors(
+                            (BytesReference) metadata.get(AuthenticationField.API_KEY_ROLE_DESCRIPTORS_KEY),
+                            streamVersion
+                        )
+                    );
+                    metadata.put(
+                        AuthenticationField.API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY,
+                        maybeRemoveRemoteClusterPrivilegesFromRoleDescriptors(
+                            (BytesReference) metadata.get(AuthenticationField.API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY),
+                            streamVersion
+                        )
+                    );
+                }
 
             if (authentication.getEffectiveSubject().getTransportVersion().onOrAfter(VERSION_API_KEY_ROLES_AS_BYTES)
                 && streamVersion.before(VERSION_API_KEY_ROLES_AS_BYTES)) {
@@ -1417,7 +1450,7 @@ public final class Authentication implements ToXContentObject {
     }
 
     static BytesReference maybeRemoveRemoteClusterFromRoleDescriptors(BytesReference roleDescriptorsBytes) {
-        return maybeRemoveTopLevelFromRoleDescriptors(roleDescriptorsBytes, RoleDescriptor.Fields.REMOTE_CLUSTER.getPreferredName());
+        return maybeRemoveTopLevelFromRoleDescriptors(roleDescriptorsBytes, REMOTE_CLUSTER.getPreferredName());
     }
 
     static BytesReference maybeRemoveRemoteIndicesFromRoleDescriptors(BytesReference roleDescriptorsBytes) {
@@ -1450,6 +1483,66 @@ public final class Authentication implements ToXContentObject {
         }
     }
 
+    /**
+     * Before we send the role descriptors to the remote cluster, we need to remove the remote cluster privileges that the other cluster
+     * will not understand. If all privileges are removed, then the entire "remote_cluster" is removed to avoid sending empty privileges.
+     * @param roleDescriptorsBytes The role descriptors to be sent to the remote cluster, represented as bytes.
+     * @return The role descriptors with the privileges that unsupported by version removed, represented as bytes.
+     */
+    @SuppressWarnings("unchecked")
+    static BytesReference maybeRemoveRemoteClusterPrivilegesFromRoleDescriptors(
+        BytesReference roleDescriptorsBytes,
+        TransportVersion outboundVersion
+    ) {
+        if (roleDescriptorsBytes == null || roleDescriptorsBytes.length() == 0) {
+            return roleDescriptorsBytes;
+        }
+        final Map<String, Object> roleDescriptorsMap = convertRoleDescriptorsBytesToMap(roleDescriptorsBytes);
+        final Map<String, Object> roleDescriptorsMapMutated = new HashMap<>(roleDescriptorsMap);
+        final AtomicBoolean modified = new AtomicBoolean(false);
+        roleDescriptorsMap.forEach((key, value) -> {
+            if (value instanceof Map) {
+                Map<String, Object> roleDescriptor = (Map<String, Object>) value;
+                roleDescriptor.forEach((innerKey, innerValue) -> {
+                    // example: remote_cluster=[{privileges=[monitor_enrich, monitor_stats]
+                    if (REMOTE_CLUSTER.getPreferredName().equals(innerKey)) {
+                        assert innerValue instanceof List;
+                        RemoteClusterPermissions discoveredRemoteClusterPermission = new RemoteClusterPermissions(
+                            (List<Map<String, List<String>>>) innerValue
+                        );
+                        RemoteClusterPermissions mutated = discoveredRemoteClusterPermission.removeUnsupportedPrivileges(outboundVersion);
+                        if (mutated.equals(discoveredRemoteClusterPermission) == false) {
+                            // swap out the old value with the new value
+                            modified.set(true);
+                            Map<String, Object> remoteClusterMap = new HashMap<>((Map<String, Object>) roleDescriptorsMapMutated.get(key));
+                            if (mutated.hasAnyPrivileges()) {
+                                // has at least one group with privileges
+                                remoteClusterMap.put(innerKey, mutated.toMap());
+                            } else {
+                                // has no groups with privileges
+                                remoteClusterMap.remove(innerKey);
+                            }
+                            roleDescriptorsMapMutated.put(key, remoteClusterMap);
+                        }
+                    }
+                });
+            }
+        });
+        if (modified.get()) {
+            logger.debug(
+                "mutated role descriptors. Changed from {} to {} for outbound version {}",
+                roleDescriptorsMap,
+                roleDescriptorsMapMutated,
+                outboundVersion
+            );
+            return convertRoleDescriptorsMapToBytes(roleDescriptorsMapMutated);
+        } else {
+            // No need to serialize if we did not change anything.
+            logger.trace("no change to role descriptors {} for outbound version {}", roleDescriptorsMap, outboundVersion);
+            return roleDescriptorsBytes;
+        }
+    }
+
     static boolean equivalentRealms(String name1, String type1, String name2, String type2) {
         if (false == type1.equals(type2)) {
             return false;

+ 22 - 12
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/RoleDescriptor.java

@@ -6,6 +6,8 @@
  */
 package org.elasticsearch.xpack.core.security.authz;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.TransportVersion;
@@ -62,6 +64,7 @@ public class RoleDescriptor implements ToXContentObject, Writeable {
     public static final TransportVersion SECURITY_ROLE_DESCRIPTION = TransportVersions.V_8_15_0;
 
     public static final String ROLE_TYPE = "role";
+    private static final Logger logger = LogManager.getLogger(RoleDescriptor.class);
 
     private final String name;
     private final String[] clusterPrivileges;
@@ -191,7 +194,7 @@ public class RoleDescriptor implements ToXContentObject, Writeable {
             ? Collections.unmodifiableMap(transientMetadata)
             : Collections.singletonMap("enabled", true);
         this.remoteIndicesPrivileges = remoteIndicesPrivileges != null ? remoteIndicesPrivileges : RemoteIndicesPrivileges.NONE;
-        this.remoteClusterPermissions = remoteClusterPermissions != null && remoteClusterPermissions.hasPrivileges()
+        this.remoteClusterPermissions = remoteClusterPermissions != null && remoteClusterPermissions.hasAnyPrivileges()
             ? remoteClusterPermissions
             : RemoteClusterPermissions.NONE;
         this.restriction = restriction != null ? restriction : Restriction.NONE;
@@ -263,7 +266,7 @@ public class RoleDescriptor implements ToXContentObject, Writeable {
     }
 
     public boolean hasRemoteClusterPermissions() {
-        return remoteClusterPermissions.hasPrivileges();
+        return remoteClusterPermissions.hasAnyPrivileges();
     }
 
     public RemoteClusterPermissions getRemoteClusterPermissions() {
@@ -830,25 +833,32 @@ public class RoleDescriptor implements ToXContentObject, Writeable {
                     currentFieldName = parser.currentName();
                 } else if (Fields.PRIVILEGES.match(currentFieldName, parser.getDeprecationHandler())) {
                     privileges = readStringArray(roleName, parser, false);
-                    if (privileges.length != 1
-                        || RemoteClusterPermissions.getSupportedRemoteClusterPermissions()
-                            .contains(privileges[0].trim().toLowerCase(Locale.ROOT)) == false) {
-                        throw new ElasticsearchParseException(
-                            "failed to parse remote_cluster for role [{}]. "
-                                + RemoteClusterPermissions.getSupportedRemoteClusterPermissions()
-                                + " is the only value allowed for [{}] within [remote_cluster]",
+                    if (Arrays.stream(privileges)
+                        .map(s -> s.toLowerCase(Locale.ROOT).trim())
+                        .allMatch(RemoteClusterPermissions.getSupportedRemoteClusterPermissions()::contains) == false) {
+                        final String message = String.format(
+                            Locale.ROOT,
+                            "failed to parse remote_cluster for role [%s]. "
+                                + "%s are the only values allowed for [%s] within [remote_cluster]. Found %s",
                             roleName,
-                            currentFieldName
+                            RemoteClusterPermissions.getSupportedRemoteClusterPermissions(),
+                            currentFieldName,
+                            Arrays.toString(privileges)
                         );
+                        logger.info(message);
+                        throw new ElasticsearchParseException(message);
                     }
                 } else if (Fields.CLUSTERS.match(currentFieldName, parser.getDeprecationHandler())) {
                     clusters = readStringArray(roleName, parser, false);
                 } else {
-                    throw new ElasticsearchParseException(
-                        "failed to parse remote_cluster for role [{}]. unexpected field [{}]",
+                    final String message = String.format(
+                        Locale.ROOT,
+                        "failed to parse remote_cluster for role [%s]. unexpected field [%s]",
                         roleName,
                         currentFieldName
                     );
+                    logger.info(message);
+                    throw new ElasticsearchParseException(message);
                 }
             }
             if (privileges != null && clusters == null) {

+ 34 - 5
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/RemoteClusterPermissionGroup.java

@@ -13,11 +13,15 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.xcontent.ToXContentObject;
 import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
 import org.elasticsearch.xpack.core.security.support.StringMatcher;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.xpack.core.security.authz.RoleDescriptor.Fields.CLUSTERS;
+import static org.elasticsearch.xpack.core.security.authz.RoleDescriptor.Fields.PRIVILEGES;
 
 /**
  * Represents a group of permissions for a remote cluster. For example:
@@ -41,6 +45,14 @@ public class RemoteClusterPermissionGroup implements NamedWriteable, ToXContentO
         remoteClusterAliasMatcher = StringMatcher.of(remoteClusterAliases);
     }
 
+    public RemoteClusterPermissionGroup(Map<String, List<String>> remoteClusterGroup) {
+        assert remoteClusterGroup.get(PRIVILEGES.getPreferredName()) != null : "privileges must be non-null";
+        assert remoteClusterGroup.get(CLUSTERS.getPreferredName()) != null : "clusters must be non-null";
+        clusterPrivileges = remoteClusterGroup.get(PRIVILEGES.getPreferredName()).toArray(new String[0]);
+        remoteClusterAliases = remoteClusterGroup.get(CLUSTERS.getPreferredName()).toArray(new String[0]);
+        remoteClusterAliasMatcher = StringMatcher.of(remoteClusterAliases);
+    }
+
     /**
      * @param clusterPrivileges The list of cluster privileges that are allowed for the remote cluster. must not be null or empty.
      * @param remoteClusterAliases The list of remote clusters that the privileges apply to. must not be null or empty.
@@ -53,10 +65,14 @@ public class RemoteClusterPermissionGroup implements NamedWriteable, ToXContentO
             throw new IllegalArgumentException("remote cluster groups must not be null or empty");
         }
         if (Arrays.stream(clusterPrivileges).anyMatch(s -> Strings.hasText(s) == false)) {
-            throw new IllegalArgumentException("remote_cluster privileges must contain valid non-empty, non-null values");
+            throw new IllegalArgumentException(
+                "remote_cluster privileges must contain valid non-empty, non-null values " + Arrays.toString(clusterPrivileges)
+            );
         }
         if (Arrays.stream(remoteClusterAliases).anyMatch(s -> Strings.hasText(s) == false)) {
-            throw new IllegalArgumentException("remote_cluster clusters aliases must contain valid non-empty, non-null values");
+            throw new IllegalArgumentException(
+                "remote_cluster clusters aliases must contain valid non-empty, non-null values " + Arrays.toString(remoteClusterAliases)
+            );
         }
 
         this.clusterPrivileges = clusterPrivileges;
@@ -86,11 +102,24 @@ public class RemoteClusterPermissionGroup implements NamedWriteable, ToXContentO
         return Arrays.copyOf(remoteClusterAliases, remoteClusterAliases.length);
     }
 
+    /**
+     * Converts the group to a map representation.
+     * @return A map representation of the group.
+     */
+    public Map<String, List<String>> toMap() {
+        return Map.of(
+            PRIVILEGES.getPreferredName(),
+            Arrays.asList(clusterPrivileges),
+            CLUSTERS.getPreferredName(),
+            Arrays.asList(remoteClusterAliases)
+        );
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
-        builder.array(RoleDescriptor.Fields.PRIVILEGES.getPreferredName(), clusterPrivileges);
-        builder.array(RoleDescriptor.Fields.CLUSTERS.getPreferredName(), remoteClusterAliases);
+        builder.array(PRIVILEGES.getPreferredName(), clusterPrivileges);
+        builder.array(CLUSTERS.getPreferredName(), remoteClusterAliases);
         builder.endObject();
         return builder;
     }

+ 111 - 21
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/RemoteClusterPermissions.java

@@ -29,13 +29,19 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.TransportVersions.ROLE_MONITOR_STATS;
+
 /**
  * Represents the set of permissions for remote clusters. This is intended to be the model for both the {@link RoleDescriptor}
- * and {@link Role}. This model is not intended to be sent to a remote cluster, but can be (wire) serialized within a single cluster
- * as well as the Xcontent serialization for the REST API and persistence of the role in the security index. The privileges modeled here
- * will be converted to the appropriate cluster privileges when sent to a remote cluster.
+ * and {@link Role}. This model is intended to be converted to local cluster permissions
+ * {@link #collapseAndRemoveUnsupportedPrivileges(String, TransportVersion)} before sent to the remote cluster. This model also be included
+ * in the role descriptors for (normal) API keys sent between nodes/clusters. In both cases the outbound transport version can be used to
+ * remove permissions that are not available to older nodes or clusters. The methods {@link #removeUnsupportedPrivileges(TransportVersion)}
+ * and {@link #collapseAndRemoveUnsupportedPrivileges(String, TransportVersion)} are used to aid in ensuring correct privileges per
+ * transport version.
  * For example, on the local/querying cluster this model represents the following:
  * <code>
  * "remote_cluster" : [
@@ -49,15 +55,18 @@ import java.util.stream.Collectors;
  *         }
  *     ]
  * </code>
- * when sent to the remote cluster "clusterA", the privileges will be converted to the appropriate cluster privileges. For example:
+ * (RCS 2.0) when sent to the remote cluster "clusterA", the privileges will be converted to the appropriate cluster privileges.
+ * For example:
  * <code>
  *   "cluster": ["foo"]
  * </code>
- * and when sent to the remote cluster "clusterB", the privileges will be converted to the appropriate cluster privileges. For example:
+ * and (RCS 2.0) when sent to the remote cluster "clusterB", the privileges will be converted to the appropriate cluster privileges.
+ * For example:
  * <code>
  *   "cluster": ["bar"]
  * </code>
- * If the remote cluster does not support the privilege, as determined by the remote cluster version, the privilege will be not be sent.
+ * For normal API keys and their role descriptors :If the remote cluster does not support the privilege, the privilege will be not be sent.
+ * Upstream code performs the removal, but this class owns the business logic for how to remove per outbound version.
  */
 public class RemoteClusterPermissions implements NamedWriteable, ToXContentObject {
 
@@ -70,19 +79,33 @@ public class RemoteClusterPermissions implements NamedWriteable, ToXContentObjec
     // package private non-final for testing
     static Map<TransportVersion, Set<String>> allowedRemoteClusterPermissions = Map.of(
         ROLE_REMOTE_CLUSTER_PRIVS,
-        Set.of(ClusterPrivilegeResolver.MONITOR_ENRICH.name())
+        Set.of(ClusterPrivilegeResolver.MONITOR_ENRICH.name()),
+        ROLE_MONITOR_STATS,
+        Set.of(ClusterPrivilegeResolver.MONITOR_STATS.name())
     );
+    static final TransportVersion lastTransportVersionPermission = allowedRemoteClusterPermissions.keySet()
+        .stream()
+        .max(TransportVersion::compareTo)
+        .orElseThrow();
 
     public static final RemoteClusterPermissions NONE = new RemoteClusterPermissions();
 
     public static Set<String> getSupportedRemoteClusterPermissions() {
-        return allowedRemoteClusterPermissions.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+        return allowedRemoteClusterPermissions.values().stream().flatMap(Set::stream).collect(Collectors.toCollection(TreeSet::new));
     }
 
     public RemoteClusterPermissions(StreamInput in) throws IOException {
         remoteClusterPermissionGroups = in.readNamedWriteableCollectionAsList(RemoteClusterPermissionGroup.class);
     }
 
+    public RemoteClusterPermissions(List<Map<String, List<String>>> remoteClusters) {
+        remoteClusterPermissionGroups = new ArrayList<>();
+        for (Map<String, List<String>> remoteCluster : remoteClusters) {
+            RemoteClusterPermissionGroup remoteClusterPermissionGroup = new RemoteClusterPermissionGroup(remoteCluster);
+            remoteClusterPermissionGroups.add(remoteClusterPermissionGroup);
+        }
+    }
+
     public RemoteClusterPermissions() {
         remoteClusterPermissionGroups = new ArrayList<>();
     }
@@ -97,10 +120,64 @@ public class RemoteClusterPermissions implements NamedWriteable, ToXContentObjec
     }
 
     /**
-     * Gets the privilege names for the remote cluster. This method will collapse all groups to single String[] all lowercase
-     * and will only return the appropriate privileges for the provided remote cluster version.
+     * Will remove any unsupported privileges for the provided outbound version. This method will not modify the current instance.
+     * This is useful for (normal) API keys role descriptors to help ensure that we don't send unsupported privileges. The result of
+     * this method may result in no groups if all privileges are removed. {@link #hasAnyPrivileges()} can be used to check if there are
+     * any privileges left.
+     * @param outboundVersion The version by which to remove unsupported privileges, this is typically the version of the remote cluster
+     * @return a new instance of RemoteClusterPermissions with the unsupported privileges removed
      */
-    public String[] privilegeNames(final String remoteClusterAlias, TransportVersion remoteClusterVersion) {
+    public RemoteClusterPermissions removeUnsupportedPrivileges(TransportVersion outboundVersion) {
+        Objects.requireNonNull(outboundVersion, "outboundVersion must not be null");
+        if (outboundVersion.onOrAfter(lastTransportVersionPermission)) {
+            return this;
+        }
+        RemoteClusterPermissions copyForOutboundVersion = new RemoteClusterPermissions();
+        Set<String> allowedPermissionsPerVersion = getAllowedPermissionsPerVersion(outboundVersion);
+        for (RemoteClusterPermissionGroup group : remoteClusterPermissionGroups) {
+            String[] privileges = group.clusterPrivileges();
+            List<String> outboundPrivileges = new ArrayList<>(privileges.length);
+            for (String privilege : privileges) {
+                if (allowedPermissionsPerVersion.contains(privilege.toLowerCase(Locale.ROOT))) {
+                    outboundPrivileges.add(privilege);
+                }
+            }
+            if (outboundPrivileges.isEmpty() == false) {
+                RemoteClusterPermissionGroup outboundGroup = new RemoteClusterPermissionGroup(
+                    outboundPrivileges.toArray(new String[0]),
+                    group.remoteClusterAliases()
+                );
+                copyForOutboundVersion.addGroup(outboundGroup);
+                if (logger.isDebugEnabled()) {
+                    if (group.equals(outboundGroup) == false) {
+                        logger.debug(
+                            "Removed unsupported remote cluster permissions. Remaining {} for remote cluster [{}] for version [{}]."
+                                + "Due to the remote cluster version, only the following permissions are allowed: {}",
+                            outboundPrivileges,
+                            group.remoteClusterAliases(),
+                            outboundVersion,
+                            allowedPermissionsPerVersion
+                        );
+                    }
+                }
+            } else {
+                logger.debug(
+                    "Removed all remote cluster permissions for remote cluster [{}]. "
+                        + "Due to the remote cluster version, only the following permissions are allowed: {}",
+                    group.remoteClusterAliases(),
+                    allowedPermissionsPerVersion
+                );
+            }
+        }
+        return copyForOutboundVersion;
+    }
+
+    /**
+     * Gets all the privilege names for the remote cluster. This method will collapse all groups to single String[] all lowercase
+     * and will only return the appropriate privileges for the provided remote cluster version. This is useful for RCS 2.0 to ensure
+     * that we properly convert all the remote_cluster -> cluster privileges per remote cluster.
+     */
+    public String[] collapseAndRemoveUnsupportedPrivileges(final String remoteClusterAlias, TransportVersion outboundVersion) {
 
         // get all privileges for the remote cluster
         Set<String> groupPrivileges = remoteClusterPermissionGroups.stream()
@@ -111,13 +188,7 @@ public class RemoteClusterPermissions implements NamedWriteable, ToXContentObjec
             .collect(Collectors.toSet());
 
         // find all the privileges that are allowed for the remote cluster version
-        Set<String> allowedPermissionsPerVersion = allowedRemoteClusterPermissions.entrySet()
-            .stream()
-            .filter((entry) -> entry.getKey().onOrBefore(remoteClusterVersion))
-            .map(Map.Entry::getValue)
-            .flatMap(Set::stream)
-            .map(s -> s.toLowerCase(Locale.ROOT))
-            .collect(Collectors.toSet());
+        Set<String> allowedPermissionsPerVersion = getAllowedPermissionsPerVersion(outboundVersion);
 
         // intersect the two sets to get the allowed privileges for the remote cluster version
         Set<String> allowedPrivileges = new HashSet<>(groupPrivileges);
@@ -137,13 +208,21 @@ public class RemoteClusterPermissions implements NamedWriteable, ToXContentObjec
         return allowedPrivileges.stream().sorted().toArray(String[]::new);
     }
 
+    /**
+     * Converts this object to it's {@link Map} representation.
+     * @return a list of maps representing the remote cluster permissions
+     */
+    public List<Map<String, List<String>>> toMap() {
+        return remoteClusterPermissionGroups.stream().map(RemoteClusterPermissionGroup::toMap).toList();
+    }
+
     /**
      * Validates the remote cluster permissions (regardless of remote cluster version).
      * This method will throw an {@link IllegalArgumentException} if the permissions are invalid.
      * Generally, this method is just a safety check and validity should be checked before adding the permissions to this class.
      */
     public void validate() {
-        assert hasPrivileges();
+        assert hasAnyPrivileges();
         Set<String> invalid = getUnsupportedPrivileges();
         if (invalid.isEmpty() == false) {
             throw new IllegalArgumentException(
@@ -173,11 +252,11 @@ public class RemoteClusterPermissions implements NamedWriteable, ToXContentObjec
         return invalid;
     }
 
-    public boolean hasPrivileges(final String remoteClusterAlias) {
+    public boolean hasAnyPrivileges(final String remoteClusterAlias) {
         return remoteClusterPermissionGroups.stream().anyMatch(remoteIndicesGroup -> remoteIndicesGroup.hasPrivileges(remoteClusterAlias));
     }
 
-    public boolean hasPrivileges() {
+    public boolean hasAnyPrivileges() {
         return remoteClusterPermissionGroups.isEmpty() == false;
     }
 
@@ -185,6 +264,16 @@ public class RemoteClusterPermissions implements NamedWriteable, ToXContentObjec
         return Collections.unmodifiableList(remoteClusterPermissionGroups);
     }
 
+    private Set<String> getAllowedPermissionsPerVersion(TransportVersion outboundVersion) {
+        return allowedRemoteClusterPermissions.entrySet()
+            .stream()
+            .filter((entry) -> entry.getKey().onOrBefore(outboundVersion))
+            .map(Map.Entry::getValue)
+            .flatMap(Set::stream)
+            .map(s -> s.toLowerCase(Locale.ROOT))
+            .collect(Collectors.toSet());
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         for (RemoteClusterPermissionGroup remoteClusterPermissionGroup : remoteClusterPermissionGroups) {
@@ -220,4 +309,5 @@ public class RemoteClusterPermissions implements NamedWriteable, ToXContentObjec
     public String getWriteableName() {
         return NAME;
     }
+
 }

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/Role.java

@@ -283,7 +283,7 @@ public interface Role {
         public Builder addRemoteClusterPermissions(RemoteClusterPermissions remoteClusterPermissions) {
             Objects.requireNonNull(remoteClusterPermissions, "remoteClusterPermissions must not be null");
             assert this.remoteClusterPermissions == null : "addRemoteClusterPermissions should only be called once";
-            if (remoteClusterPermissions.hasPrivileges()) {
+            if (remoteClusterPermissions.hasAnyPrivileges()) {
                 remoteClusterPermissions.validate();
             }
             this.remoteClusterPermissions = remoteClusterPermissions;

+ 2 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/SimpleRole.java

@@ -210,7 +210,7 @@ public class SimpleRole implements Role {
         final RemoteIndicesPermission remoteIndicesPermission = this.remoteIndicesPermission.forCluster(remoteClusterAlias);
 
         if (remoteIndicesPermission.remoteIndicesGroups().isEmpty()
-            && remoteClusterPermissions.hasPrivileges(remoteClusterAlias) == false) {
+            && remoteClusterPermissions.hasAnyPrivileges(remoteClusterAlias) == false) {
             return RoleDescriptorsIntersection.EMPTY;
         }
 
@@ -224,7 +224,7 @@ public class SimpleRole implements Role {
         return new RoleDescriptorsIntersection(
             new RoleDescriptor(
                 REMOTE_USER_ROLE_NAME,
-                remoteClusterPermissions.privilegeNames(remoteClusterAlias, remoteClusterVersion),
+                remoteClusterPermissions.collapseAndRemoveUnsupportedPrivileges(remoteClusterAlias, remoteClusterVersion),
                 // The role descriptors constructed here may be cached in raw byte form, using a hash of their content as a
                 // cache key; we therefore need deterministic order when constructing them here, to ensure cache hits for
                 // equivalent role descriptors

+ 10 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java

@@ -110,6 +110,8 @@ public class ClusterPrivilegeResolver {
     private static final Set<String> MONITOR_WATCHER_PATTERN = Set.of("cluster:monitor/xpack/watcher/*");
     private static final Set<String> MONITOR_ROLLUP_PATTERN = Set.of("cluster:monitor/xpack/rollup/*");
     private static final Set<String> MONITOR_ENRICH_PATTERN = Set.of("cluster:monitor/xpack/enrich/*", "cluster:admin/xpack/enrich/get");
+    // intentionally cluster:monitor/stats* to match cluster:monitor/stats, cluster:monitor/stats[n] and cluster:monitor/stats/remote
+    private static final Set<String> MONITOR_STATS_PATTERN = Set.of("cluster:monitor/stats*");
 
     private static final Set<String> ALL_CLUSTER_PATTERN = Set.of(
         "cluster:*",
@@ -208,7 +210,11 @@ public class ClusterPrivilegeResolver {
         // esql enrich
         "cluster:monitor/xpack/enrich/esql/resolve_policy",
         "cluster:internal:data/read/esql/open_exchange",
-        "cluster:internal:data/read/esql/exchange"
+        "cluster:internal:data/read/esql/exchange",
+        // cluster stats for remote clusters
+        "cluster:monitor/stats/remote",
+        "cluster:monitor/stats",
+        "cluster:monitor/stats[n]"
     );
     private static final Set<String> CROSS_CLUSTER_REPLICATION_PATTERN = Set.of(
         RemoteClusterService.REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME,
@@ -243,6 +249,7 @@ public class ClusterPrivilegeResolver {
     public static final NamedClusterPrivilege MONITOR_WATCHER = new ActionClusterPrivilege("monitor_watcher", MONITOR_WATCHER_PATTERN);
     public static final NamedClusterPrivilege MONITOR_ROLLUP = new ActionClusterPrivilege("monitor_rollup", MONITOR_ROLLUP_PATTERN);
     public static final NamedClusterPrivilege MONITOR_ENRICH = new ActionClusterPrivilege("monitor_enrich", MONITOR_ENRICH_PATTERN);
+    public static final NamedClusterPrivilege MONITOR_STATS = new ActionClusterPrivilege("monitor_stats", MONITOR_STATS_PATTERN);
     public static final NamedClusterPrivilege MANAGE = new ActionClusterPrivilege("manage", ALL_CLUSTER_PATTERN, ALL_SECURITY_PATTERN);
     public static final NamedClusterPrivilege MANAGE_INFERENCE = new ActionClusterPrivilege("manage_inference", MANAGE_INFERENCE_PATTERN);
     public static final NamedClusterPrivilege MANAGE_ML = new ActionClusterPrivilege("manage_ml", MANAGE_ML_PATTERN);
@@ -424,6 +431,7 @@ public class ClusterPrivilegeResolver {
             MONITOR_WATCHER,
             MONITOR_ROLLUP,
             MONITOR_ENRICH,
+            MONITOR_STATS,
             MANAGE,
             MANAGE_CONNECTOR,
             MANAGE_INFERENCE,
@@ -499,7 +507,7 @@ public class ClusterPrivilegeResolver {
             + Strings.collectionToCommaDelimitedString(VALUES.keySet())
             + "] or a pattern over one of the available "
             + "cluster actions";
-        logger.debug(errorMessage);
+        logger.warn(errorMessage);
         throw new IllegalArgumentException(errorMessage);
 
     }

+ 12 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/KibanaOwnedReservedRoleDescriptors.java

@@ -20,6 +20,9 @@ import org.elasticsearch.xpack.core.security.action.profile.GetProfilesAction;
 import org.elasticsearch.xpack.core.security.action.profile.SuggestProfilesAction;
 import org.elasticsearch.xpack.core.security.action.user.ProfileHasPrivilegesAction;
 import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
+import org.elasticsearch.xpack.core.security.authz.permission.RemoteClusterPermissionGroup;
+import org.elasticsearch.xpack.core.security.authz.permission.RemoteClusterPermissions;
+import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver;
 import org.elasticsearch.xpack.core.security.authz.privilege.ConfigurableClusterPrivilege;
 import org.elasticsearch.xpack.core.security.authz.privilege.ConfigurableClusterPrivileges;
 import org.elasticsearch.xpack.core.security.support.MetadataUtils;
@@ -492,7 +495,15 @@ class KibanaOwnedReservedRoleDescriptors {
                 getRemoteIndicesReadPrivileges("metrics-apm.*"),
                 getRemoteIndicesReadPrivileges("traces-apm.*"),
                 getRemoteIndicesReadPrivileges("traces-apm-*") },
-            null,
+            new RemoteClusterPermissions().addGroup(
+                new RemoteClusterPermissionGroup(
+                    RemoteClusterPermissions.getSupportedRemoteClusterPermissions()
+                        .stream()
+                        .filter(s -> s.equals(ClusterPrivilegeResolver.MONITOR_STATS.name()))
+                        .toArray(String[]::new),
+                    new String[] { "*" }
+                )
+            ),
             null,
             "Grants access necessary for the Kibana system user to read from and write to the Kibana indices, "
                 + "manage index templates and tokens, and check the availability of the Elasticsearch cluster. "

+ 42 - 3
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/CrossClusterApiKeyRoleDescriptorBuilderTests.java

@@ -10,11 +10,16 @@ package org.elasticsearch.xpack.core.security.action.apikey;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.xcontent.XContentParseException;
 import org.elasticsearch.xcontent.XContentParserConfiguration;
 import org.elasticsearch.xcontent.json.JsonXContent;
+import org.elasticsearch.xpack.core.security.authc.AuthenticationTestHelper;
 import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
+import org.elasticsearch.xpack.core.security.authz.permission.ClusterPermission;
 import org.elasticsearch.xpack.core.security.authz.permission.RemoteClusterPermissions;
+import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilege;
+import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver;
 
 import java.io.IOException;
 import java.util.List;
@@ -27,6 +32,7 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.mock;
 
 public class CrossClusterApiKeyRoleDescriptorBuilderTests extends ESTestCase {
 
@@ -356,9 +362,42 @@ public class CrossClusterApiKeyRoleDescriptorBuilderTests extends ESTestCase {
     }
 
     public void testAPIKeyAllowsAllRemoteClusterPrivilegesForCCS() {
-        // if users can add remote cluster permissions to a role, then the APIKey should also allow that for that permission
-        // the inverse however, is not guaranteed. cross_cluster_search exists largely for internal use and is not exposed to the users role
-        assertTrue(Set.of(CCS_CLUSTER_PRIVILEGE_NAMES).containsAll(RemoteClusterPermissions.getSupportedRemoteClusterPermissions()));
+        // test to help ensure that at least 1 action that is allowed by the remote cluster permissions are supported by CCS
+        List<String> actionsToTest = List.of("cluster:monitor/xpack/enrich/esql/resolve_policy", "cluster:monitor/stats/remote");
+        // if you add new remote cluster permissions, please define an action we can test to help ensure it is supported by RCS 2.0
+        assertThat(actionsToTest.size(), equalTo(RemoteClusterPermissions.getSupportedRemoteClusterPermissions().size()));
+
+        for (String privilege : RemoteClusterPermissions.getSupportedRemoteClusterPermissions()) {
+            boolean actionPassesRemoteClusterPermissionCheck = false;
+            ClusterPrivilege clusterPrivilege = ClusterPrivilegeResolver.resolve(privilege);
+            // each remote cluster privilege has an action to test
+            for (String action : actionsToTest) {
+                if (clusterPrivilege.buildPermission(ClusterPermission.builder())
+                    .build()
+                    .check(action, mock(TransportRequest.class), AuthenticationTestHelper.builder().build())) {
+                    actionPassesRemoteClusterPermissionCheck = true;
+                    break;
+                }
+            }
+            assertTrue(
+                "privilege [" + privilege + "] does not cover any actions among [" + actionsToTest + "]",
+                actionPassesRemoteClusterPermissionCheck
+            );
+        }
+        // test that the actions pass the privilege check for CCS
+        for (String privilege : Set.of(CCS_CLUSTER_PRIVILEGE_NAMES)) {
+            boolean actionPassesRemoteCCSCheck = false;
+            ClusterPrivilege clusterPrivilege = ClusterPrivilegeResolver.resolve(privilege);
+            for (String action : actionsToTest) {
+                if (clusterPrivilege.buildPermission(ClusterPermission.builder())
+                    .build()
+                    .check(action, mock(TransportRequest.class), AuthenticationTestHelper.builder().build())) {
+                    actionPassesRemoteCCSCheck = true;
+                    break;
+                }
+            }
+            assertTrue(actionPassesRemoteCCSCheck);
+        }
     }
 
     private static void assertRoleDescriptor(

+ 1 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/role/PutRoleRequestTests.java

@@ -104,7 +104,7 @@ public class PutRoleRequestTests extends ESTestCase {
         }
         request.putRemoteCluster(remoteClusterPermissions);
         assertValidationError("Invalid remote_cluster permissions found. Please remove the following: [", request);
-        assertValidationError("Only [monitor_enrich] are allowed", request);
+        assertValidationError("Only [monitor_enrich, monitor_stats] are allowed", request);
     }
 
     public void testValidationErrorWithEmptyClustersInRemoteIndices() {

+ 84 - 2
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authc/AuthenticationTests.java

@@ -21,6 +21,7 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.TransportVersionUtils;
 import org.elasticsearch.transport.RemoteClusterPortSettings;
+import org.elasticsearch.xcontent.ObjectPath;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentType;
@@ -32,6 +33,7 @@ import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContext
 import org.elasticsearch.xpack.core.security.authz.RoleDescriptorsIntersection;
 import org.elasticsearch.xpack.core.security.user.AnonymousUser;
 import org.elasticsearch.xpack.core.security.user.User;
+import org.hamcrest.Matchers;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -42,6 +44,8 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static java.util.Map.entry;
+import static org.elasticsearch.TransportVersions.ROLE_MONITOR_STATS;
+import static org.elasticsearch.xpack.core.security.authc.Authentication.VERSION_API_KEY_ROLES_AS_BYTES;
 import static org.elasticsearch.xpack.core.security.authc.AuthenticationTestHelper.randomCrossClusterAccessSubjectInfo;
 import static org.elasticsearch.xpack.core.security.authc.CrossClusterAccessSubjectInfoTests.randomRoleDescriptorsIntersection;
 import static org.elasticsearch.xpack.core.security.authz.permission.RemoteClusterPermissions.ROLE_REMOTE_CLUSTER_PRIVS;
@@ -1070,7 +1074,7 @@ public class AuthenticationTests extends ESTestCase {
         // pick a version before that of the authentication instance to force a rewrite
         final TransportVersion olderVersion = TransportVersionUtils.randomVersionBetween(
             random(),
-            Authentication.VERSION_API_KEY_ROLES_AS_BYTES,
+            VERSION_API_KEY_ROLES_AS_BYTES,
             TransportVersionUtils.getPreviousVersion(original.getEffectiveSubject().getTransportVersion())
         );
 
@@ -1115,7 +1119,7 @@ public class AuthenticationTests extends ESTestCase {
         // pick a version before that of the authentication instance to force a rewrite
         final TransportVersion olderVersion = TransportVersionUtils.randomVersionBetween(
             random(),
-            Authentication.VERSION_API_KEY_ROLES_AS_BYTES,
+            VERSION_API_KEY_ROLES_AS_BYTES,
             TransportVersionUtils.getPreviousVersion(original.getEffectiveSubject().getTransportVersion())
         );
 
@@ -1135,6 +1139,84 @@ public class AuthenticationTests extends ESTestCase {
         );
     }
 
+    public void testMaybeRewriteMetadataForApiKeyRoleDescriptorsWithRemoteClusterRemovePrivs() throws IOException {
+        final String apiKeyId = randomAlphaOfLengthBetween(1, 10);
+        final String apiKeyName = randomAlphaOfLengthBetween(1, 10);
+        Map<String, Object> metadata = Map.ofEntries(
+            entry(AuthenticationField.API_KEY_ID_KEY, apiKeyId),
+            entry(AuthenticationField.API_KEY_NAME_KEY, apiKeyName),
+            entry(AuthenticationField.API_KEY_ROLE_DESCRIPTORS_KEY, new BytesArray("""
+                {"base_role":{"cluster":["all"],
+                 "remote_cluster":[{"privileges":["monitor_enrich", "monitor_stats"],"clusters":["*"]}]
+                }}""")),
+            entry(AuthenticationField.API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY, new BytesArray("""
+                {"limited_by_role":{"cluster":["*"],
+                 "remote_cluster":[{"privileges":["monitor_enrich", "monitor_stats"],"clusters":["*"]}]
+                }}"""))
+        );
+
+        final Authentication with2privs = AuthenticationTestHelper.builder()
+            .apiKey()
+            .metadata(metadata)
+            .transportVersion(TransportVersion.current())
+            .build();
+
+        // pick a version that will only remove one of the two privileges
+        final TransportVersion olderVersion = TransportVersionUtils.randomVersionBetween(
+            random(),
+            ROLE_REMOTE_CLUSTER_PRIVS,
+            TransportVersionUtils.getPreviousVersion(ROLE_MONITOR_STATS)
+        );
+
+        Map<String, Object> rewrittenMetadata = with2privs.maybeRewriteForOlderVersion(olderVersion).getEffectiveSubject().getMetadata();
+        assertThat(rewrittenMetadata.keySet(), equalTo(with2privs.getAuthenticatingSubject().getMetadata().keySet()));
+
+        // only one of the two privileges are left after the rewrite
+        BytesReference baseRoleBytes = (BytesReference) rewrittenMetadata.get(AuthenticationField.API_KEY_ROLE_DESCRIPTORS_KEY);
+        Map<String, Object> baseRoleAsMap = XContentHelper.convertToMap(baseRoleBytes, false, XContentType.JSON).v2();
+        assertThat(ObjectPath.eval("base_role.remote_cluster.0.privileges", baseRoleAsMap), Matchers.contains("monitor_enrich"));
+        assertThat(ObjectPath.eval("base_role.remote_cluster.0.clusters", baseRoleAsMap), notNullValue());
+        BytesReference limitedByRoleBytes = (BytesReference) rewrittenMetadata.get(
+            AuthenticationField.API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY
+        );
+        Map<String, Object> limitedByRoleAsMap = XContentHelper.convertToMap(limitedByRoleBytes, false, XContentType.JSON).v2();
+        assertThat(ObjectPath.eval("limited_by_role.remote_cluster.0.privileges", limitedByRoleAsMap), Matchers.contains("monitor_enrich"));
+        assertThat(ObjectPath.eval("limited_by_role.remote_cluster.0.clusters", limitedByRoleAsMap), notNullValue());
+
+        // same version, but it removes the only defined privilege
+        metadata = Map.ofEntries(
+            entry(AuthenticationField.API_KEY_ID_KEY, apiKeyId),
+            entry(AuthenticationField.API_KEY_NAME_KEY, apiKeyName),
+            entry(AuthenticationField.API_KEY_ROLE_DESCRIPTORS_KEY, new BytesArray("""
+                {"base_role":{"cluster":["all"],
+                 "remote_cluster":[{"privileges":["monitor_stats"],"clusters":["*"]}]
+                }}""")),
+            entry(AuthenticationField.API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY, new BytesArray("""
+                {"limited_by_role":{"cluster":["*"],
+                 "remote_cluster":[{"privileges":["monitor_stats"],"clusters":["*"]}]
+                }}"""))
+        );
+
+        final Authentication with1priv = AuthenticationTestHelper.builder()
+            .apiKey()
+            .metadata(metadata)
+            .transportVersion(TransportVersion.current())
+            .build();
+
+        rewrittenMetadata = with1priv.maybeRewriteForOlderVersion(olderVersion).getEffectiveSubject().getMetadata();
+        assertThat(rewrittenMetadata.keySet(), equalTo(with1priv.getAuthenticatingSubject().getMetadata().keySet()));
+
+        // the one privileges is removed after the rewrite, which removes the full "remote_cluster" object
+        baseRoleBytes = (BytesReference) rewrittenMetadata.get(AuthenticationField.API_KEY_ROLE_DESCRIPTORS_KEY);
+        baseRoleAsMap = XContentHelper.convertToMap(baseRoleBytes, false, XContentType.JSON).v2();
+        assertThat(ObjectPath.eval("base_role.remote_cluster", baseRoleAsMap), nullValue());
+        assertThat(ObjectPath.eval("base_role.cluster", baseRoleAsMap), notNullValue());
+        limitedByRoleBytes = (BytesReference) rewrittenMetadata.get(AuthenticationField.API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY);
+        limitedByRoleAsMap = XContentHelper.convertToMap(limitedByRoleBytes, false, XContentType.JSON).v2();
+        assertThat(ObjectPath.eval("limited_by_role.remote_cluster", limitedByRoleAsMap), nullValue());
+        assertThat(ObjectPath.eval("limited_by_role.cluster", limitedByRoleAsMap), notNullValue());
+    }
+
     public void testMaybeRemoveRemoteIndicesFromRoleDescriptors() {
         final boolean includeClusterPrivileges = randomBoolean();
         final BytesReference roleWithoutRemoteIndices = new BytesArray(Strings.format("""

+ 28 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/RoleDescriptorTests.java

@@ -542,6 +542,34 @@ public class RoleDescriptorTests extends ESTestCase {
             () -> RoleDescriptor.parserBuilder().build().parse("test", new BytesArray(q4), XContentType.JSON)
         );
         assertThat(illegalArgumentException.getMessage(), containsString("remote cluster groups must not be null or empty"));
+
+        // one invalid privilege
+        String q5 = """
+            {
+              "remote_cluster": [
+                {
+                  "privileges": [
+                      "monitor_stats", "read_pipeline"
+                  ],
+                  "clusters": [
+                      "*"
+                  ]
+                }
+              ]
+            }""";
+
+        ElasticsearchParseException parseException = expectThrows(
+            ElasticsearchParseException.class,
+            () -> RoleDescriptor.parserBuilder().build().parse("test", new BytesArray(q5), XContentType.JSON)
+        );
+        assertThat(
+            parseException.getMessage(),
+            containsString(
+                "failed to parse remote_cluster for role [test]. "
+                    + "[monitor_enrich, monitor_stats] are the only values allowed for [privileges] within [remote_cluster]. "
+                    + "Found [monitor_stats, read_pipeline]"
+            )
+        );
     }
 
     public void testParsingFieldPermissionsUsesCache() throws IOException {

+ 13 - 2
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/permission/RemoteClusterPermissionGroupTests.java

@@ -16,6 +16,7 @@ import org.elasticsearch.xcontent.XContentParser;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Locale;
+import java.util.Map;
 
 import static org.hamcrest.Matchers.containsString;
 
@@ -90,7 +91,7 @@ public class RemoteClusterPermissionGroupTests extends AbstractXContentSerializi
         );
 
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class, invalidClusterAlias);
-        assertEquals("remote_cluster clusters aliases must contain valid non-empty, non-null values", e.getMessage());
+        assertThat(e.getMessage(), containsString("remote_cluster clusters aliases must contain valid non-empty, non-null values"));
 
         final ThrowingRunnable invalidPermission = randomFrom(
             () -> new RemoteClusterPermissionGroup(new String[] { null }, new String[] { "bar" }),
@@ -100,7 +101,17 @@ public class RemoteClusterPermissionGroupTests extends AbstractXContentSerializi
         );
 
         IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class, invalidPermission);
-        assertEquals("remote_cluster privileges must contain valid non-empty, non-null values", e2.getMessage());
+        assertThat(e2.getMessage(), containsString("remote_cluster privileges must contain valid non-empty, non-null values"));
+    }
+
+    public void testToMap() {
+        String[] privileges = generateRandomStringArray(5, 5, false, false);
+        String[] clusters = generateRandomStringArray(5, 5, false, false);
+        RemoteClusterPermissionGroup remoteClusterPermissionGroup = new RemoteClusterPermissionGroup(privileges, clusters);
+        assertEquals(
+            Map.of("privileges", Arrays.asList(privileges), "clusters", Arrays.asList(clusters)),
+            remoteClusterPermissionGroup.toMap()
+        );
     }
 
     @Override

+ 118 - 23
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/permission/RemoteClusterPermissionsTests.java

@@ -15,6 +15,8 @@ import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.test.AbstractXContentSerializingTestCase;
 import org.elasticsearch.test.TransportVersionUtils;
 import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
+import org.elasticsearch.xpack.core.security.xcontent.XContentUtils;
 import org.junit.Before;
 
 import java.io.IOException;
@@ -27,8 +29,11 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
+import static org.elasticsearch.TransportVersions.ROLE_MONITOR_STATS;
 import static org.elasticsearch.xpack.core.security.authz.permission.RemoteClusterPermissions.ROLE_REMOTE_CLUSTER_PRIVS;
+import static org.elasticsearch.xpack.core.security.authz.permission.RemoteClusterPermissions.lastTransportVersionPermission;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 
@@ -85,13 +90,13 @@ public class RemoteClusterPermissionsTests extends AbstractXContentSerializingTe
         for (int i = 0; i < generateRandomGroups(true).size(); i++) {
             String[] clusters = groupClusters.get(i);
             for (String cluster : clusters) {
-                assertTrue(remoteClusterPermission.hasPrivileges(cluster));
-                assertFalse(remoteClusterPermission.hasPrivileges(randomAlphaOfLength(20)));
+                assertTrue(remoteClusterPermission.hasAnyPrivileges(cluster));
+                assertFalse(remoteClusterPermission.hasAnyPrivileges(randomAlphaOfLength(20)));
             }
         }
     }
 
-    public void testPrivilegeNames() {
+    public void testCollapseAndRemoveUnsupportedPrivileges() {
         Map<TransportVersion, Set<String>> original = RemoteClusterPermissions.allowedRemoteClusterPermissions;
         try {
             // create random groups with random privileges for random clusters
@@ -108,7 +113,7 @@ public class RemoteClusterPermissionsTests extends AbstractXContentSerializingTe
                 String[] privileges = groupPrivileges.get(i);
                 String[] clusters = groupClusters.get(i);
                 for (String cluster : clusters) {
-                    String[] found = remoteClusterPermission.privilegeNames(cluster, TransportVersion.current());
+                    String[] found = remoteClusterPermission.collapseAndRemoveUnsupportedPrivileges(cluster, TransportVersion.current());
                     Arrays.sort(found);
                     // ensure all lowercase since the privilege names are case insensitive and the method will result in lowercase
                     for (int j = 0; j < privileges.length; j++) {
@@ -126,13 +131,14 @@ public class RemoteClusterPermissionsTests extends AbstractXContentSerializingTe
         // create random groups with random privileges for random clusters
         List<RemoteClusterPermissionGroup> randomGroups = generateRandomGroups(true);
         // replace a random value with one that is allowed
-        groupPrivileges.get(0)[0] = "monitor_enrich";
+        String singleValidPrivilege = randomFrom(RemoteClusterPermissions.allowedRemoteClusterPermissions.get(TransportVersion.current()));
+        groupPrivileges.get(0)[0] = singleValidPrivilege;
 
         for (int i = 0; i < randomGroups.size(); i++) {
             String[] privileges = groupPrivileges.get(i);
             String[] clusters = groupClusters.get(i);
             for (String cluster : clusters) {
-                String[] found = remoteClusterPermission.privilegeNames(cluster, TransportVersion.current());
+                String[] found = remoteClusterPermission.collapseAndRemoveUnsupportedPrivileges(cluster, TransportVersion.current());
                 Arrays.sort(found);
                 // ensure all lowercase since the privilege names are case insensitive and the method will result in lowercase
                 for (int j = 0; j < privileges.length; j++) {
@@ -149,7 +155,7 @@ public class RemoteClusterPermissionsTests extends AbstractXContentSerializingTe
                     assertFalse(Arrays.equals(privileges, found));
                     if (i == 0) {
                         // ensure that for the current version we only find the valid "monitor_enrich"
-                        assertThat(Set.of(found), equalTo(Set.of("monitor_enrich")));
+                        assertThat(Set.of(found), equalTo(Set.of(singleValidPrivilege)));
                     } else {
                         // all other groups should be found to not have any privileges
                         assertTrue(found.length == 0);
@@ -159,21 +165,26 @@ public class RemoteClusterPermissionsTests extends AbstractXContentSerializingTe
         }
     }
 
-    public void testMonitorEnrichPerVersion() {
-        // test monitor_enrich before, after and on monitor enrich version
-        String[] privileges = randomBoolean() ? new String[] { "monitor_enrich" } : new String[] { "monitor_enrich", "foo", "bar" };
+    public void testPermissionsPerVersion() {
+        testPermissionPerVersion("monitor_enrich", ROLE_REMOTE_CLUSTER_PRIVS);
+        testPermissionPerVersion("monitor_stats", ROLE_MONITOR_STATS);
+    }
+
+    private void testPermissionPerVersion(String permission, TransportVersion version) {
+        // test permission before, after and on the version
+        String[] privileges = randomBoolean() ? new String[] { permission } : new String[] { permission, "foo", "bar" };
         String[] before = new RemoteClusterPermissions().addGroup(new RemoteClusterPermissionGroup(privileges, new String[] { "*" }))
-            .privilegeNames("*", TransportVersionUtils.getPreviousVersion(ROLE_REMOTE_CLUSTER_PRIVS));
-        // empty set since monitor_enrich is not allowed in the before version
+            .collapseAndRemoveUnsupportedPrivileges("*", TransportVersionUtils.getPreviousVersion(version));
+        // empty set since permissions is not allowed in the before version
         assertThat(Set.of(before), equalTo(Collections.emptySet()));
         String[] on = new RemoteClusterPermissions().addGroup(new RemoteClusterPermissionGroup(privileges, new String[] { "*" }))
-            .privilegeNames("*", ROLE_REMOTE_CLUSTER_PRIVS);
-        // only monitor_enrich since the other values are not allowed
-        assertThat(Set.of(on), equalTo(Set.of("monitor_enrich")));
+            .collapseAndRemoveUnsupportedPrivileges("*", version);
+        // the permission is found on that provided version
+        assertThat(Set.of(on), equalTo(Set.of(permission)));
         String[] after = new RemoteClusterPermissions().addGroup(new RemoteClusterPermissionGroup(privileges, new String[] { "*" }))
-            .privilegeNames("*", TransportVersion.current());
-        // only monitor_enrich since the other values are not allowed
-        assertThat(Set.of(after), equalTo(Set.of("monitor_enrich")));
+            .collapseAndRemoveUnsupportedPrivileges("*", TransportVersion.current());
+        // current version (after the version) has the permission
+        assertThat(Set.of(after), equalTo(Set.of(permission)));
     }
 
     public void testValidate() {
@@ -181,12 +192,70 @@ public class RemoteClusterPermissionsTests extends AbstractXContentSerializingTe
         // random values not allowed
         IllegalArgumentException error = expectThrows(IllegalArgumentException.class, () -> remoteClusterPermission.validate());
         assertTrue(error.getMessage().contains("Invalid remote_cluster permissions found. Please remove the following:"));
-        assertTrue(error.getMessage().contains("Only [monitor_enrich] are allowed"));
+        assertTrue(error.getMessage().contains("Only [monitor_enrich, monitor_stats] are allowed"));
 
         new RemoteClusterPermissions().addGroup(new RemoteClusterPermissionGroup(new String[] { "monitor_enrich" }, new String[] { "*" }))
             .validate(); // no error
     }
 
+    public void testToMap() {
+        RemoteClusterPermissions remoteClusterPermissions = new RemoteClusterPermissions();
+        List<RemoteClusterPermissionGroup> groups = generateRandomGroups(randomBoolean());
+        for (int i = 0; i < groups.size(); i++) {
+            remoteClusterPermissions.addGroup(groups.get(i));
+        }
+        List<Map<String, List<String>>> asAsMap = remoteClusterPermissions.toMap();
+        RemoteClusterPermissions remoteClusterPermissionsAsMap = new RemoteClusterPermissions(asAsMap);
+        assertEquals(remoteClusterPermissions, remoteClusterPermissionsAsMap);
+    }
+
+    public void testRemoveUnsupportedPrivileges() {
+        RemoteClusterPermissions remoteClusterPermissions = new RemoteClusterPermissions();
+        RemoteClusterPermissionGroup group = new RemoteClusterPermissionGroup(new String[] { "monitor_enrich" }, new String[] { "*" });
+        remoteClusterPermissions.addGroup(group);
+        // this privilege is allowed by versions, so nothing should be removed
+        assertEquals(remoteClusterPermissions, remoteClusterPermissions.removeUnsupportedPrivileges(ROLE_REMOTE_CLUSTER_PRIVS));
+        assertEquals(remoteClusterPermissions, remoteClusterPermissions.removeUnsupportedPrivileges(ROLE_MONITOR_STATS));
+
+        remoteClusterPermissions = new RemoteClusterPermissions();
+        if (randomBoolean()) {
+            group = new RemoteClusterPermissionGroup(new String[] { "monitor_stats" }, new String[] { "*" });
+        } else {
+            // if somehow duplicates end up here, they should not influence removal
+            group = new RemoteClusterPermissionGroup(new String[] { "monitor_stats", "monitor_stats" }, new String[] { "*" });
+        }
+        remoteClusterPermissions.addGroup(group);
+        // this single newer privilege is not allowed in the older version, so it should result in an object with no groups
+        assertNotEquals(remoteClusterPermissions, remoteClusterPermissions.removeUnsupportedPrivileges(ROLE_REMOTE_CLUSTER_PRIVS));
+        assertFalse(remoteClusterPermissions.removeUnsupportedPrivileges(ROLE_REMOTE_CLUSTER_PRIVS).hasAnyPrivileges());
+        assertEquals(remoteClusterPermissions, remoteClusterPermissions.removeUnsupportedPrivileges(ROLE_MONITOR_STATS));
+
+        int groupCount = randomIntBetween(1, 5);
+        remoteClusterPermissions = new RemoteClusterPermissions();
+        group = new RemoteClusterPermissionGroup(new String[] { "monitor_enrich", "monitor_stats" }, new String[] { "*" });
+        for (int i = 0; i < groupCount; i++) {
+            remoteClusterPermissions.addGroup(group);
+        }
+        // one of the newer privilege is not allowed in the older version, so it should result in a group with only the allowed privilege
+        RemoteClusterPermissions expected = new RemoteClusterPermissions();
+        for (int i = 0; i < groupCount; i++) {
+            expected.addGroup(new RemoteClusterPermissionGroup(new String[] { "monitor_enrich" }, new String[] { "*" }));
+        }
+        assertEquals(expected, remoteClusterPermissions.removeUnsupportedPrivileges(ROLE_REMOTE_CLUSTER_PRIVS));
+        // both privileges allowed in the newer version, so it should not change the permission
+        assertEquals(remoteClusterPermissions, remoteClusterPermissions.removeUnsupportedPrivileges(ROLE_MONITOR_STATS));
+    }
+
+    public void testShortCircuitRemoveUnsupportedPrivileges() {
+        RemoteClusterPermissions remoteClusterPermissions = new RemoteClusterPermissions();
+        assertSame(remoteClusterPermissions, remoteClusterPermissions.removeUnsupportedPrivileges(TransportVersion.current()));
+        assertSame(remoteClusterPermissions, remoteClusterPermissions.removeUnsupportedPrivileges(lastTransportVersionPermission));
+        assertNotSame(
+            remoteClusterPermissions,
+            remoteClusterPermissions.removeUnsupportedPrivileges(TransportVersionUtils.getPreviousVersion(lastTransportVersionPermission))
+        );
+    }
+
     private List<RemoteClusterPermissionGroup> generateRandomGroups(boolean fuzzyCluster) {
         clean();
         List<RemoteClusterPermissionGroup> groups = new ArrayList<>();
@@ -216,22 +285,48 @@ public class RemoteClusterPermissionsTests extends AbstractXContentSerializingTe
 
     @Override
     protected RemoteClusterPermissions createTestInstance() {
+        Set<String> all = RemoteClusterPermissions.allowedRemoteClusterPermissions.values()
+            .stream()
+            .flatMap(Set::stream)
+            .collect(Collectors.toSet());
+        List<String> randomPermission = randomList(1, all.size(), () -> randomFrom(all));
         return new RemoteClusterPermissions().addGroup(
-            new RemoteClusterPermissionGroup(new String[] { "monitor_enrich" }, new String[] { "*" })
+            new RemoteClusterPermissionGroup(randomPermission.toArray(new String[0]), new String[] { "*" })
         );
     }
 
     @Override
     protected RemoteClusterPermissions mutateInstance(RemoteClusterPermissions instance) throws IOException {
         return new RemoteClusterPermissions().addGroup(
-            new RemoteClusterPermissionGroup(new String[] { "monitor_enrich" }, new String[] { "*" })
+            new RemoteClusterPermissionGroup(new String[] { "monitor_enrich", "monitor_stats" }, new String[] { "*" })
         ).addGroup(new RemoteClusterPermissionGroup(new String[] { "foobar" }, new String[] { "*" }));
     }
 
     @Override
     protected RemoteClusterPermissions doParseInstance(XContentParser parser) throws IOException {
-        // fromXContent/parsing isn't supported since we still do old school manual parsing of the role descriptor
-        return createTestInstance();
+        // fromXContent/object parsing isn't supported since we still do old school manual parsing of the role descriptor
+        // so this test is silly because it only tests we know how to manually parse the test instance in this test
+        // this is needed since we want the other parts from the AbstractXContentSerializingTestCase suite
+        RemoteClusterPermissions remoteClusterPermissions = new RemoteClusterPermissions();
+        String[] privileges = null;
+        String[] clusters = null;
+        XContentParser.Token token;
+        String currentFieldName = null;
+        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+            if (token == XContentParser.Token.START_OBJECT) {
+                continue;
+            }
+            if (token == XContentParser.Token.FIELD_NAME) {
+                currentFieldName = parser.currentName();
+            } else if (RoleDescriptor.Fields.PRIVILEGES.match(currentFieldName, parser.getDeprecationHandler())) {
+                privileges = XContentUtils.readStringArray(parser, false);
+
+            } else if (RoleDescriptor.Fields.CLUSTERS.match(currentFieldName, parser.getDeprecationHandler())) {
+                clusters = XContentUtils.readStringArray(parser, false);
+            }
+        }
+        remoteClusterPermissions.addGroup(new RemoteClusterPermissionGroup(privileges, clusters));
+        return remoteClusterPermissions;
     }
 
     @Override

+ 1 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java

@@ -2833,7 +2833,7 @@ public class ReservedRolesStoreTests extends ESTestCase {
             is(false)
         );
         assertThat(
-            superuserRole.remoteCluster().privilegeNames("*", TransportVersion.current()),
+            superuserRole.remoteCluster().collapseAndRemoveUnsupportedPrivileges("*", TransportVersion.current()),
             equalTo(RemoteClusterPermissions.getSupportedRemoteClusterPermissions().toArray(new String[0]))
         );
     }

+ 5 - 3
x-pack/plugin/security/qa/multi-cluster/build.gradle

@@ -31,13 +31,15 @@ dependencies {
 tasks.named("javaRestTest") {
   enabled = true
   // This is tested explicitly in bwc test tasks.
-  exclude '**/RemoteClusterSecurityBwcRestIT.class'
+  exclude '**/RemoteClusterSecurityBWCToRCS1ClusterRestIT.class'
+  exclude '**/RemoteClusterSecurityBWCToRCS2ClusterRestIT.class'
 }
 
-BuildParams.bwcVersions.withWireCompatible(v -> v.before(BuildParams.isSnapshotBuild() ? '8.8.0' : '8.9.1')) { bwcVersion, baseName ->
+BuildParams.bwcVersions.withWireCompatible(v -> v.onOrAfter('8.16.0')) { bwcVersion, baseName ->
   tasks.register(bwcTaskName(bwcVersion), StandaloneRestIntegTestTask) {
     usesBwcDistribution(bwcVersion)
     systemProperty("tests.old_cluster_version", bwcVersion)
-    include '**/RemoteClusterSecurityBwcRestIT.class'
+    include '**/RemoteClusterSecurityBWCToRCS1ClusterRestIT.class'
+    include '**/RemoteClusterSecurityBWCToRCS2ClusterRestIT.class'
   }
 }

+ 89 - 79
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityBwcRestIT.java → x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/AbstractRemoteClusterSecurityBWCRestIT.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.remotecluster;
 
+import org.apache.http.util.EntityUtils;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
@@ -15,14 +16,9 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchResponseUtils;
-import org.elasticsearch.test.cluster.ElasticsearchCluster;
-import org.elasticsearch.test.cluster.local.distribution.DistributionType;
-import org.elasticsearch.test.cluster.util.Version;
-import org.elasticsearch.test.cluster.util.resource.Resource;
 import org.elasticsearch.test.rest.ObjectPath;
-import org.junit.ClassRule;
-import org.junit.rules.RuleChain;
-import org.junit.rules.TestRule;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.json.JsonXContent;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -32,48 +28,21 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 
 /**
- * BWC test which ensures that users and API keys with defined {@code remote_indices} privileges can be used to query legacy remote clusters
+ * A set of BWC tests that can be executed with either RCS 1 or RCS 2 against an older fulfilling cluster.
  */
-public class RemoteClusterSecurityBwcRestIT extends AbstractRemoteClusterSecurityTestCase {
+public abstract class AbstractRemoteClusterSecurityBWCRestIT extends AbstractRemoteClusterSecurityTestCase {
 
-    private static final Version OLD_CLUSTER_VERSION = Version.fromString(System.getProperty("tests.old_cluster_version"));
+    protected abstract boolean isRCS2();
 
-    static {
-        fulfillingCluster = ElasticsearchCluster.local()
-            .version(OLD_CLUSTER_VERSION)
-            .distribution(DistributionType.DEFAULT)
-            .name("fulfilling-cluster")
-            .apply(commonClusterConfig)
-            .setting("xpack.ml.enabled", "false")
-            .build();
-
-        queryCluster = ElasticsearchCluster.local()
-            .version(Version.CURRENT)
-            .distribution(DistributionType.INTEG_TEST)
-            .name("query-cluster")
-            .apply(commonClusterConfig)
-            .setting("xpack.security.remote_cluster_client.ssl.enabled", "true")
-            .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt")
-            .rolesFile(Resource.fromClasspath("roles.yml"))
-            .build();
-    }
-
-    @ClassRule
-    // Use a RuleChain to ensure that fulfilling cluster is started before query cluster
-    public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster);
-
-    public void testBwcWithLegacyCrossClusterSearch() throws Exception {
-        final boolean useProxyMode = randomBoolean();
-        // Update remote cluster settings on QC.
-        setupQueryClusterRemoteClusters(useProxyMode);
-        // Ensure remote cluster is connected
-        ensureRemoteFulfillingClusterIsConnected(useProxyMode);
+    public void testBwcCCSViaRCS1orRCS2() throws Exception {
 
         // Fulfilling cluster
         {
@@ -122,19 +91,22 @@ public class RemoteClusterSecurityBwcRestIT extends AbstractRemoteClusterSecurit
                   ]
                 }""");
             assertOK(adminClient().performRequest(putRoleRequest));
-            // We need to define the same role on QC and FC in order for CCS to work.
-            final var putRoleRequestFulfilling = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE);
-            putRoleRequestFulfilling.setJsonEntity("""
-                {
-                  "cluster": ["manage_own_api_key"],
-                  "indices": [
+            if (isRCS2() == false) {
+                // We need to define the same role on QC and FC in order for CCS to work.
+                final var putRoleRequestFulfilling = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE);
+                putRoleRequestFulfilling.setJsonEntity("""
                     {
-                      "names": ["remote_index1"],
-                      "privileges": ["read", "read_cross_cluster"]
-                    }
-                  ]
-                }""");
-            assertOK(performRequestAgainstFulfillingCluster(putRoleRequestFulfilling));
+                      "cluster": ["manage_own_api_key"],
+                      "indices": [
+                        {
+                          "names": ["remote_index1"],
+                          "privileges": ["read", "read_cross_cluster"]
+                        }
+                      ]
+                    }""");
+                assertOK(performRequestAgainstFulfillingCluster(putRoleRequestFulfilling));
+            }
+
             final var putUserRequest = new Request("PUT", "/_security/user/" + REMOTE_SEARCH_USER);
             putUserRequest.setJsonEntity("""
                 {
@@ -166,7 +138,7 @@ public class RemoteClusterSecurityBwcRestIT extends AbstractRemoteClusterSecurit
                       ],
                       "remote_cluster": [
                         {
-                          "privileges": ["monitor_enrich"],
+                          "privileges": ["monitor_enrich", "monitor_stats"],
                           "clusters": ["*"]
                         }
                       ]
@@ -187,38 +159,35 @@ public class RemoteClusterSecurityBwcRestIT extends AbstractRemoteClusterSecurit
 
             // Check that we can search the fulfilling cluster from the querying cluster
             final boolean alsoSearchLocally = randomBoolean();
+            final String remoteClusterName = randomFrom("my_remote_cluster", "*", "my_remote_*");
+            final String remoteIndexName = randomFrom("remote_index1", "*");
             final var searchRequest = new Request(
                 "GET",
                 String.format(
                     Locale.ROOT,
                     "/%s%s:%s/_search?ccs_minimize_roundtrips=%s",
                     alsoSearchLocally ? "local_index," : "",
-                    randomFrom("my_remote_cluster", "*", "my_remote_*"),
-                    randomFrom("remote_index1", "*"),
+                    remoteClusterName,
+                    remoteIndexName,
                     randomBoolean()
                 )
             );
-            final String sendRequestWith = randomFrom("user", "apikey");
-            final Response response = sendRequestWith.equals("user")
-                ? performRequestWithRemoteAccessUser(searchRequest)
-                : performRequestWithApiKey(searchRequest, apiKeyEncoded);
+            String esqlCommand = String.format(Locale.ROOT, "FROM %s,%s:%s | LIMIT 10", "local_index", remoteClusterName, remoteIndexName);
+            // send request with user
+            Response response = performRequestWithRemoteAccessUser(searchRequest);
             assertOK(response);
-            final SearchResponse searchResponse;
             try (var parser = responseAsParser(response)) {
-                searchResponse = SearchResponseUtils.parseSearchResponse(parser);
+                assertSearchResponse(SearchResponseUtils.parseSearchResponse(parser), alsoSearchLocally);
             }
-            try {
-                final List<String> actualIndices = Arrays.stream(searchResponse.getHits().getHits())
-                    .map(SearchHit::getIndex)
-                    .collect(Collectors.toList());
-                if (alsoSearchLocally) {
-                    assertThat(actualIndices, containsInAnyOrder("remote_index1", "local_index"));
-                } else {
-                    assertThat(actualIndices, containsInAnyOrder("remote_index1"));
-                }
-            } finally {
-                searchResponse.decRef();
+            assertEsqlResponse(performRequestWithRemoteAccessUser(esqlRequest(esqlCommand)));
+
+            // send request with apikey
+            response = performRequestWithApiKey(searchRequest, apiKeyEncoded);
+            assertOK(response);
+            try (var parser = responseAsParser(response)) {
+                assertSearchResponse(SearchResponseUtils.parseSearchResponse(parser), alsoSearchLocally);
             }
+            assertEsqlResponse(performRequestWithApiKey(esqlRequest(esqlCommand), apiKeyEncoded));
         }
     }
 
@@ -231,6 +200,14 @@ public class RemoteClusterSecurityBwcRestIT extends AbstractRemoteClusterSecurit
             final Map<String, Object> remoteInfoMap = responseAsMap(remoteInfoResponse);
             assertThat(remoteInfoMap, hasKey("my_remote_cluster"));
             assertThat(org.elasticsearch.xcontent.ObjectPath.eval("my_remote_cluster.connected", remoteInfoMap), is(true));
+            if (isRCS2()) {
+                assertThat(
+                    org.elasticsearch.xcontent.ObjectPath.eval("my_remote_cluster.cluster_credentials", remoteInfoMap),
+                    is("::es_redacted::") // RCS 2.0
+                );
+            } else {
+                assertThat(org.elasticsearch.xcontent.ObjectPath.eval("my_remote_cluster.cluster_credentials", remoteInfoMap), nullValue());
+            }
             if (false == useProxyMode) {
                 assertThat(
                     org.elasticsearch.xcontent.ObjectPath.eval("my_remote_cluster.num_nodes_connected", remoteInfoMap),
@@ -240,7 +217,17 @@ public class RemoteClusterSecurityBwcRestIT extends AbstractRemoteClusterSecurit
         });
     }
 
-    private void setupQueryClusterRemoteClusters(boolean useProxyMode) throws IOException {
+    private Response performRequestWithRemoteAccessUser(final Request request) throws IOException {
+        request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", basicAuthHeaderValue(REMOTE_SEARCH_USER, PASS)));
+        return client().performRequest(request);
+    }
+
+    private Response performRequestWithApiKey(final Request request, final String encoded) throws IOException {
+        request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", "ApiKey " + encoded));
+        return client().performRequest(request);
+    }
+
+    private void setupQueryClusterRCS1(boolean useProxyMode) throws IOException {
         final Settings.Builder builder = Settings.builder();
         if (useProxyMode) {
             builder.put("cluster.remote.my_remote_cluster.mode", "proxy")
@@ -252,14 +239,37 @@ public class RemoteClusterSecurityBwcRestIT extends AbstractRemoteClusterSecurit
         updateClusterSettings(builder.build());
     }
 
-    private Response performRequestWithRemoteAccessUser(final Request request) throws IOException {
-        request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", basicAuthHeaderValue(REMOTE_SEARCH_USER, PASS)));
-        return client().performRequest(request);
+    private Request esqlRequest(String command) throws IOException {
+        XContentBuilder body = JsonXContent.contentBuilder();
+        body.startObject();
+        body.field("query", command);
+        body.field("include_ccs_metadata", true);
+        body.endObject();
+        Request request = new Request("POST", "_query");
+        request.setJsonEntity(org.elasticsearch.common.Strings.toString(body));
+        return request;
     }
 
-    private Response performRequestWithApiKey(final Request request, final String encoded) throws IOException {
-        request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", "ApiKey " + encoded));
-        return client().performRequest(request);
+    private void assertSearchResponse(SearchResponse searchResponse, boolean alsoSearchLocally) {
+        try {
+            final List<String> actualIndices = Arrays.stream(searchResponse.getHits().getHits())
+                .map(SearchHit::getIndex)
+                .collect(Collectors.toList());
+            if (alsoSearchLocally) {
+                assertThat(actualIndices, containsInAnyOrder("remote_index1", "local_index"));
+            } else {
+                assertThat(actualIndices, containsInAnyOrder("remote_index1"));
+            }
+        } finally {
+            searchResponse.decRef();
+        }
     }
 
+    private void assertEsqlResponse(Response response) throws IOException {
+        assertOK(response);
+        String responseAsString = EntityUtils.toString(response.getEntity());
+        assertThat(responseAsString, containsString("\"my_remote_cluster\":{\"status\":\"successful\""));
+        assertThat(responseAsString, containsString("local_bar"));
+        assertThat(responseAsString, containsString("bar"));
+    }
 }

+ 69 - 0
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityBWCToRCS1ClusterRestIT.java

@@ -0,0 +1,69 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.remotecluster;
+
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.cluster.local.distribution.DistributionType;
+import org.elasticsearch.test.cluster.util.Version;
+import org.elasticsearch.test.cluster.util.resource.Resource;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TestRule;
+
+/**
+ * BWC test which ensures that users and API keys with defined {@code remote_indices}/{@code remote_cluster} privileges can be used
+ * to query legacy remote clusters when using RCS 1.0. We send the request the to an older fulfilling cluster using RCS 1.0 with a user/role
+ * and API key where the {@code remote_indices}/{@code remote_cluster} are defined in the newer query cluster.
+ * All RCS 2.0 config should be effectively ignored when using RCS 1 for CCS. We send to an elder fulfil cluster to help ensure that
+ * newly introduced RCS 2.0 artifacts are forward compatible from the perspective of the old cluster. For example, a new privilege
+ * sent to an old cluster should be ignored.
+ */
+public class RemoteClusterSecurityBWCToRCS1ClusterRestIT extends AbstractRemoteClusterSecurityBWCRestIT {
+
+    private static final Version OLD_CLUSTER_VERSION = Version.fromString(System.getProperty("tests.old_cluster_version"));
+
+    static {
+        fulfillingCluster = ElasticsearchCluster.local()
+            .version(OLD_CLUSTER_VERSION)
+            .distribution(DistributionType.DEFAULT)
+            .name("fulfilling-cluster")
+            .apply(commonClusterConfig)
+            .setting("xpack.ml.enabled", "false")
+            // .setting("logger.org.elasticsearch.xpack.core", "trace") //useful for human debugging
+            // .setting("logger.org.elasticsearch.xpack.security", "trace") //useful for human debugging
+            .build();
+
+        queryCluster = ElasticsearchCluster.local()
+            .version(Version.CURRENT)
+            .distribution(DistributionType.DEFAULT)
+            .setting("xpack.ml.enabled", "false")
+            .name("query-cluster")
+            .apply(commonClusterConfig)
+            .setting("xpack.security.remote_cluster_client.ssl.enabled", "true")
+            .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt")
+            .rolesFile(Resource.fromClasspath("roles.yml"))
+            .build();
+    }
+
+    @ClassRule
+    // Use a RuleChain to ensure that fulfilling cluster is started before query cluster
+    public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster);
+
+    @Override
+    protected boolean isRCS2() {
+        return false;
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        configureRemoteCluster(REMOTE_CLUSTER_ALIAS, fulfillingCluster, true, randomBoolean(), false);
+        super.setUp();
+    }
+}

+ 90 - 0
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityBWCToRCS2ClusterRestIT.java

@@ -0,0 +1,90 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.remotecluster;
+
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.cluster.local.distribution.DistributionType;
+import org.elasticsearch.test.cluster.util.Version;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TestRule;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * BWC test which ensures that users and API keys with defined {@code remote_indices}/{@code remote_cluster} privileges can be used
+ * to query older remote clusters when using RCS 2.0. We send the request the to an older fulfilling cluster using RCS 2.0 with a user/role
+ * and API key where the {@code remote_indices}/{@code remote_cluster} are defined in the newer query cluster.
+ * All new RCS 2.0 config should be effectively ignored when sending to older RCS 2.0. For example, a new privilege
+ * sent to an old cluster should be ignored.
+ */
+public class RemoteClusterSecurityBWCToRCS2ClusterRestIT extends AbstractRemoteClusterSecurityBWCRestIT {
+
+    private static final Version OLD_CLUSTER_VERSION = Version.fromString(System.getProperty("tests.old_cluster_version"));
+    private static final AtomicReference<Map<String, Object>> API_KEY_MAP_REF = new AtomicReference<>();
+
+    static {
+
+        fulfillingCluster = ElasticsearchCluster.local()
+            .name("fulfilling-cluster")
+            .version(OLD_CLUSTER_VERSION)
+            .distribution(DistributionType.DEFAULT)
+            .apply(commonClusterConfig)
+            .setting("xpack.ml.enabled", "false")
+            .setting("remote_cluster_server.enabled", "true")
+            .setting("remote_cluster.port", "0")
+            .setting("xpack.security.remote_cluster_server.ssl.enabled", "true")
+            .setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key")
+            .setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt")
+            .keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password")
+            // .setting("logger.org.elasticsearch.xpack.core", "trace") //useful for human debugging
+            // .setting("logger.org.elasticsearch.xpack.security", "trace") //useful for human debugging
+            .build();
+
+        queryCluster = ElasticsearchCluster.local()
+            .name("query-cluster")
+            .distribution(DistributionType.DEFAULT)
+            .setting("xpack.ml.enabled", "false")
+            .apply(commonClusterConfig)
+            .setting("xpack.security.remote_cluster_client.ssl.enabled", "true")
+            .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt")
+            .keystore("cluster.remote.my_remote_cluster.credentials", () -> {
+                if (API_KEY_MAP_REF.get() == null) {
+                    final Map<String, Object> apiKeyMap = createCrossClusterAccessApiKey("""
+                        {
+                            "search": [
+                              {
+                                "names": ["*"]
+                              }
+                            ]
+                        }""");
+                    API_KEY_MAP_REF.set(apiKeyMap);
+                }
+                return (String) API_KEY_MAP_REF.get().get("encoded");
+            })
+            .build();
+    }
+
+    @ClassRule
+    // Use a RuleChain to ensure that fulfilling cluster is started before query cluster
+    public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster);
+
+    @Override
+    protected boolean isRCS2() {
+        return true;
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        configureRemoteCluster(REMOTE_CLUSTER_ALIAS, fulfillingCluster, false, randomBoolean(), false);
+        super.setUp();
+    }
+}

+ 266 - 0
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityRestStatsIT.java

@@ -0,0 +1,266 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.remotecluster;
+
+import org.apache.http.util.EntityUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.core.Strings;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchResponseUtils;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.cluster.local.distribution.DistributionType;
+import org.elasticsearch.test.cluster.util.resource.Resource;
+import org.elasticsearch.test.junit.RunnableTestRuleAdapter;
+import org.elasticsearch.test.rest.ObjectPath;
+import org.elasticsearch.xcontent.XContentType;
+import org.junit.ClassRule;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TestRule;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+
+public class RemoteClusterSecurityRestStatsIT extends AbstractRemoteClusterSecurityTestCase {
+
+    private static final AtomicReference<Map<String, Object>> API_KEY_MAP_REF = new AtomicReference<>();
+    private static final AtomicReference<Map<String, Object>> REST_API_KEY_MAP_REF = new AtomicReference<>();
+    private static final AtomicBoolean SSL_ENABLED_REF = new AtomicBoolean();
+    private static final AtomicBoolean NODE1_RCS_SERVER_ENABLED = new AtomicBoolean();
+    private static final AtomicBoolean NODE2_RCS_SERVER_ENABLED = new AtomicBoolean();
+    private static final int FULFILL_NODE_COUNT = 3;
+    private static final Logger logger = LogManager.getLogger(RemoteClusterSecurityRestStatsIT.class);
+
+    static {
+        fulfillingCluster = ElasticsearchCluster.local()
+            .distribution(DistributionType.DEFAULT)
+            .name("fulfilling-cluster")
+            .nodes(FULFILL_NODE_COUNT)
+            .apply(commonClusterConfig)
+            .setting("remote_cluster.port", "0")
+            .setting("xpack.security.remote_cluster_server.ssl.enabled", () -> String.valueOf(SSL_ENABLED_REF.get()))
+            .setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key")
+            .setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt")
+            .setting("xpack.security.authc.token.enabled", "true")
+            .keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password")
+            .node(0, spec -> spec.setting("remote_cluster_server.enabled", "true"))
+            .node(1, spec -> spec.setting("remote_cluster_server.enabled", () -> String.valueOf(NODE1_RCS_SERVER_ENABLED.get())))
+            .node(2, spec -> spec.setting("remote_cluster_server.enabled", () -> String.valueOf(NODE2_RCS_SERVER_ENABLED.get())))
+            .build();
+
+        queryCluster = ElasticsearchCluster.local()
+            .distribution(DistributionType.DEFAULT)
+            .name("query-cluster")
+            .apply(commonClusterConfig)
+            .setting("xpack.security.remote_cluster_client.ssl.enabled", () -> String.valueOf(SSL_ENABLED_REF.get()))
+            .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt")
+            .setting("xpack.security.authc.token.enabled", "true")
+            .keystore("cluster.remote.my_remote_cluster.credentials", () -> {
+                if (API_KEY_MAP_REF.get() == null) {
+                    final Map<String, Object> apiKeyMap = createCrossClusterAccessApiKey("""
+                        {
+                          "search": [
+                            {
+                                "names": ["*"]
+                            }
+                          ]
+                        }""");
+                    API_KEY_MAP_REF.set(apiKeyMap);
+                }
+                return (String) API_KEY_MAP_REF.get().get("encoded");
+            })
+            // Define a bogus API key for another remote cluster
+            .keystore("cluster.remote.invalid_remote.credentials", randomEncodedApiKey())
+            // Define remote with a REST API key to observe expected failure
+            .keystore("cluster.remote.wrong_api_key_type.credentials", () -> {
+                if (REST_API_KEY_MAP_REF.get() == null) {
+                    initFulfillingClusterClient();
+                    final var createApiKeyRequest = new Request("POST", "/_security/api_key");
+                    createApiKeyRequest.setJsonEntity("""
+                        {
+                          "name": "rest_api_key"
+                        }""");
+                    try {
+                        final Response createApiKeyResponse = performRequestWithAdminUser(fulfillingClusterClient, createApiKeyRequest);
+                        assertOK(createApiKeyResponse);
+                        REST_API_KEY_MAP_REF.set(responseAsMap(createApiKeyResponse));
+                    } catch (IOException e) {
+                        throw new UncheckedIOException(e);
+                    }
+                }
+                return (String) REST_API_KEY_MAP_REF.get().get("encoded");
+            })
+            .rolesFile(Resource.fromClasspath("roles.yml"))
+            .user(REMOTE_METRIC_USER, PASS.toString(), "read_remote_shared_metrics", false)
+            .build();
+    }
+
+    @ClassRule
+    // Use a RuleChain to ensure that fulfilling cluster is started before query cluster
+    // `SSL_ENABLED_REF` is used to control the SSL-enabled setting on the test clusters
+    // We set it here, since randomization methods are not available in the static initialize context above
+    public static TestRule clusterRule = RuleChain.outerRule(new RunnableTestRuleAdapter(() -> {
+        SSL_ENABLED_REF.set(usually());
+        NODE1_RCS_SERVER_ENABLED.set(randomBoolean());
+        NODE2_RCS_SERVER_ENABLED.set(randomBoolean());
+    })).around(fulfillingCluster).around(queryCluster);
+
+    public void testCrossClusterStats() throws Exception {
+        configureRemoteCluster();
+        setupRoleAndUserQueryCluster();
+        addDocToIndexFulfillingCluster("index1");
+
+        // search #1
+        searchFulfillingClusterFromQueryCluster("index1");
+        Map<String, Object> statsResponseAsMap = getFulfillingClusterStatsFromQueryCluster();
+        assertThat(ObjectPath.evaluate(statsResponseAsMap, "ccs.clusters.my_remote_cluster.nodes_count"), equalTo(FULFILL_NODE_COUNT));
+        assertThat(ObjectPath.evaluate(statsResponseAsMap, "ccs._search.clusters.my_remote_cluster.total"), equalTo(1));
+        int initialIndexCount = ObjectPath.evaluate(statsResponseAsMap, "ccs.clusters.my_remote_cluster.indices_count");
+
+        // search #2
+        searchFulfillingClusterFromQueryCluster("index1");
+        statsResponseAsMap = getFulfillingClusterStatsFromQueryCluster();
+        assertThat(ObjectPath.evaluate(statsResponseAsMap, "ccs._search.total"), equalTo(2));
+        assertThat(ObjectPath.evaluate(statsResponseAsMap, "ccs._search.clusters.my_remote_cluster.total"), equalTo(2));
+
+        // search #3
+        expectThrows(Exception.class, () -> searchFulfillingClusterFromQueryCluster("junk"));
+        statsResponseAsMap = getFulfillingClusterStatsFromQueryCluster();
+        assertThat(ObjectPath.evaluate(statsResponseAsMap, "ccs._search.total"), equalTo(3));
+        assertThat(ObjectPath.evaluate(statsResponseAsMap, "ccs._search.clusters.my_remote_cluster.total"), equalTo(2));
+
+        // search #4
+        addDocToIndexFulfillingCluster("index2");
+        searchFulfillingClusterFromQueryCluster("index2");
+        statsResponseAsMap = getFulfillingClusterStatsFromQueryCluster();
+        assertThat(ObjectPath.evaluate(statsResponseAsMap, "ccs._search.total"), equalTo(4));
+        assertThat(ObjectPath.evaluate(statsResponseAsMap, "ccs._search.clusters.my_remote_cluster.total"), equalTo(3));
+        int updatedIndexCount = ObjectPath.evaluate(statsResponseAsMap, "ccs.clusters.my_remote_cluster.indices_count");
+        assertThat(updatedIndexCount, equalTo(initialIndexCount + 1));
+    }
+
+    private Map<String, Object> getFulfillingClusterStatsFromQueryCluster() throws IOException {
+        return getFulfillingClusterStatsFromQueryCluster(false);
+    }
+
+    private Map<String, Object> getFulfillingClusterStatsFromQueryCluster(boolean humanDebug) throws IOException {
+        Request stats = new Request("GET", "_cluster/stats?include_remotes=true&filter_path=ccs");
+        Response statsResponse = performRequestWithRemoteSearchUser(stats);
+        if (humanDebug) {
+            debugResponse(statsResponse);
+        }
+        return entityAsMap(statsResponse.getEntity());
+    }
+
+    private void searchFulfillingClusterFromQueryCluster(String index, boolean humanDebug) throws IOException {
+        final var searchRequest = new Request(
+            "GET",
+            String.format(
+                Locale.ROOT,
+                "/%s:%s/_search?ccs_minimize_roundtrips=%s",
+                randomFrom("my_remote_cluster", "*", "my_remote_*"),
+                index,
+                randomBoolean()
+            )
+        );
+        Response response = performRequestWithRemoteSearchUser(searchRequest);
+        if (humanDebug) {
+            debugResponse(response);
+        }
+        assertOK(response);
+        final SearchResponse searchResponse = SearchResponseUtils.parseSearchResponse(responseAsParser(response));
+        try {
+            final List<String> actualIndices = Arrays.stream(searchResponse.getHits().getHits())
+                .map(SearchHit::getIndex)
+                .collect(Collectors.toList());
+            assertThat(actualIndices, containsInAnyOrder(index));
+
+        } finally {
+            searchResponse.decRef();
+        }
+    }
+
+    private void searchFulfillingClusterFromQueryCluster(String index) throws IOException {
+        searchFulfillingClusterFromQueryCluster(index, false);
+    }
+
+    private void addDocToIndexFulfillingCluster(String index) throws IOException {
+        // Index some documents, so we can attempt to search them from the querying cluster
+        final Request bulkRequest = new Request("POST", "/_bulk?refresh=true");
+        bulkRequest.setJsonEntity(Strings.format("""
+            { "index": { "_index": "%s" } }
+            { "foo": "bar" }
+            """, index));
+        assertOK(performRequestAgainstFulfillingCluster(bulkRequest));
+    }
+
+    private void setupRoleAndUserQueryCluster() throws IOException {
+        final var putRoleRequest = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE);
+        putRoleRequest.setJsonEntity("""
+            {
+              "description": "Role with privileges for remote indices and stats.",
+              "cluster": ["monitor_stats"],
+              "remote_indices": [
+                {
+                  "names": ["*"],
+                  "privileges": ["read", "read_cross_cluster"],
+                  "clusters": ["*"]
+                }
+              ],
+              "remote_cluster": [
+               {
+                  "privileges": ["monitor_stats"],
+                   "clusters": ["*"]
+               }
+              ]
+            }""");
+        assertOK(adminClient().performRequest(putRoleRequest));
+        final var putUserRequest = new Request("PUT", "/_security/user/" + REMOTE_SEARCH_USER);
+        putUserRequest.setJsonEntity("""
+            {
+              "password": "x-pack-test-password",
+              "roles" : ["remote_search"]
+            }""");
+        assertOK(adminClient().performRequest(putUserRequest));
+    }
+
+    private Response performRequestWithRemoteSearchUser(final Request request) throws IOException {
+        request.setOptions(
+            RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", headerFromRandomAuthMethod(REMOTE_SEARCH_USER, PASS))
+        );
+        return client().performRequest(request);
+    }
+
+    // helper method for humans see the responses for debug purposes, when used will always fail the test
+    private void debugResponse(Response response) throws IOException {
+        String jsonString = XContentHelper.convertToJson(
+            new BytesArray(EntityUtils.toString(response.getEntity())),
+            true,
+            true,
+            XContentType.JSON
+        );
+        logger.error(jsonString);
+        assertFalse(true); // boom
+    }
+}

+ 2 - 2
x-pack/plugin/security/qa/security-trial/src/javaRestTest/java/org/elasticsearch/xpack/security/apikey/ApiKeyRestIT.java

@@ -828,7 +828,7 @@ public class ApiKeyRestIT extends SecurityOnTrialLicenseRestTestCase {
         assertOK(response);
         assertAPIKeyWithRemoteClusterPermissions(apiKeyId, includeRemoteCluster, false, null, new String[] { "foo", "bar" });
 
-        // create API key as the remote user which does remote_cluster limited_by permissions
+        // create API key as the remote user which has all remote_cluster permissions via limited_by
         response = sendRequestAsRemoteUser(createApiKeyRequest);
         apiKeyId = ObjectPath.createFromResponse(response).evaluate("id");
         assertThat(apiKeyId, notNullValue());
@@ -922,7 +922,7 @@ public class ApiKeyRestIT extends SecurityOnTrialLicenseRestTestCase {
             assertNotNull(limitedByRole);
 
             List<Map<String, List<String>>> remoteCluster = (List<Map<String, List<String>>>) limitedByRole.get("remote_cluster");
-            assertThat(remoteCluster.get(0).get("privileges"), containsInAnyOrder("monitor_enrich"));
+            assertThat(remoteCluster.get(0).get("privileges"), containsInAnyOrder("monitor_stats", "monitor_enrich"));
             assertThat(remoteCluster.get(0).get("clusters"), containsInAnyOrder("remote"));
         } else {
             // no limited by permissions

+ 1 - 1
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStore.java

@@ -512,7 +512,7 @@ public class CompositeRolesStore {
             );
         });
 
-        if (remoteClusterPermissions.hasPrivileges()) {
+        if (remoteClusterPermissions.hasAnyPrivileges()) {
             builder.addRemoteClusterPermissions(remoteClusterPermissions);
         } else {
             builder.addRemoteClusterPermissions(RemoteClusterPermissions.NONE);

+ 1 - 1
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/RoleDescriptorStore.java

@@ -150,7 +150,7 @@ public class RoleDescriptorStore implements RoleReferenceResolver {
                     + "but other privileges found for subject ["
                     + crossClusterAccessRoleReference.getUserPrincipal()
                     + "]";
-                logger.debug("{}. Invalid role descriptor: [{}]", message, roleDescriptor);
+                logger.warn("{}. Invalid role descriptor: [{}]", message, roleDescriptor);
                 listener.onFailure(new IllegalArgumentException(message));
                 return;
             }

+ 29 - 21
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java

@@ -92,6 +92,7 @@ import org.elasticsearch.xpack.core.security.authz.permission.SimpleRole;
 import org.elasticsearch.xpack.core.security.authz.privilege.ApplicationPrivilege;
 import org.elasticsearch.xpack.core.security.authz.privilege.ApplicationPrivilegeDescriptor;
 import org.elasticsearch.xpack.core.security.authz.privilege.ApplicationPrivilegeTests;
+import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver;
 import org.elasticsearch.xpack.core.security.authz.privilege.ConfigurableClusterPrivileges.ManageApplicationPrivileges;
 import org.elasticsearch.xpack.core.security.authz.privilege.IndexPrivilege;
 import org.elasticsearch.xpack.core.security.authz.privilege.Privilege;
@@ -1312,10 +1313,7 @@ public class RBACEngineTests extends ESTestCase {
             )
             .addRemoteClusterPermissions(
                 new RemoteClusterPermissions().addGroup(
-                    new RemoteClusterPermissionGroup(
-                        RemoteClusterPermissions.getSupportedRemoteClusterPermissions().toArray(new String[0]),
-                        new String[] { "remote-1" }
-                    )
+                    new RemoteClusterPermissionGroup(new String[] { "monitor_enrich" }, new String[] { "remote-1" })
                 )
                     .addGroup(
                         new RemoteClusterPermissionGroup(
@@ -1383,26 +1381,33 @@ public class RBACEngineTests extends ESTestCase {
 
         RemoteClusterPermissions remoteClusterPermissions = response.getRemoteClusterPermissions();
         String[] allRemoteClusterPermissions = RemoteClusterPermissions.getSupportedRemoteClusterPermissions().toArray(new String[0]);
-        assert allRemoteClusterPermissions.length == 1
-            : "if more remote cluster permissions are added this test needs to be updated to ensure the correct remotes receive the "
-                + "correct permissions. ";
-        // 2 groups with 3 aliases
+
         assertThat(response.getRemoteClusterPermissions().groups(), iterableWithSize(2));
-        assertEquals(
-            3,
-            response.getRemoteClusterPermissions()
-                .groups()
-                .stream()
-                .map(RemoteClusterPermissionGroup::remoteClusterAliases)
-                .flatMap(Arrays::stream)
-                .distinct()
-                .count()
+        // remote-1 has monitor_enrich permission
+        // remote-2 and remote-3 have all permissions
+        assertThat(
+            response.getRemoteClusterPermissions().groups(),
+            containsInAnyOrder(
+                new RemoteClusterPermissionGroup(new String[] { "monitor_enrich" }, new String[] { "remote-1" }),
+                new RemoteClusterPermissionGroup(allRemoteClusterPermissions, new String[] { "remote-2", "remote-3" })
+            )
+        );
+
+        // ensure that all permissions are valid for the current transport version
+        assertThat(
+            Arrays.asList(remoteClusterPermissions.collapseAndRemoveUnsupportedPrivileges("remote-1", TransportVersion.current())),
+            hasItem("monitor_enrich")
         );
 
         for (String permission : RemoteClusterPermissions.getSupportedRemoteClusterPermissions()) {
-            assertThat(Arrays.asList(remoteClusterPermissions.privilegeNames("remote-1", TransportVersion.current())), hasItem(permission));
-            assertThat(Arrays.asList(remoteClusterPermissions.privilegeNames("remote-2", TransportVersion.current())), hasItem(permission));
-            assertThat(Arrays.asList(remoteClusterPermissions.privilegeNames("remote-3", TransportVersion.current())), hasItem(permission));
+            assertThat(
+                Arrays.asList(remoteClusterPermissions.collapseAndRemoveUnsupportedPrivileges("remote-2", TransportVersion.current())),
+                hasItem(permission)
+            );
+            assertThat(
+                Arrays.asList(remoteClusterPermissions.collapseAndRemoveUnsupportedPrivileges("remote-3", TransportVersion.current())),
+                hasItem(permission)
+            );
         }
     }
 
@@ -1782,7 +1787,10 @@ public class RBACEngineTests extends ESTestCase {
                     new RoleDescriptorsIntersection(
                         new RoleDescriptor(
                             Role.REMOTE_USER_ROLE_NAME,
-                            null,
+                            RemoteClusterPermissions.getSupportedRemoteClusterPermissions()
+                                .stream()
+                                .filter(s -> s.equals(ClusterPrivilegeResolver.MONITOR_STATS.name()))
+                                .toArray(String[]::new),
                             new IndicesPrivileges[] {
                                 IndicesPrivileges.builder().indices(".monitoring-*").privileges("read", "read_cross_cluster").build(),
                                 IndicesPrivileges.builder().indices("apm-*").privileges("read", "read_cross_cluster").build(),

+ 5 - 5
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java

@@ -1077,7 +1077,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
         assertHasRemoteIndexGroupsForClusters(forRemote, Set.of("*"), indexGroup("remote-idx-2-*"));
         assertValidRemoteClusterPermissions(role.remoteCluster(), new String[] { "remote-*" });
         assertThat(
-            role.remoteCluster().privilegeNames("remote-foobar", TransportVersion.current()),
+            role.remoteCluster().collapseAndRemoveUnsupportedPrivileges("remote-foobar", TransportVersion.current()),
             equalTo(RemoteClusterPermissions.getSupportedRemoteClusterPermissions().toArray(new String[0]))
         );
     }
@@ -3196,12 +3196,12 @@ public class CompositeRolesStoreTests extends ESTestCase {
     }
 
     private void assertValidRemoteClusterPermissionsParent(RemoteClusterPermissions permissions, String[] aliases) {
-        assertTrue(permissions.hasPrivileges());
+        assertTrue(permissions.hasAnyPrivileges());
         for (String alias : aliases) {
-            assertTrue(permissions.hasPrivileges(alias));
-            assertFalse(permissions.hasPrivileges(randomValueOtherThan(alias, () -> randomAlphaOfLength(5))));
+            assertTrue(permissions.hasAnyPrivileges(alias));
+            assertFalse(permissions.hasAnyPrivileges(randomValueOtherThan(alias, () -> randomAlphaOfLength(5))));
             assertThat(
-                permissions.privilegeNames(alias, TransportVersion.current()),
+                permissions.collapseAndRemoveUnsupportedPrivileges(alias, TransportVersion.current()),
                 arrayContaining(RemoteClusterPermissions.getSupportedRemoteClusterPermissions().toArray(new String[0]))
             );
         }

+ 2 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/FileRolesStoreTests.java

@@ -388,7 +388,8 @@ public class FileRolesStoreTests extends ESTestCase {
             events.get(4),
             startsWith(
                 "failed to parse remote_cluster for role [invalid_role_bad_priv_remote_clusters]. "
-                    + "[monitor_enrich] is the only value allowed for [privileges] within [remote_cluster]. skipping role..."
+                    + "[monitor_enrich, monitor_stats] are the only values allowed for [privileges] within [remote_cluster]. "
+                    + "Found [junk]. skipping role..."
             )
         );
     }

+ 2 - 2
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/rest/action/user/RestGetUserPrivilegesActionTests.java

@@ -213,7 +213,7 @@ public class RestGetUserPrivilegesActionTests extends ESTestCase {
             ,"remote_cluster":[
                     {
                        "privileges":[
-                          "monitor_enrich"
+                          "monitor_enrich", "monitor_stats"
                        ],
                        "clusters":[
                           "remote-1"
@@ -221,7 +221,7 @@ public class RestGetUserPrivilegesActionTests extends ESTestCase {
                     },
                     {
                        "privileges":[
-                          "monitor_enrich"
+                          "monitor_enrich", "monitor_stats"
                        ],
                        "clusters":[
                           "remote-2",

+ 1 - 1
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/privileges/11_builtin.yml

@@ -15,5 +15,5 @@ setup:
   # This is fragile - it needs to be updated every time we add a new cluster/index privilege
   # I would much prefer we could just check that specific entries are in the array, but we don't have
   # an assertion for that
-  - length: { "cluster" : 61 }
+  - length: { "cluster" : 62 }
   - length: { "index" : 22 }