
Support shard request cache for queries with DLS and FLS (#70191)

The shard-level request cache is now generally supported for queries with DLS
and/or FLS. The request cache follows the same rules as a regular search
without DLS/FLS, except in the following scenarios, where it remains disabled:
1. The DLS query uses a stored script
2. The search targets any remote indices
3. The cluster has any nodes older than v7.11.2

It is worth noting that the caching behaviour favours safety over efficiency.
This means two functionally equivalent sets of DLS or FLS permissions can
result in different cache entries. We consider this a better tradeoff, given
the higher priority of correctness and security.
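
As an illustration of the new extension point, here is a minimal sketch (not part of this change; the plugin class name and the string it writes are invented) of a SearchPlugin supplying a request cache key differentiator. Whatever the differentiator writes to the StreamOutput is appended to the serialized ShardSearchRequest when the cache key is built, so requests that differ only in that output land in different cache entries:

import java.io.IOException;

import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.internal.ShardSearchRequest;

// Hypothetical plugin for illustration only; in this change the security plugin provides
// its own differentiator (DlsFlsRequestCacheDifferentiator) based on the DLS/FLS permissions.
public class ExampleCacheKeyDifferentiatorPlugin extends Plugin implements SearchPlugin {

    @Override
    public CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> getRequestCacheKeyDifferentiator() {
        return (ShardSearchRequest request, StreamOutput out) -> {
            // Bytes written here become part of the shard request cache key.
            out.writeString("example-differentiator-value");
        };
    }
}

Note that only one installed plugin may provide such a differentiator; SearchModule rejects a second one (see registerRequestCacheKeyDifferentiator below).
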
Yang Wang committed 4 years ago
commit 4a8ff0f26b
37 changed files with 1696 additions and 137 deletions
  1. +6 -2
      server/src/main/java/org/elasticsearch/indices/IndicesService.java
  2. +1 -1
      server/src/main/java/org/elasticsearch/node/Node.java
  3. +14 -0
      server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java
  4. +28 -0
      server/src/main/java/org/elasticsearch/search/SearchModule.java
  5. +5 -1
      server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java
  6. +39 -0
      server/src/test/java/org/elasticsearch/search/SearchModuleTests.java
  7. +17 -1
      server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java
  8. +2 -1
      server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
  9. +1 -0
      x-pack/plugin/build.gradle
  10. +4 -1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/user/GetUserPrivilegesResponse.java
  11. +38 -1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/IndicesAccessControl.java
  12. +96 -21
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/DocumentPermissions.java
  13. +60 -25
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/FieldPermissions.java
  14. +14 -3
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/FieldPermissionsCache.java
  15. +37 -9
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/FieldPermissionsDefinition.java
  16. +1 -1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/LimitedRole.java
  17. +32 -15
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/support/DLSRoleQueryValidator.java
  18. +10 -0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/support/SecurityQueryTemplateEvaluator.java
  19. +20 -0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/support/CacheKey.java
  20. +23 -1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java
  21. +56 -0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/permission/DocumentPermissionsTests.java
  22. +77 -1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/permission/FieldPermissionsTests.java
  23. +13 -0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/support/DLSRoleQueryValidatorTests.java
  24. +118 -0
      x-pack/plugin/security/qa/security-trial/src/javaRestTest/java/org/elasticsearch/xpack/security/dlsfls/DlsRequestCacheIT.java
  25. +5 -0
      x-pack/plugin/security/qa/security-trial/src/javaRestTest/resources/roles.yml
  26. +420 -0
      x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DlsFlsRequestCacheTests.java
  27. +20 -2
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java
  28. +65 -0
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/DlsFlsRequestCacheDifferentiator.java
  29. +9 -2
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java
  30. +24 -24
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/FieldAndDocumentLevelSecurityRequestInterceptor.java
  31. +31 -21
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestInterceptor.java
  32. +81 -0
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/ShardSearchRequestInterceptor.java
  33. +6 -2
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/UpdateRequestInterceptor.java
  34. +106 -0
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/DlsFlsRequestCacheDifferentiatorTests.java
  35. +119 -0
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestInterceptorTests.java
  36. +96 -0
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/ShardSearchRequestInterceptorTests.java
  37. +2 -2
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/rest/action/user/RestGetUserPrivilegesActionTests.java

+ 6 - 2
server/src/main/java/org/elasticsearch/indices/IndicesService.java

@@ -33,6 +33,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.common.CheckedSupplier;
@@ -227,6 +228,7 @@ public class IndicesService extends AbstractLifecycleComponent
     private final boolean nodeWriteDanglingIndicesInfo;
     private final ValuesSourceRegistry valuesSourceRegistry;
     private final TimestampFieldMapperService timestampFieldMapperService;
+    private final CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> requestCacheKeyDifferentiator;
 
     @Override
     protected void doStart() {
@@ -246,7 +248,8 @@ public class IndicesService extends AbstractLifecycleComponent
                           Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories, ValuesSourceRegistry valuesSourceRegistry,
                           Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
                           List<IndexStorePlugin.IndexFoldersDeletionListener> indexFoldersDeletionListeners,
-                          Map<String, IndexStorePlugin.SnapshotCommitSupplier> snapshotCommitSuppliers) {
+                          Map<String, IndexStorePlugin.SnapshotCommitSupplier> snapshotCommitSuppliers,
+                          CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> requestCacheKeyDifferentiator) {
         this.settings = settings;
         this.threadPool = threadPool;
         this.pluginsService = pluginsService;
@@ -295,6 +298,7 @@ public class IndicesService extends AbstractLifecycleComponent
         this.recoveryStateFactories = recoveryStateFactories;
         this.indexFoldersDeletionListeners = new CompositeIndexFoldersDeletionListener(indexFoldersDeletionListeners);
         this.snapshotCommitSuppliers = snapshotCommitSuppliers;
+        this.requestCacheKeyDifferentiator = requestCacheKeyDifferentiator;
         // doClose() is called when shutting down a node, yet there might still be ongoing requests
         // that we need to wait for before closing some resources such as the caches. In order to
         // avoid closing these resources while ongoing requests are still being processed, we use a
@@ -1399,7 +1403,7 @@ public class IndicesService extends AbstractLifecycleComponent
         final DirectoryReader directoryReader = context.searcher().getDirectoryReader();
 
         boolean[] loadedFromCache = new boolean[] { true };
-        BytesReference cacheKey = request.cacheKey();
+        BytesReference cacheKey = request.cacheKey(requestCacheKeyDifferentiator);
         BytesReference bytesReference = cacheShardLevelResult(
             context.indexShard(),
             context.getSearchExecutionContext().mappingCacheKey(),

+ 1 - 1
server/src/main/java/org/elasticsearch/node/Node.java

@@ -518,7 +518,7 @@ public class Node implements Closeable {
                     threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptService,
                     clusterService, client, metaStateService, engineFactoryProviders, indexStoreFactories,
                     searchModule.getValuesSourceRegistry(), recoveryStateFactories, indexFoldersDeletionListeners,
-                    snapshotCommitSuppliers);
+                    snapshotCommitSuppliers, searchModule.getRequestCacheKeyDifferentiator());
 
             final AliasValidator aliasValidator = new AliasValidator();
 

+ 14 - 0
server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java

@@ -9,15 +9,18 @@
 package org.elasticsearch.plugins;
 
 import org.apache.lucene.search.Query;
+import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.common.xcontent.ParseField;
 import org.elasticsearch.common.io.stream.NamedWriteable;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.lucene.search.function.ScoreFunction;
 import org.elasticsearch.common.xcontent.ContextParser;
 import org.elasticsearch.common.xcontent.XContent;
 import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryParser;
 import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
@@ -34,6 +37,7 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
 import org.elasticsearch.search.fetch.FetchSubPhase;
 import org.elasticsearch.search.fetch.subphase.highlight.Highlighter;
+import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.search.rescore.Rescorer;
 import org.elasticsearch.search.rescore.RescorerBuilder;
 import org.elasticsearch.search.suggest.Suggest;
@@ -124,6 +128,16 @@ public interface SearchPlugin {
         return emptyList();
     }
 
+    /**
+     * Allows plugins to register a cache differentiator which contributes to the cacheKey
+     * computation for the request cache. This helps differentiate between queries that
+     * are otherwise identical.
+     */
+    @Nullable
+    default CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> getRequestCacheKeyDifferentiator() {
+        return null;
+    }
+
     /**
      * Specification of custom {@link ScoreFunction}.
      */

+ 28 - 0
server/src/main/java/org/elasticsearch/search/SearchModule.java

@@ -9,17 +9,20 @@
 package org.elasticsearch.search;
 
 import org.apache.lucene.search.BooleanQuery;
+import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.common.NamedRegistry;
 import org.elasticsearch.common.xcontent.ParseField;
 import org.elasticsearch.common.geo.GeoShapeType;
 import org.elasticsearch.common.geo.ShapesAvailability;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.BoostingQueryBuilder;
 import org.elasticsearch.index.query.CombinedFieldsQueryBuilder;
@@ -223,6 +226,7 @@ import org.elasticsearch.search.fetch.subphase.highlight.HighlightPhase;
 import org.elasticsearch.search.fetch.subphase.highlight.Highlighter;
 import org.elasticsearch.search.fetch.subphase.highlight.PlainHighlighter;
 import org.elasticsearch.search.fetch.subphase.highlight.UnifiedHighlighter;
+import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.search.rescore.QueryRescorerBuilder;
 import org.elasticsearch.search.rescore.RescorerBuilder;
 import org.elasticsearch.search.sort.FieldSortBuilder;
@@ -244,6 +248,7 @@ import org.elasticsearch.search.suggest.phrase.StupidBackoff;
 import org.elasticsearch.search.suggest.term.TermSuggestion;
 import org.elasticsearch.search.suggest.term.TermSuggestionBuilder;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -271,6 +276,7 @@ public class SearchModule {
     private final List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>();
     private final List<NamedXContentRegistry.Entry> namedXContents = new ArrayList<>();
     private final ValuesSourceRegistry valuesSourceRegistry;
+    private final CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> requestCacheKeyDifferentiator;
 
     /**
      * Constructs a new SearchModule object
@@ -296,6 +302,7 @@ public class SearchModule {
         registerSearchExts(plugins);
         registerShapes();
         registerIntervalsSourceProviders();
+        requestCacheKeyDifferentiator = registerRequestCacheKeyDifferentiator(plugins);
         namedWriteables.addAll(SortValue.namedWriteables());
     }
 
@@ -311,6 +318,11 @@ public class SearchModule {
         return valuesSourceRegistry;
     }
 
+    @Nullable
+    public CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> getRequestCacheKeyDifferentiator() {
+        return requestCacheKeyDifferentiator;
+    }
+
     /**
      * Returns the {@link Highlighter} registry
      */
@@ -833,6 +845,22 @@ public class SearchModule {
         namedWriteables.addAll(getIntervalsSourceProviderNamedWritables());
     }
 
+    private CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> registerRequestCacheKeyDifferentiator(
+        List<SearchPlugin> plugins) {
+        CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> differentiator = null;
+        for (SearchPlugin plugin : plugins) {
+            final CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> d = plugin.getRequestCacheKeyDifferentiator();
+            if (d != null) {
+                if (differentiator == null) {
+                    differentiator = d;
+                } else {
+                    throw new IllegalArgumentException("Cannot have more than one plugin providing a request cache key differentiator");
+                }
+            }
+        }
+        return differentiator;
+    }
+
     public static List<NamedWriteableRegistry.Entry> getIntervalsSourceProviderNamedWritables() {
         return List.of(
             new NamedWriteableRegistry.Entry(IntervalsSourceProvider.class,

+ 5 - 1
server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java

@@ -17,6 +17,7 @@ import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.Strings;
@@ -396,10 +397,13 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
     /**
      * Returns the cache key for this shard search request, based on its content
      */
-    public BytesReference cacheKey() throws IOException {
+    public BytesReference cacheKey(CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> differentiator) throws IOException {
         BytesStreamOutput out = scratch.get();
         try {
             this.innerWriteTo(out, true);
+            if (differentiator != null) {
+                differentiator.accept(this, out);
+            }
             // copy it over since we don't want to share the thread-local bytes in #scratch
             return out.copyBytes();
         } finally {

+ 39 - 0
server/src/test/java/org/elasticsearch/search/SearchModuleTests.java

@@ -9,6 +9,7 @@ package org.elasticsearch.search;
 
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.util.CharsRefBuilder;
+import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
@@ -48,6 +49,7 @@ import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter;
 import org.elasticsearch.search.fetch.subphase.highlight.Highlighter;
 import org.elasticsearch.search.fetch.subphase.highlight.PlainHighlighter;
 import org.elasticsearch.search.fetch.subphase.highlight.UnifiedHighlighter;
+import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.search.rescore.QueryRescorerBuilder;
 import org.elasticsearch.search.rescore.RescoreContext;
 import org.elasticsearch.search.rescore.RescorerBuilder;
@@ -76,7 +78,10 @@ import static java.util.Collections.singletonMap;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.nullValue;
 
 public class SearchModuleTests extends ESTestCase {
 
@@ -296,6 +301,40 @@ public class SearchModuleTests extends ESTestCase {
                 hasSize(1));
     }
 
+    public void testRegisterNullRequestCacheKeyDifferentiator() {
+        final SearchModule module = new SearchModule(Settings.EMPTY, List.of());
+        assertThat(module.getRequestCacheKeyDifferentiator(), nullValue());
+    }
+
+    public void testRegisterRequestCacheKeyDifferentiator() {
+        final CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> requestCacheKeyDifferentiator = (r, o) -> { };
+        final SearchModule module = new SearchModule(Settings.EMPTY, List.of(new SearchPlugin() {
+            @Override
+            public CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> getRequestCacheKeyDifferentiator() {
+                return requestCacheKeyDifferentiator;
+            }
+        }));
+        assertThat(module.getRequestCacheKeyDifferentiator(), equalTo(requestCacheKeyDifferentiator));
+    }
+
+    public void testCannotRegisterMultipleRequestCacheKeyDifferentiators() {
+        final CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> differentiator1 = (r, o) -> {};
+        final CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> differentiator2 = (r, o) -> {};
+        final IllegalArgumentException e =
+            expectThrows(IllegalArgumentException.class, () -> new SearchModule(Settings.EMPTY, List.of(new SearchPlugin() {
+                @Override
+                public CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> getRequestCacheKeyDifferentiator() {
+                    return differentiator1;
+                }
+            }, new SearchPlugin() {
+                @Override
+                public CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> getRequestCacheKeyDifferentiator() {
+                    return differentiator2;
+                }
+            })));
+        assertThat(e.getMessage(), containsString("Cannot have more than one plugin providing a request cache key differentiator"));
+    }
+
     private static final String[] NON_DEPRECATED_QUERIES = new String[] {
             "bool",
             "boosting",

+ 17 - 1
server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java

@@ -13,10 +13,12 @@ import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.xcontent.DeprecationHandler;
@@ -35,12 +37,15 @@ import org.elasticsearch.test.VersionUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.elasticsearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
 
 public class ShardSearchRequestTests extends AbstractSearchTestCase {
     private static final IndexMetadata BASE_METADATA = IndexMetadata.builder("test").settings(Settings.builder()
@@ -147,7 +152,7 @@ public class ShardSearchRequestTests extends AbstractSearchTestCase {
         assertEquals(orig.searchType(), copy.searchType());
         assertEquals(orig.shardId(), copy.shardId());
         assertEquals(orig.numberOfShards(), copy.numberOfShards());
-        assertEquals(orig.cacheKey(), copy.cacheKey());
+        assertEquals(orig.cacheKey(null), copy.cacheKey(null));
         assertNotSame(orig, copy);
         assertEquals(orig.getAliasFilter(), copy.getAliasFilter());
         assertEquals(orig.indexBoost(), copy.indexBoost(), 0.0f);
@@ -198,4 +203,15 @@ public class ShardSearchRequestTests extends AbstractSearchTestCase {
             }
         }
     }
+
+    public void testWillCallRequestCacheKeyDifferentiators() throws IOException {
+        final ShardSearchRequest shardSearchRequest = createShardSearchRequest();
+        final AtomicBoolean invoked = new AtomicBoolean(false);
+        final CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> differentiator = (r, o) -> {
+            assertThat(r, sameInstance(shardSearchRequest));
+            invoked.set(true);
+        };
+        shardSearchRequest.cacheKey(differentiator);
+        assertThat(invoked.get(), is(true));
+    }
 }

+ 2 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -1772,7 +1772,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
                     null,
                     emptyMap(),
                     List.of(),
-                    emptyMap()
+                    emptyMap(),
+                    null
                 );
                 final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
                 snapshotShardsService = new SnapshotShardsService(

+ 1 - 0
x-pack/plugin/build.gradle

@@ -172,6 +172,7 @@ tasks.named("yamlRestCompatTest").configure {
     'vectors/30_sparse_vector_basic/Dot Product',
     'vectors/35_sparse_vector_l1l2/L1 norm',
     'vectors/35_sparse_vector_l1l2/L2 norm',
+    'privileges/40_get_user_privs/Test get_user_privileges for merged roles',  // temporary disabled till #70191 gets backported
     'vectors/40_sparse_vector_special_cases/Dimensions can be sorted differently',
     'vectors/40_sparse_vector_special_cases/Documents missing a vector field',
     'vectors/40_sparse_vector_special_cases/Query vector has different dimensions from documents\' vectors',

+ 4 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/user/GetUserPrivilegesResponse.java

@@ -22,6 +22,7 @@ import org.elasticsearch.xpack.core.security.authz.privilege.ConfigurableCluster
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
@@ -209,7 +210,9 @@ public final class GetUserPrivilegesResponse extends ActionResponse {
             builder.field(RoleDescriptor.Fields.PRIVILEGES.getPreferredName(), privileges);
             if (fieldSecurity.stream().anyMatch(g -> nonEmpty(g.getGrantedFields()) || nonEmpty(g.getExcludedFields()))) {
                 builder.startArray(RoleDescriptor.Fields.FIELD_PERMISSIONS.getPreferredName());
-                for (FieldPermissionsDefinition.FieldGrantExcludeGroup group : this.fieldSecurity) {
+                final List<FieldPermissionsDefinition.FieldGrantExcludeGroup> sortedFieldSecurity =
+                    this.fieldSecurity.stream().sorted().collect(Collectors.toUnmodifiableList());
+                for (FieldPermissionsDefinition.FieldGrantExcludeGroup group : sortedFieldSecurity) {
                     builder.startObject();
                     if (nonEmpty(group.getGrantedFields())) {
                         builder.array(RoleDescriptor.Fields.GRANT_FIELDS.getPreferredName(), group.getGrantedFields());

+ 38 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/IndicesAccessControl.java

@@ -6,16 +6,21 @@
  */
 package org.elasticsearch.xpack.core.security.authz.accesscontrol;
 
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.xpack.core.security.authz.IndicesAndAliasesResolverField;
 import org.elasticsearch.xpack.core.security.authz.permission.DocumentPermissions;
 import org.elasticsearch.xpack.core.security.authz.permission.FieldPermissions;
+import org.elasticsearch.xpack.core.security.authz.support.SecurityQueryTemplateEvaluator.DlsQueryEvaluationContext;
+import org.elasticsearch.xpack.core.security.support.CacheKey;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -64,7 +69,7 @@ public class IndicesAccessControl {
     /**
      * Encapsulates the field and document permissions for an index.
      */
-    public static class IndexAccessControl {
+    public static class IndexAccessControl implements CacheKey {
 
         private final boolean granted;
         private final FieldPermissions fieldPermissions;
@@ -132,6 +137,38 @@ public class IndicesAccessControl {
                     ", documentPermissions=" + documentPermissions +
                     '}';
         }
+
+        @Override
+        public void buildCacheKey(StreamOutput out, DlsQueryEvaluationContext context) throws IOException {
+            if (documentPermissions.hasDocumentLevelPermissions()) {
+                out.writeBoolean(true);
+                documentPermissions.buildCacheKey(out, context);
+            } else {
+                out.writeBoolean(false);
+            }
+            if (fieldPermissions.hasFieldLevelSecurity()) {
+                out.writeBoolean(true);
+                fieldPermissions.buildCacheKey(out, context);
+            } else {
+                out.writeBoolean(false);
+            }
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            IndexAccessControl that = (IndexAccessControl) o;
+            return granted == that.granted && Objects.equals(fieldPermissions, that.fieldPermissions) && Objects.equals(documentPermissions,
+                that.documentPermissions);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(granted, fieldPermissions, documentPermissions);
+        }
     }
 
     /**

+ 96 - 21
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/DocumentPermissions.java

@@ -12,7 +12,9 @@ import org.apache.lucene.search.Query;
 import org.apache.lucene.search.join.BitSetProducer;
 import org.apache.lucene.search.join.ToChildBlockJoinQuery;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lucene.search.Queries;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryRewriteContext;
 import org.elasticsearch.index.query.SearchExecutionContext;
@@ -21,12 +23,19 @@ import org.elasticsearch.index.search.NestedHelper;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.xpack.core.security.authz.support.DLSRoleQueryValidator;
+import org.elasticsearch.xpack.core.security.authz.support.SecurityQueryTemplateEvaluator;
+import org.elasticsearch.xpack.core.security.authz.support.SecurityQueryTemplateEvaluator.DlsQueryEvaluationContext;
+import org.elasticsearch.xpack.core.security.support.CacheKey;
 import org.elasticsearch.xpack.core.security.user.User;
 
 import java.io.IOException;
-import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.lucene.search.BooleanClause.Occur.FILTER;
 import static org.apache.lucene.search.BooleanClause.Occur.SHOULD;
@@ -36,9 +45,13 @@ import static org.apache.lucene.search.BooleanClause.Occur.SHOULD;
  * The document level permissions may be limited by another set of queries in that case the limited
  * queries are used as an additional filter.
  */
-public final class DocumentPermissions {
-    private final Set<BytesReference> queries;
-    private final Set<BytesReference> limitedByQueries;
+public final class DocumentPermissions implements CacheKey {
+    // SortedSet because orders are important when they get serialised for request cache key
+    private final SortedSet<BytesReference> queries;
+    private final SortedSet<BytesReference> limitedByQueries;
+    private List<String> evaluatedQueries;
+    private List<String> evaluatedLimitedByQueries;
+
 
     private static DocumentPermissions ALLOW_ALL = new DocumentPermissions();
 
@@ -55,16 +68,16 @@ public final class DocumentPermissions {
         if (queries == null && scopedByQueries == null) {
             throw new IllegalArgumentException("one of the queries or scoped queries must be provided");
         }
-        this.queries = (queries != null) ? Collections.unmodifiableSet(queries) : queries;
-        this.limitedByQueries = (scopedByQueries != null) ? Collections.unmodifiableSet(scopedByQueries) : scopedByQueries;
+        this.queries = (queries != null) ? new TreeSet<>(queries) : null;
+        this.limitedByQueries = (scopedByQueries != null) ? new TreeSet<>(scopedByQueries) : null;
     }
 
     public Set<BytesReference> getQueries() {
-        return queries;
+        return queries == null ? null : Set.copyOf(queries);
     }
 
     public Set<BytesReference> getLimitedByQueries() {
-        return limitedByQueries;
+        return limitedByQueries == null ? null : Set.copyOf(limitedByQueries);
     }
 
     /**
@@ -74,6 +87,24 @@ public final class DocumentPermissions {
         return queries != null || limitedByQueries != null;
     }
 
+    public boolean hasStoredScript() throws IOException {
+        if (queries != null) {
+            for (BytesReference q : queries) {
+                if (DLSRoleQueryValidator.hasStoredScript(q, NamedXContentRegistry.EMPTY)) {
+                    return true;
+                }
+            }
+        }
+        if (limitedByQueries != null) {
+            for (BytesReference q : limitedByQueries) {
+                if (DLSRoleQueryValidator.hasStoredScript(q, NamedXContentRegistry.EMPTY)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
     /**
      * Creates a {@link BooleanQuery} to be used as filter to restrict access to documents.<br>
      * Document permission queries are used to create an boolean query.<br>
@@ -88,23 +119,25 @@ public final class DocumentPermissions {
      * @throws IOException thrown if there is an exception during parsing
      */
     public BooleanQuery filter(User user, ScriptService scriptService, ShardId shardId,
-                                      Function<ShardId, SearchExecutionContext> searchExecutionContextProvider) throws IOException {
+                               Function<ShardId, SearchExecutionContext> searchExecutionContextProvider) throws IOException {
         if (hasDocumentLevelPermissions()) {
+            evaluateQueries(SecurityQueryTemplateEvaluator.wrap(user, scriptService));
             BooleanQuery.Builder filter;
-            if (queries != null && limitedByQueries != null) {
+            if (evaluatedQueries != null && evaluatedLimitedByQueries != null) {
                 filter = new BooleanQuery.Builder();
                 BooleanQuery.Builder scopedFilter = new BooleanQuery.Builder();
-                buildRoleQuery(user, scriptService, shardId, searchExecutionContextProvider, limitedByQueries, scopedFilter);
+                buildRoleQuery(shardId, searchExecutionContextProvider, evaluatedLimitedByQueries, scopedFilter);
                 filter.add(scopedFilter.build(), FILTER);
 
-                buildRoleQuery(user, scriptService, shardId, searchExecutionContextProvider, queries, filter);
-            } else if (queries != null) {
+                buildRoleQuery(shardId, searchExecutionContextProvider, evaluatedQueries, filter);
+            } else if (evaluatedQueries != null) {
                 filter = new BooleanQuery.Builder();
-                buildRoleQuery(user, scriptService, shardId, searchExecutionContextProvider, queries, filter);
-            } else if (limitedByQueries != null) {
+                buildRoleQuery(shardId, searchExecutionContextProvider, evaluatedQueries, filter);
+            } else if (evaluatedLimitedByQueries != null) {
                 filter = new BooleanQuery.Builder();
-                buildRoleQuery(user, scriptService, shardId, searchExecutionContextProvider, limitedByQueries, filter);
+                buildRoleQuery(shardId, searchExecutionContextProvider, evaluatedLimitedByQueries, filter);
             } else {
+                assert false : "one of queries and limited-by queries must be non-null";
                 return null;
             }
             return filter.build();
@@ -112,14 +145,22 @@ public final class DocumentPermissions {
         return null;
     }
 
-    private static void buildRoleQuery(User user, ScriptService scriptService, ShardId shardId,
+    private void evaluateQueries(DlsQueryEvaluationContext context) {
+        if (queries != null && evaluatedQueries == null) {
+            evaluatedQueries = queries.stream().map(context::evaluate).collect(Collectors.toUnmodifiableList());
+        }
+        if (limitedByQueries != null && evaluatedLimitedByQueries == null) {
+            evaluatedLimitedByQueries = limitedByQueries.stream().map(context::evaluate).collect(Collectors.toUnmodifiableList());
+        }
+    }
+
+    private static void buildRoleQuery(ShardId shardId,
                                        Function<ShardId, SearchExecutionContext> searchExecutionContextProvider,
-                                       Set<BytesReference> queries,
+                                       List<String> queries,
                                        BooleanQuery.Builder filter) throws IOException {
-        for (BytesReference bytesReference : queries) {
+        for (String query : queries) {
             SearchExecutionContext context = searchExecutionContextProvider.apply(shardId);
-            QueryBuilder queryBuilder = DLSRoleQueryValidator.evaluateAndVerifyRoleQuery(bytesReference, scriptService,
-                context.getXContentRegistry(), user);
+            QueryBuilder queryBuilder = DLSRoleQueryValidator.evaluateAndVerifyRoleQuery(query, context.getXContentRegistry());
             if (queryBuilder != null) {
                 failIfQueryUsesClient(queryBuilder, context);
                 Query roleQuery = context.toQuery(queryBuilder).query();
@@ -196,6 +237,8 @@ public final class DocumentPermissions {
         if (queries == null && limitedByDocumentPermissions.queries == null) {
             return DocumentPermissions.allowAll();
         }
+        // TODO: should we apply the same logic here as FieldPermissions#limitFieldPermissions,
+        //       i.e. treat limited-by as queries if original queries is null?
         return new DocumentPermissions(queries, limitedByDocumentPermissions.queries);
     }
 
@@ -204,4 +247,36 @@ public final class DocumentPermissions {
         return "DocumentPermissions [queries=" + queries + ", scopedByQueries=" + limitedByQueries + "]";
     }
 
+    @Override
+    public void buildCacheKey(StreamOutput out, DlsQueryEvaluationContext context) throws IOException {
+        assert false == (queries == null && limitedByQueries == null) : "one of queries and limited-by queries must be non-null";
+        evaluateQueries(context);
+        if (evaluatedQueries != null) {
+            out.writeBoolean(true);
+            out.writeCollection(evaluatedQueries, StreamOutput::writeString);
+        } else {
+            out.writeBoolean(false);
+        }
+        if (evaluatedLimitedByQueries != null) {
+            out.writeBoolean(true);
+            out.writeCollection(evaluatedLimitedByQueries, StreamOutput::writeString);
+        } else {
+            out.writeBoolean(false);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        DocumentPermissions that = (DocumentPermissions) o;
+        return Objects.equals(queries, that.queries) && Objects.equals(limitedByQueries, that.limitedByQueries);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(queries, limitedByQueries);
+    }
 }

+ 60 - 25
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/FieldPermissions.java

@@ -15,16 +15,21 @@ import org.apache.lucene.util.automaton.CharacterRunAutomaton;
 import org.apache.lucene.util.automaton.MinimizationOperations;
 import org.apache.lucene.util.automaton.Operations;
 import org.elasticsearch.ElasticsearchSecurityException;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.xpack.core.security.authz.accesscontrol.FieldSubsetReader;
 import org.elasticsearch.xpack.core.security.authz.permission.FieldPermissionsDefinition.FieldGrantExcludeGroup;
+import org.elasticsearch.xpack.core.security.authz.support.SecurityQueryTemplateEvaluator.DlsQueryEvaluationContext;
 import org.elasticsearch.xpack.core.security.support.Automatons;
+import org.elasticsearch.xpack.core.security.support.CacheKey;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -38,7 +43,7 @@ import static org.apache.lucene.util.automaton.Operations.subsetOf;
  * 1. It has to match the patterns in grantedFieldsArray
  * 2. it must not match the patterns in deniedFieldsArray
  */
-public final class FieldPermissions implements Accountable {
+public final class FieldPermissions implements Accountable, CacheKey {
 
     public static final FieldPermissions DEFAULT = new FieldPermissions();
 
@@ -57,6 +62,8 @@ public final class FieldPermissions implements Accountable {
     }
 
     private final FieldPermissionsDefinition fieldPermissionsDefinition;
+    @Nullable
+    private final FieldPermissionsDefinition limitedByFieldPermissionsDefinition;
     // an automaton that represents a union of one more sets of permitted and denied fields
     private final CharacterRunAutomaton permittedFieldsAutomaton;
     private final boolean permittedFieldsAutomatonIsTotal;
@@ -78,35 +85,50 @@ public final class FieldPermissions implements Accountable {
     /** Constructor that enables field-level security based on include/exclude rules. Exclude rules
      *  have precedence over include rules. */
     FieldPermissions(FieldPermissionsDefinition fieldPermissionsDefinition, Automaton permittedFieldsAutomaton) {
+        this(fieldPermissionsDefinition, null, permittedFieldsAutomaton);
+    }
+
+    /** Constructor that enables field-level security based on include/exclude rules. Exclude rules
+     *  have precedence over include rules. */
+    private FieldPermissions(FieldPermissionsDefinition fieldPermissionsDefinition,
+                             @Nullable FieldPermissionsDefinition limitedByFieldPermissionsDefinition,
+                             Automaton permittedFieldsAutomaton) {
         if (permittedFieldsAutomaton.isDeterministic() == false && permittedFieldsAutomaton.getNumStates() > 1) {
             // we only accept deterministic automata so that the CharacterRunAutomaton constructor
             // directly wraps the provided automaton
             throw new IllegalArgumentException("Only accepts deterministic automata");
         }
-        this.fieldPermissionsDefinition = fieldPermissionsDefinition;
+        this.fieldPermissionsDefinition = Objects.requireNonNull(fieldPermissionsDefinition, "field permission definition cannot be null");
+        this.limitedByFieldPermissionsDefinition = limitedByFieldPermissionsDefinition;
         this.originalAutomaton = permittedFieldsAutomaton;
         this.permittedFieldsAutomaton = new CharacterRunAutomaton(permittedFieldsAutomaton);
         // we cache the result of isTotal since this might be a costly operation
         this.permittedFieldsAutomatonIsTotal = Operations.isTotal(permittedFieldsAutomaton);
 
         long ramBytesUsed = BASE_FIELD_PERM_DEF_BYTES;
-
-        if (fieldPermissionsDefinition != null) {
-            for (FieldGrantExcludeGroup group : fieldPermissionsDefinition.getFieldGrantExcludeGroups()) {
-                ramBytesUsed += BASE_FIELD_GROUP_BYTES + BASE_HASHSET_ENTRY_SIZE;
-                if (group.getGrantedFields() != null) {
-                    ramBytesUsed += RamUsageEstimator.shallowSizeOf(group.getGrantedFields());
-                }
-                if (group.getExcludedFields() != null) {
-                    ramBytesUsed += RamUsageEstimator.shallowSizeOf(group.getExcludedFields());
-                }
-            }
+        ramBytesUsed += ramBytesUsedForFieldPermissionsDefinition(this.fieldPermissionsDefinition);
+        if (this.limitedByFieldPermissionsDefinition != null) {
+            ramBytesUsed += ramBytesUsedForFieldPermissionsDefinition(this.limitedByFieldPermissionsDefinition);
         }
         ramBytesUsed += permittedFieldsAutomaton.ramBytesUsed();
         ramBytesUsed += runAutomatonRamBytesUsed(permittedFieldsAutomaton);
         this.ramBytesUsed = ramBytesUsed;
     }
 
+    private static long ramBytesUsedForFieldPermissionsDefinition(FieldPermissionsDefinition fpd) {
+        long ramBytesUsed = 0L;
+        for (FieldGrantExcludeGroup group : fpd.getFieldGrantExcludeGroups()) {
+            ramBytesUsed += BASE_FIELD_GROUP_BYTES + BASE_HASHSET_ENTRY_SIZE;
+            if (group.getGrantedFields() != null) {
+                ramBytesUsed += RamUsageEstimator.shallowSizeOf(group.getGrantedFields());
+            }
+            if (group.getExcludedFields() != null) {
+                ramBytesUsed += RamUsageEstimator.shallowSizeOf(group.getExcludedFields());
+            }
+        }
+        return ramBytesUsed;
+    }
+
     /**
      * Return an estimation of the ram bytes used by a {@link CharacterRunAutomaton}
      * that wraps the given automaton.
@@ -173,11 +195,11 @@ public final class FieldPermissions implements Accountable {
     public FieldPermissions limitFieldPermissions(FieldPermissions limitedBy) {
         if (hasFieldLevelSecurity() && limitedBy != null && limitedBy.hasFieldLevelSecurity()) {
             Automaton permittedFieldsAutomaton = Automatons.intersectAndMinimize(getIncludeAutomaton(), limitedBy.getIncludeAutomaton());
-            return new FieldPermissions(null, permittedFieldsAutomaton);
+            return new FieldPermissions(fieldPermissionsDefinition, limitedBy.fieldPermissionsDefinition, permittedFieldsAutomaton);
         } else if (limitedBy != null && limitedBy.hasFieldLevelSecurity()) {
             return new FieldPermissions(limitedBy.getFieldPermissionsDefinition(), limitedBy.getIncludeAutomaton());
         } else if (hasFieldLevelSecurity()) {
-            return new FieldPermissions(getFieldPermissionsDefinition(), getIncludeAutomaton());
+            return new FieldPermissions(this.getFieldPermissionsDefinition(), getIncludeAutomaton());
         }
         return FieldPermissions.DEFAULT;
     }
@@ -194,6 +216,21 @@ public final class FieldPermissions implements Accountable {
         return fieldPermissionsDefinition;
     }
 
+    public FieldPermissionsDefinition getLimitedByFieldPermissionsDefinition() {
+        return limitedByFieldPermissionsDefinition;
+    }
+
+    @Override
+    public void buildCacheKey(StreamOutput out, DlsQueryEvaluationContext context) throws IOException {
+        fieldPermissionsDefinition.buildCacheKey(out, context);
+        if (limitedByFieldPermissionsDefinition != null) {
+            out.writeBoolean(true);
+            limitedByFieldPermissionsDefinition.buildCacheKey(out, context);
+        } else {
+            out.writeBoolean(false);
+        }
+    }
+
     /** Return whether field-level security is enabled, ie. whether any field might be filtered out. */
     public boolean hasFieldLevelSecurity() {
         return permittedFieldsAutomatonIsTotal == false;
@@ -213,21 +250,19 @@ public final class FieldPermissions implements Accountable {
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
         FieldPermissions that = (FieldPermissions) o;
-
-        if (permittedFieldsAutomatonIsTotal != that.permittedFieldsAutomatonIsTotal) return false;
-        return fieldPermissionsDefinition != null ?
-                fieldPermissionsDefinition.equals(that.fieldPermissionsDefinition) : that.fieldPermissionsDefinition == null;
+        return permittedFieldsAutomatonIsTotal == that.permittedFieldsAutomatonIsTotal
+            && fieldPermissionsDefinition.equals(that.fieldPermissionsDefinition)
+            && Objects.equals(limitedByFieldPermissionsDefinition, that.limitedByFieldPermissionsDefinition);
     }
 
     @Override
     public int hashCode() {
-        int result = fieldPermissionsDefinition != null ? fieldPermissionsDefinition.hashCode() : 0;
-        result = 31 * result + (permittedFieldsAutomatonIsTotal ? 1 : 0);
-        return result;
+        return Objects.hash(fieldPermissionsDefinition, limitedByFieldPermissionsDefinition, permittedFieldsAutomatonIsTotal);
     }
 
     @Override

+ 14 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/FieldPermissionsCache.java

@@ -17,6 +17,7 @@ import org.elasticsearch.xpack.core.security.authz.permission.FieldPermissionsDe
 import org.elasticsearch.xpack.core.security.support.Automatons;
 
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -73,9 +74,19 @@ public final class FieldPermissionsCache {
                 .filter(((Predicate<FieldPermissions>) (FieldPermissions::hasFieldLevelSecurity)).negate())
                 .findFirst();
         return allowAllFieldPermissions.orElseGet(() -> {
-            final Set<FieldGrantExcludeGroup> fieldGrantExcludeGroups = fieldPermissionsCollection.stream()
-                    .flatMap(fieldPermission -> fieldPermission.getFieldPermissionsDefinition().getFieldGrantExcludeGroups().stream())
-                    .collect(Collectors.toSet());
+            final Set<FieldGrantExcludeGroup> fieldGrantExcludeGroups = new HashSet<>();
+            for (FieldPermissions fieldPermissions : fieldPermissionsCollection) {
+                final FieldPermissionsDefinition definition = fieldPermissions.getFieldPermissionsDefinition();
+                final FieldPermissionsDefinition limitedByDefinition =
+                    fieldPermissions.getLimitedByFieldPermissionsDefinition();
+                if (definition == null) {
+                    throw new IllegalArgumentException("Expected field permission definition, but found null");
+                } else if (limitedByDefinition != null) {
+                    throw new IllegalArgumentException("Expected no limited-by field permission definition, but found ["
+                        + limitedByDefinition + "]");
+                }
+                fieldGrantExcludeGroups.addAll(definition.getFieldGrantExcludeGroups());
+            }
             final FieldPermissionsDefinition combined = new FieldPermissionsDefinition(fieldGrantExcludeGroups);
             try {
                 return cache.computeIfAbsent(combined, (key) -> {

+ 37 - 9
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/FieldPermissionsDefinition.java

@@ -7,30 +7,38 @@
 package org.elasticsearch.xpack.core.security.authz.permission;
 
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.xpack.core.security.authz.support.SecurityQueryTemplateEvaluator.DlsQueryEvaluationContext;
+import org.elasticsearch.xpack.core.security.support.CacheKey;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Objects;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 /**
  * Represents the definition of a {@link FieldPermissions}. Field permissions are defined as a
  * collections of grant and exclude definitions where the exclude definition must be a subset of
  * the grant definition.
  */
-public final class FieldPermissionsDefinition {
+public final class FieldPermissionsDefinition implements CacheKey {
 
-    private final Set<FieldGrantExcludeGroup> fieldGrantExcludeGroups;
+    // SortedSet because orders are important when building the request cacheKey
+    private final SortedSet<FieldGrantExcludeGroup> fieldGrantExcludeGroups;
 
     public FieldPermissionsDefinition(String[] grant, String[] exclude) {
         this(Collections.singleton(new FieldGrantExcludeGroup(grant, exclude)));
     }
 
     public FieldPermissionsDefinition(Set<FieldGrantExcludeGroup> fieldGrantExcludeGroups) {
-        this.fieldGrantExcludeGroups = Collections.unmodifiableSet(fieldGrantExcludeGroups);
+        this.fieldGrantExcludeGroups = new TreeSet<>(fieldGrantExcludeGroups);
     }
 
     public Set<FieldGrantExcludeGroup> getFieldGrantExcludeGroups() {
-        return fieldGrantExcludeGroups;
+        return Set.copyOf(fieldGrantExcludeGroups);
     }
 
     @Override
@@ -40,17 +48,25 @@ public final class FieldPermissionsDefinition {
 
         FieldPermissionsDefinition that = (FieldPermissionsDefinition) o;
 
-        return fieldGrantExcludeGroups != null ?
-                fieldGrantExcludeGroups.equals(that.fieldGrantExcludeGroups) :
-                that.fieldGrantExcludeGroups == null;
+        return Objects.equals(fieldGrantExcludeGroups, that.fieldGrantExcludeGroups);
     }
 
     @Override
     public int hashCode() {
-        return fieldGrantExcludeGroups != null ? fieldGrantExcludeGroups.hashCode() : 0;
+        return fieldGrantExcludeGroups.hashCode();
     }
 
-    public static final class FieldGrantExcludeGroup {
+    @Override
+    public String toString() {
+        return "FieldPermissionsDefinition{" + "fieldGrantExcludeGroups=" + fieldGrantExcludeGroups + '}';
+    }
+
+    @Override
+    public void buildCacheKey(StreamOutput out, DlsQueryEvaluationContext context) throws IOException {
+        out.writeCollection(fieldGrantExcludeGroups, (o, g) -> g.buildCacheKey(o, context));
+    }
+
+    public static final class FieldGrantExcludeGroup implements CacheKey, Comparable<FieldGrantExcludeGroup> {
         private final String[] grantedFields;
         private final String[] excludedFields;
 
@@ -92,5 +108,17 @@ public final class FieldPermissionsDefinition {
                 + "; exclude=" + Strings.arrayToCommaDelimitedString(excludedFields)
                 + "]";
         }
+
+        @Override
+        public void buildCacheKey(StreamOutput out, DlsQueryEvaluationContext context) throws IOException {
+            out.writeOptionalStringArray(grantedFields);
+            out.writeOptionalStringArray(excludedFields);
+        }
+
+        @Override
+        public int compareTo(FieldGrantExcludeGroup o) {
+            final int compare = Arrays.compare(grantedFields, o.grantedFields);
+            return compare == 0 ? Arrays.compare(excludedFields, o.excludedFields) : compare;
+        }
     }
 }

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/permission/LimitedRole.java

@@ -53,7 +53,7 @@ public final class LimitedRole extends Role {
 
     @Override
     public RunAsPermission runAs() {
-        throw new UnsupportedOperationException("cannot retrieve cluster permission on limited role");
+        throw new UnsupportedOperationException("cannot retrieve run_as permission on limited role");
     }
 
     @Override

+ 32 - 15
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/support/DLSRoleQueryValidator.java

@@ -25,7 +25,9 @@ import org.elasticsearch.index.query.GeoShapeQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.TermsQueryBuilder;
 import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
+import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.script.ScriptType;
 import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
 import org.elasticsearch.xpack.core.security.user.User;
 
@@ -75,23 +77,38 @@ public final class DLSRoleQueryValidator {
     private static boolean isTemplateQuery(BytesReference query, NamedXContentRegistry xContentRegistry) throws IOException {
         try (XContentParser parser = XContentType.JSON.xContent().createParser(xContentRegistry,
             LoggingDeprecationHandler.INSTANCE, query.utf8ToString())) {
-            XContentParser.Token token = parser.nextToken();
-            if (token != XContentParser.Token.START_OBJECT) {
-                throw new XContentParseException(parser.getTokenLocation(), "expected [" + XContentParser.Token.START_OBJECT + "] but " +
-                    "found [" + token + "] instead");
-            }
-            token = parser.nextToken();
-            if (token != XContentParser.Token.FIELD_NAME) {
-                throw new XContentParseException(parser.getTokenLocation(), "expected [" + XContentParser.Token.FIELD_NAME + "] with " +
-                    "value a query name or 'template' but found [" + token + "] instead");
+            return isTemplateQuery(parser);
+        }
+    }
+
+    private static boolean isTemplateQuery(XContentParser parser) throws IOException {
+        XContentParser.Token token = parser.nextToken();
+        if (token != XContentParser.Token.START_OBJECT) {
+            throw new XContentParseException(parser.getTokenLocation(), "expected [" + XContentParser.Token.START_OBJECT + "] but " +
+                "found [" + token + "] instead");
+        }
+        token = parser.nextToken();
+        if (token != XContentParser.Token.FIELD_NAME) {
+            throw new XContentParseException(parser.getTokenLocation(), "expected [" + XContentParser.Token.FIELD_NAME + "] with " +
+                "value a query name or 'template' but found [" + token + "] instead");
+        }
+        String fieldName = parser.currentName();
+        return "template".equals(fieldName);
+    }
+
+    public static boolean hasStoredScript(BytesReference query, NamedXContentRegistry xContentRegistry) throws IOException {
+        try (XContentParser parser = XContentType.JSON.xContent().createParser(xContentRegistry,
+            LoggingDeprecationHandler.INSTANCE, query.utf8ToString())) {
+            if (false == isTemplateQuery(parser)) {
+                return false;
             }
-            String fieldName = parser.currentName();
-            if ("template".equals(fieldName)) {
-                return true;
+            if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
+                throw new XContentParseException(
+                    parser.getTokenLocation(),
+                    "expected [" + XContentParser.Token.START_OBJECT + "] but found [" + parser.currentToken() + "] instead");
             }
+            return ScriptType.STORED == Script.parse(parser).getType();
         }
-
-        return false;
     }
 
     /**
@@ -122,7 +139,7 @@ public final class DLSRoleQueryValidator {
     }
 
     @Nullable
-    private static QueryBuilder evaluateAndVerifyRoleQuery(String query, NamedXContentRegistry xContentRegistry) throws IOException {
+    public static QueryBuilder evaluateAndVerifyRoleQuery(String query, NamedXContentRegistry xContentRegistry) throws IOException {
         if (query != null) {
             try (XContentParser parser = XContentFactory.xContent(query).createParser(xContentRegistry,
                 LoggingDeprecationHandler.INSTANCE, query)) {
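
Usage sketch for the new hasStoredScript helper (illustration only; it mirrors DLSRoleQueryValidatorTests#testHasStoredScript added further down, where NamedXContentRegistry.EMPTY suffices because only the top-level template/script structure is inspected):

// Illustrative only: a stored-script template disables the request cache, an inline template does not.
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.xpack.core.security.authz.support.DLSRoleQueryValidator;

import java.io.IOException;

public class HasStoredScriptSketch {
    public static void main(String[] args) throws IOException {
        // DLS query referencing a stored script -> true (the request cache stays disabled for such queries)
        final boolean stored = DLSRoleQueryValidator.hasStoredScript(
            new BytesArray("{\"template\":{\"id\":\"my-script\"}}"), NamedXContentRegistry.EMPTY);
        // DLS query with an inline template source -> false
        final boolean inline = DLSRoleQueryValidator.hasStoredScript(
            new BytesArray("{\"template\":{\"source\":\"{}\"}}"), NamedXContentRegistry.EMPTY);
        System.out.println("stored=" + stored + " inline=" + inline);
    }
}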

+ 10 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/support/SecurityQueryTemplateEvaluator.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.xpack.core.security.authz.support;
 
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentFactory;
@@ -80,4 +81,13 @@ public final class SecurityQueryTemplateEvaluator {
         }
     }
 
+    public static DlsQueryEvaluationContext wrap(User user, ScriptService scriptService) {
+        return q -> SecurityQueryTemplateEvaluator.evaluateTemplate(q.utf8ToString(), scriptService, user);
+    }
+
+    @FunctionalInterface
+    public interface DlsQueryEvaluationContext {
+        String evaluate(BytesReference query);
+    }
+
 }
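
Hedged usage sketch of the new DlsQueryEvaluationContext hook (the helper method and its parameters are placeholders): the context is just a query-to-string function, so production code plugs in the per-user template evaluator while the unit tests in this change pass a plain UTF-8 decode.

// Hypothetical helper; roleQuery, user and scriptService are placeholders.
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.xpack.core.security.authz.support.SecurityQueryTemplateEvaluator;
import org.elasticsearch.xpack.core.security.authz.support.SecurityQueryTemplateEvaluator.DlsQueryEvaluationContext;
import org.elasticsearch.xpack.core.security.user.User;

public class DlsContextSketch {
    static String resolveForCacheKey(BytesReference roleQuery, User user, ScriptService scriptService) {
        // Production: resolve mustache templates against the current user before the query contributes to a cache key.
        final DlsQueryEvaluationContext context = SecurityQueryTemplateEvaluator.wrap(user, scriptService);
        // Tests elsewhere in this change use BytesReference::utf8ToString as the context instead.
        return context.evaluate(roleQuery);
    }
}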

+ 20 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/support/CacheKey.java

@@ -0,0 +1,20 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.security.support;
+
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.xpack.core.security.authz.support.SecurityQueryTemplateEvaluator.DlsQueryEvaluationContext;
+
+import java.io.IOException;
+
+/**
+ * Interface in ES Security for objects that can contribute to a cache-key
+ */
+public interface CacheKey {
+    void buildCacheKey(StreamOutput out, DlsQueryEvaluationContext context) throws IOException;
+}
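
For orientation, a hedged implementation sketch (ExampleCacheKey is hypothetical; in this change FieldPermissionsDefinition.FieldGrantExcludeGroup implements the interface, and DocumentPermissions, FieldPermissions and IndicesAccessControl gain matching buildCacheKey methods):

// Hypothetical implementation; not part of this change.
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.core.security.authz.support.SecurityQueryTemplateEvaluator.DlsQueryEvaluationContext;
import org.elasticsearch.xpack.core.security.support.CacheKey;

import java.io.IOException;

public class ExampleCacheKey implements CacheKey {
    private final String[] grantedFields;

    public ExampleCacheKey(String[] grantedFields) {
        this.grantedFields = grantedFields;
    }

    @Override
    public void buildCacheKey(StreamOutput out, DlsQueryEvaluationContext context) throws IOException {
        // Write only deterministic, security-relevant state; the context is there to resolve templated DLS queries.
        out.writeOptionalStringArray(grantedFields);
    }
}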

+ 23 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java

@@ -23,7 +23,9 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
 import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.IndexScopedSettings;
@@ -66,6 +68,7 @@ import org.elasticsearch.plugins.PersistentTaskPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.RepositoryPlugin;
 import org.elasticsearch.plugins.ScriptPlugin;
+import org.elasticsearch.plugins.SearchPlugin;
 import org.elasticsearch.plugins.SystemIndexPlugin;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
@@ -74,6 +77,7 @@ import org.elasticsearch.rest.RestHandler;
 import org.elasticsearch.rest.RestHeaderDefinition;
 import org.elasticsearch.script.ScriptContext;
 import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.threadpool.ExecutorBuilder;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.Transport;
@@ -105,7 +109,7 @@ import static java.util.stream.Collectors.toList;
 
 public class LocalStateCompositeXPackPlugin extends XPackPlugin implements ScriptPlugin, ActionPlugin, IngestPlugin, NetworkPlugin,
         ClusterPlugin, DiscoveryPlugin, MapperPlugin, AnalysisPlugin, PersistentTaskPlugin, EnginePlugin, IndexStorePlugin,
-        SystemIndexPlugin {
+        SystemIndexPlugin, SearchPlugin {
 
     private XPackLicenseState licenseState;
     private SSLService sslService;
@@ -566,4 +570,22 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip
     public String getFeatureDescription() {
         return this.getClass().getCanonicalName();
     }
+
+    @Override
+    public CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> getRequestCacheKeyDifferentiator() {
+        final List<CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException>> differentiators =
+            filterPlugins(SearchPlugin.class).stream()
+                .map(SearchPlugin::getRequestCacheKeyDifferentiator)
+                .filter(Objects::nonNull)
+                .collect(Collectors.toUnmodifiableList());
+
+        if (differentiators.size() > 1) {
+            throw new UnsupportedOperationException("Only the security SearchPlugin should provide the request cache key differentiator");
+        } else if (differentiators.size() == 1) {
+            return differentiators.get(0);
+        } else {
+            return null;
+        }
+
+    }
 }

+ 56 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/permission/DocumentPermissionsTests.java

@@ -10,6 +10,7 @@ package org.elasticsearch.xpack.core.security.authz.permission;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryRewriteContext;
@@ -18,7 +19,9 @@ import org.elasticsearch.indices.TermsLookup;
 import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Set;
 
 import static org.hamcrest.Matchers.containsString;
@@ -74,4 +77,57 @@ public class DocumentPermissionsTests extends ESTestCase {
                 () -> DocumentPermissions.failIfQueryUsesClient(queryBuilder2, context));
         assertThat(e.getMessage(), equalTo("role queries are not allowed to execute additional requests"));
     }
+
+    public void testWriteCacheKeyWillDistinguishBetweenQueriesAndLimitedByQueries() throws IOException {
+        final BytesStreamOutput out0 = new BytesStreamOutput();
+        final DocumentPermissions documentPermissions0 =
+            new DocumentPermissions(
+                Set.of(new BytesArray("{\"term\":{\"q1\":\"v1\"}}"),
+                    new BytesArray("{\"term\":{\"q2\":\"v2\"}}"), new BytesArray("{\"term\":{\"q3\":\"v3\"}}")),
+                null);
+        documentPermissions0.buildCacheKey(out0, BytesReference::utf8ToString);
+
+        final BytesStreamOutput out1 = new BytesStreamOutput();
+        final DocumentPermissions documentPermissions1 =
+            new DocumentPermissions(
+                Set.of(new BytesArray("{\"term\":{\"q1\":\"v1\"}}"), new BytesArray("{\"term\":{\"q2\":\"v2\"}}")),
+                Set.of(new BytesArray("{\"term\":{\"q3\":\"v3\"}}")));
+        documentPermissions1.buildCacheKey(out1, BytesReference::utf8ToString);
+
+        final BytesStreamOutput out2 = new BytesStreamOutput();
+        final DocumentPermissions documentPermissions2 =
+            new DocumentPermissions(
+                Set.of(new BytesArray("{\"term\":{\"q1\":\"v1\"}}")),
+                Set.of(new BytesArray("{\"term\":{\"q2\":\"v2\"}}"), new BytesArray("{\"term\":{\"q3\":\"v3\"}}")));
+        documentPermissions2.buildCacheKey(out2, BytesReference::utf8ToString);
+
+        final BytesStreamOutput out3 = new BytesStreamOutput();
+        final DocumentPermissions documentPermissions3 =
+            new DocumentPermissions(
+                null,
+                Set.of(new BytesArray("{\"term\":{\"q1\":\"v1\"}}"),
+                    new BytesArray("{\"term\":{\"q2\":\"v2\"}}"), new BytesArray("{\"term\":{\"q3\":\"v3\"}}")));
+        documentPermissions3.buildCacheKey(out3, BytesReference::utf8ToString);
+
+        assertThat(Arrays.equals(BytesReference.toBytes(out0.bytes()), BytesReference.toBytes(out1.bytes())), is(false));
+        assertThat(Arrays.equals(BytesReference.toBytes(out0.bytes()), BytesReference.toBytes(out2.bytes())), is(false));
+        assertThat(Arrays.equals(BytesReference.toBytes(out0.bytes()), BytesReference.toBytes(out3.bytes())), is(false));
+        assertThat(Arrays.equals(BytesReference.toBytes(out1.bytes()), BytesReference.toBytes(out2.bytes())), is(false));
+        assertThat(Arrays.equals(BytesReference.toBytes(out1.bytes()), BytesReference.toBytes(out3.bytes())), is(false));
+        assertThat(Arrays.equals(BytesReference.toBytes(out2.bytes()), BytesReference.toBytes(out3.bytes())), is(false));
+    }
+
+    public void testHasStoredScript() throws IOException {
+        final Set<BytesReference> queries = new HashSet<>();
+        if (randomBoolean()) {
+            queries.add(new BytesArray("{\"term\":{\"username\":\"foo\"}}"));
+        }
+        final boolean hasStoredScript = randomBoolean();
+        if (hasStoredScript) {
+            queries.add(new BytesArray("{\"template\":{\"id\":\"my-script\"}}"));
+        }
+        final DocumentPermissions documentPermissions0 =
+            randomBoolean() ? new DocumentPermissions(queries, null) : new DocumentPermissions(null, queries);
+        assertThat(documentPermissions0.hasStoredScript(), is(hasStoredScript));
+    }
 }

+ 77 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/permission/FieldPermissionsTests.java

@@ -8,10 +8,15 @@
 package org.elasticsearch.xpack.core.security.authz.permission;
 
 import org.apache.lucene.util.automaton.CharacterRunAutomaton;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.security.support.Automatons;
 import org.hamcrest.core.IsSame;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Set;
 
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
@@ -20,7 +25,7 @@ import static org.mockito.Matchers.same;
 
 public class FieldPermissionsTests extends ESTestCase {
 
-    public void testFieldPermissionsIntersection() throws IOException {
+    public void testFieldPermissionsIntersection() {
 
         final FieldPermissions fieldPermissions = FieldPermissions.DEFAULT;
         final FieldPermissions fieldPermissions1 = new FieldPermissions(
@@ -75,6 +80,77 @@ public class FieldPermissionsTests extends ESTestCase {
         }
     }
 
+    public void testMustHaveNonNullFieldPermissionsDefinition() {
+        final FieldPermissions fieldPermissions0 = new FieldPermissions();
+        assertThat(fieldPermissions0.getFieldPermissionsDefinition(), notNullValue());
+        expectThrows(NullPointerException.class, () -> new FieldPermissions(null));
+        expectThrows(NullPointerException.class, () -> new FieldPermissions(null, Automatons.MATCH_ALL));
+
+        final FieldPermissions fieldPermissions03 = randomFrom(
+            FieldPermissions.DEFAULT,
+            new FieldPermissions(fieldPermissionDef(new String[] { "f1", "f2", "f3*" }, new String[] { "f3" })));
+        assertThat(fieldPermissions03.limitFieldPermissions(null).getFieldPermissionsDefinition(), notNullValue());
+        assertThat(fieldPermissions03.limitFieldPermissions(FieldPermissions.DEFAULT).getFieldPermissionsDefinition(), notNullValue());
+        assertThat(fieldPermissions03.limitFieldPermissions(
+            new FieldPermissions(fieldPermissionDef(new String[] { "f1", "f3*", "f4" }, new String[] { "f3" }))
+        ).getFieldPermissionsDefinition(), notNullValue());
+    }
+
+    public void testWriteCacheKeyWillDistinguishBetweenDefinitionAndLimitedByDefinition() throws IOException {
+        // The same overall grant/except sets, but they come from either:
+        //   1. Just the definition
+        //   2. Just the limited-by definition
+        //   3. Both
+        // The cache key should differentiate between them
+
+        // Just definition
+        final BytesStreamOutput out0 = new BytesStreamOutput();
+        final FieldPermissions fieldPermissions0 = new FieldPermissions(
+            new FieldPermissionsDefinition(Set.of(
+                new FieldPermissionsDefinition.FieldGrantExcludeGroup(new String[] { "x*" }, new String[] { "x2" }),
+                new FieldPermissionsDefinition.FieldGrantExcludeGroup(new String[] { "y*" }, new String[] { "y2" }),
+                new FieldPermissionsDefinition.FieldGrantExcludeGroup(new String[] { "z*" }, new String[] { "z2" }))));
+        fieldPermissions0.buildCacheKey(out0, BytesReference::utf8ToString);
+
+        // Mixed definition
+        final BytesStreamOutput out1 = new BytesStreamOutput();
+        final FieldPermissions fieldPermissions1 = new FieldPermissions(
+            new FieldPermissionsDefinition(Set.of(
+                new FieldPermissionsDefinition.FieldGrantExcludeGroup(new String[] { "x*" }, new String[] { "x2" }),
+                new FieldPermissionsDefinition.FieldGrantExcludeGroup(new String[] { "y*" }, new String[] { "y2" }))))
+            .limitFieldPermissions(new FieldPermissions(fieldPermissionDef(new String[] { "z*" }, new String[] { "z2" })));
+        fieldPermissions1.buildCacheKey(out1, BytesReference::utf8ToString);
+
+        // Another mixed definition
+        final BytesStreamOutput out2 = new BytesStreamOutput();
+        final FieldPermissions fieldPermissions2 = new FieldPermissions(
+            new FieldPermissionsDefinition(Set.of(
+                new FieldPermissionsDefinition.FieldGrantExcludeGroup(new String[] { "x*" }, new String[] { "x2" }))))
+            .limitFieldPermissions(new FieldPermissions(new FieldPermissionsDefinition(Set.of(
+                new FieldPermissionsDefinition.FieldGrantExcludeGroup(new String[] { "y*" }, new String[] { "y2" }),
+                new FieldPermissionsDefinition.FieldGrantExcludeGroup(new String[] { "z*" }, new String[] { "z2" }))
+            )));
+        fieldPermissions2.buildCacheKey(out2, BytesReference::utf8ToString);
+
+        // Just limited by
+        final BytesStreamOutput out3 = new BytesStreamOutput();
+        final FieldPermissions fieldPermissions3 = new FieldPermissions().limitFieldPermissions(
+            new FieldPermissions(
+                new FieldPermissionsDefinition(Set.of(
+                    new FieldPermissionsDefinition.FieldGrantExcludeGroup(new String[] { "x*" }, new String[] { "x2" }),
+                    new FieldPermissionsDefinition.FieldGrantExcludeGroup(new String[] { "y*" }, new String[] { "y2" }),
+                    new FieldPermissionsDefinition.FieldGrantExcludeGroup(new String[] { "z*" }, new String[] { "z2" })))));
+        fieldPermissions3.buildCacheKey(out3, BytesReference::utf8ToString);
+
+        assertThat(Arrays.equals(BytesReference.toBytes(out0.bytes()), BytesReference.toBytes(out1.bytes())), is(false));
+        assertThat(Arrays.equals(BytesReference.toBytes(out0.bytes()), BytesReference.toBytes(out2.bytes())), is(false));
+        assertThat(Arrays.equals(BytesReference.toBytes(out1.bytes()), BytesReference.toBytes(out2.bytes())), is(false));
+
+        // Just limited-by produces the same key as just definition because limitFieldPermissions uses the limited-by
+        // definition when the original permission is match-all
+        assertThat(Arrays.equals(BytesReference.toBytes(out0.bytes()), BytesReference.toBytes(out3.bytes())), is(true));
+    }
+
     private static FieldPermissionsDefinition fieldPermissionDef(String[] granted, String[] denied) {
         return new FieldPermissionsDefinition(granted, denied);
     }

+ 13 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/support/DLSRoleQueryValidatorTests.java

@@ -7,6 +7,8 @@
 package org.elasticsearch.xpack.core.security.authz.support;
 
 import org.apache.lucene.search.join.ScoreMode;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.BoostingQueryBuilder;
 import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
@@ -20,7 +22,10 @@ import org.elasticsearch.join.query.HasChildQueryBuilder;
 import org.elasticsearch.join.query.HasParentQueryBuilder;
 import org.elasticsearch.test.ESTestCase;
 
+import java.io.IOException;
+
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 
 public class DLSRoleQueryValidatorTests extends ESTestCase {
 
@@ -61,4 +66,12 @@ public class DLSRoleQueryValidatorTests extends ESTestCase {
         e = expectThrows(IllegalArgumentException.class, () -> DLSRoleQueryValidator.verifyRoleQuery(queryBuilder9));
         assertThat(e.getMessage(), equalTo("geoshape query referring to indexed shapes isn't supported as part of a role query"));
     }
+
+    public void testHasStoredScript() throws IOException {
+        assertThat(DLSRoleQueryValidator.hasStoredScript(
+            new BytesArray("{\"template\":{\"id\":\"my-script\"}}"), NamedXContentRegistry.EMPTY), is(true));
+        assertThat(DLSRoleQueryValidator.hasStoredScript(
+            new BytesArray("{\"template\":{\"source\":\"{}\"}}"), NamedXContentRegistry.EMPTY), is(false));
+    }
+
 }

+ 118 - 0
x-pack/plugin/security/qa/security-trial/src/javaRestTest/java/org/elasticsearch/xpack/security/dlsfls/DlsRequestCacheIT.java

@@ -0,0 +1,118 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.security.dlsfls;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.common.settings.SecureString;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.test.SecuritySettingsSourceField;
+import org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken;
+import org.elasticsearch.xpack.security.SecurityOnTrialLicenseRestTestCase;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class DlsRequestCacheIT extends SecurityOnTrialLicenseRestTestCase {
+
+    private static final String DLS_USER = "system_user";
+    private static final SecureString DLS_USER_PASSWORD = new SecureString("dls-user-password".toCharArray());
+    private static final String DLS_TEMPLATE_PAINLESS_INDEX = "dls-template-painless-index";
+
+    @Before
+    public void createUsers() throws IOException {
+        createUser(DLS_USER, DLS_USER_PASSWORD, List.of("dls_painless_role"));
+    }
+
+    @After
+    public void cleanUp() throws IOException {
+        deleteUser(DLS_USER);
+    }
+
+    @Override
+    protected Settings restAdminSettings() {
+        String token = basicAuthHeaderValue("x_pack_rest_user", SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
+        return Settings.builder()
+            .put(ThreadContext.PREFIX + ".Authorization", token)
+            .build();
+    }
+
+    public void testRequestCacheDisabledForDlsTemplateRoleWithPainless() throws IOException {
+        final RestClient adminClient = adminClient();
+        final RestClient client = client();
+
+        final Request putScriptRequest = new Request("PUT", "_scripts/range-now");
+        putScriptRequest.setJsonEntity("{\"script\":{\"lang\":\"painless\"," +
+            "\"source\":\"'{\\\"range\\\":{\\\"date\\\": {\\\"lte\\\": \\\"' + new Date().getTime() + '\\\"}}}' \"}}");
+        assertOK(adminClient.performRequest(putScriptRequest));
+
+        // Create the index with a date field and 1 primary shard with no replica
+        final Request putIndexRequest = new Request("PUT", DLS_TEMPLATE_PAINLESS_INDEX);
+        putIndexRequest.setJsonEntity("{\"mappings\":{\"properties\":{\"date\":{\"type\":\"date\",\"format\":\"epoch_millis\"}}}," +
+            "\"settings\":{\"number_of_shards\":1,\"number_of_replicas\":0}}");
+        assertOK(adminClient.performRequest(putIndexRequest));
+
+        // A doc dated 1 minute in the past
+        final Request putDocRequest1 = new Request("PUT", DLS_TEMPLATE_PAINLESS_INDEX + "/_doc/1");
+        putDocRequest1.setJsonEntity("{\"date\":" + Instant.now().minus(Duration.ofSeconds(60)).toEpochMilli() + "}");
+        assertOK(adminClient.performRequest(putDocRequest1));
+
+        // A doc dated 1 minute in the future
+        final Request putDocRequest2 = new Request("PUT", DLS_TEMPLATE_PAINLESS_INDEX + "/_doc/2");
+        putDocRequest2.setJsonEntity("{\"date\":" + Instant.now().plus(Duration.ofSeconds(60)).toEpochMilli() + "}");
+        assertOK(adminClient.performRequest(putDocRequest2));
+
+        final Request refreshRequest = new Request("POST", DLS_TEMPLATE_PAINLESS_INDEX + "/_refresh");
+        assertOK(adminClient.performRequest(refreshRequest));
+
+        // The first search should only return the doc dated in the past
+        final Request searchRequest = new Request("GET", DLS_TEMPLATE_PAINLESS_INDEX + "/_search");
+        searchRequest.addParameter("request_cache", "true");
+        searchRequest.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Authorization",
+            UsernamePasswordToken.basicAuthHeaderValue(DLS_USER, DLS_USER_PASSWORD)));
+        assertSearchResponse(client.performRequest(searchRequest), Set.of("1"));
+        // The cache should not be used since the DLS query uses a stored script
+        assertCacheState(0, 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void assertSearchResponse(Response response, Set<String> docIds) throws IOException {
+        final Map<String, Object> m = responseAsMap(response);
+
+        final Map<String, Object> hits = (Map<String, Object>) m.get("hits");
+        final List<Map<String, Object>> docs = (List<Map<String, Object>>) hits.get("hits");
+
+        assertThat(docs.stream().map(d -> (String) d.get("_id")).collect(Collectors.toSet()), equalTo(docIds));
+    }
+
+    @SuppressWarnings("unchecked")
+    public void assertCacheState(int expectedHits, int expectedMisses) throws IOException {
+        final Request request = new Request("GET", DLS_TEMPLATE_PAINLESS_INDEX + "/_stats");
+        request.addParameter("filter_path", "indices." + DLS_TEMPLATE_PAINLESS_INDEX + ".total.request_cache");
+        final Response response = adminClient().performRequest(request);
+        final Map<String, Object> m = responseAsMap(response);
+        final Map<String, Object> indices = (Map<String, Object>) m.get("indices");
+        final Map<String, Object> index = (Map<String, Object>) indices.get(DLS_TEMPLATE_PAINLESS_INDEX);
+        final Map<String, Object> total = (Map<String, Object>) index.get("total");
+        final Map<String, Object> requestCache = (Map<String, Object>) total.get("request_cache");
+        assertThat((int) requestCache.get("hit_count"), equalTo(expectedHits));
+        assertThat((int) requestCache.get("miss_count"), equalTo(expectedMisses));
+    }
+}

+ 5 - 0
x-pack/plugin/security/qa/security-trial/src/javaRestTest/resources/roles.yml

@@ -13,3 +13,8 @@ cat_test_role:
     - names: [ "index_allowed" ]
       privileges: [ "read", "write", "monitor" ]
 
+dls_painless_role:
+  indices:
+    - names: [ "dls-template-painless-index" ]
+      privileges: ["read"]
+      query: {"template":{"id":"range-now"}}

+ 420 - 0
x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DlsFlsRequestCacheTests.java

@@ -0,0 +1,420 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.integration;
+
+import org.elasticsearch.ElasticsearchSecurityException;
+import org.elasticsearch.action.admin.indices.alias.Alias;
+import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
+import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.settings.SecureString;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.cache.request.RequestCacheStats;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.script.mustache.MustachePlugin;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.test.SecuritySingleNodeTestCase;
+import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
+import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.security.action.CreateApiKeyAction;
+import org.elasticsearch.xpack.core.security.action.CreateApiKeyRequest;
+import org.elasticsearch.xpack.core.security.action.CreateApiKeyResponse;
+import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
+import org.junit.Before;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.test.SecuritySettingsSource.TEST_PASSWORD_HASHED;
+import static org.elasticsearch.test.SecuritySettingsSourceField.TEST_PASSWORD;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
+import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
+import static org.hamcrest.Matchers.equalTo;
+
+public class DlsFlsRequestCacheTests extends SecuritySingleNodeTestCase {
+
+    private static final String DLS_FLS_USER = "dls_fls_user";
+    private static final String DLS_INDEX = "dls-index";
+    private static final String DLS_ALIAS = "dls-alias";
+    private static final String FLS_INDEX = "fls-index";
+    private static final String FLS_ALIAS = "fls-alias";
+    private static final String INDEX = "index";
+    private static final String ALIAS1 = "alias1";
+    private static final String ALIAS2 = "alias2";
+    private static final String ALL_ALIAS = "all-alias";
+    private static final String DLS_TEMPLATE_ROLE_QUERY_USER_1 = "dls_template_role_query_user_1";
+    private static final String DLS_TEMPLATE_ROLE_QUERY_USER_2 = "dls_template_role_query_user_2";
+    private static final String DLS_TEMPLATE_ROLE_QUERY_ROLE = "dls_template_role_query_role";
+    private static final String DLS_TEMPLATE_ROLE_QUERY_INDEX = "dls-template-role-query-index";
+    private static final String DLS_TEMPLATE_ROLE_QUERY_ALIAS = "dls-template-role-query-alias";
+
+    @Override
+    protected Settings nodeSettings() {
+        return Settings.builder()
+            .put(super.nodeSettings())
+            .put(XPackSettings.API_KEY_SERVICE_ENABLED_SETTING.getKey(), true)
+            .build();
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> getPlugins() {
+        final ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>(super.getPlugins());
+        plugins.add(MustachePlugin.class);
+        return List.copyOf(plugins);
+    }
+
+    @Override
+    protected String configUsers() {
+        return super.configUsers()
+            + DLS_FLS_USER + ":" + TEST_PASSWORD_HASHED + "\n"
+            + DLS_TEMPLATE_ROLE_QUERY_USER_2 + ":" + TEST_PASSWORD_HASHED + "\n"
+            + DLS_TEMPLATE_ROLE_QUERY_USER_1 + ":" + TEST_PASSWORD_HASHED + "\n";
+    }
+
+    @Override
+    protected String configRoles() {
+        return super.configRoles()
+            + DLS_FLS_USER + ":\n"
+            + "  cluster: [ \"manage_own_api_key\" ]\n"
+            + "  indices:\n"
+            + "  - names:\n"
+            + "    - \"dls-index\"\n"
+            + "    privileges:\n"
+            + "    - \"read\"\n"
+            + "    query: \"{\\\"match\\\": {\\\"number\\\": 101}}\"\n"
+            + "  - names:\n"
+            + "    - \"dls-alias\"\n"
+            + "    privileges:\n"
+            + "    - \"read\"\n"
+            + "    query: \"{\\\"match\\\": {\\\"number\\\": 102}}\"\n"
+            + "  - names:\n"
+            + "    - \"fls-index\"\n"
+            + "    privileges:\n"
+            + "    - \"read\"\n"
+            + "    field_security:\n"
+            + "      grant:\n"
+            + "      - \"public\"\n"
+            + "  - names:\n"
+            + "    - \"fls-alias\"\n"
+            + "    privileges:\n"
+            + "    - \"read\"\n"
+            + "    field_security:\n"
+            + "      grant:\n"
+            + "      - \"private\"\n"
+            + "  - names:\n"
+            + "    - \"alias1\"\n"
+            + "    privileges:\n"
+            + "    - \"read\"\n"
+            + "    query: \"{\\\"match\\\": {\\\"number\\\": 1}}\"\n"
+            + "    field_security:\n"
+            + "      grant:\n"
+            + "      - \"*\"\n"
+            + "      except:\n"
+            + "      - \"private\"\n"
+            + "  - names:\n"
+            + "    - \"alias2\"\n"
+            + "    privileges:\n"
+            + "    - \"read\"\n"
+            + "    query: \"{\\\"match\\\": {\\\"number\\\": 2}}\"\n"
+            + "    field_security:\n"
+            + "      grant:\n"
+            + "      - \"*\"\n"
+            + "      except:\n"
+            + "      - \"public\"\n"
+            + "  - names:\n"
+            + "    - \"all-alias\"\n"
+            + "    privileges:\n"
+            + "    - \"read\"\n"
+            + DLS_TEMPLATE_ROLE_QUERY_ROLE + ":\n"
+            + "  indices:\n"
+            + "  - names:\n"
+            + "    - \"dls-template-role-query-index\"\n"
+            + "    privileges:\n"
+            + "    - \"read\"\n"
+            + "    query: {\"template\":{\"source\":{\"match\":{\"username\":\"{{_user.username}}\"}}}}\n"
+            + "  - names:\n"
+            + "    - \"dls-template-role-query-alias\"\n"
+            + "    privileges:\n"
+            + "    - \"read\"\n"
+            + "    query: {\"template\":{\"id\":\"my-script\"}}\n";
+    }
+
+    @Override
+    protected String configUsersRoles() {
+        return super.configUsersRoles()
+            + DLS_FLS_USER + ":" + DLS_FLS_USER + "\n"
+            + DLS_TEMPLATE_ROLE_QUERY_ROLE + ":" + DLS_TEMPLATE_ROLE_QUERY_USER_1 + "," + DLS_TEMPLATE_ROLE_QUERY_USER_2 + "\n";
+    }
+
+    @Before
+    public void init() {
+        prepareIndices();
+    }
+
+    public void testRequestCacheForDLS() {
+        final Client powerClient = client();
+        final Client limitedClient = limitedClient();
+
+        // Search first with the power client; it should see all docs
+        assertSearchResponse(powerClient.prepareSearch(DLS_INDEX).setRequestCache(true).get(), Set.of("101", "102"));
+        assertCacheState(DLS_INDEX, 0, 1);
+
+        // Search with the limited client; it should see only one doc (i.e. it won't use the power client's cache entry)
+        assertSearchResponse(limitedClient.prepareSearch(DLS_INDEX).setRequestCache(true).get(), Set.of("101"));
+        assertCacheState(DLS_INDEX, 0, 2);
+
+        // Execute the above search again; it should use the limited client's cache entry
+        assertSearchResponse(limitedClient.prepareSearch(DLS_INDEX).setRequestCache(true).get(), Set.of("101"));
+        assertCacheState(DLS_INDEX, 1, 2);
+
+        // Execute the search with the power client again; it should still see all docs
+        assertSearchResponse(powerClient.prepareSearch(DLS_INDEX).setRequestCache(true).get(), Set.of("101", "102"));
+        assertCacheState(DLS_INDEX, 2, 2);
+
+        // The limited client has a different DLS query for dls-alias compared to the underlying dls-index
+        assertSearchResponse(limitedClient.prepareSearch(DLS_ALIAS).setRequestCache(true).get(), Set.of("102"));
+        assertCacheState(DLS_INDEX, 2, 3);
+        assertSearchResponse(limitedClient.prepareSearch(DLS_ALIAS).setRequestCache(true).get(), Set.of("102"));
+        assertCacheState(DLS_INDEX, 3, 3);
+
+        // Searching with the limited client for dls-alias and dls-index returns all docs. The cache entry is however
+        // different from the power client's, i.e. still no sharing even though the end results are the same, because
+        // the search with the limited client still has DLS queries attached to it.
+        assertSearchResponse(limitedClient.prepareSearch(DLS_ALIAS, DLS_INDEX).setRequestCache(true).get(), Set.of("101", "102"));
+        assertCacheState(DLS_INDEX, 3, 4);
+    }
+
+    public void testRequestCacheForFLS() {
+        final Client powerClient = client();
+        final Client limitedClient = limitedClient();
+
+        // Search first with the power client; it should see all fields
+        assertSearchResponse(powerClient.prepareSearch(FLS_INDEX).setRequestCache(true).get(),
+            Set.of("201", "202"), Set.of("public", "private"));
+        assertCacheState(FLS_INDEX, 0, 1);
+
+        // Search with the limited client; it should see only the public field
+        assertSearchResponse(limitedClient.prepareSearch(FLS_INDEX).setRequestCache(true).get(),
+            Set.of("201", "202"), Set.of("public"));
+        assertCacheState(FLS_INDEX, 0, 2);
+
+        // Search with the limited client again; it should use the cache
+        assertSearchResponse(limitedClient.prepareSearch(FLS_INDEX).setRequestCache(true).get(),
+            Set.of("201", "202"), Set.of("public"));
+        assertCacheState(FLS_INDEX, 1, 2);
+
+        // Search again with the power client; it should use its own cache entry
+        assertSearchResponse(powerClient.prepareSearch(FLS_INDEX).setRequestCache(true).get(),
+            Set.of("201", "202"), Set.of("public", "private"));
+        assertCacheState(FLS_INDEX, 2, 2);
+
+        // The fls-alias has a different FLS definition compared to its underlying fls-index.
+        assertSearchResponse(limitedClient.prepareSearch(FLS_ALIAS).setRequestCache(true).get(),
+            Set.of("201", "202"), Set.of("private"));
+        assertCacheState(FLS_INDEX, 2, 3);
+
+        // Searching with the limited client for both fls-alias and fls-index also returns all docs and fields.
+        // But the cache entry is not shared with the power client because the request still has a different indexAccessControl
+        assertSearchResponse(limitedClient.prepareSearch(FLS_ALIAS, FLS_INDEX).setRequestCache(true).get(),
+            Set.of("201", "202"), Set.of("public", "private"));
+        assertCacheState(FLS_INDEX, 2, 4);
+    }
+
+    public void testRequestCacheForBothDLSandFLS() throws ExecutionException, InterruptedException {
+        final Client powerClient = client();
+        final Client limitedClient = limitedClient();
+
+        // Search first with the power client; it should see all fields
+        assertSearchResponse(powerClient.prepareSearch(INDEX).setRequestCache(true).get(),
+            Set.of("1", "2"), Set.of("number", "letter", "public", "private"));
+        assertCacheState(INDEX, 0, 1);
+
+        // The limited client does not have access to the underlying index,
+        // so searching it directly results in an error
+        expectThrows(ElasticsearchSecurityException.class, () -> limitedClient.prepareSearch(INDEX).setRequestCache(true).get());
+
+        // Search for alias1, which points to the index and has DLS/FLS
+        assertSearchResponse(limitedClient.prepareSearch(ALIAS1).setRequestCache(true).get(),
+            Set.of("1"), Set.of("number", "letter", "public"));
+        assertCacheState(INDEX, 0, 2);
+
+        // Search for alias2, which also points to the index but has a different set of DLS/FLS
+        assertSearchResponse(limitedClient.prepareSearch(ALIAS2).setRequestCache(true).get(),
+            Set.of("2"), Set.of("number", "letter", "private"));
+        assertCacheState(INDEX, 0, 3);
+
+        // Search for all-alias, which grants full read access to the underlying index.
+        // This makes it share the cache entry of the power client
+        assertSearchResponse(limitedClient.prepareSearch(ALL_ALIAS).setRequestCache(true).get(),
+            Set.of("1", "2"), Set.of("number", "letter", "public", "private"));
+        assertCacheState(INDEX, 1, 3);
+
+        // Similarly, searching alias1 and all-alias together results in full read access to the index
+        // and again reuses the cache entry of the power client
+        assertSearchResponse(limitedClient.prepareSearch(ALIAS1, ALL_ALIAS).setRequestCache(true).get(),
+            Set.of("1", "2"), Set.of("number", "letter", "public", "private"));
+        assertCacheState(INDEX, 2, 3);
+
+        // Though searching both alias1 and alias2 together is effectively full read access to the index,
+        // it does not share the cache entry of the power client because role queries are still attached.
+        assertSearchResponse(limitedClient.prepareSearch(ALIAS1, ALIAS2).setRequestCache(true).get(),
+            Set.of("1", "2"), Set.of("number", "letter", "public", "private"));
+        assertCacheState(INDEX, 2, 4);
+
+        // Test with an API Key that has different DLS/FLS on all-alias
+        final Client limitedClientApiKey = limitedClientApiKey();
+
+        // It should not reuse any entries from the cache
+        assertSearchResponse(limitedClientApiKey.prepareSearch(ALL_ALIAS).setRequestCache(true).get(),
+            Set.of("1"), Set.of("letter", "public", "private"));
+        assertCacheState(INDEX, 2, 5);
+    }
+
+    public void testRequestCacheWithTemplateRoleQuery() {
+        final Client client1 = client().filterWithHeader(Map.of(
+            "Authorization", basicAuthHeaderValue(DLS_TEMPLATE_ROLE_QUERY_USER_1, new SecureString(TEST_PASSWORD.toCharArray()))));
+        final Client client2 = client().filterWithHeader(Map.of(
+            "Authorization", basicAuthHeaderValue(DLS_TEMPLATE_ROLE_QUERY_USER_2, new SecureString(TEST_PASSWORD.toCharArray()))));
+
+        // Search first with user1; only the one document with the corresponding username will be returned
+        assertSearchResponse(client1.prepareSearch(DLS_TEMPLATE_ROLE_QUERY_INDEX).setRequestCache(true).get(),
+            Set.of("1"), Set.of("username"));
+        assertCacheState(DLS_TEMPLATE_ROLE_QUERY_INDEX, 0, 1);
+
+        // Searching with user2 will not use user1's cache entry because the template query resolves differently for each user
+        assertSearchResponse(client2.prepareSearch(DLS_TEMPLATE_ROLE_QUERY_INDEX).setRequestCache(true).get(),
+            Set.of("2"), Set.of("username"));
+        assertCacheState(DLS_TEMPLATE_ROLE_QUERY_INDEX, 0, 2);
+
+        // Search with user1 again will use user1's cache
+        assertSearchResponse(client1.prepareSearch(DLS_TEMPLATE_ROLE_QUERY_INDEX).setRequestCache(true).get(),
+            Set.of("1"), Set.of("username"));
+        assertCacheState(DLS_TEMPLATE_ROLE_QUERY_INDEX, 1, 2);
+
+        // Search with user2 again will use user2's cache
+        assertSearchResponse(client2.prepareSearch(DLS_TEMPLATE_ROLE_QUERY_INDEX).setRequestCache(true).get(),
+            Set.of("2"), Set.of("username"));
+        assertCacheState(DLS_TEMPLATE_ROLE_QUERY_INDEX, 2, 2);
+
+        // Since the DLS query for the alias uses a stored script, the request cache should be disabled
+        assertSearchResponse(client1.prepareSearch(DLS_TEMPLATE_ROLE_QUERY_ALIAS).setRequestCache(true).get(),
+            Set.of("1"), Set.of("username"));
+        // No cache should be used
+        assertCacheState(DLS_TEMPLATE_ROLE_QUERY_INDEX, 2, 2);
+    }
+
+    private void prepareIndices() {
+        final Client client = client();
+
+        assertAcked(client.admin().cluster().preparePutStoredScript().setId("my-script")
+            .setContent(new BytesArray("{\"script\":{\"source\":" +
+                "\"{\\\"match\\\":{\\\"username\\\":\\\"{{_user.username}}\\\"}}\",\"lang\":\"mustache\"}}"), XContentType.JSON)
+            .get());
+
+        assertAcked(client.admin().indices().prepareCreate(DLS_INDEX).addAlias(new Alias("dls-alias")).get());
+        client.prepareIndex(DLS_INDEX).setId("101").setSource("number", 101, "letter", "A").get();
+        client.prepareIndex(DLS_INDEX).setId("102").setSource("number", 102, "letter", "B").get();
+
+        assertAcked(client.admin().indices().prepareCreate(FLS_INDEX).addAlias(new Alias("fls-alias")).get());
+        client.prepareIndex(FLS_INDEX).setId("201").setSource("public", "X", "private", "x").get();
+        client.prepareIndex(FLS_INDEX).setId("202").setSource("public", "Y", "private", "y").get();
+
+        assertAcked(client.admin().indices().prepareCreate(INDEX)
+            .addAlias(new Alias(ALIAS1))
+            .addAlias(new Alias(ALIAS2))
+            .addAlias(new Alias(ALL_ALIAS))
+            .get());
+        client.prepareIndex(INDEX).setId("1").setSource("number", 1, "letter", "a", "private", "sesame_1", "public", "door_1").get();
+        client.prepareIndex(INDEX).setId("2").setSource("number", 2, "letter", "b", "private", "sesame_2", "public", "door_2").get();
+
+        assertAcked(client.admin().indices().prepareCreate(DLS_TEMPLATE_ROLE_QUERY_INDEX)
+            .addAlias(new Alias(DLS_TEMPLATE_ROLE_QUERY_ALIAS)).get());
+        client.prepareIndex(DLS_TEMPLATE_ROLE_QUERY_INDEX).setId("1").setSource("username", DLS_TEMPLATE_ROLE_QUERY_USER_1).get();
+        client.prepareIndex(DLS_TEMPLATE_ROLE_QUERY_INDEX).setId("2").setSource("username", DLS_TEMPLATE_ROLE_QUERY_USER_2).get();
+
+        ensureGreen(DLS_INDEX, FLS_INDEX, INDEX, DLS_TEMPLATE_ROLE_QUERY_INDEX);
+        assertCacheState(DLS_INDEX, 0, 0);
+        assertCacheState(FLS_INDEX, 0, 0);
+        assertCacheState(INDEX, 0, 0);
+        assertCacheState(DLS_TEMPLATE_ROLE_QUERY_INDEX, 0, 0);
+
+        // Force merge the indices to ensure no background merges happen during the subsequent searches, which would invalidate the cache
+        final ForceMergeResponse forceMergeResponse = client.admin().indices()
+            .prepareForceMerge(DLS_INDEX, FLS_INDEX, INDEX, DLS_TEMPLATE_ROLE_QUERY_INDEX).setFlush(true).get();
+        ElasticsearchAssertions.assertAllSuccessful(forceMergeResponse);
+        final RefreshResponse refreshResponse = client.admin().indices()
+            .prepareRefresh(DLS_INDEX, FLS_INDEX, INDEX, DLS_TEMPLATE_ROLE_QUERY_INDEX).get();
+        assertThat(refreshResponse.getFailedShards(), equalTo(0));
+        ensureGreen(DLS_INDEX, FLS_INDEX, INDEX, DLS_TEMPLATE_ROLE_QUERY_INDEX);
+    }
+
+    private Client limitedClient() {
+        return client().filterWithHeader(Map.of(
+            "Authorization", basicAuthHeaderValue(DLS_FLS_USER, new SecureString(TEST_PASSWORD.toCharArray()))));
+    }
+
+    private Client limitedClientApiKey() throws ExecutionException, InterruptedException {
+        final CreateApiKeyRequest createApiKeyRequest = new CreateApiKeyRequest(randomAlphaOfLengthBetween(3, 8),
+            List.of(new RoleDescriptor(randomAlphaOfLengthBetween(3, 8),
+                null,
+                new RoleDescriptor.IndicesPrivileges[]{
+                    RoleDescriptor.IndicesPrivileges.builder().indices(ALL_ALIAS)
+                    .privileges("read").query("{\"term\":{\"letter\":\"a\"}}").grantedFields("*").deniedFields("number").build()
+                },
+                null)),
+            null);
+        final CreateApiKeyResponse createApiKeyResponse = limitedClient().execute(CreateApiKeyAction.INSTANCE, createApiKeyRequest).get();
+
+        final String base64ApiKey = Base64.getEncoder().encodeToString(
+            (createApiKeyResponse.getId() + ":" + createApiKeyResponse.getKey()).getBytes(StandardCharsets.UTF_8));
+        return client().filterWithHeader(Map.of("Authorization", "ApiKey " + base64ApiKey));
+    }
+
+    private void assertSearchResponse(SearchResponse searchResponse, Set<String> docIds) {
+        assertSearchResponse(searchResponse, docIds, null);
+    }
+
+    private void assertSearchResponse(SearchResponse searchResponse, Set<String> docIds, Set<String> fieldNames) {
+        assertThat(searchResponse.getFailedShards(), equalTo(0));
+        assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) docIds.size()));
+        final SearchHit[] hits = searchResponse.getHits().getHits();
+        assertThat(Arrays.stream(hits).map(SearchHit::getId).collect(Collectors.toUnmodifiableSet()), equalTo(docIds));
+        if (fieldNames != null) {
+            for (SearchHit hit : hits) {
+                assertThat(hit.getSourceAsMap().keySet(), equalTo(fieldNames));
+            }
+        }
+    }
+
+    private void assertCacheState(String index, long expectedHits, long expectedMisses) {
+        RequestCacheStats requestCacheStats = client().admin().indices().prepareStats(index)
+            .setRequestCache(true)
+            .get().getTotal().getRequestCache();
+        // Check the hit count and miss count together so if they are not
+        // correct we can see both values
+        assertEquals(
+            Arrays.asList(expectedHits, expectedMisses, 0L),
+            Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions()));
+    }
+
+    private void clearCache() {
+        assertNoFailures(client().admin().indices().prepareClearCache(DLS_INDEX, FLS_INDEX, INDEX).setRequestCache(true).get());
+    }
+}

+ 20 - 2
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java

@@ -24,8 +24,10 @@ import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.settings.ClusterSettings;
@@ -60,12 +62,14 @@ import org.elasticsearch.plugins.IngestPlugin;
 import org.elasticsearch.plugins.MapperPlugin;
 import org.elasticsearch.plugins.NetworkPlugin;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.SearchPlugin;
 import org.elasticsearch.plugins.SystemIndexPlugin;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestHandler;
 import org.elasticsearch.rest.RestHeaderDefinition;
 import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.threadpool.ExecutorBuilder;
 import org.elasticsearch.threadpool.FixedExecutorBuilder;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -222,6 +226,7 @@ import org.elasticsearch.xpack.security.authc.support.SecondaryAuthenticator;
 import org.elasticsearch.xpack.security.authc.support.HttpTlsRuntimeCheck;
 import org.elasticsearch.xpack.security.authc.support.mapper.NativeRoleMappingStore;
 import org.elasticsearch.xpack.security.authz.AuthorizationService;
+import org.elasticsearch.xpack.security.authz.DlsFlsRequestCacheDifferentiator;
 import org.elasticsearch.xpack.security.authz.SecuritySearchOperationListener;
 import org.elasticsearch.xpack.security.authz.accesscontrol.OptOutQueryCache;
 import org.elasticsearch.xpack.security.authz.interceptor.BulkShardRequestInterceptor;
@@ -229,6 +234,7 @@ import org.elasticsearch.xpack.security.authz.interceptor.IndicesAliasesRequestI
 import org.elasticsearch.xpack.security.authz.interceptor.RequestInterceptor;
 import org.elasticsearch.xpack.security.authz.interceptor.ResizeRequestInterceptor;
 import org.elasticsearch.xpack.security.authz.interceptor.SearchRequestInterceptor;
+import org.elasticsearch.xpack.security.authz.interceptor.ShardSearchRequestInterceptor;
 import org.elasticsearch.xpack.security.authz.interceptor.UpdateRequestInterceptor;
 import org.elasticsearch.xpack.security.authz.store.CompositeRolesStore;
 import org.elasticsearch.xpack.security.authz.store.DeprecationRoleDescriptorConsumer;
@@ -334,7 +340,7 @@ import static org.elasticsearch.xpack.security.support.SecurityIndexManager.INTE
 import static org.elasticsearch.xpack.security.support.SecurityIndexManager.SECURITY_VERSION_STRING;
 
 public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin, NetworkPlugin, ClusterPlugin,
-        DiscoveryPlugin, MapperPlugin, ExtensiblePlugin {
+        DiscoveryPlugin, MapperPlugin, ExtensiblePlugin, SearchPlugin {
 
     public static final String SECURITY_CRYPTO_THREAD_POOL_NAME = XPackField.SECURITY + "-crypto";
 
@@ -364,6 +370,7 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
     private final SetOnce<List<BootstrapCheck>> bootstrapChecks = new SetOnce<>();
     private final List<SecurityExtension> securityExtensions = new ArrayList<>();
     private final SetOnce<Transport> transportReference = new SetOnce<>();
+    private final SetOnce<ScriptService> scriptServiceReference = new SetOnce<>();
 
     public Security(Settings settings, final Path configPath) {
         this(settings, configPath, Collections.emptyList());
@@ -425,6 +432,8 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
             return Collections.singletonList(new SecurityUsageServices(null, null, null, null));
         }
 
+        scriptServiceReference.set(scriptService);
+
         // We need to construct the checks here while the secure settings are still available.
         // If we wait until #getBoostrapChecks the secure settings will have been cleared/closed.
         final List<BootstrapCheck> checks = new ArrayList<>();
@@ -561,7 +570,8 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
             new IndicesAliasesRequestInterceptor(threadPool.getThreadContext(), getLicenseState(), auditTrailService));
         if (XPackSettings.DLS_FLS_ENABLED.get(settings)) {
             requestInterceptors.addAll(Arrays.asList(
-                new SearchRequestInterceptor(threadPool, getLicenseState()),
+                new SearchRequestInterceptor(threadPool, getLicenseState(), clusterService),
+                new ShardSearchRequestInterceptor(threadPool, getLicenseState(), clusterService),
                 new UpdateRequestInterceptor(threadPool, getLicenseState()),
                 new BulkShardRequestInterceptor(threadPool, getLicenseState())
             ));
@@ -1856,4 +1866,12 @@ public class Security extends Plugin implements SystemIndexPlugin, IngestPlugin,
     public String getFeatureDescription() {
         return "Manages configuration for Security features, such as users and roles";
     }
+
+    @Override
+    public CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> getRequestCacheKeyDifferentiator() {
+        if (enabled == false) {
+            return null;
+        }
+        return new DlsFlsRequestCacheDifferentiator(getLicenseState(), securityContext, scriptServiceReference);
+    }
 }
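
A hedged sketch of the SearchPlugin extension point used above (CustomCachePlugin and the bytes it writes are hypothetical): a plugin may return a differentiator whose output is folded into the shard request cache key; security returns the DlsFlsRequestCacheDifferentiator shown below.

// Hypothetical plugin, for illustration of the extension point only.
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.internal.ShardSearchRequest;

import java.io.IOException;

public class CustomCachePlugin extends Plugin implements SearchPlugin {
    @Override
    public CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> getRequestCacheKeyDifferentiator() {
        return (shardSearchRequest, out) -> {
            // A real differentiator writes request- or identity-specific state (security writes the effective
            // DLS/FLS permissions); a constant marker is written here purely for illustration.
            out.writeString("custom-cache-key-marker");
        };
    }
}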

+ 65 - 0
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/DlsFlsRequestCacheDifferentiator.java

@@ -0,0 +1,65 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.security.authz;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.common.CheckedBiConsumer;
+import org.elasticsearch.core.MemoizedSupplier;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.search.internal.ShardSearchRequest;
+import org.elasticsearch.xpack.core.security.SecurityContext;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationServiceField;
+import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl;
+import org.elasticsearch.xpack.core.security.authz.support.SecurityQueryTemplateEvaluator;
+
+import java.io.IOException;
+
+public class DlsFlsRequestCacheDifferentiator implements CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> {
+
+    private static final Logger logger = LogManager.getLogger(DlsFlsRequestCacheDifferentiator.class);
+
+    private final XPackLicenseState licenseState;
+    private final SetOnce<SecurityContext> securityContextHolder;
+    private final SetOnce<ScriptService> scriptServiceReference;
+
+    public DlsFlsRequestCacheDifferentiator(XPackLicenseState licenseState,
+                                            SetOnce<SecurityContext> securityContextReference,
+                                            SetOnce<ScriptService> scriptServiceReference) {
+        this.licenseState = licenseState;
+        this.securityContextHolder = securityContextReference;
+        this.scriptServiceReference = scriptServiceReference;
+    }
+
+    @Override
+    public void accept(ShardSearchRequest request, StreamOutput out) throws IOException {
+        if (false == licenseState.isSecurityEnabled()) {
+            return;
+        }
+        var licenseChecker = new MemoizedSupplier<>(() -> licenseState.checkFeature(XPackLicenseState.Feature.SECURITY_DLS_FLS));
+        final SecurityContext securityContext = securityContextHolder.get();
+        final IndicesAccessControl indicesAccessControl =
+            securityContext.getThreadContext().getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY);
+        final String indexName = request.shardId().getIndexName();
+        IndicesAccessControl.IndexAccessControl indexAccessControl = indicesAccessControl.getIndexPermissions(indexName);
+        if (indexAccessControl != null) {
+            final boolean flsEnabled = indexAccessControl.getFieldPermissions().hasFieldLevelSecurity();
+            final boolean dlsEnabled = indexAccessControl.getDocumentPermissions().hasDocumentLevelPermissions();
+            if ((flsEnabled || dlsEnabled) && licenseChecker.get()) {
+                logger.debug("index [{}] with field level access controls [{}] " +
+                        "document level access controls [{}]. Differentiating request cache key",
+                    indexName, flsEnabled, dlsEnabled);
+                indexAccessControl.buildCacheKey(
+                    out, SecurityQueryTemplateEvaluator.wrap(securityContext.getUser(), scriptServiceReference.get()));
+            }
+        }
+    }
+}

+ 9 - 2
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java

@@ -483,8 +483,15 @@ public class RBACEngine implements AuthorizationEngine {
         final Set<GetUserPrivilegesResponse.Indices> indices = new LinkedHashSet<>();
         for (IndicesPermission.Group group : userRole.indices().groups()) {
             final Set<BytesReference> queries = group.getQuery() == null ? Collections.emptySet() : group.getQuery();
-            final Set<FieldPermissionsDefinition.FieldGrantExcludeGroup> fieldSecurity = group.getFieldPermissions().hasFieldLevelSecurity()
-                ? group.getFieldPermissions().getFieldPermissionsDefinition().getFieldGrantExcludeGroups() : Collections.emptySet();
+            final Set<FieldPermissionsDefinition.FieldGrantExcludeGroup> fieldSecurity;
+            if (group.getFieldPermissions().hasFieldLevelSecurity()) {
+                final FieldPermissionsDefinition definition = group.getFieldPermissions().getFieldPermissionsDefinition();
+                assert group.getFieldPermissions().getLimitedByFieldPermissionsDefinition() == null
+                    : "limited-by field must not exist since we do not support reporting user privileges for limited roles";
+                fieldSecurity = definition.getFieldGrantExcludeGroups();
+            } else {
+                fieldSecurity = Collections.emptySet();
+            }
             indices.add(new GetUserPrivilegesResponse.Indices(
                 Arrays.asList(group.indices()),
                 group.privilege().name(),

+ 24 - 24
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/FieldAndDocumentLevelSecurityRequestInterceptor.java

@@ -11,7 +11,6 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.core.MemoizedSupplier;
-import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.license.XPackLicenseState.Feature;
@@ -22,6 +21,9 @@ import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.RequestIn
 import org.elasticsearch.xpack.core.security.authz.AuthorizationServiceField;
 import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Base class for interceptors that disables features when field level security is configured for indices a request
  * is going to execute on.
@@ -43,46 +45,44 @@ abstract class FieldAndDocumentLevelSecurityRequestInterceptor implements Reques
                           ActionListener<Void> listener) {
         if (requestInfo.getRequest() instanceof IndicesRequest && false == TransportActionProxy.isProxyAction(requestInfo.getAction())) {
             IndicesRequest indicesRequest = (IndicesRequest) requestInfo.getRequest();
+            // TODO: should we check whether the DLS/FLS feature is allowed here as part of shouldIntercept?
             boolean shouldIntercept = licenseState.isSecurityEnabled();
-            var licenseChecker = new MemoizedSupplier<>(() -> licenseState.checkFeature(Feature.SECURITY_DLS_FLS));
             if (supports(indicesRequest) && shouldIntercept) {
-                final IndicesAccessControl indicesAccessControl =
-                    threadContext.getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY);
-                boolean fieldLevelSecurityEnabled = false;
-                boolean documentLevelSecurityEnabled = false;
-                final String[] requestIndices = requestIndices(indicesRequest);
-                for (String index : requestIndices) {
+                var licenseChecker = new MemoizedSupplier<>(() -> licenseState.checkFeature(Feature.SECURITY_DLS_FLS));
+                final IndicesAccessControl indicesAccessControl
+                    = threadContext.getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY);
+                final Map<String, IndicesAccessControl.IndexAccessControl> accessControlByIndex = new HashMap<>();
+                for (String index : requestIndices(indicesRequest)) {
                     IndicesAccessControl.IndexAccessControl indexAccessControl = indicesAccessControl.getIndexPermissions(index);
                     if (indexAccessControl != null) {
-                        fieldLevelSecurityEnabled =
-                            fieldLevelSecurityEnabled || indexAccessControl.getFieldPermissions().hasFieldLevelSecurity();
-                        documentLevelSecurityEnabled =
-                            documentLevelSecurityEnabled || indexAccessControl.getDocumentPermissions().hasDocumentLevelPermissions();
-                        if (fieldLevelSecurityEnabled && documentLevelSecurityEnabled) {
-                            break;
+                        final boolean flsEnabled = indexAccessControl.getFieldPermissions().hasFieldLevelSecurity();
+                        final boolean dlsEnabled = indexAccessControl.getDocumentPermissions().hasDocumentLevelPermissions();
+                        if ((flsEnabled || dlsEnabled) && licenseChecker.get()) {
+                            logger.trace("intercepted request for index [{}] with field level access controls [{}] " +
+                                "document level access controls [{}]. disabling conflicting features",
+                                index, flsEnabled, dlsEnabled);
+                            accessControlByIndex.put(index, indexAccessControl);
                         }
+                    } else {
+                        logger.trace("intercepted request for index [{}] without field or document level access controls", index);
                     }
                 }
-                if ((fieldLevelSecurityEnabled || documentLevelSecurityEnabled) && licenseChecker.get()) {
-                    logger.trace("intercepted request for indices [{}] with field level access controls [{}] " +
-                            "document level access controls [{}]. disabling conflicting features",
-                        Strings.arrayToDelimitedString(requestIndices, ","), fieldLevelSecurityEnabled, documentLevelSecurityEnabled);
-                    disableFeatures(indicesRequest, fieldLevelSecurityEnabled, documentLevelSecurityEnabled, listener);
+                if (false == accessControlByIndex.isEmpty()) {
+                    disableFeatures(indicesRequest, accessControlByIndex, listener);
                     return;
                 }
-                logger.trace("intercepted request for indices [{}] without field or document level access controls",
-                    Strings.arrayToDelimitedString(requestIndices, ","));
             }
         }
         listener.onResponse(null);
     }
 
+    abstract void disableFeatures(IndicesRequest indicesRequest,
+                                  Map<String, IndicesAccessControl.IndexAccessControl> indicesAccessControlByIndex,
+                                  ActionListener<Void> listener);
+
     String[] requestIndices(IndicesRequest indicesRequest) {
         return indicesRequest.indices();
     }
 
-    abstract void disableFeatures(IndicesRequest request, boolean fieldLevelSecurityEnabled, boolean documentLevelSecurityEnabled,
-                                  ActionListener<Void> listener);
-
     abstract boolean supports(IndicesRequest request);
 }
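For reference, a minimal sketch of what a subclass of the refactored base class looks like: disableFeatures now receives the per-index access controls as a map instead of two aggregated booleans. The class name NoopSecurityInterceptor is hypothetical and exists only for illustration; the constructor signature is assumed from the super(...) calls in the real subclasses below, and the sketch only compiles inside the same package because the base class and its abstract methods are package-private.

package org.elasticsearch.xpack.security.authz.interceptor;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl;

import java.util.Map;

// Hypothetical no-op subclass, for illustration only.
class NoopSecurityInterceptor extends FieldAndDocumentLevelSecurityRequestInterceptor {

    NoopSecurityInterceptor(ThreadContext threadContext, XPackLicenseState licenseState) {
        super(threadContext, licenseState);
    }

    @Override
    void disableFeatures(IndicesRequest indicesRequest,
                         Map<String, IndicesAccessControl.IndexAccessControl> accessControlByIndex,
                         ActionListener<Void> listener) {
        // Only indices that actually have DLS and/or FLS show up in the map, so a subclass can
        // make per-index decisions here (as ShardSearchRequestInterceptor does further down).
        listener.onResponse(null);
    }

    @Override
    boolean supports(IndicesRequest request) {
        return false; // a real subclass matches a concrete request type, e.g. SearchRequest
    }
}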

+ 31 - 21
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestInterceptor.java

@@ -7,45 +7,50 @@
 package org.elasticsearch.xpack.security.authz.interceptor;
 
 import org.elasticsearch.ElasticsearchSecurityException;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl;
+
+import java.util.Arrays;
+import java.util.Map;
+
+import static org.elasticsearch.transport.RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR;
 
-/**
- * If field level security is enabled this interceptor disables the request cache for search and shardSearch requests.
- */
 public class SearchRequestInterceptor extends FieldAndDocumentLevelSecurityRequestInterceptor {
 
-    public SearchRequestInterceptor(ThreadPool threadPool, XPackLicenseState licenseState) {
+    public static final Version VERSION_SHARD_SEARCH_INTERCEPTOR = Version.V_7_11_2;
+    private final ClusterService clusterService;
+
+    public SearchRequestInterceptor(ThreadPool threadPool, XPackLicenseState licenseState, ClusterService clusterService) {
         super(threadPool.getThreadContext(), licenseState);
+        this.clusterService = clusterService;
     }
 
     @Override
-    public void disableFeatures(IndicesRequest indicesRequest, boolean fieldLevelSecurityEnabled, boolean documentLevelSecurityEnabled,
-                                ActionListener<Void> listener) {
-        assert indicesRequest instanceof SearchRequest || indicesRequest instanceof ShardSearchRequest
-            : "request must be either SearchRequest or ShardSearchRequest";
-
-        final SearchSourceBuilder source;
-        if (indicesRequest instanceof SearchRequest) {
-            final SearchRequest request = (SearchRequest) indicesRequest;
+    void disableFeatures(IndicesRequest indicesRequest,
+                         Map<String, IndicesAccessControl.IndexAccessControl> indexAccessControlByIndex,
+                         ActionListener<Void> listener) {
+        final SearchRequest request = (SearchRequest) indicesRequest;
+        // The 7.11.2 version check is needed because request caching has a bug related to DLS/FLS
+        // in versions before 7.11.2. It was fixed by #69505. See also ESA-2021-08.
+        // TODO: The version check can be removed in 8.0 because the last 7.x release will support request caching with DLS/FLS
+        if (clusterService.state().nodes().getMinNodeVersion().before(VERSION_SHARD_SEARCH_INTERCEPTOR) || hasRemoteIndices(request)) {
             request.requestCache(false);
-            source = request.source();
-        } else {
-            final ShardSearchRequest request = (ShardSearchRequest) indicesRequest;
-            request.requestCache(false);
-            source = request.source();
         }
 
-        if (documentLevelSecurityEnabled) {
+        final SearchSourceBuilder source = request.source();
+
+        if (indexAccessControlByIndex.values().stream().anyMatch(iac -> iac.getDocumentPermissions().hasDocumentLevelPermissions())) {
             if (source != null && source.suggest() != null) {
                 listener.onFailure(new ElasticsearchSecurityException("Suggest isn't supported if document level security is enabled",
-                        RestStatus.BAD_REQUEST));
+                    RestStatus.BAD_REQUEST));
             } else if (source != null && source.profile()) {
                 listener.onFailure(new ElasticsearchSecurityException("A search request cannot be profiled if document level security " +
                     "is enabled", RestStatus.BAD_REQUEST));
@@ -59,6 +64,11 @@ public class SearchRequestInterceptor extends FieldAndDocumentLevelSecurityReque
 
     @Override
     public boolean supports(IndicesRequest request) {
-        return request instanceof SearchRequest || request instanceof ShardSearchRequest;
+        return request instanceof SearchRequest;
+    }
+
+    // package-private for testing
+    boolean hasRemoteIndices(SearchRequest request) {
+        return Arrays.stream(request.indices()).anyMatch(name -> name.indexOf(REMOTE_CLUSTER_INDEX_SEPARATOR) >= 0);
     }
 }
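The hasRemoteIndices check above relies on the cross-cluster naming convention: a remote index is addressed as cluster_alias:index, so any requested index name containing the ':' separator (REMOTE_CLUSTER_INDEX_SEPARATOR) marks the search as targeting a remote cluster and keeps the request cache disabled. A tiny standalone illustration with made-up index names:

import java.util.Arrays;

public class RemoteIndexCheck {

    // Same predicate as hasRemoteIndices, applied to plain strings.
    static boolean hasRemote(String[] indices) {
        return Arrays.stream(indices).anyMatch(name -> name.indexOf(':') >= 0);
    }

    public static void main(String[] args) {
        System.out.println(hasRemote(new String[] { "logs-2021", "metrics" }));              // false
        System.out.println(hasRemote(new String[] { "logs-2021", "eu-cluster:logs-2021" })); // true -> cache disabled
    }
}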

+ 81 - 0
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/ShardSearchRequestInterceptor.java

@@ -0,0 +1,81 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+package org.elasticsearch.xpack.security.authz.interceptor;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.IndicesRequest;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.search.internal.ShardSearchRequest;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl;
+import org.elasticsearch.xpack.core.security.authz.permission.DocumentPermissions;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.elasticsearch.xpack.security.authz.interceptor.SearchRequestInterceptor.VERSION_SHARD_SEARCH_INTERCEPTOR;
+
+public class ShardSearchRequestInterceptor extends FieldAndDocumentLevelSecurityRequestInterceptor {
+
+    private static final Logger logger = LogManager.getLogger(ShardSearchRequestInterceptor.class);
+
+    private final ClusterService clusterService;
+
+    public ShardSearchRequestInterceptor(ThreadPool threadPool, XPackLicenseState licenseState, ClusterService clusterService) {
+        super(threadPool.getThreadContext(), licenseState);
+        this.clusterService = clusterService;
+    }
+
+    @Override
+    void disableFeatures(IndicesRequest indicesRequest,
+                         Map<String, IndicesAccessControl.IndexAccessControl> indexAccessControlByIndex,
+                         ActionListener<Void> listener) {
+        final ShardSearchRequest request = (ShardSearchRequest) indicesRequest;
+        // The 7.11.2 version check is needed because request caching has a bug related to DLS/FLS
+        // in versions before 7.11.2. It was fixed by #69505. See also ESA-2021-08.
+        // TODO: The version check can be removed in 8.0 because the last 7.x release will support request caching with DLS/FLS
+        if (clusterService.state().nodes().getMinNodeVersion().before(VERSION_SHARD_SEARCH_INTERCEPTOR)) {
+            request.requestCache(false);
+        } else if (dlsUsesStoredScripts(request, indexAccessControlByIndex)) {
+            logger.debug("Disable shard search request cache because DLS queries use stored scripts");
+            request.requestCache(false);
+        }
+        listener.onResponse(null);
+    }
+
+    @Override
+    String[] requestIndices(IndicesRequest indicesRequest) {
+        final ShardSearchRequest request = (ShardSearchRequest) indicesRequest;
+        return new String[] { request.shardId().getIndexName() };
+    }
+
+    @Override
+    public boolean supports(IndicesRequest request) {
+        return request instanceof ShardSearchRequest;
+    }
+
+    boolean dlsUsesStoredScripts(ShardSearchRequest request,
+                                 Map<String, IndicesAccessControl.IndexAccessControl> indexAccessControlByIndex) {
+        final String indexName = request.shardId().getIndexName();
+        final IndicesAccessControl.IndexAccessControl indexAccessControl = indexAccessControlByIndex.get(indexName);
+        assert indexAccessControl != null : "index access control cannot be null";
+        final DocumentPermissions documentPermissions = indexAccessControl.getDocumentPermissions();
+        if (documentPermissions.hasDocumentLevelPermissions()) {
+            try {
+                return documentPermissions.hasStoredScript();
+            } catch (IOException e) {
+                throw new ElasticsearchException(e);
+            }
+        } else {
+            return false;
+        }
+    }
+}
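To make dlsUsesStoredScripts concrete, the two role query shapes below (taken from the interceptor tests further down, wrapped in a hypothetical holder class purely for illustration) show the distinction it draws: a templated DLS query that references a stored script by id causes the shard request cache to be disabled, while an ordinary inline role query leaves the request eligible for caching.

public class DlsQueryExamples {
    // Stored-script role query: ShardSearchRequestInterceptor disables the shard request cache.
    static final String STORED_SCRIPT_DLS = "{\"template\":{\"id\":\"my-script\"}}";
    // Inline role query: the request cache stays eligible (subject to the other checks above).
    static final String INLINE_DLS = "{\"term\":{\"username\":\"foo\"}}";
}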

+ 6 - 2
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/UpdateRequestInterceptor.java

@@ -13,6 +13,9 @@ import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl;
+
+import java.util.Map;
 
 /**
  * A request interceptor that fails update request if field or document level security is enabled.
@@ -28,8 +31,9 @@ public class UpdateRequestInterceptor extends FieldAndDocumentLevelSecurityReque
     }
 
     @Override
-    protected void disableFeatures(IndicesRequest updateRequest, boolean fieldLevelSecurityEnabled, boolean documentLevelSecurityEnabled,
-                                   ActionListener<Void> listener) {
+    void disableFeatures(IndicesRequest indicesRequest,
+                         Map<String, IndicesAccessControl.IndexAccessControl> indicesAccessControlByIndex,
+                         ActionListener<Void> listener) {
         listener.onFailure(new ElasticsearchSecurityException("Can't execute an update request if field or document level security " +
             "is enabled", RestStatus.BAD_REQUEST));
     }

+ 106 - 0
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/DlsFlsRequestCacheDifferentiatorTests.java

@@ -0,0 +1,106 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.security.authz;
+
+import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.search.internal.ShardSearchRequest;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.security.SecurityContext;
+import org.elasticsearch.xpack.core.security.authz.AuthorizationServiceField;
+import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl;
+import org.elasticsearch.xpack.core.security.authz.permission.DocumentPermissions;
+import org.elasticsearch.xpack.core.security.authz.permission.FieldPermissions;
+import org.elasticsearch.xpack.core.security.authz.permission.FieldPermissionsDefinition;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class DlsFlsRequestCacheDifferentiatorTests extends ESTestCase {
+
+    private XPackLicenseState licenseState;
+    private ThreadContext threadContext;
+    private StreamOutput out;
+    private DlsFlsRequestCacheDifferentiator differentiator;
+    private ShardSearchRequest shardSearchRequest;
+    private String indexName;
+    private String dlsIndexName;
+    private String flsIndexName;
+    private String dlsFlsIndexName;
+
+    @Before
+    public void init() throws IOException {
+        licenseState = mock(XPackLicenseState.class);
+        when(licenseState.isSecurityEnabled()).thenReturn(true);
+        when(licenseState.checkFeature(XPackLicenseState.Feature.SECURITY_DLS_FLS)).thenReturn(true);
+        threadContext = new ThreadContext(Settings.EMPTY);
+        out = new BytesStreamOutput();
+        final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext);
+        differentiator = new DlsFlsRequestCacheDifferentiator(
+            licenseState, new SetOnce<>(securityContext), new SetOnce<>(mock(ScriptService.class)));
+        shardSearchRequest = mock(ShardSearchRequest.class);
+        indexName = randomAlphaOfLengthBetween(3, 8);
+        dlsIndexName = "dls-" + randomAlphaOfLengthBetween(3, 8);
+        flsIndexName = "fls-" + randomAlphaOfLengthBetween(3, 8);
+        dlsFlsIndexName = "dls-fls-" + randomAlphaOfLengthBetween(3, 8);
+
+        final DocumentPermissions documentPermissions1 = DocumentPermissions.filteredBy(
+            Set.of(new BytesArray("{\"term\":{\"number\":1}}")));
+
+        threadContext.putTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY,
+            new IndicesAccessControl(true,
+                Map.of(
+                    flsIndexName,
+                    new IndicesAccessControl.IndexAccessControl(true,
+                        new FieldPermissions(new FieldPermissionsDefinition(new String[]{"*"}, new String[]{"private"})),
+                        DocumentPermissions.allowAll()),
+                    dlsIndexName,
+                    new IndicesAccessControl.IndexAccessControl(true,
+                        FieldPermissions.DEFAULT, documentPermissions1),
+                    dlsFlsIndexName,
+                    new IndicesAccessControl.IndexAccessControl(true,
+                        new FieldPermissions(new FieldPermissionsDefinition(new String[]{"*"}, new String[]{"private"})),
+                        documentPermissions1)
+                )
+            ));
+    }
+
+    public void testWillWriteCacheKeyForAnyDlsOrFls() throws IOException {
+        when(shardSearchRequest.shardId()).thenReturn(
+            new ShardId(randomFrom(dlsIndexName, flsIndexName, dlsFlsIndexName), randomAlphaOfLength(10), randomIntBetween(0, 3)));
+        differentiator.accept(shardSearchRequest, out);
+        assertThat(out.position(), greaterThan(0L));
+    }
+
+    public void testWillDoNothingIfNoDlsFls() throws IOException {
+        when(shardSearchRequest.shardId()).thenReturn(new ShardId(indexName, randomAlphaOfLength(10), randomIntBetween(0, 3)));
+        differentiator.accept(shardSearchRequest, out);
+        assertThat(out.position(), equalTo(0L));
+    }
+
+    public void testWillDoNothingIfSecurityIsNotEnabled() throws IOException {
+        when(licenseState.isSecurityEnabled()).thenReturn(false);
+        when(shardSearchRequest.shardId()).thenReturn(new ShardId(dlsFlsIndexName, randomAlphaOfLength(10), randomIntBetween(0, 3)));
+        differentiator.accept(shardSearchRequest, out);
+        assertThat(out.position(), equalTo(0L));
+    }
+}

+ 119 - 0
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/SearchRequestInterceptorTests.java

@@ -0,0 +1,119 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.security.authz.interceptor;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.util.ArrayUtils;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.VersionUtils;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class SearchRequestInterceptorTests extends ESTestCase {
+
+    private ClusterService clusterService;
+    private ThreadPool threadPool;
+    private XPackLicenseState licenseState;
+    private SearchRequestInterceptor interceptor;
+
+    @Before
+    public void init() {
+        threadPool = new TestThreadPool("search request interceptor tests");
+        licenseState = mock(XPackLicenseState.class);
+        when(licenseState.isSecurityEnabled()).thenReturn(true);
+        when(licenseState.checkFeature(XPackLicenseState.Feature.SECURITY_DLS_FLS)).thenReturn(true);
+        clusterService = mock(ClusterService.class);
+        interceptor = new SearchRequestInterceptor(threadPool, licenseState, clusterService);
+    }
+
+    @After
+    public void stopThreadPool() {
+        terminate(threadPool);
+    }
+
+    private void configureMinNodeVersion(Version version) {
+        final ClusterState clusterState = mock(ClusterState.class);
+        when(clusterService.state()).thenReturn(clusterState);
+        final DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
+        when(clusterState.nodes()).thenReturn(discoveryNodes);
+        when(discoveryNodes.getMinNodeVersion()).thenReturn(version);
+    }
+
+    public void testRequestCacheWillBeDisabledWhenMinNodeVersionIsBeforeShardSearchInterceptor() {
+        configureMinNodeVersion(VersionUtils.randomVersionBetween(random(), Version.V_7_0_0, Version.V_7_11_1));
+        final SearchRequest searchRequest = mock(SearchRequest.class);
+        when(searchRequest.indices()).thenReturn(randomArray(0, 3, String[]::new, () -> randomAlphaOfLengthBetween(3, 8)));
+        when(searchRequest.source()).thenReturn(SearchSourceBuilder.searchSource());
+        final PlainActionFuture<Void> future = new PlainActionFuture<>();
+        interceptor.disableFeatures(searchRequest, Map.of(), future);
+        future.actionGet();
+        verify(searchRequest).requestCache(false);
+    }
+
+    public void testRequestCacheWillBeDisabledWhenSearchRemoteIndices() {
+        configureMinNodeVersion(VersionUtils.randomVersionBetween(random(), Version.V_7_11_2, Version.CURRENT));
+        final SearchRequest searchRequest = mock(SearchRequest.class);
+        when(searchRequest.source()).thenReturn(SearchSourceBuilder.searchSource());
+        final String[] localIndices = randomArray(0, 3, String[]::new, () -> randomAlphaOfLengthBetween(3, 8));
+        final String[] remoteIndices = randomArray(0, 3, String[]::new,
+            () -> randomAlphaOfLengthBetween(0, 5) + ":" + randomAlphaOfLengthBetween(3, 8));
+        final ArrayList<String> allIndices =
+            Arrays.stream(ArrayUtils.concat(localIndices, remoteIndices)).collect(Collectors.toCollection(ArrayList::new));
+        Collections.shuffle(allIndices, random());
+        when(searchRequest.indices()).thenReturn(allIndices.toArray(String[]::new));
+
+        final PlainActionFuture<Void> future = new PlainActionFuture<>();
+        interceptor.disableFeatures(searchRequest, Map.of(), future);
+        future.actionGet();
+        if (remoteIndices.length > 0) {
+            verify(searchRequest).requestCache(false);
+        } else {
+            verify(searchRequest, never()).requestCache(anyBoolean());
+        }
+    }
+
+    public void testHasRemoteIndices() {
+        final SearchRequest searchRequest = mock(SearchRequest.class);
+        when(searchRequest.source()).thenReturn(SearchSourceBuilder.searchSource());
+        final String[] localIndices = randomArray(0, 3, String[]::new, () -> randomAlphaOfLengthBetween(3, 8));
+        final String[] remoteIndices = randomArray(0, 3, String[]::new,
+            () -> randomAlphaOfLengthBetween(0, 5) + ":" + randomAlphaOfLengthBetween(3, 8));
+        final ArrayList<String> allIndices =
+            Arrays.stream(ArrayUtils.concat(localIndices, remoteIndices)).collect(Collectors.toCollection(ArrayList::new));
+        Collections.shuffle(allIndices, random());
+        when(searchRequest.indices()).thenReturn(allIndices.toArray(String[]::new));
+
+        if (remoteIndices.length > 0) {
+            assertThat(interceptor.hasRemoteIndices(searchRequest), is(true));
+        } else {
+            assertThat(interceptor.hasRemoteIndices(searchRequest), is(false));
+        }
+    }
+}

+ 96 - 0
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/interceptor/ShardSearchRequestInterceptorTests.java

@@ -0,0 +1,96 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.security.authz.interceptor;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.search.internal.ShardSearchRequest;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl;
+import org.elasticsearch.xpack.core.security.authz.permission.DocumentPermissions;
+import org.elasticsearch.xpack.core.security.authz.permission.FieldPermissions;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ShardSearchRequestInterceptorTests extends ESTestCase {
+
+    private ClusterService clusterService;
+    private ThreadPool threadPool;
+    private XPackLicenseState licenseState;
+    private ShardSearchRequestInterceptor interceptor;
+
+    @Before
+    public void init() {
+        threadPool = new TestThreadPool("shard search request interceptor tests");
+        licenseState = mock(XPackLicenseState.class);
+        when(licenseState.isSecurityEnabled()).thenReturn(true);
+        when(licenseState.checkFeature(XPackLicenseState.Feature.SECURITY_DLS_FLS)).thenReturn(true);
+        clusterService = mock(ClusterService.class);
+        interceptor = new ShardSearchRequestInterceptor(threadPool, licenseState, clusterService);
+    }
+
+    @After
+    public void stopThreadPool() {
+        terminate(threadPool);
+    }
+
+    private void configureMinNodeVersion(Version version) {
+        final ClusterState clusterState = mock(ClusterState.class);
+        when(clusterService.state()).thenReturn(clusterState);
+        final DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
+        when(clusterState.nodes()).thenReturn(discoveryNodes);
+        when(discoveryNodes.getMinNodeVersion()).thenReturn(version);
+    }
+
+    public void testRequestCacheWillBeDisabledWhenDlsUsesStoredScripts() {
+        configureMinNodeVersion(Version.CURRENT);
+        final DocumentPermissions documentPermissions = DocumentPermissions.filteredBy(
+            Set.of(new BytesArray("{\"template\":{\"id\":\"my-script\"}}")));
+        final ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class);
+        final String index = randomAlphaOfLengthBetween(3, 8);
+        when(shardSearchRequest.shardId()).thenReturn(new ShardId(index, randomAlphaOfLength(22), randomInt(3)));
+        final PlainActionFuture<Void> listener = new PlainActionFuture<>();
+        interceptor.disableFeatures(shardSearchRequest,
+            Map.of(index, new IndicesAccessControl.IndexAccessControl(true, FieldPermissions.DEFAULT, documentPermissions)),
+            listener);
+        listener.actionGet();
+        verify(shardSearchRequest).requestCache(false);
+    }
+
+    public void testRequestCacheWillNotBeDisabledWhenDlsUsesInlineScripts() {
+        configureMinNodeVersion(Version.CURRENT);
+        final DocumentPermissions documentPermissions = DocumentPermissions.filteredBy(
+            Set.of(new BytesArray("{\"term\":{\"username\":\"foo\"}}")));
+        final ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class);
+        final String index = randomAlphaOfLengthBetween(3, 8);
+        when(shardSearchRequest.shardId()).thenReturn(new ShardId(index, randomAlphaOfLength(22), randomInt(3)));
+        final PlainActionFuture<Void> listener = new PlainActionFuture<>();
+        interceptor.disableFeatures(shardSearchRequest,
+            Map.of(index, new IndicesAccessControl.IndexAccessControl(true, FieldPermissions.DEFAULT, documentPermissions)),
+            listener);
+        listener.actionGet();
+        verify(shardSearchRequest, never()).requestCache(false);
+    }
+
+}

+ 2 - 2
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/rest/action/user/RestGetUserPrivilegesActionTests.java

@@ -99,8 +99,8 @@ public class RestGetUserPrivilegesActionTests extends ESTestCase {
             "{\"names\":[\"index-1\",\"index-2\",\"index-3-*\"]," +
             "\"privileges\":[\"read\",\"write\"]," +
             "\"field_security\":[" +
-            "{\"grant\":[\"public.*\"]}," +
-            "{\"grant\":[\"*\"],\"except\":[\"private.*\"]}" +
+            "{\"grant\":[\"*\"],\"except\":[\"private.*\"]}," +
+            "{\"grant\":[\"public.*\"]}" +
             "]," +
             "\"query\":[" +
             "\"{ \\\"term\\\": { \\\"access\\\": \\\"public\\\" } }\"," +