Browse Source

Enforce API key type on authentication (#95908)

The PR adds enforcement for API key type at authentication time.
Concretely, new cross-cluster API keys (#95714) can only be used on the
dedicated remote cluster interface and the existing (rest) API key must
not be used for new remote cluster communication. To make cross-cluster
API keys actually usable after authentication, the PR also adds support
for resolving their roles.
Yang Wang 2 years ago
parent
commit
86f37160ae
36 changed files with 709 additions and 432 deletions
  1. 3 7
      qa/ccs-common-rest/src/yamlRestTest/java/org/elasticsearch/test/rest/yaml/RcsCcsCommonYamlTestSuiteIT.java
  2. 19 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/ApiKey.java
  3. 14 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/CreateCrossClusterApiKeyRequest.java
  4. 1 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/AuthenticationField.java
  5. 36 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/Subject.java
  6. 22 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/ApiKeyTests.java
  7. 21 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authc/AuthenticationTestHelper.java
  8. 72 91
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authc/SubjectTests.java
  9. 2 2
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/AbstractRemoteClusterSecurityDlsAndFlsRestIT.java
  10. 14 10
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/AbstractRemoteClusterSecurityTestCase.java
  11. 4 2
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/AbstractRemoteClusterSecurityWithMultipleRemotesRestIT.java
  12. 2 6
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityApiKeyRestIT.java
  13. 2 1
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityBwcRestIT.java
  14. 8 32
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityCcrIT.java
  15. 43 34
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityFcActionAuthorizationIT.java
  16. 2 6
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityLicensingAndFeatureUsageRestIT.java
  17. 2 6
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityMlIT.java
  18. 56 10
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityRestIT.java
  19. 3 6
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecuritySpecialUserIT.java
  20. 2 6
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityTransformIT.java
  21. 4 14
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityWithDlsAndFlsRestIT.java
  22. 4 14
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityWithDlsRestIT.java
  23. 4 14
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityWithFlsRestIT.java
  24. 2 6
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityWithMixedModelRemotesRestIT.java
  25. 4 12
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityWithSameModelRemotesRestIT.java
  26. 4 8
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityWithoutDlsAndFlsRestIT.java
  27. 33 0
      x-pack/plugin/security/qa/security-trial/src/javaRestTest/java/org/elasticsearch/xpack/security/apikey/ApiKeyRestIT.java
  28. 22 12
      x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/apikey/ApiKeySingleNodeTests.java
  29. 5 1
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyAuthenticator.java
  30. 52 11
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java
  31. 4 1
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/CrossClusterAccessAuthenticationService.java
  32. 2 1
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/CrossClusterAccessHeaders.java
  33. 193 84
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyServiceTests.java
  34. 29 8
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/AuthenticatorChainTests.java
  35. 13 20
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java
  36. 6 4
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/crossclusteraccess/CrossClusterAccessAuthenticationServiceIntegTests.java

+ 3 - 7
qa/ccs-common-rest/src/yamlRestTest/java/org/elasticsearch/test/rest/yaml/RcsCcsCommonYamlTestSuiteIT.java

@@ -119,21 +119,17 @@ public class RcsCcsCommonYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
 
     private static Map<String, Object> createCrossClusterAccessApiKey() throws IOException {
         assert fulfillingCluster != null;
-        final var createApiKeyRequest = new Request("POST", "/_security/api_key");
+        final var createApiKeyRequest = new Request("POST", "/_security/cross_cluster/api_key");
         createApiKeyRequest.setJsonEntity("""
             {
               "name": "cross_cluster_access_key",
-              "role_descriptors": {
-                "role": {
-                  "cluster": ["cross_cluster_search"],
-                  "index": [
+              "access": {
+                  "search": [
                     {
                       "names": ["*"],
-                      "privileges": ["read", "read_cross_cluster"],
                       "allow_restricted_indices": true
                     }
                   ]
-                }
               }
             }""");
         createApiKeyRequest.setOptions(

+ 19 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/ApiKey.java

@@ -11,6 +11,7 @@ import org.elasticsearch.TransportVersion;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.XContentParserUtils;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xcontent.ConstructingObjectParser;
 import org.elasticsearch.xcontent.ObjectParser;
@@ -28,6 +29,8 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
 import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
@@ -51,10 +54,25 @@ public final class ApiKey implements ToXContentObject, Writeable {
             return switch (value.toLowerCase(Locale.ROOT)) {
                 case "rest" -> REST;
                 case "cross_cluster" -> CROSS_CLUSTER;
-                default -> throw new IllegalArgumentException("unknown API key type [" + value + "]");
+                default -> throw new IllegalArgumentException(
+                    "invalid API key type ["
+                        + value
+                        + "] expected one of ["
+                        + Stream.of(values()).map(Type::value).collect(Collectors.joining(","))
+                        + "]"
+                );
             };
         }
 
+        public static Type fromXContent(XContentParser parser) throws IOException {
+            XContentParser.Token token = parser.currentToken();
+            if (token == null) {
+                token = parser.nextToken();
+            }
+            XContentParserUtils.ensureExpectedToken(XContentParser.Token.VALUE_STRING, token, parser);
+            return parse(parser.text());
+        }
+
         public String value() {
             return name().toLowerCase(Locale.ROOT);
         }

+ 14 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/CreateCrossClusterApiKeyRequest.java

@@ -14,6 +14,8 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+import org.elasticsearch.xcontent.json.JsonXContent;
 import org.elasticsearch.xpack.core.security.action.role.RoleDescriptorRequestValidator;
 import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
 
@@ -95,4 +97,16 @@ public final class CreateCrossClusterApiKeyRequest extends AbstractCreateApiKeyR
     public int hashCode() {
         return Objects.hash(id, name, expiration, metadata, roleDescriptors, refreshPolicy);
     }
+
+    public static CreateCrossClusterApiKeyRequest withNameAndAccess(String name, String access) throws IOException {
+        return new CreateCrossClusterApiKeyRequest(
+            name,
+            CrossClusterApiKeyRoleDescriptorBuilder.PARSER.parse(
+                JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, access),
+                null
+            ),
+            null,
+            null
+        );
+    }
 }

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/AuthenticationField.java

@@ -20,6 +20,7 @@ public final class AuthenticationField {
     public static final String API_KEY_CREATOR_REALM_TYPE = "_security_api_key_creator_realm_type";
     public static final String API_KEY_ID_KEY = "_security_api_key_id";
     public static final String API_KEY_NAME_KEY = "_security_api_key_name";
+    public static final String API_KEY_TYPE_KEY = "_security_api_key_type";
     public static final String API_KEY_METADATA_KEY = "_security_api_key_metadata";
     public static final String API_KEY_ROLE_DESCRIPTORS_KEY = "_security_api_key_role_descriptors";
     public static final String API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY = "_security_api_key_limited_by_role_descriptors";

+ 36 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/Subject.java

@@ -13,6 +13,7 @@ import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.util.ArrayUtils;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.xpack.core.security.action.apikey.ApiKey;
 import org.elasticsearch.xpack.core.security.authc.CrossClusterAccessSubjectInfo.RoleDescriptorsBytes;
 import org.elasticsearch.xpack.core.security.authc.service.ServiceAccountSettings;
 import org.elasticsearch.xpack.core.security.authz.store.RoleReference;
@@ -26,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.elasticsearch.transport.RemoteClusterPortSettings.TRANSPORT_VERSION_ADVANCED_REMOTE_CLUSTER_SECURITY_CCR;
 import static org.elasticsearch.xpack.core.security.authc.Authentication.VERSION_API_KEY_ROLES_AS_BYTES;
 import static org.elasticsearch.xpack.core.security.authc.AuthenticationField.API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY;
 import static org.elasticsearch.xpack.core.security.authc.AuthenticationField.API_KEY_ROLE_DESCRIPTORS_KEY;
@@ -246,11 +248,14 @@ public class Subject {
             return buildRolesReferenceForApiKeyBwc();
         }
         final String apiKeyId = (String) metadata.get(AuthenticationField.API_KEY_ID_KEY);
+        assert ApiKey.Type.REST == getApiKeyType() : "only a REST API key should have its role built here";
+
         final BytesReference roleDescriptorsBytes = (BytesReference) metadata.get(API_KEY_ROLE_DESCRIPTORS_KEY);
         final BytesReference limitedByRoleDescriptorsBytes = getLimitedByRoleDescriptorsBytes();
         if (roleDescriptorsBytes == null && limitedByRoleDescriptorsBytes == null) {
             throw new ElasticsearchSecurityException("no role descriptors found for API key");
         }
+
         final RoleReference.ApiKeyRoleReference limitedByRoleReference = new RoleReference.ApiKeyRoleReference(
             apiKeyId,
             limitedByRoleDescriptorsBytes,
@@ -265,7 +270,23 @@ public class Subject {
         );
     }
 
+    // Package private for testing
+    RoleReference.ApiKeyRoleReference buildRoleReferenceForCrossClusterApiKey() {
+        assert version.onOrAfter(TRANSPORT_VERSION_ADVANCED_REMOTE_CLUSTER_SECURITY_CCR);
+        final String apiKeyId = (String) metadata.get(AuthenticationField.API_KEY_ID_KEY);
+        assert ApiKey.Type.CROSS_CLUSTER == getApiKeyType() : "cross cluster access must use cross-cluster API keys";
+        final BytesReference roleDescriptorsBytes = (BytesReference) metadata.get(API_KEY_ROLE_DESCRIPTORS_KEY);
+        if (roleDescriptorsBytes == null) {
+            throw new ElasticsearchSecurityException("no role descriptors found for API key");
+        }
+        final BytesReference limitedByRoleDescriptorsBytes = (BytesReference) metadata.get(API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY);
+        assert isEmptyRoleDescriptorsBytes(limitedByRoleDescriptorsBytes)
+            : "cross cluster API keys must have empty limited-by role descriptors";
+        return new RoleReference.ApiKeyRoleReference(apiKeyId, roleDescriptorsBytes, RoleReference.ApiKeyRoleType.ASSIGNED);
+    }
+
     private RoleReferenceIntersection buildRoleReferencesForCrossClusterAccess() {
+        assert ApiKey.Type.CROSS_CLUSTER == getApiKeyType() : "cross cluster access must use cross-cluster API keys";
         final List<RoleReference> roleReferences = new ArrayList<>(4);
         @SuppressWarnings("unchecked")
         final var crossClusterAccessRoleDescriptorsBytes = (List<RoleDescriptorsBytes>) metadata.get(
@@ -292,7 +313,7 @@ public class Subject {
                 roleReferences.add(new RoleReference.CrossClusterAccessRoleReference(innerUser.principal(), roleDescriptorsBytes));
             }
         }
-        roleReferences.addAll(buildRoleReferencesForApiKey().getRoleReferences());
+        roleReferences.add(buildRoleReferenceForCrossClusterApiKey());
         return new RoleReferenceIntersection(List.copyOf(roleReferences));
     }
 
@@ -342,6 +363,8 @@ public class Subject {
     );
 
     private BytesReference getLimitedByRoleDescriptorsBytes() {
+        assert ApiKey.Type.REST == getApiKeyType()
+            : "bug fixing for fleet-server limited-by role descriptors applies only to REST API keys";
         final BytesReference bytesReference = (BytesReference) metadata.get(API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY);
         // Unfortunate BWC bug fix code
         if (bytesReference.length() == 2 && "{}".equals(bytesReference.utf8ToString())) {
@@ -352,4 +375,16 @@ public class Subject {
         }
         return bytesReference;
     }
+
+    private ApiKey.Type getApiKeyType() {
+        final String typeString = (String) metadata.get(AuthenticationField.API_KEY_TYPE_KEY);
+        assert (typeString != null) || version.before(TRANSPORT_VERSION_ADVANCED_REMOTE_CLUSTER_SECURITY_CCR)
+            : "API key type must be non-null except for versions older than " + TRANSPORT_VERSION_ADVANCED_REMOTE_CLUSTER_SECURITY_CCR;
+
+        // A null type string can only be for the REST type because it is not possible to
+        // create cross-cluster API keys for mixed cluster with old nodes.
+        // It is also not possible to send such an API key to an old node because it can only be
+        // used via the dedicated remote cluster port which means the node must be of a newer version.
+        return typeString == null ? ApiKey.Type.REST : ApiKey.Type.parse(typeString);
+    }
 }

+ 22 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/ApiKeyTests.java

@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Objects;
 
 import static org.elasticsearch.xpack.core.security.authz.RoleDescriptorTests.randomUniquelyNamedRoleDescriptors;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.is;
@@ -108,6 +109,27 @@ public class ApiKeyTests extends ESTestCase {
         }
     }
 
+    public void testParseApiKeyType() throws IOException {
+        assertThat(parseTypeString(randomFrom("rest", "REST", "Rest")), is(ApiKey.Type.REST));
+        assertThat(parseTypeString(randomFrom("cross_cluster", "CROSS_CLUSTER", "Cross_Cluster")), is(ApiKey.Type.CROSS_CLUSTER));
+
+        final IllegalArgumentException e = expectThrows(
+            IllegalArgumentException.class,
+            () -> parseTypeString(randomAlphaOfLengthBetween(3, 20))
+        );
+        assertThat(e.getMessage(), containsString("invalid API key type"));
+    }
+
+    private ApiKey.Type parseTypeString(String typeString) throws IOException {
+        if (randomBoolean()) {
+            return ApiKey.Type.parse(typeString);
+        } else {
+            return ApiKey.Type.fromXContent(
+                JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, "\"" + typeString + "\"")
+            );
+        }
+    }
+
     @SuppressWarnings("unchecked")
     public static Map<String, Object> randomMetadata() {
         return randomFrom(

+ 21 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authc/AuthenticationTestHelper.java

@@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.XContentTestUtils;
 import org.elasticsearch.xcontent.XContentType;
+import org.elasticsearch.xpack.core.security.action.apikey.ApiKey;
 import org.elasticsearch.xpack.core.security.action.service.TokenInfo;
 import org.elasticsearch.xpack.core.security.authc.Authentication.AuthenticationType;
 import org.elasticsearch.xpack.core.security.authc.esnative.NativeRealmSettings;
@@ -381,6 +382,25 @@ public class AuthenticationTestHelper {
             realmRef = null;
             candidateAuthenticationTypes = EnumSet.of(AuthenticationType.API_KEY);
             metadata.put(AuthenticationField.API_KEY_ID_KEY, Objects.requireNonNull(apiKeyId));
+            metadata.put(AuthenticationField.API_KEY_TYPE_KEY, ApiKey.Type.REST.value());
+            return this;
+        }
+
+        public AuthenticationTestBuilder crossClusterApiKey(String apiKeyId) {
+            apiKey(apiKeyId);
+            candidateAuthenticationTypes = EnumSet.of(AuthenticationType.API_KEY);
+            metadata.put(AuthenticationField.API_KEY_TYPE_KEY, ApiKey.Type.CROSS_CLUSTER.value());
+            metadata.put(AuthenticationField.API_KEY_ROLE_DESCRIPTORS_KEY, new BytesArray("""
+                {
+                  "cross_cluster": {
+                    "cluster": ["cross_cluster_search", "cross_cluster_replication"],
+                    "indices": [
+                      { "names":["logs*"], "privileges":["read","read_cross_cluster","view_index_metadata"] },
+                      { "names":["archive*"],"privileges":["cross_cluster_replication","cross_cluster_replication_internal"] }
+                    ]
+                  }
+                }"""));
+            metadata.put(AuthenticationField.API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY, new BytesArray("{}"));
             return this;
         }
 
@@ -435,7 +455,7 @@ public class AuthenticationTestHelper {
             if (authenticatingAuthentication != null) {
                 throw new IllegalArgumentException("cannot use cross cluster access authentication as run-as target");
             }
-            apiKey(crossClusterAccessApiKeyId);
+            crossClusterApiKey(crossClusterAccessApiKeyId);
             this.crossClusterAccessSubjectInfo = Objects.requireNonNull(crossClusterAccessSubjectInfo);
             return this;
         }

+ 72 - 91
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authc/SubjectTests.java

@@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.ArrayUtils;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.TransportVersionUtils;
+import org.elasticsearch.xpack.core.security.action.apikey.ApiKey;
 import org.elasticsearch.xpack.core.security.authc.service.ServiceAccountSettings;
 import org.elasticsearch.xpack.core.security.authz.RoleDescriptorsIntersection;
 import org.elasticsearch.xpack.core.security.authz.store.RoleKey;
@@ -112,11 +113,12 @@ public class SubjectTests extends ESTestCase {
         assertThat(serviceAccountRoleReference.getPrincipal(), equalTo(serviceUser.principal()));
     }
 
-    public void testGetRoleReferencesForApiKey() {
+    public void testGetRoleReferencesForRestApiKey() {
         Map<String, Object> authMetadata = new HashMap<>();
         final String apiKeyId = randomAlphaOfLength(12);
         authMetadata.put(AuthenticationField.API_KEY_ID_KEY, apiKeyId);
         authMetadata.put(AuthenticationField.API_KEY_NAME_KEY, randomBoolean() ? null : randomAlphaOfLength(12));
+        authMetadata.put(AuthenticationField.API_KEY_TYPE_KEY, ApiKey.Type.REST.value());
         final BytesReference roleBytes = new BytesArray("{\"a role\": {\"cluster\": [\"all\"]}}");
         final BytesReference limitedByRoleBytes = new BytesArray("{\"limitedBy role\": {\"cluster\": [\"all\"]}}");
 
@@ -154,23 +156,54 @@ public class SubjectTests extends ESTestCase {
         }
     }
 
-    public void testGetRoleReferencesForCrossClusterAccess() {
+    public void testBuildRoleReferenceForCrossClusterApiKey() {
         Map<String, Object> authMetadata = new HashMap<>();
         final String apiKeyId = randomAlphaOfLength(12);
         authMetadata.put(AuthenticationField.API_KEY_ID_KEY, apiKeyId);
         authMetadata.put(AuthenticationField.API_KEY_NAME_KEY, randomBoolean() ? null : randomAlphaOfLength(12));
+        authMetadata.put(AuthenticationField.API_KEY_TYPE_KEY, ApiKey.Type.CROSS_CLUSTER.value());
         final BytesReference roleBytes = new BytesArray("""
-            {"role":{"indices":[{"names":["index*"],"privileges":["read"]}]}}""");
-        final BytesReference limitedByRoleBytes = new BytesArray("""
-            {"limited-by-role":{"indices":[{"names":["*"],"privileges":["all"]}]}}""");
-
-        final boolean emptyRoleBytes = randomBoolean();
+            {
+              "cross_cluster": {
+                "cluster": ["cross_cluster_search"],
+                "indices": [
+                  { "names":["index"], "privileges":["read","read_cross_cluster","view_index_metadata"] }
+                ]
+              }
+            }""");
+        authMetadata.put(AuthenticationField.API_KEY_ROLE_DESCRIPTORS_KEY, roleBytes);
+        authMetadata.put(AuthenticationField.API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY, new BytesArray("{}"));
 
-        authMetadata.put(
-            AuthenticationField.API_KEY_ROLE_DESCRIPTORS_KEY,
-            emptyRoleBytes ? randomFrom(Arrays.asList(null, new BytesArray("{}"))) : roleBytes
+        final Subject subject = new Subject(
+            new User("joe"),
+            new Authentication.RealmRef(API_KEY_REALM_NAME, API_KEY_REALM_TYPE, "node"),
+            TransportVersion.CURRENT,
+            authMetadata
         );
-        authMetadata.put(AuthenticationField.API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY, limitedByRoleBytes);
+
+        final ApiKeyRoleReference roleReference = subject.buildRoleReferenceForCrossClusterApiKey();
+        assertThat(roleReference.getApiKeyId(), equalTo(apiKeyId));
+        assertThat(roleReference.getRoleDescriptorsBytes(), equalTo(authMetadata.get(API_KEY_ROLE_DESCRIPTORS_KEY)));
+    }
+
+    public void testGetRoleReferencesForCrossClusterAccess() {
+        Map<String, Object> authMetadata = new HashMap<>();
+        final String apiKeyId = randomAlphaOfLength(12);
+        authMetadata.put(AuthenticationField.API_KEY_ID_KEY, apiKeyId);
+        authMetadata.put(AuthenticationField.API_KEY_NAME_KEY, randomBoolean() ? null : randomAlphaOfLength(12));
+        authMetadata.put(AuthenticationField.API_KEY_TYPE_KEY, ApiKey.Type.CROSS_CLUSTER.value());
+        final BytesReference roleBytes = new BytesArray("""
+            {
+              "cross_cluster": {
+                "cluster": ["cross_cluster_replication"],
+                "indices": [
+                  { "names":["index*"],"privileges":["cross_cluster_replication","cross_cluster_replication_internal"] }
+                ]
+              }
+            }""");
+
+        authMetadata.put(AuthenticationField.API_KEY_ROLE_DESCRIPTORS_KEY, roleBytes);
+        authMetadata.put(AuthenticationField.API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY, new BytesArray("{}"));
 
         final CrossClusterAccessSubjectInfo crossClusterAccessSubjectInfo = randomBoolean()
             ? AuthenticationTestHelper.randomCrossClusterAccessSubjectInfo(RoleDescriptorsIntersection.EMPTY)
@@ -193,89 +226,35 @@ public class SubjectTests extends ESTestCase {
         final int numberOfRemoteRoleDescriptors = crossClusterAccessSubjectInfo.getRoleDescriptorsBytesList().size();
         assertThat(numberOfRemoteRoleDescriptors, anyOf(equalTo(0), equalTo(1), equalTo(2)));
         final List<RoleReference> roleReferences = roleReferenceIntersection.getRoleReferences();
-        if (emptyRoleBytes) {
-            if (numberOfRemoteRoleDescriptors == 2) {
-                // Two role references means we can't have a FixedRoleReference
-                assertThat(
-                    roleReferences,
-                    contains(
-                        isA(CrossClusterAccessRoleReference.class),
-                        isA(CrossClusterAccessRoleReference.class),
-                        isA(ApiKeyRoleReference.class)
-                    )
-                );
-
-                expectCrossClusterAccessReferenceAtIndex(0, roleReferences, crossClusterAccessSubjectInfo);
-                expectCrossClusterAccessReferenceAtIndex(1, roleReferences, crossClusterAccessSubjectInfo);
-
-                final ApiKeyRoleReference roleReference = (ApiKeyRoleReference) roleReferences.get(2);
-                assertThat(roleReference.getApiKeyId(), equalTo(apiKeyId));
-                assertThat(roleReference.getRoleDescriptorsBytes(), equalTo(authMetadata.get(API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY)));
-            } else {
-                if (isInternalUser) {
-                    assertThat(roleReferences, contains(isA(FixedRoleReference.class), isA(ApiKeyRoleReference.class)));
-                    expectFixedReferenceAtIndex(0, roleReferences);
-                } else {
-                    assertThat(roleReferences, contains(isA(CrossClusterAccessRoleReference.class), isA(ApiKeyRoleReference.class)));
-                    expectCrossClusterAccessReferenceAtIndex(0, roleReferences, crossClusterAccessSubjectInfo);
-                }
-
-                final ApiKeyRoleReference roleReference = (ApiKeyRoleReference) roleReferences.get(1);
-                assertThat(roleReference.getApiKeyId(), equalTo(apiKeyId));
-                assertThat(roleReference.getRoleDescriptorsBytes(), equalTo(authMetadata.get(API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY)));
-            }
+        if (numberOfRemoteRoleDescriptors == 2) {
+            // Two role references means we can't have a FixedRoleReference
+            assertThat(
+                roleReferences,
+                contains(
+                    isA(CrossClusterAccessRoleReference.class),
+                    isA(CrossClusterAccessRoleReference.class),
+                    isA(ApiKeyRoleReference.class)
+                )
+            );
+
+            expectCrossClusterAccessReferenceAtIndex(0, roleReferences, crossClusterAccessSubjectInfo);
+            expectCrossClusterAccessReferenceAtIndex(1, roleReferences, crossClusterAccessSubjectInfo);
+
+            final ApiKeyRoleReference roleReference = (ApiKeyRoleReference) roleReferences.get(2);
+            assertThat(roleReference.getApiKeyId(), equalTo(apiKeyId));
+            assertThat(roleReference.getRoleDescriptorsBytes(), equalTo(authMetadata.get(API_KEY_ROLE_DESCRIPTORS_KEY)));
         } else {
-            if (numberOfRemoteRoleDescriptors == 2) {
-                // Two role references means we can't have a FixedRoleReference
-                assertThat(
-                    roleReferences,
-                    contains(
-                        isA(CrossClusterAccessRoleReference.class),
-                        isA(CrossClusterAccessRoleReference.class),
-                        isA(ApiKeyRoleReference.class),
-                        isA(ApiKeyRoleReference.class)
-                    )
-                );
-
-                expectCrossClusterAccessReferenceAtIndex(0, roleReferences, crossClusterAccessSubjectInfo);
-                expectCrossClusterAccessReferenceAtIndex(1, roleReferences, crossClusterAccessSubjectInfo);
-
-                final ApiKeyRoleReference roleReference = (ApiKeyRoleReference) roleReferences.get(2);
-                assertThat(roleReference.getApiKeyId(), equalTo(apiKeyId));
-                assertThat(roleReference.getRoleDescriptorsBytes(), equalTo(authMetadata.get(API_KEY_ROLE_DESCRIPTORS_KEY)));
-
-                final ApiKeyRoleReference limitedByRoleReference = (ApiKeyRoleReference) roleReferences.get(3);
-                assertThat(limitedByRoleReference.getApiKeyId(), equalTo(apiKeyId));
-                assertThat(
-                    limitedByRoleReference.getRoleDescriptorsBytes(),
-                    equalTo(authMetadata.get(API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY))
-                );
+            if (isInternalUser) {
+                assertThat(roleReferences, contains(isA(FixedRoleReference.class), isA(ApiKeyRoleReference.class)));
+                expectFixedReferenceAtIndex(0, roleReferences);
             } else {
-                if (isInternalUser) {
-                    assertThat(
-                        roleReferences,
-                        contains(isA(FixedRoleReference.class), isA(ApiKeyRoleReference.class), isA(ApiKeyRoleReference.class))
-                    );
-                    expectFixedReferenceAtIndex(0, roleReferences);
-                } else {
-                    assertThat(
-                        roleReferences,
-                        contains(isA(CrossClusterAccessRoleReference.class), isA(ApiKeyRoleReference.class), isA(ApiKeyRoleReference.class))
-                    );
-                    expectCrossClusterAccessReferenceAtIndex(0, roleReferences, crossClusterAccessSubjectInfo);
-                }
-
-                final ApiKeyRoleReference roleReference = (ApiKeyRoleReference) roleReferences.get(1);
-                assertThat(roleReference.getApiKeyId(), equalTo(apiKeyId));
-                assertThat(roleReference.getRoleDescriptorsBytes(), equalTo(authMetadata.get(API_KEY_ROLE_DESCRIPTORS_KEY)));
-
-                final ApiKeyRoleReference limitedByRoleReference = (ApiKeyRoleReference) roleReferences.get(2);
-                assertThat(limitedByRoleReference.getApiKeyId(), equalTo(apiKeyId));
-                assertThat(
-                    limitedByRoleReference.getRoleDescriptorsBytes(),
-                    equalTo(authMetadata.get(API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY))
-                );
+                assertThat(roleReferences, contains(isA(CrossClusterAccessRoleReference.class), isA(ApiKeyRoleReference.class)));
+                expectCrossClusterAccessReferenceAtIndex(0, roleReferences, crossClusterAccessSubjectInfo);
             }
+
+            final ApiKeyRoleReference roleReference = (ApiKeyRoleReference) roleReferences.get(1);
+            assertThat(roleReference.getApiKeyId(), equalTo(apiKeyId));
+            assertThat(roleReference.getRoleDescriptorsBytes(), equalTo(authMetadata.get(API_KEY_ROLE_DESCRIPTORS_KEY)));
         }
     }
 
@@ -361,6 +340,8 @@ public class SubjectTests extends ESTestCase {
                 randomAlphaOfLength(20),
                 AuthenticationField.API_KEY_NAME_KEY,
                 randomAlphaOfLength(12),
+                AuthenticationField.API_KEY_TYPE_KEY,
+                ApiKey.Type.REST.value(),
                 AuthenticationField.API_KEY_ROLE_DESCRIPTORS_KEY,
                 roleBytes,
                 AuthenticationField.API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY,

+ 2 - 2
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/AbstractRemoteClusterSecurityDlsAndFlsRestIT.java

@@ -242,9 +242,9 @@ public abstract class AbstractRemoteClusterSecurityDlsAndFlsRestIT extends Abstr
         return Tuple.tuple(apiKeyId, apiKeyEncoded);
     }
 
-    protected static String createCrossClusterAccessApiKey(String roleDescriptorsJson, AtomicReference<Map<String, Object>> apiKeyRef) {
+    protected static String createCrossClusterAccessApiKey(String accessJson, AtomicReference<Map<String, Object>> apiKeyRef) {
         if (apiKeyRef.get() == null) {
-            apiKeyRef.set(createCrossClusterAccessApiKey(roleDescriptorsJson));
+            apiKeyRef.set(createCrossClusterAccessApiKey(accessJson));
         }
         return (String) apiKeyRef.get().get("encoded");
     }

+ 14 - 10
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/AbstractRemoteClusterSecurityTestCase.java

@@ -122,23 +122,27 @@ public abstract class AbstractRemoteClusterSecurityTestCase extends ESRestTestCa
         return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
     }
 
-    protected static Map<String, Object> createCrossClusterAccessApiKey(String roleDescriptorsJson) {
+    protected static Map<String, Object> createCrossClusterAccessApiKey(String accessJson) {
         initFulfillingClusterClient();
-        return createCrossClusterAccessApiKey(fulfillingClusterClient, roleDescriptorsJson);
+        return createCrossClusterAccessApiKey(fulfillingClusterClient, accessJson);
     }
 
-    static Map<String, Object> createCrossClusterAccessApiKey(RestClient targetClusterClient, String roleDescriptorsJson) {
+    protected static Map<String, Object> createCrossClusterAccessApiKey(RestClient targetClusterClient, String accessJson) {
+
         // Create API key on FC
-        final var createApiKeyRequest = new Request("POST", "/_security/api_key");
-        createApiKeyRequest.setJsonEntity(Strings.format("""
+        final var createCrossClusterApiKeyRequest = new Request("POST", "/_security/cross_cluster/api_key");
+        createCrossClusterApiKeyRequest.setJsonEntity(Strings.format("""
             {
               "name": "cross_cluster_access_key",
-              "role_descriptors": %s
-            }""", roleDescriptorsJson));
+              "access": %s
+            }""", accessJson));
         try {
-            final Response createApiKeyResponse = performRequestWithAdminUser(targetClusterClient, createApiKeyRequest);
-            assertOK(createApiKeyResponse);
-            return responseAsMap(createApiKeyResponse);
+            final Response createCrossClusterApiKeyResponse = performRequestWithAdminUser(
+                targetClusterClient,
+                createCrossClusterApiKeyRequest
+            );
+            assertOK(createCrossClusterApiKeyResponse);
+            return responseAsMap(createCrossClusterApiKeyResponse);
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }

+ 4 - 2
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/AbstractRemoteClusterSecurityWithMultipleRemotesRestIT.java

@@ -67,7 +67,8 @@ public abstract class AbstractRemoteClusterSecurityWithMultipleRemotesRestIT ext
                 { "index": { "_index": "cluster1_index1" } }
                 { "name": "doc1" }
                 { "index": { "_index": "cluster1_index2" } }
-                { "name": "doc2" }\n"""));
+                { "name": "doc2" }
+                """));
             assertOK(performRequestAgainstFulfillingCluster(bulkRequest));
         }
 
@@ -79,7 +80,8 @@ public abstract class AbstractRemoteClusterSecurityWithMultipleRemotesRestIT ext
                 { "index": { "_index": "cluster2_index1" } }
                 { "name": "doc1" }
                 { "index": { "_index": "cluster2_index2" } }
-                { "name": "doc2" }\n"""));
+                { "name": "doc2" }
+                """));
             assertOK(performRequestAgainstOtherFulfillingCluster(bulkRequest));
         }
 

+ 2 - 6
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityApiKeyRestIT.java

@@ -59,15 +59,11 @@ public class RemoteClusterSecurityApiKeyRestIT extends AbstractRemoteClusterSecu
                 if (API_KEY_MAP_REF.get() == null) {
                     final Map<String, Object> apiKeyMap = createCrossClusterAccessApiKey("""
                         {
-                          "role": {
-                            "cluster": ["cross_cluster_search"],
-                            "index": [
+                            "search": [
                               {
-                                "names": ["index*", "not_found_index"],
-                                "privileges": ["read", "read_cross_cluster"]
+                                "names": ["index*", "not_found_index"]
                               }
                             ]
-                          }
                         }""");
                     API_KEY_MAP_REF.set(apiKeyMap);
                 }

+ 2 - 1
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityBwcRestIT.java

@@ -82,7 +82,8 @@ public class RemoteClusterSecurityBwcRestIT extends AbstractRemoteClusterSecurit
                 { "index": { "_index": "remote_index1" } }
                 { "foo": "bar" }
                 { "index": { "_index": "remote_index2" } }
-                { "bar": "foo" }\n"""));
+                { "bar": "foo" }
+                """));
             assertOK(performRequestAgainstFulfillingCluster(bulkRequest));
         }
 

+ 8 - 32
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityCcrIT.java

@@ -22,7 +22,6 @@ import org.junit.rules.RuleChain;
 import org.junit.rules.TestRule;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -62,43 +61,20 @@ public class RemoteClusterSecurityCcrIT extends AbstractRemoteClusterSecurityTes
             .setting("xpack.security.remote_cluster_client.ssl.enabled", "true")
             .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt")
             .keystore("cluster.remote.my_remote_cluster.credentials", () -> {
-                API_KEY_MAP_REF.updateAndGet(v -> v != null ? v : createCrossClusterAccessCcrApiKey());
+                API_KEY_MAP_REF.updateAndGet(v -> v != null ? v : createCrossClusterAccessApiKey("""
+                    {
+                      "replication": [
+                        {
+                           "names": ["leader-index", "leader-alias", "metrics-*"]
+                        }
+                      ]
+                    }"""));
                 return (String) API_KEY_MAP_REF.get().get("encoded");
             })
             .user("ccr_user", PASS.toString(), "ccr_user_role")
             .build();
     }
 
-    // Create an API Key specifically for CCR access
-    private static Map<String, Object> createCrossClusterAccessCcrApiKey() {
-        initFulfillingClusterClient();
-        final var createApiKeyRequest = new Request("POST", "/_security/api_key");
-        createApiKeyRequest.setJsonEntity(Strings.format("""
-            {
-              "name": "cross_cluster_access_key",
-              "role_descriptors": {
-                "role": {
-                  "cluster": [
-                    "cross_cluster_replication"
-                  ],
-                  "index": [
-                    {
-                       "names": ["leader-index", "leader-alias", "metrics-*"],
-                       "privileges": ["cross_cluster_replication", "cross_cluster_replication_internal"]
-                    }
-                  ]
-                }
-              }
-            }"""));
-        try {
-            final Response createApiKeyResponse = performRequestWithAdminUser(fulfillingClusterClient, createApiKeyRequest);
-            assertOK(createApiKeyResponse);
-            return responseAsMap(createApiKeyResponse);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-
     @ClassRule
     // Use a RuleChain to ensure that fulfilling cluster is started before query cluster
     public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster);

+ 43 - 34
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityCcrFcActionAuthorizationIT.java → x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityFcActionAuthorizationIT.java

@@ -10,7 +10,9 @@ package org.elasticsearch.xpack.remotecluster;
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
 import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.MockSecureSettings;
@@ -41,7 +43,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.elasticsearch.xpack.remotecluster.AbstractRemoteClusterSecurityTestCase.PASS;
 import static org.elasticsearch.xpack.remotecluster.AbstractRemoteClusterSecurityTestCase.USER;
@@ -52,9 +53,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 
-public class RemoteClusterSecurityCcrFcActionAuthorizationIT extends ESRestTestCase {
-
-    private static final AtomicReference<Map<String, Object>> API_KEY_MAP_REF = new AtomicReference<>();
+public class RemoteClusterSecurityFcActionAuthorizationIT extends ESRestTestCase {
 
     @ClassRule
     public static ElasticsearchCluster testCluster = ElasticsearchCluster.local()
@@ -72,33 +71,8 @@ public class RemoteClusterSecurityCcrFcActionAuthorizationIT extends ESRestTestC
         .user(USER, PASS.toString())
         .build();
 
-    // Create an API Key specifically for CCR access
-    private static Map<String, Object> createCrossClusterAccessCcrApiKey() {
-        return createCrossClusterAccessApiKey(adminClient(), """
-            {
-              "role": {
-                "cluster": [
-                  "cross_cluster_replication"
-                ],
-                "index": [
-                  {
-                     "names": ["leader-index*"],
-                     "privileges": ["cross_cluster_replication", "cross_cluster_replication_internal"]
-                  }
-                ]
-              }
-            }""");
-    }
-
     private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
 
-    @Override
-    public void setUp() throws Exception {
-        super.setUp();
-        initClient();
-        API_KEY_MAP_REF.updateAndGet(v -> v != null ? v : createCrossClusterAccessCcrApiKey());
-    }
-
     @Override
     protected String getTestRestCluster() {
         return testCluster.getHttpAddresses();
@@ -117,6 +91,15 @@ public class RemoteClusterSecurityCcrFcActionAuthorizationIT extends ESRestTestC
     }
 
     public void testIndicesPrivilegesAreEnforcedForCcrRestoreSessionActions() throws IOException {
+        final Map<String, Object> crossClusterApiKeyMap = createCrossClusterAccessApiKey(adminClient(), """
+            {
+              "replication": [
+                {
+                   "names": ["leader-index*"]
+                }
+              ]
+            }""");
+
         final String leaderIndex1UUID;
         final String leaderIndex2UUID;
         final String privateIndexUUID;
@@ -143,7 +126,7 @@ public class RemoteClusterSecurityCcrFcActionAuthorizationIT extends ESRestTestC
         }
 
         // Simulate QC behaviours by directly connecting to the FC using a transport service
-        try (MockTransportService service = startTransport("node", threadPool)) {
+        try (MockTransportService service = startTransport("node", threadPool, (String) crossClusterApiKeyMap.get("encoded"))) {
             final RemoteClusterService remoteClusterService = service.getRemoteClusterService();
             final List<RemoteConnectionInfo> remoteConnectionInfos = remoteClusterService.getRemoteConnectionInfos().toList();
             assertThat(remoteConnectionInfos, hasSize(1));
@@ -163,7 +146,7 @@ public class RemoteClusterSecurityCcrFcActionAuthorizationIT extends ESRestTestC
                 containsString(
                     "action [indices:internal/admin/ccr/restore/session/put] towards remote cluster is unauthorized "
                         + "for user [_cross_cluster_access] with assigned roles [] authenticated by API key id ["
-                        + API_KEY_MAP_REF.get().get("id")
+                        + crossClusterApiKeyMap.get("id")
                         + "] of user [test_user] on indices [private-index], this action is granted by the index privileges "
                         + "[cross_cluster_replication_internal,all]"
                 )
@@ -271,7 +254,33 @@ public class RemoteClusterSecurityCcrFcActionAuthorizationIT extends ESRestTestC
         }
     }
 
-    private static MockTransportService startTransport(final String nodeName, final ThreadPool threadPool) {
+    public void testRestApiKeyIsNotAllowedOnRemoteClusterPort() throws IOException {
+        final var createApiKeyRequest = new Request("POST", "/_security/api_key");
+        createApiKeyRequest.setJsonEntity("""
+            {
+              "name": "rest_api_key"
+            }""");
+        final Response createApiKeyResponse = adminClient().performRequest(createApiKeyRequest);
+        assertOK(createApiKeyResponse);
+        final Map<String, Object> apiKeyMap = responseAsMap(createApiKeyResponse);
+        try (MockTransportService service = startTransport("node", threadPool, (String) apiKeyMap.get("encoded"))) {
+            final RemoteClusterService remoteClusterService = service.getRemoteClusterService();
+            final Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, "my_remote_cluster");
+
+            final ElasticsearchSecurityException e = expectThrows(
+                ElasticsearchSecurityException.class,
+                () -> remoteClusterClient.execute(RemoteClusterNodesAction.INSTANCE, RemoteClusterNodesAction.Request.INSTANCE).actionGet()
+            );
+            assertThat(
+                e.getMessage(),
+                containsString(
+                    "authentication expected API key type of [cross_cluster], but API key [" + apiKeyMap.get("id") + "] has type [rest]"
+                )
+            );
+        }
+    }
+
+    private static MockTransportService startTransport(final String nodeName, final ThreadPool threadPool, String encodedApiKey) {
         final String remoteClusterServerEndpoint = testCluster.getRemoteClusterServerEndpoint(0);
 
         final Settings.Builder builder = Settings.builder()
@@ -279,7 +288,7 @@ public class RemoteClusterSecurityCcrFcActionAuthorizationIT extends ESRestTestC
             .put("xpack.security.remote_cluster_client.ssl.enabled", "false");
 
         final MockSecureSettings secureSettings = new MockSecureSettings();
-        secureSettings.setString("cluster.remote.my_remote_cluster.credentials", (String) API_KEY_MAP_REF.get().get("encoded"));
+        secureSettings.setString("cluster.remote.my_remote_cluster.credentials", encodedApiKey);
         builder.setSecureSettings(secureSettings);
         if (randomBoolean()) {
             builder.put("cluster.remote.my_remote_cluster.mode", "sniff")
@@ -302,7 +311,7 @@ public class RemoteClusterSecurityCcrFcActionAuthorizationIT extends ESRestTestC
                 final ThreadContext threadContext = threadPool.getThreadContext();
                 try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
                     new CrossClusterAccessHeaders(
-                        "ApiKey " + API_KEY_MAP_REF.get().get("encoded"),
+                        "ApiKey " + encodedApiKey,
                         CrossClusterAccessUser.subjectInfo(TransportVersion.CURRENT, nodeName)
                     ).writeToContext(threadContext);
                     connection.sendRequest(requestId, action, request, options);

+ 2 - 6
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityLicensingAndFeatureUsageRestIT.java

@@ -73,15 +73,11 @@ public class RemoteClusterSecurityLicensingAndFeatureUsageRestIT extends Abstrac
                 if (API_KEY_MAP_REF.get() == null) {
                     final Map<String, Object> apiKeyMap = createCrossClusterAccessApiKey(Strings.format("""
                         {
-                          "role": {
-                            "cluster": ["cross_cluster_search"],
-                            "index": [
+                            "search": [
                               {
-                                  "names": ["%s"],
-                                  "privileges": ["read", "read_cross_cluster"]
+                                  "names": ["%s"]
                               }
                             ]
-                          }
                         }""", REMOTE_INDEX_NAME));
                     API_KEY_MAP_REF.set(apiKeyMap);
                 }

+ 2 - 6
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityMlIT.java

@@ -57,15 +57,11 @@ public class RemoteClusterSecurityMlIT extends AbstractRemoteClusterSecurityTest
             .keystore("cluster.remote.my_remote_cluster.credentials", () -> {
                 API_KEY_MAP_REF.compareAndSet(null, createCrossClusterAccessApiKey("""
                     {
-                      "role": {
-                        "cluster": ["cross_cluster_search"],
-                        "index": [
+                        "search": [
                           {
-                              "names": ["shared-airline-data"],
-                              "privileges": ["read", "read_cross_cluster", "view_index_metadata"]
+                              "names": ["shared-airline-data"]
                           }
                         ]
-                      }
                     }"""));
                 return (String) API_KEY_MAP_REF.get().get("encoded");
             })

+ 56 - 10
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityRestIT.java

@@ -26,6 +26,7 @@ import org.junit.rules.RuleChain;
 import org.junit.rules.TestRule;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
@@ -34,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.anEmptyMap;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
@@ -45,6 +47,7 @@ import static org.hamcrest.Matchers.notNullValue;
 public class RemoteClusterSecurityRestIT extends AbstractRemoteClusterSecurityTestCase {
 
     private static final AtomicReference<Map<String, Object>> API_KEY_MAP_REF = new AtomicReference<>();
+    private static final AtomicReference<Map<String, Object>> REST_API_KEY_MAP_REF = new AtomicReference<>();
     private static final AtomicBoolean SSL_ENABLED_REF = new AtomicBoolean();
 
     static {
@@ -71,15 +74,11 @@ public class RemoteClusterSecurityRestIT extends AbstractRemoteClusterSecurityTe
                 if (API_KEY_MAP_REF.get() == null) {
                     final Map<String, Object> apiKeyMap = createCrossClusterAccessApiKey("""
                         {
-                          "role": {
-                            "cluster": ["cross_cluster_search"],
-                            "index": [
-                                {
-                                    "names": ["index*", "not_found_index", "shared-metrics"],
-                                    "privileges": ["read", "read_cross_cluster"]
-                                }
-                            ]
-                          }
+                          "search": [
+                            {
+                                "names": ["index*", "not_found_index", "shared-metrics"]
+                            }
+                          ]
                         }""");
                     API_KEY_MAP_REF.set(apiKeyMap);
                 }
@@ -87,6 +86,25 @@ public class RemoteClusterSecurityRestIT extends AbstractRemoteClusterSecurityTe
             })
             // Define a bogus API key for another remote cluster
             .keystore("cluster.remote.invalid_remote.credentials", randomEncodedApiKey())
+            // Define remote with a REST API key to observe expected failure
+            .keystore("cluster.remote.wrong_api_key_type.credentials", () -> {
+                if (REST_API_KEY_MAP_REF.get() == null) {
+                    initFulfillingClusterClient();
+                    final var createApiKeyRequest = new Request("POST", "/_security/api_key");
+                    createApiKeyRequest.setJsonEntity("""
+                        {
+                          "name": "rest_api_key"
+                        }""");
+                    try {
+                        final Response createApiKeyResponse = performRequestWithAdminUser(fulfillingClusterClient, createApiKeyRequest);
+                        assertOK(createApiKeyResponse);
+                        REST_API_KEY_MAP_REF.set(responseAsMap(createApiKeyResponse));
+                    } catch (IOException e) {
+                        throw new UncheckedIOException(e);
+                    }
+                }
+                return (String) REST_API_KEY_MAP_REF.get().get("encoded");
+            })
             .rolesFile(Resource.fromClasspath("roles.yml"))
             .user(REMOTE_METRIC_USER, PASS.toString(), "read_remote_shared_metrics")
             .build();
@@ -301,7 +319,35 @@ public class RemoteClusterSecurityRestIT extends AbstractRemoteClusterSecurityTe
                 () -> performRequestWithRemoteSearchUser(new Request("GET", "/invalid_remote:index1/_search"))
             );
             assertThat(exception4.getResponse().getStatusLine().getStatusCode(), equalTo(401));
-            assertThat(exception4.getMessage(), containsString("unable to authenticate user "));
+            assertThat(
+                exception4.getMessage(),
+                allOf(containsString("unable to authenticate user "), containsString("unable to find apikey"))
+            );
+
+            // check that REST API key is not supported by cross cluster access
+            updateClusterSettings(
+                randomBoolean()
+                    ? Settings.builder()
+                        .put("cluster.remote.wrong_api_key_type.seeds", fulfillingCluster.getRemoteClusterServerEndpoint(0))
+                        .build()
+                    : Settings.builder()
+                        .put("cluster.remote.wrong_api_key_type.mode", "proxy")
+                        .put("cluster.remote.wrong_api_key_type.proxy_address", fulfillingCluster.getRemoteClusterServerEndpoint(0))
+                        .build()
+            );
+            final ResponseException exception5 = expectThrows(
+                ResponseException.class,
+                () -> performRequestWithRemoteSearchUser(new Request("GET", "/wrong_api_key_type:*/_search"))
+            );
+            assertThat(exception5.getResponse().getStatusLine().getStatusCode(), equalTo(401));
+            assertThat(
+                exception5.getMessage(),
+                containsString(
+                    "authentication expected API key type of [cross_cluster], but API key ["
+                        + REST_API_KEY_MAP_REF.get().get("id")
+                        + "] has type [rest]"
+                )
+            );
         }
     }
 

+ 3 - 6
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecuritySpecialUserIT.java

@@ -68,16 +68,12 @@ public class RemoteClusterSecuritySpecialUserIT extends AbstractRemoteClusterSec
                 if (API_KEY_MAP_REF.get() == null) {
                     final Map<String, Object> apiKeyMap = createCrossClusterAccessApiKey("""
                         {
-                          "role": {
-                            "cluster": ["cross_cluster_search"],
-                            "index": [
+                            "search": [
                               {
                                 "names": ["shared-*", "apm-1", ".security*"],
-                                "privileges": ["read", "read_cross_cluster"],
                                 "allow_restricted_indices": true
                               }
                             ]
-                          }
                         }""");
                     API_KEY_MAP_REF.set(apiKeyMap);
                 }
@@ -129,7 +125,8 @@ public class RemoteClusterSecuritySpecialUserIT extends AbstractRemoteClusterSec
                 { "index": { "_index": "apm-2" } }
                 { "name": "apm-2" }
                 { "index": { "_index": "logs-apm.1" } }
-                { "name": "logs-apm.1" }\n"""));
+                { "name": "logs-apm.1" }
+                """));
             assertOK(performRequestAgainstFulfillingCluster(bulkRequest));
         }
 

+ 2 - 6
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityTransformIT.java

@@ -57,15 +57,11 @@ public class RemoteClusterSecurityTransformIT extends AbstractRemoteClusterSecur
             .keystore("cluster.remote.my_remote_cluster.credentials", () -> {
                 API_KEY_MAP_REF.compareAndSet(null, createCrossClusterAccessApiKey("""
                     {
-                      "role": {
-                        "cluster": ["cross_cluster_search"],
-                        "index": [
+                        "search": [
                           {
-                              "names": ["shared-transform-index"],
-                              "privileges": ["read", "read_cross_cluster", "view_index_metadata"]
+                              "names": ["shared-transform-index"]
                           }
                         ]
-                      }
                     }"""));
                 return (String) API_KEY_MAP_REF.get().get("encoded");
             })

+ 4 - 14
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityWithDlsAndFlsRestIT.java

@@ -27,30 +27,20 @@ public class RemoteClusterSecurityWithDlsAndFlsRestIT extends AbstractRemoteClus
 
     private static final AtomicReference<Map<String, Object>> API_KEY_REFERENCE = new AtomicReference<>();
 
-    private static final String API_KEY_ROLE = """
+    private static final String API_KEY_ACCESS = """
         {
-          "role1": {
-            "cluster": ["cross_cluster_search"],
-            "index": [
+            "search": [
               {
                   "names": ["remote_index*"],
-                  "privileges": ["read", "read_cross_cluster"],
                   "query": {"bool": { "must_not": { "term" : {"field2" : "value2"}}}},
                   "field_security": {"grant": [ "field2" ]}
-              }
-            ]
-          },
-          "role2": {
-            "cluster": ["cross_cluster_search"],
-            "index": [
+              },
               {
                   "names": ["remote_index*"],
-                  "privileges": ["read", "read_cross_cluster"],
                   "query": {"bool": { "must_not": { "term" : {"field1" : "value2"}}}},
                   "field_security": {"grant": [ "field1" ]}
               }
             ]
-          }
         }""";
 
     static {
@@ -72,7 +62,7 @@ public class RemoteClusterSecurityWithDlsAndFlsRestIT extends AbstractRemoteClus
             .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt")
             .keystore(
                 "cluster.remote." + REMOTE_CLUSTER_DLS_FLS + ".credentials",
-                () -> createCrossClusterAccessApiKey(API_KEY_ROLE, API_KEY_REFERENCE)
+                () -> createCrossClusterAccessApiKey(API_KEY_ACCESS, API_KEY_REFERENCE)
             )
             .build();
     }

+ 4 - 14
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityWithDlsRestIT.java

@@ -24,28 +24,18 @@ public class RemoteClusterSecurityWithDlsRestIT extends AbstractRemoteClusterSec
 
     private static final AtomicReference<Map<String, Object>> API_KEY_REFERENCE = new AtomicReference<>();
 
-    private static final String API_KEY_ROLE = """
+    private static final String API_KEY_ACCESS = """
         {
-          "role1": {
-            "cluster": ["cross_cluster_search"],
-            "index": [
+            "search": [
               {
                   "names": ["remote_index*"],
-                  "privileges": ["read", "read_cross_cluster"],
                   "query": {"term" : {"field1" : "value1"}}
-              }
-            ]
-          },
-          "role2": {
-            "cluster": ["cross_cluster_search"],
-            "index": [
+              },
               {
                   "names": ["remote_index1"],
-                  "privileges": ["read", "read_cross_cluster"],
                   "query": {"term" : {"field2" : "value1"}}
               }
             ]
-          }
         }""";
 
     static {
@@ -67,7 +57,7 @@ public class RemoteClusterSecurityWithDlsRestIT extends AbstractRemoteClusterSec
             .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt")
             .keystore(
                 "cluster.remote." + REMOTE_CLUSTER_DLS + ".credentials",
-                () -> createCrossClusterAccessApiKey(API_KEY_ROLE, API_KEY_REFERENCE)
+                () -> createCrossClusterAccessApiKey(API_KEY_ACCESS, API_KEY_REFERENCE)
             )
             .build();
     }

+ 4 - 14
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityWithFlsRestIT.java

@@ -24,28 +24,18 @@ public class RemoteClusterSecurityWithFlsRestIT extends AbstractRemoteClusterSec
 
     private static final AtomicReference<Map<String, Object>> API_KEY_REFERENCE = new AtomicReference<>();
 
-    private static final String API_KEY_ROLE = """
+    private static final String API_KEY_ACCESS = """
         {
-          "role1": {
-            "cluster": ["cross_cluster_search"],
-            "index": [
+            "search": [
               {
                   "names": ["remote_index2"],
-                  "privileges": ["read", "read_cross_cluster"],
                   "field_security": {"grant": [ "field2" ]}
-              }
-            ]
-          },
-          "role2": {
-            "cluster": ["cross_cluster_search"],
-            "index": [
+              },
               {
                   "names": ["remote_index2"],
-                  "privileges": ["read", "read_cross_cluster"],
                   "field_security": {"grant": [ "field3" ]}
               }
             ]
-          }
         }""";
 
     static {
@@ -67,7 +57,7 @@ public class RemoteClusterSecurityWithFlsRestIT extends AbstractRemoteClusterSec
             .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt")
             .keystore(
                 "cluster.remote." + REMOTE_CLUSTER_FLS + ".credentials",
-                () -> createCrossClusterAccessApiKey(API_KEY_ROLE, API_KEY_REFERENCE)
+                () -> createCrossClusterAccessApiKey(API_KEY_ACCESS, API_KEY_REFERENCE)
             )
             .build();
     }

+ 2 - 6
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityWithMixedModelRemotesRestIT.java

@@ -44,15 +44,11 @@ public class RemoteClusterSecurityWithMixedModelRemotesRestIT extends AbstractRe
                 if (API_KEY_MAP_REF.get() == null) {
                     final Map<String, Object> apiKeyMap = createCrossClusterAccessApiKey("""
                         {
-                          "role": {
-                            "cluster": ["cross_cluster_search"],
-                            "index": [
+                            "search": [
                               {
-                                 "names": ["cluster1_index*"],
-                                 "privileges": ["read", "read_cross_cluster"]
+                                 "names": ["cluster1_index*"]
                               }
                             ]
-                          }
                         }""");
                     API_KEY_MAP_REF.set(apiKeyMap);
                 }

+ 4 - 12
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityWithSameModelRemotesRestIT.java

@@ -54,15 +54,11 @@ public class RemoteClusterSecurityWithSameModelRemotesRestIT extends AbstractRem
                 if (API_KEY_MAP_REF.get() == null) {
                     final Map<String, Object> apiKeyMap = createCrossClusterAccessApiKey("""
                         {
-                          "role": {
-                            "cluster": ["cross_cluster_search"],
-                            "index": [
+                            "search": [
                               {
-                                  "names": ["cluster1_index*"],
-                                  "privileges": ["read", "read_cross_cluster"]
+                                  "names": ["cluster1_index*"]
                               }
                             ]
-                          }
                         }""");
                     API_KEY_MAP_REF.set(apiKeyMap);
                 }
@@ -73,15 +69,11 @@ public class RemoteClusterSecurityWithSameModelRemotesRestIT extends AbstractRem
                 if (OTHER_API_KEY_MAP_REF.get() == null) {
                     final Map<String, Object> apiKeyMap = createCrossClusterAccessApiKey(otherFulfillingClusterClient, """
                          {
-                          "role": {
-                            "cluster": ["cross_cluster_search"],
-                            "index": [
+                            "search": [
                               {
-                                  "names": ["cluster2_index*"],
-                                  "privileges": ["read", "read_cross_cluster"]
+                                  "names": ["cluster2_index*"]
                               }
                             ]
-                          }
                         }""");
                     OTHER_API_KEY_MAP_REF.set(apiKeyMap);
                 }

+ 4 - 8
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityWithoutDlsAndFlsRestIT.java

@@ -24,17 +24,13 @@ public class RemoteClusterSecurityWithoutDlsAndFlsRestIT extends AbstractRemoteC
 
     private static final AtomicReference<Map<String, Object>> API_KEY_REFERENCE = new AtomicReference<>();
 
-    private static final String API_KEY_ROLE = """
+    private static final String API_KEY_ACCESS = """
         {
-          "role": {
-            "cluster": ["cross_cluster_search"],
-            "index": [
+            "search": [
               {
-                  "names": ["remote_index*"],
-                  "privileges": ["read", "read_cross_cluster"]
+                  "names": ["remote_index*"]
               }
             ]
-          }
         }""";
 
     static {
@@ -56,7 +52,7 @@ public class RemoteClusterSecurityWithoutDlsAndFlsRestIT extends AbstractRemoteC
             .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt")
             .keystore(
                 "cluster.remote." + REMOTE_CLUSTER_NO_DLS_FLS + ".credentials",
-                () -> createCrossClusterAccessApiKey(API_KEY_ROLE, API_KEY_REFERENCE)
+                () -> createCrossClusterAccessApiKey(API_KEY_ACCESS, API_KEY_REFERENCE)
             )
             .build();
     }

+ 33 - 0
x-pack/plugin/security/qa/security-trial/src/javaRestTest/java/org/elasticsearch/xpack/security/apikey/ApiKeyRestIT.java

@@ -709,6 +709,39 @@ public class ApiKeyRestIT extends SecurityOnTrialLicenseRestTestCase {
         final ObjectPath createResponse = assertOKAndCreateObjectPath(client().performRequest(createRequest));
         final String apiKeyId = createResponse.evaluate("id");
 
+        // Cross cluster API key cannot be used on the REST interface
+        final Request authenticateRequest1 = new Request("GET", "/_security/_authenticate");
+        authenticateRequest1.setOptions(
+            authenticateRequest1.getOptions().toBuilder().addHeader("Authorization", "ApiKey " + createResponse.evaluate("encoded"))
+        );
+        final ResponseException authenticateError1 = expectThrows(
+            ResponseException.class,
+            () -> client().performRequest(authenticateRequest1)
+        );
+        assertThat(authenticateError1.getResponse().getStatusLine().getStatusCode(), equalTo(401));
+        assertThat(
+            authenticateError1.getMessage(),
+            containsString("authentication expected API key type of [rest], but API key [" + apiKeyId + "] has type [cross_cluster]")
+        );
+
+        // Not allowed as secondary authentication on the REST interface either
+        final Request authenticateRequest2 = new Request("GET", "/_security/_authenticate");
+        setUserForRequest(authenticateRequest2, MANAGE_SECURITY_USER, END_USER_PASSWORD);
+        authenticateRequest2.setOptions(
+            authenticateRequest2.getOptions()
+                .toBuilder()
+                .addHeader("es-secondary-authorization", "ApiKey " + createResponse.evaluate("encoded"))
+        );
+        final ResponseException authenticateError2 = expectThrows(
+            ResponseException.class,
+            () -> client().performRequest(authenticateRequest2)
+        );
+        assertThat(authenticateError2.getResponse().getStatusLine().getStatusCode(), equalTo(401));
+        assertThat(
+            authenticateError2.getMessage(),
+            containsString("authentication expected API key type of [rest], but API key [" + apiKeyId + "] has type [cross_cluster]")
+        );
+
         final Request fetchRequest;
         if (randomBoolean()) {
             fetchRequest = new Request("GET", "/_security/api_key");

+ 22 - 12
x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/apikey/ApiKeySingleNodeTests.java

@@ -32,8 +32,6 @@ import org.elasticsearch.test.SecuritySingleNodeTestCase;
 import org.elasticsearch.test.TestSecurityClient;
 import org.elasticsearch.test.XContentTestUtils;
 import org.elasticsearch.transport.TcpTransport;
-import org.elasticsearch.xcontent.XContentParser;
-import org.elasticsearch.xcontent.XContentParserConfiguration;
 import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.security.action.Grant;
@@ -42,7 +40,6 @@ import org.elasticsearch.xpack.core.security.action.apikey.CreateApiKeyRequest;
 import org.elasticsearch.xpack.core.security.action.apikey.CreateApiKeyResponse;
 import org.elasticsearch.xpack.core.security.action.apikey.CreateCrossClusterApiKeyAction;
 import org.elasticsearch.xpack.core.security.action.apikey.CreateCrossClusterApiKeyRequest;
-import org.elasticsearch.xpack.core.security.action.apikey.CrossClusterApiKeyRoleDescriptorBuilder;
 import org.elasticsearch.xpack.core.security.action.apikey.GetApiKeyAction;
 import org.elasticsearch.xpack.core.security.action.apikey.GetApiKeyRequest;
 import org.elasticsearch.xpack.core.security.action.apikey.GetApiKeyResponse;
@@ -59,6 +56,8 @@ import org.elasticsearch.xpack.core.security.action.service.CreateServiceAccount
 import org.elasticsearch.xpack.core.security.action.token.CreateTokenAction;
 import org.elasticsearch.xpack.core.security.action.token.CreateTokenRequestBuilder;
 import org.elasticsearch.xpack.core.security.action.token.CreateTokenResponse;
+import org.elasticsearch.xpack.core.security.action.user.AuthenticateAction;
+import org.elasticsearch.xpack.core.security.action.user.AuthenticateRequest;
 import org.elasticsearch.xpack.core.security.action.user.PutUserAction;
 import org.elasticsearch.xpack.core.security.action.user.PutUserRequest;
 import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField;
@@ -78,7 +77,6 @@ import java.util.Map;
 
 import static org.elasticsearch.test.SecuritySettingsSource.ES_TEST_ROOT_USER;
 import static org.elasticsearch.test.SecuritySettingsSourceField.TEST_PASSWORD;
-import static org.elasticsearch.xcontent.json.JsonXContent.jsonXContent;
 import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
 import static org.elasticsearch.xpack.security.support.SecuritySystemIndices.SECURITY_MAIN_ALIAS;
 import static org.hamcrest.Matchers.anEmptyMap;
@@ -423,22 +421,34 @@ public class ApiKeySingleNodeTests extends SecuritySingleNodeTestCase {
     public void testCreateCrossClusterApiKey() throws IOException {
         assumeTrue("untrusted remote cluster feature flag must be enabled", TcpTransport.isUntrustedRemoteClusterEnabled());
 
-        final XContentParser parser = jsonXContent.createParser(XContentParserConfiguration.EMPTY, """
+        final var request = CreateCrossClusterApiKeyRequest.withNameAndAccess(randomAlphaOfLengthBetween(3, 8), """
             {
               "search": [ {"names": ["logs"]} ]
             }""");
-        final var roleDescriptorBuilder = CrossClusterApiKeyRoleDescriptorBuilder.PARSER.parse(parser, null);
-
-        final var request = new CreateCrossClusterApiKeyRequest(randomAlphaOfLengthBetween(3, 8), roleDescriptorBuilder, null, null);
 
         final PlainActionFuture<CreateApiKeyResponse> future = new PlainActionFuture<>();
         client().execute(CreateCrossClusterApiKeyAction.INSTANCE, request, future);
         final CreateApiKeyResponse createApiKeyResponse = future.actionGet();
 
-        final Map<String, Object> document = client().execute(
-            GetAction.INSTANCE,
-            new GetRequest(SECURITY_MAIN_ALIAS, createApiKeyResponse.getId())
-        ).actionGet().getSource();
+        final String apiKeyId = createApiKeyResponse.getId();
+        final String base64ApiKeyKeyValue = Base64.getEncoder()
+            .encodeToString((apiKeyId + ":" + createApiKeyResponse.getKey().toString()).getBytes(StandardCharsets.UTF_8));
+
+        // cross cluster API key cannot be used for regular actions
+        final ElasticsearchSecurityException e = expectThrows(
+            ElasticsearchSecurityException.class,
+            () -> client().filterWithHeader(Map.of("Authorization", "ApiKey " + base64ApiKeyKeyValue))
+                .execute(AuthenticateAction.INSTANCE, AuthenticateRequest.INSTANCE)
+                .actionGet()
+        );
+        assertThat(
+            e.getMessage(),
+            containsString("authentication expected API key type of [rest], but API key [" + apiKeyId + "] has type [cross_cluster]")
+        );
+
+        final Map<String, Object> document = client().execute(GetAction.INSTANCE, new GetRequest(SECURITY_MAIN_ALIAS, apiKeyId))
+            .actionGet()
+            .getSource();
 
         assertThat(document.get("type"), equalTo("cross_cluster"));
 

+ 5 - 1
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyAuthenticator.java

@@ -10,6 +10,7 @@ package org.elasticsearch.xpack.security.authc;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.xpack.core.security.action.apikey.ApiKey;
 import org.elasticsearch.xpack.core.security.authc.Authentication;
 import org.elasticsearch.xpack.core.security.authc.AuthenticationResult;
 import org.elasticsearch.xpack.core.security.authc.AuthenticationToken;
@@ -37,7 +38,9 @@ class ApiKeyAuthenticator implements Authenticator {
 
     @Override
     public AuthenticationToken extractCredentials(Context context) {
-        return apiKeyService.getCredentialsFromHeader(context.getThreadContext());
+        final ApiKeyCredentials apiKeyCredentials = apiKeyService.getCredentialsFromThreadContext(context.getThreadContext());
+        assert apiKeyCredentials == null || apiKeyCredentials.getExpectedType() == ApiKey.Type.REST;
+        return apiKeyCredentials;
     }
 
     @Override
@@ -57,6 +60,7 @@ class ApiKeyAuthenticator implements Authenticator {
                     ? authResult.getException()
                     : Exceptions.authenticationError(authResult.getMessage());
                 logger.debug(() -> "API key service terminated authentication for request [" + context.getRequest() + "]", e);
+                // TODO: emit audit event
                 listener.onFailure(e);
             } else {
                 if (authResult.getMessage() != null) {

+ 52 - 11
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java

@@ -70,6 +70,7 @@ import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.DeprecationHandler;
 import org.elasticsearch.xcontent.InstantiatingObjectParser;
+import org.elasticsearch.xcontent.ObjectParser;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentFactory;
@@ -1024,7 +1025,7 @@ public class ApiKeyService {
                         if (result.success) {
                             if (result.verify(credentials.getKey())) {
                                 // move on
-                                validateApiKeyExpiration(apiKeyDoc, credentials, clock, listener);
+                                validateApiKeyTypeAndExpiration(apiKeyDoc, credentials, clock, listener);
                             } else {
                                 listener.onResponse(
                                     AuthenticationResult.unsuccessful("invalid credentials for API key [" + credentials.getId() + "]", null)
@@ -1044,7 +1045,7 @@ public class ApiKeyService {
                         listenableCacheEntry.onResponse(new CachedApiKeyHashResult(verified, credentials.getKey()));
                         if (verified) {
                             // move on
-                            validateApiKeyExpiration(apiKeyDoc, credentials, clock, listener);
+                            validateApiKeyTypeAndExpiration(apiKeyDoc, credentials, clock, listener);
                         } else {
                             listener.onResponse(
                                 AuthenticationResult.unsuccessful("invalid credentials for API key [" + credentials.getId() + "]", null)
@@ -1056,7 +1057,7 @@ public class ApiKeyService {
                 verifyKeyAgainstHash(apiKeyDoc.hash, credentials, ActionListener.wrap(verified -> {
                     if (verified) {
                         // move on
-                        validateApiKeyExpiration(apiKeyDoc, credentials, clock, listener);
+                        validateApiKeyTypeAndExpiration(apiKeyDoc, credentials, clock, listener);
                     } else {
                         listener.onResponse(
                             AuthenticationResult.unsuccessful("invalid credentials for API key [" + credentials.getId() + "]", null)
@@ -1088,12 +1089,26 @@ public class ApiKeyService {
     }
 
     // package-private for testing
-    static void validateApiKeyExpiration(
+    static void validateApiKeyTypeAndExpiration(
         ApiKeyDoc apiKeyDoc,
         ApiKeyCredentials credentials,
         Clock clock,
         ActionListener<AuthenticationResult<User>> listener
     ) {
+        if (apiKeyDoc.type != credentials.expectedType) {
+            listener.onResponse(
+                AuthenticationResult.terminate(
+                    Strings.format(
+                        "authentication expected API key type of [%s], but API key [%s] has type [%s]",
+                        credentials.expectedType.value(),
+                        credentials.getId(),
+                        apiKeyDoc.type.value()
+                    )
+                )
+            );
+            return;
+        }
+
         if (apiKeyDoc.expirationTime == -1 || Instant.ofEpochMilli(apiKeyDoc.expirationTime).isAfter(clock.instant())) {
             final String principal = Objects.requireNonNull((String) apiKeyDoc.creator.get("principal"));
             final String fullName = (String) apiKeyDoc.creator.get("full_name");
@@ -1108,6 +1123,7 @@ public class ApiKeyService {
             authResultMetadata.put(AuthenticationField.API_KEY_LIMITED_ROLE_DESCRIPTORS_KEY, apiKeyDoc.limitedByRoleDescriptorsBytes);
             authResultMetadata.put(AuthenticationField.API_KEY_ID_KEY, credentials.getId());
             authResultMetadata.put(AuthenticationField.API_KEY_NAME_KEY, apiKeyDoc.name);
+            authResultMetadata.put(AuthenticationField.API_KEY_TYPE_KEY, apiKeyDoc.type.value());
             if (apiKeyDoc.metadataFlattened != null) {
                 authResultMetadata.put(AuthenticationField.API_KEY_METADATA_KEY, apiKeyDoc.metadataFlattened);
             }
@@ -1121,22 +1137,22 @@ public class ApiKeyService {
      * Gets the API Key from the <code>Authorization</code> header if the header begins with
      * <code>ApiKey </code>
      */
-    ApiKeyCredentials getCredentialsFromHeader(ThreadContext threadContext) {
+    ApiKeyCredentials getCredentialsFromThreadContext(ThreadContext threadContext) {
         if (false == isEnabled()) {
             return null;
         }
-        return getCredentialsFromHeader(threadContext.getHeader("Authorization"));
+        return getCredentialsFromHeader(threadContext.getHeader("Authorization"), ApiKey.Type.REST);
     }
 
-    static ApiKeyCredentials getCredentialsFromHeader(final String header) {
-        return parseApiKey(Authenticator.extractCredentialFromHeaderValue(header, "ApiKey"));
+    static ApiKeyCredentials getCredentialsFromHeader(final String header, ApiKey.Type expectedType) {
+        return parseApiKey(Authenticator.extractCredentialFromHeaderValue(header, "ApiKey"), expectedType);
     }
 
     public static String withApiKeyPrefix(final String encodedApiKey) {
         return "ApiKey " + encodedApiKey;
     }
 
-    private static ApiKeyCredentials parseApiKey(SecureString apiKeyString) {
+    private static ApiKeyCredentials parseApiKey(SecureString apiKeyString, ApiKey.Type expectedType) {
         if (apiKeyString != null) {
             final byte[] decodedApiKeyCredBytes = Base64.getDecoder().decode(CharArrays.toUtf8Bytes(apiKeyString.getChars()));
             char[] apiKeyCredChars = null;
@@ -1155,7 +1171,8 @@ public class ApiKeyService {
                 }
                 return new ApiKeyCredentials(
                     new String(Arrays.copyOfRange(apiKeyCredChars, 0, colonIndex)),
-                    new SecureString(Arrays.copyOfRange(apiKeyCredChars, colonIndex + 1, apiKeyCredChars.length))
+                    new SecureString(Arrays.copyOfRange(apiKeyCredChars, colonIndex + 1, apiKeyCredChars.length)),
+                    expectedType
                 );
             } finally {
                 if (apiKeyCredChars != null) {
@@ -1205,10 +1222,12 @@ public class ApiKeyService {
     public static final class ApiKeyCredentials implements AuthenticationToken, Closeable {
         private final String id;
         private final SecureString key;
+        private final ApiKey.Type expectedType;
 
-        public ApiKeyCredentials(String id, SecureString key) {
+        public ApiKeyCredentials(String id, SecureString key, ApiKey.Type expectedType) {
             this.id = id;
             this.key = key;
+            this.expectedType = expectedType;
         }
 
         String getId() {
@@ -1239,6 +1258,9 @@ public class ApiKeyService {
             close();
         }
 
+        public ApiKey.Type getExpectedType() {
+            return expectedType;
+        }
     }
 
     private static class ApiKeyLoggingDeprecationHandler implements DeprecationHandler {
@@ -2023,6 +2045,12 @@ public class ApiKeyService {
                 ApiKeyDoc.class
             );
             builder.declareString(constructorArg(), new ParseField("doc_type"));
+            builder.declareField(
+                optionalConstructorArg(),
+                ApiKey.Type::fromXContent,
+                new ParseField("type"),
+                ObjectParser.ValueType.STRING
+            );
             builder.declareLong(constructorArg(), new ParseField("creation_time"));
             builder.declareLongOrNull(constructorArg(), -1, new ParseField("expiration_time"));
             builder.declareBoolean(constructorArg(), new ParseField("api_key_invalidated"));
@@ -2037,6 +2065,7 @@ public class ApiKeyService {
         }
 
         final String docType;
+        final ApiKey.Type type;
         final long creationTime;
         final long expirationTime;
         final Boolean invalidated;
@@ -2052,6 +2081,7 @@ public class ApiKeyService {
 
         public ApiKeyDoc(
             String docType,
+            ApiKey.Type type,
             long creationTime,
             long expirationTime,
             Boolean invalidated,
@@ -2064,6 +2094,12 @@ public class ApiKeyService {
             @Nullable BytesReference metadataFlattened
         ) {
             this.docType = docType;
+            if (type == null) {
+                logger.trace("API key document with [null] type defaults to [rest] type");
+                this.type = ApiKey.Type.REST;
+            } else {
+                this.type = type;
+            }
             this.creationTime = creationTime;
             this.expirationTime = expirationTime;
             this.invalidated = invalidated;
@@ -2084,6 +2120,7 @@ public class ApiKeyService {
                 MessageDigests.digest(limitedByRoleDescriptorsBytes, digest)
             );
             return new CachedApiKeyDoc(
+                type,
                 creationTime,
                 expirationTime,
                 invalidated,
@@ -2108,6 +2145,7 @@ public class ApiKeyService {
      * so that duplicate role descriptors are cached only once (and therefore consume less memory).
      */
     public static final class CachedApiKeyDoc {
+        final ApiKey.Type type;
         final long creationTime;
         final long expirationTime;
         final Boolean invalidated;
@@ -2121,6 +2159,7 @@ public class ApiKeyService {
         final BytesReference metadataFlattened;
 
         public CachedApiKeyDoc(
+            ApiKey.Type type,
             long creationTime,
             long expirationTime,
             Boolean invalidated,
@@ -2132,6 +2171,7 @@ public class ApiKeyService {
             String limitedByRoleDescriptorsHash,
             @Nullable BytesReference metadataFlattened
         ) {
+            this.type = type;
             this.creationTime = creationTime;
             this.expirationTime = expirationTime;
             this.invalidated = invalidated;
@@ -2147,6 +2187,7 @@ public class ApiKeyService {
         public ApiKeyDoc toApiKeyDoc(BytesReference roleDescriptorsBytes, BytesReference limitedByRoleDescriptorsBytes) {
             return new ApiKeyDoc(
                 "api_key",
+                type,
                 creationTime,
                 expirationTime,
                 invalidated,

+ 4 - 1
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/CrossClusterAccessAuthenticationService.java

@@ -17,6 +17,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.security.action.apikey.ApiKey;
 import org.elasticsearch.xpack.core.security.authc.Authentication;
 import org.elasticsearch.xpack.core.security.authc.CrossClusterAccessSubjectInfo;
 
@@ -55,7 +56,9 @@ public class CrossClusterAccessAuthenticationService {
         try {
             // parse and add as authentication token as early as possible so that failure events in audit log include API key ID
             crossClusterAccessHeaders = CrossClusterAccessHeaders.readFromContext(threadContext);
-            authcContext.addAuthenticationToken(crossClusterAccessHeaders.credentials());
+            final ApiKeyService.ApiKeyCredentials apiKeyCredentials = crossClusterAccessHeaders.credentials();
+            assert ApiKey.Type.CROSS_CLUSTER == apiKeyCredentials.getExpectedType();
+            authcContext.addAuthenticationToken(apiKeyCredentials);
             apiKeyService.ensureEnabled();
         } catch (Exception ex) {
             withRequestProcessingFailure(authcContext, ex, listener);

+ 2 - 1
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/CrossClusterAccessHeaders.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.xpack.security.authc;
 
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.xpack.core.security.action.apikey.ApiKey;
 import org.elasticsearch.xpack.core.security.authc.CrossClusterAccessSubjectInfo;
 
 import java.io.IOException;
@@ -49,7 +50,7 @@ public final class CrossClusterAccessHeaders {
 
     private static ApiKeyService.ApiKeyCredentials parseCredentialsHeader(final String header) {
         try {
-            return Objects.requireNonNull(ApiKeyService.getCredentialsFromHeader(header));
+            return Objects.requireNonNull(ApiKeyService.getCredentialsFromHeader(header, ApiKey.Type.CROSS_CLUSTER));
         } catch (Exception ex) {
             throw new IllegalArgumentException(
                 "cross cluster access header ["

+ 193 - 84
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyServiceTests.java

@@ -153,6 +153,7 @@ import static org.elasticsearch.test.TestMatchers.throwableWithMessage;
 import static org.elasticsearch.transport.RemoteClusterPortSettings.TRANSPORT_VERSION_ADVANCED_REMOTE_CLUSTER_SECURITY_CCS;
 import static org.elasticsearch.xpack.core.security.authc.AuthenticationField.API_KEY_ID_KEY;
 import static org.elasticsearch.xpack.core.security.authc.AuthenticationField.API_KEY_METADATA_KEY;
+import static org.elasticsearch.xpack.core.security.authc.AuthenticationField.API_KEY_TYPE_KEY;
 import static org.elasticsearch.xpack.core.security.authz.store.ReservedRolesStore.SUPERUSER_ROLE_DESCRIPTOR;
 import static org.elasticsearch.xpack.core.security.test.TestRestrictedIndices.INTERNAL_SECURITY_MAIN_INDEX_7;
 import static org.elasticsearch.xpack.security.Security.SECURITY_CRYPTO_THREAD_POOL_NAME;
@@ -165,6 +166,7 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.emptyArray;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
@@ -180,6 +182,7 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -521,7 +524,7 @@ public class ApiKeyServiceTests extends ESTestCase {
     public void testGetCredentialsFromThreadContext() {
         final ApiKeyService apiKeyService = createApiKeyService();
         ThreadContext threadContext = threadPool.getThreadContext();
-        assertNull(apiKeyService.getCredentialsFromHeader(threadContext));
+        assertNull(apiKeyService.getCredentialsFromThreadContext(threadContext));
 
         final String apiKeyAuthScheme = randomFrom("apikey", "apiKey", "ApiKey", "APikey", "APIKEY");
         final String id = randomAlphaOfLength(12);
@@ -530,7 +533,7 @@ public class ApiKeyServiceTests extends ESTestCase {
 
         try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
             threadContext.putHeader("Authorization", headerValue);
-            ApiKeyService.ApiKeyCredentials creds = apiKeyService.getCredentialsFromHeader(threadContext);
+            ApiKeyService.ApiKeyCredentials creds = apiKeyService.getCredentialsFromThreadContext(threadContext);
             assertNotNull(creds);
             assertEquals(id, creds.getId());
             assertEquals(key, creds.getKey().toString());
@@ -540,7 +543,7 @@ public class ApiKeyServiceTests extends ESTestCase {
         headerValue = apiKeyAuthScheme + Base64.getEncoder().encodeToString((id + ":" + key).getBytes(StandardCharsets.UTF_8));
         try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
             threadContext.putHeader("Authorization", headerValue);
-            ApiKeyService.ApiKeyCredentials creds = apiKeyService.getCredentialsFromHeader(threadContext);
+            ApiKeyService.ApiKeyCredentials creds = apiKeyService.getCredentialsFromThreadContext(threadContext);
             assertNull(creds);
         }
 
@@ -550,7 +553,7 @@ public class ApiKeyServiceTests extends ESTestCase {
             threadContext.putHeader("Authorization", headerValue);
             IllegalArgumentException e = expectThrows(
                 IllegalArgumentException.class,
-                () -> apiKeyService.getCredentialsFromHeader(threadContext)
+                () -> apiKeyService.getCredentialsFromThreadContext(threadContext)
             );
             assertEquals("invalid ApiKey value", e.getMessage());
         }
@@ -571,9 +574,10 @@ public class ApiKeyServiceTests extends ESTestCase {
             user = new User("hulk", new String[] { "superuser" }, "Bruce Banner", "hulk@test.com", Map.of(), true);
             authUser = null;
         }
-        final Map<String, Object> metadata = mockKeyDocument(service, id, key, user, authUser, false, Duration.ofSeconds(3600), null);
+        final ApiKey.Type type = randomFrom(ApiKey.Type.values());
+        final Map<String, Object> metadata = mockKeyDocument(id, key, user, authUser, false, Duration.ofSeconds(3600), null, type);
 
-        final AuthenticationResult<User> auth = tryAuthenticate(service, id, key);
+        final AuthenticationResult<User> auth = tryAuthenticate(service, id, key, type);
         assertThat(auth.getStatus(), is(AuthenticationResult.Status.SUCCESS));
         assertThat(auth.getValue(), notNullValue());
         assertThat(auth.getValue().principal(), is("hulk"));
@@ -583,6 +587,7 @@ public class ApiKeyServiceTests extends ESTestCase {
         assertThat(auth.getMetadata().get(AuthenticationField.API_KEY_CREATOR_REALM_TYPE), is("native"));
         assertThat(auth.getMetadata().get(AuthenticationField.API_KEY_ID_KEY), is(id));
         assertThat(auth.getMetadata().get(AuthenticationField.API_KEY_NAME_KEY), is("test"));
+        assertThat(auth.getMetadata().get(API_KEY_TYPE_KEY), is(type.value()));
         checkAuthApiKeyMetadata(metadata, auth);
     }
 
@@ -592,10 +597,11 @@ public class ApiKeyServiceTests extends ESTestCase {
 
         final String id = randomAlphaOfLength(12);
         final String key = randomAlphaOfLength(16);
+        final ApiKey.Type type = randomFrom(ApiKey.Type.values());
 
-        mockKeyDocument(service, id, key, new User("hulk", "superuser"), null, true, Duration.ofSeconds(3600), null);
+        mockKeyDocument(id, key, new User("hulk", "superuser"), null, true, Duration.ofSeconds(3600), null, type);
 
-        final AuthenticationResult<User> auth = tryAuthenticate(service, id, key);
+        final AuthenticationResult<User> auth = tryAuthenticate(service, id, key, type);
         assertThat(auth.getStatus(), is(AuthenticationResult.Status.CONTINUE));
         assertThat(auth.getValue(), nullValue());
         assertThat(auth.getMessage(), containsString("invalidated"));
@@ -617,9 +623,10 @@ public class ApiKeyServiceTests extends ESTestCase {
             user = new User("hulk", "superuser");
             authUser = null;
         }
-        mockKeyDocument(service, id, realKey, user, authUser, false, Duration.ofSeconds(3600), null);
+        final ApiKey.Type type = randomFrom(ApiKey.Type.values());
+        mockKeyDocument(id, realKey, user, authUser, false, Duration.ofSeconds(3600), null, type);
 
-        final AuthenticationResult<User> auth = tryAuthenticate(service, id, wrongKey);
+        final AuthenticationResult<User> auth = tryAuthenticate(service, id, wrongKey, type);
         assertThat(auth.getStatus(), is(AuthenticationResult.Status.CONTINUE));
         assertThat(auth.getValue(), nullValue());
         assertThat(auth.getMessage(), containsString("invalid credentials for API key [" + id + "]"));
@@ -632,9 +639,10 @@ public class ApiKeyServiceTests extends ESTestCase {
         final String id = randomAlphaOfLength(12);
         final String key = randomAlphaOfLength(16);
 
-        mockKeyDocument(service, id, key, new User("hulk", "superuser"), null, false, Duration.ofSeconds(-1), null);
+        final ApiKey.Type type = randomFrom(ApiKey.Type.values());
+        mockKeyDocument(id, key, new User("hulk", "superuser"), null, false, Duration.ofSeconds(-1), null, type);
 
-        final AuthenticationResult<User> auth = tryAuthenticate(service, id, key);
+        final AuthenticationResult<User> auth = tryAuthenticate(service, id, key, type);
         assertThat(auth.getStatus(), is(AuthenticationResult.Status.CONTINUE));
         assertThat(auth.getValue(), nullValue());
         assertThat(auth.getMessage(), containsString("expired"));
@@ -658,19 +666,21 @@ public class ApiKeyServiceTests extends ESTestCase {
             user = new User("hulk", "superuser");
             authUser = null;
         }
-        final Map<String, Object> metadata = mockKeyDocument(service, id, realKey, user, authUser, false, Duration.ofSeconds(3600), null);
+        final ApiKey.Type type = randomFrom(ApiKey.Type.values());
+        final Map<String, Object> metadata = mockKeyDocument(id, realKey, user, authUser, false, Duration.ofSeconds(3600), null, type);
 
         for (int i = 0; i < 3; i++) {
             final String wrongKey = "=" + randomAlphaOfLength(14) + "@";
-            AuthenticationResult<User> auth = tryAuthenticate(service, id, wrongKey);
+            AuthenticationResult<User> auth = tryAuthenticate(service, id, wrongKey, type);
             assertThat(auth.getStatus(), is(AuthenticationResult.Status.CONTINUE));
             assertThat(auth.getValue(), nullValue());
             assertThat(auth.getMessage(), containsString("invalid credentials for API key [" + id + "]"));
 
-            auth = tryAuthenticate(service, id, realKey);
+            auth = tryAuthenticate(service, id, realKey, type);
             assertThat(auth.getStatus(), is(AuthenticationResult.Status.SUCCESS));
             assertThat(auth.getValue(), notNullValue());
             assertThat(auth.getValue().principal(), is("hulk"));
+            assertThat(auth.getMetadata().get(API_KEY_TYPE_KEY), is(type.value()));
             checkAuthApiKeyMetadata(metadata, auth);
         }
     }
@@ -693,14 +703,14 @@ public class ApiKeyServiceTests extends ESTestCase {
     }
 
     private Map<String, Object> mockKeyDocument(
-        ApiKeyService service,
         String id,
         String key,
         User user,
         @Nullable User authUser,
         boolean invalidated,
         Duration expiry,
-        @Nullable List<RoleDescriptor> keyRoles
+        @Nullable List<RoleDescriptor> keyRoles,
+        ApiKey.Type type
     ) throws IOException {
         final Authentication authentication;
         if (authUser != null) {
@@ -717,7 +727,6 @@ public class ApiKeyServiceTests extends ESTestCase {
                 .realmRef(new RealmRef("realm1", "native", "node01"))
                 .build(false);
         }
-        @SuppressWarnings("unchecked")
         final Map<String, Object> metadata = ApiKeyTests.randomMetadata();
         XContentBuilder docSource = ApiKeyService.newDocument(
             getFastStoredHashAlgoForTests().hash(new SecureString(key.toCharArray())),
@@ -727,7 +736,7 @@ public class ApiKeyServiceTests extends ESTestCase {
             Instant.now(),
             Instant.now().plus(expiry),
             keyRoles,
-            randomFrom(ApiKey.Type.values()),
+            type,
             Version.CURRENT,
             metadata
         );
@@ -740,14 +749,14 @@ public class ApiKeyServiceTests extends ESTestCase {
         return metadata;
     }
 
-    private AuthenticationResult<User> tryAuthenticate(ApiKeyService service, String id, String key) throws Exception {
+    private AuthenticationResult<User> tryAuthenticate(ApiKeyService service, String id, String key, ApiKey.Type type) throws Exception {
         final ThreadContext threadContext = threadPool.getThreadContext();
         try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
             final String header = "ApiKey " + Base64.getEncoder().encodeToString((id + ":" + key).getBytes(StandardCharsets.UTF_8));
             threadContext.putHeader("Authorization", header);
 
             final PlainActionFuture<AuthenticationResult<User>> future = new PlainActionFuture<>();
-            service.tryAuthenticate(threadContext, new ApiKeyCredentials(id, new SecureString(key.toCharArray())), future);
+            service.tryAuthenticate(threadContext, getApiKeyCredentials(id, key, type), future);
 
             final AuthenticationResult<User> auth = future.get();
             assertThat(auth, notNullValue());
@@ -768,7 +777,7 @@ public class ApiKeyServiceTests extends ESTestCase {
         service.validateApiKeyCredentials(
             apiKeyId,
             apiKeyDoc,
-            new ApiKeyCredentials(apiKeyId, new SecureString(apiKey.toCharArray())),
+            getApiKeyCredentials(apiKeyId, apiKey, apiKeyDoc.type),
             Clock.systemUTC(),
             future
         );
@@ -786,13 +795,14 @@ public class ApiKeyServiceTests extends ESTestCase {
             equalTo(apiKeyDoc.limitedByRoleDescriptorsBytes)
         );
         assertThat(result.getMetadata().get(AuthenticationField.API_KEY_CREATOR_REALM_NAME), is("realm1"));
+        assertThat(result.getMetadata().get(API_KEY_TYPE_KEY), is(apiKeyDoc.type.value()));
 
         apiKeyDoc = buildApiKeyDoc(hash, Clock.systemUTC().instant().plus(1L, ChronoUnit.HOURS).toEpochMilli(), false);
         future = new PlainActionFuture<>();
         service.validateApiKeyCredentials(
             apiKeyId,
             apiKeyDoc,
-            new ApiKeyCredentials(apiKeyId, new SecureString(apiKey.toCharArray())),
+            getApiKeyCredentials(apiKeyId, apiKey, apiKeyDoc.type),
             Clock.systemUTC(),
             future
         );
@@ -810,13 +820,14 @@ public class ApiKeyServiceTests extends ESTestCase {
             equalTo(apiKeyDoc.limitedByRoleDescriptorsBytes)
         );
         assertThat(result.getMetadata().get(AuthenticationField.API_KEY_CREATOR_REALM_NAME), is("realm1"));
+        assertThat(result.getMetadata().get(API_KEY_TYPE_KEY), is(apiKeyDoc.type.value()));
 
         apiKeyDoc = buildApiKeyDoc(hash, Clock.systemUTC().instant().minus(1L, ChronoUnit.HOURS).toEpochMilli(), false);
         future = new PlainActionFuture<>();
         service.validateApiKeyCredentials(
             apiKeyId,
             apiKeyDoc,
-            new ApiKeyCredentials(apiKeyId, new SecureString(apiKey.toCharArray())),
+            getApiKeyCredentials(apiKeyId, apiKey, apiKeyDoc.type),
             Clock.systemUTC(),
             future
         );
@@ -832,7 +843,7 @@ public class ApiKeyServiceTests extends ESTestCase {
         service.validateApiKeyCredentials(
             apiKeyId,
             apiKeyDoc,
-            new ApiKeyCredentials(apiKeyId, new SecureString(randomAlphaOfLength(15).toCharArray())),
+            getApiKeyCredentials(apiKeyId, randomAlphaOfLength(15), apiKeyDoc.type),
             Clock.systemUTC(),
             future
         );
@@ -988,7 +999,7 @@ public class ApiKeyServiceTests extends ESTestCase {
         ApiKeyDoc apiKeyDoc = buildApiKeyDoc(hash, -1, false);
 
         ApiKeyService service = createApiKeyService(Settings.EMPTY);
-        ApiKeyCredentials creds = new ApiKeyCredentials(apiKeyId, new SecureString(apiKey.toCharArray()));
+        ApiKeyCredentials creds = getApiKeyCredentials(apiKeyId, apiKey, apiKeyDoc.type);
         PlainActionFuture<AuthenticationResult<User>> future = new PlainActionFuture<>();
         service.validateApiKeyCredentials(creds.getId(), apiKeyDoc, creds, Clock.systemUTC(), future);
         AuthenticationResult<User> result = future.actionGet();
@@ -997,7 +1008,7 @@ public class ApiKeyServiceTests extends ESTestCase {
         assertNotNull(cachedApiKeyHashResult);
         assertThat(cachedApiKeyHashResult.success, is(true));
 
-        creds = new ApiKeyCredentials(creds.getId(), new SecureString("somelongenoughrandomstring".toCharArray()));
+        creds = getApiKeyCredentials(creds.getId(), "somelongenoughrandomstring", apiKeyDoc.type);
         future = new PlainActionFuture<>();
         service.validateApiKeyCredentials(creds.getId(), apiKeyDoc, creds, Clock.systemUTC(), future);
         result = future.actionGet();
@@ -1007,7 +1018,7 @@ public class ApiKeyServiceTests extends ESTestCase {
         assertThat(shouldBeSame, sameInstance(cachedApiKeyHashResult));
 
         apiKeyDoc = buildApiKeyDoc(hasher.hash(new SecureString("somelongenoughrandomstring".toCharArray())), -1, false);
-        creds = new ApiKeyCredentials(randomAlphaOfLength(12), new SecureString("otherlongenoughrandomstring".toCharArray()));
+        creds = getApiKeyCredentials(randomAlphaOfLength(12), "otherlongenoughrandomstring", apiKeyDoc.type);
         future = new PlainActionFuture<>();
         service.validateApiKeyCredentials(creds.getId(), apiKeyDoc, creds, Clock.systemUTC(), future);
         result = future.actionGet();
@@ -1016,7 +1027,7 @@ public class ApiKeyServiceTests extends ESTestCase {
         assertNotNull(cachedApiKeyHashResult);
         assertThat(cachedApiKeyHashResult.success, is(false));
 
-        creds = new ApiKeyCredentials(creds.getId(), new SecureString("otherlongenoughrandomstring2".toCharArray()));
+        creds = getApiKeyCredentials(creds.getId(), "otherlongenoughrandomstring2", apiKeyDoc.type);
         future = new PlainActionFuture<>();
         service.validateApiKeyCredentials(creds.getId(), apiKeyDoc, creds, Clock.systemUTC(), future);
         result = future.actionGet();
@@ -1024,7 +1035,7 @@ public class ApiKeyServiceTests extends ESTestCase {
         assertThat(service.getFromCache(creds.getId()), not(sameInstance(cachedApiKeyHashResult)));
         assertThat(service.getFromCache(creds.getId()).success, is(false));
 
-        creds = new ApiKeyCredentials(creds.getId(), new SecureString("somelongenoughrandomstring".toCharArray()));
+        creds = getApiKeyCredentials(creds.getId(), "somelongenoughrandomstring", apiKeyDoc.type);
         future = new PlainActionFuture<>();
         service.validateApiKeyCredentials(creds.getId(), apiKeyDoc, creds, Clock.systemUTC(), future);
         result = future.actionGet();
@@ -1220,6 +1231,7 @@ public class ApiKeyServiceTests extends ESTestCase {
 
         Map<String, Object> sourceMap = buildApiKeySourceDoc(hash);
         final Object metadata = sourceMap.get("metadata_flattened");
+        final ApiKey.Type type = parseTypeFromSourceMap(sourceMap);
 
         ApiKeyService realService = createApiKeyService(Settings.EMPTY);
         ApiKeyService service = Mockito.spy(realService);
@@ -1242,13 +1254,7 @@ public class ApiKeyServiceTests extends ESTestCase {
 
         // This needs to be done in another thread, because we need it to not complete until we say so, but it should not block this test
         this.threadPool.generic()
-            .execute(
-                () -> service.tryAuthenticate(
-                    threadPool.getThreadContext(),
-                    new ApiKeyCredentials(apiKeyId, new SecureString(apiKey.toCharArray())),
-                    future1
-                )
-            );
+            .execute(() -> service.tryAuthenticate(threadPool.getThreadContext(), getApiKeyCredentials(apiKeyId, apiKey, type), future1));
 
         // Wait for the first credential validation to get to the blocked state
         assertBusy(() -> assertThat(hashCounter.get(), equalTo(1)));
@@ -1260,11 +1266,7 @@ public class ApiKeyServiceTests extends ESTestCase {
         // The second authentication should pass (but not immediately, but will not block)
         PlainActionFuture<AuthenticationResult<User>> future2 = new PlainActionFuture<>();
 
-        service.tryAuthenticate(
-            threadPool.getThreadContext(),
-            new ApiKeyCredentials(apiKeyId, new SecureString(apiKey.toCharArray())),
-            future2
-        );
+        service.tryAuthenticate(threadPool.getThreadContext(), getApiKeyCredentials(apiKeyId, apiKey, type), future2);
 
         assertThat(hashCounter.get(), equalTo(1));
         if (future2.isDone()) {
@@ -1296,7 +1298,7 @@ public class ApiKeyServiceTests extends ESTestCase {
         ApiKeyDoc apiKeyDoc = buildApiKeyDoc(hash, -1, false);
 
         ApiKeyService service = createApiKeyService(settings);
-        ApiKeyCredentials creds = new ApiKeyCredentials(randomAlphaOfLength(12), new SecureString(apiKey.toCharArray()));
+        ApiKeyCredentials creds = getApiKeyCredentials(randomAlphaOfLength(12), apiKey, apiKeyDoc.type);
         PlainActionFuture<AuthenticationResult<User>> future = new PlainActionFuture<>();
         service.validateApiKeyCredentials(creds.getId(), apiKeyDoc, creds, Clock.systemUTC(), future);
         AuthenticationResult<User> result = future.actionGet();
@@ -1317,7 +1319,7 @@ public class ApiKeyServiceTests extends ESTestCase {
 
         ApiKeyService service = createApiKeyService(settings);
 
-        ApiKeyCredentials creds = new ApiKeyCredentials(randomAlphaOfLength(12), new SecureString(apiKey.toCharArray()));
+        ApiKeyCredentials creds = getApiKeyCredentials(randomAlphaOfLength(12), apiKey, apiKeyDoc.type);
         PlainActionFuture<AuthenticationResult<User>> future = new PlainActionFuture<>();
         service.validateApiKeyCredentials(creds.getId(), apiKeyDoc, creds, Clock.systemUTC(), future);
         AuthenticationResult<User> result = future.actionGet();
@@ -1337,16 +1339,18 @@ public class ApiKeyServiceTests extends ESTestCase {
         // 1. A new API key document will be cached after its authentication
         final String docId = randomAlphaOfLength(16);
         final String apiKey = randomAlphaOfLength(16);
-        ApiKeyCredentials apiKeyCredentials = new ApiKeyCredentials(docId, new SecureString(apiKey.toCharArray()));
+        final ApiKey.Type type = randomFrom(ApiKey.Type.values());
+        ApiKeyCredentials apiKeyCredentials = getApiKeyCredentials(docId, apiKey, type);
         final Map<String, Object> metadata = mockKeyDocument(
-            service,
             docId,
             apiKey,
             new User("hulk", "superuser"),
             null,
             false,
             Duration.ofSeconds(3600),
-            null
+            null,
+            type
+
         );
         PlainActionFuture<AuthenticationResult<User>> future = new PlainActionFuture<>();
         service.loadApiKeyAndValidateCredentials(threadContext, apiKeyCredentials, future);
@@ -1371,20 +1375,22 @@ public class ApiKeyServiceTests extends ESTestCase {
         } else {
             assertThat(cachedApiKeyDoc.metadataFlattened, equalTo(XContentTestUtils.convertToXContent(metadata, XContentType.JSON)));
         }
+        assertThat(cachedApiKeyDoc.type, is(type));
 
         // 2. A different API Key with the same role descriptors will share the entries in the role descriptor cache
         final String docId2 = randomAlphaOfLength(16);
         final String apiKey2 = randomAlphaOfLength(16);
-        ApiKeyCredentials apiKeyCredentials2 = new ApiKeyCredentials(docId2, new SecureString(apiKey2.toCharArray()));
+        final ApiKey.Type type2 = randomFrom(ApiKey.Type.values());
+        ApiKeyCredentials apiKeyCredentials2 = getApiKeyCredentials(docId2, apiKey2, type2);
         final Map<String, Object> metadata2 = mockKeyDocument(
-            service,
             docId2,
             apiKey2,
             new User("thor", "superuser"),
             null,
             false,
             Duration.ofSeconds(3600),
-            null
+            null,
+            type2
         );
         PlainActionFuture<AuthenticationResult<User>> future2 = new PlainActionFuture<>();
         service.loadApiKeyAndValidateCredentials(threadContext, apiKeyCredentials2, future2);
@@ -1401,23 +1407,25 @@ public class ApiKeyServiceTests extends ESTestCase {
         } else {
             assertThat(cachedApiKeyDoc2.metadataFlattened, equalTo(XContentTestUtils.convertToXContent(metadata2, XContentType.JSON)));
         }
+        assertThat(cachedApiKeyDoc2.type, is(type2));
 
         // 3. Different role descriptors will be cached into a separate entry
         final String docId3 = randomAlphaOfLength(16);
         final String apiKey3 = randomAlphaOfLength(16);
-        ApiKeyCredentials apiKeyCredentials3 = new ApiKeyCredentials(docId3, new SecureString(apiKey3.toCharArray()));
+        final ApiKey.Type type3 = randomFrom(ApiKey.Type.values());
+        ApiKeyCredentials apiKeyCredentials3 = getApiKeyCredentials(docId3, apiKey3, type3);
         final List<RoleDescriptor> keyRoles = List.of(
             RoleDescriptor.parse("key-role", new BytesArray("{\"cluster\":[\"monitor\"]}"), true, XContentType.JSON)
         );
         final Map<String, Object> metadata3 = mockKeyDocument(
-            service,
             docId3,
             apiKey3,
             new User("banner", "superuser"),
             null,
             false,
             Duration.ofSeconds(3600),
-            keyRoles
+            keyRoles,
+            type3
         );
         PlainActionFuture<AuthenticationResult<User>> future3 = new PlainActionFuture<>();
         service.loadApiKeyAndValidateCredentials(threadContext, apiKeyCredentials3, future3);
@@ -1438,42 +1446,37 @@ public class ApiKeyServiceTests extends ESTestCase {
         } else {
             assertThat(cachedApiKeyDoc3.metadataFlattened, equalTo(XContentTestUtils.convertToXContent(metadata3, XContentType.JSON)));
         }
+        assertThat(cachedApiKeyDoc3.type, is(type3));
 
         // 4. Will fetch document from security index if role descriptors are not found even when
         // cachedApiKeyDoc is available
         service.getRoleDescriptorsBytesCache().invalidateAll();
         final Map<String, Object> metadata4 = mockKeyDocument(
-            service,
             docId,
             apiKey,
             new User("hulk", "superuser"),
             null,
             false,
             Duration.ofSeconds(3600),
-            null
+            null,
+            type
         );
         PlainActionFuture<AuthenticationResult<User>> future4 = new PlainActionFuture<>();
-        service.loadApiKeyAndValidateCredentials(
-            threadContext,
-            new ApiKeyCredentials(docId, new SecureString(apiKey.toCharArray())),
-            future4
-        );
+        service.loadApiKeyAndValidateCredentials(threadContext, getApiKeyCredentials(docId, apiKey, type), future4);
         verify(client, times(4)).get(any(GetRequest.class), anyActionListener());
         assertEquals(2, service.getRoleDescriptorsBytesCache().count());
         final AuthenticationResult<User> authResult4 = future4.get();
         assertSame(AuthenticationResult.Status.SUCCESS, authResult4.getStatus());
+        assertThat(authResult4.getMetadata().get(API_KEY_TYPE_KEY), is(type.value()));
         checkAuthApiKeyMetadata(metadata4, authResult4);
 
         // 5. Cached entries will be used for the same API key doc
         SecurityMocks.mockGetRequestException(client, new EsRejectedExecutionException("rejected"));
         PlainActionFuture<AuthenticationResult<User>> future5 = new PlainActionFuture<>();
-        service.loadApiKeyAndValidateCredentials(
-            threadContext,
-            new ApiKeyCredentials(docId, new SecureString(apiKey.toCharArray())),
-            future5
-        );
+        service.loadApiKeyAndValidateCredentials(threadContext, getApiKeyCredentials(docId, apiKey, type), future5);
         final AuthenticationResult<User> authResult5 = future5.get();
         assertSame(AuthenticationResult.Status.SUCCESS, authResult5.getStatus());
+        assertThat(authResult5.getMetadata().get(API_KEY_TYPE_KEY), is(type.value()));
         checkAuthApiKeyMetadata(metadata4, authResult5);
     }
 
@@ -1482,7 +1485,8 @@ public class ApiKeyServiceTests extends ESTestCase {
         final ThreadContext threadContext = threadPool.getThreadContext();
         final String docId = randomAlphaOfLength(16);
         final String apiKey = randomAlphaOfLength(16);
-        ApiKeyCredentials apiKeyCredentials = new ApiKeyCredentials(docId, new SecureString(apiKey.toCharArray()));
+        final ApiKey.Type type = randomFrom(ApiKey.Type.values());
+        ApiKeyCredentials apiKeyCredentials = getApiKeyCredentials(docId, apiKey, type);
         service.getApiKeyAuthCache().put(docId, new ListenableFuture<>());
         assertNotNull(service.getApiKeyAuthCache().get(docId));
         SecurityMocks.mockGetRequest(
@@ -1648,7 +1652,7 @@ public class ApiKeyServiceTests extends ESTestCase {
 
     public void testAuthWillTerminateIfGetThreadPoolIsSaturated() throws ExecutionException, InterruptedException {
         final String apiKey = randomAlphaOfLength(16);
-        final ApiKeyCredentials creds = new ApiKeyCredentials(randomAlphaOfLength(12), new SecureString(apiKey.toCharArray()));
+        final ApiKeyCredentials creds = getApiKeyCredentials(randomAlphaOfLength(12), apiKey, randomFrom(ApiKey.Type.values()));
         SecurityMocks.mockGetRequestException(client, new EsRejectedExecutionException("rejected"));
         ApiKeyService service = createApiKeyService(Settings.EMPTY);
         final PlainActionFuture<AuthenticationResult<User>> future = new PlainActionFuture<>();
@@ -1660,11 +1664,12 @@ public class ApiKeyServiceTests extends ESTestCase {
 
     public void testAuthWillTerminateIfHashingThreadPoolIsSaturated() throws IOException, ExecutionException, InterruptedException {
         final String apiKey = randomAlphaOfLength(16);
-        final ApiKeyCredentials creds = new ApiKeyCredentials(randomAlphaOfLength(12), new SecureString(apiKey.toCharArray()));
 
         Hasher hasher = getFastStoredHashAlgoForTests();
         final char[] hash = hasher.hash(new SecureString(apiKey.toCharArray()));
         Map<String, Object> sourceMap = buildApiKeySourceDoc(hash);
+        final ApiKey.Type type = parseTypeFromSourceMap(sourceMap);
+        final ApiKeyCredentials creds = getApiKeyCredentials(randomAlphaOfLength(12), apiKey, type);
         mockSourceDocument(creds.getId(), sourceMap);
         final ExecutorService mockExecutorService = mock(ExecutorService.class);
         when(threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME)).thenReturn(mockExecutorService);
@@ -1704,7 +1709,6 @@ public class ApiKeyServiceTests extends ESTestCase {
     public void testCachedApiKeyValidationWillNotBeBlockedByUnCachedApiKey() throws IOException, ExecutionException, InterruptedException {
         final String apiKeyId1 = randomAlphaOfLength(12);
         final String apiKey1 = randomAlphaOfLength(16);
-        final ApiKeyCredentials creds = new ApiKeyCredentials(apiKeyId1, new SecureString(apiKey1.toCharArray()));
 
         Hasher hasher = getFastStoredHashAlgoForTests();
         final char[] hash = hasher.hash(new SecureString(apiKey1.toCharArray()));
@@ -1714,6 +1718,7 @@ public class ApiKeyServiceTests extends ESTestCase {
 
         // Authenticate the key once to cache it
         ApiKeyService service = createApiKeyService(Settings.EMPTY);
+        final ApiKeyCredentials creds = getApiKeyCredentials(apiKeyId1, apiKey1, parseTypeFromSourceMap(sourceMap));
         final PlainActionFuture<AuthenticationResult<User>> future = new PlainActionFuture<>();
         service.tryAuthenticate(threadPool.getThreadContext(), creds, future);
         final AuthenticationResult<User> authenticationResult = future.get();
@@ -1732,8 +1737,9 @@ public class ApiKeyServiceTests extends ESTestCase {
         // A new API key trying to connect that must go through full hash computation
         final String apiKeyId2 = randomAlphaOfLength(12);
         final String apiKey2 = randomAlphaOfLength(16);
-        final ApiKeyCredentials creds2 = new ApiKeyCredentials(apiKeyId2, new SecureString(apiKey2.toCharArray()));
-        mockSourceDocument(apiKeyId2, buildApiKeySourceDoc(hasher.hash(new SecureString(apiKey2.toCharArray()))));
+        final Map<String, Object> sourceMap2 = buildApiKeySourceDoc(hasher.hash(new SecureString(apiKey2.toCharArray())));
+        mockSourceDocument(apiKeyId2, sourceMap2);
+        final ApiKeyCredentials creds2 = getApiKeyCredentials(apiKeyId2, apiKey2, parseTypeFromSourceMap(sourceMap2));
         final PlainActionFuture<AuthenticationResult<User>> future2 = new PlainActionFuture<>();
         service.tryAuthenticate(threadPool.getThreadContext(), creds2, future2);
         final AuthenticationResult<User> authenticationResult2 = future2.get();
@@ -1745,7 +1751,7 @@ public class ApiKeyServiceTests extends ESTestCase {
         final PlainActionFuture<AuthenticationResult<User>> future3 = new PlainActionFuture<>();
         service.tryAuthenticate(
             threadPool.getThreadContext(),
-            new ApiKeyCredentials(apiKeyId1, new SecureString(apiKey1.toCharArray())),
+            getApiKeyCredentials(apiKeyId1, apiKey1, parseTypeFromSourceMap(sourceMap)),
             future3
         );
         final AuthenticationResult<User> authenticationResult3 = future3.get();
@@ -2159,6 +2165,102 @@ public class ApiKeyServiceTests extends ESTestCase {
         );
     }
 
+    public void testAuthenticationFailureWithApiKeyTypeMismatch() throws Exception {
+        final Settings settings = Settings.builder().put(XPackSettings.API_KEY_SERVICE_ENABLED_SETTING.getKey(), true).build();
+        final ApiKeyService service = spy(createApiKeyService(settings));
+
+        final String id = randomAlphaOfLength(12);
+        final String key = randomAlphaOfLength(16);
+        final ApiKey.Type type = randomFrom(ApiKey.Type.values());
+        mockKeyDocument(id, key, new User("hulk", "superuser"), null, false, Duration.ofSeconds(3600), null, type);
+
+        final ApiKey.Type expectedType = randomValueOtherThan(type, () -> randomFrom(ApiKey.Type.values()));
+        final AuthenticationResult<User> auth = tryAuthenticate(service, id, key, expectedType);
+        assertThat(auth.getStatus(), is(AuthenticationResult.Status.TERMINATE));
+        assertThat(auth.getValue(), nullValue());
+        assertThat(
+            auth.getMessage(),
+            containsString(
+                "authentication expected API key type of ["
+                    + expectedType.value()
+                    + "], but API key ["
+                    + id
+                    + "] has type ["
+                    + type.value()
+                    + "]"
+            )
+        );
+
+        // API key type mismatch should be checked after API key secret is verified
+        verify(service).verifyKeyAgainstHash(any(), any(), anyActionListener());
+        assertThat(service.getDocCache().keys(), contains(id));
+        assertThat(service.getApiKeyAuthCache().keys(), contains(id));
+    }
+
+    public void testValidateApiKeyTypeAndExpiration() throws IOException {
+        final var apiKeyId = randomAlphaOfLength(12);
+        final var apiKey = randomAlphaOfLength(16);
+        final var hasher = getFastStoredHashAlgoForTests();
+        final char[] hash = hasher.hash(new SecureString(apiKey.toCharArray()));
+
+        final long futureTime = Instant.now().plus(7, ChronoUnit.DAYS).toEpochMilli();
+        final long pastTime = Instant.now().plus(-7, ChronoUnit.DAYS).toEpochMilli();
+
+        // Wrong API key type
+        final var apiKeyDoc1 = buildApiKeyDoc(
+            hash,
+            randomFrom(-1L, futureTime),
+            false,
+            randomAlphaOfLengthBetween(3, 8),
+            Version.CURRENT.id
+        );
+        final ApiKey.Type expectedType1 = randomValueOtherThan(apiKeyDoc1.type, () -> randomFrom(ApiKey.Type.values()));
+        final ApiKeyCredentials apiKeyCredentials1 = getApiKeyCredentials(apiKeyId, apiKey, expectedType1);
+        final PlainActionFuture<AuthenticationResult<User>> future1 = new PlainActionFuture<>();
+        ApiKeyService.validateApiKeyTypeAndExpiration(apiKeyDoc1, apiKeyCredentials1, clock, future1);
+        final AuthenticationResult<User> auth1 = future1.actionGet();
+        assertThat(auth1.getStatus(), is(AuthenticationResult.Status.TERMINATE));
+        assertThat(auth1.getValue(), nullValue());
+        assertThat(
+            auth1.getMessage(),
+            containsString(
+                "authentication expected API key type of ["
+                    + expectedType1.value()
+                    + "], but API key ["
+                    + apiKeyId
+                    + "] has type ["
+                    + apiKeyDoc1.type.value()
+                    + "]"
+            )
+        );
+
+        // Expired API key
+        final var apiKeyDoc2 = buildApiKeyDoc(hash, pastTime, false, randomAlphaOfLengthBetween(3, 8), Version.CURRENT.id);
+        final ApiKeyCredentials apiKeyCredentials2 = getApiKeyCredentials(apiKeyId, apiKey, apiKeyDoc2.type);
+        final PlainActionFuture<AuthenticationResult<User>> future2 = new PlainActionFuture<>();
+        ApiKeyService.validateApiKeyTypeAndExpiration(apiKeyDoc2, apiKeyCredentials2, clock, future2);
+        final AuthenticationResult<User> auth2 = future2.actionGet();
+        assertThat(auth2.getStatus(), is(AuthenticationResult.Status.CONTINUE));
+        assertThat(auth2.getValue(), nullValue());
+        assertThat(auth2.getMessage(), containsString("api key is expired"));
+
+        // Good API key
+        final var apiKeyDoc3 = buildApiKeyDoc(
+            hash,
+            randomFrom(-1L, futureTime),
+            false,
+            randomAlphaOfLengthBetween(3, 8),
+            Version.CURRENT.id
+        );
+        final ApiKeyCredentials apiKeyCredentials3 = getApiKeyCredentials(apiKeyId, apiKey, apiKeyDoc3.type);
+        final PlainActionFuture<AuthenticationResult<User>> future3 = new PlainActionFuture<>();
+        ApiKeyService.validateApiKeyTypeAndExpiration(apiKeyDoc3, apiKeyCredentials3, clock, future3);
+        final AuthenticationResult<User> auth3 = future3.actionGet();
+        assertThat(auth3.getStatus(), is(AuthenticationResult.Status.SUCCESS));
+        assertThat(auth3.getValue(), notNullValue());
+        assertThat(auth3.getMetadata(), hasEntry(API_KEY_TYPE_KEY, apiKeyDoc3.type.value()));
+    }
+
     private static RoleDescriptor randomRoleDescriptorWithRemoteIndexPrivileges() {
         return new RoleDescriptor(
             randomAlphaOfLengthBetween(3, 90),
@@ -2192,7 +2294,7 @@ public class ApiKeyServiceTests extends ESTestCase {
                 Instant.now(),
                 Instant.now().plus(Duration.ofSeconds(3600)),
                 keyRoles,
-                randomFrom(ApiKey.Type.values()),
+                ApiKey.Type.REST,
                 Version.CURRENT,
                 randomBoolean() ? null : Map.of(randomAlphaOfLengthBetween(3, 8), randomAlphaOfLengthBetween(3, 8))
             );
@@ -2205,9 +2307,9 @@ public class ApiKeyServiceTests extends ESTestCase {
                 )
             );
             PlainActionFuture<AuthenticationResult<User>> authenticationResultFuture = PlainActionFuture.newFuture();
-            ApiKeyService.validateApiKeyExpiration(
+            ApiKeyService.validateApiKeyTypeAndExpiration(
                 apiKeyDoc,
-                new ApiKeyService.ApiKeyCredentials("id", new SecureString(randomAlphaOfLength(16).toCharArray())),
+                new ApiKeyService.ApiKeyCredentials("id", new SecureString(randomAlphaOfLength(16).toCharArray()), ApiKey.Type.REST),
                 Clock.systemUTC(),
                 authenticationResultFuture
             );
@@ -2291,6 +2393,9 @@ public class ApiKeyServiceTests extends ESTestCase {
     private Map<String, Object> buildApiKeySourceDoc(char[] hash) {
         Map<String, Object> sourceMap = new HashMap<>();
         sourceMap.put("doc_type", "api_key");
+        if (randomBoolean()) {
+            sourceMap.put("type", randomFrom(ApiKey.Type.values()).value());
+        }
         sourceMap.put("creation_time", Clock.systemUTC().instant().toEpochMilli());
         sourceMap.put("expiration_time", -1);
         sourceMap.put("api_key_hash", new String(hash));
@@ -2313,15 +2418,6 @@ public class ApiKeyServiceTests extends ESTestCase {
         return sourceMap;
     }
 
-    private void writeCredentialsToThreadContext(ApiKeyCredentials creds) {
-        final String credentialString = creds.getId() + ":" + creds.getKey();
-        this.threadPool.getThreadContext()
-            .putHeader(
-                "Authorization",
-                "ApiKey " + Base64.getEncoder().encodeToString(credentialString.getBytes(StandardCharsets.US_ASCII))
-            );
-    }
-
     private void mockSourceDocument(String id, Map<String, Object> sourceMap) throws IOException {
         try (XContentBuilder builder = JsonXContent.contentBuilder()) {
             builder.map(sourceMap);
@@ -2341,6 +2437,7 @@ public class ApiKeyServiceTests extends ESTestCase {
         final BytesReference metadataBytes = XContentTestUtils.convertToXContent(ApiKeyTests.randomMetadata(), XContentType.JSON);
         return new ApiKeyDoc(
             "api_key",
+            randomBoolean() ? randomFrom(ApiKey.Type.values()) : null,
             Clock.systemUTC().instant().toEpochMilli(),
             expirationTime,
             invalidated,
@@ -2382,4 +2479,16 @@ public class ApiKeyServiceTests extends ESTestCase {
     private RoleReference.ApiKeyRoleType randomApiKeyRoleType() {
         return randomFrom(RoleReference.ApiKeyRoleType.values());
     }
+
+    private ApiKeyCredentials getApiKeyCredentials(String id, String key, ApiKey.Type type) {
+        return new ApiKeyCredentials(id, new SecureString(key.toCharArray()), type);
+    }
+
+    private ApiKey.Type parseTypeFromSourceMap(Map<String, Object> sourceMap) {
+        if (sourceMap.containsKey("type")) {
+            return ApiKey.Type.parse((String) sourceMap.get("type"));
+        } else {
+            return ApiKey.Type.REST;
+        }
+    }
 }

+ 29 - 8
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/AuthenticatorChainTests.java

@@ -21,6 +21,7 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.MockLogAppender;
+import org.elasticsearch.xpack.core.security.action.apikey.ApiKey;
 import org.elasticsearch.xpack.core.security.authc.Authentication;
 import org.elasticsearch.xpack.core.security.authc.AuthenticationResult;
 import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField;
@@ -39,6 +40,7 @@ import org.junit.Before;
 import java.io.IOException;
 import java.util.List;
 
+import static org.elasticsearch.test.ActionListenerUtils.anyActionListener;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
@@ -47,6 +49,7 @@ import static org.hamcrest.Matchers.not;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -181,9 +184,18 @@ public class AuthenticatorChainTests extends ESTestCase {
 
     public void testAuthenticateWithApiKey() throws IOException {
         final Authenticator.Context context = createAuthenticatorContext();
-        when(apiKeyAuthenticator.extractCredentials(context)).thenReturn(
-            new ApiKeyCredentials(randomAlphaOfLength(20), new SecureString(randomAlphaOfLength(22).toCharArray()))
-        );
+        final String apiKeyId = randomAlphaOfLength(20);
+        final SecureString apiKeySecret = new SecureString(randomAlphaOfLength(22).toCharArray());
+        final boolean shouldExtractCredentials = randomBoolean();
+        if (shouldExtractCredentials) {
+            when(apiKeyAuthenticator.extractCredentials(context)).thenReturn(
+                new ApiKeyCredentials(apiKeyId, apiKeySecret, ApiKey.Type.REST)
+            );
+        } else {
+            context.addAuthenticationToken(new ApiKeyCredentials(apiKeyId, apiKeySecret, randomFrom(ApiKey.Type.values())));
+            doCallRealMethod().when(serviceAccountAuthenticator).authenticate(eq(context), anyActionListener());
+            doCallRealMethod().when(oAuth2TokenAuthenticator).authenticate(eq(context), anyActionListener());
+        }
         doAnswer(invocationOnMock -> {
             @SuppressWarnings("unchecked")
             final ActionListener<AuthenticationResult<Authentication>> listener = (ActionListener<
@@ -195,10 +207,19 @@ public class AuthenticatorChainTests extends ESTestCase {
         final PlainActionFuture<Authentication> future = new PlainActionFuture<>();
         authenticatorChain.authenticateAsync(context, future);
         assertThat(future.actionGet(), is(authentication));
-        verify(serviceAccountAuthenticator).extractCredentials(eq(context));
-        verify(serviceAccountAuthenticator, never()).authenticate(eq(context), any());
-        verify(oAuth2TokenAuthenticator).extractCredentials(eq(context));
-        verify(oAuth2TokenAuthenticator, never()).authenticate(eq(context), any());
+
+        if (shouldExtractCredentials) {
+            verify(serviceAccountAuthenticator).extractCredentials(eq(context));
+            verify(serviceAccountAuthenticator, never()).authenticate(eq(context), any());
+            verify(oAuth2TokenAuthenticator).extractCredentials(eq(context));
+            verify(oAuth2TokenAuthenticator, never()).authenticate(eq(context), any());
+        } else {
+            verify(serviceAccountAuthenticator, never()).extractCredentials(eq(context));
+            verify(serviceAccountAuthenticator).authenticate(eq(context), any());
+            verify(oAuth2TokenAuthenticator, never()).extractCredentials(eq(context));
+            verify(oAuth2TokenAuthenticator).authenticate(eq(context), any());
+        }
+
         verifyNoMoreInteractions(realmsAuthenticator);
         verify(authenticationContextSerializer).writeToContext(eq(authentication), any());
         verify(operatorPrivilegesService).maybeMarkOperatorUser(eq(authentication), any());
@@ -261,7 +282,7 @@ public class AuthenticatorChainTests extends ESTestCase {
         threadContext.putHeader("Authorization", unsuccessfulApiKey ? "ApiKey key_id:key_secret" : "Bearer some_token_value");
         if (unsuccessfulApiKey) {
             when(apiKeyAuthenticator.extractCredentials(context)).thenReturn(
-                new ApiKeyCredentials(randomAlphaOfLength(20), new SecureString(randomAlphaOfLength(22).toCharArray()))
+                new ApiKeyCredentials(randomAlphaOfLength(20), new SecureString(randomAlphaOfLength(22).toCharArray()), ApiKey.Type.REST)
             );
             doAnswer(invocationOnMock -> {
                 @SuppressWarnings("unchecked")

+ 13 - 20
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java

@@ -2072,25 +2072,18 @@ public class CompositeRolesStoreTests extends ESTestCase {
             effectiveRoleDescriptors::set
         );
         AuditUtil.getOrGenerateRequestId(threadContext);
-        final TransportVersion version = TransportVersion.CURRENT;
-        final String apiKeyRoleName = "user_role_" + randomAlphaOfLength(4);
-        final Authentication apiKeyAuthentication = createApiKeyAuthentication(
-            apiKeyService,
-            randomValueOtherThanMany(
-                authc -> authc.getAuthenticationType() == AuthenticationType.API_KEY,
-                () -> AuthenticationTestHelper.builder().build()
-            ),
-            Collections.singleton(
-                new RoleDescriptor(
-                    apiKeyRoleName,
-                    null,
-                    new IndicesPrivileges[] { IndicesPrivileges.builder().indices("index*").privileges("all").build() },
-                    null
-                )
-            ),
-            null,
-            version
-        );
+        final Authentication apiKeyAuthentication = AuthenticationTestHelper.builder()
+            .crossClusterApiKey(randomAlphaOfLength(20))
+            .metadata(Map.of(API_KEY_ROLE_DESCRIPTORS_KEY, new BytesArray("""
+                {
+                  "cross_cluster": {
+                    "cluster": ["cross_cluster_search"],
+                    "indices": [
+                      { "names":["index*"], "privileges":["read","read_cross_cluster","view_index_metadata"] }
+                    ]
+                  }
+                }""")))
+            .build(false);
         final boolean emptyRemoteRole = randomBoolean();
         Authentication authentication = apiKeyAuthentication.toCrossClusterAccess(
             AuthenticationTestHelper.randomCrossClusterAccessSubjectInfo(
@@ -2129,7 +2122,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
 
         verify(apiKeyService, times(1)).parseRoleDescriptorsBytes(anyString(), any(BytesReference.class), any());
         assertThat(role.names().length, is(1));
-        assertThat(role.names()[0], equalTo(apiKeyRoleName));
+        assertThat(role.names()[0], equalTo("cross_cluster"));
 
         // Smoke-test for authorization
         final Metadata indexMetadata = Metadata.builder()

+ 6 - 4
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/crossclusteraccess/CrossClusterAccessAuthenticationServiceIntegTests.java

@@ -16,8 +16,9 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.test.SecurityIntegTestCase;
 import org.elasticsearch.transport.TcpTransport;
 import org.elasticsearch.xpack.core.security.SecurityContext;
-import org.elasticsearch.xpack.core.security.action.apikey.CreateApiKeyRequestBuilder;
 import org.elasticsearch.xpack.core.security.action.apikey.CreateApiKeyResponse;
+import org.elasticsearch.xpack.core.security.action.apikey.CreateCrossClusterApiKeyAction;
+import org.elasticsearch.xpack.core.security.action.apikey.CreateCrossClusterApiKeyRequest;
 import org.elasticsearch.xpack.core.security.authc.Authentication;
 import org.elasticsearch.xpack.core.security.authc.AuthenticationTestHelper;
 import org.elasticsearch.xpack.core.security.authc.CrossClusterAccessSubjectInfo;
@@ -138,9 +139,10 @@ public class CrossClusterAccessAuthenticationServiceIntegTests extends SecurityI
         }
     }
 
-    private String getEncodedCrossClusterAccessApiKey() {
-        final CreateApiKeyResponse response = new CreateApiKeyRequestBuilder(client().admin().cluster()).setName("cross_cluster_access_key")
-            .get();
+    private String getEncodedCrossClusterAccessApiKey() throws IOException {
+        final CreateCrossClusterApiKeyRequest request = CreateCrossClusterApiKeyRequest.withNameAndAccess("cross_cluster_access_key", """
+            {"search": [{"names": ["*"]}]}""");
+        final CreateApiKeyResponse response = client().execute(CreateCrossClusterApiKeyAction.INSTANCE, request).actionGet();
         return ApiKeyService.withApiKeyPrefix(
             Base64.getEncoder().encodeToString((response.getId() + ":" + response.getKey()).getBytes(StandardCharsets.UTF_8))
         );