1
0
Эх сурвалжийг харах

Get and Query API Key with profile uid (#106531)

Add new optional request option, `with_profile_uid`,
to the Get and Query API Key Information endpoints,
to return the API keys owner users' profile uid.

Closes #98939
Albert Zaharovits 1 жил өмнө
parent
commit
3e0a0f6291
30 өөрчлөгдсөн 1719 нэмэгдсэн , 283 устгасан
  1. 5 0
      docs/changelog/106531.yaml
  2. 5 0
      docs/reference/rest-api/security/get-api-keys.asciidoc
  3. 5 0
      docs/reference/rest-api/security/query-api-key.asciidoc
  4. 5 0
      rest-api-spec/src/main/resources/rest-api-spec/api/security.get_api_key.json
  5. 5 0
      rest-api-spec/src/main/resources/rest-api-spec/api/security.query_api_keys.json
  6. 55 46
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/ApiKey.java
  7. 27 4
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/GetApiKeyRequest.java
  8. 64 12
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/GetApiKeyResponse.java
  9. 9 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/QueryApiKeyRequest.java
  10. 44 6
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/QueryApiKeyResponse.java
  11. 19 3
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/GetApiKeyRequestTests.java
  12. 41 3
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/GetApiKeyResponseTests.java
  13. 3 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/QueryApiKeyRequestTests.java
  14. 87 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/QueryApiKeyResponseTests.java
  15. 11 3
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/ManageOwnApiKeyClusterPrivilegeTests.java
  16. 116 44
      x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/ApiKeyIntegTests.java
  17. 2 0
      x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/apikey/ApiKeySingleNodeTests.java
  18. 3 3
      x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/profile/AbstractProfileIntegTestCase.java
  19. 378 0
      x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/profile/ApiKeyOwnerProfileIntegTests.java
  20. 1 1
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java
  21. 26 2
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/apikey/TransportGetApiKeyAction.java
  22. 34 3
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/apikey/TransportQueryApiKeyAction.java
  23. 23 19
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java
  24. 66 2
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/profile/ProfileService.java
  25. 2 0
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/action/apikey/RestGetApiKeyAction.java
  26. 4 3
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/action/apikey/RestQueryApiKeyAction.java
  27. 25 29
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyServiceTests.java
  28. 397 9
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/profile/ProfileServiceTests.java
  29. 152 89
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/rest/action/apikey/RestGetApiKeyActionTests.java
  30. 105 0
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/rest/action/apikey/RestQueryApiKeyActionTests.java

+ 5 - 0
docs/changelog/106531.yaml

@@ -0,0 +1,5 @@
+pr: 106531
+summary: Get and Query API Key with profile uid
+area: Security
+type: feature
+issues: []

+ 5 - 0
docs/reference/rest-api/security/get-api-keys.asciidoc

@@ -67,6 +67,11 @@ associated with the API key. An API key's actual permission is the intersection
 its <<api-key-role-descriptors,assigned role descriptors>> and the owner user's role descriptors
 (effectively limited by it). An API key must have `manage_api_key` or higher privileges to retrieve the limited-by role descriptors of any API key, including itself.
 
+`with_profile_uid`::
+(Optional, boolean) Determines whether to also retrieve the <<user-profile,user profile>> `uid`, for the API key owner user.
+If it exists, the profile uid is returned under the `profile_uid` response field for each API key.
+Defaults to `false`.
+
 `active_only`::
 (Optional, Boolean) A boolean flag that can be used to query API keys that are currently active.
 An API key is considered active if it is neither invalidated, nor expired at query time. You can specify this together

+ 5 - 0
docs/reference/rest-api/security/query-api-key.asciidoc

@@ -154,6 +154,11 @@ its <<api-key-role-descriptors,assigned role descriptors>> and the owner user's
 (effectively limited by it). An API key cannot retrieve any API key's limited-by role descriptors
 (including itself) unless it has `manage_api_key` or higher privileges.
 
+`with_profile_uid`::
+(Optional, boolean) Determines whether to also retrieve the <<user-profile,user profile>> `uid`, for the API key owner user.
+If it exists, the profile uid is returned under the `profile_uid` response field for each API key.
+Defaults to `false`.
+
 [[security-api-query-api-key-request-body]]
 ==== {api-request-body-title}
 

+ 5 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/security.get_api_key.json

@@ -46,6 +46,11 @@
         "default":false,
         "description": "flag to show the limited-by role descriptors of API Keys"
       },
+      "with_profile_uid":{
+        "type":"boolean",
+        "default":false,
+        "description": "flag to also retrieve the API Key's owner profile uid, if it exists"
+      },
       "active_only":{
         "type":"boolean",
         "default":false,

+ 5 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/security.query_api_keys.json

@@ -26,6 +26,11 @@
         "type":"boolean",
         "default":false,
         "description": "flag to show the limited-by role descriptors of API Keys"
+      },
+      "with_profile_uid":{
+        "type":"boolean",
+        "default":false,
+        "description": "flag to also retrieve the API Key's owner profile uid, if it exists"
       }
     },
     "body":{

+ 55 - 46
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/ApiKey.java

@@ -10,6 +10,7 @@ package org.elasticsearch.xpack.core.security.action.apikey;
 import org.elasticsearch.common.xcontent.XContentParserUtils;
 import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.xcontent.AbstractObjectParser;
 import org.elasticsearch.xcontent.ConstructingObjectParser;
 import org.elasticsearch.xcontent.ObjectParser;
 import org.elasticsearch.xcontent.ParseField;
@@ -164,6 +165,26 @@ public final class ApiKey implements ToXContentObject {
         this.limitedBy = limitedBy;
     }
 
+    // Should only be used by XContent parsers
+    @SuppressWarnings("unchecked")
+    ApiKey(Object[] parsed) {
+        this(
+            (String) parsed[0],
+            (String) parsed[1],
+            (Type) parsed[2],
+            Instant.ofEpochMilli((Long) parsed[3]),
+            (parsed[4] == null) ? null : Instant.ofEpochMilli((Long) parsed[4]),
+            (Boolean) parsed[5],
+            (parsed[6] == null) ? null : Instant.ofEpochMilli((Long) parsed[6]),
+            (String) parsed[7],
+            (String) parsed[8],
+            (String) parsed[9],
+            (parsed[10] == null) ? null : (Map<String, Object>) parsed[10],
+            (List<RoleDescriptor>) parsed[11],
+            (RoleDescriptorsIntersection) parsed[12]
+        );
+    }
+
     public String getId() {
         return id;
     }
@@ -343,52 +364,6 @@ public final class ApiKey implements ToXContentObject {
             && Objects.equals(limitedBy, other.limitedBy);
     }
 
-    @SuppressWarnings("unchecked")
-    static final ConstructingObjectParser<ApiKey, Void> PARSER = new ConstructingObjectParser<>("api_key", true, args -> {
-        return new ApiKey(
-            (String) args[0],
-            (String) args[1],
-            (Type) args[2],
-            Instant.ofEpochMilli((Long) args[3]),
-            (args[4] == null) ? null : Instant.ofEpochMilli((Long) args[4]),
-            (Boolean) args[5],
-            (args[6] == null) ? null : Instant.ofEpochMilli((Long) args[6]),
-            (String) args[7],
-            (String) args[8],
-            (String) args[9],
-            (args[10] == null) ? null : (Map<String, Object>) args[10],
-            (List<RoleDescriptor>) args[11],
-            (RoleDescriptorsIntersection) args[12]
-        );
-    });
-    static {
-        PARSER.declareString(constructorArg(), new ParseField("name"));
-        PARSER.declareString(constructorArg(), new ParseField("id"));
-        PARSER.declareField(constructorArg(), Type::fromXContent, new ParseField("type"), ObjectParser.ValueType.STRING);
-        PARSER.declareLong(constructorArg(), new ParseField("creation"));
-        PARSER.declareLong(optionalConstructorArg(), new ParseField("expiration"));
-        PARSER.declareBoolean(constructorArg(), new ParseField("invalidated"));
-        PARSER.declareLong(optionalConstructorArg(), new ParseField("invalidation"));
-        PARSER.declareString(constructorArg(), new ParseField("username"));
-        PARSER.declareString(constructorArg(), new ParseField("realm"));
-        PARSER.declareStringOrNull(optionalConstructorArg(), new ParseField("realm_type"));
-        PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.map(), new ParseField("metadata"));
-        PARSER.declareNamedObjects(optionalConstructorArg(), (p, c, n) -> {
-            p.nextToken();
-            return RoleDescriptor.parse(n, p, false);
-        }, new ParseField("role_descriptors"));
-        PARSER.declareField(
-            optionalConstructorArg(),
-            (p, c) -> RoleDescriptorsIntersection.fromXContent(p),
-            new ParseField("limited_by"),
-            ObjectParser.ValueType.OBJECT_ARRAY
-        );
-    }
-
-    public static ApiKey fromXContent(XContentParser parser) throws IOException {
-        return PARSER.parse(parser, null);
-    }
-
     @Override
     public String toString() {
         return "ApiKey [name="
@@ -420,4 +395,38 @@ public final class ApiKey implements ToXContentObject {
             + "]";
     }
 
+    static final ConstructingObjectParser<ApiKey, Void> PARSER;
+    static {
+        PARSER = new ConstructingObjectParser<>("api_key", true, ApiKey::new);
+        initializeParser(PARSER);
+    }
+
+    public static ApiKey fromXContent(XContentParser parser) throws IOException {
+        return PARSER.parse(parser, null);
+    }
+
+    static int initializeParser(AbstractObjectParser<?, Void> parser) {
+        parser.declareString(constructorArg(), new ParseField("name"));
+        parser.declareString(constructorArg(), new ParseField("id"));
+        parser.declareField(constructorArg(), Type::fromXContent, new ParseField("type"), ObjectParser.ValueType.STRING);
+        parser.declareLong(constructorArg(), new ParseField("creation"));
+        parser.declareLong(optionalConstructorArg(), new ParseField("expiration"));
+        parser.declareBoolean(constructorArg(), new ParseField("invalidated"));
+        parser.declareLong(optionalConstructorArg(), new ParseField("invalidation"));
+        parser.declareString(constructorArg(), new ParseField("username"));
+        parser.declareString(constructorArg(), new ParseField("realm"));
+        parser.declareStringOrNull(optionalConstructorArg(), new ParseField("realm_type"));
+        parser.declareObject(optionalConstructorArg(), (p, c) -> p.map(), new ParseField("metadata"));
+        parser.declareNamedObjects(optionalConstructorArg(), (p, c, n) -> {
+            p.nextToken();
+            return RoleDescriptor.parse(n, p, false);
+        }, new ParseField("role_descriptors"));
+        parser.declareField(
+            optionalConstructorArg(),
+            (p, c) -> RoleDescriptorsIntersection.fromXContent(p),
+            new ParseField("limited_by"),
+            ObjectParser.ValueType.OBJECT_ARRAY
+        );
+        return 13; // the number of fields to parse
+    }
 }

+ 27 - 4
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/GetApiKeyRequest.java

@@ -31,6 +31,7 @@ public final class GetApiKeyRequest extends ActionRequest {
     private final boolean ownedByAuthenticatedUser;
     private final boolean withLimitedBy;
     private final boolean activeOnly;
+    private final boolean withProfileUid;
 
     private GetApiKeyRequest(
         @Nullable String realmName,
@@ -39,7 +40,8 @@ public final class GetApiKeyRequest extends ActionRequest {
         @Nullable String apiKeyName,
         boolean ownedByAuthenticatedUser,
         boolean withLimitedBy,
-        boolean activeOnly
+        boolean activeOnly,
+        boolean withProfileUid
     ) {
         this.realmName = textOrNull(realmName);
         this.userName = textOrNull(userName);
@@ -48,6 +50,7 @@ public final class GetApiKeyRequest extends ActionRequest {
         this.ownedByAuthenticatedUser = ownedByAuthenticatedUser;
         this.withLimitedBy = withLimitedBy;
         this.activeOnly = activeOnly;
+        this.withProfileUid = withProfileUid;
     }
 
     private static String textOrNull(@Nullable String arg) {
@@ -82,6 +85,10 @@ public final class GetApiKeyRequest extends ActionRequest {
         return activeOnly;
     }
 
+    public boolean withProfileUid() {
+        return withProfileUid;
+    }
+
     @Override
     public ActionRequestValidationException validate() {
         ActionRequestValidationException validationException = null;
@@ -127,12 +134,13 @@ public final class GetApiKeyRequest extends ActionRequest {
             && Objects.equals(apiKeyId, that.apiKeyId)
             && Objects.equals(apiKeyName, that.apiKeyName)
             && withLimitedBy == that.withLimitedBy
-            && activeOnly == that.activeOnly;
+            && activeOnly == that.activeOnly
+            && withProfileUid == that.withProfileUid;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(realmName, userName, apiKeyId, apiKeyName, ownedByAuthenticatedUser, withLimitedBy, activeOnly);
+        return Objects.hash(realmName, userName, apiKeyId, apiKeyName, ownedByAuthenticatedUser, withLimitedBy, activeOnly, withProfileUid);
     }
 
     public static Builder builder() {
@@ -147,6 +155,7 @@ public final class GetApiKeyRequest extends ActionRequest {
         private boolean ownedByAuthenticatedUser = false;
         private boolean withLimitedBy = false;
         private boolean activeOnly = false;
+        private boolean withProfileUid = false;
 
         public Builder realmName(String realmName) {
             this.realmName = realmName;
@@ -191,8 +200,22 @@ public final class GetApiKeyRequest extends ActionRequest {
             return this;
         }
 
+        public Builder withProfileUid(boolean withProfileUid) {
+            this.withProfileUid = withProfileUid;
+            return this;
+        }
+
         public GetApiKeyRequest build() {
-            return new GetApiKeyRequest(realmName, userName, apiKeyId, apiKeyName, ownedByAuthenticatedUser, withLimitedBy, activeOnly);
+            return new GetApiKeyRequest(
+                realmName,
+                userName,
+                apiKeyId,
+                apiKeyName,
+                ownedByAuthenticatedUser,
+                withLimitedBy,
+                activeOnly,
+                withProfileUid
+            );
         }
     }
 }

+ 64 - 12
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/GetApiKeyResponse.java

@@ -10,6 +10,7 @@ package org.elasticsearch.xpack.core.security.action.apikey;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.support.TransportAction;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xcontent.ConstructingObjectParser;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContentObject;
@@ -17,7 +18,9 @@ import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 
@@ -33,9 +36,34 @@ public final class GetApiKeyResponse extends ActionResponse implements ToXConten
 
     private final List<Item> foundApiKeyInfoList;
 
-    public GetApiKeyResponse(Collection<ApiKey> foundApiKeyInfos) {
-        Objects.requireNonNull(foundApiKeyInfos, "found_api_keys_info must be provided");
-        this.foundApiKeyInfoList = foundApiKeyInfos.stream().map(Item::new).toList();
+    public GetApiKeyResponse(Collection<Item> foundApiKeysInfos) {
+        Objects.requireNonNull(foundApiKeysInfos, "found_api_keys_info must be provided");
+        if (foundApiKeysInfos instanceof List<Item>) {
+            this.foundApiKeyInfoList = (List<Item>) foundApiKeysInfos;
+        } else {
+            this.foundApiKeyInfoList = new ArrayList<>(foundApiKeysInfos);
+        }
+    }
+
+    public GetApiKeyResponse(Collection<ApiKey> foundApiKeysInfos, @Nullable Collection<String> ownerProfileUids) {
+        Objects.requireNonNull(foundApiKeysInfos, "found_api_keys_info must be provided");
+        if (ownerProfileUids == null) {
+            this.foundApiKeyInfoList = foundApiKeysInfos.stream().map(Item::new).toList();
+        } else {
+            if (foundApiKeysInfos.size() != ownerProfileUids.size()) {
+                throw new IllegalStateException("Each api key info must be associated to a (nullable) owner profile uid");
+            }
+            int size = foundApiKeysInfos.size();
+            this.foundApiKeyInfoList = new ArrayList<>(size);
+            Iterator<ApiKey> apiKeyIterator = foundApiKeysInfos.iterator();
+            Iterator<String> profileUidIterator = ownerProfileUids.iterator();
+            while (apiKeyIterator.hasNext()) {
+                if (false == profileUidIterator.hasNext()) {
+                    throw new IllegalStateException("Each api key info must be associated to a (nullable) owner profile uid");
+                }
+                this.foundApiKeyInfoList.add(new Item(apiKeyIterator.next(), profileUidIterator.next()));
+            }
+        }
     }
 
     public List<Item> getApiKeyInfoList() {
@@ -73,31 +101,55 @@ public final class GetApiKeyResponse extends ActionResponse implements ToXConten
         return "GetApiKeyResponse{foundApiKeysInfo=" + foundApiKeyInfoList + "}";
     }
 
-    public record Item(ApiKey apiKeyInfo) implements ToXContentObject {
+    public record Item(ApiKey apiKeyInfo, @Nullable String ownerProfileUid) implements ToXContentObject {
+
+        public Item(ApiKey apiKeyInfo) {
+            this(apiKeyInfo, null);
+        }
+
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
             apiKeyInfo.innerToXContent(builder, params);
+            if (ownerProfileUid != null) {
+                builder.field("profile_uid", ownerProfileUid);
+            }
             builder.endObject();
             return builder;
         }
 
         @Override
         public String toString() {
-            return "Item{apiKeyInfo=" + apiKeyInfo + "}";
+            return "Item{apiKeyInfo=" + apiKeyInfo + ", ownerProfileUid=" + ownerProfileUid + "}";
         }
     }
 
-    @SuppressWarnings("unchecked")
-    static final ConstructingObjectParser<GetApiKeyResponse, Void> PARSER = new ConstructingObjectParser<>(
-        "get_api_key_response",
-        args -> (args[0] == null) ? GetApiKeyResponse.EMPTY : new GetApiKeyResponse((List<ApiKey>) args[0])
-    );
+    static final ConstructingObjectParser<GetApiKeyResponse, Void> RESPONSE_PARSER;
     static {
-        PARSER.declareObjectArray(optionalConstructorArg(), (p, c) -> ApiKey.fromXContent(p), new ParseField("api_keys"));
+        int nFieldsForParsingApiKeyInfo = 13; // this must be changed whenever ApiKey#initializeParser is changed for the number of parsers
+        ConstructingObjectParser<Item, Void> keyInfoParser = new ConstructingObjectParser<>(
+            "api_key_with_profile_uid",
+            true,
+            args -> new Item(new ApiKey(args), (String) args[nFieldsForParsingApiKeyInfo])
+        );
+        int nParsedFields = ApiKey.initializeParser(keyInfoParser);
+        if (nFieldsForParsingApiKeyInfo != nParsedFields) {
+            throw new IllegalStateException("Unexpected fields for parsing API Keys");
+        }
+        keyInfoParser.declareStringOrNull(optionalConstructorArg(), new ParseField("profile_uid"));
+        RESPONSE_PARSER = new ConstructingObjectParser<>("get_api_key_response", args -> {
+            if (args[0] == null) {
+                return GetApiKeyResponse.EMPTY;
+            } else {
+                @SuppressWarnings("unchecked")
+                List<Item> apiKeysWithProfileUids = (List<Item>) args[0];
+                return new GetApiKeyResponse(apiKeysWithProfileUids);
+            }
+        });
+        RESPONSE_PARSER.declareObjectArray(optionalConstructorArg(), keyInfoParser, new ParseField("api_keys"));
     }
 
     public static GetApiKeyResponse fromXContent(XContentParser parser) throws IOException {
-        return PARSER.parse(parser, null);
+        return RESPONSE_PARSER.parse(parser, null);
     }
 }

+ 9 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/QueryApiKeyRequest.java

@@ -38,13 +38,14 @@ public final class QueryApiKeyRequest extends ActionRequest {
     private final SearchAfterBuilder searchAfterBuilder;
     private final boolean withLimitedBy;
     private boolean filterForCurrentUser;
+    private final boolean withProfileUid;
 
     public QueryApiKeyRequest() {
         this((QueryBuilder) null);
     }
 
     public QueryApiKeyRequest(QueryBuilder queryBuilder) {
-        this(queryBuilder, null, null, null, null, null, false);
+        this(queryBuilder, null, null, null, null, null, false, false);
     }
 
     public QueryApiKeyRequest(
@@ -54,7 +55,8 @@ public final class QueryApiKeyRequest extends ActionRequest {
         @Nullable Integer size,
         @Nullable List<FieldSortBuilder> fieldSortBuilders,
         @Nullable SearchAfterBuilder searchAfterBuilder,
-        boolean withLimitedBy
+        boolean withLimitedBy,
+        boolean withProfileUid
     ) {
         this.queryBuilder = queryBuilder;
         this.aggsBuilder = aggsBuilder;
@@ -63,6 +65,7 @@ public final class QueryApiKeyRequest extends ActionRequest {
         this.fieldSortBuilders = fieldSortBuilders;
         this.searchAfterBuilder = searchAfterBuilder;
         this.withLimitedBy = withLimitedBy;
+        this.withProfileUid = withProfileUid;
     }
 
     public QueryBuilder getQueryBuilder() {
@@ -101,6 +104,10 @@ public final class QueryApiKeyRequest extends ActionRequest {
         return withLimitedBy;
     }
 
+    public boolean withProfileUid() {
+        return withProfileUid;
+    }
+
     @Override
     public ActionRequestValidationException validate() {
         ActionRequestValidationException validationException = null;

+ 44 - 6
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/apikey/QueryApiKeyResponse.java

@@ -19,6 +19,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 
@@ -28,16 +29,44 @@ import java.util.Objects;
  */
 public final class QueryApiKeyResponse extends ActionResponse implements ToXContentObject {
 
-    public static final QueryApiKeyResponse EMPTY = new QueryApiKeyResponse(0, List.of(), null);
+    public static final QueryApiKeyResponse EMPTY = new QueryApiKeyResponse(0, List.of(), List.of(), null, null);
 
     private final long total;
     private final List<Item> foundApiKeyInfoList;
     private final @Nullable InternalAggregations aggregations;
 
-    public QueryApiKeyResponse(long total, Collection<Item> items, @Nullable InternalAggregations aggregations) {
+    public QueryApiKeyResponse(
+        long total,
+        Collection<ApiKey> foundApiKeysInfos,
+        Collection<Object[]> sortValues,
+        @Nullable Collection<String> ownerProfileUids,
+        @Nullable InternalAggregations aggregations
+    ) {
         this.total = total;
-        Objects.requireNonNull(items, "items must be provided");
-        this.foundApiKeyInfoList = items instanceof List<Item> ? (List<Item>) items : new ArrayList<>(items);
+        Objects.requireNonNull(foundApiKeysInfos, "found_api_keys_infos must be provided");
+        Objects.requireNonNull(sortValues, "sort_values must be provided");
+        if (foundApiKeysInfos.size() != sortValues.size()) {
+            throw new IllegalStateException("Each api key info must be associated to a (nullable) sort value");
+        }
+        if (ownerProfileUids != null && foundApiKeysInfos.size() != ownerProfileUids.size()) {
+            throw new IllegalStateException("Each api key info must be associated to a (nullable) owner profile uid");
+        }
+        int size = foundApiKeysInfos.size();
+        this.foundApiKeyInfoList = new ArrayList<>(size);
+        Iterator<ApiKey> apiKeyIterator = foundApiKeysInfos.iterator();
+        Iterator<Object[]> sortValueIterator = sortValues.iterator();
+        Iterator<String> profileUidIterator = ownerProfileUids != null ? ownerProfileUids.iterator() : null;
+        while (apiKeyIterator.hasNext()) {
+            if (false == sortValueIterator.hasNext()) {
+                throw new IllegalStateException("Each api key info must be associated to a (nullable) sort value");
+            }
+            if (profileUidIterator != null && false == profileUidIterator.hasNext()) {
+                throw new IllegalStateException("Each api key info must be associated to a (nullable) owner profile uid");
+            }
+            this.foundApiKeyInfoList.add(
+                new Item(apiKeyIterator.next(), sortValueIterator.next(), profileUidIterator != null ? profileUidIterator.next() : null)
+            );
+        }
         this.aggregations = aggregations;
     }
 
@@ -92,7 +121,7 @@ public final class QueryApiKeyResponse extends ActionResponse implements ToXCont
         return "QueryApiKeyResponse{total=" + total + ", items=" + foundApiKeyInfoList + ", aggs=" + aggregations + "}";
     }
 
-    public record Item(ApiKey apiKeyInfo, @Nullable Object[] sortValues) implements ToXContentObject {
+    public record Item(ApiKey apiKeyInfo, @Nullable Object[] sortValues, @Nullable String ownerProfileUid) implements ToXContentObject {
 
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
@@ -101,13 +130,22 @@ public final class QueryApiKeyResponse extends ActionResponse implements ToXCont
             if (sortValues != null && sortValues.length > 0) {
                 builder.array("_sort", sortValues);
             }
+            if (ownerProfileUid != null) {
+                builder.field("profile_uid", ownerProfileUid);
+            }
             builder.endObject();
             return builder;
         }
 
         @Override
         public String toString() {
-            return "Item{apiKeyInfo=" + apiKeyInfo + ", sortValues=" + Arrays.toString(sortValues) + '}';
+            return "Item{apiKeyInfo="
+                + apiKeyInfo
+                + ", sortValues="
+                + Arrays.toString(sortValues)
+                + ", ownerProfileUid="
+                + ownerProfileUid
+                + '}';
         }
     }
 }

+ 19 - 3
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/GetApiKeyRequestTests.java

@@ -22,22 +22,36 @@ public class GetApiKeyRequestTests extends ESTestCase {
         GetApiKeyRequest request = GetApiKeyRequest.builder()
             .apiKeyId(randomAlphaOfLength(5))
             .ownedByAuthenticatedUser(randomBoolean())
+            .withProfileUid(randomBoolean())
             .build();
         ActionRequestValidationException ve = request.validate();
         assertNull(ve);
-        request = GetApiKeyRequest.builder().apiKeyName(randomAlphaOfLength(5)).ownedByAuthenticatedUser(randomBoolean()).build();
+        request = GetApiKeyRequest.builder()
+            .apiKeyName(randomAlphaOfLength(5))
+            .ownedByAuthenticatedUser(randomBoolean())
+            .withProfileUid(randomBoolean())
+            .build();
         ve = request.validate();
         assertNull(ve);
-        request = GetApiKeyRequest.builder().realmName(randomAlphaOfLength(5)).activeOnly(randomBoolean()).build();
+        request = GetApiKeyRequest.builder()
+            .realmName(randomAlphaOfLength(5))
+            .activeOnly(randomBoolean())
+            .withProfileUid(randomBoolean())
+            .build();
         ve = request.validate();
         assertNull(ve);
-        request = GetApiKeyRequest.builder().userName(randomAlphaOfLength(5)).activeOnly(randomBoolean()).build();
+        request = GetApiKeyRequest.builder()
+            .userName(randomAlphaOfLength(5))
+            .activeOnly(randomBoolean())
+            .withProfileUid(randomBoolean())
+            .build();
         ve = request.validate();
         assertNull(ve);
         request = GetApiKeyRequest.builder()
             .realmName(randomAlphaOfLength(5))
             .userName(randomAlphaOfLength(7))
             .activeOnly(randomBoolean())
+            .withProfileUid(randomBoolean())
             .build();
         ve = request.validate();
         assertNull(ve);
@@ -70,6 +84,7 @@ public class GetApiKeyRequestTests extends ESTestCase {
                 .apiKeyId(inputs[caseNo][2])
                 .apiKeyName(inputs[caseNo][3])
                 .ownedByAuthenticatedUser(Boolean.parseBoolean(inputs[caseNo][4]))
+                .withProfileUid(randomBoolean())
                 .build();
             ActionRequestValidationException ve = request.validate();
             assertNotNull(ve);
@@ -87,6 +102,7 @@ public class GetApiKeyRequestTests extends ESTestCase {
             .apiKeyName(randomBlankString.get())
             .ownedByAuthenticatedUser(randomBoolean())
             .withLimitedBy(randomBoolean())
+            .withProfileUid(randomBoolean())
             .build();
         assertThat(request.getRealmName(), nullValue());
         assertThat(request.getUserName(), nullValue());

+ 41 - 3
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/GetApiKeyResponseTests.java

@@ -17,6 +17,7 @@ import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
 
 import java.io.IOException;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -25,6 +26,7 @@ import static org.elasticsearch.xpack.core.security.action.apikey.CrossClusterAp
 import static org.elasticsearch.xpack.core.security.action.apikey.CrossClusterApiKeyRoleDescriptorBuilder.CCS_AND_CCR_CLUSTER_PRIVILEGE_NAMES;
 import static org.elasticsearch.xpack.core.security.action.apikey.CrossClusterApiKeyRoleDescriptorBuilder.CCS_INDICES_PRIVILEGE_NAMES;
 import static org.elasticsearch.xpack.core.security.action.apikey.CrossClusterApiKeyRoleDescriptorBuilder.ROLE_DESCRIPTOR_NAME;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 
 public class GetApiKeyResponseTests extends ESTestCase {
@@ -119,7 +121,14 @@ public class GetApiKeyResponseTests extends ESTestCase {
             crossClusterAccessRoleDescriptors,
             null
         );
-        GetApiKeyResponse response = new GetApiKeyResponse(Arrays.asList(apiKeyInfo1, apiKeyInfo2, apiKeyInfo3, apiKeyInfo4));
+        String profileUid2 = "profileUid2";
+        String profileUid4 = "profileUid4";
+        List<String> profileUids = new ArrayList<>(4);
+        profileUids.add(null);
+        profileUids.add(profileUid2);
+        profileUids.add(null);
+        profileUids.add(profileUid4);
+        GetApiKeyResponse response = new GetApiKeyResponse(Arrays.asList(apiKeyInfo1, apiKeyInfo2, apiKeyInfo3, apiKeyInfo4), profileUids);
         XContentBuilder builder = XContentFactory.jsonBuilder();
         response.toXContent(builder, ToXContent.EMPTY_PARAMS);
         assertThat(Strings.toString(builder), equalTo(XContentHelper.stripWhitespace(Strings.format("""
@@ -179,7 +188,8 @@ public class GetApiKeyResponseTests extends ESTestCase {
                         }
                       }
                     }
-                  ]
+                  ],
+                  "profile_uid": "profileUid2"
                 },
                 {
                   "id": "id-3",
@@ -312,12 +322,40 @@ public class GetApiKeyResponseTests extends ESTestCase {
                         "allow_restricted_indices": false
                       }
                     ]
-                  }
+                  },
+                  "profile_uid": "profileUid4"
                 }
               ]
             }""", getType("rest"), getType("rest"), getType("rest"), getType("cross_cluster")))));
     }
 
+    public void testMismatchApiKeyInfoAndProfileData() {
+        List<ApiKey> apiKeys = randomList(
+            0,
+            3,
+            () -> new ApiKey(
+                randomAlphaOfLength(4),
+                randomAlphaOfLength(4),
+                randomFrom(ApiKey.Type.values()),
+                Instant.now(),
+                Instant.now(),
+                randomBoolean(),
+                null,
+                randomAlphaOfLength(4),
+                randomAlphaOfLength(4),
+                null,
+                null,
+                null,
+                null
+            )
+        );
+        List<String> profileUids = randomList(0, 5, () -> randomFrom(randomAlphaOfLength(4), null));
+        if (apiKeys.size() != profileUids.size()) {
+            IllegalStateException ise = expectThrows(IllegalStateException.class, () -> new GetApiKeyResponse(apiKeys, profileUids));
+            assertThat(ise.getMessage(), containsString("Each api key info must be associated to a (nullable) owner profile uid"));
+        }
+    }
+
     private ApiKey createApiKeyInfo(
         String name,
         String id,

+ 3 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/QueryApiKeyRequestTests.java

@@ -23,6 +23,7 @@ public class QueryApiKeyRequestTests extends ESTestCase {
             randomIntBetween(0, Integer.MAX_VALUE),
             null,
             null,
+            randomBoolean(),
             randomBoolean()
         );
         assertThat(request1.validate(), nullValue());
@@ -34,6 +35,7 @@ public class QueryApiKeyRequestTests extends ESTestCase {
             randomIntBetween(0, Integer.MAX_VALUE),
             null,
             null,
+            randomBoolean(),
             randomBoolean()
         );
         assertThat(request2.validate().getMessage(), containsString("[from] parameter cannot be negative"));
@@ -45,6 +47,7 @@ public class QueryApiKeyRequestTests extends ESTestCase {
             randomIntBetween(Integer.MIN_VALUE, -1),
             null,
             null,
+            randomBoolean(),
             randomBoolean()
         );
         assertThat(request3.validate().getMessage(), containsString("[size] parameter cannot be negative"));

+ 87 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/apikey/QueryApiKeyResponseTests.java

@@ -0,0 +1,87 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.security.action.apikey;
+
+import org.elasticsearch.test.ESTestCase;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.hamcrest.Matchers.containsString;
+
+public class QueryApiKeyResponseTests extends ESTestCase {
+
+    public void testMismatchApiKeyInfoAndProfileData() {
+        List<ApiKey> apiKeys = randomList(
+            0,
+            3,
+            () -> new ApiKey(
+                randomAlphaOfLength(4),
+                randomAlphaOfLength(4),
+                randomFrom(ApiKey.Type.values()),
+                Instant.now(),
+                Instant.now(),
+                randomBoolean(),
+                null,
+                randomAlphaOfLength(4),
+                randomAlphaOfLength(4),
+                null,
+                null,
+                null,
+                null
+            )
+        );
+        List<Object[]> sortValues = new ArrayList<>(apiKeys.size());
+        for (int i = 0; i < apiKeys.size(); i++) {
+            sortValues.add(new String[] { "dummy sort value" });
+        }
+        List<String> profileUids = randomList(0, 5, () -> randomFrom(randomAlphaOfLength(4), null));
+        if (apiKeys.size() != profileUids.size()) {
+            IllegalStateException iae = expectThrows(
+                IllegalStateException.class,
+                () -> new QueryApiKeyResponse(100, apiKeys, sortValues, profileUids, null)
+            );
+            assertThat(iae.getMessage(), containsString("Each api key info must be associated to a (nullable) owner profile uid"));
+        }
+    }
+
+    public void testMismatchApiKeyInfoAndSortValues() {
+        List<ApiKey> apiKeys = randomList(
+            0,
+            3,
+            () -> new ApiKey(
+                randomAlphaOfLength(4),
+                randomAlphaOfLength(4),
+                randomFrom(ApiKey.Type.values()),
+                Instant.now(),
+                Instant.now(),
+                randomBoolean(),
+                null,
+                randomAlphaOfLength(4),
+                randomAlphaOfLength(4),
+                null,
+                null,
+                null,
+                null
+            )
+        );
+        List<String> profileUids = new ArrayList<>(apiKeys.size());
+        for (int i = 0; i < apiKeys.size(); i++) {
+            profileUids.add(randomFrom(randomAlphaOfLength(8), null));
+        }
+        List<Object[]> sortValues = randomList(0, 6, () -> new String[] { "dummy sort value" });
+        if (apiKeys.size() != sortValues.size()) {
+            IllegalStateException iae = expectThrows(
+                IllegalStateException.class,
+                () -> new QueryApiKeyResponse(100, apiKeys, sortValues, profileUids, null)
+            );
+            assertThat(iae.getMessage(), containsString("Each api key info must be associated to a (nullable) sort value"));
+        }
+    }
+}

+ 11 - 3
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/ManageOwnApiKeyClusterPrivilegeTests.java

@@ -259,8 +259,16 @@ public class ManageOwnApiKeyClusterPrivilegeTests extends ESTestCase {
     public void testCheckQueryApiKeyRequest() {
         final ClusterPermission clusterPermission = ManageOwnApiKeyClusterPrivilege.INSTANCE.buildPermission(ClusterPermission.builder())
             .build();
-
-        final QueryApiKeyRequest queryApiKeyRequest = new QueryApiKeyRequest(null, null, null, null, null, null, randomBoolean());
+        QueryApiKeyRequest queryApiKeyRequest = new QueryApiKeyRequest(
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            randomBoolean(),
+            randomBoolean()
+        );
         if (randomBoolean()) {
             queryApiKeyRequest.setFilterForCurrentUser();
         }
@@ -279,7 +287,7 @@ public class ManageOwnApiKeyClusterPrivilegeTests extends ESTestCase {
             .build();
 
         final boolean withLimitedBy = randomBoolean();
-        final QueryApiKeyRequest queryApiKeyRequest = new QueryApiKeyRequest(null, null, null, null, null, null, withLimitedBy);
+        QueryApiKeyRequest queryApiKeyRequest = new QueryApiKeyRequest(null, null, null, null, null, null, withLimitedBy, randomBoolean());
         queryApiKeyRequest.setFilterForCurrentUser();
         assertThat(
             clusterPermission.check(QueryApiKeyAction.NAME, queryApiKeyRequest, AuthenticationTestHelper.builder().apiKey().build(false)),

+ 116 - 44
x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/ApiKeyIntegTests.java

@@ -1598,7 +1598,7 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
             getApiKeyDocument(apiKeyId)
         );
 
-        final ApiKey apiKeyInfo = getApiKeyInfo(client(), apiKeyId, true, randomBoolean());
+        ApiKey apiKeyInfo = getApiKeyInfo(client(), apiKeyId, true, randomBoolean());
         assertThat(
             apiKeyInfo.getLimitedBy().roleDescriptorsList().iterator().next(),
             equalTo(Set.of(ReservedRolesStore.SUPERUSER_ROLE_DESCRIPTOR))
@@ -2795,7 +2795,7 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
     private void doTestApiKeyHasExpectedAttributes(String apiKeyId, Map<ApiKeyAttribute, Object> attributes) throws IOException {
         final Map<String, Object> apiKeyDocMap = getApiKeyDocument(apiKeyId);
         final boolean useGetApiKey = randomBoolean();
-        final ApiKey apiKeyInfo = getApiKeyInfo(client(), apiKeyId, true, useGetApiKey);
+        ApiKey apiKeyInfo = getApiKeyInfo(client(), apiKeyId, true, useGetApiKey);
         // Update does not change API key type
         assertThat(apiKeyDocMap.get("type"), equalTo("rest"));
         assertThat(apiKeyInfo.getType(), equalTo(ApiKey.Type.REST));
@@ -2888,48 +2888,6 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         return client().execute(TransportGetAction.TYPE, new GetRequest(SECURITY_MAIN_ALIAS, apiKeyId)).actionGet().getSource();
     }
 
-    private ApiKey getApiKeyInfo(Client client, String apiKeyId, boolean withLimitedBy, boolean useGetApiKey) {
-        if (useGetApiKey) {
-            final PlainActionFuture<GetApiKeyResponse> future = new PlainActionFuture<>();
-            client.execute(
-                GetApiKeyAction.INSTANCE,
-                GetApiKeyRequest.builder().apiKeyId(apiKeyId).withLimitedBy(withLimitedBy).build(),
-                future
-            );
-            final GetApiKeyResponse getApiKeyResponse = future.actionGet();
-            assertThat(getApiKeyResponse.getApiKeyInfoList(), iterableWithSize(1));
-            return getApiKeyResponse.getApiKeyInfoList().get(0).apiKeyInfo();
-        } else {
-            final PlainActionFuture<QueryApiKeyResponse> future = new PlainActionFuture<>();
-            client.execute(
-                QueryApiKeyAction.INSTANCE,
-                new QueryApiKeyRequest(QueryBuilders.idsQuery().addIds(apiKeyId), null, null, null, null, null, withLimitedBy),
-                future
-            );
-            final QueryApiKeyResponse queryApiKeyResponse = future.actionGet();
-            assertThat(queryApiKeyResponse.getApiKeyInfoList(), iterableWithSize(1));
-            return queryApiKeyResponse.getApiKeyInfoList().get(0).apiKeyInfo();
-        }
-    }
-
-    private List<ApiKey> getAllApiKeyInfo(Client client, boolean withLimitedBy) {
-        if (randomBoolean()) {
-            final PlainActionFuture<GetApiKeyResponse> future = new PlainActionFuture<>();
-            client.execute(GetApiKeyAction.INSTANCE, GetApiKeyRequest.builder().withLimitedBy(withLimitedBy).build(), future);
-            final GetApiKeyResponse getApiKeyResponse = future.actionGet();
-            return getApiKeyResponse.getApiKeyInfoList().stream().map(GetApiKeyResponse.Item::apiKeyInfo).toList();
-        } else {
-            final PlainActionFuture<QueryApiKeyResponse> future = new PlainActionFuture<>();
-            client.execute(
-                QueryApiKeyAction.INSTANCE,
-                new QueryApiKeyRequest(QueryBuilders.matchAllQuery(), null, null, 1000, null, null, withLimitedBy),
-                future
-            );
-            final QueryApiKeyResponse queryApiKeyResponse = future.actionGet();
-            return queryApiKeyResponse.getApiKeyInfoList().stream().map(QueryApiKeyResponse.Item::apiKeyInfo).toList();
-        }
-    }
-
     private ServiceWithNodeName getServiceWithNodeName() {
         final var nodeName = randomFrom(internalCluster().getNodeNames());
         final var service = internalCluster().getInstance(ApiKeyService.class, nodeName);
@@ -3361,6 +3319,120 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         assertThat(ese, throwableWithMessage(containsString("manage_api_key,manage_security,all]")));
     }
 
+    public static ApiKey getApiKeyInfo(Client client, String apiKeyId, boolean withLimitedBy, boolean useGetApiKey) {
+        if (useGetApiKey) {
+            final PlainActionFuture<GetApiKeyResponse> future = new PlainActionFuture<>();
+            client.execute(
+                GetApiKeyAction.INSTANCE,
+                GetApiKeyRequest.builder().apiKeyId(apiKeyId).withLimitedBy(withLimitedBy).withProfileUid(randomBoolean()).build(),
+                future
+            );
+            final GetApiKeyResponse getApiKeyResponse = future.actionGet();
+            assertThat(getApiKeyResponse.getApiKeyInfoList(), iterableWithSize(1));
+            return getApiKeyResponse.getApiKeyInfoList().get(0).apiKeyInfo();
+        } else {
+            final PlainActionFuture<QueryApiKeyResponse> future = new PlainActionFuture<>();
+            client.execute(
+                QueryApiKeyAction.INSTANCE,
+                new QueryApiKeyRequest(
+                    QueryBuilders.idsQuery().addIds(apiKeyId),
+                    null,
+                    null,
+                    null,
+                    null,
+                    null,
+                    withLimitedBy,
+                    randomBoolean()
+                ),
+                future
+            );
+            final QueryApiKeyResponse queryApiKeyResponse = future.actionGet();
+            assertThat(queryApiKeyResponse.getApiKeyInfoList(), iterableWithSize(1));
+            return queryApiKeyResponse.getApiKeyInfoList().get(0).apiKeyInfo();
+        }
+    }
+
+    public static Tuple<ApiKey, String> getApiKeyInfoWithProfileUid(Client client, String apiKeyId, boolean ownKey) {
+        if (randomBoolean()) {
+            PlainActionFuture<GetApiKeyResponse> future = new PlainActionFuture<>();
+            client.execute(
+                GetApiKeyAction.INSTANCE,
+                GetApiKeyRequest.builder()
+                    .apiKeyId(apiKeyId)
+                    .withLimitedBy(randomBoolean())
+                    .withProfileUid(true)
+                    .ownedByAuthenticatedUser(ownKey)
+                    .build(),
+                future
+            );
+            GetApiKeyResponse getApiKeyResponse = future.actionGet();
+            assertThat(getApiKeyResponse.getApiKeyInfoList(), iterableWithSize(1));
+            GetApiKeyResponse.Item item = getApiKeyResponse.getApiKeyInfoList().get(0);
+            return new Tuple<>(item.apiKeyInfo(), item.ownerProfileUid());
+        } else {
+            PlainActionFuture<QueryApiKeyResponse> future = new PlainActionFuture<>();
+            client.execute(
+                QueryApiKeyAction.INSTANCE,
+                new QueryApiKeyRequest(QueryBuilders.idsQuery().addIds(apiKeyId), null, null, null, null, null, randomBoolean(), true),
+                future
+            );
+            QueryApiKeyResponse queryApiKeyResponse = future.actionGet();
+            assertThat(queryApiKeyResponse.getApiKeyInfoList(), iterableWithSize(1));
+            QueryApiKeyResponse.Item item = queryApiKeyResponse.getApiKeyInfoList().get(0);
+            return new Tuple<>(item.apiKeyInfo(), item.ownerProfileUid());
+        }
+    }
+
+    public static List<ApiKey> getAllApiKeyInfo(Client client, boolean withLimitedBy) {
+        if (randomBoolean()) {
+            PlainActionFuture<GetApiKeyResponse> future = new PlainActionFuture<>();
+            client.execute(
+                GetApiKeyAction.INSTANCE,
+                GetApiKeyRequest.builder().withLimitedBy(withLimitedBy).withProfileUid(randomBoolean()).build(),
+                future
+            );
+            GetApiKeyResponse getApiKeyResponse = future.actionGet();
+            return getApiKeyResponse.getApiKeyInfoList().stream().map(GetApiKeyResponse.Item::apiKeyInfo).toList();
+        } else {
+            PlainActionFuture<QueryApiKeyResponse> future = new PlainActionFuture<>();
+            client.execute(
+                QueryApiKeyAction.INSTANCE,
+                new QueryApiKeyRequest(QueryBuilders.matchAllQuery(), null, null, 1000, null, null, withLimitedBy, randomBoolean()),
+                future
+            );
+            QueryApiKeyResponse queryApiKeyResponse = future.actionGet();
+            return queryApiKeyResponse.getApiKeyInfoList().stream().map(QueryApiKeyResponse.Item::apiKeyInfo).toList();
+        }
+    }
+
+    public static List<Tuple<ApiKey, String>> getAllApiKeyInfoWithProfileUid(Client client) {
+        if (randomBoolean()) {
+            PlainActionFuture<GetApiKeyResponse> future = new PlainActionFuture<>();
+            client.execute(
+                GetApiKeyAction.INSTANCE,
+                GetApiKeyRequest.builder().withLimitedBy(randomBoolean()).withProfileUid(true).build(),
+                future
+            );
+            GetApiKeyResponse getApiKeyResponse = future.actionGet();
+            return getApiKeyResponse.getApiKeyInfoList()
+                .stream()
+                .map(item -> new Tuple<>(item.apiKeyInfo(), item.ownerProfileUid()))
+                .toList();
+        } else {
+            PlainActionFuture<QueryApiKeyResponse> future = new PlainActionFuture<>();
+            client.execute(
+                QueryApiKeyAction.INSTANCE,
+                new QueryApiKeyRequest(QueryBuilders.matchAllQuery(), null, null, 1000, null, null, randomBoolean(), true),
+                future
+            );
+            QueryApiKeyResponse queryApiKeyResponse = future.actionGet();
+            return queryApiKeyResponse.getApiKeyInfoList()
+                .stream()
+                .map(item -> new Tuple<>(item.apiKeyInfo(), item.ownerProfileUid()))
+                .toList();
+        }
+    }
+
     private static <T extends Throwable> T expectThrowsWithUnwrappedExecutionException(Class<T> expectedType, ThrowingRunnable runnable) {
         final var ex = expectThrowsAnyOf(List.of(expectedType, ExecutionException.class), runnable);
         if (ex instanceof ExecutionException) {

+ 2 - 0
x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/apikey/ApiKeySingleNodeTests.java

@@ -631,6 +631,7 @@ public class ApiKeySingleNodeTests extends SecuritySingleNodeTestCase {
             null,
             null,
             null,
+            randomBoolean(),
             randomBoolean()
         );
         final QueryApiKeyResponse queryApiKeyResponse = client().execute(QueryApiKeyAction.INSTANCE, queryApiKeyRequest).actionGet();
@@ -737,6 +738,7 @@ public class ApiKeySingleNodeTests extends SecuritySingleNodeTestCase {
             null,
             null,
             null,
+            randomBoolean(),
             randomBoolean()
         );
         final QueryApiKeyResponse queryApiKeyResponse = client().execute(QueryApiKeyAction.INSTANCE, queryApiKeyRequest).actionGet();

+ 3 - 3
x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/profile/AbstractProfileIntegTestCase.java

@@ -94,12 +94,12 @@ public abstract class AbstractProfileIntegTestCase extends SecurityIntegTestCase
         return super.configUsersRoles() + RAC_ROLE + ":" + RAC_USER_NAME + "," + OTHER_RAC_USER_NAME + "\n";
     }
 
-    protected Profile doActivateProfile(String username, SecureString password) {
+    protected static Profile doActivateProfile(String username, SecureString password) {
         // User and its access token should be associated to the same profile
         return doActivateProfile(username, password, randomBoolean());
     }
 
-    protected Profile doActivateProfile(String username, SecureString password, boolean useToken) {
+    protected static Profile doActivateProfile(String username, SecureString password, boolean useToken) {
         final ActivateProfileRequest activateProfileRequest = new ActivateProfileRequest();
         if (useToken) {
             final CreateTokenRequest createTokenRequest = new CreateTokenRequest("password", username, password.clone(), null, null, null);
@@ -122,7 +122,7 @@ public abstract class AbstractProfileIntegTestCase extends SecurityIntegTestCase
         return profile;
     }
 
-    protected Profile getProfile(String uid, Set<String> dataKeys) {
+    protected static Profile getProfile(String uid, Set<String> dataKeys) {
         final GetProfilesResponse getProfilesResponse = client().execute(GetProfilesAction.INSTANCE, new GetProfilesRequest(uid, dataKeys))
             .actionGet();
         assertThat(getProfilesResponse.getProfiles(), hasSize(1));

+ 378 - 0
x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/profile/ApiKeyOwnerProfileIntegTests.java

@@ -0,0 +1,378 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.security.profile;
+
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.common.settings.SecureString;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.test.SecurityIntegTestCase;
+import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.security.action.apikey.ApiKey;
+import org.elasticsearch.xpack.core.security.action.apikey.CreateApiKeyAction;
+import org.elasticsearch.xpack.core.security.action.apikey.CreateApiKeyRequest;
+import org.elasticsearch.xpack.core.security.action.profile.Profile;
+import org.elasticsearch.xpack.core.security.action.user.PutUserAction;
+import org.elasticsearch.xpack.core.security.action.user.PutUserRequest;
+import org.elasticsearch.xpack.security.authc.ApiKeyIntegTests;
+import org.junit.Before;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
+import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.WAIT_UNTIL;
+import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.iterableWithSize;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+
+@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
+public class ApiKeyOwnerProfileIntegTests extends SecurityIntegTestCase {
+
+    public static final SecureString FILE_USER_TEST_PASSWORD = new SecureString("file-user-test-password".toCharArray());
+    public static final SecureString NATIVE_USER_TEST_PASSWORD = new SecureString("native-user-test-password".toCharArray());
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        final Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
+        builder.put(XPackSettings.API_KEY_SERVICE_ENABLED_SETTING.getKey(), true);
+        builder.put(XPackSettings.TOKEN_SERVICE_ENABLED_SETTING.getKey(), true);
+        return builder.build();
+    }
+
+    @Before
+    public void createNativeUsers() {
+        ensureGreen();
+        {
+            PutUserRequest putUserRequest = new PutUserRequest();
+            putUserRequest.username("user_with_manage_api_key_role");
+            putUserRequest.roles("manage_api_key_role");
+            putUserRequest.passwordHash(getFastStoredHashAlgoForTests().hash(NATIVE_USER_TEST_PASSWORD));
+            assertThat(client().execute(PutUserAction.INSTANCE, putUserRequest).actionGet().created(), is(true));
+        }
+        {
+            PutUserRequest putUserRequest = new PutUserRequest();
+            putUserRequest.username("user_with_manage_own_api_key_role");
+            putUserRequest.roles("manage_own_api_key_role");
+            putUserRequest.passwordHash(getFastStoredHashAlgoForTests().hash(NATIVE_USER_TEST_PASSWORD));
+            assertThat(client().execute(PutUserAction.INSTANCE, putUserRequest).actionGet().created(), is(true));
+        }
+    }
+
+    @Override
+    public String configRoles() {
+        return super.configRoles() + """
+            manage_api_key_role:
+              cluster: ["manage_api_key"]
+            manage_own_api_key_role:
+              cluster: ["manage_own_api_key"]
+            """;
+    }
+
+    @Override
+    public String configUsers() {
+        final String usersPasswdHashed = new String(getFastStoredHashAlgoForTests().hash(FILE_USER_TEST_PASSWORD));
+        return super.configUsers()
+            + "user_with_manage_api_key_role:"
+            + usersPasswdHashed
+            + "\n"
+            + "user_with_manage_own_api_key_role:"
+            + usersPasswdHashed
+            + "\n";
+    }
+
+    @Override
+    public String configUsersRoles() {
+        return super.configUsersRoles() + """
+            manage_api_key_role:user_with_manage_api_key_role
+            manage_own_api_key_role:user_with_manage_own_api_key_role
+            """;
+    }
+
+    public void testApiKeyOwnerProfileWithoutDomain() {
+        boolean ownKey = randomBoolean();
+        final String username;
+        if (ownKey) {
+            username = "user_with_manage_own_api_key_role";
+        } else {
+            username = "user_with_manage_api_key_role";
+        }
+        SecureString password = randomFrom(FILE_USER_TEST_PASSWORD, NATIVE_USER_TEST_PASSWORD);
+        Client client = client().filterWithHeader(Map.of("Authorization", basicAuthHeaderValue(username, password)));
+        CreateApiKeyRequest request = new CreateApiKeyRequest("key1", null, null, null);
+        request.setRefreshPolicy(randomFrom(IMMEDIATE, WAIT_UNTIL));
+        // activate profile, then create API key, or vice-versa
+        boolean firstActivateProfile = randomBoolean();
+        Profile userWithManageOwnProfile = null;
+        if (firstActivateProfile) {
+            userWithManageOwnProfile = AbstractProfileIntegTestCase.doActivateProfile(username, password);
+        }
+        String keyId = client.execute(CreateApiKeyAction.INSTANCE, request).actionGet().getId();
+        if (false == firstActivateProfile) {
+            userWithManageOwnProfile = AbstractProfileIntegTestCase.doActivateProfile(username, password);
+        }
+        // assert key owner profile uid
+        Tuple<ApiKey, String> apiKeyWithProfileUid = ApiKeyIntegTests.getApiKeyInfoWithProfileUid(client, keyId, ownKey || randomBoolean());
+        assertThat(apiKeyWithProfileUid.v1().getId(), is(keyId));
+        assertThat(apiKeyWithProfileUid.v2(), is(userWithManageOwnProfile.uid()));
+        // manage_api_key user can similarly observe the API key with the profile uid
+        assertAllKeysWithProfiles(new String[] { keyId }, new String[] { userWithManageOwnProfile.uid() });
+    }
+
+    public void testApiKeyOwnerJoinsDomain() throws Exception {
+        // one user creates the API Key, the other activates the profile
+        String username = randomFrom("user_with_manage_own_api_key_role", "user_with_manage_api_key_role");
+        SecureString password1;
+        SecureString password2;
+        if (randomBoolean()) {
+            password1 = FILE_USER_TEST_PASSWORD;
+            password2 = NATIVE_USER_TEST_PASSWORD;
+        } else {
+            password1 = NATIVE_USER_TEST_PASSWORD;
+            password2 = FILE_USER_TEST_PASSWORD;
+        }
+        // activate profile, then create API key, or vice-versa
+        boolean firstActivateProfile = randomBoolean();
+        Profile user2Profile = null;
+        if (firstActivateProfile) {
+            user2Profile = AbstractProfileIntegTestCase.doActivateProfile(username, password2);
+        }
+        Client client1 = client().filterWithHeader(Map.of("Authorization", basicAuthHeaderValue(username, password1)));
+        CreateApiKeyRequest request = new CreateApiKeyRequest("key1", null, null, null);
+        request.setRefreshPolicy(randomFrom(IMMEDIATE, WAIT_UNTIL));
+        String keyId = client1.execute(CreateApiKeyAction.INSTANCE, request).actionGet().getId();
+        if (false == firstActivateProfile) {
+            user2Profile = AbstractProfileIntegTestCase.doActivateProfile(username, password2);
+        }
+        // assert key owner (username1) without profile uid
+        Tuple<ApiKey, String> apiKeyWithProfileUid = ApiKeyIntegTests.getApiKeyInfoWithProfileUid(
+            client1,
+            keyId,
+            username.equals("user_with_manage_own_api_key_role") || randomBoolean()
+        );
+        assertThat(apiKeyWithProfileUid.v1().getId(), is(keyId));
+        // no profile for API Key owner
+        assertThat(apiKeyWithProfileUid.v2(), nullValue());
+        // manage all api keys user can similarly see the key WITHOUT the profile uid
+        assertAllKeysWithProfiles(new String[] { keyId }, new String[] { null });
+        // restart cluster nodes to add the 2 users to the same domain
+        internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
+            @Override
+            public Settings onNodeStopped(String nodeName) {
+                // Register both file and native realms under the same domain
+                return Settings.builder().put("xpack.security.authc.domains.my_domain.realms", "file,index").build();
+            }
+        });
+        ensureGreen();
+        client1 = client().filterWithHeader(Map.of("Authorization", basicAuthHeaderValue(username, password1)));
+        apiKeyWithProfileUid = ApiKeyIntegTests.getApiKeyInfoWithProfileUid(
+            client1,
+            keyId,
+            username.equals("user_with_manage_own_api_key_role") || randomBoolean()
+        );
+        assertThat(apiKeyWithProfileUid.v1().getId(), is(keyId));
+        // API key owner (username1) now has a profile uid
+        assertThat(apiKeyWithProfileUid.v2(), is(user2Profile.uid()));
+        // manage all api keys user can similarly see the key with the profile uid
+        assertAllKeysWithProfiles(new String[] { keyId }, new String[] { user2Profile.uid() });
+    }
+
+    public void testApiKeyOwnerLeavesDomain() throws Exception {
+        // put the 2 realms under the same domain
+        internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
+            @Override
+            public Settings onNodeStopped(String nodeName) {
+                // Register both file and native realms under the same domain
+                return Settings.builder().put("xpack.security.authc.domains.file_and_index_domain.realms", "file,index").build();
+            }
+        });
+        ensureGreen();
+
+        // one user creates the API Key, the other activates the profile
+        String username = randomFrom("user_with_manage_own_api_key_role", "user_with_manage_api_key_role");
+        SecureString password1;
+        SecureString password2;
+        if (randomBoolean()) {
+            password1 = FILE_USER_TEST_PASSWORD;
+            password2 = NATIVE_USER_TEST_PASSWORD;
+        } else {
+            password1 = NATIVE_USER_TEST_PASSWORD;
+            password2 = FILE_USER_TEST_PASSWORD;
+        }
+        // activate profile, then create API key, or vice-versa
+        boolean firstActivateProfile = randomBoolean();
+        Profile user2Profile = null;
+        if (firstActivateProfile) {
+            user2Profile = AbstractProfileIntegTestCase.doActivateProfile(username, password2);
+        }
+        Client client1 = client().filterWithHeader(Map.of("Authorization", basicAuthHeaderValue(username, password1)));
+        CreateApiKeyRequest request = new CreateApiKeyRequest("key1", null, null, null);
+        request.setRefreshPolicy(randomFrom(IMMEDIATE, WAIT_UNTIL));
+        String keyId = client1.execute(CreateApiKeyAction.INSTANCE, request).actionGet().getId();
+        if (false == firstActivateProfile) {
+            user2Profile = AbstractProfileIntegTestCase.doActivateProfile(username, password2);
+        }
+        // assert the key owner (password1) has profile uid of password2
+        Tuple<ApiKey, String> apiKeyWithProfileUid = ApiKeyIntegTests.getApiKeyInfoWithProfileUid(
+            client1,
+            keyId,
+            username.equals("user_with_manage_own_api_key_role") || randomBoolean()
+        );
+        assertThat(apiKeyWithProfileUid.v1().getId(), is(keyId));
+        // API Key owner (password1) has profile of password2
+        assertThat(apiKeyWithProfileUid.v2(), is(user2Profile.uid()));
+        // manage all api keys user can similarly see the key with the profile uid of username2
+        assertAllKeysWithProfiles(new String[] { keyId }, new String[] { user2Profile.uid() });
+        // the realms are not under the same domain anymore
+        internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
+            @Override
+            public Settings onNodeStopped(String nodeName) {
+                // Register both file and native realms under the same domain
+                Settings.Builder settingsBuilder = Settings.builder();
+                settingsBuilder.put("xpack.security.authc.domains.file_and_index_domain.realms", (String) null);
+                if (randomBoolean()) {
+                    settingsBuilder.put("xpack.security.authc.domains.file_domain.realms", "file");
+                }
+                if (randomBoolean()) {
+                    settingsBuilder.put("xpack.security.authc.domains.index_domain.realms", "index");
+                }
+                return settingsBuilder.build();
+            }
+        });
+        ensureGreen();
+        // assert the key owner (username1) now has no profile
+        client1 = client().filterWithHeader(Map.of("Authorization", basicAuthHeaderValue(username, password1)));
+        apiKeyWithProfileUid = ApiKeyIntegTests.getApiKeyInfoWithProfileUid(
+            client1,
+            keyId,
+            username.equals("user_with_manage_own_api_key_role") || randomBoolean()
+        );
+        assertThat(apiKeyWithProfileUid.v1().getId(), is(keyId));
+        // no profile for API Key owner
+        assertThat(apiKeyWithProfileUid.v2(), nullValue());
+        // manage all api keys user can similarly see the key WITHOUT the profile uid
+        assertAllKeysWithProfiles(new String[] { keyId }, new String[] { null });
+        // but password1 can also activate its own profile now
+        Profile user1Profile = AbstractProfileIntegTestCase.doActivateProfile(username, password1);
+        assertThat(user1Profile.uid(), not(user2Profile.uid()));
+        // which is reflected in the API key owner profile uid information
+        apiKeyWithProfileUid = ApiKeyIntegTests.getApiKeyInfoWithProfileUid(
+            client1,
+            keyId,
+            username.equals("user_with_manage_own_api_key_role") || randomBoolean()
+        );
+        assertThat(apiKeyWithProfileUid.v1().getId(), is(keyId));
+        // no profile for API Key owner
+        assertThat(apiKeyWithProfileUid.v2(), is(user1Profile.uid()));
+        // manage all api keys user can similarly see the key with the profile uid of username1 now
+        assertAllKeysWithProfiles(new String[] { keyId }, new String[] { user1Profile.uid() });
+    }
+
+    public void testDifferentKeyOwnersSameProfile() throws Exception {
+        // put the 2 realms under the same domain
+        internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
+            @Override
+            public Settings onNodeStopped(String nodeName) {
+                // Register both file and native realms under the same domain
+                return Settings.builder().put("xpack.security.authc.domains.one_domain.realms", "file,index").build();
+            }
+        });
+        ensureGreen();
+        String username = randomFrom("user_with_manage_own_api_key_role", "user_with_manage_api_key_role");
+        SecureString password1;
+        SecureString password2;
+        boolean user1IsFile;
+        if (randomBoolean()) {
+            password1 = FILE_USER_TEST_PASSWORD;
+            user1IsFile = true;
+            password2 = NATIVE_USER_TEST_PASSWORD;
+        } else {
+            password1 = NATIVE_USER_TEST_PASSWORD;
+            user1IsFile = false;
+            password2 = FILE_USER_TEST_PASSWORD;
+        }
+        // activate the profile, then create the 2 keys, or vice-versa
+        Profile user1Profile = null;
+        Profile user2Profile = null;
+        boolean firstActivateProfile1 = randomBoolean();
+        if (firstActivateProfile1) {
+            user1Profile = AbstractProfileIntegTestCase.doActivateProfile(username, password1);
+        }
+        boolean firstActivateProfile2 = randomBoolean();
+        if (firstActivateProfile2) {
+            user2Profile = AbstractProfileIntegTestCase.doActivateProfile(username, password2);
+        }
+        Client client1 = client().filterWithHeader(Map.of("Authorization", basicAuthHeaderValue(username, password1)));
+        CreateApiKeyRequest request1 = new CreateApiKeyRequest("key1", null, null, null);
+        request1.setRefreshPolicy(randomFrom(IMMEDIATE, WAIT_UNTIL));
+        String key1Id = client1.execute(CreateApiKeyAction.INSTANCE, request1).actionGet().getId();
+        Client client2 = client().filterWithHeader(Map.of("Authorization", basicAuthHeaderValue(username, password2)));
+        CreateApiKeyRequest request2 = new CreateApiKeyRequest("key2", null, null, null);
+        request2.setRefreshPolicy(randomFrom(IMMEDIATE, WAIT_UNTIL));
+        String key2Id = client2.execute(CreateApiKeyAction.INSTANCE, request2).actionGet().getId();
+        if (false == firstActivateProfile1) {
+            user1Profile = AbstractProfileIntegTestCase.doActivateProfile(username, password1);
+        }
+        if (false == firstActivateProfile2) {
+            user2Profile = AbstractProfileIntegTestCase.doActivateProfile(username, password2);
+        }
+        // there should only be a single profile, because both users are under the same domain
+        assertThat(user1Profile.uid(), is(user2Profile.uid()));
+        String profileUid = user1Profile.uid();
+        // the 2 API keys should also show the one profile uid for the 2 owners
+        Tuple<ApiKey, String> apiKeyWithProfileUid = ApiKeyIntegTests.getApiKeyInfoWithProfileUid(
+            client1,
+            key1Id,
+            username.equals("user_with_manage_own_api_key_role") || randomBoolean()
+        );
+        assertThat(apiKeyWithProfileUid.v1().getId(), is(key1Id));
+        if (user1IsFile) {
+            assertThat(apiKeyWithProfileUid.v1().getRealm(), is("file"));
+        } else {
+            assertThat(apiKeyWithProfileUid.v1().getRealm(), is("index"));
+        }
+        assertThat(apiKeyWithProfileUid.v2(), is(profileUid));
+        apiKeyWithProfileUid = ApiKeyIntegTests.getApiKeyInfoWithProfileUid(
+            client2,
+            key2Id,
+            username.equals("user_with_manage_own_api_key_role") || randomBoolean()
+        );
+        assertThat(apiKeyWithProfileUid.v1().getId(), is(key2Id));
+        if (user1IsFile) {
+            assertThat(apiKeyWithProfileUid.v1().getRealm(), is("index"));
+        } else {
+            assertThat(apiKeyWithProfileUid.v1().getRealm(), is("file"));
+        }
+        assertThat(apiKeyWithProfileUid.v2(), is(profileUid));
+        // manage all api keys user can similarly see the 2 keys with the same profile uid
+        assertAllKeysWithProfiles(new String[] { key1Id, key2Id }, new String[] { profileUid, profileUid });
+    }
+
+    private void assertAllKeysWithProfiles(String[] keyIds, String[] profileUids) {
+        assert keyIds.length == profileUids.length;
+        Client client = client().filterWithHeader(
+            Map.of(
+                "Authorization",
+                basicAuthHeaderValue("user_with_manage_api_key_role", randomFrom(FILE_USER_TEST_PASSWORD, NATIVE_USER_TEST_PASSWORD))
+            )
+        );
+        List<Tuple<String, String>> allApiKeyIdsWithProfileUid = ApiKeyIntegTests.getAllApiKeyInfoWithProfileUid(client)
+            .stream()
+            .map(t -> new Tuple<>(t.v1().getId(), t.v2()))
+            .toList();
+        assertThat(allApiKeyIdsWithProfileUid, iterableWithSize(keyIds.length));
+        for (int i = 0; i < keyIds.length; i++) {
+            assertThat(allApiKeyIdsWithProfileUid, hasItem(new Tuple<>(keyIds[i], profileUids[i])));
+        }
+    }
+}

+ 1 - 1
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java

@@ -930,7 +930,7 @@ public class Security extends Plugin
             systemIndices.getProfileIndexManager(),
             clusterService,
             featureService,
-            realms::getDomainConfig
+            realms
         );
         components.add(profileService);
 

+ 26 - 2
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/apikey/TransportGetApiKeyAction.java

@@ -20,22 +20,26 @@ import org.elasticsearch.xpack.core.security.action.apikey.GetApiKeyRequest;
 import org.elasticsearch.xpack.core.security.action.apikey.GetApiKeyResponse;
 import org.elasticsearch.xpack.core.security.authc.Authentication;
 import org.elasticsearch.xpack.security.authc.ApiKeyService;
+import org.elasticsearch.xpack.security.profile.ProfileService;
 
 public final class TransportGetApiKeyAction extends TransportAction<GetApiKeyRequest, GetApiKeyResponse> {
 
     private final ApiKeyService apiKeyService;
     private final SecurityContext securityContext;
+    private final ProfileService profileService;
 
     @Inject
     public TransportGetApiKeyAction(
         TransportService transportService,
         ActionFilters actionFilters,
         ApiKeyService apiKeyService,
-        SecurityContext context
+        SecurityContext context,
+        ProfileService profileService
     ) {
         super(GetApiKeyAction.NAME, actionFilters, transportService.getTaskManager());
         this.apiKeyService = apiKeyService;
         this.securityContext = context;
+        this.profileService = profileService;
     }
 
     @Override
@@ -57,7 +61,27 @@ public final class TransportGetApiKeyAction extends TransportAction<GetApiKeyReq
             realms = ApiKeyService.getOwnersRealmNames(authentication);
         }
 
-        apiKeyService.getApiKeys(realms, username, apiKeyName, apiKeyIds, request.withLimitedBy(), request.activeOnly(), listener);
+        apiKeyService.getApiKeys(
+            realms,
+            username,
+            apiKeyName,
+            apiKeyIds,
+            request.withLimitedBy(),
+            request.activeOnly(),
+            ActionListener.wrap(apiKeyInfos -> {
+                if (request.withProfileUid()) {
+                    profileService.resolveProfileUidsForApiKeys(
+                        apiKeyInfos,
+                        ActionListener.wrap(
+                            ownerProfileUids -> listener.onResponse(new GetApiKeyResponse(apiKeyInfos, ownerProfileUids)),
+                            listener::onFailure
+                        )
+                    );
+                } else {
+                    listener.onResponse(new GetApiKeyResponse(apiKeyInfos, null));
+                }
+            }, listener::onFailure)
+        );
     }
 
 }

+ 34 - 3
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/apikey/TransportQueryApiKeyAction.java

@@ -21,6 +21,7 @@ import org.elasticsearch.xpack.core.security.action.apikey.QueryApiKeyRequest;
 import org.elasticsearch.xpack.core.security.action.apikey.QueryApiKeyResponse;
 import org.elasticsearch.xpack.core.security.authc.Authentication;
 import org.elasticsearch.xpack.security.authc.ApiKeyService;
+import org.elasticsearch.xpack.security.profile.ProfileService;
 import org.elasticsearch.xpack.security.support.ApiKeyAggregationsBuilder;
 import org.elasticsearch.xpack.security.support.ApiKeyBoolQueryBuilder;
 
@@ -47,17 +48,20 @@ public final class TransportQueryApiKeyAction extends TransportAction<QueryApiKe
 
     private final ApiKeyService apiKeyService;
     private final SecurityContext securityContext;
+    private final ProfileService profileService;
 
     @Inject
     public TransportQueryApiKeyAction(
         TransportService transportService,
         ActionFilters actionFilters,
         ApiKeyService apiKeyService,
-        SecurityContext context
+        SecurityContext context,
+        ProfileService profileService
     ) {
         super(QueryApiKeyAction.NAME, actionFilters, transportService.getTaskManager());
         this.apiKeyService = apiKeyService;
         this.securityContext = context;
+        this.profileService = profileService;
     }
 
     @Override
@@ -113,7 +117,34 @@ public final class TransportQueryApiKeyAction extends TransportAction<QueryApiKe
         }
 
         final SearchRequest searchRequest = new SearchRequest(new String[] { SECURITY_MAIN_ALIAS }, searchSourceBuilder);
-        apiKeyService.queryApiKeys(searchRequest, request.withLimitedBy(), listener);
+        apiKeyService.queryApiKeys(searchRequest, request.withLimitedBy(), ActionListener.wrap(queryApiKeysResult -> {
+            if (request.withProfileUid()) {
+                profileService.resolveProfileUidsForApiKeys(
+                    queryApiKeysResult.apiKeyInfos(),
+                    ActionListener.wrap(
+                        ownerProfileUids -> listener.onResponse(
+                            new QueryApiKeyResponse(
+                                queryApiKeysResult.total(),
+                                queryApiKeysResult.apiKeyInfos(),
+                                queryApiKeysResult.sortValues(),
+                                ownerProfileUids,
+                                queryApiKeysResult.aggregations()
+                            )
+                        ),
+                        listener::onFailure
+                    )
+                );
+            } else {
+                listener.onResponse(
+                    new QueryApiKeyResponse(
+                        queryApiKeysResult.total(),
+                        queryApiKeysResult.apiKeyInfos(),
+                        queryApiKeysResult.sortValues(),
+                        null,
+                        queryApiKeysResult.aggregations()
+                    )
+                );
+            }
+        }, listener::onFailure));
     }
-
 }

+ 23 - 19
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java

@@ -68,6 +68,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.DeprecationHandler;
 import org.elasticsearch.xcontent.InstantiatingObjectParser;
@@ -90,9 +91,7 @@ import org.elasticsearch.xpack.core.security.action.apikey.BaseBulkUpdateApiKeyR
 import org.elasticsearch.xpack.core.security.action.apikey.BaseUpdateApiKeyRequest;
 import org.elasticsearch.xpack.core.security.action.apikey.BulkUpdateApiKeyResponse;
 import org.elasticsearch.xpack.core.security.action.apikey.CreateApiKeyResponse;
-import org.elasticsearch.xpack.core.security.action.apikey.GetApiKeyResponse;
 import org.elasticsearch.xpack.core.security.action.apikey.InvalidateApiKeyResponse;
-import org.elasticsearch.xpack.core.security.action.apikey.QueryApiKeyResponse;
 import org.elasticsearch.xpack.core.security.authc.Authentication;
 import org.elasticsearch.xpack.core.security.authc.AuthenticationField;
 import org.elasticsearch.xpack.core.security.authc.AuthenticationResult;
@@ -1897,7 +1896,7 @@ public class ApiKeyService {
      * @param apiKeyName API key name
      * @param apiKeyIds API key ids
      * @param withLimitedBy whether to parse and return the limited by role descriptors
-     * @param listener listener for {@link GetApiKeyResponse}
+     * @param listener receives the requested collection of {@link ApiKey}s
      */
     public void getApiKeys(
         String[] realmNames,
@@ -1906,7 +1905,7 @@ public class ApiKeyService {
         String[] apiKeyIds,
         boolean withLimitedBy,
         boolean activeOnly,
-        ActionListener<GetApiKeyResponse> listener
+        ActionListener<Collection<ApiKey>> listener
     ) {
         ensureEnabled();
         findApiKeysForUserRealmApiKeyIdAndNameCombination(
@@ -1918,7 +1917,7 @@ public class ApiKeyService {
             activeOnly,
             hit -> convertSearchHitToApiKeyInfo(hit, withLimitedBy),
             ActionListener.wrap(apiKeyInfos -> {
-                if (apiKeyInfos.isEmpty()) {
+                if (apiKeyInfos.isEmpty() && logger.isDebugEnabled()) {
                     logger.debug(
                         "No API keys found for realms {}, user [{}], API key name [{}], API key IDs {}, and active_only flag [{}]",
                         Arrays.toString(realmNames),
@@ -1927,20 +1926,27 @@ public class ApiKeyService {
                         Arrays.toString(apiKeyIds),
                         activeOnly
                     );
-                    listener.onResponse(GetApiKeyResponse.EMPTY);
-                } else {
-                    listener.onResponse(new GetApiKeyResponse(apiKeyInfos));
                 }
+                listener.onResponse(apiKeyInfos);
             }, listener::onFailure)
         );
     }
 
-    public void queryApiKeys(SearchRequest searchRequest, boolean withLimitedBy, ActionListener<QueryApiKeyResponse> listener) {
+    public record QueryApiKeysResult(
+        long total,
+        Collection<ApiKey> apiKeyInfos,
+        Collection<Object[]> sortValues,
+        @Nullable InternalAggregations aggregations
+    ) {
+        static final QueryApiKeysResult EMPTY = new QueryApiKeysResult(0, List.of(), List.of(), null);
+    }
+
+    public void queryApiKeys(SearchRequest searchRequest, boolean withLimitedBy, ActionListener<QueryApiKeysResult> listener) {
         ensureEnabled();
         final SecurityIndexManager frozenSecurityIndex = securityIndex.defensiveCopy();
         if (frozenSecurityIndex.indexExists() == false) {
             logger.debug("security index does not exist");
-            listener.onResponse(QueryApiKeyResponse.EMPTY);
+            listener.onResponse(QueryApiKeysResult.EMPTY);
         } else if (frozenSecurityIndex.isAvailable(SEARCH_SHARDS) == false) {
             listener.onFailure(frozenSecurityIndex.getUnavailableReason(SEARCH_SHARDS));
         } else {
@@ -1952,26 +1958,24 @@ public class ApiKeyService {
                     TransportSearchAction.TYPE,
                     searchRequest,
                     ActionListener.wrap(searchResponse -> {
-                        final long total = searchResponse.getHits().getTotalHits().value;
+                        long total = searchResponse.getHits().getTotalHits().value;
                         if (total == 0) {
                             logger.debug("No api keys found for query [{}]", searchRequest.source().query());
-                            listener.onResponse(QueryApiKeyResponse.EMPTY);
+                            listener.onResponse(QueryApiKeysResult.EMPTY);
                             return;
                         }
-                        final List<QueryApiKeyResponse.Item> apiKeyItem = Arrays.stream(searchResponse.getHits().getHits())
-                            .map(hit -> convertSearchHitToQueryItem(hit, withLimitedBy))
+                        SearchHit[] hits = searchResponse.getHits().getHits();
+                        List<ApiKey> apiKeyInfos = Arrays.stream(hits)
+                            .map(hit -> convertSearchHitToApiKeyInfo(hit, withLimitedBy))
                             .toList();
-                        listener.onResponse(new QueryApiKeyResponse(total, apiKeyItem, searchResponse.getAggregations()));
+                        List<Object[]> sortValues = Arrays.stream(hits).map(SearchHit::getSortValues).toList();
+                        listener.onResponse(new QueryApiKeysResult(total, apiKeyInfos, sortValues, searchResponse.getAggregations()));
                     }, listener::onFailure)
                 )
             );
         }
     }
 
-    private QueryApiKeyResponse.Item convertSearchHitToQueryItem(SearchHit hit, boolean withLimitedBy) {
-        return new QueryApiKeyResponse.Item(convertSearchHitToApiKeyInfo(hit, withLimitedBy), hit.getSortValues());
-    }
-
     private ApiKey convertSearchHitToApiKeyInfo(SearchHit hit) {
         return convertSearchHitToApiKeyInfo(hit, false);
     }

+ 66 - 2
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/profile/ProfileService.java

@@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
@@ -50,6 +51,7 @@ import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.MultiMatchQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.sort.SortOrder;
@@ -61,16 +63,19 @@ import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.XContentParserConfiguration;
 import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xpack.core.common.ResultsAndErrors;
+import org.elasticsearch.xpack.core.security.action.apikey.ApiKey;
 import org.elasticsearch.xpack.core.security.action.profile.Profile;
 import org.elasticsearch.xpack.core.security.action.profile.SuggestProfilesRequest;
 import org.elasticsearch.xpack.core.security.action.profile.SuggestProfilesResponse;
 import org.elasticsearch.xpack.core.security.action.profile.UpdateProfileDataRequest;
 import org.elasticsearch.xpack.core.security.authc.Authentication;
 import org.elasticsearch.xpack.core.security.authc.DomainConfig;
+import org.elasticsearch.xpack.core.security.authc.RealmConfig;
 import org.elasticsearch.xpack.core.security.authc.RealmDomain;
 import org.elasticsearch.xpack.core.security.authc.Subject;
 import org.elasticsearch.xpack.core.security.user.InternalUser;
 import org.elasticsearch.xpack.core.security.user.User;
+import org.elasticsearch.xpack.security.authc.Realms;
 import org.elasticsearch.xpack.security.support.SecurityIndexManager;
 
 import java.io.IOException;
@@ -85,6 +90,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
@@ -118,6 +124,7 @@ public class ProfileService {
     private final ClusterService clusterService;
     private final FeatureService featureService;
     private final Function<String, DomainConfig> domainConfigLookup;
+    private final Function<RealmConfig.RealmIdentifier, Authentication.RealmRef> realmRefLookup;
 
     public ProfileService(
         Settings settings,
@@ -126,7 +133,7 @@ public class ProfileService {
         SecurityIndexManager profileIndex,
         ClusterService clusterService,
         FeatureService featureService,
-        Function<String, DomainConfig> domainConfigLookup
+        Realms realms
     ) {
         this.settings = settings;
         this.clock = clock;
@@ -134,7 +141,8 @@ public class ProfileService {
         this.profileIndex = profileIndex;
         this.clusterService = clusterService;
         this.featureService = featureService;
-        this.domainConfigLookup = domainConfigLookup;
+        this.domainConfigLookup = realms::getDomainConfig;
+        this.realmRefLookup = realms::getRealmRef;
     }
 
     public void getProfiles(List<String> uids, Set<String> dataKeys, ActionListener<ResultsAndErrors<Profile>> listener) {
@@ -315,6 +323,34 @@ public class ProfileService {
         doUpdate(buildUpdateRequest(uid, builder, refreshPolicy), listener.map(updateResponse -> AcknowledgedResponse.TRUE));
     }
 
+    public void resolveProfileUidsForApiKeys(Collection<ApiKey> apiKeyInfos, ActionListener<Collection<String>> listener) {
+        List<Subject> subjects = apiKeyInfos.stream().map(this::getApiKeyCreatorSubject).filter(Objects::nonNull).distinct().toList();
+        searchProfilesForSubjects(subjects, ActionListener.wrap(resultsAndErrors -> {
+            if (resultsAndErrors == null) {
+                // profile index does not exist
+                listener.onResponse(null);
+            } else if (resultsAndErrors.errors().isEmpty()) {
+                assert subjects.size() == resultsAndErrors.results().size();
+                Map<Subject, String> profileUidLookup = resultsAndErrors.results()
+                    .stream()
+                    .filter(t -> Objects.nonNull(t.v2()))
+                    .map(t -> new Tuple<>(t.v1(), t.v2().uid()))
+                    .collect(Collectors.toUnmodifiableMap(Tuple::v1, Tuple::v2));
+                listener.onResponse(apiKeyInfos.stream().map(apiKeyInfo -> {
+                    Subject subject = getApiKeyCreatorSubject(apiKeyInfo);
+                    return subject == null ? null : profileUidLookup.get(subject);
+                }).toList());
+            } else {
+                final ElasticsearchStatusException exception = new ElasticsearchStatusException(
+                    "failed to retrieve profile for users. please retry without fetching profile uid (with_profile_uid=false)",
+                    RestStatus.INTERNAL_SERVER_ERROR
+                );
+                resultsAndErrors.errors().values().forEach(exception::addSuppressed);
+                listener.onFailure(exception);
+            }
+        }, listener::onFailure));
+    }
+
     public void searchProfilesForSubjects(List<Subject> subjects, ActionListener<SubjectSearchResultsAndErrors<Profile>> listener) {
         searchVersionedDocumentsForSubjects(subjects, ActionListener.wrap(resultsAndErrors -> {
             if (resultsAndErrors == null) {
@@ -854,6 +890,34 @@ public class ProfileService {
         }
     }
 
+    private Subject getApiKeyCreatorSubject(ApiKey apiKeyInfo) {
+        if (apiKeyInfo.getUsername() == null) {
+            logger.debug("encountered api key with id [{}] of the \"null\" username", apiKeyInfo.getId());
+            return null;
+        }
+        RealmConfig.RealmIdentifier realmIdentifier = apiKeyInfo.getRealmIdentifier();
+        if (realmIdentifier == null) {
+            logger.debug(
+                "encountered api key with id [{}] of the username [{}] that has a \"null\" realm type or realm name",
+                apiKeyInfo.getId(),
+                apiKeyInfo.getUsername()
+            );
+            return null;
+        }
+        Authentication.RealmRef realmRef = realmRefLookup.apply(realmIdentifier);
+        if (realmRef == null) {
+            logger.debug(
+                "encountered api key with id [{}] of the username [{}] from realm [{}], "
+                    + "where that realm is not currently configured on the local node",
+                apiKeyInfo.getId(),
+                apiKeyInfo.getUsername(),
+                realmIdentifier
+            );
+            return null;
+        }
+        return new Subject(new User(apiKeyInfo.getUsername(), Strings.EMPTY_ARRAY), realmRef);
+    }
+
     // package private for testing
     void updateProfileForActivate(Subject subject, VersionedDocument currentVersionedDocumentBySearch, ActionListener<Profile> listener)
         throws IOException {

+ 2 - 0
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/action/apikey/RestGetApiKeyAction.java

@@ -51,6 +51,7 @@ public final class RestGetApiKeyAction extends ApiKeyBaseRestHandler {
         final boolean myApiKeysOnly = request.paramAsBoolean("owner", false);
         final boolean withLimitedBy = request.paramAsBoolean("with_limited_by", false);
         final boolean activeOnly = request.paramAsBoolean("active_only", false);
+        final boolean withProfileUid = request.paramAsBoolean("with_profile_uid", false);
         final GetApiKeyRequest getApiKeyRequest = GetApiKeyRequest.builder()
             .realmName(realmName)
             .userName(userName)
@@ -59,6 +60,7 @@ public final class RestGetApiKeyAction extends ApiKeyBaseRestHandler {
             .ownedByAuthenticatedUser(myApiKeysOnly)
             .withLimitedBy(withLimitedBy)
             .activeOnly(activeOnly)
+            .withProfileUid(withProfileUid)
             .build();
         return channel -> client.execute(GetApiKeyAction.INSTANCE, getApiKeyRequest, new RestBuilderListener<>(channel) {
             @Override

+ 4 - 3
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/action/apikey/RestQueryApiKeyAction.java

@@ -102,7 +102,7 @@ public final class RestQueryApiKeyAction extends ApiKeyBaseRestHandler {
     @Override
     protected RestChannelConsumer innerPrepareRequest(final RestRequest request, final NodeClient client) throws IOException {
         final boolean withLimitedBy = request.paramAsBoolean("with_limited_by", false);
-
+        final boolean withProfileUid = request.paramAsBoolean("with_profile_uid", false);
         final QueryApiKeyRequest queryApiKeyRequest;
         if (request.hasContentOrSourceParam()) {
             final Payload payload = PARSER.parse(request.contentOrSourceParamParser(), null);
@@ -113,10 +113,11 @@ public final class RestQueryApiKeyAction extends ApiKeyBaseRestHandler {
                 payload.size,
                 payload.fieldSortBuilders,
                 payload.searchAfterBuilder,
-                withLimitedBy
+                withLimitedBy,
+                withProfileUid
             );
         } else {
-            queryApiKeyRequest = new QueryApiKeyRequest(null, null, null, null, null, null, withLimitedBy);
+            queryApiKeyRequest = new QueryApiKeyRequest(null, null, null, null, null, null, withLimitedBy, withProfileUid);
         }
         return channel -> client.execute(QueryApiKeyAction.INSTANCE, queryApiKeyRequest, new RestToXContentListener<>(channel));
     }

+ 25 - 29
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyServiceTests.java

@@ -92,9 +92,7 @@ import org.elasticsearch.xpack.core.security.action.apikey.BulkUpdateApiKeyRespo
 import org.elasticsearch.xpack.core.security.action.apikey.CreateApiKeyRequest;
 import org.elasticsearch.xpack.core.security.action.apikey.CreateApiKeyResponse;
 import org.elasticsearch.xpack.core.security.action.apikey.CrossClusterApiKeyRoleDescriptorBuilder;
-import org.elasticsearch.xpack.core.security.action.apikey.GetApiKeyResponse;
 import org.elasticsearch.xpack.core.security.action.apikey.InvalidateApiKeyResponse;
-import org.elasticsearch.xpack.core.security.action.apikey.QueryApiKeyResponse;
 import org.elasticsearch.xpack.core.security.authc.Authentication;
 import org.elasticsearch.xpack.core.security.authc.Authentication.RealmRef;
 import org.elasticsearch.xpack.core.security.authc.AuthenticationField;
@@ -156,6 +154,7 @@ import java.util.stream.IntStream;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 import static org.elasticsearch.test.ActionListenerUtils.anyActionListener;
+import static org.elasticsearch.test.LambdaMatchers.transformedMatch;
 import static org.elasticsearch.test.SecurityIntegTestCase.getFastStoredHashAlgoForTests;
 import static org.elasticsearch.test.TestMatchers.throwableWithMessage;
 import static org.elasticsearch.transport.RemoteClusterPortSettings.TRANSPORT_VERSION_ADVANCED_REMOTE_CLUSTER_SECURITY;
@@ -178,10 +177,10 @@ import static org.hamcrest.Matchers.emptyArray;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.iterableWithSize;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
@@ -287,7 +286,7 @@ public class ApiKeyServiceTests extends ESTestCase {
         String username = randomFrom(randomAlphaOfLengthBetween(3, 8), null);
         String apiKeyName = randomFrom(randomAlphaOfLengthBetween(3, 8), null);
         String[] apiKeyIds = generateRandomStringArray(4, 4, true, true);
-        PlainActionFuture<GetApiKeyResponse> getApiKeyResponsePlainActionFuture = new PlainActionFuture<>();
+        PlainActionFuture<Collection<ApiKey>> getApiKeyResponsePlainActionFuture = new PlainActionFuture<>();
         final boolean activeOnly = randomBoolean();
         service.getApiKeys(realmNames, username, apiKeyName, apiKeyIds, randomBoolean(), activeOnly, getApiKeyResponsePlainActionFuture);
         final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("doc_type", "api_key"));
@@ -326,8 +325,7 @@ public class ApiKeyServiceTests extends ESTestCase {
         verify(searchRequestBuilder).setQuery(eq(boolQuery));
         verify(searchRequestBuilder).setFetchSource(eq(true));
         assertThat(searchRequest.get().source().query(), is(boolQuery));
-        GetApiKeyResponse getApiKeyResponse = getApiKeyResponsePlainActionFuture.get();
-        assertThat(getApiKeyResponse.getApiKeyInfoList(), emptyIterable());
+        assertThat(getApiKeyResponsePlainActionFuture.get(), emptyIterable());
     }
 
     @SuppressWarnings("unchecked")
@@ -397,7 +395,7 @@ public class ApiKeyServiceTests extends ESTestCase {
             return null;
         }).when(client).execute(eq(TransportSearchAction.TYPE), any(SearchRequest.class), anyActionListener());
         {
-            PlainActionFuture<GetApiKeyResponse> getApiKeyResponsePlainActionFuture = new PlainActionFuture<>();
+            PlainActionFuture<Collection<ApiKey>> getApiKeyResponsePlainActionFuture = new PlainActionFuture<>();
             service.getApiKeys(
                 generateRandomStringArray(4, 4, true, true),
                 randomFrom(randomAlphaOfLengthBetween(3, 8), null),
@@ -407,32 +405,30 @@ public class ApiKeyServiceTests extends ESTestCase {
                 randomBoolean(),
                 getApiKeyResponsePlainActionFuture
             );
-            GetApiKeyResponse getApiKeyResponse = getApiKeyResponsePlainActionFuture.get();
-            assertThat(getApiKeyResponse.getApiKeyInfoList(), iterableWithSize(2));
-            assertThat(getApiKeyResponse.getApiKeyInfoList().get(0).apiKeyInfo().getRealm(), is(realm1));
-            assertThat(getApiKeyResponse.getApiKeyInfoList().get(0).apiKeyInfo().getRealmType(), is(realm1Type));
-            assertThat(
-                getApiKeyResponse.getApiKeyInfoList().get(0).apiKeyInfo().getRealmIdentifier(),
-                is(new RealmConfig.RealmIdentifier(realm1Type, realm1))
-            );
-            assertThat(getApiKeyResponse.getApiKeyInfoList().get(1).apiKeyInfo().getRealm(), is(realm2));
-            assertThat(getApiKeyResponse.getApiKeyInfoList().get(1).apiKeyInfo().getRealmType(), nullValue());
-            assertThat(getApiKeyResponse.getApiKeyInfoList().get(1).apiKeyInfo().getRealmIdentifier(), nullValue());
+            Collection<ApiKey> getApiKeyResponse = getApiKeyResponsePlainActionFuture.get();
+            assertThat(getApiKeyResponse.size(), is(2));
+            assertThat(getApiKeyResponse, hasItem(transformedMatch(ApiKey::getRealm, is(realm1))));
+            assertThat(getApiKeyResponse, hasItem(transformedMatch(ApiKey::getRealmType, is(realm1Type))));
+            assertThat(getApiKeyResponse, hasItem(transformedMatch(ApiKey::getRealmType, is(realm1Type))));
+            assertThat(getApiKeyResponse, hasItem(transformedMatch(ApiKey::getRealm, is(realm2))));
+            assertThat(getApiKeyResponse, hasItem(transformedMatch(ApiKey::getRealmType, nullValue())));
+            assertThat(getApiKeyResponse, hasItem(transformedMatch(ApiKey::getRealmType, nullValue())));
         }
         {
-            PlainActionFuture<QueryApiKeyResponse> queryApiKeyResponsePlainActionFuture = new PlainActionFuture<>();
-            service.queryApiKeys(new SearchRequest(".security"), false, queryApiKeyResponsePlainActionFuture);
-            QueryApiKeyResponse queryApiKeyResponse = queryApiKeyResponsePlainActionFuture.get();
-            assertThat(queryApiKeyResponse.getApiKeyInfoList(), iterableWithSize(2));
-            assertThat(queryApiKeyResponse.getApiKeyInfoList().get(0).apiKeyInfo().getRealm(), is(realm1));
-            assertThat(queryApiKeyResponse.getApiKeyInfoList().get(0).apiKeyInfo().getRealmType(), is(realm1Type));
+            PlainActionFuture<ApiKeyService.QueryApiKeysResult> queryApiKeysResultPlainActionFuture = new PlainActionFuture<>();
+            service.queryApiKeys(new SearchRequest(".security"), false, queryApiKeysResultPlainActionFuture);
+            ApiKeyService.QueryApiKeysResult queryApiKeysResult = queryApiKeysResultPlainActionFuture.get();
+            assertThat(queryApiKeysResult.apiKeyInfos().size(), is(2));
+            assertThat(queryApiKeysResult.sortValues().size(), is(2));
+            assertThat(queryApiKeysResult.apiKeyInfos(), hasItem(transformedMatch(ApiKey::getRealm, is(realm1))));
+            assertThat(queryApiKeysResult.apiKeyInfos(), hasItem(transformedMatch(ApiKey::getRealmType, is(realm1Type))));
             assertThat(
-                queryApiKeyResponse.getApiKeyInfoList().get(0).apiKeyInfo().getRealmIdentifier(),
-                is(new RealmConfig.RealmIdentifier(realm1Type, realm1))
+                queryApiKeysResult.apiKeyInfos(),
+                hasItem(transformedMatch(ApiKey::getRealmIdentifier, is(new RealmConfig.RealmIdentifier(realm1Type, realm1))))
             );
-            assertThat(queryApiKeyResponse.getApiKeyInfoList().get(1).apiKeyInfo().getRealm(), is(realm2));
-            assertThat(queryApiKeyResponse.getApiKeyInfoList().get(1).apiKeyInfo().getRealmType(), nullValue());
-            assertThat(queryApiKeyResponse.getApiKeyInfoList().get(1).apiKeyInfo().getRealmIdentifier(), nullValue());
+            assertThat(queryApiKeysResult.apiKeyInfos(), hasItem(transformedMatch(ApiKey::getRealm, is(realm2))));
+            assertThat(queryApiKeysResult.apiKeyInfos(), hasItem(transformedMatch(ApiKey::getRealmType, nullValue())));
+            assertThat(queryApiKeysResult.apiKeyInfos(), hasItem(transformedMatch(ApiKey::getRealmIdentifier, nullValue())));
         }
     }
 

+ 397 - 9
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/profile/ProfileServiceTests.java

@@ -27,6 +27,7 @@ import org.elasticsearch.action.search.MultiSearchResponse;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.action.search.TransportMultiSearchAction;
 import org.elasticsearch.action.search.TransportSearchAction;
 import org.elasticsearch.action.support.PlainActionFuture;
@@ -54,6 +55,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.MultiMatchQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -69,6 +71,7 @@ import org.elasticsearch.transport.RemoteTransportException;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentFactory;
 import org.elasticsearch.xpack.core.common.ResultsAndErrors;
+import org.elasticsearch.xpack.core.security.action.apikey.ApiKey;
 import org.elasticsearch.xpack.core.security.action.profile.Profile;
 import org.elasticsearch.xpack.core.security.action.profile.SuggestProfilesRequest;
 import org.elasticsearch.xpack.core.security.action.profile.SuggestProfilesRequestTests;
@@ -80,7 +83,9 @@ import org.elasticsearch.xpack.core.security.authc.DomainConfig;
 import org.elasticsearch.xpack.core.security.authc.RealmConfig;
 import org.elasticsearch.xpack.core.security.authc.RealmDomain;
 import org.elasticsearch.xpack.core.security.authc.Subject;
+import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
 import org.elasticsearch.xpack.core.security.user.User;
+import org.elasticsearch.xpack.security.authc.Realms;
 import org.elasticsearch.xpack.security.profile.ProfileDocument.ProfileDocumentUser;
 import org.elasticsearch.xpack.security.support.SecurityIndexManager;
 import org.elasticsearch.xpack.security.support.SecuritySystemIndices;
@@ -97,6 +102,7 @@ import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -117,13 +123,18 @@ import static org.elasticsearch.xpack.security.support.SecuritySystemIndices.SEC
 import static org.hamcrest.Matchers.anEmptyMap;
 import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.arrayWithSize;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.either;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.iterableWithSize;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -175,6 +186,7 @@ public class ProfileServiceTests extends ESTestCase {
     private Client client;
     private SecurityIndexManager profileIndex;
     private ProfileService profileService;
+    Function<RealmConfig.RealmIdentifier, Authentication.RealmRef> realmRefLookup;
     private boolean useProfileOrigin;
 
     @Before
@@ -206,6 +218,10 @@ public class ProfileServiceTests extends ESTestCase {
         when(featureService.clusterHasFeature(any(), eq(SecuritySystemIndices.SECURITY_PROFILE_ORIGIN_FEATURE))).thenReturn(
             useProfileOrigin
         );
+        realmRefLookup = realmIdentifier -> null;
+        Realms realms = mock(Realms.class);
+        when(realms.getDomainConfig(anyString())).then(args -> new DomainConfig(args.getArgument(0), Set.of(), false, null));
+        when(realms.getRealmRef(any(RealmConfig.RealmIdentifier.class))).then(args -> realmRefLookup.apply(args.getArgument(0)));
         this.profileService = new ProfileService(
             Settings.EMPTY,
             Clock.systemUTC(),
@@ -213,7 +229,7 @@ public class ProfileServiceTests extends ESTestCase {
             profileIndex,
             clusterService,
             featureService,
-            name -> new DomainConfig(name, Set.of(), false, null)
+            realms
         );
     }
 
@@ -486,6 +502,8 @@ public class ProfileServiceTests extends ESTestCase {
 
     public void testLiteralUsernameWillThrowOnDuplicate() throws IOException {
         final Subject subject = new Subject(AuthenticationTestHelper.randomUser(), AuthenticationTestHelper.randomRealmRef(true));
+        Realms realms = mock(Realms.class);
+        when(realms.getDomainConfig(anyString())).then(args -> new DomainConfig(args.getArgument(0), Set.of(), true, "suffix"));
         final ProfileService service = new ProfileService(
             Settings.EMPTY,
             Clock.systemUTC(),
@@ -493,7 +511,7 @@ public class ProfileServiceTests extends ESTestCase {
             profileIndex,
             mock(ClusterService.class),
             mock(FeatureService.class),
-            domainName -> new DomainConfig(domainName, Set.of(), true, "suffix")
+            realms
         );
         final PlainActionFuture<Profile> future = new PlainActionFuture<>();
         service.maybeIncrementDifferentiatorAndCreateNewProfile(
@@ -648,6 +666,15 @@ public class ProfileServiceTests extends ESTestCase {
     }
 
     public void testActivateProfileWithDifferentUidFormats() throws IOException {
+        Realms realms = mock(Realms.class);
+        when(realms.getDomainConfig(anyString())).then(args -> {
+            String domainName = args.getArgument(0);
+            if (domainName.startsWith("hash")) {
+                return new DomainConfig(domainName, Set.of(), false, null);
+            } else {
+                return new DomainConfig(domainName, Set.of(), true, "suffix");
+            }
+        });
         final ProfileService service = spy(
             new ProfileService(
                 Settings.EMPTY,
@@ -656,13 +683,7 @@ public class ProfileServiceTests extends ESTestCase {
                 profileIndex,
                 mock(ClusterService.class),
                 mock(FeatureService.class),
-                domainName -> {
-                    if (domainName.startsWith("hash")) {
-                        return new DomainConfig(domainName, Set.of(), false, null);
-                    } else {
-                        return new DomainConfig(domainName, Set.of(), true, "suffix");
-                    }
-                }
+                realms
             )
         );
 
@@ -1061,6 +1082,318 @@ public class ProfileServiceTests extends ESTestCase {
         assertThat(future.actionGet(), equalTo(Map.of("total", 0L, "enabled", 0L, "recent", 0L)));
     }
 
+    @SuppressWarnings("unchecked")
+    public void testProfileSearchForApiKeyOwnerWithoutDomain() throws Exception {
+        String realmName = "realmName_" + randomAlphaOfLength(8);
+        String realmType = "realmType_" + randomAlphaOfLength(8);
+        String username = "username_" + randomAlphaOfLength(8);
+        List<ApiKey> apiKeys = List.of(createApiKeyForOwner("keyId_" + randomAlphaOfLength(8), username, realmName, realmType));
+        realmRefLookup = realmIdentifier -> {
+            assertThat(realmIdentifier.getName(), is(realmName));
+            assertThat(realmIdentifier.getType(), is(realmType));
+            return new Authentication.RealmRef(realmName, realmType, "nodeName_" + randomAlphaOfLength(8));
+        };
+        MultiSearchResponse.Item[] responseItems = new MultiSearchResponse.Item[1];
+        responseItems[0] = new MultiSearchResponse.Item(new TestEmptySearchResponse(), null);
+        MultiSearchResponse emptyMultiSearchResponse = new MultiSearchResponse(responseItems, randomNonNegativeLong());
+        try {
+            doAnswer(invocation -> {
+                assertThat(
+                    threadPool.getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME),
+                    equalTo(useProfileOrigin ? SECURITY_PROFILE_ORIGIN : SECURITY_ORIGIN)
+                );
+                MultiSearchRequest multiSearchRequest = (MultiSearchRequest) invocation.getArguments()[1];
+                assertThat(multiSearchRequest.requests(), iterableWithSize(1));
+                assertThat(multiSearchRequest.requests().get(0).source().query(), instanceOf(BoolQueryBuilder.class));
+                assertThat(((BoolQueryBuilder) multiSearchRequest.requests().get(0).source().query()).filter(), iterableWithSize(3));
+                assertThat(
+                    ((BoolQueryBuilder) multiSearchRequest.requests().get(0).source().query()).filter(),
+                    containsInAnyOrder(
+                        new TermQueryBuilder("user_profile.user.username.keyword", username),
+                        new TermQueryBuilder("user_profile.user.realm.type", realmType),
+                        new TermQueryBuilder("user_profile.user.realm.name", realmName)
+                    )
+                );
+                var listener = (ActionListener<MultiSearchResponse>) invocation.getArgument(2);
+                listener.onResponse(emptyMultiSearchResponse);
+                return null;
+            }).when(client).execute(eq(TransportMultiSearchAction.TYPE), any(MultiSearchRequest.class), anyActionListener());
+            when(client.prepareMultiSearch()).thenReturn(new MultiSearchRequestBuilder(client));
+
+            PlainActionFuture<Collection<String>> listener = new PlainActionFuture<>();
+            profileService.resolveProfileUidsForApiKeys(apiKeys, listener);
+            Collection<String> profileUids = listener.get();
+            assertThat(profileUids, iterableWithSize(1));
+            assertThat(profileUids.iterator().next(), nullValue());
+        } finally {
+            emptyMultiSearchResponse.decRef();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testProfileSearchForApiKeyOwnerWithDomain() throws Exception {
+        String realmName = "realmName_" + randomAlphaOfLength(8);
+        String realmType = "realmType_" + randomAlphaOfLength(8);
+        String username = "username_" + randomAlphaOfLength(8);
+        int domainSize = randomIntBetween(1, 3);
+        Set<RealmConfig.RealmIdentifier> domain = new HashSet<>(domainSize + 1);
+        domain.add(new RealmConfig.RealmIdentifier(realmType, realmName));
+        for (int i = 0; i < domainSize; i++) {
+            domain.add(new RealmConfig.RealmIdentifier("realmTypeFromDomain_" + i, "realmNameFromDomain_" + i));
+        }
+        RealmDomain realmDomain = new RealmDomain("domainName_ " + randomAlphaOfLength(8), domain);
+        List<ApiKey> apiKeys = List.of(createApiKeyForOwner("keyId_" + randomAlphaOfLength(8), username, realmName, realmType));
+        realmRefLookup = realmIdentifier -> {
+            assertThat(realmIdentifier.getName(), is(realmName));
+            assertThat(realmIdentifier.getType(), is(realmType));
+            return new Authentication.RealmRef(realmName, realmType, "nodeName_" + randomAlphaOfLength(8), realmDomain);
+        };
+        MultiSearchResponse.Item[] responseItems = new MultiSearchResponse.Item[1];
+        responseItems[0] = new MultiSearchResponse.Item(new TestEmptySearchResponse(), null);
+        MultiSearchResponse emptyMultiSearchResponse = new MultiSearchResponse(responseItems, randomNonNegativeLong());
+        try {
+            doAnswer(invocation -> {
+                assertThat(
+                    threadPool.getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME),
+                    equalTo(useProfileOrigin ? SECURITY_PROFILE_ORIGIN : SECURITY_ORIGIN)
+                );
+                MultiSearchRequest multiSearchRequest = (MultiSearchRequest) invocation.getArguments()[1];
+                assertThat(multiSearchRequest.requests(), iterableWithSize(1));
+                assertThat(multiSearchRequest.requests().get(0).source().query(), instanceOf(BoolQueryBuilder.class));
+                assertThat(((BoolQueryBuilder) multiSearchRequest.requests().get(0).source().query()).filter(), iterableWithSize(1));
+                assertThat(
+                    ((BoolQueryBuilder) multiSearchRequest.requests().get(0).source().query()).filter(),
+                    contains(new TermQueryBuilder("user_profile.user.username.keyword", username))
+                );
+                assertThat(
+                    ((BoolQueryBuilder) multiSearchRequest.requests().get(0).source().query()).should(),
+                    iterableWithSize(domain.size())
+                );
+                assertThat(((BoolQueryBuilder) multiSearchRequest.requests().get(0).source().query()).minimumShouldMatch(), is("1"));
+                for (RealmConfig.RealmIdentifier domainRealmIdentifier : domain) {
+                    BoolQueryBuilder realmDomainBoolQueryBuilder = new BoolQueryBuilder();
+                    realmDomainBoolQueryBuilder.filter()
+                        .add(new TermQueryBuilder("user_profile.user.realm.type", domainRealmIdentifier.getType()));
+                    realmDomainBoolQueryBuilder.filter()
+                        .add(new TermQueryBuilder("user_profile.user.realm.name", domainRealmIdentifier.getName()));
+                    assertThat(
+                        ((BoolQueryBuilder) multiSearchRequest.requests().get(0).source().query()).should(),
+                        hasItem(realmDomainBoolQueryBuilder)
+                    );
+                }
+                var listener = (ActionListener<MultiSearchResponse>) invocation.getArgument(2);
+                listener.onResponse(emptyMultiSearchResponse);
+                return null;
+            }).when(client).execute(eq(TransportMultiSearchAction.TYPE), any(MultiSearchRequest.class), anyActionListener());
+            when(client.prepareMultiSearch()).thenReturn(new MultiSearchRequestBuilder(client));
+
+            PlainActionFuture<Collection<String>> listener = new PlainActionFuture<>();
+            profileService.resolveProfileUidsForApiKeys(apiKeys, listener);
+            Collection<String> profileUids = listener.get();
+            assertThat(profileUids, iterableWithSize(1));
+            assertThat(profileUids.iterator().next(), nullValue());
+        } finally {
+            emptyMultiSearchResponse.decRef();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testProfileSearchForOwnerOfMultipleApiKeys() throws Exception {
+        String realmName = "realmName_" + randomAlphaOfLength(8);
+        String realmType = "realmType_" + randomAlphaOfLength(8);
+        String username = "username_" + randomAlphaOfLength(8);
+        int apiKeyCount = randomIntBetween(2, 6);
+        List<ApiKey> apiKeys = new ArrayList<>(apiKeyCount);
+        for (int i = 0; i < apiKeyCount; i++) {
+            // all keys have the same owner
+            apiKeys.add(createApiKeyForOwner("keyId_" + i, username, realmName, realmType));
+        }
+        realmRefLookup = realmIdentifier -> {
+            assertThat(realmIdentifier.getName(), is(realmName));
+            assertThat(realmIdentifier.getType(), is(realmType));
+            return new Authentication.RealmRef(realmName, realmType, "nodeName");
+        };
+        MultiSearchResponse.Item[] responseItems = new MultiSearchResponse.Item[1];
+        responseItems[0] = new MultiSearchResponse.Item(new TestEmptySearchResponse(), null);
+        MultiSearchResponse emptyMultiSearchResponse = new MultiSearchResponse(responseItems, randomNonNegativeLong());
+        try {
+            doAnswer(invocation -> {
+                assertThat(
+                    threadPool.getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME),
+                    equalTo(useProfileOrigin ? SECURITY_PROFILE_ORIGIN : SECURITY_ORIGIN)
+                );
+                MultiSearchRequest multiSearchRequest = (MultiSearchRequest) invocation.getArguments()[1];
+                // a single search request for a single owner of multiple keys
+                assertThat(multiSearchRequest.requests(), iterableWithSize(1));
+                assertThat(multiSearchRequest.requests().get(0).source().query(), instanceOf(BoolQueryBuilder.class));
+                assertThat(((BoolQueryBuilder) multiSearchRequest.requests().get(0).source().query()).filter(), iterableWithSize(3));
+                assertThat(
+                    ((BoolQueryBuilder) multiSearchRequest.requests().get(0).source().query()).filter(),
+                    containsInAnyOrder(
+                        new TermQueryBuilder("user_profile.user.username.keyword", username),
+                        new TermQueryBuilder("user_profile.user.realm.type", realmType),
+                        new TermQueryBuilder("user_profile.user.realm.name", realmName)
+                    )
+                );
+                var listener = (ActionListener<MultiSearchResponse>) invocation.getArgument(2);
+                listener.onResponse(emptyMultiSearchResponse);
+                return null;
+            }).when(client).execute(eq(TransportMultiSearchAction.TYPE), any(MultiSearchRequest.class), anyActionListener());
+            when(client.prepareMultiSearch()).thenReturn(new MultiSearchRequestBuilder(client));
+
+            PlainActionFuture<Collection<String>> listener = new PlainActionFuture<>();
+            profileService.resolveProfileUidsForApiKeys(apiKeys, listener);
+            Collection<String> profileUids = listener.get();
+            assertThat(profileUids, iterableWithSize(apiKeyCount));
+            var profileUidsIterator = profileUids.iterator();
+            while (profileUidsIterator.hasNext()) {
+                assertThat(profileUidsIterator.next(), nullValue());
+            }
+        } finally {
+            emptyMultiSearchResponse.decRef();
+        }
+    }
+
+    public void testProfileSearchErrorForApiKeyOwner() {
+        // 2 keys with different owners
+        List<ApiKey> apiKeys = List.of(
+            createApiKeyForOwner("keyId_0", "username_0", "realmName_0", "realmType_0"),
+            createApiKeyForOwner("keyId_1", "username_1", "realmName_1", "realmType_1")
+        );
+        realmRefLookup = realmIdentifier -> {
+            assertThat(realmIdentifier.getName(), either(is("realmName_0")).or(is("realmName_1")));
+            assertThat(realmIdentifier.getType(), either(is("realmType_0")).or(is("realmType_1")));
+            return new Authentication.RealmRef(realmIdentifier.getName(), realmIdentifier.getType(), "nodeName");
+        };
+        MultiSearchResponse.Item[] responseItems = new MultiSearchResponse.Item[2];
+        // one search request (for one of the key owner) fails
+        if (randomBoolean()) {
+            responseItems[0] = new MultiSearchResponse.Item(new TestEmptySearchResponse(), null);
+            responseItems[1] = new MultiSearchResponse.Item(null, new Exception("test search failure"));
+        } else {
+            responseItems[0] = new MultiSearchResponse.Item(null, new Exception("test search failure"));
+            responseItems[1] = new MultiSearchResponse.Item(new TestEmptySearchResponse(), null);
+        }
+        MultiSearchResponse multiSearchResponseWithError = new MultiSearchResponse(responseItems, randomNonNegativeLong());
+        try {
+            doAnswer(invocation -> {
+                assertThat(
+                    threadPool.getThreadContext().getTransient(ACTION_ORIGIN_TRANSIENT_NAME),
+                    equalTo(useProfileOrigin ? SECURITY_PROFILE_ORIGIN : SECURITY_ORIGIN)
+                );
+                // a single search request for a single owner of multiple keys
+                MultiSearchRequest multiSearchRequest = (MultiSearchRequest) invocation.getArguments()[1];
+                // 2 search requests for the 2 Api key owners
+                assertThat(multiSearchRequest.requests(), iterableWithSize(2));
+                for (int i = 0; i < 2; i++) {
+                    assertThat(multiSearchRequest.requests().get(i).source().query(), instanceOf(BoolQueryBuilder.class));
+                    assertThat(((BoolQueryBuilder) multiSearchRequest.requests().get(i).source().query()).filter(), iterableWithSize(3));
+                    List<QueryBuilder> filters = ((BoolQueryBuilder) multiSearchRequest.requests().get(i).source().query()).filter();
+                    assertThat(
+                        filters,
+                        either(
+                            Matchers.<QueryBuilder>containsInAnyOrder(
+                                new TermQueryBuilder("user_profile.user.username.keyword", "username_1"),
+                                new TermQueryBuilder("user_profile.user.realm.type", "realmType_1"),
+                                new TermQueryBuilder("user_profile.user.realm.name", "realmName_1")
+                            )
+                        ).or(
+                            Matchers.<QueryBuilder>containsInAnyOrder(
+                                new TermQueryBuilder("user_profile.user.username.keyword", "username_0"),
+                                new TermQueryBuilder("user_profile.user.realm.type", "realmType_0"),
+                                new TermQueryBuilder("user_profile.user.realm.name", "realmName_0")
+                            )
+                        )
+                    );
+                }
+                @SuppressWarnings("unchecked")
+                var listener = (ActionListener<MultiSearchResponse>) invocation.getArgument(2);
+                listener.onResponse(multiSearchResponseWithError);
+                return null;
+            }).when(client).execute(eq(TransportMultiSearchAction.TYPE), any(MultiSearchRequest.class), anyActionListener());
+            when(client.prepareMultiSearch()).thenReturn(new MultiSearchRequestBuilder(client));
+
+            PlainActionFuture<Collection<String>> listener = new PlainActionFuture<>();
+            profileService.resolveProfileUidsForApiKeys(apiKeys, listener);
+            ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
+            assertThat(
+                e.getMessage(),
+                containsString("failed to retrieve profile for users. please retry without fetching profile uid (with_profile_uid=false)")
+            );
+        } finally {
+            multiSearchResponseWithError.decRef();
+        }
+    }
+
+    public void testUnclearApiKeyOwnersAreIgnoredWhenRetrievingProfiles() throws Exception {
+        String realmName = "realmName_" + randomAlphaOfLength(8);
+        String realmType = "realmType_" + randomAlphaOfLength(8);
+        String username = "username_" + randomAlphaOfLength(8);
+        List<ApiKey> apiKeys = List.of(
+            // null username
+            createApiKeyForOwner("keyId_" + randomAlphaOfLength(8), null, randomAlphaOfLength(4), randomAlphaOfLength(4)),
+            // null realm name
+            createApiKeyForOwner("keyId_" + randomAlphaOfLength(8), randomAlphaOfLength(4), null, randomAlphaOfLength(4)),
+            // null realm type
+            createApiKeyForOwner("keyId_" + randomAlphaOfLength(8), randomAlphaOfLength(4), randomAlphaOfLength(4), null),
+            // the realm does not exist
+            createApiKeyForOwner("keyId_" + randomAlphaOfLength(8), username, realmName, realmType)
+        );
+        realmRefLookup = realmIdentifier -> {
+            assertThat(realmIdentifier.getName(), is(realmName));
+            assertThat(realmIdentifier.getType(), is(realmType));
+            // realm not configured
+            return null;
+        };
+        doAnswer(invocation -> {
+            @SuppressWarnings("unchecked")
+            var listener = (ActionListener<MultiSearchResponse>) invocation.getArgument(2);
+            listener.onFailure(new Exception("test failed, code should not be reachable"));
+            return null;
+        }).when(client).execute(eq(TransportMultiSearchAction.TYPE), any(MultiSearchRequest.class), anyActionListener());
+        when(client.prepareMultiSearch()).thenReturn(new MultiSearchRequestBuilder(client));
+
+        PlainActionFuture<Collection<String>> listener = new PlainActionFuture<>();
+        profileService.resolveProfileUidsForApiKeys(apiKeys, listener);
+        Collection<String> profileUids = listener.get();
+        assertThat(profileUids, iterableWithSize(4));
+        assertThat(profileUids, contains(nullValue(), nullValue(), nullValue(), nullValue()));
+    }
+
+    public void testProfilesIndexMissingOrUnavailableWhenRetrievingProfilesOfApiKeyOwners() throws Exception {
+        // profiles index missing
+        when(this.profileIndex.indexExists()).thenReturn(false);
+        String realmName = "realmName_" + randomAlphaOfLength(8);
+        String realmType = "realmType_" + randomAlphaOfLength(8);
+        String username = "username_" + randomAlphaOfLength(8);
+        List<ApiKey> apiKeys = List.of(createApiKeyForOwner("keyId_" + randomAlphaOfLength(8), username, realmName, realmType));
+        realmRefLookup = realmIdentifier -> {
+            assertThat(realmIdentifier.getName(), is(realmName));
+            assertThat(realmIdentifier.getType(), is(realmType));
+            return new Authentication.RealmRef(realmName, realmType, "nodeName_" + randomAlphaOfLength(8));
+        };
+        doAnswer(invocation -> {
+            @SuppressWarnings("unchecked")
+            var listener = (ActionListener<MultiSearchResponse>) invocation.getArgument(2);
+            listener.onFailure(new Exception("test failed, code should not be reachable"));
+            return null;
+        }).when(client).execute(eq(TransportMultiSearchAction.TYPE), any(MultiSearchRequest.class), anyActionListener());
+        when(client.prepareMultiSearch()).thenReturn(new MultiSearchRequestBuilder(client));
+        PlainActionFuture<Collection<String>> listener = new PlainActionFuture<>();
+        profileService.resolveProfileUidsForApiKeys(apiKeys, listener);
+        Collection<String> profileUids = listener.get();
+        assertThat(profileUids, nullValue());
+        // profiles index unavailable
+        when(this.profileIndex.indexExists()).thenReturn(true);
+        when(this.profileIndex.isAvailable(any())).thenReturn(false);
+        when(this.profileIndex.getUnavailableReason(any())).thenReturn(new ElasticsearchException("test unavailable"));
+        listener = new PlainActionFuture<>();
+        profileService.resolveProfileUidsForApiKeys(apiKeys, listener);
+        PlainActionFuture<Collection<String>> finalListener = listener;
+        ExecutionException e = expectThrows(ExecutionException.class, () -> finalListener.get());
+        assertThat(e.getMessage(), containsString("test unavailable"));
+    }
+
     record SampleDocumentParameter(String uid, String username, List<String> roles, long lastSynchronized) {}
 
     private void mockMultiGetRequest(List<SampleDocumentParameter> sampleDocumentParameters) {
@@ -1123,4 +1456,59 @@ public class ProfileServiceTests extends ESTestCase {
             null
         );
     }
+
+    private static ApiKey createApiKeyForOwner(String apiKeyId, String username, String realmName, String realmType) {
+        return new ApiKey(
+            randomAlphaOfLength(4),
+            apiKeyId,
+            randomFrom(ApiKey.Type.values()),
+            Instant.now(),
+            Instant.now().plusSeconds(3600),
+            false,
+            null,
+            username,
+            realmName,
+            realmType,
+            null,
+            List.of(
+                new RoleDescriptor(
+                    randomAlphaOfLength(5),
+                    new String[] { "manage_index_template" },
+                    new RoleDescriptor.IndicesPrivileges[] {
+                        RoleDescriptor.IndicesPrivileges.builder().indices("*").privileges("rad").build() },
+                    null,
+                    null,
+                    null,
+                    Map.of("_key", "value"),
+                    null,
+                    null,
+                    null
+                )
+            ),
+            null
+        );
+    }
+
+    private static class TestEmptySearchResponse extends SearchResponse {
+
+        TestEmptySearchResponse() {
+            super(
+                SearchHits.EMPTY_WITH_TOTAL_HITS,
+                null,
+                null,
+                false,
+                null,
+                null,
+                1,
+                null,
+                0,
+                0,
+                0,
+                0L,
+                ShardSearchFailure.EMPTY_ARRAY,
+                Clusters.EMPTY,
+                null
+            );
+        }
+    }
 }

+ 152 - 89
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/rest/action/apikey/RestGetApiKeyActionTests.java

@@ -36,8 +36,8 @@ import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
 
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -96,35 +96,9 @@ public class RestGetApiKeyActionTests extends ESTestCase {
                 responseSetOnce.set(restResponse);
             }
         };
-        final ApiKey.Type type = randomFrom(ApiKey.Type.values());
-        final Instant creation = Instant.now();
-        final Instant expiration = randomFrom(Arrays.asList(null, Instant.now().plus(10, ChronoUnit.DAYS)));
-        final Map<String, Object> metadata = ApiKeyTests.randomMetadata();
-        final List<RoleDescriptor> roleDescriptors = type == ApiKey.Type.CROSS_CLUSTER
-            ? List.of(randomCrossClusterAccessRoleDescriptor())
-            : randomUniquelyNamedRoleDescriptors(0, 3);
-        final List<RoleDescriptor> limitedByRoleDescriptors = withLimitedBy && type != ApiKey.Type.CROSS_CLUSTER
-            ? randomUniquelyNamedRoleDescriptors(1, 3)
-            : null;
-        final GetApiKeyResponse getApiKeyResponseExpected = new GetApiKeyResponse(
-            Collections.singletonList(
-                new ApiKey(
-                    "api-key-name-1",
-                    "api-key-id-1",
-                    type,
-                    creation,
-                    expiration,
-                    false,
-                    null,
-                    "user-x",
-                    "realm-1",
-                    "realm-type-1",
-                    metadata,
-                    roleDescriptors,
-                    limitedByRoleDescriptors
-                )
-            )
-        );
+        final List<String> profileUids = randomSize1ProfileUidsList();
+        final ApiKey apiKey = randomApiKeyInfo(withLimitedBy);
+        final GetApiKeyResponse getApiKeyResponseExpected = new GetApiKeyResponse(List.of(apiKey), profileUids);
 
         final var client = new NodeClient(Settings.EMPTY, threadPool) {
             @SuppressWarnings("unchecked")
@@ -167,29 +141,67 @@ public class RestGetApiKeyActionTests extends ESTestCase {
         } else {
             assertThat(
                 actual.getApiKeyInfoList(),
-                contains(
-                    new GetApiKeyResponse.Item(
-                        new ApiKey(
-                            "api-key-name-1",
-                            "api-key-id-1",
-                            type,
-                            creation,
-                            expiration,
-                            false,
-                            null,
-                            "user-x",
-                            "realm-1",
-                            "realm-type-1",
-                            metadata,
-                            roleDescriptors,
-                            limitedByRoleDescriptors
-                        )
-                    )
-                )
+                contains(new GetApiKeyResponse.Item(apiKey, profileUids == null ? null : profileUids.get(0)))
             );
         }
     }
 
+    public void testGetApiKeyWithProfileUid() throws Exception {
+        final boolean isGetRequestWithProfileUid = randomBoolean();
+        final Map<String, String> param = new HashMap<>();
+        if (isGetRequestWithProfileUid) {
+            param.put("with_profile_uid", Boolean.TRUE.toString());
+        } else {
+            if (randomBoolean()) {
+                param.put("with_profile_uid", Boolean.FALSE.toString());
+            }
+        }
+        final FakeRestRequest restRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams(param).build();
+        final SetOnce<RestResponse> responseSetOnce = new SetOnce<>();
+        final RestChannel restChannel = new AbstractRestChannel(restRequest, randomBoolean()) {
+            @Override
+            public void sendResponse(RestResponse restResponse) {
+                responseSetOnce.set(restResponse);
+            }
+        };
+        final ApiKey apiKey1 = randomApiKeyInfo(randomBoolean());
+        final List<String> profileUids1 = randomSize1ProfileUidsList();
+        final var client = new NodeClient(Settings.EMPTY, threadPool) {
+            @SuppressWarnings("unchecked")
+            @Override
+            public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
+                ActionType<Response> action,
+                Request request,
+                ActionListener<Response> listener
+            ) {
+                GetApiKeyRequest getApiKeyRequest = (GetApiKeyRequest) request;
+                ActionRequestValidationException validationException = getApiKeyRequest.validate();
+                if (validationException != null) {
+                    listener.onFailure(validationException);
+                    return;
+                }
+
+                if (getApiKeyRequest.withProfileUid()) {
+                    listener.onResponse((Response) new GetApiKeyResponse(List.of(apiKey1), profileUids1));
+                } else {
+                    listener.onResponse((Response) new GetApiKeyResponse(List.of(apiKey1), null));
+                }
+            }
+        };
+        final RestGetApiKeyAction restGetApiKeyAction = new RestGetApiKeyAction(Settings.EMPTY, mockLicenseState);
+        restGetApiKeyAction.handleRequest(restRequest, restChannel, client);
+        final RestResponse restResponse = responseSetOnce.get();
+        assertNotNull(restResponse);
+        assertThat(restResponse.status(), is(RestStatus.OK));
+        final GetApiKeyResponse actual = GetApiKeyResponse.fromXContent(createParser(XContentType.JSON.xContent(), restResponse.content()));
+        boolean responseHasProfile = isGetRequestWithProfileUid && profileUids1 != null && profileUids1.get(0) != null;
+        if (responseHasProfile) {
+            assertThat(actual.getApiKeyInfoList(), contains(new GetApiKeyResponse.Item(apiKey1, profileUids1.get(0))));
+        } else {
+            assertThat(actual.getApiKeyInfoList(), contains(new GetApiKeyResponse.Item(apiKey1, null)));
+        }
+    }
+
     public void testGetApiKeyOwnedByCurrentAuthenticatedUser() throws Exception {
         final boolean isGetRequestForOwnedKeysOnly = randomBoolean();
         final Map<String, String> param = new HashMap<>();
@@ -218,45 +230,15 @@ public class RestGetApiKeyActionTests extends ESTestCase {
             }
         };
 
-        final ApiKey.Type type = randomFrom(ApiKey.Type.values());
-        final Instant creation = Instant.now();
-        final Instant expiration = randomFrom(Arrays.asList(null, Instant.now().plus(10, ChronoUnit.DAYS)));
-        final ApiKey apiKey1 = new ApiKey(
-            "api-key-name-1",
-            "api-key-id-1",
-            type,
-            creation,
-            expiration,
-            false,
-            null,
-            "user-x",
-            "realm-1",
-            "realm-type-1",
-            ApiKeyTests.randomMetadata(),
-            type == ApiKey.Type.CROSS_CLUSTER
-                ? List.of(randomCrossClusterAccessRoleDescriptor())
-                : randomUniquelyNamedRoleDescriptors(0, 3),
-            withLimitedBy && type != ApiKey.Type.CROSS_CLUSTER ? randomUniquelyNamedRoleDescriptors(1, 3) : null
-        );
-        final ApiKey apiKey2 = new ApiKey(
-            "api-key-name-2",
-            "api-key-id-2",
-            type,
-            creation,
-            expiration,
-            false,
-            null,
-            "user-y",
-            "realm-1",
-            "realm-type-1",
-            ApiKeyTests.randomMetadata(),
-            type == ApiKey.Type.CROSS_CLUSTER
-                ? List.of(randomCrossClusterAccessRoleDescriptor())
-                : randomUniquelyNamedRoleDescriptors(0, 3),
-            withLimitedBy && type != ApiKey.Type.CROSS_CLUSTER ? randomUniquelyNamedRoleDescriptors(1, 3) : null
+        final ApiKey apiKey1 = randomApiKeyInfo(withLimitedBy);
+        final List<String> profileUids1 = randomSize1ProfileUidsList();
+        final ApiKey apiKey2 = randomApiKeyInfo(withLimitedBy);
+        final List<String> profileUids2 = randomSize2ProfileUidsList();
+        final GetApiKeyResponse getApiKeyResponseExpectedWhenOwnerFlagIsTrue = new GetApiKeyResponse(List.of(apiKey1), profileUids1);
+        final GetApiKeyResponse getApiKeyResponseExpectedWhenOwnerFlagIsFalse = new GetApiKeyResponse(
+            List.of(apiKey1, apiKey2),
+            profileUids2
         );
-        final GetApiKeyResponse getApiKeyResponseExpectedWhenOwnerFlagIsTrue = new GetApiKeyResponse(Collections.singletonList(apiKey1));
-        final GetApiKeyResponse getApiKeyResponseExpectedWhenOwnerFlagIsFalse = new GetApiKeyResponse(List.of(apiKey1, apiKey2));
 
         final var client = new NodeClient(Settings.EMPTY, threadPool) {
             @SuppressWarnings("unchecked")
@@ -289,9 +271,90 @@ public class RestGetApiKeyActionTests extends ESTestCase {
         assertThat(restResponse.status(), is(RestStatus.OK));
         final GetApiKeyResponse actual = GetApiKeyResponse.fromXContent(createParser(XContentType.JSON.xContent(), restResponse.content()));
         if (isGetRequestForOwnedKeysOnly) {
-            assertThat(actual.getApiKeyInfoList().stream().map(GetApiKeyResponse.Item::apiKeyInfo).toList(), contains(apiKey1));
+            assertThat(
+                actual.getApiKeyInfoList(),
+                contains(new GetApiKeyResponse.Item(apiKey1, profileUids1 == null ? null : profileUids1.get(0)))
+            );
+        } else {
+            assertThat(
+                actual.getApiKeyInfoList(),
+                contains(
+                    new GetApiKeyResponse.Item(apiKey1, profileUids2 == null ? null : profileUids2.get(0)),
+                    new GetApiKeyResponse.Item(apiKey2, profileUids2 == null ? null : profileUids2.get(1))
+                )
+            );
+        }
+    }
+
+    private static List<String> randomSize1ProfileUidsList() {
+        final List<String> profileUids;
+        if (randomBoolean()) {
+            if (randomBoolean()) {
+                profileUids = null;
+            } else {
+                profileUids = new ArrayList<>(1);
+                profileUids.add(null);
+            }
+        } else {
+            profileUids = new ArrayList<>(1);
+            profileUids.add(randomAlphaOfLength(8));
+        }
+        return profileUids;
+    }
+
+    private static List<String> randomSize2ProfileUidsList() {
+        final List<String> profileUids2;
+        if (randomBoolean()) {
+            if (randomBoolean()) {
+                profileUids2 = null;
+            } else {
+                profileUids2 = new ArrayList<>(2);
+                profileUids2.add(null);
+                profileUids2.add(null);
+            }
         } else {
-            assertThat(actual.getApiKeyInfoList().stream().map(GetApiKeyResponse.Item::apiKeyInfo).toList(), contains(apiKey1, apiKey2));
+            profileUids2 = new ArrayList<>(2);
+            if (randomBoolean()) {
+                if (randomBoolean()) {
+                    profileUids2.add(randomAlphaOfLength(8));
+                    profileUids2.add(null);
+                } else {
+                    profileUids2.add(null);
+                    profileUids2.add(randomAlphaOfLength(8));
+                }
+            } else {
+                profileUids2.add(randomAlphaOfLength(8));
+                profileUids2.add(randomAlphaOfLength(8));
+            }
         }
+        return profileUids2;
+    }
+
+    private ApiKey randomApiKeyInfo(boolean withLimitedBy) {
+        final ApiKey.Type type = randomFrom(ApiKey.Type.values());
+        final Instant creation = Instant.now();
+        final Instant expiration = randomFrom(Arrays.asList(null, Instant.now().plus(10, ChronoUnit.DAYS)));
+        final Map<String, Object> metadata = ApiKeyTests.randomMetadata();
+        final List<RoleDescriptor> roleDescriptors = type == ApiKey.Type.CROSS_CLUSTER
+            ? List.of(randomCrossClusterAccessRoleDescriptor())
+            : randomUniquelyNamedRoleDescriptors(0, 3);
+        final List<RoleDescriptor> limitedByRoleDescriptors = withLimitedBy && type != ApiKey.Type.CROSS_CLUSTER
+            ? randomUniquelyNamedRoleDescriptors(1, 3)
+            : null;
+        return new ApiKey(
+            "api-key-name-" + randomAlphaOfLength(4),
+            "api-key-id-" + randomAlphaOfLength(4),
+            type,
+            creation,
+            expiration,
+            false,
+            null,
+            "user-x",
+            "realm-1",
+            "realm-type-1",
+            metadata,
+            roleDescriptors,
+            limitedByRoleDescriptors
+        );
     }
 }

+ 105 - 0
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/rest/action/apikey/RestQueryApiKeyActionTests.java

@@ -25,6 +25,7 @@ import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.rest.AbstractRestChannel;
 import org.elasticsearch.rest.RestChannel;
 import org.elasticsearch.rest.RestResponse;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.searchafter.SearchAfterBuilder;
 import org.elasticsearch.search.sort.FieldSortBuilder;
@@ -34,15 +35,24 @@ import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.rest.FakeRestRequest;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
+import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.XContentType;
+import org.elasticsearch.xpack.core.security.action.apikey.ApiKey;
 import org.elasticsearch.xpack.core.security.action.apikey.QueryApiKeyRequest;
 import org.elasticsearch.xpack.core.security.action.apikey.QueryApiKeyResponse;
 
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.iterableWithSize;
+import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.Mockito.mock;
 
 public class RestQueryApiKeyActionTests extends ESTestCase {
@@ -199,4 +209,99 @@ public class RestQueryApiKeyActionTests extends ESTestCase {
 
         assertNotNull(responseSetOnce.get());
     }
+
+    @SuppressWarnings("unchecked")
+    public void testQueryApiKeyWithProfileUid() throws Exception {
+        final boolean isQueryRequestWithProfileUid = randomBoolean();
+        Map<String, String> param = new HashMap<>();
+        if (isQueryRequestWithProfileUid) {
+            param.put("with_profile_uid", Boolean.TRUE.toString());
+        } else {
+            if (randomBoolean()) {
+                param.put("with_profile_uid", Boolean.FALSE.toString());
+            }
+        }
+        FakeRestRequest restRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams(param).build();
+        SetOnce<RestResponse> responseSetOnce = new SetOnce<>();
+        RestChannel restChannel = new AbstractRestChannel(restRequest, randomBoolean()) {
+            @Override
+            public void sendResponse(RestResponse restResponse) {
+                responseSetOnce.set(restResponse);
+            }
+        };
+        ApiKey apiKey1 = new ApiKey(
+            randomAlphaOfLength(4),
+            randomAlphaOfLength(4),
+            randomFrom(ApiKey.Type.values()),
+            Instant.now(),
+            Instant.now(),
+            randomBoolean(),
+            null,
+            randomAlphaOfLength(4),
+            randomAlphaOfLength(4),
+            null,
+            null,
+            null,
+            null
+        );
+        final List<String> profileUids;
+        if (randomBoolean()) {
+            if (randomBoolean()) {
+                profileUids = null;
+            } else {
+                profileUids = new ArrayList<>(1);
+                profileUids.add(null);
+            }
+        } else {
+            profileUids = new ArrayList<>(1);
+            profileUids.add(randomAlphaOfLength(8));
+        }
+        var client = new NodeClient(Settings.EMPTY, threadPool) {
+            @Override
+            public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
+                ActionType<Response> action,
+                Request request,
+                ActionListener<Response> listener
+            ) {
+                QueryApiKeyRequest queryApiKeyRequest = (QueryApiKeyRequest) request;
+                if (queryApiKeyRequest.withProfileUid()) {
+                    listener.onResponse(
+                        (Response) new QueryApiKeyResponse(
+                            1,
+                            List.of(apiKey1),
+                            List.<Object[]>of(new Object[] { "last" }),
+                            profileUids,
+                            null
+                        )
+                    );
+                } else {
+                    listener.onResponse(
+                        (Response) new QueryApiKeyResponse(1, List.of(apiKey1), List.<Object[]>of(new Object[] { "last" }), null, null)
+                    );
+                }
+            }
+        };
+        RestQueryApiKeyAction restQueryApiKeyAction = new RestQueryApiKeyAction(Settings.EMPTY, mockLicenseState);
+        restQueryApiKeyAction.handleRequest(restRequest, restChannel, client);
+        RestResponse restResponse = responseSetOnce.get();
+        assertNotNull(restResponse);
+        assertThat(restResponse.status(), is(RestStatus.OK));
+        try (XContentParser parser = createParser(XContentType.JSON.xContent(), restResponse.content())) {
+            Map<String, Object> queryApiKeyResponseMap = parser.map();
+            assertThat((List<Map<String, Object>>) queryApiKeyResponseMap.get("api_keys"), iterableWithSize(1));
+            assertThat(((List<Map<String, Object>>) queryApiKeyResponseMap.get("api_keys")).get(0).get("id"), is(apiKey1.getId()));
+            assertThat(
+                (List<Object[]>) ((List<Map<String, Object>>) queryApiKeyResponseMap.get("api_keys")).get(0).get("_sort"),
+                contains("last")
+            );
+            if (isQueryRequestWithProfileUid && profileUids != null && profileUids.get(0) != null) {
+                assertThat(
+                    ((List<Map<String, Object>>) queryApiKeyResponseMap.get("api_keys")).get(0).get("profile_uid"),
+                    is(profileUids.get(0))
+                );
+            } else {
+                assertThat(((List<Map<String, Object>>) queryApiKeyResponseMap.get("api_keys")).get(0).get("profile_uid"), nullValue());
+            }
+        }
+    }
 }