1
0
Эх сурвалжийг харах

Limited-by role descriptors in Get/QueryApiKey response (#89273)

An API key's effective permission is an intersection between its
assigned role descriptors and a snapshot of its owner user's role
descriptors (limited-by role descriptors). In #89166, the assigned role
descriptors are now returned by default in Get/Query API key responses.

This PR further adds support to optionally return limited-by role
descriptors in the responses. Unlike assign role descriptors, an API key
cannot view any limited-by role descriptors unless it has manage_api_key
or higher privileges.

Relates: #89058
Yang Wang 3 жил өмнө
parent
commit
8dfbcd5c02
26 өөрчлөгдсөн 1126 нэмэгдсэн , 186 устгасан
  1. 5 0
      docs/changelog/89273.yaml
  2. 60 5
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/ApiKey.java
  3. 88 15
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/GetApiKeyRequest.java
  4. 18 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/QueryApiKeyRequest.java
  5. 68 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/RoleDescriptorsIntersection.java
  6. 8 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ManageOwnApiKeyClusterPrivilege.java
  7. 20 2
      x-pack/plugin/core/src/test/java/org/elasticsearch/test/SecuritySettingsSourceField.java
  8. 34 5
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/ApiKeyTests.java
  9. 9 7
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/GetApiKeyRequestTests.java
  10. 93 9
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/GetApiKeyResponseTests.java
  11. 9 4
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/QueryApiKeyRequestTests.java
  12. 12 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/QueryApiKeyResponseTests.java
  13. 104 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/RoleDescriptorsIntersectionTests.java
  14. 64 14
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/ManageOwnApiKeyClusterPrivilegeTests.java
  15. 2 2
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java
  16. 25 0
      x-pack/plugin/security/qa/security-trial/src/javaRestTest/java/org/elasticsearch/xpack/security/apikey/ApiKeyRestIT.java
  17. 394 88
      x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/ApiKeyIntegTests.java
  18. 2 3
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/apikey/TransportGetApiKeyAction.java
  19. 1 1
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/apikey/TransportQueryApiKeyAction.java
  20. 18 6
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java
  21. 2 1
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java
  22. 10 2
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/action/apikey/RestGetApiKeyAction.java
  23. 27 12
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/action/apikey/RestQueryApiKeyAction.java
  24. 9 2
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyServiceTests.java
  25. 9 1
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java
  26. 35 4
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/rest/action/apikey/RestGetApiKeyActionTests.java

+ 5 - 0
docs/changelog/89273.yaml

@@ -0,0 +1,5 @@
+pr: 89273
+summary: Limited-by role descriptors in Get/QueryApiKey response
+area: Security
+type: enhancement
+issues: []

+ 60 - 5
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/ApiKey.java

@@ -13,17 +13,20 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xcontent.ConstructingObjectParser;
+import org.elasticsearch.xcontent.ObjectParser;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContentObject;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
+import org.elasticsearch.xpack.core.security.authz.RoleDescriptorsIntersection;
 
 import java.io.IOException;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
 import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
@@ -43,6 +46,8 @@ public final class ApiKey implements ToXContentObject, Writeable {
     private final Map<String, Object> metadata;
     @Nullable
     private final List<RoleDescriptor> roleDescriptors;
+    @Nullable
+    private final RoleDescriptorsIntersection limitedBy;
 
     public ApiKey(
         String name,
@@ -53,7 +58,34 @@ public final class ApiKey implements ToXContentObject, Writeable {
         String username,
         String realm,
         @Nullable Map<String, Object> metadata,
-        @Nullable List<RoleDescriptor> roleDescriptors
+        @Nullable List<RoleDescriptor> roleDescriptors,
+        @Nullable List<RoleDescriptor> limitedByRoleDescriptors
+    ) {
+        this(
+            name,
+            id,
+            creation,
+            expiration,
+            invalidated,
+            username,
+            realm,
+            metadata,
+            roleDescriptors,
+            limitedByRoleDescriptors == null ? null : new RoleDescriptorsIntersection(List.of(Set.copyOf(limitedByRoleDescriptors)))
+        );
+    }
+
+    private ApiKey(
+        String name,
+        String id,
+        Instant creation,
+        Instant expiration,
+        boolean invalidated,
+        String username,
+        String realm,
+        @Nullable Map<String, Object> metadata,
+        @Nullable List<RoleDescriptor> roleDescriptors,
+        @Nullable RoleDescriptorsIntersection limitedBy
     ) {
         this.name = name;
         this.id = id;
@@ -66,7 +98,10 @@ public final class ApiKey implements ToXContentObject, Writeable {
         this.username = username;
         this.realm = realm;
         this.metadata = metadata == null ? Map.of() : metadata;
-        this.roleDescriptors = roleDescriptors;
+        this.roleDescriptors = roleDescriptors != null ? List.copyOf(roleDescriptors) : null;
+        // This assertion will need to be changed (or removed) when derived keys are properly supported
+        assert limitedBy == null || limitedBy.roleDescriptorsList().size() == 1 : "can only have one set of limited-by role descriptors";
+        this.limitedBy = limitedBy;
     }
 
     public ApiKey(StreamInput in) throws IOException {
@@ -89,8 +124,10 @@ public final class ApiKey implements ToXContentObject, Writeable {
         if (in.getVersion().onOrAfter(Version.V_8_5_0)) {
             final List<RoleDescriptor> roleDescriptors = in.readOptionalList(RoleDescriptor::new);
             this.roleDescriptors = roleDescriptors != null ? List.copyOf(roleDescriptors) : null;
+            this.limitedBy = in.readOptionalWriteable(RoleDescriptorsIntersection::new);
         } else {
             this.roleDescriptors = null;
+            this.limitedBy = null;
         }
     }
 
@@ -130,6 +167,10 @@ public final class ApiKey implements ToXContentObject, Writeable {
         return roleDescriptors;
     }
 
+    public RoleDescriptorsIntersection getLimitedBy() {
+        return limitedBy;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
@@ -153,6 +194,9 @@ public final class ApiKey implements ToXContentObject, Writeable {
             }
             builder.endObject();
         }
+        if (limitedBy != null) {
+            builder.field("limited_by", limitedBy);
+        }
         return builder;
     }
 
@@ -174,12 +218,13 @@ public final class ApiKey implements ToXContentObject, Writeable {
         }
         if (out.getVersion().onOrAfter(Version.V_8_5_0)) {
             out.writeOptionalCollection(roleDescriptors);
+            out.writeOptionalWriteable(limitedBy);
         }
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, id, creation, expiration, invalidated, username, realm, metadata, roleDescriptors);
+        return Objects.hash(name, id, creation, expiration, invalidated, username, realm, metadata, roleDescriptors, limitedBy);
     }
 
     @Override
@@ -202,7 +247,8 @@ public final class ApiKey implements ToXContentObject, Writeable {
             && Objects.equals(username, other.username)
             && Objects.equals(realm, other.realm)
             && Objects.equals(metadata, other.metadata)
-            && Objects.equals(roleDescriptors, other.roleDescriptors);
+            && Objects.equals(roleDescriptors, other.roleDescriptors)
+            && Objects.equals(limitedBy, other.limitedBy);
     }
 
     @SuppressWarnings("unchecked")
@@ -216,7 +262,8 @@ public final class ApiKey implements ToXContentObject, Writeable {
             (String) args[5],
             (String) args[6],
             (args[7] == null) ? null : (Map<String, Object>) args[7],
-            (List<RoleDescriptor>) args[8]
+            (List<RoleDescriptor>) args[8],
+            (RoleDescriptorsIntersection) args[9]
         );
     });
     static {
@@ -232,6 +279,12 @@ public final class ApiKey implements ToXContentObject, Writeable {
             p.nextToken();
             return RoleDescriptor.parse(n, p, false);
         }, new ParseField("role_descriptors"));
+        PARSER.declareField(
+            optionalConstructorArg(),
+            (p, c) -> RoleDescriptorsIntersection.fromXContent(p),
+            new ParseField("limited_by"),
+            ObjectParser.ValueType.OBJECT_ARRAY
+        );
     }
 
     public static ApiKey fromXContent(XContentParser parser) throws IOException {
@@ -258,6 +311,8 @@ public final class ApiKey implements ToXContentObject, Writeable {
             + metadata
             + ", role_descriptors="
             + roleDescriptors
+            + ", limited_by="
+            + limitedBy
             + "]";
     }
 

+ 88 - 15
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/GetApiKeyRequest.java

@@ -30,10 +30,7 @@ public final class GetApiKeyRequest extends ActionRequest {
     private final String apiKeyId;
     private final String apiKeyName;
     private final boolean ownedByAuthenticatedUser;
-
-    public GetApiKeyRequest() {
-        this(null, null, null, null, false);
-    }
+    private final boolean withLimitedBy;
 
     public GetApiKeyRequest(StreamInput in) throws IOException {
         super(in);
@@ -46,20 +43,27 @@ public final class GetApiKeyRequest extends ActionRequest {
         } else {
             ownedByAuthenticatedUser = false;
         }
+        if (in.getVersion().onOrAfter(Version.V_8_5_0)) {
+            withLimitedBy = in.readBoolean();
+        } else {
+            withLimitedBy = false;
+        }
     }
 
-    public GetApiKeyRequest(
+    private GetApiKeyRequest(
         @Nullable String realmName,
         @Nullable String userName,
         @Nullable String apiKeyId,
         @Nullable String apiKeyName,
-        boolean ownedByAuthenticatedUser
+        boolean ownedByAuthenticatedUser,
+        boolean withLimitedBy
     ) {
         this.realmName = textOrNull(realmName);
         this.userName = textOrNull(userName);
         this.apiKeyId = textOrNull(apiKeyId);
         this.apiKeyName = textOrNull(apiKeyName);
         this.ownedByAuthenticatedUser = ownedByAuthenticatedUser;
+        this.withLimitedBy = withLimitedBy;
     }
 
     private static String textOrNull(@Nullable String arg) {
@@ -86,13 +90,18 @@ public final class GetApiKeyRequest extends ActionRequest {
         return ownedByAuthenticatedUser;
     }
 
+    public boolean withLimitedBy() {
+        return withLimitedBy;
+    }
+
     /**
      * Creates get API key request for given realm name
      * @param realmName realm name
      * @return {@link GetApiKeyRequest}
      */
+    @Deprecated
     public static GetApiKeyRequest usingRealmName(String realmName) {
-        return new GetApiKeyRequest(realmName, null, null, null, false);
+        return new GetApiKeyRequest(realmName, null, null, null, false, false);
     }
 
     /**
@@ -100,8 +109,9 @@ public final class GetApiKeyRequest extends ActionRequest {
      * @param userName user name
      * @return {@link GetApiKeyRequest}
      */
+    @Deprecated
     public static GetApiKeyRequest usingUserName(String userName) {
-        return new GetApiKeyRequest(null, userName, null, null, false);
+        return new GetApiKeyRequest(null, userName, null, null, false, false);
     }
 
     /**
@@ -110,8 +120,9 @@ public final class GetApiKeyRequest extends ActionRequest {
      * @param userName user name
      * @return {@link GetApiKeyRequest}
      */
+    @Deprecated
     public static GetApiKeyRequest usingRealmAndUserName(String realmName, String userName) {
-        return new GetApiKeyRequest(realmName, userName, null, null, false);
+        return new GetApiKeyRequest(realmName, userName, null, null, false, false);
     }
 
     /**
@@ -121,8 +132,9 @@ public final class GetApiKeyRequest extends ActionRequest {
      * {@code false}
      * @return {@link GetApiKeyRequest}
      */
+    @Deprecated
     public static GetApiKeyRequest usingApiKeyId(String apiKeyId, boolean ownedByAuthenticatedUser) {
-        return new GetApiKeyRequest(null, null, apiKeyId, null, ownedByAuthenticatedUser);
+        return new GetApiKeyRequest(null, null, apiKeyId, null, ownedByAuthenticatedUser, false);
     }
 
     /**
@@ -133,21 +145,23 @@ public final class GetApiKeyRequest extends ActionRequest {
      * @return {@link GetApiKeyRequest}
      */
     public static GetApiKeyRequest usingApiKeyName(String apiKeyName, boolean ownedByAuthenticatedUser) {
-        return new GetApiKeyRequest(null, null, null, apiKeyName, ownedByAuthenticatedUser);
+        return new GetApiKeyRequest(null, null, null, apiKeyName, ownedByAuthenticatedUser, false);
     }
 
     /**
      * Creates get api key request to retrieve api key information for the api keys owned by the current authenticated user.
      */
+    @Deprecated
     public static GetApiKeyRequest forOwnedApiKeys() {
-        return new GetApiKeyRequest(null, null, null, null, true);
+        return new GetApiKeyRequest(null, null, null, null, true, false);
     }
 
     /**
      * Creates get api key request to retrieve api key information for all api keys if the authenticated user is authorized to do so.
      */
+    @Deprecated
     public static GetApiKeyRequest forAllApiKeys() {
-        return new GetApiKeyRequest();
+        return GetApiKeyRequest.builder().build();
     }
 
     @Override
@@ -185,6 +199,9 @@ public final class GetApiKeyRequest extends ActionRequest {
         if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
             out.writeOptionalBoolean(ownedByAuthenticatedUser);
         }
+        if (out.getVersion().onOrAfter(Version.V_8_5_0)) {
+            out.writeBoolean(withLimitedBy);
+        }
     }
 
     @Override
@@ -200,11 +217,67 @@ public final class GetApiKeyRequest extends ActionRequest {
             && Objects.equals(realmName, that.realmName)
             && Objects.equals(userName, that.userName)
             && Objects.equals(apiKeyId, that.apiKeyId)
-            && Objects.equals(apiKeyName, that.apiKeyName);
+            && Objects.equals(apiKeyName, that.apiKeyName)
+            && withLimitedBy == that.withLimitedBy;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(realmName, userName, apiKeyId, apiKeyName, ownedByAuthenticatedUser);
+        return Objects.hash(realmName, userName, apiKeyId, apiKeyName, ownedByAuthenticatedUser, withLimitedBy);
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private String realmName = null;
+        private String userName = null;
+        private String apiKeyId = null;
+        private String apiKeyName = null;
+        private boolean ownedByAuthenticatedUser = false;
+        private boolean withLimitedBy = false;
+
+        public Builder realmName(String realmName) {
+            this.realmName = realmName;
+            return this;
+        }
+
+        public Builder userName(String userName) {
+            this.userName = userName;
+            return this;
+        }
+
+        public Builder apiKeyId(String apiKeyId) {
+            this.apiKeyId = apiKeyId;
+            return this;
+        }
+
+        public Builder apiKeyName(String apiKeyName) {
+            this.apiKeyName = apiKeyName;
+            return this;
+        }
+
+        public Builder ownedByAuthenticatedUser() {
+            return ownedByAuthenticatedUser(true);
+        }
+
+        public Builder ownedByAuthenticatedUser(boolean ownedByAuthenticatedUser) {
+            this.ownedByAuthenticatedUser = ownedByAuthenticatedUser;
+            return this;
+        }
+
+        public Builder withLimitedBy() {
+            return withLimitedBy(true);
+        }
+
+        public Builder withLimitedBy(boolean withLimitedBy) {
+            this.withLimitedBy = withLimitedBy;
+            return this;
+        }
+
+        public GetApiKeyRequest build() {
+            return new GetApiKeyRequest(realmName, userName, apiKeyId, apiKeyName, ownedByAuthenticatedUser, withLimitedBy);
+        }
     }
 }

+ 18 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/QueryApiKeyRequest.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.core.security.action.apikey;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -33,6 +34,7 @@ public final class QueryApiKeyRequest extends ActionRequest {
     private final List<FieldSortBuilder> fieldSortBuilders;
     @Nullable
     private final SearchAfterBuilder searchAfterBuilder;
+    private final boolean withLimitedBy;
     private boolean filterForCurrentUser;
 
     public QueryApiKeyRequest() {
@@ -40,7 +42,7 @@ public final class QueryApiKeyRequest extends ActionRequest {
     }
 
     public QueryApiKeyRequest(QueryBuilder queryBuilder) {
-        this(queryBuilder, null, null, null, null);
+        this(queryBuilder, null, null, null, null, false);
     }
 
     public QueryApiKeyRequest(
@@ -48,13 +50,15 @@ public final class QueryApiKeyRequest extends ActionRequest {
         @Nullable Integer from,
         @Nullable Integer size,
         @Nullable List<FieldSortBuilder> fieldSortBuilders,
-        @Nullable SearchAfterBuilder searchAfterBuilder
+        @Nullable SearchAfterBuilder searchAfterBuilder,
+        boolean withLimitedBy
     ) {
         this.queryBuilder = queryBuilder;
         this.from = from;
         this.size = size;
         this.fieldSortBuilders = fieldSortBuilders;
         this.searchAfterBuilder = searchAfterBuilder;
+        this.withLimitedBy = withLimitedBy;
     }
 
     public QueryApiKeyRequest(StreamInput in) throws IOException {
@@ -68,6 +72,11 @@ public final class QueryApiKeyRequest extends ActionRequest {
             this.fieldSortBuilders = null;
         }
         this.searchAfterBuilder = in.readOptionalWriteable(SearchAfterBuilder::new);
+        if (in.getVersion().onOrAfter(Version.V_8_5_0)) {
+            this.withLimitedBy = in.readBoolean();
+        } else {
+            this.withLimitedBy = false;
+        }
     }
 
     public QueryBuilder getQueryBuilder() {
@@ -98,6 +107,10 @@ public final class QueryApiKeyRequest extends ActionRequest {
         filterForCurrentUser = true;
     }
 
+    public boolean withLimitedBy() {
+        return withLimitedBy;
+    }
+
     @Override
     public ActionRequestValidationException validate() {
         ActionRequestValidationException validationException = null;
@@ -123,5 +136,8 @@ public final class QueryApiKeyRequest extends ActionRequest {
             out.writeList(fieldSortBuilders);
         }
         out.writeOptionalWriteable(searchAfterBuilder);
+        if (out.getVersion().onOrAfter(Version.V_8_5_0)) {
+            out.writeBoolean(withLimitedBy);
+        }
     }
 }

+ 68 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/RoleDescriptorsIntersection.java

@@ -0,0 +1,68 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.security.authz;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.XContentParserUtils;
+import org.elasticsearch.xcontent.ToXContentObject;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+public record RoleDescriptorsIntersection(Collection<Set<RoleDescriptor>> roleDescriptorsList) implements ToXContentObject, Writeable {
+
+    public RoleDescriptorsIntersection(StreamInput in) throws IOException {
+        this(List.copyOf(in.readList(inner -> inner.readSet(RoleDescriptor::new))));
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeCollection(roleDescriptorsList, StreamOutput::writeCollection);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startArray();
+        {
+            for (Set<RoleDescriptor> roleDescriptors : roleDescriptorsList) {
+                builder.startObject();
+                for (RoleDescriptor roleDescriptor : roleDescriptors) {
+                    builder.field(roleDescriptor.getName(), roleDescriptor);
+                }
+                builder.endObject();
+            }
+        }
+        builder.endArray();
+        return builder;
+    }
+
+    public static RoleDescriptorsIntersection fromXContent(XContentParser xContentParser) throws IOException {
+        if (xContentParser.currentToken() == null) {
+            xContentParser.nextToken();
+        }
+        final List<Set<RoleDescriptor>> roleDescriptorsList = XContentParserUtils.parseList(xContentParser, p -> {
+            XContentParser.Token token = p.currentToken();
+            XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, token, p);
+            final List<RoleDescriptor> roleDescriptors = new ArrayList<>();
+            while ((token = p.nextToken()) != XContentParser.Token.END_OBJECT) {
+                XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, p);
+                XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, p.nextToken(), p);
+                roleDescriptors.add(RoleDescriptor.parse(p.currentName(), p, false));
+            }
+            return Set.copyOf(roleDescriptors);
+        });
+        return new RoleDescriptorsIntersection(List.copyOf(roleDescriptorsList));
+    }
+}

+ 8 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ManageOwnApiKeyClusterPrivilege.java

@@ -70,6 +70,10 @@ public class ManageOwnApiKeyClusterPrivilege implements NamedClusterPrivilege {
                 // Ownership of an API key, for regular users, is enforced at the service layer.
                 return true;
             } else if (request instanceof final GetApiKeyRequest getApiKeyRequest) {
+                // An API key requires manage_api_key privilege or higher to view any limited-by role descriptors
+                if (authentication.isApiKey() && getApiKeyRequest.withLimitedBy()) {
+                    return false;
+                }
                 return checkIfUserIsOwnerOfApiKeys(
                     authentication,
                     getApiKeyRequest.getApiKeyId(),
@@ -100,6 +104,10 @@ public class ManageOwnApiKeyClusterPrivilege implements NamedClusterPrivilege {
                         );
                 }
             } else if (request instanceof final QueryApiKeyRequest queryApiKeyRequest) {
+                // An API key requires manage_api_key privilege or higher to view any limited-by role descriptors
+                if (authentication.isApiKey() && queryApiKeyRequest.withLimitedBy()) {
+                    return false;
+                }
                 return queryApiKeyRequest.isFilterForCurrentUser();
             } else if (request instanceof GrantApiKeyRequest) {
                 return false;

+ 20 - 2
x-pack/plugin/core/src/test/java/org/elasticsearch/test/SecuritySettingsSourceField.java

@@ -7,6 +7,9 @@
 package org.elasticsearch.test;
 
 import org.elasticsearch.common.settings.SecureString;
+import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
+
+import java.util.Map;
 
 public final class SecuritySettingsSourceField {
     public static final SecureString TEST_PASSWORD_SECURE_STRING = new SecureString("x-pack-test-password".toCharArray());
@@ -22,9 +25,24 @@ public final class SecuritySettingsSourceField {
               allow_restricted_indices: true
               privileges: [ "ALL" ]
           run_as: [ "*" ]
-          # The _es_test_root role doesn't have any application privileges because that would require loading data (Application Privileges)
-          # from the security index, which can causes problems if the index is not available
+          applications:
+            - application: "*"
+              privileges: [ "*" ]
+              resources: [ "*" ]
         """;
 
+    public static final RoleDescriptor ES_TEST_ROOT_ROLE_DESCRIPTOR = new RoleDescriptor(
+        "_es_test_root",
+        new String[] { "ALL" },
+        new RoleDescriptor.IndicesPrivileges[] {
+            RoleDescriptor.IndicesPrivileges.builder().indices("*").privileges("ALL").allowRestrictedIndices(true).build() },
+        new RoleDescriptor.ApplicationResourcePrivileges[] {
+            RoleDescriptor.ApplicationResourcePrivileges.builder().application("*").privileges("*").resources("*").build() },
+        null,
+        new String[] { "*" },
+        Map.of(),
+        Map.of("enabled", true)
+    );
+
     private SecuritySettingsSourceField() {}
 }

+ 34 - 5
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/ApiKeyTests.java

@@ -7,13 +7,16 @@
 
 package org.elasticsearch.xpack.core.security.action.apikey;
 
-import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.XContentTestUtils;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentFactory;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+import org.elasticsearch.xcontent.json.JsonXContent;
 import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
 
 import java.io.IOException;
@@ -31,6 +34,7 @@ import static org.hamcrest.Matchers.notNullValue;
 
 public class ApiKeyTests extends ESTestCase {
 
+    @SuppressWarnings("unchecked")
     public void testXContent() throws IOException {
         final String name = randomAlphaOfLengthBetween(4, 10);
         final String id = randomAlphaOfLength(20);
@@ -44,14 +48,31 @@ public class ApiKeyTests extends ESTestCase {
         final String realmName = randomAlphaOfLengthBetween(3, 8);
         final Map<String, Object> metadata = randomMetadata();
         final List<RoleDescriptor> roleDescriptors = randomBoolean() ? null : randomUniquelyNamedRoleDescriptors(0, 3);
+        final List<RoleDescriptor> limitedByRoleDescriptors = randomUniquelyNamedRoleDescriptors(0, 3);
 
-        final ApiKey apiKey = new ApiKey(name, id, creation, expiration, invalidated, username, realmName, metadata, roleDescriptors);
+        final ApiKey apiKey = new ApiKey(
+            name,
+            id,
+            creation,
+            expiration,
+            invalidated,
+            username,
+            realmName,
+            metadata,
+            roleDescriptors,
+            limitedByRoleDescriptors
+        );
         // The metadata will never be null because the constructor convert it to empty map if a null is passed in
         assertThat(apiKey.getMetadata(), notNullValue());
 
         XContentBuilder builder = XContentFactory.jsonBuilder();
         apiKey.toXContent(builder, ToXContent.EMPTY_PARAMS);
-        final Map<String, Object> map = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2();
+        final String jsonString = Strings.toString(builder);
+        final Map<String, Object> map = XContentHelper.convertToMap(JsonXContent.jsonXContent, jsonString, false);
+
+        try (XContentParser parser = JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, jsonString)) {
+            assertThat(ApiKey.fromXContent(parser), equalTo(apiKey));
+        }
 
         assertThat(map.get("name"), equalTo(name));
         assertThat(map.get("id"), equalTo(id));
@@ -69,14 +90,22 @@ public class ApiKeyTests extends ESTestCase {
         if (roleDescriptors == null) {
             assertThat(map, not(hasKey("role_descriptors")));
         } else {
-            @SuppressWarnings("unchecked")
-            final Map<String, Object> rdMap = (Map<String, Object>) map.get("role_descriptors");
+            final var rdMap = (Map<String, Object>) map.get("role_descriptors");
             assertThat(rdMap.size(), equalTo(roleDescriptors.size()));
             for (var roleDescriptor : roleDescriptors) {
                 assertThat(rdMap, hasKey(roleDescriptor.getName()));
                 assertThat(XContentTestUtils.convertToMap(roleDescriptor), equalTo(rdMap.get(roleDescriptor.getName())));
             }
         }
+
+        final var limitedByList = (List<Map<String, Object>>) map.get("limited_by");
+        assertThat(limitedByList.size(), equalTo(1));
+        final Map<String, Object> limitedByMap = limitedByList.get(0);
+        assertThat(limitedByMap.size(), equalTo(limitedByRoleDescriptors.size()));
+        for (RoleDescriptor roleDescriptor : limitedByRoleDescriptors) {
+            assertThat(limitedByMap, hasKey(roleDescriptor.getName()));
+            assertThat(XContentTestUtils.convertToMap(roleDescriptor), equalTo(limitedByMap.get(roleDescriptor.getName())));
+        }
     }
 
     @SuppressWarnings("unchecked")

+ 9 - 7
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/GetApiKeyRequestTests.java

@@ -75,6 +75,7 @@ public class GetApiKeyRequestTests extends ESTestCase {
                 out.writeOptionalString(apiKeyId);
                 out.writeOptionalString(apiKeyName);
                 out.writeOptionalBoolean(ownedByAuthenticatedUser);
+                out.writeBoolean(randomBoolean());
             }
         }
 
@@ -151,13 +152,14 @@ public class GetApiKeyRequestTests extends ESTestCase {
 
     public void testEmptyStringsAreCoercedToNull() {
         Supplier<String> randomBlankString = () -> " ".repeat(randomIntBetween(0, 5));
-        final GetApiKeyRequest request = new GetApiKeyRequest(
-            randomBlankString.get(), // realm name
-            randomBlankString.get(), // user name
-            randomBlankString.get(), // key id
-            randomBlankString.get(), // key name
-            randomBoolean() // owned by user
-        );
+        final GetApiKeyRequest request = GetApiKeyRequest.builder()
+            .realmName(randomBlankString.get())
+            .userName(randomBlankString.get())
+            .apiKeyId(randomBlankString.get())
+            .apiKeyName(randomBlankString.get())
+            .ownedByAuthenticatedUser(randomBoolean())
+            .withLimitedBy(randomBoolean())
+            .build();
         assertThat(request.getRealmName(), nullValue());
         assertThat(request.getUserName(), nullValue());
         assertThat(request.getApiKeyId(), nullValue());

+ 93 - 9
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/GetApiKeyResponseTests.java

@@ -45,7 +45,8 @@ public class GetApiKeyResponseTests extends ESTestCase {
             randomAlphaOfLength(4),
             randomAlphaOfLength(5),
             randomBoolean() ? null : Map.of(randomAlphaOfLengthBetween(3, 8), randomAlphaOfLengthBetween(3, 8)),
-            randomBoolean() ? null : randomUniquelyNamedRoleDescriptors(0, 3)
+            randomBoolean() ? null : randomUniquelyNamedRoleDescriptors(0, 3),
+            randomUniquelyNamedRoleDescriptors(1, 3)
         );
         GetApiKeyResponse response = new GetApiKeyResponse(Collections.singletonList(apiKeyInfo));
 
@@ -83,6 +84,15 @@ public class GetApiKeyResponseTests extends ESTestCase {
                 new String[] { "foo" }
             )
         );
+        final List<RoleDescriptor> limitedByRoleDescriptors = List.of(
+            new RoleDescriptor(
+                "rd_0",
+                new String[] { "all" },
+                new RoleDescriptor.IndicesPrivileges[] {
+                    RoleDescriptor.IndicesPrivileges.builder().indices("index").privileges("all").build() },
+                new String[] { "*" }
+            )
+        );
 
         ApiKey apiKeyInfo1 = createApiKeyInfo(
             "name1",
@@ -93,7 +103,8 @@ public class GetApiKeyResponseTests extends ESTestCase {
             "user-a",
             "realm-x",
             null,
-            null
+            null,
+            List.of() // empty limited-by role descriptor to simulate derived keys
         );
         ApiKey apiKeyInfo2 = createApiKeyInfo(
             "name2",
@@ -104,7 +115,8 @@ public class GetApiKeyResponseTests extends ESTestCase {
             "user-b",
             "realm-y",
             Map.of(),
-            List.of()
+            List.of(),
+            limitedByRoleDescriptors
         );
         ApiKey apiKeyInfo3 = createApiKeyInfo(
             null,
@@ -115,7 +127,8 @@ public class GetApiKeyResponseTests extends ESTestCase {
             "user-c",
             "realm-z",
             Map.of("foo", "bar"),
-            roleDescriptors
+            roleDescriptors,
+            limitedByRoleDescriptors
         );
         GetApiKeyResponse response = new GetApiKeyResponse(Arrays.asList(apiKeyInfo1, apiKeyInfo2, apiKeyInfo3));
         XContentBuilder builder = XContentFactory.jsonBuilder();
@@ -131,7 +144,10 @@ public class GetApiKeyResponseTests extends ESTestCase {
                   "invalidated": false,
                   "username": "user-a",
                   "realm": "realm-x",
-                  "metadata": {}
+                  "metadata": {},
+                  "limited_by": [
+                    { }
+                  ]
                 },
                 {
                   "id": "id-2",
@@ -142,7 +158,35 @@ public class GetApiKeyResponseTests extends ESTestCase {
                   "username": "user-b",
                   "realm": "realm-y",
                   "metadata": {},
-                  "role_descriptors": {}
+                  "role_descriptors": {},
+                  "limited_by": [
+                    {
+                      "rd_0": {
+                        "cluster": [
+                          "all"
+                        ],
+                        "indices": [
+                          {
+                            "names": [
+                              "index"
+                            ],
+                            "privileges": [
+                              "all"
+                            ],
+                            "allow_restricted_indices": false
+                          }
+                        ],
+                        "applications": [],
+                        "run_as": [
+                          "*"
+                        ],
+                        "metadata": {},
+                        "transient_metadata": {
+                          "enabled": true
+                        }
+                      }
+                    }
+                  ]
                 },
                 {
                   "id": "id-3",
@@ -179,7 +223,35 @@ public class GetApiKeyResponseTests extends ESTestCase {
                         "enabled": true
                       }
                     }
-                  }
+                  },
+                  "limited_by": [
+                    {
+                      "rd_0": {
+                        "cluster": [
+                          "all"
+                        ],
+                        "indices": [
+                          {
+                            "names": [
+                              "index"
+                            ],
+                            "privileges": [
+                              "all"
+                            ],
+                            "allow_restricted_indices": false
+                          }
+                        ],
+                        "applications": [],
+                        "run_as": [
+                          "*"
+                        ],
+                        "metadata": {},
+                        "transient_metadata": {
+                          "enabled": true
+                        }
+                      }
+                    }
+                  ]
                 }
               ]
             }""")));
@@ -194,8 +266,20 @@ public class GetApiKeyResponseTests extends ESTestCase {
         String username,
         String realm,
         Map<String, Object> metadata,
-        List<RoleDescriptor> roleDescriptors
+        List<RoleDescriptor> roleDescriptors,
+        List<RoleDescriptor> limitedByRoleDescriptors
     ) {
-        return new ApiKey(name, id, creation, expiration, invalidated, username, realm, metadata, roleDescriptors);
+        return new ApiKey(
+            name,
+            id,
+            creation,
+            expiration,
+            invalidated,
+            username,
+            realm,
+            metadata,
+            roleDescriptors,
+            limitedByRoleDescriptors
+        );
     }
 }

+ 9 - 4
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/QueryApiKeyRequestTests.java

@@ -72,7 +72,8 @@ public class QueryApiKeyRequestTests extends ESTestCase {
                 new FieldSortBuilder("creation_time").setFormat("strict_date_time").order(SortOrder.DESC),
                 new FieldSortBuilder("username")
             ),
-            new SearchAfterBuilder().setSortValues(new String[] { "key-2048", "2021-07-01T00:00:59.000Z" })
+            new SearchAfterBuilder().setSortValues(new String[] { "key-2048", "2021-07-01T00:00:59.000Z" }),
+            randomBoolean()
         );
         try (BytesStreamOutput out = new BytesStreamOutput()) {
             request3.writeTo(out);
@@ -83,6 +84,7 @@ public class QueryApiKeyRequestTests extends ESTestCase {
                 assertThat(deserialized.getSize(), equalTo(request3.getSize()));
                 assertThat(deserialized.getFieldSortBuilders(), equalTo(request3.getFieldSortBuilders()));
                 assertThat(deserialized.getSearchAfterBuilder(), equalTo(request3.getSearchAfterBuilder()));
+                assertThat(deserialized.withLimitedBy(), equalTo(request3.withLimitedBy()));
             }
         }
     }
@@ -93,7 +95,8 @@ public class QueryApiKeyRequestTests extends ESTestCase {
             randomIntBetween(0, Integer.MAX_VALUE),
             randomIntBetween(0, Integer.MAX_VALUE),
             null,
-            null
+            null,
+            randomBoolean()
         );
         assertThat(request1.validate(), nullValue());
 
@@ -102,7 +105,8 @@ public class QueryApiKeyRequestTests extends ESTestCase {
             randomIntBetween(Integer.MIN_VALUE, -1),
             randomIntBetween(0, Integer.MAX_VALUE),
             null,
-            null
+            null,
+            randomBoolean()
         );
         assertThat(request2.validate().getMessage(), containsString("[from] parameter cannot be negative"));
 
@@ -111,7 +115,8 @@ public class QueryApiKeyRequestTests extends ESTestCase {
             randomIntBetween(0, Integer.MAX_VALUE),
             randomIntBetween(Integer.MIN_VALUE, -1),
             null,
-            null
+            null,
+            randomBoolean()
         );
         assertThat(request3.validate().getMessage(), containsString("[size] parameter cannot be negative"));
     }

+ 12 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/QueryApiKeyResponseTests.java

@@ -95,7 +95,18 @@ public class QueryApiKeyResponseTests extends AbstractWireSerializingTestCase<Qu
         final Instant expiration = randomBoolean() ? Instant.ofEpochMilli(randomMillisUpToYear9999()) : null;
         final Map<String, Object> metadata = ApiKeyTests.randomMetadata();
         final List<RoleDescriptor> roleDescriptors = randomFrom(randomUniquelyNamedRoleDescriptors(0, 3), null);
-        return new ApiKey(name, id, creation, expiration, false, username, realm_name, metadata, roleDescriptors);
+        return new ApiKey(
+            name,
+            id,
+            creation,
+            expiration,
+            false,
+            username,
+            realm_name,
+            metadata,
+            roleDescriptors,
+            randomUniquelyNamedRoleDescriptors(1, 3)
+        );
     }
 
     private Object[] randomSortValues() {

+ 104 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/RoleDescriptorsIntersectionTests.java

@@ -0,0 +1,104 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.security.authz;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentFactory;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+import org.elasticsearch.xcontent.json.JsonXContent;
+import org.elasticsearch.xpack.core.security.authz.privilege.ConfigurableClusterPrivilege;
+import org.elasticsearch.xpack.core.security.authz.privilege.ConfigurableClusterPrivileges;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import static org.elasticsearch.xpack.core.security.authz.RoleDescriptorTests.randomUniquelyNamedRoleDescriptors;
+import static org.hamcrest.Matchers.equalTo;
+
+public class RoleDescriptorsIntersectionTests extends ESTestCase {
+
+    public void testSerialization() throws IOException {
+        final RoleDescriptorsIntersection roleDescriptorsIntersection = new RoleDescriptorsIntersection(
+            randomList(0, 3, () -> Set.copyOf(randomUniquelyNamedRoleDescriptors(0, 3)))
+        );
+
+        final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(
+            List.of(
+                new NamedWriteableRegistry.Entry(
+                    ConfigurableClusterPrivilege.class,
+                    ConfigurableClusterPrivileges.ManageApplicationPrivileges.WRITEABLE_NAME,
+                    ConfigurableClusterPrivileges.ManageApplicationPrivileges::createFrom
+                ),
+                new NamedWriteableRegistry.Entry(
+                    ConfigurableClusterPrivilege.class,
+                    ConfigurableClusterPrivileges.WriteProfileDataPrivileges.WRITEABLE_NAME,
+                    ConfigurableClusterPrivileges.WriteProfileDataPrivileges::createFrom
+                )
+            )
+        );
+
+        try (BytesStreamOutput output = new BytesStreamOutput()) {
+            roleDescriptorsIntersection.writeTo(output);
+            try (StreamInput input = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
+                RoleDescriptorsIntersection deserialized = new RoleDescriptorsIntersection(input);
+                assertThat(deserialized.roleDescriptorsList(), equalTo(roleDescriptorsIntersection.roleDescriptorsList()));
+            }
+        }
+    }
+
+    public void testXContent() throws IOException {
+        final RoleDescriptorsIntersection roleDescriptorsIntersection = new RoleDescriptorsIntersection(
+            List.of(
+                Set.of(new RoleDescriptor("role_0", new String[] { "monitor" }, null, null)),
+                Set.of(new RoleDescriptor("role_1", new String[] { "all" }, null, null))
+            )
+        );
+
+        final XContentBuilder builder = XContentFactory.jsonBuilder();
+        roleDescriptorsIntersection.toXContent(builder, ToXContent.EMPTY_PARAMS);
+        final String jsonString = Strings.toString(builder);
+
+        assertThat(jsonString, equalTo(XContentHelper.stripWhitespace("""
+            [
+              {
+                "role_0": {
+                  "cluster": ["monitor"],
+                  "indices": [],
+                  "applications": [],
+                  "run_as": [],
+                  "metadata": {},
+                  "transient_metadata": {"enabled": true}
+                }
+              },
+              {
+                "role_1": {
+                  "cluster": ["all"],
+                  "indices": [],
+                  "applications": [],
+                  "run_as": [],
+                  "metadata": {},
+                  "transient_metadata": {"enabled": true}
+                }
+              }
+            ]""")));
+
+        try (XContentParser p = JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, jsonString)) {
+            assertThat(RoleDescriptorsIntersection.fromXContent(p), equalTo(roleDescriptorsIntersection));
+        }
+    }
+}

+ 64 - 14
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/ManageOwnApiKeyClusterPrivilegeTests.java

@@ -35,20 +35,38 @@ import static org.mockito.Mockito.mock;
 
 public class ManageOwnApiKeyClusterPrivilegeTests extends ESTestCase {
 
-    public void testAuthenticationWithApiKeyAllowsAccessToApiKeyActionsWhenItIsOwner() {
+    public void testAuthenticationWithApiKeyAllowsAccessToApiKeyActionsWhenItIsItself() {
         final ClusterPermission clusterPermission = ManageOwnApiKeyClusterPrivilege.INSTANCE.buildPermission(ClusterPermission.builder())
             .build();
 
         final String apiKeyId = randomAlphaOfLengthBetween(4, 7);
         final User userJoe = new User("joe");
         final Authentication authentication = AuthenticationTests.randomApiKeyAuthentication(userJoe, apiKeyId);
-        final TransportRequest getApiKeyRequest = GetApiKeyRequest.usingApiKeyId(apiKeyId, randomBoolean());
+        final TransportRequest getApiKeyRequest = GetApiKeyRequest.builder()
+            .apiKeyId(apiKeyId)
+            .ownedByAuthenticatedUser(randomBoolean())
+            .build();
         final TransportRequest invalidateApiKeyRequest = InvalidateApiKeyRequest.usingApiKeyId(apiKeyId, randomBoolean());
         assertTrue(clusterPermission.check("cluster:admin/xpack/security/api_key/get", getApiKeyRequest, authentication));
         assertTrue(clusterPermission.check("cluster:admin/xpack/security/api_key/invalidate", invalidateApiKeyRequest, authentication));
         assertFalse(clusterPermission.check("cluster:admin/something", mock(TransportRequest.class), authentication));
     }
 
+    public void testAuthenticationWithApiKeyAllowsDeniesGetApiKeyWithLimitedByWhenItIsItself() {
+        final ClusterPermission clusterPermission = ManageOwnApiKeyClusterPrivilege.INSTANCE.buildPermission(ClusterPermission.builder())
+            .build();
+        final String apiKeyId = randomAlphaOfLengthBetween(4, 7);
+        final User userJoe = new User("joe");
+        final Authentication authentication = AuthenticationTests.randomApiKeyAuthentication(userJoe, apiKeyId);
+        assertFalse(
+            clusterPermission.check(
+                "cluster:admin/xpack/security/api_key/get",
+                GetApiKeyRequest.builder().apiKeyId(apiKeyId).withLimitedBy().build(),
+                authentication
+            )
+        );
+    }
+
     public void testAuthenticationForUpdateApiKeyAllowsAll() {
         final ClusterPermission clusterPermission = ManageOwnApiKeyClusterPrivilege.INSTANCE.buildPermission(ClusterPermission.builder())
             .build();
@@ -76,7 +94,11 @@ public class ManageOwnApiKeyClusterPrivilegeTests extends ESTestCase {
         final String apiKeyId = randomAlphaOfLengthBetween(4, 7);
         final User userJoe = new User("joe");
         final Authentication authentication = AuthenticationTests.randomApiKeyAuthentication(userJoe, randomAlphaOfLength(20));
-        final TransportRequest getApiKeyRequest = GetApiKeyRequest.usingApiKeyId(apiKeyId, randomBoolean());
+        final TransportRequest getApiKeyRequest = GetApiKeyRequest.builder()
+            .apiKeyId(apiKeyId)
+            .ownedByAuthenticatedUser(randomBoolean())
+            .withLimitedBy(randomBoolean())
+            .build();
         final TransportRequest invalidateApiKeyRequest = InvalidateApiKeyRequest.usingApiKeyId(apiKeyId, randomBoolean());
 
         assertFalse(clusterPermission.check("cluster:admin/xpack/security/api_key/get", getApiKeyRequest, authentication));
@@ -90,7 +112,11 @@ public class ManageOwnApiKeyClusterPrivilegeTests extends ESTestCase {
         final Authentication.RealmRef realmRef = AuthenticationTests.randomRealmRef(randomBoolean());
         final Authentication authentication = AuthenticationTests.randomAuthentication(new User("joe"), realmRef);
 
-        TransportRequest getApiKeyRequest = GetApiKeyRequest.usingRealmAndUserName(realmRef.getName(), "joe");
+        TransportRequest getApiKeyRequest = GetApiKeyRequest.builder()
+            .realmName(realmRef.getName())
+            .userName("joe")
+            .withLimitedBy(randomBoolean())
+            .build();
         TransportRequest invalidateApiKeyRequest = InvalidateApiKeyRequest.usingRealmAndUserName(realmRef.getName(), "joe");
         TransportRequest updateApiKeyRequest = UpdateApiKeyRequest.usingApiKeyId(randomAlphaOfLength(10));
         assertTrue(clusterPermission.check("cluster:admin/xpack/security/api_key/get", getApiKeyRequest, authentication));
@@ -99,7 +125,7 @@ public class ManageOwnApiKeyClusterPrivilegeTests extends ESTestCase {
 
         assertFalse(clusterPermission.check("cluster:admin/something", mock(TransportRequest.class), authentication));
 
-        getApiKeyRequest = GetApiKeyRequest.usingRealmAndUserName(realmRef.getName(), "jane");
+        getApiKeyRequest = GetApiKeyRequest.builder().realmName(realmRef.getName()).userName("jane").withLimitedBy(randomBoolean()).build();
         assertFalse(clusterPermission.check("cluster:admin/xpack/security/api_key/get", getApiKeyRequest, authentication));
         invalidateApiKeyRequest = InvalidateApiKeyRequest.usingRealmAndUserName(realmRef.getName(), "jane");
         assertFalse(clusterPermission.check("cluster:admin/xpack/security/api_key/invalidate", invalidateApiKeyRequest, authentication));
@@ -108,7 +134,11 @@ public class ManageOwnApiKeyClusterPrivilegeTests extends ESTestCase {
         final String otherRealmName;
         if (realmDomain != null) {
             for (RealmConfig.RealmIdentifier realmIdentifier : realmDomain.realms()) {
-                getApiKeyRequest = GetApiKeyRequest.usingRealmAndUserName(realmIdentifier.getName(), "joe");
+                getApiKeyRequest = GetApiKeyRequest.builder()
+                    .realmName(realmIdentifier.getName())
+                    .userName("joe")
+                    .withLimitedBy(randomBoolean())
+                    .build();
                 assertTrue(clusterPermission.check("cluster:admin/xpack/security/api_key/get", getApiKeyRequest, authentication));
                 invalidateApiKeyRequest = InvalidateApiKeyRequest.usingRealmAndUserName(realmIdentifier.getName(), "joe");
                 assertTrue(
@@ -122,7 +152,7 @@ public class ManageOwnApiKeyClusterPrivilegeTests extends ESTestCase {
         } else {
             otherRealmName = randomValueOtherThan(realmRef.getName(), () -> randomAlphaOfLengthBetween(2, 10));
         }
-        getApiKeyRequest = GetApiKeyRequest.usingRealmAndUserName(otherRealmName, "joe");
+        getApiKeyRequest = GetApiKeyRequest.builder().realmName(otherRealmName).userName("joe").withLimitedBy(randomBoolean()).build();
         assertFalse(clusterPermission.check("cluster:admin/xpack/security/api_key/get", getApiKeyRequest, authentication));
         invalidateApiKeyRequest = InvalidateApiKeyRequest.usingRealmAndUserName(otherRealmName, "joe");
         assertFalse(clusterPermission.check("cluster:admin/xpack/security/api_key/invalidate", invalidateApiKeyRequest, authentication));
@@ -148,7 +178,10 @@ public class ManageOwnApiKeyClusterPrivilegeTests extends ESTestCase {
             authentication = AuthenticationTests.randomAuthentication(userJoe, realmRef);
         }
 
-        final TransportRequest getApiKeyRequest = GetApiKeyRequest.forOwnedApiKeys();
+        final TransportRequest getApiKeyRequest = GetApiKeyRequest.builder()
+            .ownedByAuthenticatedUser()
+            .withLimitedBy(randomBoolean())
+            .build();
         final TransportRequest invalidateApiKeyRequest = InvalidateApiKeyRequest.forOwnedApiKeys();
 
         assertTrue(clusterPermission.check("cluster:admin/xpack/security/api_key/get", getApiKeyRequest, authentication));
@@ -177,9 +210,9 @@ public class ManageOwnApiKeyClusterPrivilegeTests extends ESTestCase {
         }
 
         final TransportRequest getApiKeyRequest = randomFrom(
-            GetApiKeyRequest.usingRealmAndUserName("realm1", randomAlphaOfLength(7)),
-            GetApiKeyRequest.usingRealmAndUserName(randomAlphaOfLength(5), "joe"),
-            new GetApiKeyRequest(randomAlphaOfLength(5), randomAlphaOfLength(7), null, null, false)
+            GetApiKeyRequest.builder().realmName("realm1").userName(randomAlphaOfLength(7)).withLimitedBy(randomBoolean()).build(),
+            GetApiKeyRequest.builder().realmName(randomAlphaOfLength(5)).userName("joe").withLimitedBy(randomBoolean()).build(),
+            GetApiKeyRequest.builder().realmName(randomAlphaOfLength(5)).userName(randomAlphaOfLength(7)).build()
         );
         final TransportRequest invalidateApiKeyRequest = randomFrom(
             InvalidateApiKeyRequest.usingRealmAndUserName("realm1", randomAlphaOfLength(7)),
@@ -203,7 +236,7 @@ public class ManageOwnApiKeyClusterPrivilegeTests extends ESTestCase {
         assertTrue(
             clusterPermission.check(
                 "cluster:admin/xpack/security/api_key/get",
-                GetApiKeyRequest.usingRealmAndUserName("realm_b", "user_b"),
+                GetApiKeyRequest.builder().realmName("realm_b").userName("user_b").withLimitedBy(randomBoolean()).build(),
                 authentication
             )
         );
@@ -220,16 +253,33 @@ public class ManageOwnApiKeyClusterPrivilegeTests extends ESTestCase {
         final ClusterPermission clusterPermission = ManageOwnApiKeyClusterPrivilege.INSTANCE.buildPermission(ClusterPermission.builder())
             .build();
 
-        final QueryApiKeyRequest queryApiKeyRequest = new QueryApiKeyRequest();
+        final QueryApiKeyRequest queryApiKeyRequest = new QueryApiKeyRequest(null, null, null, null, null, randomBoolean());
         if (randomBoolean()) {
             queryApiKeyRequest.setFilterForCurrentUser();
         }
         assertThat(
-            clusterPermission.check(QueryApiKeyAction.NAME, queryApiKeyRequest, AuthenticationTestHelper.builder().build()),
+            clusterPermission.check(
+                QueryApiKeyAction.NAME,
+                queryApiKeyRequest,
+                randomValueOtherThanMany(Authentication::isApiKey, () -> AuthenticationTestHelper.builder().build())
+            ),
             is(queryApiKeyRequest.isFilterForCurrentUser())
         );
     }
 
+    public void testAuthenticationWithApiKeyAllowsDeniesQueryApiKeyWithLimitedBy() {
+        final ClusterPermission clusterPermission = ManageOwnApiKeyClusterPrivilege.INSTANCE.buildPermission(ClusterPermission.builder())
+            .build();
+
+        final boolean withLimitedBy = randomBoolean();
+        final QueryApiKeyRequest queryApiKeyRequest = new QueryApiKeyRequest(null, null, null, null, null, withLimitedBy);
+        queryApiKeyRequest.setFilterForCurrentUser();
+        assertThat(
+            clusterPermission.check(QueryApiKeyAction.NAME, queryApiKeyRequest, AuthenticationTestHelper.builder().apiKey().build(false)),
+            is(false == queryApiKeyRequest.withLimitedBy())
+        );
+    }
+
     public void testCheckGrantApiKeyRequestDenied() {
         final ClusterPermission clusterPermission = ManageOwnApiKeyClusterPrivilege.INSTANCE.buildPermission(ClusterPermission.builder())
             .build();

+ 2 - 2
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java

@@ -416,9 +416,9 @@ public class ReservedRolesStoreTests extends ESTestCase {
         final CreateApiKeyRequest createApiKeyRequest = new CreateApiKeyRequest(randomAlphaOfLength(8), null, null);
         assertThat(kibanaRole.cluster().check(CreateApiKeyAction.NAME, createApiKeyRequest, authentication), is(true));
         // Can only get and query its own API keys
-        assertThat(kibanaRole.cluster().check(GetApiKeyAction.NAME, new GetApiKeyRequest(), authentication), is(false));
+        assertThat(kibanaRole.cluster().check(GetApiKeyAction.NAME, GetApiKeyRequest.forAllApiKeys(), authentication), is(false));
         assertThat(
-            kibanaRole.cluster().check(GetApiKeyAction.NAME, new GetApiKeyRequest(null, null, null, null, true), authentication),
+            kibanaRole.cluster().check(GetApiKeyAction.NAME, GetApiKeyRequest.builder().ownedByAuthenticatedUser().build(), authentication),
             is(true)
         );
         final QueryApiKeyRequest queryApiKeyRequest = new QueryApiKeyRequest();

+ 25 - 0
x-pack/plugin/security/qa/security-trial/src/javaRestTest/java/org/elasticsearch/xpack/security/apikey/ApiKeyRestIT.java

@@ -32,6 +32,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.elasticsearch.test.SecuritySettingsSourceField.ES_TEST_ROOT_ROLE;
+import static org.elasticsearch.test.SecuritySettingsSourceField.ES_TEST_ROOT_ROLE_DESCRIPTOR;
 import static org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField.RUN_AS_USER_HEADER;
 import static org.hamcrest.Matchers.anEmptyMap;
 import static org.hamcrest.Matchers.contains;
@@ -144,14 +146,25 @@ public class ApiKeyRestIT extends SecurityOnTrialLicenseRestTestCase {
         assertOK(adminClient().performRequest(createApiKeyRequest3));
 
         // Role descriptors are returned by both get and query api key calls
+        final boolean withLimitedBy = randomBoolean();
         final List<Map<String, Object>> apiKeyMaps;
         if (randomBoolean()) {
             final Request getApiKeyRequest = new Request("GET", "_security/api_key");
+            if (withLimitedBy) {
+                getApiKeyRequest.addParameter("with_limited_by", "true");
+            } else if (randomBoolean()) {
+                getApiKeyRequest.addParameter("with_limited_by", "false");
+            }
             final Response getApiKeyResponse = adminClient().performRequest(getApiKeyRequest);
             assertOK(getApiKeyResponse);
             apiKeyMaps = (List<Map<String, Object>>) responseAsMap(getApiKeyResponse).get("api_keys");
         } else {
             final Request queryApiKeyRequest = new Request("POST", "_security/_query/api_key");
+            if (withLimitedBy) {
+                queryApiKeyRequest.addParameter("with_limited_by", "true");
+            } else if (randomBoolean()) {
+                queryApiKeyRequest.addParameter("with_limited_by", "false");
+            }
             final Response queryApiKeyResponse = adminClient().performRequest(queryApiKeyRequest);
             assertOK(queryApiKeyResponse);
             apiKeyMaps = (List<Map<String, Object>>) responseAsMap(queryApiKeyResponse).get("api_keys");
@@ -162,6 +175,18 @@ public class ApiKeyRestIT extends SecurityOnTrialLicenseRestTestCase {
             final String name = (String) apiKeyMap.get("name");
             @SuppressWarnings("unchecked")
             final var roleDescriptors = (Map<String, Object>) apiKeyMap.get("role_descriptors");
+
+            if (withLimitedBy) {
+                final List<Map<String, Object>> limitedBy = (List<Map<String, Object>>) apiKeyMap.get("limited_by");
+                assertThat(limitedBy.size(), equalTo(1));
+                assertThat(
+                    limitedBy.get(0),
+                    equalTo(Map.of(ES_TEST_ROOT_ROLE, XContentTestUtils.convertToMap(ES_TEST_ROOT_ROLE_DESCRIPTOR)))
+                );
+            } else {
+                assertThat(apiKeyMap, not(hasKey("limited_by")));
+            }
+
             switch (name) {
                 case "k1" -> {
                     assertThat(roleDescriptors, anEmptyMap());

+ 394 - 88
x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/ApiKeyIntegTests.java

@@ -37,6 +37,7 @@ import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.SecurityIntegTestCase;
@@ -65,6 +66,9 @@ import org.elasticsearch.xpack.core.security.action.apikey.GetApiKeyResponse;
 import org.elasticsearch.xpack.core.security.action.apikey.InvalidateApiKeyAction;
 import org.elasticsearch.xpack.core.security.action.apikey.InvalidateApiKeyRequest;
 import org.elasticsearch.xpack.core.security.action.apikey.InvalidateApiKeyResponse;
+import org.elasticsearch.xpack.core.security.action.apikey.QueryApiKeyAction;
+import org.elasticsearch.xpack.core.security.action.apikey.QueryApiKeyRequest;
+import org.elasticsearch.xpack.core.security.action.apikey.QueryApiKeyResponse;
 import org.elasticsearch.xpack.core.security.action.apikey.UpdateApiKeyAction;
 import org.elasticsearch.xpack.core.security.action.apikey.UpdateApiKeyRequest;
 import org.elasticsearch.xpack.core.security.action.apikey.UpdateApiKeyResponse;
@@ -85,6 +89,7 @@ import org.elasticsearch.xpack.core.security.authc.RealmDomain;
 import org.elasticsearch.xpack.core.security.authc.file.FileRealmSettings;
 import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
 import org.elasticsearch.xpack.core.security.authz.RoleDescriptorTests;
+import org.elasticsearch.xpack.core.security.authz.RoleDescriptorsIntersection;
 import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver;
 import org.elasticsearch.xpack.core.security.authz.store.ReservedRolesStore;
 import org.elasticsearch.xpack.core.security.user.User;
@@ -112,6 +117,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -121,6 +127,7 @@ import static org.elasticsearch.test.SecuritySettingsSource.ES_TEST_ROOT_USER;
 import static org.elasticsearch.test.SecuritySettingsSource.HASHER;
 import static org.elasticsearch.test.SecuritySettingsSource.TEST_ROLE;
 import static org.elasticsearch.test.SecuritySettingsSource.TEST_USER_NAME;
+import static org.elasticsearch.test.SecuritySettingsSourceField.ES_TEST_ROOT_ROLE_DESCRIPTOR;
 import static org.elasticsearch.test.SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING;
 import static org.elasticsearch.test.TestMatchers.throwableWithMessage;
 import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
@@ -134,6 +141,7 @@ import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasKey;
@@ -643,15 +651,21 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         assertThat(invalidateResponse.getPreviouslyInvalidatedApiKeys().size(), equalTo(0));
         assertThat(invalidateResponse.getErrors().size(), equalTo(0));
 
+        final boolean withLimitedBy = randomBoolean();
         PlainActionFuture<GetApiKeyResponse> getApiKeyResponseListener = new PlainActionFuture<>();
-        client.execute(GetApiKeyAction.INSTANCE, GetApiKeyRequest.usingRealmName("file"), getApiKeyResponseListener);
+        client.execute(
+            GetApiKeyAction.INSTANCE,
+            GetApiKeyRequest.builder().realmName("file").withLimitedBy(withLimitedBy).build(),
+            getApiKeyResponseListener
+        );
         GetApiKeyResponse response = getApiKeyResponseListener.get();
-        verifyGetResponse(
+        verifyApiKeyInfos(
             2,
             responses,
             tuple.v2(),
             List.of(DEFAULT_API_KEY_ROLE_DESCRIPTOR),
-            response,
+            withLimitedBy ? List.of(ES_TEST_ROOT_ROLE_DESCRIPTOR) : null,
+            response.getApiKeyInfos(),
             Collections.singleton(responses.get(0).getId()),
             Collections.singletonList(responses.get(1).getId())
         );
@@ -685,15 +699,21 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
             expectedValidKeyIds = responses.stream().map(o -> o.getId()).collect(Collectors.toSet());
         }
 
+        final boolean withLimitedBy = randomBoolean();
         PlainActionFuture<GetApiKeyResponse> listener = new PlainActionFuture<>();
-        client.execute(GetApiKeyAction.INSTANCE, GetApiKeyRequest.usingRealmName("file"), listener);
+        client.execute(
+            GetApiKeyAction.INSTANCE,
+            GetApiKeyRequest.builder().realmName("file").withLimitedBy(withLimitedBy).build(),
+            listener
+        );
         GetApiKeyResponse response = listener.get();
-        verifyGetResponse(
+        verifyApiKeyInfos(
             noOfApiKeys,
             responses,
             tuple.v2(),
             List.of(DEFAULT_API_KEY_ROLE_DESCRIPTOR),
-            response,
+            withLimitedBy ? List.of(ES_TEST_ROOT_ROLE_DESCRIPTOR) : null,
+            response.getApiKeyInfos(),
             expectedValidKeyIds,
             invalidatedApiKeyIds
         );
@@ -706,15 +726,21 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         Client client = client().filterWithHeader(
             Collections.singletonMap("Authorization", basicAuthHeaderValue(ES_TEST_ROOT_USER, TEST_PASSWORD_SECURE_STRING))
         );
+        final boolean withLimitedBy = randomBoolean();
         PlainActionFuture<GetApiKeyResponse> listener = new PlainActionFuture<>();
-        client.execute(GetApiKeyAction.INSTANCE, GetApiKeyRequest.usingUserName(ES_TEST_ROOT_USER), listener);
+        client.execute(
+            GetApiKeyAction.INSTANCE,
+            GetApiKeyRequest.builder().userName(ES_TEST_ROOT_USER).withLimitedBy(withLimitedBy).build(),
+            listener
+        );
         GetApiKeyResponse response = listener.get();
-        verifyGetResponse(
+        verifyApiKeyInfos(
             noOfApiKeys,
             responses,
             tuple.v2(),
             List.of(DEFAULT_API_KEY_ROLE_DESCRIPTOR),
-            response,
+            withLimitedBy ? List.of(ES_TEST_ROOT_ROLE_DESCRIPTOR) : null,
+            response.getApiKeyInfos(),
             responses.stream().map(o -> o.getId()).collect(Collectors.toSet()),
             null
         );
@@ -726,15 +752,21 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         Client client = client().filterWithHeader(
             Collections.singletonMap("Authorization", basicAuthHeaderValue(ES_TEST_ROOT_USER, TEST_PASSWORD_SECURE_STRING))
         );
+        final boolean withLimitedBy = randomBoolean();
         PlainActionFuture<GetApiKeyResponse> listener = new PlainActionFuture<>();
-        client.execute(GetApiKeyAction.INSTANCE, GetApiKeyRequest.usingRealmAndUserName("file", ES_TEST_ROOT_USER), listener);
+        client.execute(
+            GetApiKeyAction.INSTANCE,
+            GetApiKeyRequest.builder().realmName("file").userName(ES_TEST_ROOT_USER).withLimitedBy(withLimitedBy).build(),
+            listener
+        );
         GetApiKeyResponse response = listener.get();
-        verifyGetResponse(
+        verifyApiKeyInfos(
             1,
             responses,
             tuple.v2(),
             List.of(DEFAULT_API_KEY_ROLE_DESCRIPTOR),
-            response,
+            withLimitedBy ? List.of(ES_TEST_ROOT_ROLE_DESCRIPTOR) : null,
+            response.getApiKeyInfos(),
             Collections.singleton(responses.get(0).getId()),
             null
         );
@@ -746,15 +778,21 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         Client client = client().filterWithHeader(
             Collections.singletonMap("Authorization", basicAuthHeaderValue(ES_TEST_ROOT_USER, TEST_PASSWORD_SECURE_STRING))
         );
+        final boolean withLimitedBy = randomBoolean();
         PlainActionFuture<GetApiKeyResponse> listener = new PlainActionFuture<>();
-        client.execute(GetApiKeyAction.INSTANCE, GetApiKeyRequest.usingApiKeyId(responses.get(0).getId(), false), listener);
+        client.execute(
+            GetApiKeyAction.INSTANCE,
+            GetApiKeyRequest.builder().apiKeyId(responses.get(0).getId()).withLimitedBy(withLimitedBy).build(),
+            listener
+        );
         GetApiKeyResponse response = listener.get();
-        verifyGetResponse(
+        verifyApiKeyInfos(
             1,
             responses,
             tuple.v2(),
             List.of(DEFAULT_API_KEY_ROLE_DESCRIPTOR),
-            response,
+            withLimitedBy ? List.of(ES_TEST_ROOT_ROLE_DESCRIPTOR) : null,
+            response.getApiKeyInfos(),
             Collections.singleton(responses.get(0).getId()),
             null
         );
@@ -779,30 +817,41 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         final List<CreateApiKeyResponse> createApiKeyResponses2 = tuple2.v1();
 
         Client client = client().filterWithHeader(headers);
+        final boolean withLimitedBy = randomBoolean();
         PlainActionFuture<GetApiKeyResponse> listener = new PlainActionFuture<>();
         @SuppressWarnings("unchecked")
         List<CreateApiKeyResponse> responses = randomFrom(createApiKeyResponses1, createApiKeyResponses2);
         List<Map<String, Object>> metadatas = responses == createApiKeyResponses1 ? tuple1.v2() : tuple2.v2();
-        client.execute(GetApiKeyAction.INSTANCE, GetApiKeyRequest.usingApiKeyName(responses.get(0).getName(), false), listener);
+        client.execute(
+            GetApiKeyAction.INSTANCE,
+            GetApiKeyRequest.builder().apiKeyName(responses.get(0).getName()).withLimitedBy(withLimitedBy).build(),
+            listener
+        );
         // role descriptors are the same between randomization
-        verifyGetResponse(
+        verifyApiKeyInfos(
             1,
             responses,
             metadatas,
             List.of(DEFAULT_API_KEY_ROLE_DESCRIPTOR),
-            listener.get(),
+            withLimitedBy ? List.of(ES_TEST_ROOT_ROLE_DESCRIPTOR) : null,
+            listener.get().getApiKeyInfos(),
             Collections.singleton(responses.get(0).getId()),
             null
         );
 
         PlainActionFuture<GetApiKeyResponse> listener2 = new PlainActionFuture<>();
-        client.execute(GetApiKeyAction.INSTANCE, GetApiKeyRequest.usingApiKeyName("test-key*", false), listener2);
-        verifyGetResponse(
+        client.execute(
+            GetApiKeyAction.INSTANCE,
+            GetApiKeyRequest.builder().apiKeyName("test-key*").withLimitedBy(withLimitedBy).build(),
+            listener2
+        );
+        verifyApiKeyInfos(
             noOfApiKeys,
             createApiKeyResponses1,
             tuple1.v2(),
             List.of(DEFAULT_API_KEY_ROLE_DESCRIPTOR),
-            listener2.get(),
+            withLimitedBy ? List.of(ES_TEST_ROOT_ROLE_DESCRIPTOR) : null,
+            listener2.get().getApiKeyInfos(),
             createApiKeyResponses1.stream().map(CreateApiKeyResponse::getId).collect(Collectors.toSet()),
             null
         );
@@ -812,31 +861,54 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         );
 
         PlainActionFuture<GetApiKeyResponse> listener3 = new PlainActionFuture<>();
-        client.execute(GetApiKeyAction.INSTANCE, GetApiKeyRequest.usingApiKeyName("*", false), listener3);
+        client.execute(
+            GetApiKeyAction.INSTANCE,
+            GetApiKeyRequest.builder().apiKeyName("*").withLimitedBy(withLimitedBy).build(),
+            listener3
+        );
         responses = Stream.concat(createApiKeyResponses1.stream(), createApiKeyResponses2.stream()).collect(Collectors.toList());
         metadatas = Stream.concat(tuple1.v2().stream(), tuple2.v2().stream()).collect(Collectors.toList());
-        verifyGetResponse(
+        verifyApiKeyInfos(
             2 * noOfApiKeys,
             responses,
             metadatas,
             List.of(DEFAULT_API_KEY_ROLE_DESCRIPTOR),
-            listener3.get(),
+            withLimitedBy ? List.of(ES_TEST_ROOT_ROLE_DESCRIPTOR) : null,
+            listener3.get().getApiKeyInfos(),
             responses.stream().map(CreateApiKeyResponse::getId).collect(Collectors.toSet()),
             null
         );
 
         PlainActionFuture<GetApiKeyResponse> listener4 = new PlainActionFuture<>();
-        client.execute(GetApiKeyAction.INSTANCE, GetApiKeyRequest.usingApiKeyName("does-not-exist*", false), listener4);
-        verifyGetResponse(0, Collections.emptyList(), null, List.of(), listener4.get(), Collections.emptySet(), null);
+        client.execute(
+            GetApiKeyAction.INSTANCE,
+            GetApiKeyRequest.builder().apiKeyName("does-not-exist*").withLimitedBy(withLimitedBy).build(),
+            listener4
+        );
+        verifyApiKeyInfos(
+            0,
+            Collections.emptyList(),
+            null,
+            List.of(),
+            List.of(),
+            listener4.get().getApiKeyInfos(),
+            Collections.emptySet(),
+            null
+        );
 
         PlainActionFuture<GetApiKeyResponse> listener5 = new PlainActionFuture<>();
-        client.execute(GetApiKeyAction.INSTANCE, GetApiKeyRequest.usingApiKeyName("another-test-key*", false), listener5);
-        verifyGetResponse(
+        client.execute(
+            GetApiKeyAction.INSTANCE,
+            GetApiKeyRequest.builder().apiKeyName("another-test-key*").withLimitedBy(withLimitedBy).build(),
+            listener5
+        );
+        verifyApiKeyInfos(
             noOfApiKeys,
             createApiKeyResponses2,
             tuple2.v2(),
             List.of(DEFAULT_API_KEY_ROLE_DESCRIPTOR),
-            listener5.get(),
+            withLimitedBy ? List.of(ES_TEST_ROOT_ROLE_DESCRIPTOR) : null,
+            listener5.get().getApiKeyInfos(),
             createApiKeyResponses2.stream().map(CreateApiKeyResponse::getId).collect(Collectors.toSet()),
             null
         );
@@ -858,16 +930,37 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
             Collections.singletonMap("Authorization", basicAuthHeaderValue(userWithManageApiKeyRole, TEST_PASSWORD_SECURE_STRING))
         );
 
+        final boolean withLimitedBy = randomBoolean();
+        final List<RoleDescriptor> expectedLimitedByRoleDescriptors;
+        if (withLimitedBy) {
+            if (userWithManageApiKeyRole.equals("user_with_manage_api_key_role")) {
+                expectedLimitedByRoleDescriptors = List.of(
+                    new RoleDescriptor("manage_api_key_role", new String[] { "manage_api_key" }, null, null)
+                );
+            } else {
+                expectedLimitedByRoleDescriptors = List.of(
+                    new RoleDescriptor("manage_own_api_key_role", new String[] { "manage_own_api_key" }, null, null)
+                );
+            }
+        } else {
+            expectedLimitedByRoleDescriptors = null;
+        }
+
         PlainActionFuture<GetApiKeyResponse> listener = new PlainActionFuture<>();
-        client.execute(GetApiKeyAction.INSTANCE, GetApiKeyRequest.forOwnedApiKeys(), listener);
+        client.execute(
+            GetApiKeyAction.INSTANCE,
+            GetApiKeyRequest.builder().ownedByAuthenticatedUser().withLimitedBy(withLimitedBy).build(),
+            listener
+        );
         GetApiKeyResponse response = listener.get();
-        verifyGetResponse(
+        verifyApiKeyInfos(
             userWithManageApiKeyRole,
             noOfApiKeysForUserWithManageApiKeyRole,
             userWithManageApiKeyRoleApiKeys,
             tuple.v2(),
             List.of(DEFAULT_API_KEY_ROLE_DESCRIPTOR),
-            response,
+            expectedLimitedByRoleDescriptors,
+            response.getApiKeyInfos(),
             userWithManageApiKeyRoleApiKeys.stream().map(o -> o.getId()).collect(Collectors.toSet()),
             null
         );
@@ -886,16 +979,24 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
             "monitor"
         );
         List<CreateApiKeyResponse> userWithManageOwnApiKeyRoleApiKeys = tuple.v1();
+        final boolean withLimitedBy = randomBoolean();
         PlainActionFuture<GetApiKeyResponse> listener = new PlainActionFuture<>();
-        getClientForRunAsUser().execute(GetApiKeyAction.INSTANCE, GetApiKeyRequest.forOwnedApiKeys(), listener);
+        getClientForRunAsUser().execute(
+            GetApiKeyAction.INSTANCE,
+            GetApiKeyRequest.builder().ownedByAuthenticatedUser().withLimitedBy(withLimitedBy).build(),
+            listener
+        );
         GetApiKeyResponse response = listener.get();
-        verifyGetResponse(
+        verifyApiKeyInfos(
             "user_with_manage_own_api_key_role",
             noOfApiKeysForUserWithManageApiKeyRole,
             userWithManageOwnApiKeyRoleApiKeys,
             tuple.v2(),
             List.of(DEFAULT_API_KEY_ROLE_DESCRIPTOR),
-            response,
+            withLimitedBy
+                ? List.of(new RoleDescriptor("manage_own_api_key_role", new String[] { "manage_own_api_key" }, null, null))
+                : null,
+            response.getApiKeyInfos(),
             userWithManageOwnApiKeyRoleApiKeys.stream().map(o -> o.getId()).collect(Collectors.toSet()),
             null
         );
@@ -914,20 +1015,24 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
             "monitor"
         );
         List<CreateApiKeyResponse> userWithManageOwnApiKeyRoleApiKeys = tuple.v1();
+        final boolean withLimitedBy = randomBoolean();
         PlainActionFuture<GetApiKeyResponse> listener = new PlainActionFuture<>();
         getClientForRunAsUser().execute(
             GetApiKeyAction.INSTANCE,
-            GetApiKeyRequest.usingRealmAndUserName("file", "user_with_manage_own_api_key_role"),
+            GetApiKeyRequest.builder().realmName("file").userName("user_with_manage_own_api_key_role").withLimitedBy(withLimitedBy).build(),
             listener
         );
         GetApiKeyResponse response = listener.get();
-        verifyGetResponse(
+        verifyApiKeyInfos(
             "user_with_manage_own_api_key_role",
             noOfApiKeysForUserWithManageApiKeyRole,
             userWithManageOwnApiKeyRoleApiKeys,
             tuple.v2(),
             List.of(DEFAULT_API_KEY_ROLE_DESCRIPTOR),
-            response,
+            withLimitedBy
+                ? List.of(new RoleDescriptor("manage_own_api_key_role", new String[] { "manage_own_api_key" }, null, null))
+                : null,
+            response.getApiKeyInfos(),
             userWithManageOwnApiKeyRoleApiKeys.stream().map(o -> o.getId()).collect(Collectors.toSet()),
             null
         );
@@ -988,8 +1093,9 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         final Client client = client().filterWithHeader(
             Collections.singletonMap("Authorization", basicAuthHeaderValue("user_with_manage_api_key_role", TEST_PASSWORD_SECURE_STRING))
         );
+        final boolean withLimitedBy = randomBoolean();
         PlainActionFuture<GetApiKeyResponse> listener = new PlainActionFuture<>();
-        client.execute(GetApiKeyAction.INSTANCE, new GetApiKeyRequest(), listener);
+        client.execute(GetApiKeyAction.INSTANCE, GetApiKeyRequest.builder().withLimitedBy(withLimitedBy).build(), listener);
         GetApiKeyResponse response = listener.get();
         int totalApiKeys = noOfSuperuserApiKeys + noOfApiKeysForUserWithManageApiKeyRole + noOfApiKeysForUserWithManageOwnApiKeyRole;
         List<CreateApiKeyResponse> allApiKeys = new ArrayList<>();
@@ -997,14 +1103,32 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         final List<Map<String, Object>> metadatas = Stream.of(defaultUserTuple.v2(), userWithManageTuple.v2(), userWithManageOwnTuple.v2())
             .flatMap(List::stream)
             .collect(Collectors.toList());
-        verifyGetResponse(
+
+        final Function<String, List<RoleDescriptor>> expectedLimitedByRoleDescriptorsLookup = username -> {
+            if (withLimitedBy) {
+                return switch (username) {
+                    case ES_TEST_ROOT_USER -> List.of(ES_TEST_ROOT_ROLE_DESCRIPTOR);
+                    case "user_with_manage_api_key_role" -> List.of(
+                        new RoleDescriptor("manage_api_key_role", new String[] { "manage_api_key" }, null, null)
+                    );
+                    case "user_with_manage_own_api_key_role" -> List.of(
+                        new RoleDescriptor("manage_own_api_key_role", new String[] { "manage_own_api_key" }, null, null)
+                    );
+                    default -> throw new IllegalStateException("unknown username: " + username);
+                };
+            } else {
+                return null;
+            }
+        };
+        verifyApiKeyInfos(
             new String[] { ES_TEST_ROOT_USER, "user_with_manage_api_key_role", "user_with_manage_own_api_key_role" },
             totalApiKeys,
             allApiKeys,
             metadatas,
             List.of(DEFAULT_API_KEY_ROLE_DESCRIPTOR),
-            response,
-            allApiKeys.stream().map(o -> o.getId()).collect(Collectors.toSet()),
+            expectedLimitedByRoleDescriptorsLookup,
+            response.getApiKeyInfos(),
+            allApiKeys.stream().map(CreateApiKeyResponse::getId).collect(Collectors.toSet()),
             null
         );
     }
@@ -1032,7 +1156,7 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
             Collections.singletonMap("Authorization", basicAuthHeaderValue(withUser, TEST_PASSWORD_SECURE_STRING))
         );
         PlainActionFuture<GetApiKeyResponse> listener = new PlainActionFuture<>();
-        client.execute(GetApiKeyAction.INSTANCE, new GetApiKeyRequest(), listener);
+        client.execute(GetApiKeyAction.INSTANCE, GetApiKeyRequest.forAllApiKeys(), listener);
         ElasticsearchSecurityException ese = expectThrows(ElasticsearchSecurityException.class, () -> listener.actionGet());
         assertErrorMessage(ese, "cluster:admin/xpack/security/api_key/get", withUser);
     }
@@ -1143,20 +1267,35 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
             .encodeToString((responses.get(0).getId() + ":" + responses.get(0).getKey().toString()).getBytes(StandardCharsets.UTF_8));
         Client client = client().filterWithHeader(Map.of("Authorization", "ApiKey " + base64ApiKeyKeyValue));
         PlainActionFuture<GetApiKeyResponse> listener = new PlainActionFuture<>();
-        client.execute(GetApiKeyAction.INSTANCE, GetApiKeyRequest.usingApiKeyId(responses.get(0).getId(), randomBoolean()), listener);
+        client.execute(
+            GetApiKeyAction.INSTANCE,
+            GetApiKeyRequest.builder().apiKeyId(responses.get(0).getId()).ownedByAuthenticatedUser(randomBoolean()).build(),
+            listener
+        );
         GetApiKeyResponse response = listener.get();
-        verifyGetResponse(
+        verifyApiKeyInfos(
             1,
             responses,
             tuple.v2(),
             List.of(new RoleDescriptor(DEFAULT_API_KEY_ROLE_DESCRIPTOR.getName(), Strings.EMPTY_ARRAY, null, null)),
-            response,
+            null,
+            response.getApiKeyInfos(),
             Collections.singleton(responses.get(0).getId()),
             null
         );
 
-        final PlainActionFuture<GetApiKeyResponse> failureListener = new PlainActionFuture<>();
+        // It cannot retrieve its own limited-by role descriptors
+        final PlainActionFuture<GetApiKeyResponse> future2 = new PlainActionFuture<>();
+        client.execute(
+            GetApiKeyAction.INSTANCE,
+            GetApiKeyRequest.builder().apiKeyId(responses.get(0).getId()).ownedByAuthenticatedUser(randomBoolean()).withLimitedBy().build(),
+            future2
+        );
+        final ElasticsearchSecurityException e2 = expectThrows(ElasticsearchSecurityException.class, future2::actionGet);
+        assertErrorMessage(e2, "cluster:admin/xpack/security/api_key/get", ES_TEST_ROOT_USER, responses.get(0).getId());
+
         // for any other API key id, it must deny access
+        final PlainActionFuture<GetApiKeyResponse> failureListener = new PlainActionFuture<>();
         client.execute(
             GetApiKeyAction.INSTANCE,
             GetApiKeyRequest.usingApiKeyId(responses.get(1).getId(), randomBoolean()),
@@ -1171,6 +1310,117 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         assertErrorMessage(ese, "cluster:admin/xpack/security/api_key/get", ES_TEST_ROOT_USER, responses.get(0).getId());
     }
 
+    public void testApiKeyViewLimitedBy() {
+        // 1. An API key with manage_own_api_key
+        final Tuple<List<CreateApiKeyResponse>, List<Map<String, Object>>> tuple1 = createApiKeys(
+            ES_TEST_ROOT_USER,
+            1,
+            null,
+            "manage_own_api_key"
+        );
+        final List<CreateApiKeyResponse> responses1 = tuple1.v1();
+        final String apiKeyId1 = responses1.get(0).getId();
+        final Client client1 = client().filterWithHeader(
+            Map.of(
+                "Authorization",
+                "ApiKey "
+                    + Base64.getEncoder()
+                        .encodeToString((apiKeyId1 + ":" + responses1.get(0).getKey().toString()).getBytes(StandardCharsets.UTF_8))
+            )
+        );
+
+        // Can view itself without limited-by
+        verifyApiKeyInfos(
+            1,
+            responses1,
+            tuple1.v2(),
+            List.of(new RoleDescriptor(DEFAULT_API_KEY_ROLE_DESCRIPTOR.getName(), new String[] { "manage_own_api_key" }, null, null)),
+            null,
+            new ApiKey[] { getApiKeyInfo(client1, apiKeyId1, false, randomBoolean()) },
+            Collections.singleton(apiKeyId1),
+            null
+        );
+
+        // Cannot view itself with limited-by
+        final boolean useGetApiKey = randomBoolean();
+        final var e2 = expectThrows(ElasticsearchSecurityException.class, () -> getApiKeyInfo(client1, apiKeyId1, true, useGetApiKey));
+        assertErrorMessage(e2, "cluster:admin/xpack/security/api_key/" + (useGetApiKey ? "get" : "query"), ES_TEST_ROOT_USER, apiKeyId1);
+
+        // 2. An API key with manage_api_key can view its own limited-by or any other key's limited-by
+        final Tuple<List<CreateApiKeyResponse>, List<Map<String, Object>>> tuple3 = createApiKeys(
+            ES_TEST_ROOT_USER,
+            1,
+            null,
+            "manage_api_key"
+        );
+        final List<CreateApiKeyResponse> responses3 = tuple3.v1();
+        final String apiKeyId3 = responses3.get(0).getId();
+        final Client client3 = client().filterWithHeader(
+            Map.of(
+                "Authorization",
+                "ApiKey "
+                    + Base64.getEncoder()
+                        .encodeToString((apiKeyId3 + ":" + responses3.get(0).getKey().toString()).getBytes(StandardCharsets.UTF_8))
+            )
+        );
+
+        // View its own limited-by
+        verifyApiKeyInfos(
+            1,
+            responses3,
+            tuple3.v2(),
+            List.of(new RoleDescriptor(DEFAULT_API_KEY_ROLE_DESCRIPTOR.getName(), new String[] { "manage_api_key" }, null, null)),
+            List.of(ES_TEST_ROOT_ROLE_DESCRIPTOR),
+            new ApiKey[] { getApiKeyInfo(client3, apiKeyId3, true, randomBoolean()) },
+            Collections.singleton(apiKeyId3),
+            null
+        );
+
+        // View other key's limited-by
+        verifyApiKeyInfos(
+            1,
+            responses1,
+            tuple1.v2(),
+            List.of(new RoleDescriptor(DEFAULT_API_KEY_ROLE_DESCRIPTOR.getName(), new String[] { "manage_own_api_key" }, null, null)),
+            List.of(ES_TEST_ROOT_ROLE_DESCRIPTOR),
+            new ApiKey[] { getApiKeyInfo(client3, apiKeyId1, true, randomBoolean()) },
+            Collections.singleton(apiKeyId1),
+            null
+        );
+    }
+
+    public void testLegacySuperuserLimitedByWillBeReturnedAsTransformed() throws Exception {
+        final Tuple<CreateApiKeyResponse, Map<String, Object>> createdApiKey = createApiKey(TEST_USER_NAME, null);
+        final var apiKeyId = createdApiKey.v1().getId();
+        final ServiceWithNodeName serviceWithNodeName = getServiceWithNodeName();
+        final Authentication authentication = Authentication.newRealmAuthentication(
+            new User(TEST_USER_NAME, TEST_ROLE),
+            new Authentication.RealmRef("file", "file", serviceWithNodeName.nodeName())
+        );
+        // Force set user role descriptors to 7.x legacy superuser role descriptors
+        assertSingleUpdate(
+            apiKeyId,
+            updateApiKeys(
+                serviceWithNodeName.service(),
+                authentication,
+                BulkUpdateApiKeyRequest.usingApiKeyIds(apiKeyId),
+                Set.of(ApiKeyService.LEGACY_SUPERUSER_ROLE_DESCRIPTOR)
+            )
+        );
+        // raw document has the legacy superuser role descriptor
+        expectRoleDescriptorsForApiKey(
+            "limited_by_role_descriptors",
+            Set.of(ApiKeyService.LEGACY_SUPERUSER_ROLE_DESCRIPTOR),
+            getApiKeyDocument(apiKeyId)
+        );
+
+        final ApiKey apiKeyInfo = getApiKeyInfo(client(), apiKeyId, true, randomBoolean());
+        assertThat(
+            apiKeyInfo.getLimitedBy().roleDescriptorsList().iterator().next(),
+            equalTo(Set.of(ReservedRolesStore.SUPERUSER_ROLE_DESCRIPTOR))
+        );
+    }
+
     public void testApiKeyWithManageOwnPrivilegeIsAbleToInvalidateItselfButNotAnyOtherKeysCreatedBySameOwner() throws InterruptedException,
         ExecutionException {
         List<CreateApiKeyResponse> responses = createApiKeys(ES_TEST_ROOT_USER, 2, null, "manage_own_api_key").v1();
@@ -1285,6 +1535,18 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         assertNotNull(key100Response.getId());
         assertNotNull(key100Response.getKey());
 
+        // Derive keys have empty limited-by role descriptors
+        final PlainActionFuture<GetApiKeyResponse> future = new PlainActionFuture<>();
+        client.execute(
+            GetApiKeyAction.INSTANCE,
+            GetApiKeyRequest.builder().apiKeyId(key100Response.getId()).withLimitedBy().build(),
+            future
+        );
+        assertThat(future.actionGet().getApiKeyInfos().length, equalTo(1));
+        final RoleDescriptorsIntersection limitedBy = future.actionGet().getApiKeyInfos()[0].getLimitedBy();
+        assertThat(limitedBy.roleDescriptorsList().size(), equalTo(1));
+        assertThat(limitedBy.roleDescriptorsList().iterator().next(), emptyIterable());
+
         // Check at the end to allow sometime for the operation to happen. Since an erroneous creation is
         // asynchronous so that the document is not available immediately.
         assertApiKeyNotCreated(client, "key-2");
@@ -1546,7 +1808,7 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         final var expectedMetadata = request.getMetadata() != null ? request.getMetadata() : createdApiKey.v2();
         final var expectedRoleDescriptors = nullRoleDescriptors ? List.of(DEFAULT_API_KEY_ROLE_DESCRIPTOR) : newRoleDescriptors;
 
-        expectAttributesForApiKey(
+        doTestApiKeyHasExpectedAttributes(
             apiKeyId,
             Map.of(
                 ApiKeyAttribute.CREATOR,
@@ -1609,7 +1871,7 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
             )
         );
         for (String apiKeyId : apiKeyIds) {
-            expectAttributesForApiKey(
+            doTestApiKeyHasExpectedAttributes(
                 apiKeyId,
                 Map.of(
                     ApiKeyAttribute.METADATA,
@@ -1641,7 +1903,7 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         assertThat(response.getNoops(), containsInAnyOrder(apiKeyIds.toArray()));
         assertThat(response.getErrorDetails().keySet(), containsInAnyOrder(notFoundIds.toArray()));
         for (String apiKeyId : apiKeyIds) {
-            expectAttributesForApiKey(
+            doTestApiKeyHasExpectedAttributes(
                 apiKeyId,
                 Map.of(
                     ApiKeyAttribute.METADATA,
@@ -1817,7 +2079,10 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
             "all"
         ).v1().get(0);
         final String apiKeyId = createdApiKey.getId();
-        expectAttributesForApiKey(apiKeyId, Map.of(ApiKeyAttribute.LIMITED_BY_ROLE_DESCRIPTORS, Set.of(roleDescriptorBeforeUpdate)));
+        doTestApiKeyHasExpectedAttributes(
+            apiKeyId,
+            Map.of(ApiKeyAttribute.LIMITED_BY_ROLE_DESCRIPTORS, Set.of(roleDescriptorBeforeUpdate))
+        );
 
         final List<String> newClusterPrivileges = randomValueOtherThan(clusterPrivileges, () -> {
             final List<String> privs = new ArrayList<>(randomSubsetOf(ClusterPrivilegeResolver.names()));
@@ -1839,7 +2104,7 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
 
         assertNotNull(response);
         assertTrue(response.isUpdated());
-        expectAttributesForApiKey(apiKeyId, Map.of(ApiKeyAttribute.LIMITED_BY_ROLE_DESCRIPTORS, Set.of(roleDescriptorAfterUpdate)));
+        doTestApiKeyHasExpectedAttributes(apiKeyId, Map.of(ApiKeyAttribute.LIMITED_BY_ROLE_DESCRIPTORS, Set.of(roleDescriptorAfterUpdate)));
 
         // Update user role name only
         final RoleDescriptor roleDescriptorWithNewName = putRoleWithClusterPrivileges(
@@ -1865,7 +2130,7 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         expectedCreator.put("metadata", updatedUser.metadata());
         expectedCreator.put("realm_type", "native");
         expectedCreator.put("realm", "index");
-        expectAttributesForApiKey(
+        doTestApiKeyHasExpectedAttributes(
             apiKeyId,
             Map.of(ApiKeyAttribute.CREATOR, expectedCreator, ApiKeyAttribute.LIMITED_BY_ROLE_DESCRIPTORS, Set.of(roleDescriptorWithNewName))
         );
@@ -2181,7 +2446,7 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
                 currentSuperuserRoleDescriptors
             )
         );
-        expectAttributesForApiKey(apiKeyId, Map.of(ApiKeyAttribute.LIMITED_BY_ROLE_DESCRIPTORS, currentSuperuserRoleDescriptors));
+        doTestApiKeyHasExpectedAttributes(apiKeyId, Map.of(ApiKeyAttribute.LIMITED_BY_ROLE_DESCRIPTORS, currentSuperuserRoleDescriptors));
         // Second update is noop because role descriptors were auto-updated by the previous request
         assertSingleNoop(
             apiKeyId,
@@ -2299,35 +2564,40 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
 
     // Check attributes with both the raw document and the get api key response whenever possible
     @SuppressWarnings("unchecked")
-    private void expectAttributesForApiKey(String apiKeyId, Map<ApiKeyAttribute, Object> attributes) throws IOException {
+    private void doTestApiKeyHasExpectedAttributes(String apiKeyId, Map<ApiKeyAttribute, Object> attributes) throws IOException {
         final Map<String, Object> apiKeyDocMap = getApiKeyDocument(apiKeyId);
-        final PlainActionFuture<GetApiKeyResponse> future = new PlainActionFuture<>();
-        client().execute(GetApiKeyAction.INSTANCE, GetApiKeyRequest.usingApiKeyId(apiKeyId, false), future);
-        final GetApiKeyResponse getApiKeyResponse = future.actionGet();
-        assertThat(getApiKeyResponse.getApiKeyInfos(), arrayWithSize(1));
-        final ApiKey apiKeyInfo = getApiKeyResponse.getApiKeyInfos()[0];
-
+        final boolean useGetApiKey = randomBoolean();
+        final ApiKey apiKeyInfo = getApiKeyInfo(client(), apiKeyId, true, useGetApiKey);
         for (Map.Entry<ApiKeyAttribute, Object> entry : attributes.entrySet()) {
             switch (entry.getKey()) {
                 case CREATOR -> {
                     final var creatorMap = (Map<String, Object>) entry.getValue();
                     expectCreatorForApiKey(creatorMap, apiKeyDocMap);
-                    assertThat(creatorMap.get("principal"), equalTo(apiKeyInfo.getUsername()));
-                    assertThat(creatorMap.get("realm"), equalTo(apiKeyInfo.getRealm()));
+                    assertThat("useGetApiKey: " + useGetApiKey, creatorMap.get("principal"), equalTo(apiKeyInfo.getUsername()));
+                    assertThat("useGetApiKey: " + useGetApiKey, creatorMap.get("realm"), equalTo(apiKeyInfo.getRealm()));
                 }
                 case METADATA -> {
                     final var metadata = (Map<String, Object>) entry.getValue();
                     expectMetadataForApiKey(metadata, apiKeyDocMap);
-                    assertThat(metadata, equalTo(apiKeyInfo.getMetadata()));
+                    assertThat("useGetApiKey: " + useGetApiKey, metadata, equalTo(apiKeyInfo.getMetadata()));
                 }
                 case ASSIGNED_ROLE_DESCRIPTORS -> {
                     final var expectedRoleDescriptors = (Collection<RoleDescriptor>) entry.getValue();
                     expectRoleDescriptorsForApiKey("role_descriptors", expectedRoleDescriptors, apiKeyDocMap);
-                    assertThat(expectedRoleDescriptors, containsInAnyOrder(apiKeyInfo.getRoleDescriptors().toArray(RoleDescriptor[]::new)));
+                    assertThat(
+                        "useGetApiKey: " + useGetApiKey,
+                        expectedRoleDescriptors,
+                        containsInAnyOrder(apiKeyInfo.getRoleDescriptors().toArray(RoleDescriptor[]::new))
+                    );
                 }
                 case LIMITED_BY_ROLE_DESCRIPTORS -> {
                     final var expectedRoleDescriptors = (Collection<RoleDescriptor>) entry.getValue();
                     expectRoleDescriptorsForApiKey("limited_by_role_descriptors", expectedRoleDescriptors, apiKeyDocMap);
+                    assertThat(
+                        "useGetApiKey: " + useGetApiKey,
+                        expectedRoleDescriptors,
+                        containsInAnyOrder(apiKeyInfo.getLimitedBy().roleDescriptorsList().iterator().next().toArray(RoleDescriptor[]::new))
+                    );
                 }
                 default -> throw new IllegalStateException("unexpected attribute name");
             }
@@ -2336,7 +2606,7 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
 
     private void expectAttributesForApiKeys(List<String> apiKeyIds, Map<ApiKeyAttribute, Object> attributes) throws IOException {
         for (String apiKeyId : apiKeyIds) {
-            expectAttributesForApiKey(apiKeyId, attributes);
+            doTestApiKeyHasExpectedAttributes(apiKeyId, attributes);
         }
     }
 
@@ -2387,6 +2657,30 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         return client().execute(GetAction.INSTANCE, new GetRequest(SECURITY_MAIN_ALIAS, apiKeyId)).actionGet().getSource();
     }
 
+    private ApiKey getApiKeyInfo(Client client, String apiKeyId, boolean withLimitedBy, boolean useGetApiKey) {
+        if (useGetApiKey) {
+            final PlainActionFuture<GetApiKeyResponse> future = new PlainActionFuture<>();
+            client.execute(
+                GetApiKeyAction.INSTANCE,
+                GetApiKeyRequest.builder().apiKeyId(apiKeyId).withLimitedBy(withLimitedBy).build(),
+                future
+            );
+            final GetApiKeyResponse getApiKeyResponse = future.actionGet();
+            assertThat(getApiKeyResponse.getApiKeyInfos(), arrayWithSize(1));
+            return getApiKeyResponse.getApiKeyInfos()[0];
+        } else {
+            final PlainActionFuture<QueryApiKeyResponse> future = new PlainActionFuture<>();
+            client.execute(
+                QueryApiKeyAction.INSTANCE,
+                new QueryApiKeyRequest(QueryBuilders.idsQuery().addIds(apiKeyId), null, null, null, null, withLimitedBy),
+                future
+            );
+            final QueryApiKeyResponse queryApiKeyResponse = future.actionGet();
+            assertThat(queryApiKeyResponse.getItems(), arrayWithSize(1));
+            return queryApiKeyResponse.getItems()[0].getApiKey();
+        }
+    }
+
     private ServiceWithNodeName getServiceWithNodeName() {
         final var nodeName = randomFrom(internalCluster().getNodeNames());
         final var service = internalCluster().getInstance(ApiKeyService.class, nodeName);
@@ -2439,65 +2733,70 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         );
     }
 
-    private void verifyGetResponse(
+    private void verifyApiKeyInfos(
         int expectedNumberOfApiKeys,
         List<CreateApiKeyResponse> responses,
         List<Map<String, Object>> metadatas,
         List<RoleDescriptor> expectedRoleDescriptors,
-        GetApiKeyResponse response,
+        List<RoleDescriptor> expectedLimitedByRoleDescriptors,
+        ApiKey[] apiKeyInfos,
         Set<String> validApiKeyIds,
         List<String> invalidatedApiKeyIds
     ) {
-        verifyGetResponse(
+        verifyApiKeyInfos(
             ES_TEST_ROOT_USER,
             expectedNumberOfApiKeys,
             responses,
             metadatas,
             expectedRoleDescriptors,
-            response,
+            expectedLimitedByRoleDescriptors,
+            apiKeyInfos,
             validApiKeyIds,
             invalidatedApiKeyIds
         );
     }
 
-    private void verifyGetResponse(
+    private void verifyApiKeyInfos(
         String user,
         int expectedNumberOfApiKeys,
         List<CreateApiKeyResponse> responses,
         List<Map<String, Object>> metadatas,
         List<RoleDescriptor> expectedRoleDescriptors,
-        GetApiKeyResponse response,
+        List<RoleDescriptor> expectedLimitedByRoleDescriptors,
+        ApiKey[] apiKeyInfos,
         Set<String> validApiKeyIds,
         List<String> invalidatedApiKeyIds
     ) {
-        verifyGetResponse(
+        verifyApiKeyInfos(
             new String[] { user },
             expectedNumberOfApiKeys,
             responses,
             metadatas,
             expectedRoleDescriptors,
-            response,
+            (ignore) -> expectedLimitedByRoleDescriptors,
+            apiKeyInfos,
             validApiKeyIds,
             invalidatedApiKeyIds
         );
     }
 
-    private void verifyGetResponse(
+    private void verifyApiKeyInfos(
         String[] user,
         int expectedNumberOfApiKeys,
         List<CreateApiKeyResponse> responses,
         List<Map<String, Object>> metadatas,
         List<RoleDescriptor> expectedRoleDescriptors,
-        GetApiKeyResponse response,
+        Function<String, List<RoleDescriptor>> expectedLimitedByRoleDescriptorsLookup,
+        ApiKey[] apiKeyInfos,
         Set<String> validApiKeyIds,
         List<String> invalidatedApiKeyIds
     ) {
-        assertThat(response.getApiKeyInfos().length, equalTo(expectedNumberOfApiKeys));
+        assertThat(apiKeyInfos.length, equalTo(expectedNumberOfApiKeys));
         List<String> expectedIds = responses.stream()
             .filter(o -> validApiKeyIds.contains(o.getId()))
             .map(o -> o.getId())
             .collect(Collectors.toList());
-        List<String> actualIds = Arrays.stream(response.getApiKeyInfos())
+        List<String> actualIds = Arrays.stream(apiKeyInfos)
             .filter(o -> o.isInvalidated() == false)
             .map(o -> o.getId())
             .collect(Collectors.toList());
@@ -2506,19 +2805,19 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
             .filter(o -> validApiKeyIds.contains(o.getId()))
             .map(o -> o.getName())
             .collect(Collectors.toList());
-        List<String> actualNames = Arrays.stream(response.getApiKeyInfos())
+        List<String> actualNames = Arrays.stream(apiKeyInfos)
             .filter(o -> o.isInvalidated() == false)
             .map(o -> o.getName())
             .collect(Collectors.toList());
         assertThat(actualNames, containsInAnyOrder(expectedNames.toArray(Strings.EMPTY_ARRAY)));
         Set<String> expectedUsernames = (validApiKeyIds.isEmpty()) ? Collections.emptySet() : Set.of(user);
-        Set<String> actualUsernames = Arrays.stream(response.getApiKeyInfos())
+        Set<String> actualUsernames = Arrays.stream(apiKeyInfos)
             .filter(o -> o.isInvalidated() == false)
             .map(o -> o.getUsername())
             .collect(Collectors.toSet());
         assertThat(actualUsernames, containsInAnyOrder(expectedUsernames.toArray(Strings.EMPTY_ARRAY)));
         if (invalidatedApiKeyIds != null) {
-            List<String> actualInvalidatedApiKeyIds = Arrays.stream(response.getApiKeyInfos())
+            List<String> actualInvalidatedApiKeyIds = Arrays.stream(apiKeyInfos)
                 .filter(o -> o.isInvalidated())
                 .map(o -> o.getId())
                 .collect(Collectors.toList());
@@ -2531,18 +2830,25 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
                     (m, i) -> m.put(responses.get(i).getId(), metadatas.get(i)),
                     HashMap::putAll
                 );
-            for (ApiKey apiKey : response.getApiKeyInfos()) {
+            for (ApiKey apiKey : apiKeyInfos) {
                 final Map<String, Object> metadata = idToMetadata.get(apiKey.getId());
                 assertThat(apiKey.getMetadata(), equalTo(metadata == null ? Map.of() : metadata));
             }
         }
-        Arrays.stream(response.getApiKeyInfos())
-            .forEach(
-                apiKeyInfo -> assertThat(
-                    apiKeyInfo.getRoleDescriptors(),
-                    containsInAnyOrder(expectedRoleDescriptors.toArray(RoleDescriptor[]::new))
-                )
+        Arrays.stream(apiKeyInfos).forEach(apiKeyInfo -> {
+            assertThat(apiKeyInfo.getRoleDescriptors(), containsInAnyOrder(expectedRoleDescriptors.toArray(RoleDescriptor[]::new)));
+            final List<RoleDescriptor> expectedLimitedByRoleDescriptors = expectedLimitedByRoleDescriptorsLookup.apply(
+                apiKeyInfo.getUsername()
             );
+            if (expectedLimitedByRoleDescriptors == null) {
+                assertThat(apiKeyInfo.getLimitedBy(), nullValue());
+            } else {
+                assertThat(
+                    apiKeyInfo.getLimitedBy().roleDescriptorsList().iterator().next(),
+                    containsInAnyOrder(expectedLimitedByRoleDescriptors.toArray(RoleDescriptor[]::new))
+                );
+            }
+        });
     }
 
     private Tuple<CreateApiKeyResponse, Map<String, Object>> createApiKey(String user, TimeValue expiration) {

+ 2 - 3
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/apikey/TransportGetApiKeyAction.java

@@ -12,7 +12,6 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.security.SecurityContext;
@@ -34,7 +33,7 @@ public final class TransportGetApiKeyAction extends HandledTransportAction<GetAp
         ApiKeyService apiKeyService,
         SecurityContext context
     ) {
-        super(GetApiKeyAction.NAME, transportService, actionFilters, (Writeable.Reader<GetApiKeyRequest>) GetApiKeyRequest::new);
+        super(GetApiKeyAction.NAME, transportService, actionFilters, GetApiKeyRequest::new);
         this.apiKeyService = apiKeyService;
         this.securityContext = context;
     }
@@ -58,7 +57,7 @@ public final class TransportGetApiKeyAction extends HandledTransportAction<GetAp
             realms = ApiKeyService.getOwnersRealmNames(authentication);
         }
 
-        apiKeyService.getApiKeys(realms, username, apiKeyName, apiKeyIds, listener);
+        apiKeyService.getApiKeys(realms, username, apiKeyName, apiKeyIds, request.withLimitedBy(), listener);
     }
 
 }

+ 1 - 1
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/apikey/TransportQueryApiKeyAction.java

@@ -80,7 +80,7 @@ public final class TransportQueryApiKeyAction extends HandledTransportAction<Que
         }
 
         final SearchRequest searchRequest = new SearchRequest(new String[] { SECURITY_MAIN_ALIAS }, searchSourceBuilder);
-        apiKeyService.queryApiKeys(searchRequest, listener);
+        apiKeyService.queryApiKeys(searchRequest, request.withLimitedBy(), listener);
     }
 
     // package private for testing

+ 18 - 6
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java

@@ -1212,6 +1212,7 @@ public class ApiKeyService {
                 apiKeyIds,
                 true,
                 false,
+                // TODO: instead of parsing the entire API key document, we can just convert the hit to the API key ID
                 this::convertSearchHitToApiKeyInfo,
                 ActionListener.wrap(apiKeys -> {
                     if (apiKeys.isEmpty()) {
@@ -1576,6 +1577,7 @@ public class ApiKeyService {
      * @param username user name
      * @param apiKeyName API key name
      * @param apiKeyIds API key ids
+     * @param withLimitedBy whether to parse and return the limited by role descriptors
      * @param listener listener for {@link GetApiKeyResponse}
      */
     public void getApiKeys(
@@ -1583,6 +1585,7 @@ public class ApiKeyService {
         String username,
         String apiKeyName,
         String[] apiKeyIds,
+        boolean withLimitedBy,
         ActionListener<GetApiKeyResponse> listener
     ) {
         ensureEnabled();
@@ -1593,7 +1596,7 @@ public class ApiKeyService {
             apiKeyIds,
             false,
             false,
-            this::convertSearchHitToApiKeyInfo,
+            hit -> convertSearchHitToApiKeyInfo(hit, withLimitedBy),
             ActionListener.wrap(apiKeyInfos -> {
                 if (apiKeyInfos.isEmpty()) {
                     logger.debug(
@@ -1611,7 +1614,7 @@ public class ApiKeyService {
         );
     }
 
-    public void queryApiKeys(SearchRequest searchRequest, ActionListener<QueryApiKeyResponse> listener) {
+    public void queryApiKeys(SearchRequest searchRequest, boolean withLimitedBy, ActionListener<QueryApiKeyResponse> listener) {
         ensureEnabled();
 
         final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze();
@@ -1636,7 +1639,7 @@ public class ApiKeyService {
                             return;
                         }
                         final List<QueryApiKeyResponse.Item> apiKeyItem = Arrays.stream(searchResponse.getHits().getHits())
-                            .map(this::convertSearchHitToQueryItem)
+                            .map(hit -> convertSearchHitToQueryItem(hit, withLimitedBy))
                             .toList();
                         listener.onResponse(new QueryApiKeyResponse(total, apiKeyItem));
                     }, listener::onFailure)
@@ -1645,11 +1648,15 @@ public class ApiKeyService {
         }
     }
 
-    private QueryApiKeyResponse.Item convertSearchHitToQueryItem(SearchHit hit) {
-        return new QueryApiKeyResponse.Item(convertSearchHitToApiKeyInfo(hit), hit.getSortValues());
+    private QueryApiKeyResponse.Item convertSearchHitToQueryItem(SearchHit hit, boolean withLimitedBy) {
+        return new QueryApiKeyResponse.Item(convertSearchHitToApiKeyInfo(hit, withLimitedBy), hit.getSortValues());
     }
 
     private ApiKey convertSearchHitToApiKeyInfo(SearchHit hit) {
+        return convertSearchHitToApiKeyInfo(hit, false);
+    }
+
+    private ApiKey convertSearchHitToApiKeyInfo(SearchHit hit, boolean withLimitedBy) {
         final ApiKeyDoc apiKeyDoc = convertSearchHitToVersionedApiKeyDoc(hit).doc;
         final String apiKeyId = hit.getId();
         final Map<String, Object> metadata = apiKeyDoc.metadataFlattened != null
@@ -1662,6 +1669,10 @@ public class ApiKeyService {
             RoleReference.ApiKeyRoleType.ASSIGNED
         );
 
+        final List<RoleDescriptor> limitedByRoleDescriptors = withLimitedBy
+            ? parseRoleDescriptorsBytes(apiKeyId, apiKeyDoc.limitedByRoleDescriptorsBytes, RoleReference.ApiKeyRoleType.LIMITED_BY)
+            : null;
+
         return new ApiKey(
             apiKeyDoc.name,
             apiKeyId,
@@ -1671,7 +1682,8 @@ public class ApiKeyService {
             (String) apiKeyDoc.creator.get("principal"),
             (String) apiKeyDoc.creator.get("realm"),
             metadata,
-            roleDescriptors
+            roleDescriptors,
+            limitedByRoleDescriptors
         );
     }
 

+ 2 - 1
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java

@@ -207,7 +207,8 @@ public class RBACEngine implements AuthorizationEngine {
                     // if the authentication is an API key then the request must also contain same API key id
                     String authenticatedApiKeyId = (String) authentication.getMetadata().get(AuthenticationField.API_KEY_ID_KEY);
                     if (Strings.hasText(getApiKeyRequest.getApiKeyId())) {
-                        return getApiKeyRequest.getApiKeyId().equals(authenticatedApiKeyId);
+                        // An API key requires manage_api_key privilege or higher to view any limited-by role descriptors
+                        return getApiKeyRequest.getApiKeyId().equals(authenticatedApiKeyId) && false == getApiKeyRequest.withLimitedBy();
                     } else {
                         return false;
                     }

+ 10 - 2
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/action/apikey/RestGetApiKeyAction.java

@@ -46,8 +46,16 @@ public final class RestGetApiKeyAction extends ApiKeyBaseRestHandler {
         final String userName = request.param("username");
         final String realmName = request.param("realm_name");
         final boolean myApiKeysOnly = request.paramAsBoolean("owner", false);
-        final GetApiKeyRequest getApiKeyRequest = new GetApiKeyRequest(realmName, userName, apiKeyId, apiKeyName, myApiKeysOnly);
-        return channel -> client.execute(GetApiKeyAction.INSTANCE, getApiKeyRequest, new RestBuilderListener<GetApiKeyResponse>(channel) {
+        final boolean withLimitedBy = request.paramAsBoolean("with_limited_by", false);
+        final GetApiKeyRequest getApiKeyRequest = GetApiKeyRequest.builder()
+            .realmName(realmName)
+            .userName(userName)
+            .apiKeyId(apiKeyId)
+            .apiKeyName(apiKeyName)
+            .ownedByAuthenticatedUser(myApiKeysOnly)
+            .withLimitedBy(withLimitedBy)
+            .build();
+        return channel -> client.execute(GetApiKeyAction.INSTANCE, getApiKeyRequest, new RestBuilderListener<>(channel) {
             @Override
             public RestResponse buildResponse(GetApiKeyResponse getApiKeyResponse, XContentBuilder builder) throws Exception {
                 getApiKeyResponse.toXContent(builder, channel.request());

+ 27 - 12
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/action/apikey/RestQueryApiKeyAction.java

@@ -10,6 +10,7 @@ package org.elasticsearch.xpack.security.rest.action.apikey;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentParserUtils;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.rest.RestRequest;
@@ -37,15 +38,9 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
 public final class RestQueryApiKeyAction extends ApiKeyBaseRestHandler {
 
     @SuppressWarnings("unchecked")
-    private static final ConstructingObjectParser<QueryApiKeyRequest, Void> PARSER = new ConstructingObjectParser<>(
-        "query_api_key_request",
-        a -> new QueryApiKeyRequest(
-            (QueryBuilder) a[0],
-            (Integer) a[1],
-            (Integer) a[2],
-            (List<FieldSortBuilder>) a[3],
-            (SearchAfterBuilder) a[4]
-        )
+    private static final ConstructingObjectParser<Payload, Void> PARSER = new ConstructingObjectParser<>(
+        "query_api_key_request_payload",
+        a -> new Payload((QueryBuilder) a[0], (Integer) a[1], (Integer) a[2], (List<FieldSortBuilder>) a[3], (SearchAfterBuilder) a[4])
     );
 
     static {
@@ -93,10 +88,30 @@ public final class RestQueryApiKeyAction extends ApiKeyBaseRestHandler {
 
     @Override
     protected RestChannelConsumer innerPrepareRequest(final RestRequest request, final NodeClient client) throws IOException {
-        final QueryApiKeyRequest queryApiKeyRequest = request.hasContentOrSourceParam()
-            ? PARSER.parse(request.contentOrSourceParamParser(), null)
-            : new QueryApiKeyRequest();
+        final boolean withLimitedBy = request.paramAsBoolean("with_limited_by", false);
 
+        final QueryApiKeyRequest queryApiKeyRequest;
+        if (request.hasContentOrSourceParam()) {
+            final Payload payload = PARSER.parse(request.contentOrSourceParamParser(), null);
+            queryApiKeyRequest = new QueryApiKeyRequest(
+                payload.queryBuilder,
+                payload.from,
+                payload.size,
+                payload.fieldSortBuilders,
+                payload.searchAfterBuilder,
+                withLimitedBy
+            );
+        } else {
+            queryApiKeyRequest = new QueryApiKeyRequest(null, null, null, null, null, withLimitedBy);
+        }
         return channel -> client.execute(QueryApiKeyAction.INSTANCE, queryApiKeyRequest, new RestToXContentListener<>(channel));
     }
+
+    private record Payload(
+        @Nullable QueryBuilder queryBuilder,
+        @Nullable Integer from,
+        @Nullable Integer size,
+        @Nullable List<FieldSortBuilder> fieldSortBuilders,
+        @Nullable SearchAfterBuilder searchAfterBuilder
+    ) {}
 }

+ 9 - 2
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyServiceTests.java

@@ -242,7 +242,7 @@ public class ApiKeyServiceTests extends ESTestCase {
         String apiKeyName = randomFrom(randomAlphaOfLengthBetween(3, 8), null);
         String[] apiKeyIds = generateRandomStringArray(4, 4, true, true);
         PlainActionFuture<GetApiKeyResponse> getApiKeyResponsePlainActionFuture = new PlainActionFuture<>();
-        service.getApiKeys(realmNames, username, apiKeyName, apiKeyIds, getApiKeyResponsePlainActionFuture);
+        service.getApiKeys(realmNames, username, apiKeyName, apiKeyIds, randomBoolean(), getApiKeyResponsePlainActionFuture);
         final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("doc_type", "api_key"));
         if (realmNames != null && realmNames.length > 0) {
             if (realmNames.length == 1) {
@@ -832,7 +832,14 @@ public class ApiKeyServiceTests extends ESTestCase {
 
         ElasticsearchException e = expectThrows(
             ElasticsearchException.class,
-            () -> service.getApiKeys(new String[] { randomAlphaOfLength(6) }, randomAlphaOfLength(8), null, null, new PlainActionFuture<>())
+            () -> service.getApiKeys(
+                new String[] { randomAlphaOfLength(6) },
+                randomAlphaOfLength(8),
+                null,
+                null,
+                randomBoolean(),
+                new PlainActionFuture<>()
+            )
         );
 
         assertThat(e, instanceOf(FeatureNotEnabledException.class));

+ 9 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java

@@ -302,10 +302,18 @@ public class RBACEngineTests extends ESTestCase {
         final User user = new User("joe");
         final String apiKeyId = randomAlphaOfLengthBetween(4, 7);
         final Authentication authentication = AuthenticationTests.randomApiKeyAuthentication(user, apiKeyId);
-        final TransportRequest request = GetApiKeyRequest.usingApiKeyId(apiKeyId, false);
+        final TransportRequest request = GetApiKeyRequest.builder().apiKeyId(apiKeyId).build();
         assertTrue(RBACEngine.checkSameUserPermissions(GetApiKeyAction.NAME, request, authentication));
     }
 
+    public void testSameUserPermissionDeniesSelfApiKeyInfoRetrievalWithLimitedByWhenAuthenticatedByApiKey() {
+        final User user = new User("joe");
+        final String apiKeyId = randomAlphaOfLengthBetween(4, 7);
+        final Authentication authentication = AuthenticationTests.randomApiKeyAuthentication(user, apiKeyId);
+        final TransportRequest request = GetApiKeyRequest.builder().apiKeyId(apiKeyId).withLimitedBy(true).build();
+        assertFalse(RBACEngine.checkSameUserPermissions(GetApiKeyAction.NAME, request, authentication));
+    }
+
     public void testSameUserPermissionDeniesApiKeyInfoRetrievalWhenAuthenticatedByADifferentApiKey() {
         final User user = new User("joe");
         final String apiKeyId = randomAlphaOfLengthBetween(4, 7);

+ 35 - 4
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/rest/action/apikey/RestGetApiKeyActionTests.java

@@ -75,6 +75,14 @@ public class RestGetApiKeyActionTests extends ESTestCase {
         final Map<String, String> param4 = mapBuilder().put("id", "api-key-id-1").map();
         final Map<String, String> param5 = mapBuilder().put("name", "api-key-name-1").map();
         final Map<String, String> params = randomFrom(param1, param2, param3, param4, param5);
+        final boolean withLimitedBy = randomBoolean();
+        if (withLimitedBy) {
+            params.put("with_limited_by", "true");
+        } else {
+            if (randomBoolean()) {
+                params.put("with_limited_by", "false");
+            }
+        }
         final boolean replyEmptyResponse = rarely();
         final FakeRestRequest restRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams(params).build();
 
@@ -90,9 +98,21 @@ public class RestGetApiKeyActionTests extends ESTestCase {
         @SuppressWarnings("unchecked")
         final Map<String, Object> metadata = ApiKeyTests.randomMetadata();
         final List<RoleDescriptor> roleDescriptors = randomUniquelyNamedRoleDescriptors(0, 3);
+        final List<RoleDescriptor> limitedByRoleDescriptors = withLimitedBy ? randomUniquelyNamedRoleDescriptors(1, 3) : null;
         final GetApiKeyResponse getApiKeyResponseExpected = new GetApiKeyResponse(
             Collections.singletonList(
-                new ApiKey("api-key-name-1", "api-key-id-1", creation, expiration, false, "user-x", "realm-1", metadata, roleDescriptors)
+                new ApiKey(
+                    "api-key-name-1",
+                    "api-key-id-1",
+                    creation,
+                    expiration,
+                    false,
+                    "user-x",
+                    "realm-1",
+                    metadata,
+                    roleDescriptors,
+                    limitedByRoleDescriptors
+                )
             )
         );
 
@@ -152,7 +172,8 @@ public class RestGetApiKeyActionTests extends ESTestCase {
                             "user-x",
                             "realm-1",
                             metadata,
-                            roleDescriptors
+                            roleDescriptors,
+                            limitedByRoleDescriptors
                         )
                     )
                 );
@@ -169,6 +190,14 @@ public class RestGetApiKeyActionTests extends ESTestCase {
         } else {
             param = mapBuilder().put("owner", Boolean.FALSE.toString()).put("realm_name", "realm-1").map();
         }
+        final boolean withLimitedBy = randomBoolean();
+        if (withLimitedBy) {
+            param.put("with_limited_by", "true");
+        } else {
+            if (randomBoolean()) {
+                param.put("with_limited_by", "false");
+            }
+        }
 
         final FakeRestRequest restRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams(param).build();
 
@@ -191,7 +220,8 @@ public class RestGetApiKeyActionTests extends ESTestCase {
             "user-x",
             "realm-1",
             ApiKeyTests.randomMetadata(),
-            randomUniquelyNamedRoleDescriptors(0, 3)
+            randomUniquelyNamedRoleDescriptors(0, 3),
+            withLimitedBy ? randomUniquelyNamedRoleDescriptors(1, 3) : null
         );
         final ApiKey apiKey2 = new ApiKey(
             "api-key-name-2",
@@ -202,7 +232,8 @@ public class RestGetApiKeyActionTests extends ESTestCase {
             "user-y",
             "realm-1",
             ApiKeyTests.randomMetadata(),
-            randomUniquelyNamedRoleDescriptors(0, 3)
+            randomUniquelyNamedRoleDescriptors(0, 3),
+            withLimitedBy ? randomUniquelyNamedRoleDescriptors(1, 3) : null
         );
         final GetApiKeyResponse getApiKeyResponseExpectedWhenOwnerFlagIsTrue = new GetApiKeyResponse(Collections.singletonList(apiKey1));
         final GetApiKeyResponse getApiKeyResponseExpectedWhenOwnerFlagIsFalse = new GetApiKeyResponse(List.of(apiKey1, apiKey2));