Parcourir la source

Cache API key doc to reduce traffic to the security index (#59376)

Getting the API key document form the security index is the most time consuing part
of the API Key authentication flow (>60% if index is local and >90% if index is remote).
This traffic is now avoided by caching added with this PR.

Additionally, we add a cache invalidator registry so that clearing of different caches will
be managed in a single place (requires follow-up PRs).
Yang Wang il y a 5 ans
Parent
commit
9e1f912ccc
31 fichiers modifiés avec 1514 ajouts et 25 suppressions
  1. 33 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/SecurityClient.java
  2. 12 2
      client/rest-high-level/src/main/java/org/elasticsearch/client/SecurityRequestConverters.java
  3. 73 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/security/ClearApiKeyCacheRequest.java
  4. 50 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/security/ClearSecurityCacheResponse.java
  5. 50 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/SecurityDocumentationIT.java
  6. 34 0
      docs/java-rest/high-level/security/clear-api-key-cache.asciidoc
  7. 2 0
      docs/java-rest/high-level/supported-apis.asciidoc
  8. 2 0
      x-pack/docs/en/rest-api/security.asciidoc
  9. 43 0
      x-pack/docs/en/rest-api/security/clear-api-key-cache.asciidoc
  10. 2 0
      x-pack/docs/en/rest-api/security/clear-cache.asciidoc
  11. 19 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/ClearSecurityCacheAction.java
  12. 86 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/ClearSecurityCacheRequest.java
  13. 64 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/ClearSecurityCacheResponse.java
  14. 32 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/ClearSecurityCacheRequestTests.java
  15. 170 0
      x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/ApiKeyIntegTests.java
  16. 12 1
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java
  17. 80 0
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/TransportClearSecurityCacheAction.java
  18. 217 13
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java
  19. 46 0
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/action/apikey/RestClearApiKeyCacheAction.java
  20. 64 0
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/CacheInvalidatorRegistry.java
  21. 76 0
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/InvalidationCountingCacheWrapper.java
  22. 3 1
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/logfile/LoggingAuditTrailFilterTests.java
  23. 3 1
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/logfile/LoggingAuditTrailTests.java
  24. 130 3
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyServiceTests.java
  25. 3 1
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/AuthenticationServiceTests.java
  26. 3 1
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/SecondaryAuthenticatorTests.java
  27. 3 2
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java
  28. 91 0
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/support/CacheInvalidatorRegistryTests.java
  29. 70 0
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/support/InvalidationCountingCacheWrapperTests.java
  30. 26 0
      x-pack/plugin/src/test/resources/rest-api-spec/api/security.clear_api_key_cache.json
  31. 15 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/api_key/10_basic.yml

+ 33 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/SecurityClient.java

@@ -23,12 +23,14 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.security.AuthenticateRequest;
 import org.elasticsearch.client.security.AuthenticateResponse;
 import org.elasticsearch.client.security.ChangePasswordRequest;
+import org.elasticsearch.client.security.ClearApiKeyCacheRequest;
 import org.elasticsearch.client.security.ClearPrivilegesCacheRequest;
 import org.elasticsearch.client.security.ClearPrivilegesCacheResponse;
 import org.elasticsearch.client.security.ClearRealmCacheRequest;
 import org.elasticsearch.client.security.ClearRealmCacheResponse;
 import org.elasticsearch.client.security.ClearRolesCacheRequest;
 import org.elasticsearch.client.security.ClearRolesCacheResponse;
+import org.elasticsearch.client.security.ClearSecurityCacheResponse;
 import org.elasticsearch.client.security.CreateApiKeyRequest;
 import org.elasticsearch.client.security.CreateApiKeyResponse;
 import org.elasticsearch.client.security.CreateTokenRequest;
@@ -544,6 +546,37 @@ public final class SecurityClient {
             ClearPrivilegesCacheResponse::fromXContent, listener, emptySet());
     }
 
+    /**
+     * Clears the api key cache for a set of IDs.
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-clear-api-key-cache.html">
+     * the docs</a> for more.
+     *
+     * @param request the request with the security for which the cache should be cleared for the specified API key IDs.
+     * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @return the response from the clear security cache call
+     * @throws IOException in case there is a problem sending the request or parsing back the response
+     */public ClearSecurityCacheResponse clearApiKeyCache(ClearApiKeyCacheRequest request,
+                                                          RequestOptions options) throws IOException {
+        return restHighLevelClient.performRequestAndParseEntity(request, SecurityRequestConverters::clearApiKeyCache, options,
+            ClearSecurityCacheResponse::fromXContent, emptySet());
+    }
+
+    /**
+     * Clears the api key cache for a set of IDs asynchronously.
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-clear-api-key-cache.html">
+     * the docs</a> for more.
+     *
+     * @param request  the request with the security for which the cache should be cleared for the specified API key IDs.
+     * @param options  the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @param listener the listener to be notified upon request completion
+     * @return cancellable that may be used to cancel the request
+     */
+    public Cancellable clearApiKeyCacheAsync(ClearApiKeyCacheRequest request, RequestOptions options,
+                                             ActionListener<ClearSecurityCacheResponse> listener) {
+        return restHighLevelClient.performRequestAsyncAndParseEntity(request, SecurityRequestConverters::clearApiKeyCache, options,
+            ClearSecurityCacheResponse::fromXContent, listener, emptySet());
+    }
+
     /**
      * Synchronously retrieve the X.509 certificates that are used to encrypt communications in an Elasticsearch cluster.
      * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-ssl.html">

+ 12 - 2
client/rest-high-level/src/main/java/org/elasticsearch/client/SecurityRequestConverters.java

@@ -24,6 +24,7 @@ import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
 import org.elasticsearch.client.security.ChangePasswordRequest;
+import org.elasticsearch.client.security.ClearApiKeyCacheRequest;
 import org.elasticsearch.client.security.ClearPrivilegesCacheRequest;
 import org.elasticsearch.client.security.ClearRealmCacheRequest;
 import org.elasticsearch.client.security.ClearRolesCacheRequest;
@@ -184,10 +185,19 @@ final class SecurityRequestConverters {
         return new Request(HttpPost.METHOD_NAME, endpoint);
     }
 
-    static Request clearPrivilegesCache(ClearPrivilegesCacheRequest disableCacheRequest) {
+    static Request clearPrivilegesCache(ClearPrivilegesCacheRequest clearPrivilegesCacheRequest) {
         String endpoint = new RequestConverters.EndpointBuilder()
             .addPathPartAsIs("_security/privilege")
-            .addCommaSeparatedPathParts(disableCacheRequest.applications())
+            .addCommaSeparatedPathParts(clearPrivilegesCacheRequest.applications())
+            .addPathPart("_clear_cache")
+            .build();
+        return new Request(HttpPost.METHOD_NAME, endpoint);
+    }
+
+    static Request clearApiKeyCache(ClearApiKeyCacheRequest clearApiKeyCacheRequest) {
+        String endpoint = new RequestConverters.EndpointBuilder()
+            .addPathPartAsIs("_security/api_key")
+            .addCommaSeparatedPathParts(clearApiKeyCacheRequest.ids())
             .addPathPart("_clear_cache")
             .build();
         return new Request(HttpPost.METHOD_NAME, endpoint);

+ 73 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/security/ClearApiKeyCacheRequest.java

@@ -0,0 +1,73 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.security;
+
+import org.elasticsearch.client.Validatable;
+
+import java.util.Arrays;
+
+/**
+ * The request used to clear the API key cache.
+ */
+public final class ClearApiKeyCacheRequest implements Validatable {
+
+    private final String[] ids;
+
+    /**
+     * @param ids      An array of API Key ids to be cleared from the specified cache.
+     *                 If not specified, all entries will be cleared.
+     */
+    private ClearApiKeyCacheRequest(String... ids) {
+        this.ids = ids;
+    }
+    
+    public static ClearApiKeyCacheRequest clearAll() {
+        return new ClearApiKeyCacheRequest();
+    }
+    
+    public static ClearApiKeyCacheRequest clearById(String ... ids) { 
+        if (ids.length == 0) {
+            throw new IllegalArgumentException("Ids cannot be empty");
+        }  
+        return new ClearApiKeyCacheRequest(ids);
+     }
+
+    /**
+     * @return an array of key names that will be evicted
+     */
+    public String[] ids() {
+        return ids;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        ClearApiKeyCacheRequest that = (ClearApiKeyCacheRequest) o;
+        return Arrays.equals(ids, that.ids);
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.hashCode(ids);
+    }
+}

+ 50 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/security/ClearSecurityCacheResponse.java

@@ -0,0 +1,50 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.security;
+
+import org.elasticsearch.client.NodesResponseHeader;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The response object that will be returned when clearing a security cache
+ */
+public final class ClearSecurityCacheResponse extends SecurityNodesResponse {
+
+    @SuppressWarnings("unchecked")
+    private static final ConstructingObjectParser<ClearSecurityCacheResponse, Void> PARSER =
+        new ConstructingObjectParser<>("clear_security_cache_response", false,
+            args -> new ClearSecurityCacheResponse((List<Node>)args[0], (NodesResponseHeader) args[1], (String) args[2]));
+
+    static {
+        SecurityNodesResponse.declareCommonNodesResponseParsing(PARSER);
+    }
+
+    public ClearSecurityCacheResponse(List<Node> nodes, NodesResponseHeader header, String clusterName) {
+        super(nodes, header, clusterName);
+    }
+
+    public static ClearSecurityCacheResponse fromXContent(XContentParser parser) throws IOException {
+        return PARSER.parse(parser, null);
+    }
+}

+ 50 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/SecurityDocumentationIT.java

@@ -30,12 +30,14 @@ import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.security.AuthenticateResponse;
 import org.elasticsearch.client.security.AuthenticateResponse.RealmInfo;
 import org.elasticsearch.client.security.ChangePasswordRequest;
+import org.elasticsearch.client.security.ClearApiKeyCacheRequest;
 import org.elasticsearch.client.security.ClearPrivilegesCacheRequest;
 import org.elasticsearch.client.security.ClearPrivilegesCacheResponse;
 import org.elasticsearch.client.security.ClearRealmCacheRequest;
 import org.elasticsearch.client.security.ClearRealmCacheResponse;
 import org.elasticsearch.client.security.ClearRolesCacheRequest;
 import org.elasticsearch.client.security.ClearRolesCacheResponse;
+import org.elasticsearch.client.security.ClearSecurityCacheResponse;
 import org.elasticsearch.client.security.CreateApiKeyRequest;
 import org.elasticsearch.client.security.CreateApiKeyResponse;
 import org.elasticsearch.client.security.CreateTokenRequest;
@@ -1053,6 +1055,54 @@ public class SecurityDocumentationIT extends ESRestHighLevelClientTestCase {
         }
     }
 
+    public void testClearApiKeyCache() throws Exception {
+        RestHighLevelClient client = highLevelClient();
+        {
+            //tag::clear-api-key-cache-request
+            ClearApiKeyCacheRequest request = ClearApiKeyCacheRequest.clearById(
+                "yVGMr3QByxdh1MSaicYx"  // <1>
+            );
+            //end::clear-api-key-cache-request
+            //tag::clear-api-key-cache-execute
+            ClearSecurityCacheResponse response = client.security().clearApiKeyCache(request, RequestOptions.DEFAULT);
+            //end::clear-api-key-cache-execute
+
+            assertNotNull(response);
+            assertThat(response.getNodes(), not(empty()));
+
+            //tag::clear-api-key-cache-response
+            List<ClearSecurityCacheResponse.Node> nodes = response.getNodes(); // <1>
+            //end::clear-api-key-cache-response
+        }
+
+        {
+            //tag::clear-api-key-cache-execute-listener
+            ClearApiKeyCacheRequest request = ClearApiKeyCacheRequest.clearById("yVGMr3QByxdh1MSaicYx");
+            ActionListener<ClearSecurityCacheResponse> listener = new ActionListener<>() {
+                @Override
+                public void onResponse(ClearSecurityCacheResponse clearSecurityCacheResponse) {
+                    // <1>
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    // <2>
+                }
+            };
+            //end::clear-api-key-cache-execute-listener
+
+            // Replace the empty listener by a blocking listener in test
+            final CountDownLatch latch = new CountDownLatch(1);
+            listener = new LatchedActionListener<>(listener, latch);
+
+            // tag::clear-api-key-cache-execute-async
+            client.security().clearApiKeyCacheAsync(request, RequestOptions.DEFAULT, listener); // <1>
+            // end::clear-api-key-cache-execute-async
+
+            assertTrue(latch.await(30L, TimeUnit.SECONDS));
+        }
+    }
+
     public void testGetSslCertificates() throws Exception {
         RestHighLevelClient client = highLevelClient();
         {

+ 34 - 0
docs/java-rest/high-level/security/clear-api-key-cache.asciidoc

@@ -0,0 +1,34 @@
+
+--
+:api: clear-api-key-cache
+:request: ClearApiKeyCacheRequest
+:response: ClearSecurityCacheResponse
+--
+[role="xpack"]
+[id="{upid}-{api}"]
+=== Clear API Key Cache API
+
+[id="{upid}-{api}-request"]
+==== Clear API Key Cache Request
+
+A +{request}+ supports clearing API key cache for the given IDs.
+It can also clear the entire cache if no ID is specified.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests-file}[{api}-request]
+--------------------------------------------------
+<1> the IDs(s) for the API keys to be evicted from the cache
+
+include::../execution.asciidoc[]
+
+[id="{upid}-{api}-response"]
+==== Clear API Key Cache Response
+
+The returned +{response}+ allows to retrieve information about where the cache was cleared.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests-file}[{api}-response]
+--------------------------------------------------
+<1> the list of nodes that the cache was cleared on

+ 2 - 0
docs/java-rest/high-level/supported-apis.asciidoc

@@ -479,6 +479,7 @@ The Java High Level REST Client supports the following Security APIs:
 * <<{upid}-clear-roles-cache>>
 * <<{upid}-clear-privileges-cache>>
 * <<{upid}-clear-realm-cache>>
+* <<{upid}-clear-api-key-cache>>
 * <<{upid}-authenticate>>
 * <<{upid}-has-privileges>>
 * <<{upid}-get-user-privileges>>
@@ -511,6 +512,7 @@ include::security/get-privileges.asciidoc[]
 include::security/clear-roles-cache.asciidoc[]
 include::security/clear-privileges-cache.asciidoc[]
 include::security/clear-realm-cache.asciidoc[]
+include::security/clear-api-key-cache.asciidoc[]
 include::security/authenticate.asciidoc[]
 include::security/has-privileges.asciidoc[]
 include::security/get-user-privileges.asciidoc[]

+ 2 - 0
x-pack/docs/en/rest-api/security.asciidoc

@@ -64,6 +64,7 @@ without requiring basic authentication:
 * <<security-api-create-api-key,Create API Key>>
 * <<security-api-get-api-key,Get API Key>>
 * <<security-api-invalidate-api-key,Invalidate API Key>>
+* <<security-api-clear-api-key-cache,Clear API key cache>>
 
 [discrete]
 [[security-user-apis]]
@@ -108,6 +109,7 @@ include::security/change-password.asciidoc[]
 include::security/clear-cache.asciidoc[]
 include::security/clear-roles-cache.asciidoc[]
 include::security/clear-privileges-cache.asciidoc[]
+include::security/clear-api-key-cache.asciidoc[]
 include::security/create-api-keys.asciidoc[]
 include::security/put-app-privileges.asciidoc[]
 include::security/create-role-mappings.asciidoc[]

+ 43 - 0
x-pack/docs/en/rest-api/security/clear-api-key-cache.asciidoc

@@ -0,0 +1,43 @@
+[role="xpack"]
+[[security-api-clear-api-key-cache]]
+=== Clear API key cache API
+++++
+<titleabbrev>Clear API key cache</titleabbrev>
+++++
+
+Evicts a subset of all entries from the API key cache.
+The cache is also automatically cleared on state changes of the security index.
+
+[[security-api-clear-api-key-cache-request]]
+==== {api-request-title}
+
+`POST /_security/api_key/<ids>/_clear_cache`
+
+[[security-api-clear-api-key-cache-prereqs]]
+==== {api-prereq-title}
+
+* To use this API, you must have at least the `manage_security` cluster
+privilege.
+
+[[security-api-clear-api-key-cache-desc]]
+==== {api-description-title}
+
+For more information about API keys, see <<security-api-create-api-key>>,
+<<security-api-get-api-key>>, and <<security-api-invalidate-api-key>>.
+
+[[security-api-clear-api-key-cache-path-params]]
+==== {api-path-parms-title}
+
+`ids`::
+(string) comma separated list of API key IDs. If empty, all keys are evicted from the cache.
+
+[[security-api-clear-api-key-cache-example]]
+==== {api-examples-title}
+
+The clear API key cache API evicts entries from the API key cache.
+For example, to clear the entry of API key with ID `yVGMr3QByxdh1MSaicYx`.
+
+[source,console]
+--------------------------------------------------
+POST /_security/api_key/yVGMr3QByxdh1MSaicYx/_clear_cache
+--------------------------------------------------

+ 2 - 0
x-pack/docs/en/rest-api/security/clear-cache.asciidoc

@@ -29,6 +29,8 @@ To evict roles from the role cache, see the
 <<security-api-clear-role-cache,Clear roles cache API>>.
 To evict privileges from the privilege cache, see the
 <<security-api-clear-privilege-cache,Clear privileges cache API>>.
+To evict API keys from the API key cache, see the
+<<security-api-clear-api-key-cache,Clear API key cache API>>.
 
 [[security-api-clear-path-params]]
 ==== {api-path-parms-title}

+ 19 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/ClearSecurityCacheAction.java

@@ -0,0 +1,19 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.security.action;
+
+import org.elasticsearch.action.ActionType;
+
+public class ClearSecurityCacheAction extends ActionType<ClearSecurityCacheResponse> {
+
+    public static final ClearSecurityCacheAction INSTANCE = new ClearSecurityCacheAction();
+    public static final String NAME = "cluster:admin/xpack/security/cache/clear";
+
+    protected ClearSecurityCacheAction() {
+        super(NAME, ClearSecurityCacheResponse::new);
+    }
+}

+ 86 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/ClearSecurityCacheRequest.java

@@ -0,0 +1,86 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.security.action;
+
+import org.elasticsearch.action.support.nodes.BaseNodesRequest;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.transport.TransportRequest;
+
+import java.io.IOException;
+
+public class ClearSecurityCacheRequest extends BaseNodesRequest<ClearSecurityCacheRequest> {
+
+    private String cacheName;
+    private String[] keys;
+
+    public ClearSecurityCacheRequest() {
+        super((String[]) null);
+    }
+
+    public ClearSecurityCacheRequest(StreamInput in) throws IOException {
+        super(in);
+        cacheName = in.readString();
+        keys = in.readOptionalStringArray();
+    }
+
+    public ClearSecurityCacheRequest cacheName(String cacheName) {
+        this.cacheName = cacheName;
+        return this;
+    }
+
+    public String cacheName() {
+        return cacheName;
+    }
+
+    public ClearSecurityCacheRequest keys(String... keys) {
+        this.keys = keys;
+        return this;
+    }
+
+    public String[] keys() {
+        return keys;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeString(cacheName);
+        out.writeOptionalStringArray(keys);
+    }
+
+    public static class Node extends TransportRequest {
+        private String cacheName;
+        private String[] keys;
+
+        public Node(StreamInput in) throws IOException {
+            super(in);
+            cacheName = in.readString();
+            keys = in.readOptionalStringArray();
+        }
+
+        public Node(ClearSecurityCacheRequest request) {
+            this.cacheName = request.cacheName();
+            this.keys = request.keys();
+        }
+
+        public String getCacheName() {
+            return cacheName;
+        }
+
+        public String[] getKeys() {
+            return keys;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeString(cacheName);
+            out.writeOptionalStringArray(keys);
+        }
+    }
+}

+ 64 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/ClearSecurityCacheResponse.java

@@ -0,0 +1,64 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.security.action;
+
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.support.nodes.BaseNodeResponse;
+import org.elasticsearch.action.support.nodes.BaseNodesResponse;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ToXContentFragment;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.List;
+
+public class ClearSecurityCacheResponse extends BaseNodesResponse<ClearSecurityCacheResponse.Node>
+    implements ToXContentFragment {
+
+    public ClearSecurityCacheResponse(StreamInput in) throws IOException {
+        super(in);
+    }
+
+    public ClearSecurityCacheResponse(ClusterName clusterName, List<Node> nodes, List<FailedNodeException> failures) {
+        super(clusterName, nodes, failures);
+    }
+
+    @Override
+    protected List<Node> readNodesFrom(StreamInput in) throws IOException {
+        return in.readList(Node::new);
+    }
+
+    @Override
+    protected void writeNodesTo(StreamOutput out, List<Node> nodes) throws IOException {
+        out.writeList(nodes);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject("nodes");
+        for (Node node : getNodes()) {
+            builder.startObject(node.getNode().getId());
+            builder.field("name", node.getNode().getName());
+            builder.endObject();
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    public static class Node extends BaseNodeResponse {
+        public Node(StreamInput in) throws IOException {
+            super(in);
+        }
+
+        public Node(DiscoveryNode node) {
+            super(node);
+        }
+    }
+}

+ 32 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/action/ClearSecurityCacheRequestTests.java

@@ -0,0 +1,32 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.security.action;
+
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+public class ClearSecurityCacheRequestTests extends ESTestCase {
+
+    public void testSerialisation() throws IOException {
+        final String cacheName = randomAlphaOfLengthBetween(4, 8);
+        final String[] keys = randomArray(0, 8, String[]::new, () -> randomAlphaOfLength(12));
+        final ClearSecurityCacheRequest request = new ClearSecurityCacheRequest();
+        request.cacheName(cacheName).keys(keys);
+
+        try (BytesStreamOutput out = new BytesStreamOutput()) {
+            request.writeTo(out);
+            try (StreamInput in = out.bytes().streamInput()) {
+                final ClearSecurityCacheRequest serialized = new ClearSecurityCacheRequest(in);
+                assertEquals(request.cacheName(), serialized.cacheName());
+                assertArrayEquals(request.keys(), serialized.keys());
+            }
+        }
+    }
+}

+ 170 - 0
x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/ApiKeyIntegTests.java

@@ -7,9 +7,12 @@
 package org.elasticsearch.xpack.security.authc;
 
 import org.elasticsearch.ElasticsearchSecurityException;
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
+import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
+import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
 import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder;
 import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
@@ -37,6 +40,9 @@ import org.elasticsearch.test.SecuritySettingsSourceField;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.security.action.ApiKey;
+import org.elasticsearch.xpack.core.security.action.ClearSecurityCacheAction;
+import org.elasticsearch.xpack.core.security.action.ClearSecurityCacheRequest;
+import org.elasticsearch.xpack.core.security.action.ClearSecurityCacheResponse;
 import org.elasticsearch.xpack.core.security.action.CreateApiKeyRequestBuilder;
 import org.elasticsearch.xpack.core.security.action.CreateApiKeyResponse;
 import org.elasticsearch.xpack.core.security.action.GetApiKeyAction;
@@ -73,6 +79,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.elasticsearch.xpack.core.security.index.RestrictedIndicesNames.INTERNAL_SECURITY_MAIN_INDEX_7;
 import static org.elasticsearch.xpack.core.security.index.RestrictedIndicesNames.SECURITY_MAIN_ALIAS;
 import static org.elasticsearch.xpack.security.Security.SECURITY_CRYPTO_THREAD_POOL_NAME;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -287,6 +294,58 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         verifyInvalidateResponse(1, responses, invalidateResponse);
     }
 
+    public void testInvalidateApiKeyWillClearApiKeyCache() throws IOException, ExecutionException, InterruptedException {
+        final List<ApiKeyService> services = Arrays.stream(internalCluster().getNodeNames())
+            .map(n -> internalCluster().getInstance(ApiKeyService.class, n))
+            .collect(Collectors.toList());
+
+        // Create two API keys and authenticate with them
+        Tuple<String, String> apiKey1 = createApiKeyAndAuthenticateWithIt();
+        Tuple<String, String> apiKey2 = createApiKeyAndAuthenticateWithIt();
+
+        // Find out which nodes handled the above authentication requests
+        final ApiKeyService serviceForDoc1 =
+            services.stream().filter(s -> s.getDocCache().get(apiKey1.v1()) != null).findFirst().orElseThrow();
+        final ApiKeyService serviceForDoc2 =
+            services.stream().filter(s -> s.getDocCache().get(apiKey2.v1()) != null).findFirst().orElseThrow();
+        assertNotNull(serviceForDoc1.getFromCache(apiKey1.v1()));
+        assertNotNull(serviceForDoc2.getFromCache(apiKey2.v1()));
+        final boolean sameServiceNode = serviceForDoc1 == serviceForDoc2;
+        if (sameServiceNode) {
+            assertEquals(2, serviceForDoc1.getDocCache().count());
+        } else {
+            assertEquals(1, serviceForDoc1.getDocCache().count());
+            assertEquals(1, serviceForDoc2.getDocCache().count());
+        }
+
+        // Invalidate the first key
+        Client client = client().filterWithHeader(Collections.singletonMap("Authorization", UsernamePasswordToken
+            .basicAuthHeaderValue(SecuritySettingsSource.TEST_SUPERUSER, SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING)));
+        PlainActionFuture<InvalidateApiKeyResponse> listener = new PlainActionFuture<>();
+        client.execute(InvalidateApiKeyAction.INSTANCE, InvalidateApiKeyRequest.usingApiKeyId(apiKey1.v1(), false), listener);
+        InvalidateApiKeyResponse invalidateResponse = listener.get();
+        assertThat(invalidateResponse.getInvalidatedApiKeys().size(), equalTo(1));
+        // The cache entry should be gone for the first key
+        if (sameServiceNode) {
+            assertEquals(1, serviceForDoc1.getDocCache().count());
+            assertNull(serviceForDoc1.getDocCache().get(apiKey1.v1()));
+            assertNotNull(serviceForDoc1.getDocCache().get(apiKey2.v1()));
+        } else {
+            assertEquals(0, serviceForDoc1.getDocCache().count());
+            assertEquals(1, serviceForDoc2.getDocCache().count());
+        }
+
+        // Authentication with the first key should fail
+        final String base64ApiKeyKeyValue = Base64.getEncoder().encodeToString(
+            (apiKey1.v1() + ":" + apiKey1.v2()).getBytes(StandardCharsets.UTF_8));
+        ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
+            () -> new TestRestHighLevelClient().security()
+                .authenticate(RequestOptions.DEFAULT.toBuilder().addHeader("Authorization",
+                    "ApiKey " + base64ApiKeyKeyValue).build()));
+        assertThat(e.getMessage(), containsString("security_exception"));
+        assertThat(e.status(), is(RestStatus.UNAUTHORIZED));
+    }
+
     private void verifyInvalidateResponse(int noOfApiKeys, List<CreateApiKeyResponse> responses,
                                           InvalidateApiKeyResponse invalidateResponse) {
         assertThat(invalidateResponse.getInvalidatedApiKeys().size(), equalTo(noOfApiKeys));
@@ -947,6 +1006,117 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
         }
     }
 
+    public void testCacheInvalidationViaApiCalls() throws Exception {
+        final List<ApiKeyService> services = Arrays.stream(internalCluster().getNodeNames())
+            .map(n -> internalCluster().getInstance(ApiKeyService.class, n))
+            .collect(Collectors.toList());
+
+        // Create two API keys and authenticate with them
+        String docId1 = createApiKeyAndAuthenticateWithIt().v1();
+        String docId2 = createApiKeyAndAuthenticateWithIt().v1();
+
+        // Find out which nodes handled the above authentication requests
+        final ApiKeyService serviceForDoc1 =
+            services.stream().filter(s -> s.getDocCache().get(docId1) != null).findFirst().orElseThrow();
+        final ApiKeyService serviceForDoc2 =
+            services.stream().filter(s -> s.getDocCache().get(docId2) != null).findFirst().orElseThrow();
+        assertNotNull(serviceForDoc1.getFromCache(docId1));
+        assertNotNull(serviceForDoc2.getFromCache(docId2));
+        final boolean sameServiceNode = serviceForDoc1 == serviceForDoc2;
+        if (sameServiceNode) {
+            assertEquals(2, serviceForDoc1.getDocCache().count());
+            assertEquals(2, serviceForDoc1.getRoleDescriptorsBytesCache().count());
+        } else {
+            assertEquals(1, serviceForDoc1.getDocCache().count());
+            assertEquals(2, serviceForDoc1.getRoleDescriptorsBytesCache().count());
+            assertEquals(1, serviceForDoc2.getDocCache().count());
+            assertEquals(2, serviceForDoc2.getRoleDescriptorsBytesCache().count());
+        }
+
+        // Invalidate cache for only the first key
+        ClearSecurityCacheRequest clearSecurityCacheRequest = new ClearSecurityCacheRequest();
+        clearSecurityCacheRequest.cacheName("api_key");
+        clearSecurityCacheRequest.keys(docId1);
+        ClearSecurityCacheResponse clearSecurityCacheResponse =
+            client().execute(ClearSecurityCacheAction.INSTANCE, clearSecurityCacheRequest).get();
+        assertFalse(clearSecurityCacheResponse.hasFailures());
+
+        assertBusy(() -> {
+            expectThrows(NullPointerException.class, () -> serviceForDoc1.getFromCache(docId1));
+            if (sameServiceNode) {
+                assertEquals(1, serviceForDoc1.getDocCache().count());
+                assertNotNull(serviceForDoc1.getFromCache(docId2));
+            } else {
+                assertEquals(0, serviceForDoc1.getDocCache().count());
+                assertEquals(1, serviceForDoc2.getDocCache().count());
+                assertNotNull(serviceForDoc2.getFromCache(docId2));
+            }
+            // Role descriptors are not invalidated when invalidation is for specific API keys
+            assertEquals(2, serviceForDoc1.getRoleDescriptorsBytesCache().count());
+            assertEquals(2, serviceForDoc2.getRoleDescriptorsBytesCache().count());
+        });
+
+        // Invalidate all cache entries by setting keys to an empty array
+        clearSecurityCacheRequest.keys(new String[0]);
+        clearSecurityCacheResponse =
+            client().execute(ClearSecurityCacheAction.INSTANCE, clearSecurityCacheRequest).get();
+        assertFalse(clearSecurityCacheResponse.hasFailures());
+        assertBusy(() -> {
+            assertEquals(0, serviceForDoc1.getDocCache().count());
+            assertEquals(0, serviceForDoc1.getRoleDescriptorsBytesCache().count());
+            if (sameServiceNode) {
+                expectThrows(NullPointerException.class, () -> serviceForDoc1.getFromCache(docId2));
+            } else {
+                expectThrows(NullPointerException.class, () -> serviceForDoc2.getFromCache(docId2));
+                assertEquals(0, serviceForDoc2.getDocCache().count());
+                assertEquals(0, serviceForDoc2.getRoleDescriptorsBytesCache().count());
+            }
+        });
+    }
+
+    public void testSecurityIndexStateChangeWillInvalidateApiKeyCaches() throws Exception {
+        final List<ApiKeyService> services = Arrays.stream(internalCluster().getNodeNames())
+            .map(n -> internalCluster().getInstance(ApiKeyService.class, n))
+            .collect(Collectors.toList());
+
+        String docId = createApiKeyAndAuthenticateWithIt().v1();
+
+        // The API key is cached by one of the node that the above request hits, find out which one
+        final ApiKeyService apiKeyService =
+            services.stream().filter(s -> s.getDocCache().count() > 0).findFirst().orElseThrow();
+        assertNotNull(apiKeyService.getFromCache(docId));
+        assertEquals(1, apiKeyService.getDocCache().count());
+        assertEquals(2, apiKeyService.getRoleDescriptorsBytesCache().count());
+
+        // Close security index to trigger invalidation
+        final CloseIndexResponse closeIndexResponse = client().admin().indices().close(
+            new CloseIndexRequest(INTERNAL_SECURITY_MAIN_INDEX_7)).get();
+        assertTrue(closeIndexResponse.isAcknowledged());
+        assertBusy(() -> {
+            expectThrows(NullPointerException.class, () -> apiKeyService.getFromCache(docId));
+            assertEquals(0, apiKeyService.getDocCache().count());
+            assertEquals(0, apiKeyService.getRoleDescriptorsBytesCache().count());
+        });
+    }
+
+    private Tuple<String, String> createApiKeyAndAuthenticateWithIt() throws IOException {
+        Client client = client().filterWithHeader(Collections.singletonMap("Authorization",
+            UsernamePasswordToken.basicAuthHeaderValue(SecuritySettingsSource.TEST_SUPERUSER,
+                SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING)));
+
+        final CreateApiKeyResponse createApiKeyResponse = new CreateApiKeyRequestBuilder(client)
+            .setName("test key")
+            .get();
+        final String docId = createApiKeyResponse.getId();
+        final String base64ApiKeyKeyValue = Base64.getEncoder().encodeToString(
+            (docId + ":" + createApiKeyResponse.getKey().toString()).getBytes(StandardCharsets.UTF_8));
+        AuthenticateResponse authResponse = new TestRestHighLevelClient().security()
+            .authenticate(RequestOptions.DEFAULT.toBuilder().addHeader("Authorization",
+                "ApiKey " + base64ApiKeyKeyValue).build());
+        assertEquals("api_key", authResponse.getAuthenticationType());
+        return Tuple.tuple(docId, createApiKeyResponse.getKey().toString());
+    }
+
     private void assertApiKeyNotCreated(Client client, String keyName) throws ExecutionException, InterruptedException {
         new RefreshRequestBuilder(client, RefreshAction.INSTANCE).setIndices(SECURITY_MAIN_ALIAS).execute().get();
         assertEquals(0, client.execute(GetApiKeyAction.INSTANCE,

+ 12 - 1
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java

@@ -81,6 +81,7 @@ import org.elasticsearch.xpack.core.security.SecurityContext;
 import org.elasticsearch.xpack.core.security.SecurityExtension;
 import org.elasticsearch.xpack.core.security.SecurityField;
 import org.elasticsearch.xpack.core.security.SecuritySettings;
+import org.elasticsearch.xpack.core.security.action.ClearSecurityCacheAction;
 import org.elasticsearch.xpack.core.security.action.CreateApiKeyAction;
 import org.elasticsearch.xpack.core.security.action.DelegatePkiAuthenticationAction;
 import org.elasticsearch.xpack.core.security.action.GetApiKeyAction;
@@ -145,6 +146,7 @@ import org.elasticsearch.xpack.core.ssl.TLSLicenseBootstrapCheck;
 import org.elasticsearch.xpack.core.ssl.action.GetCertificateInfoAction;
 import org.elasticsearch.xpack.core.ssl.action.TransportGetCertificateInfoAction;
 import org.elasticsearch.xpack.core.ssl.rest.RestGetCertificateInfoAction;
+import org.elasticsearch.xpack.security.action.TransportClearSecurityCacheAction;
 import org.elasticsearch.xpack.security.action.TransportCreateApiKeyAction;
 import org.elasticsearch.xpack.security.action.TransportDelegatePkiAuthenticationAction;
 import org.elasticsearch.xpack.security.action.TransportGetApiKeyAction;
@@ -212,6 +214,7 @@ import org.elasticsearch.xpack.security.authz.store.NativeRolesStore;
 import org.elasticsearch.xpack.security.ingest.SetSecurityUserProcessor;
 import org.elasticsearch.xpack.security.rest.SecurityRestFilter;
 import org.elasticsearch.xpack.security.rest.action.RestAuthenticateAction;
+import org.elasticsearch.xpack.security.rest.action.apikey.RestClearApiKeyCacheAction;
 import org.elasticsearch.xpack.security.rest.action.RestDelegatePkiAuthenticationAction;
 import org.elasticsearch.xpack.security.rest.action.apikey.RestCreateApiKeyAction;
 import org.elasticsearch.xpack.security.rest.action.apikey.RestGetApiKeyAction;
@@ -248,6 +251,7 @@ import org.elasticsearch.xpack.security.rest.action.user.RestHasPrivilegesAction
 import org.elasticsearch.xpack.security.rest.action.user.RestPutUserAction;
 import org.elasticsearch.xpack.security.rest.action.user.RestSetEnabledAction;
 import org.elasticsearch.xpack.security.support.ExtensionComponents;
+import org.elasticsearch.xpack.security.support.CacheInvalidatorRegistry;
 import org.elasticsearch.xpack.security.support.SecurityIndexManager;
 import org.elasticsearch.xpack.security.support.SecurityStatusChangeListener;
 import org.elasticsearch.xpack.security.transport.SecurityHttpSettings;
@@ -436,6 +440,10 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
         components.add(privilegeStore);
         securityIndex.get().addIndexStateListener(privilegeStore::onSecurityIndexStateChange);
 
+        final CacheInvalidatorRegistry cacheInvalidatorRegistry = new CacheInvalidatorRegistry();
+        components.add(cacheInvalidatorRegistry);
+        securityIndex.get().addIndexStateListener(cacheInvalidatorRegistry::onSecurityIndexStageChange);
+
         dlsBitsetCache.set(new DocumentSubsetBitsetCache(settings, threadPool));
         final FieldPermissionsCache fieldPermissionsCache = new FieldPermissionsCache(settings);
         final FileRolesStore fileRolesStore = new FileRolesStore(settings, environment, resourceWatcherService, getLicenseState(),
@@ -448,7 +456,7 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
         }
 
         final ApiKeyService apiKeyService = new ApiKeyService(settings, Clock.systemUTC(), client, getLicenseState(), securityIndex.get(),
-            clusterService, threadPool);
+            clusterService, cacheInvalidatorRegistry, threadPool);
         components.add(apiKeyService);
         final CompositeRolesStore allRolesStore = new CompositeRolesStore(settings, fileRolesStore, nativeRolesStore, reservedRolesStore,
             privilegeStore, rolesProviders, threadPool.getThreadContext(), getLicenseState(), fieldPermissionsCache, apiKeyService,
@@ -658,6 +666,7 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
         settingsList.add(ApiKeyService.CACHE_HASH_ALGO_SETTING);
         settingsList.add(ApiKeyService.CACHE_MAX_KEYS_SETTING);
         settingsList.add(ApiKeyService.CACHE_TTL_SETTING);
+        settingsList.add(ApiKeyService.DOC_CACHE_TTL_SETTING);
         settingsList.add(NativePrivilegeStore.CACHE_MAX_APPLICATIONS_SETTING);
         settingsList.add(NativePrivilegeStore.CACHE_TTL_SETTING);
 
@@ -748,6 +757,7 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
                 new ActionHandler<>(ClearRealmCacheAction.INSTANCE, TransportClearRealmCacheAction.class),
                 new ActionHandler<>(ClearRolesCacheAction.INSTANCE, TransportClearRolesCacheAction.class),
                 new ActionHandler<>(ClearPrivilegesCacheAction.INSTANCE, TransportClearPrivilegesCacheAction.class),
+                new ActionHandler<>(ClearSecurityCacheAction.INSTANCE, TransportClearSecurityCacheAction.class),
                 new ActionHandler<>(GetUsersAction.INSTANCE, TransportGetUsersAction.class),
                 new ActionHandler<>(PutUserAction.INSTANCE, TransportPutUserAction.class),
                 new ActionHandler<>(DeleteUserAction.INSTANCE, TransportDeleteUserAction.class),
@@ -809,6 +819,7 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
                 new RestClearRealmCacheAction(settings, getLicenseState()),
                 new RestClearRolesCacheAction(settings, getLicenseState()),
                 new RestClearPrivilegesCacheAction(settings, getLicenseState()),
+                new RestClearApiKeyCacheAction(settings, getLicenseState()),
                 new RestGetUsersAction(settings, getLicenseState()),
                 new RestPutUserAction(settings, getLicenseState()),
                 new RestDeleteUserAction(settings, getLicenseState()),

+ 80 - 0
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/TransportClearSecurityCacheAction.java

@@ -0,0 +1,80 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.security.action;
+
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.nodes.TransportNodesAction;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.security.action.ClearSecurityCacheAction;
+import org.elasticsearch.xpack.core.security.action.ClearSecurityCacheRequest;
+import org.elasticsearch.xpack.core.security.action.ClearSecurityCacheResponse;
+import org.elasticsearch.xpack.security.support.CacheInvalidatorRegistry;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Clears a security cache by name (with optional keys).
+ * @see CacheInvalidatorRegistry
+ */
+public class TransportClearSecurityCacheAction extends TransportNodesAction<ClearSecurityCacheRequest, ClearSecurityCacheResponse,
+    ClearSecurityCacheRequest.Node, ClearSecurityCacheResponse.Node> {
+
+    private final CacheInvalidatorRegistry cacheInvalidatorRegistry;
+
+    @Inject
+    public TransportClearSecurityCacheAction(
+        ThreadPool threadPool,
+        ClusterService clusterService,
+        TransportService transportService,
+        ActionFilters actionFilters,
+        CacheInvalidatorRegistry cacheInvalidatorRegistry) {
+        super(
+            ClearSecurityCacheAction.NAME,
+            threadPool,
+            clusterService,
+            transportService,
+            actionFilters,
+            ClearSecurityCacheRequest::new,
+            ClearSecurityCacheRequest.Node::new,
+            ThreadPool.Names.MANAGEMENT,
+            ClearSecurityCacheResponse.Node.class);
+        this.cacheInvalidatorRegistry = cacheInvalidatorRegistry;
+    }
+
+    @Override
+    protected ClearSecurityCacheResponse newResponse(
+        ClearSecurityCacheRequest request, List<ClearSecurityCacheResponse.Node> nodes, List<FailedNodeException> failures) {
+        return new ClearSecurityCacheResponse(clusterService.getClusterName(), nodes, failures);
+    }
+
+    @Override
+    protected ClearSecurityCacheRequest.Node newNodeRequest(ClearSecurityCacheRequest request) {
+        return new ClearSecurityCacheRequest.Node(request);
+    }
+
+    @Override
+    protected ClearSecurityCacheResponse.Node newNodeResponse(StreamInput in) throws IOException {
+        return new ClearSecurityCacheResponse.Node(in);
+    }
+
+    @Override
+    protected ClearSecurityCacheResponse.Node nodeOperation(ClearSecurityCacheRequest.Node request, Task task) {
+        if (request.getKeys() == null || request.getKeys().length == 0) {
+            cacheInvalidatorRegistry.invalidateCache(request.getCacheName());
+        } else {
+            cacheInvalidatorRegistry.invalidateByKey(request.getCacheName(), List.of(request.getKeys()));
+        }
+        return new ClearSecurityCacheResponse.Node(clusterService.localNode());
+    }
+}

+ 217 - 13
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java

@@ -42,6 +42,7 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.cache.Cache;
 import org.elasticsearch.common.cache.CacheBuilder;
 import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.hash.MessageDigests;
 import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.settings.Setting;
@@ -72,6 +73,9 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.security.ScrollHelper;
 import org.elasticsearch.xpack.core.security.action.ApiKey;
+import org.elasticsearch.xpack.core.security.action.ClearSecurityCacheAction;
+import org.elasticsearch.xpack.core.security.action.ClearSecurityCacheRequest;
+import org.elasticsearch.xpack.core.security.action.ClearSecurityCacheResponse;
 import org.elasticsearch.xpack.core.security.action.CreateApiKeyRequest;
 import org.elasticsearch.xpack.core.security.action.CreateApiKeyResponse;
 import org.elasticsearch.xpack.core.security.action.GetApiKeyResponse;
@@ -82,6 +86,8 @@ import org.elasticsearch.xpack.core.security.authc.AuthenticationResult;
 import org.elasticsearch.xpack.core.security.authc.support.Hasher;
 import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
 import org.elasticsearch.xpack.core.security.user.User;
+import org.elasticsearch.xpack.security.support.InvalidationCountingCacheWrapper;
+import org.elasticsearch.xpack.security.support.CacheInvalidatorRegistry;
 import org.elasticsearch.xpack.security.support.FeatureNotEnabledException;
 import org.elasticsearch.xpack.security.support.FeatureNotEnabledException.Feature;
 import org.elasticsearch.xpack.security.support.SecurityIndexManager;
@@ -90,6 +96,7 @@ import javax.crypto.SecretKeyFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.time.Clock;
 import java.time.Instant;
@@ -107,6 +114,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -160,6 +168,8 @@ public class ApiKeyService {
         TimeValue.timeValueHours(24L), Property.NodeScope);
     public static final Setting<Integer> CACHE_MAX_KEYS_SETTING = Setting.intSetting("xpack.security.authc.api_key.cache.max_keys",
         10000, Property.NodeScope);
+    public static final Setting<TimeValue> DOC_CACHE_TTL_SETTING = Setting.timeSetting("xpack.security.authc.api_key.doc_cache.ttl",
+        TimeValue.timeValueMinutes(5), TimeValue.timeValueMinutes(0), TimeValue.timeValueMinutes(15), Property.NodeScope);
 
     private final Clock clock;
     private final Client client;
@@ -174,11 +184,12 @@ public class ApiKeyService {
     private final Cache<String, ListenableFuture<CachedApiKeyHashResult>> apiKeyAuthCache;
     private final Hasher cacheHasher;
     private final ThreadPool threadPool;
+    private final ApiKeyDocCache apiKeyDocCache;
 
     private volatile long lastExpirationRunMs;
 
     public ApiKeyService(Settings settings, Clock clock, Client client, XPackLicenseState licenseState, SecurityIndexManager securityIndex,
-                         ClusterService clusterService, ThreadPool threadPool) {
+                         ClusterService clusterService, CacheInvalidatorRegistry cacheInvalidatorRegistry, ThreadPool threadPool) {
         this.clock = clock;
         this.client = client;
         this.licenseState = licenseState;
@@ -192,13 +203,34 @@ public class ApiKeyService {
         this.threadPool = threadPool;
         this.cacheHasher = Hasher.resolve(CACHE_HASH_ALGO_SETTING.get(settings));
         final TimeValue ttl = CACHE_TTL_SETTING.get(settings);
+        final Integer maximumWeight = CACHE_MAX_KEYS_SETTING.get(settings);
         if (ttl.getNanos() > 0) {
             this.apiKeyAuthCache = CacheBuilder.<String, ListenableFuture<CachedApiKeyHashResult>>builder()
                 .setExpireAfterWrite(ttl)
-                .setMaximumWeight(CACHE_MAX_KEYS_SETTING.get(settings))
+                .setMaximumWeight(maximumWeight)
                 .build();
+            final TimeValue doc_ttl = DOC_CACHE_TTL_SETTING.get(settings);
+            this.apiKeyDocCache = doc_ttl.getNanos() == 0 ? null : new ApiKeyDocCache(doc_ttl, maximumWeight);
+            cacheInvalidatorRegistry.registerCacheInvalidator("api_key", new CacheInvalidatorRegistry.CacheInvalidator() {
+                @Override
+                public void invalidate(Collection<String> keys) {
+                    if (apiKeyDocCache != null) {
+                        apiKeyDocCache.invalidate(keys);
+                    }
+                    keys.forEach(apiKeyAuthCache::invalidate);
+                }
+
+                @Override
+                public void invalidateAll() {
+                    if (apiKeyDocCache != null) {
+                        apiKeyDocCache.invalidateAll();
+                    }
+                    apiKeyAuthCache.invalidateAll();
+                }
+            });
         } else {
             this.apiKeyAuthCache = null;
+            this.apiKeyDocCache = null;
         }
     }
 
@@ -276,7 +308,6 @@ public class ApiKeyService {
             Arrays.fill(keyHash, (char) 0);
         }
 
-
         // Save role_descriptors
         builder.startObject("role_descriptors");
         if (keyRoles != null && keyRoles.isEmpty() == false) {
@@ -353,9 +384,32 @@ public class ApiKeyService {
                 authResult.getMetadata());
     }
 
-    private void loadApiKeyAndValidateCredentials(ThreadContext ctx, ApiKeyCredentials credentials,
-                                                  ActionListener<AuthenticationResult> listener) {
+    void loadApiKeyAndValidateCredentials(ThreadContext ctx, ApiKeyCredentials credentials,
+                                          ActionListener<AuthenticationResult> listener) {
         final String docId = credentials.getId();
+
+        Consumer<ApiKeyDoc> validator = apiKeyDoc ->
+            validateApiKeyCredentials(docId, apiKeyDoc, credentials, clock, ActionListener.delegateResponse(listener, (l, e) -> {
+                if (ExceptionsHelper.unwrapCause(e) instanceof EsRejectedExecutionException) {
+                    listener.onResponse(AuthenticationResult.terminate("server is too busy to respond", e));
+                } else {
+                    listener.onFailure(e);
+                }
+            }));
+
+        final long invalidationCount;
+        if (apiKeyDocCache != null) {
+            ApiKeyDoc existing = apiKeyDocCache.get(docId);
+            if (existing != null) {
+                validator.accept(existing);
+                return;
+            }
+            // API key doc not found in cache, take a record of the current invalidation count to prepare for caching
+            invalidationCount = apiKeyDocCache.getInvalidationCount();
+        } else {
+            invalidationCount = -1;
+        }
+
         final GetRequest getRequest = client
             .prepareGet(SECURITY_MAIN_ALIAS, docId)
             .setFetchSource(true)
@@ -368,13 +422,10 @@ public class ApiKeyService {
                         response.getSourceAsBytesRef(), XContentType.JSON)) {
                         apiKeyDoc = ApiKeyDoc.fromXContent(parser);
                     }
-                    validateApiKeyCredentials(docId, apiKeyDoc, credentials, clock, ActionListener.delegateResponse(listener, (l, e) -> {
-                        if (ExceptionsHelper.unwrapCause(e) instanceof EsRejectedExecutionException) {
-                            listener.onResponse(AuthenticationResult.terminate("server is too busy to respond", e));
-                        } else {
-                            listener.onFailure(e);
-                        }
-                    }));
+                    if (invalidationCount != -1) {
+                        apiKeyDocCache.putIfNoInvalidationSince(docId, apiKeyDoc, invalidationCount);
+                    }
+                    validator.accept(apiKeyDoc);
                 } else {
                     listener.onResponse(
                         AuthenticationResult.unsuccessful("unable to find apikey with id " + credentials.getId(), null));
@@ -591,6 +642,16 @@ public class ApiKeyService {
         return apiKeyAuthCache == null ? null : FutureUtils.get(apiKeyAuthCache.get(id), 0L, TimeUnit.MILLISECONDS);
     }
 
+    // pkg private for testing
+    InvalidationCountingCacheWrapper<String, CachedApiKeyDoc> getDocCache() {
+        return apiKeyDocCache == null ? null : apiKeyDocCache.docCache;
+    }
+
+    // pkg private for testing
+    Cache<String, BytesReference> getRoleDescriptorsBytesCache() {
+        return apiKeyDocCache == null ? null : apiKeyDocCache.roleDescriptorsBytesCache;
+    }
+
     // package-private for testing
     void validateApiKeyExpiration(ApiKeyDoc apiKeyDoc, ApiKeyCredentials credentials, Clock clock,
                                   ActionListener<AuthenticationResult> listener) {
@@ -895,7 +956,7 @@ public class ApiKeyService {
                         }
                         InvalidateApiKeyResponse result = new InvalidateApiKeyResponse(invalidated, previouslyInvalidated,
                             failedRequestResponses);
-                        listener.onResponse(result);
+                        clearCache(result, listener);
                     }, e -> {
                         Throwable cause = ExceptionsHelper.unwrapCause(e);
                         traceLog("invalidate api keys", cause);
@@ -904,6 +965,25 @@ public class ApiKeyService {
         }
     }
 
+    private void clearCache(InvalidateApiKeyResponse result, ActionListener<InvalidateApiKeyResponse> listener) {
+        final ClearSecurityCacheRequest clearApiKeyCacheRequest =
+            new ClearSecurityCacheRequest().cacheName("api_key").keys(result.getInvalidatedApiKeys().toArray(String[]::new));
+        executeAsyncWithOrigin(client, SECURITY_ORIGIN, ClearSecurityCacheAction.INSTANCE, clearApiKeyCacheRequest,
+            new ActionListener<>() {
+                @Override
+                public void onResponse(ClearSecurityCacheResponse nodes) {
+                    listener.onResponse(result);
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    logger.error("unable to clear API key cache", e);
+                    listener.onFailure(new ElasticsearchException(
+                        "clearing the API key cache failed; please clear the caches manually", e));
+                }
+            });
+    }
+
     /**
      * Logs an exception concerning a specific api key at TRACE level (if enabled)
      */
@@ -1088,8 +1168,132 @@ public class ApiKeyService {
             this.creator = creator;
         }
 
+        public CachedApiKeyDoc toCachedApiKeyDoc() {
+            final MessageDigest digest = MessageDigests.sha256();
+            digest.update(BytesReference.toBytes(roleDescriptorsBytes));
+            final String roleDescriptorsHash = MessageDigests.toHexString(digest.digest());
+            digest.reset();
+            digest.update(BytesReference.toBytes(limitedByRoleDescriptorsBytes));
+            final String limitedByRoleDescriptorsHash = MessageDigests.toHexString(digest.digest());
+            return new CachedApiKeyDoc(
+                creationTime,
+                expirationTime,
+                invalidated,
+                hash,
+                name,
+                version,
+                creator,
+                roleDescriptorsHash,
+                limitedByRoleDescriptorsHash);
+        }
+
         static ApiKeyDoc fromXContent(XContentParser parser) {
             return PARSER.apply(parser, null);
         }
     }
+
+    /**
+     * A cached version of the {@link ApiKeyDoc}. The main difference is that the role descriptors
+     * are replaced by their hashes. The actual values are stored in a separate role descriptor cache,
+     * so that duplicate role descriptors are cached only once (and therefore consume less memory).
+     */
+    public static final class CachedApiKeyDoc {
+        final long creationTime;
+        final long expirationTime;
+        final Boolean invalidated;
+        final String hash;
+        final String name;
+        final int version;
+        final Map<String, Object> creator;
+        final String roleDescriptorsHash;
+        final String limitedByRoleDescriptorsHash;
+
+        public CachedApiKeyDoc(
+            long creationTime, long expirationTime,
+            Boolean invalidated,
+            String hash,
+            String name, int version, Map<String, Object> creator,
+            String roleDescriptorsHash,
+            String limitedByRoleDescriptorsHash) {
+            this.creationTime = creationTime;
+            this.expirationTime = expirationTime;
+            this.invalidated = invalidated;
+            this.hash = hash;
+            this.name = name;
+            this.version = version;
+            this.creator = creator;
+            this.roleDescriptorsHash = roleDescriptorsHash;
+            this.limitedByRoleDescriptorsHash = limitedByRoleDescriptorsHash;
+        }
+
+        public ApiKeyDoc toApiKeyDoc(BytesReference roleDescriptorsBytes, BytesReference limitedByRoleDescriptorsBytes) {
+            return new ApiKeyDoc(
+                "api_key",
+                creationTime,
+                expirationTime,
+                invalidated,
+                hash,
+                name,
+                version,
+                roleDescriptorsBytes,
+                limitedByRoleDescriptorsBytes,
+                creator);
+        }
+    }
+
+    private static final class ApiKeyDocCache {
+        private final InvalidationCountingCacheWrapper<String, ApiKeyService.CachedApiKeyDoc> docCache;
+        private final Cache<String, BytesReference> roleDescriptorsBytesCache;
+
+        ApiKeyDocCache(TimeValue ttl, int maximumWeight) {
+            this.docCache = new InvalidationCountingCacheWrapper<>(
+                CacheBuilder.<String, ApiKeyService.CachedApiKeyDoc>builder()
+                    .setMaximumWeight(maximumWeight)
+                    .setExpireAfterWrite(ttl)
+                    .build()
+            );
+            // We don't use the doc TTL because that TTL is very low to avoid the risk of
+            // caching an invalidated API key. But role descriptors are immutable and may be shared between
+            // multiple API keys, so we cache for longer and rely on the weight to manage the cache size.
+            this.roleDescriptorsBytesCache = CacheBuilder.<String, BytesReference>builder()
+                .setExpireAfterAccess(TimeValue.timeValueHours(1))
+                .setMaximumWeight(maximumWeight * 2)
+                .build();
+        }
+
+        public ApiKeyDoc get(String docId) {
+            ApiKeyService.CachedApiKeyDoc existing = docCache.get(docId);
+            if (existing != null) {
+                final BytesReference roleDescriptorsBytes = roleDescriptorsBytesCache.get(existing.roleDescriptorsHash);
+                final BytesReference limitedByRoleDescriptorsBytes = roleDescriptorsBytesCache.get(existing.limitedByRoleDescriptorsHash);
+                if (roleDescriptorsBytes != null && limitedByRoleDescriptorsBytes != null) {
+                    return existing.toApiKeyDoc(roleDescriptorsBytes, limitedByRoleDescriptorsBytes);
+                }
+            }
+            return null;
+        }
+
+        public long getInvalidationCount() {
+            return docCache.getInvalidationCount();
+        }
+
+        public void putIfNoInvalidationSince(String docId, ApiKeyDoc apiKeyDoc, long invalidationCount) throws ExecutionException {
+            final CachedApiKeyDoc cachedApiKeyDoc = apiKeyDoc.toCachedApiKeyDoc();
+            if (docCache.putIfNoInvalidationSince(docId, cachedApiKeyDoc, invalidationCount)) {
+                roleDescriptorsBytesCache.computeIfAbsent(
+                    cachedApiKeyDoc.roleDescriptorsHash, k -> apiKeyDoc.roleDescriptorsBytes);
+                roleDescriptorsBytesCache.computeIfAbsent(
+                    cachedApiKeyDoc.limitedByRoleDescriptorsHash, k -> apiKeyDoc.limitedByRoleDescriptorsBytes);
+            }
+        }
+
+        public void invalidate(Collection<String> docIds) {
+            docCache.invalidate(docIds);
+        }
+
+        public void invalidateAll() {
+            docCache.invalidateAll();
+            roleDescriptorsBytesCache.invalidateAll();
+        }
+    }
 }

+ 46 - 0
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/action/apikey/RestClearApiKeyCacheAction.java

@@ -0,0 +1,46 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.security.rest.action.apikey;
+
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestActions.NodesResponseRestListener;
+import org.elasticsearch.xpack.core.security.action.ClearSecurityCacheAction;
+import org.elasticsearch.xpack.core.security.action.ClearSecurityCacheRequest;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.POST;
+
+public class RestClearApiKeyCacheAction extends ApiKeyBaseRestHandler {
+
+    public RestClearApiKeyCacheAction(Settings settings, XPackLicenseState licenseState) {
+        super(settings, licenseState);
+    }
+
+    @Override
+    public String getName() {
+        return "security_clear_api_key_cache_action";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return Collections.singletonList(
+            new Route(POST, "/_security/api_key/{ids}/_clear_cache"));
+    }
+
+    @Override
+    protected RestChannelConsumer innerPrepareRequest(RestRequest request, NodeClient client) throws IOException {
+        String[] ids = request.paramAsStringArrayOrEmptyIfAll("ids");
+        final ClearSecurityCacheRequest req = new ClearSecurityCacheRequest().cacheName("api_key").keys(ids);
+        return channel -> client.execute(ClearSecurityCacheAction.INSTANCE, req, new NodesResponseRestListener<>(channel));
+    }
+}

+ 64 - 0
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/CacheInvalidatorRegistry.java

@@ -0,0 +1,64 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.security.support;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.elasticsearch.xpack.security.support.SecurityIndexManager.isIndexDeleted;
+import static org.elasticsearch.xpack.security.support.SecurityIndexManager.isMoveFromRedToNonRed;
+
+/**
+ * A registry that provides common cache invalidation services for caches that relies on the security index.
+ */
+public class CacheInvalidatorRegistry {
+
+    private final Map<String, CacheInvalidator> cacheInvalidators = new ConcurrentHashMap<>();
+
+    public CacheInvalidatorRegistry() {
+    }
+
+    public void registerCacheInvalidator(String name, CacheInvalidator cacheInvalidator) {
+        if (cacheInvalidators.containsKey(name)) {
+            throw new IllegalArgumentException("Cache invalidator registry already has an entry with name: [" + name + "]");
+        }
+        cacheInvalidators.put(name, cacheInvalidator);
+    }
+
+    public void onSecurityIndexStageChange(SecurityIndexManager.State previousState, SecurityIndexManager.State currentState) {
+        if (isMoveFromRedToNonRed(previousState, currentState)
+            || isIndexDeleted(previousState, currentState)
+            || previousState.isIndexUpToDate != currentState.isIndexUpToDate) {
+            cacheInvalidators.values().forEach(CacheInvalidator::invalidateAll);
+        }
+    }
+
+    public void invalidateByKey(String cacheName, Collection<String> keys) {
+        final CacheInvalidator cacheInvalidator = cacheInvalidators.get(cacheName);
+        if (cacheInvalidator != null) {
+            cacheInvalidator.invalidate(keys);
+        } else {
+            throw new IllegalArgumentException("No cache named [" + cacheName + "] is found");
+        }
+    }
+
+    public void invalidateCache(String cacheName) {
+        final CacheInvalidator cacheInvalidator = cacheInvalidators.get(cacheName);
+        if (cacheInvalidator != null) {
+            cacheInvalidator.invalidateAll();
+        } else {
+            throw new IllegalArgumentException("No cache named [" + cacheName + "] is found");
+        }
+    }
+
+    public interface CacheInvalidator {
+        void invalidate(Collection<String> keys);
+
+        void invalidateAll();
+    }
+}

+ 76 - 0
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/InvalidationCountingCacheWrapper.java

@@ -0,0 +1,76 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.security.support;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.common.cache.Cache;
+import org.elasticsearch.common.util.concurrent.ReleasableLock;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A wrapper of {@link Cache} that keeps a counter for invalidation calls in order to
+ * minimizes the possibility of caching stale results.
+ */
+public class InvalidationCountingCacheWrapper<K, V> {
+
+    private static final Logger logger = LogManager.getLogger(InvalidationCountingCacheWrapper.class);
+
+    private final Cache<K, V> delegate;
+    private final AtomicLong numInvalidation = new AtomicLong();
+    private final ReadWriteLock invalidationLock = new ReentrantReadWriteLock();
+    private final ReleasableLock invalidationReadLock = new ReleasableLock(invalidationLock.readLock());
+    private final ReleasableLock invalidationWriteLock = new ReleasableLock(invalidationLock.writeLock());
+
+    public InvalidationCountingCacheWrapper(Cache<K, V> delegate) {
+        this.delegate = delegate;
+    }
+
+    public long getInvalidationCount() {
+        return numInvalidation.get();
+    }
+
+    public boolean putIfNoInvalidationSince(K key, V value, long invalidationCount) {
+        assert invalidationCount >= 0 : "Invalidation count must be non-negative";
+        try (ReleasableLock ignored = invalidationReadLock.acquire()) {
+            if (invalidationCount == numInvalidation.get()) {
+                logger.debug("Caching for key [{}], value [{}]", key, value);
+                delegate.put(key, value);
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public V get(K key) {
+        return delegate.get(key);
+    }
+
+    public void invalidate(Collection<K> keys) {
+        try (ReleasableLock ignored = invalidationWriteLock.acquire()) {
+            numInvalidation.incrementAndGet();
+        }
+        logger.debug("Invalidating for keys [{}]", keys);
+        keys.forEach(delegate::invalidate);
+    }
+
+    public void invalidateAll() {
+        try (ReleasableLock ignored = invalidationWriteLock.acquire()) {
+            numInvalidation.incrementAndGet();
+        }
+        logger.debug("Invalidating all cache entries");
+        delegate.invalidateAll();
+    }
+
+    public int count() {
+        return delegate.count();
+    }
+}

+ 3 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/logfile/LoggingAuditTrailFilterTests.java

@@ -38,6 +38,7 @@ import org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrailTests.Moc
 import org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrailTests.RestContent;
 import org.elasticsearch.xpack.security.authc.ApiKeyService;
 import org.elasticsearch.xpack.security.rest.RemoteHostHeader;
+import org.elasticsearch.xpack.security.support.CacheInvalidatorRegistry;
 import org.elasticsearch.xpack.security.support.SecurityIndexManager;
 import org.elasticsearch.xpack.security.transport.filter.SecurityIpFilterRule;
 import org.junit.Before;
@@ -91,7 +92,8 @@ public class LoggingAuditTrailFilterTests extends ESTestCase {
             return null;
         }).when(clusterService).addListener(Mockito.isA(LoggingAuditTrail.class));
         apiKeyService = new ApiKeyService(settings, Clock.systemUTC(), mock(Client.class), new XPackLicenseState(settings, () -> 0),
-                mock(SecurityIndexManager.class), clusterService, mock(ThreadPool.class));
+                                          mock(SecurityIndexManager.class), clusterService,
+                                          mock(CacheInvalidatorRegistry.class), mock(ThreadPool.class));
     }
 
     public void testPolicyDoesNotMatchNullValuesInEvent() throws Exception {

+ 3 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/logfile/LoggingAuditTrailTests.java

@@ -54,6 +54,7 @@ import org.elasticsearch.xpack.security.audit.AuditTrail;
 import org.elasticsearch.xpack.security.audit.AuditUtil;
 import org.elasticsearch.xpack.security.authc.ApiKeyService;
 import org.elasticsearch.xpack.security.rest.RemoteHostHeader;
+import org.elasticsearch.xpack.security.support.CacheInvalidatorRegistry;
 import org.elasticsearch.xpack.security.support.SecurityIndexManager;
 import org.elasticsearch.xpack.security.transport.filter.IPFilter;
 import org.elasticsearch.xpack.security.transport.filter.SecurityIpFilterRule;
@@ -221,7 +222,8 @@ public class LoggingAuditTrailTests extends ESTestCase {
         logger = CapturingLogger.newCapturingLogger(randomFrom(Level.OFF, Level.FATAL, Level.ERROR, Level.WARN, Level.INFO), patternLayout);
         auditTrail = new LoggingAuditTrail(settings, clusterService, logger, threadContext);
         apiKeyService = new ApiKeyService(settings, Clock.systemUTC(), client, new XPackLicenseState(settings, () -> 0),
-                securityIndexManager, clusterService, mock(ThreadPool.class));
+                                          securityIndexManager, clusterService,
+                                          mock(CacheInvalidatorRegistry.class), mock(ThreadPool.class));
     }
 
     @After

+ 130 - 3
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyServiceTests.java

@@ -11,6 +11,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.bulk.BulkAction;
 import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.index.IndexAction;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -57,6 +58,7 @@ import org.elasticsearch.xpack.security.authc.ApiKeyService.ApiKeyDoc;
 import org.elasticsearch.xpack.security.authc.ApiKeyService.ApiKeyRoleDescriptors;
 import org.elasticsearch.xpack.security.authc.ApiKeyService.CachedApiKeyHashResult;
 import org.elasticsearch.xpack.security.authz.store.NativePrivilegeStore;
+import org.elasticsearch.xpack.security.support.CacheInvalidatorRegistry;
 import org.elasticsearch.xpack.security.support.FeatureNotEnabledException;
 import org.elasticsearch.xpack.security.support.SecurityIndexManager;
 import org.elasticsearch.xpack.security.test.SecurityMocks;
@@ -105,6 +107,9 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ApiKeyServiceTests extends ESTestCase {
@@ -113,6 +118,7 @@ public class ApiKeyServiceTests extends ESTestCase {
     private XPackLicenseState licenseState;
     private Client client;
     private SecurityIndexManager securityIndex;
+    private CacheInvalidatorRegistry cacheInvalidatorRegistry;
 
     @Before
     public void createThreadPool() {
@@ -136,6 +142,7 @@ public class ApiKeyServiceTests extends ESTestCase {
 
         this.client = mock(Client.class);
         this.securityIndex = SecurityMocks.mockSecurityIndexManager();
+        this.cacheInvalidatorRegistry = mock(CacheInvalidatorRegistry.class);
     }
 
     public void testCreateApiKeyUsesBulkIndexAction() {
@@ -336,6 +343,11 @@ public class ApiKeyServiceTests extends ESTestCase {
 
     private void mockKeyDocument(ApiKeyService service, String id, String key, User user, boolean invalidated,
                                  Duration expiry) throws IOException {
+        mockKeyDocument(service, id, key, user, invalidated, expiry, null);
+    }
+
+    private void mockKeyDocument(ApiKeyService service, String id, String key, User user, boolean invalidated,
+                                 Duration expiry, List<RoleDescriptor> keyRoles) throws IOException {
         final Authentication authentication;
         if (user.isRunAs()) {
             authentication = new Authentication(user, new RealmRef("authRealm", "test", "foo"),
@@ -348,7 +360,7 @@ public class ApiKeyServiceTests extends ESTestCase {
                             AuthenticationType.ANONYMOUS), Collections.emptyMap());
         }
         XContentBuilder docSource = service.newDocument(new SecureString(key.toCharArray()), "test", authentication,
-            Collections.singleton(SUPERUSER_ROLE_DESCRIPTOR), Instant.now(), Instant.now().plus(expiry), null,
+            Collections.singleton(SUPERUSER_ROLE_DESCRIPTOR), Instant.now(), Instant.now().plus(expiry), keyRoles,
             Version.CURRENT);
         if (invalidated) {
             Map<String, Object> map = XContentHelper.convertToMap(BytesReference.bytes(docSource), true, XContentType.JSON).v2();
@@ -711,6 +723,113 @@ public class ApiKeyServiceTests extends ESTestCase {
         assertThat(result.isAuthenticated(), is(true));
         CachedApiKeyHashResult cachedApiKeyHashResult = service.getFromCache(creds.getId());
         assertNull(cachedApiKeyHashResult);
+        assertNull(service.getDocCache());
+        assertNull(service.getRoleDescriptorsBytesCache());
+    }
+
+    public void testApiKeyDocCacheCanBeDisabledSeparately() {
+        final String apiKey = randomAlphaOfLength(16);
+        Hasher hasher = randomFrom(Hasher.PBKDF2, Hasher.BCRYPT4, Hasher.BCRYPT);
+        final char[] hash = hasher.hash(new SecureString(apiKey.toCharArray()));
+        final Settings settings = Settings.builder()
+            .put(ApiKeyService.DOC_CACHE_TTL_SETTING.getKey(), "0s")
+            .build();
+
+        ApiKeyDoc apiKeyDoc = buildApiKeyDoc(hash, -1, false);
+
+        ApiKeyService service = createApiKeyService(settings);
+
+        ApiKeyCredentials creds = new ApiKeyCredentials(randomAlphaOfLength(12), new SecureString(apiKey.toCharArray()));
+        PlainActionFuture<AuthenticationResult> future = new PlainActionFuture<>();
+        service.validateApiKeyCredentials(creds.getId(), apiKeyDoc, creds, Clock.systemUTC(), future);
+        AuthenticationResult result = future.actionGet();
+        assertThat(result.isAuthenticated(), is(true));
+        CachedApiKeyHashResult cachedApiKeyHashResult = service.getFromCache(creds.getId());
+        assertNotNull(cachedApiKeyHashResult);
+        assertNull(service.getDocCache());
+        assertNull(service.getRoleDescriptorsBytesCache());
+    }
+
+    public void testApiKeyDocCache() throws IOException, ExecutionException, InterruptedException {
+        ApiKeyService service = createApiKeyService(Settings.EMPTY);
+        assertNotNull(service.getDocCache());
+        assertNotNull(service.getRoleDescriptorsBytesCache());
+        final ThreadContext threadContext = threadPool.getThreadContext();
+
+        // 1. A new API key document will be cached after its authentication
+        final String docId = randomAlphaOfLength(16);
+        final String apiKey = randomAlphaOfLength(16);
+        ApiKeyCredentials apiKeyCredentials = new ApiKeyCredentials(docId, new SecureString(apiKey.toCharArray()));
+        mockKeyDocument(service, docId, apiKey, new User("hulk", "superuser"), false, Duration.ofSeconds(3600));
+        PlainActionFuture<AuthenticationResult> future = new PlainActionFuture<>();
+        service.loadApiKeyAndValidateCredentials(threadContext, apiKeyCredentials, future);
+        final ApiKeyService.CachedApiKeyDoc cachedApiKeyDoc = service.getDocCache().get(docId);
+        assertNotNull(cachedApiKeyDoc);
+        assertEquals("hulk", cachedApiKeyDoc.creator.get("principal"));
+        final BytesReference roleDescriptorsBytes =
+            service.getRoleDescriptorsBytesCache().get(cachedApiKeyDoc.roleDescriptorsHash);
+        assertNotNull(roleDescriptorsBytes);
+        assertEquals("{}", roleDescriptorsBytes.utf8ToString());
+        final BytesReference limitedByRoleDescriptorsBytes =
+            service.getRoleDescriptorsBytesCache().get(cachedApiKeyDoc.limitedByRoleDescriptorsHash);
+        assertNotNull(limitedByRoleDescriptorsBytes);
+        final List<RoleDescriptor> limitedByRoleDescriptors = service.parseRoleDescriptors(docId, limitedByRoleDescriptorsBytes);
+        assertEquals(1, limitedByRoleDescriptors.size());
+        assertEquals(SUPERUSER_ROLE_DESCRIPTOR, limitedByRoleDescriptors.get(0));
+
+        // 2. A different API Key with the same role descriptors will share the entries in the role descriptor cache
+        final String docId2 = randomAlphaOfLength(16);
+        final String apiKey2 = randomAlphaOfLength(16);
+        ApiKeyCredentials apiKeyCredentials2 = new ApiKeyCredentials(docId2, new SecureString(apiKey2.toCharArray()));
+        mockKeyDocument(service, docId2, apiKey2, new User("thor", "superuser"), false, Duration.ofSeconds(3600));
+        PlainActionFuture<AuthenticationResult> future2 = new PlainActionFuture<>();
+        service.loadApiKeyAndValidateCredentials(threadContext, apiKeyCredentials2, future2);
+        final ApiKeyService.CachedApiKeyDoc cachedApiKeyDoc2 = service.getDocCache().get(docId2);
+        assertNotNull(cachedApiKeyDoc2);
+        assertEquals("thor", cachedApiKeyDoc2.creator.get("principal"));
+        final BytesReference roleDescriptorsBytes2 =
+            service.getRoleDescriptorsBytesCache().get(cachedApiKeyDoc2.roleDescriptorsHash);
+        assertSame(roleDescriptorsBytes, roleDescriptorsBytes2);
+        final BytesReference limitedByRoleDescriptorsBytes2 =
+            service.getRoleDescriptorsBytesCache().get(cachedApiKeyDoc2.limitedByRoleDescriptorsHash);
+        assertSame(limitedByRoleDescriptorsBytes, limitedByRoleDescriptorsBytes2);
+
+        // 3. Different role descriptors will be cached into a separate entry
+        final String docId3 = randomAlphaOfLength(16);
+        final String apiKey3 = randomAlphaOfLength(16);
+        ApiKeyCredentials apiKeyCredentials3 = new ApiKeyCredentials(docId3, new SecureString(apiKey3.toCharArray()));
+        final List<RoleDescriptor> keyRoles =
+            List.of(RoleDescriptor.parse("key-role", new BytesArray("{\"cluster\":[\"monitor\"]}"), true, XContentType.JSON));
+        mockKeyDocument(service, docId3, apiKey3, new User("banner", "superuser"),
+                        false, Duration.ofSeconds(3600), keyRoles);
+        PlainActionFuture<AuthenticationResult> future3 = new PlainActionFuture<>();
+        service.loadApiKeyAndValidateCredentials(threadContext, apiKeyCredentials3, future3);
+        final ApiKeyService.CachedApiKeyDoc cachedApiKeyDoc3 = service.getDocCache().get(docId3);
+        assertNotNull(cachedApiKeyDoc3);
+        assertEquals("banner", cachedApiKeyDoc3.creator.get("principal"));
+        // Shared bytes for limitedBy role since it is the same
+        assertSame(limitedByRoleDescriptorsBytes,
+                   service.getRoleDescriptorsBytesCache().get(cachedApiKeyDoc3.limitedByRoleDescriptorsHash));
+        // But role descriptors bytes are different
+        final BytesReference roleDescriptorsBytes3 = service.getRoleDescriptorsBytesCache().get(cachedApiKeyDoc3.roleDescriptorsHash);
+        assertNotSame(roleDescriptorsBytes, roleDescriptorsBytes3);
+        assertEquals(3, service.getRoleDescriptorsBytesCache().count());
+
+        // 4. Will fetch document from security index if role descriptors are not found even when
+        //    cachedApiKeyDoc is available
+        service.getRoleDescriptorsBytesCache().invalidateAll();
+        mockKeyDocument(service, docId, apiKey, new User("hulk", "superuser"), false, Duration.ofSeconds(3600));
+        PlainActionFuture<AuthenticationResult> future4 = new PlainActionFuture<>();
+        service.loadApiKeyAndValidateCredentials(threadContext, apiKeyCredentials, future4);
+        verify(client, times(4)).get(any(GetRequest.class), any(ActionListener.class));
+        assertEquals(2, service.getRoleDescriptorsBytesCache().count());
+        assertSame(AuthenticationResult.Status.SUCCESS, future4.get().getStatus());
+
+        // 5. Cached entries will be used for the same API key doc
+        SecurityMocks.mockGetRequestException(client, new EsRejectedExecutionException("rejected"));
+        PlainActionFuture<AuthenticationResult> future5 = new PlainActionFuture<>();
+        service.loadApiKeyAndValidateCredentials(threadContext, apiKeyCredentials, future5);
+        assertSame(AuthenticationResult.Status.SUCCESS, future5.get().getStatus());
     }
 
     public void testWillGetLookedUpByRealmNameIfExists() {
@@ -923,8 +1042,16 @@ public class ApiKeyServiceTests extends ESTestCase {
             .put(XPackSettings.API_KEY_SERVICE_ENABLED_SETTING.getKey(), true)
             .put(baseSettings)
             .build();
-        return new ApiKeyService(settings, Clock.systemUTC(), client, licenseState, securityIndex,
-            ClusterServiceUtils.createClusterService(threadPool), threadPool);
+        final ApiKeyService service = new ApiKeyService(
+            settings, Clock.systemUTC(), client, licenseState, securityIndex,
+            ClusterServiceUtils.createClusterService(threadPool),
+            cacheInvalidatorRegistry, threadPool);
+        if ("0s".equals(settings.get(ApiKeyService.CACHE_TTL_SETTING.getKey()))) {
+            verify(cacheInvalidatorRegistry, never()).registerCacheInvalidator(eq("api_key"), any());
+        } else {
+            verify(cacheInvalidatorRegistry).registerCacheInvalidator(eq("api_key"), any());
+        }
+        return service;
     }
 
     private Map<String, Object> buildApiKeySourceDoc(char[] hash) {

+ 3 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/AuthenticationServiceTests.java

@@ -86,6 +86,7 @@ import org.elasticsearch.xpack.security.audit.AuditTrailService;
 import org.elasticsearch.xpack.security.audit.AuditUtil;
 import org.elasticsearch.xpack.security.authc.AuthenticationService.Authenticator;
 import org.elasticsearch.xpack.security.authc.esnative.ReservedRealm;
+import org.elasticsearch.xpack.security.support.CacheInvalidatorRegistry;
 import org.elasticsearch.xpack.security.support.SecurityIndexManager;
 import org.junit.After;
 import org.junit.Before;
@@ -253,7 +254,8 @@ public class AuthenticationServiceTests extends ESTestCase {
         }).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class));
         ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
         final SecurityContext securityContext = new SecurityContext(settings, threadContext);
-        apiKeyService = new ApiKeyService(settings, Clock.systemUTC(), client, licenseState, securityIndex, clusterService, threadPool);
+        apiKeyService = new ApiKeyService(settings, Clock.systemUTC(), client, licenseState, securityIndex, clusterService,
+                                          mock(CacheInvalidatorRegistry.class), threadPool);
         tokenService = new TokenService(settings, Clock.systemUTC(), client, licenseState, securityContext, securityIndex, securityIndex,
             clusterService);
         service = new AuthenticationService(settings, realms, auditTrailService,

+ 3 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/SecondaryAuthenticatorTests.java

@@ -47,6 +47,7 @@ import org.elasticsearch.xpack.security.authc.ApiKeyService;
 import org.elasticsearch.xpack.security.authc.AuthenticationService;
 import org.elasticsearch.xpack.security.authc.Realms;
 import org.elasticsearch.xpack.security.authc.TokenService;
+import org.elasticsearch.xpack.security.support.CacheInvalidatorRegistry;
 import org.elasticsearch.xpack.security.support.SecurityIndexManager;
 import org.elasticsearch.xpack.security.test.SecurityMocks;
 import org.hamcrest.Matchers;
@@ -121,7 +122,8 @@ public class SecondaryAuthenticatorTests extends ESTestCase {
 
         tokenService = new TokenService(settings, clock, client, licenseState, securityContext, securityIndex, tokensIndex, clusterService);
         final ApiKeyService apiKeyService = new ApiKeyService(settings, clock, client, licenseState,
-            securityIndex, clusterService, threadPool);
+                                                              securityIndex, clusterService,
+                                                              mock(CacheInvalidatorRegistry.class),threadPool);
         authenticationService = new AuthenticationService(settings, realms, auditTrail, failureHandler, threadPool, anonymous,
             tokenService, apiKeyService);
         authenticator = new SecondaryAuthenticator(securityContext, authenticationService);

+ 3 - 2
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java

@@ -68,6 +68,7 @@ import org.elasticsearch.xpack.core.security.user.User;
 import org.elasticsearch.xpack.core.security.user.XPackUser;
 import org.elasticsearch.xpack.security.audit.AuditUtil;
 import org.elasticsearch.xpack.security.authc.ApiKeyService;
+import org.elasticsearch.xpack.security.support.CacheInvalidatorRegistry;
 import org.elasticsearch.xpack.security.support.SecurityIndexManager;
 
 import java.io.IOException;
@@ -1036,7 +1037,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
         ThreadContext threadContext = new ThreadContext(SECURITY_ENABLED_SETTINGS);
         ApiKeyService apiKeyService = spy(new ApiKeyService(SECURITY_ENABLED_SETTINGS, Clock.systemUTC(), mock(Client.class),
                 new XPackLicenseState(SECURITY_ENABLED_SETTINGS, () -> 0), mock(SecurityIndexManager.class), mock(ClusterService.class),
-                mock(ThreadPool.class)));
+                mock(CacheInvalidatorRegistry.class), mock(ThreadPool.class)));
         NativePrivilegeStore nativePrivStore = mock(NativePrivilegeStore.class);
         doAnswer(invocationOnMock -> {
             ActionListener<Collection<ApplicationPrivilegeDescriptor>> listener =
@@ -1089,7 +1090,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
 
         ApiKeyService apiKeyService = spy(new ApiKeyService(SECURITY_ENABLED_SETTINGS, Clock.systemUTC(), mock(Client.class),
                 new XPackLicenseState(SECURITY_ENABLED_SETTINGS, () -> 0), mock(SecurityIndexManager.class), mock(ClusterService.class),
-                mock(ThreadPool.class)));
+                mock(CacheInvalidatorRegistry.class), mock(ThreadPool.class)));
         NativePrivilegeStore nativePrivStore = mock(NativePrivilegeStore.class);
         doAnswer(invocationOnMock -> {
             ActionListener<Collection<ApplicationPrivilegeDescriptor>> listener =

+ 91 - 0
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/support/CacheInvalidatorRegistryTests.java

@@ -0,0 +1,91 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.security.support;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.security.support.CacheInvalidatorRegistry.CacheInvalidator;
+import org.junit.Before;
+
+import java.time.Instant;
+import java.util.List;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+public class CacheInvalidatorRegistryTests extends ESTestCase {
+
+    private CacheInvalidatorRegistry cacheInvalidatorRegistry;
+
+    @Before
+    public void setup() {
+        cacheInvalidatorRegistry = new CacheInvalidatorRegistry();
+    }
+
+    public void testRegistryWillNotAllowInvalidatorsWithDuplicatedName() {
+        cacheInvalidatorRegistry.registerCacheInvalidator("service1", mock(CacheInvalidator.class));
+        final IllegalArgumentException e = expectThrows(
+            IllegalArgumentException.class,
+            () -> cacheInvalidatorRegistry.registerCacheInvalidator("service1", mock(CacheInvalidator.class)));
+        assertThat(e.getMessage(), containsString("already has an entry with name: [service1]"));
+    }
+
+    public void testSecurityIndexStateChangeWillInvalidateAllRegisteredInvalidators() {
+        final CacheInvalidator invalidator1 = mock(CacheInvalidator.class);
+        cacheInvalidatorRegistry.registerCacheInvalidator("service1", invalidator1);
+        final CacheInvalidator invalidator2 = mock(CacheInvalidator.class);
+        cacheInvalidatorRegistry.registerCacheInvalidator("service2", invalidator2);
+
+        final SecurityIndexManager.State previousState = SecurityIndexManager.State.UNRECOVERED_STATE;
+        final SecurityIndexManager.State currentState = new SecurityIndexManager.State(
+            Instant.now(), true, true, true, Version.CURRENT,
+            ".security", ClusterHealthStatus.GREEN, IndexMetadata.State.OPEN);
+
+        cacheInvalidatorRegistry.onSecurityIndexStageChange(previousState, currentState);
+        verify(invalidator1).invalidateAll();
+        verify(invalidator2).invalidateAll();
+    }
+
+    public void testInvalidateByKeyCallsCorrectInvalidatorObject() {
+        final CacheInvalidator invalidator1 = mock(CacheInvalidator.class);
+        cacheInvalidatorRegistry.registerCacheInvalidator("service1", invalidator1);
+        final CacheInvalidator invalidator2 = mock(CacheInvalidator.class);
+        cacheInvalidatorRegistry.registerCacheInvalidator("service2", invalidator2);
+
+        cacheInvalidatorRegistry.invalidateByKey("service2", List.of("k1", "k2"));
+        verify(invalidator1, never()).invalidate(any());
+        verify(invalidator2).invalidate(List.of("k1", "k2"));
+
+        // Trying to invalidate entries from a non-existing cache will throw error
+        final IllegalArgumentException e =
+            expectThrows(IllegalArgumentException.class,
+                () -> cacheInvalidatorRegistry.invalidateByKey("non-exist", List.of("k1", "k2")));
+        assertThat(e.getMessage(), containsString("No cache named [non-exist] is found"));
+    }
+
+    public void testInvalidateCache() {
+        final CacheInvalidator invalidator1 = mock(CacheInvalidator.class);
+        cacheInvalidatorRegistry.registerCacheInvalidator("service1", invalidator1);
+        final CacheInvalidator invalidator2 = mock(CacheInvalidator.class);
+        cacheInvalidatorRegistry.registerCacheInvalidator("service2", invalidator2);
+
+        cacheInvalidatorRegistry.invalidateCache("service1");
+        verify(invalidator1).invalidateAll();
+        verify(invalidator2, never()).invalidateAll();
+
+        // Trying to invalidate entries from a non-existing cache will throw error
+        final IllegalArgumentException e =
+            expectThrows(IllegalArgumentException.class,
+                () -> cacheInvalidatorRegistry.invalidateCache("non-exist"));
+        assertThat(e.getMessage(), containsString("No cache named [non-exist] is found"));
+    }
+}

+ 70 - 0
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/support/InvalidationCountingCacheWrapperTests.java

@@ -0,0 +1,70 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.security.support;
+
+import org.elasticsearch.common.cache.CacheBuilder;
+import org.elasticsearch.test.ESTestCase;
+import org.junit.Before;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+public class InvalidationCountingCacheWrapperTests extends ESTestCase {
+
+    private InvalidationCountingCacheWrapper<String, String> invalidationCountingCacheWrapper;
+
+    @Before
+    public void setup() {
+        invalidationCountingCacheWrapper = new InvalidationCountingCacheWrapper<>(CacheBuilder.<String, String>builder().build());
+    }
+
+    public void testItemWillCached() {
+        final long invalidationCount = invalidationCountingCacheWrapper.getInvalidationCount();
+        assertTrue(invalidationCountingCacheWrapper.putIfNoInvalidationSince("foo", "bar", invalidationCount));
+        assertEquals("bar", invalidationCountingCacheWrapper.get("foo"));
+    }
+
+    public void testItemWillNotBeCachedIfInvalidationCounterHasChanged() throws InterruptedException {
+        final long invalidationCount = invalidationCountingCacheWrapper.getInvalidationCount();
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        new Thread(() -> {
+            invalidationCountingCacheWrapper.invalidate(List.of("fizz"));
+            countDownLatch.countDown();
+        }).start();
+        countDownLatch.await();
+        invalidationCountingCacheWrapper.putIfNoInvalidationSince("foo", "bar", invalidationCount);
+        assertNull(invalidationCountingCacheWrapper.get("foo"));
+    }
+
+    public void testInvalidate() {
+        final long invalidationCount = invalidationCountingCacheWrapper.getInvalidationCount();
+        invalidationCountingCacheWrapper.putIfNoInvalidationSince("foo", "bar", invalidationCount);
+        invalidationCountingCacheWrapper.putIfNoInvalidationSince("fizz", "buzz", invalidationCount);
+        invalidationCountingCacheWrapper.putIfNoInvalidationSince("hello", "world", invalidationCount);
+
+        assertEquals(3, invalidationCountingCacheWrapper.count());
+        assertEquals("bar", invalidationCountingCacheWrapper.get("foo"));
+        assertEquals("buzz", invalidationCountingCacheWrapper.get("fizz"));
+        assertEquals("world", invalidationCountingCacheWrapper.get("hello"));
+
+        invalidationCountingCacheWrapper.invalidate(List.of("foo", "hello"));
+        assertEquals(1, invalidationCountingCacheWrapper.count());
+        assertEquals("buzz", invalidationCountingCacheWrapper.get("fizz"));
+        assertNull(invalidationCountingCacheWrapper.get("foo"));
+        assertNull(invalidationCountingCacheWrapper.get("hello"));
+    }
+
+    public void testInvalidateAll() {
+        final long invalidationCount = invalidationCountingCacheWrapper.getInvalidationCount();
+        invalidationCountingCacheWrapper.putIfNoInvalidationSince("foo", "bar", invalidationCount);
+        invalidationCountingCacheWrapper.putIfNoInvalidationSince("fizz", "buzz", invalidationCount);
+        invalidationCountingCacheWrapper.putIfNoInvalidationSince("hello", "world", invalidationCount);
+
+        invalidationCountingCacheWrapper.invalidateAll();
+        assertEquals(0, invalidationCountingCacheWrapper.count());
+    }
+}

+ 26 - 0
x-pack/plugin/src/test/resources/rest-api-spec/api/security.clear_api_key_cache.json

@@ -0,0 +1,26 @@
+{
+  "security.clear_api_key_cache":{
+    "documentation":{
+      "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-clear-api-key-cache.html",
+      "description":"Clear a subset or all entries from the API key cache."
+    },
+    "stability":"stable",
+    "url":{
+      "paths":[
+        {
+          "path":"/_security/api_key/{ids}/_clear_cache",
+          "methods":[
+            "POST"
+          ],
+          "parts":{
+            "ids":{
+              "type":"list",
+              "description":"A comma-separated list of IDs of API keys to clear from the cache"
+            }
+          }
+        }
+      ]
+    },
+    "params":{}
+  }
+}

+ 15 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/api_key/10_basic.yml

@@ -111,6 +111,7 @@ teardown:
   - is_true: id
   - is_true: api_key
   - is_true: expiration
+  - set: { id: api_key_id }
   - transform_and_set: { login_creds: "#base64EncodeCredentials(id,api_key)" }
 
   - do:
@@ -123,6 +124,13 @@ teardown:
   - match: { authentication_realm.name: "_es_api_key" }
   - match: { authentication_realm.type: "_es_api_key" }
 
+  - do:
+      security.clear_api_key_cache:
+        ids: "${api_key_id}"
+
+  - match: { _nodes.failed: 0 }
+
+
 ---
 "Test get api key":
   - skip:
@@ -185,6 +193,13 @@ teardown:
   - match: { "api_keys.0.invalidated": false }
   - is_true: "api_keys.0.creation"
 
+  - do:
+      security.clear_api_key_cache:
+        ids: ""
+
+  - match: { _nodes.failed: 0 }
+
+
 ---
 "Test invalidate api key":
   - skip: