
Fix memory leak in DLS bitset cache (#50635)

The Document Level Security BitSet cache stores a secondary "lookup
map" so that it can determine which cache entries to invalidate when
a Lucene index is closed (merged, etc.).

There was a memory leak because this secondary map was not cleared
when entries were naturally evicted from the cache (due to size/TTL
limits).

This has been solved by adding a cache removal listener and processing
those removal events asynchronously.
Tim Vernum 5 years ago
parent
commit 496604942f
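
The fix generalizes: whenever a primary cache is shadowed by a secondary lookup structure, every eviction path has to prune that structure too, or the secondary map grows without bound. Below is a minimal, self-contained sketch of the two-map shape and the removal hook, using plain JDK collections and hypothetical names ("index|query" strings stand in for the real IndexReader.CacheKey/query pairs); it illustrates the idea and is not code from this commit:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class TwoMapCacheSketch {
        private static final int MAX_ENTRIES = 100;

        private final ExecutorService cleanupExecutor = Executors.newSingleThreadExecutor();

        // Secondary "lookup map": index name -> all cache keys for that index.
        // The real cache keys an IndexReader.CacheKey plus the DLS query;
        // "index|query" Strings keep this sketch self-contained.
        private final Map<String, Set<String>> keysByIndex = new ConcurrentHashMap<>();

        // Primary cache with LRU eviction. The eviction hook is the essence of
        // the fix: without it, evicted keys linger in keysByIndex forever.
        private final Map<String, long[]> bitsets = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, long[]> eldest) {
                if (size() > MAX_ENTRIES) {
                    onEviction(eldest.getKey());
                    return true;
                }
                return false;
            }
        };

        private void onEviction(String cacheKey) {
            // Mirror the commit: process the removal asynchronously so the
            // eviction (which fires during an insert) never blocks the caller.
            cleanupExecutor.submit(() -> {
                Set<String> keys = keysByIndex.get(cacheKey.substring(0, cacheKey.indexOf('|')));
                if (keys != null) {
                    keys.remove(cacheKey);
                }
            });
        }
    }

The real implementation below additionally guards the evict-then-repopulate race with a read/write lock, since an entry can be re-inserted between its eviction and the asynchronous cleanup.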

+ 111 - 24
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetBitsetCache.java

@@ -23,13 +23,16 @@ import org.apache.lucene.util.FixedBitSet;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.cache.Cache;
 import org.elasticsearch.common.cache.CacheBuilder;
+import org.elasticsearch.common.cache.RemovalNotification;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.Closeable;
 import java.util.List;
@@ -38,6 +41,8 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * This is a cache for {@link BitSet} instances that are used with the {@link DocumentSubsetReader}.
@@ -64,17 +69,48 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen
     private static final BitSet NULL_MARKER = new FixedBitSet(0);
 
     private final Logger logger;
+
+    /**
+     * When a {@link BitSet} is evicted from {@link #bitsetCache}, we need to also remove it from {@link #keysByIndex}.
+     * We use a {@link ReentrantReadWriteLock} to control atomicity here - the "read" side represents potential insertions to the
+     * {@link #bitsetCache}, the "write" side represents removals from {@link #keysByIndex}.
+     * The risk (that {@link Cache} does not provide protection for) is that an entry is removed from the cache, and then immediately
+     * re-populated, before we process the removal event. To protect against that we need to check the state of the {@link #bitsetCache}
+     * but we need exclusive ("write") access while performing that check and updating the values in {@link #keysByIndex}.
+     */
+    private final ReleasableLock cacheEvictionLock;
+    private final ReleasableLock cacheModificationLock;
+    private final ExecutorService cleanupExecutor;
+
     private final Cache<BitsetCacheKey, BitSet> bitsetCache;
     private final Map<IndexReader.CacheKey, Set<BitsetCacheKey>> keysByIndex;
 
-    public DocumentSubsetBitsetCache(Settings settings) {
+    public DocumentSubsetBitsetCache(Settings settings, ThreadPool threadPool) {
+        this(settings, threadPool.executor(ThreadPool.Names.GENERIC));
+    }
+
+    /**
+     * @param settings The global settings object for this node
+     * @param cleanupExecutor An executor on which the cache cleanup tasks can be run. Due to the way the cache is structured internally,
+     *                        it is sometimes necessary to run an asynchronous task to synchronize the internal state.
+     */
+    protected DocumentSubsetBitsetCache(Settings settings, ExecutorService cleanupExecutor) {
         this.logger = LogManager.getLogger(getClass());
+
+        final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+        this.cacheEvictionLock = new ReleasableLock(readWriteLock.writeLock());
+        this.cacheModificationLock = new ReleasableLock(readWriteLock.readLock());
+        this.cleanupExecutor = cleanupExecutor;
+
         final TimeValue ttl = CACHE_TTL_SETTING.get(settings);
         final ByteSizeValue size = CACHE_SIZE_SETTING.get(settings);
         this.bitsetCache = CacheBuilder.<BitsetCacheKey, BitSet>builder()
             .setExpireAfterAccess(ttl)
             .setMaximumWeight(size.getBytes())
-            .weigher((key, bitSet) -> bitSet == NULL_MARKER ? 0 : bitSet.ramBytesUsed()).build();
+            .weigher((key, bitSet) -> bitSet == NULL_MARKER ? 0 : bitSet.ramBytesUsed())
+            .removalListener(this::onCacheEviction)
+            .build();
+
         this.keysByIndex = new ConcurrentHashMap<>();
     }
 
@@ -88,6 +124,31 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen
         }
     }
 
+    /**
+     * Clean up (synchronize) the internal state when an object is removed from the primary cache
+     */
+    private void onCacheEviction(RemovalNotification<BitsetCacheKey, BitSet> notification) {
+        final BitsetCacheKey bitsetKey = notification.getKey();
+        final IndexReader.CacheKey indexKey = bitsetKey.index;
+        if (keysByIndex.getOrDefault(indexKey, Set.of()).contains(bitsetKey) == false) {
+            // If the bitsetKey isn't in the lookup map, then there's nothing to synchronize
+            return;
+        }
+        // We push this to a background thread, so that it reduces the risk of blocking searches, but also so that the lock management is
+        // simpler - this callback is likely to take place on a thread that is actively adding something to the cache, and is therefore
+        // holding the read ("update") side of the lock. It is not possible to upgrade a read lock to a write ("eviction") lock, but we
+        // need to acquire that lock here.
+        cleanupExecutor.submit(() -> {
+            try (ReleasableLock ignored = cacheEvictionLock.acquire()) {
+                // it's possible for the key to be back in the cache if it was immediately repopulated after it was evicted, so check
+                if (bitsetCache.get(bitsetKey) == null) {
+                    // key is no longer in the cache, make sure it is no longer in the lookup map either.
+                    keysByIndex.getOrDefault(indexKey, Set.of()).remove(bitsetKey);
+                }
+            }
+        });
+    }
+
     @Override
     public void close() {
         clear("close");
@@ -96,7 +157,8 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen
     public void clear(String reason) {
         logger.debug("clearing all DLS bitsets because [{}]", reason);
         // Due to the order here, it is possible that a new entry could be added _after_ the keysByIndex map is cleared
-        // but _before_ the cache is cleared. This would mean it sits orphaned in keysByIndex, but this is not a issue.
+        // but _before_ the cache is cleared. This should get fixed up in the "onCacheEviction" callback, but if anything slips through
+        // and sits orphaned in keysByIndex, it will not be a significant issue.
         // When the index is closed, the key will be removed from the map, and there will not be a corresponding item
         // in the cache, which will make the cache-invalidate a no-op.
         // Since the entry is not in the cache, if #getBitSet is called, it will be loaded, and the new key will be added
@@ -130,31 +192,33 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen
         final IndexReader.CacheKey indexKey = coreCacheHelper.getKey();
         final BitsetCacheKey cacheKey = new BitsetCacheKey(indexKey, query);
 
-        final BitSet bitSet = bitsetCache.computeIfAbsent(cacheKey, ignore1 -> {
-            // This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees.
-            keysByIndex.compute(indexKey, (ignore2, set) -> {
-                if (set == null) {
-                    set = Sets.newConcurrentHashSet();
+        try (ReleasableLock ignored = cacheModificationLock.acquire()) {
+            final BitSet bitSet = bitsetCache.computeIfAbsent(cacheKey, ignore1 -> {
+                // This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees.
+                keysByIndex.compute(indexKey, (ignore2, set) -> {
+                    if (set == null) {
+                        set = Sets.newConcurrentHashSet();
+                    }
+                    set.add(cacheKey);
+                    return set;
+                });
+                final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
+                final IndexSearcher searcher = new IndexSearcher(topLevelContext);
+                searcher.setQueryCache(null);
+                final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1f);
+                Scorer s = weight.scorer(context);
+                if (s == null) {
+                    // A cache loader is not allowed to return null, return a marker object instead.
+                    return NULL_MARKER;
+                } else {
+                    return BitSet.of(s.iterator(), context.reader().maxDoc());
                 }
-                set.add(cacheKey);
-                return set;
             });
-            final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
-            final IndexSearcher searcher = new IndexSearcher(topLevelContext);
-            searcher.setQueryCache(null);
-            final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1f);
-            Scorer s = weight.scorer(context);
-            if (s == null) {
-                // A cache loader is not allowed to return null, return a marker object instead.
-                return NULL_MARKER;
+            if (bitSet == NULL_MARKER) {
+                return null;
             } else {
-                return BitSet.of(s.iterator(), context.reader().maxDoc());
+                return bitSet;
             }
-        });
-        if (bitSet == NULL_MARKER) {
-            return null;
-        } else {
-            return bitSet;
         }
     }
 
@@ -203,4 +267,27 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen
             return getClass().getSimpleName() + "(" + index + "," + query + ")";
         }
     }
+
+    /**
+     * This method verifies that the two internal data structures ({@link #bitsetCache} and {@link #keysByIndex}) are consistent with one
+     * another. This method is only called by tests.
+     */
+    void verifyInternalConsistency() {
+        this.bitsetCache.keys().forEach(bck -> {
+            final Set<BitsetCacheKey> set = this.keysByIndex.get(bck.index);
+            if (set == null) {
+                throw new IllegalStateException("Key [" + bck + "] is in the cache, but there is no entry for [" + bck.index +
+                    "] in the lookup map");
+            }
+            if (set.contains(bck) == false) {
+                throw new IllegalStateException("Key [" + bck + "] is in the cache, but the lookup entry for [" + bck.index +
+                    "] does not contain that key");
+            }
+        });
+        this.keysByIndex.values().stream().flatMap(Set::stream).forEach(bck -> {
+            if (this.bitsetCache.get(bck) == null) {
+                throw new IllegalStateException("Key [" + bck + "] is in the lookup map, but is not in the cache");
+            }
+        });
+    }
 }
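
The comment in onCacheEviction relies on a JDK fact worth spelling out: a ReentrantReadWriteLock cannot upgrade a read lock to a write lock, and the eviction callback fires on a thread that is mid-insert and therefore already holds the read ("update") side. A standalone demonstration of that constraint (illustrative only, not part of this commit):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class LockUpgradeDemo {
        public static void main(String[] args) {
            ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
            lock.readLock().lock();
            try {
                // Upgrading is unsupported: while our own read lock is held, the
                // write lock can never be granted. tryLock() returns false here;
                // a plain writeLock().lock() would deadlock forever.
                System.out.println("upgrade ok? " + lock.writeLock().tryLock()); // false
            } finally {
                lock.readLock().unlock();
            }
            // Once the read lock is released, the write lock is obtainable. This
            // is what handing the cleanup to another thread achieves: by the time
            // the task runs, the inserting thread has let go of the read side.
            System.out.println("after release: " + lock.writeLock().tryLock()); // true
            lock.writeLock().unlock();
        }
    }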

+ 246 - 22
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetBitsetCacheTests.java

@@ -21,6 +21,7 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BitSet;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.CheckedBiConsumer;
+import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.index.IndexSettings;
@@ -32,22 +33,51 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.IndexSettingsModule;
 import org.hamcrest.Matchers;
-
+import org.junit.After;
+import org.junit.Before;
+import org.mockito.Mockito;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class DocumentSubsetBitsetCacheTests extends ESTestCase {
 
+    private static final int FIELD_COUNT = 10;
+    private ExecutorService singleThreadExecutor;
+
+    @Before
+    public void setUpExecutor() throws Exception {
+        singleThreadExecutor = Executors.newSingleThreadExecutor();
+    }
+
+    @After
+    public void cleanUpExecutor() throws Exception {
+        singleThreadExecutor.shutdown();
+    }
+
     public void testSameBitSetIsReturnedForIdenticalQuery() throws Exception {
-        final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache cache = newCache(Settings.EMPTY);
         runTestOnIndex((shardContext, leafContext) -> {
             final Query query1 = QueryBuilders.termQuery("field-1", "value-1").toQuery(shardContext);
             final BitSet bitSet1 = cache.getBitSet(query1, leafContext);
@@ -62,7 +92,7 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
     }
 
     public void testNullBitSetIsReturnedForNonMatchingQuery() throws Exception {
-        final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache cache = newCache(Settings.EMPTY);
         runTestOnIndex((shardContext, leafContext) -> {
             final Query query = QueryBuilders.termQuery("does-not-exist", "any-value").toQuery(shardContext);
             final BitSet bitSet = cache.getBitSet(query, leafContext);
@@ -71,7 +101,7 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
     }
 
     public void testNullEntriesAreNotCountedInMemoryUsage() throws Exception {
-        final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache cache = newCache(Settings.EMPTY);
         assertThat(cache.ramBytesUsed(), equalTo(0L));
 
         runTestOnIndex((shardContext, leafContext) -> {
@@ -95,7 +125,7 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
         final Settings settings = Settings.builder()
             .put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b")
             .build();
-        final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings);
+        final DocumentSubsetBitsetCache cache = newCache(settings);
         assertThat(cache.entryCount(), equalTo(0));
         assertThat(cache.ramBytesUsed(), equalTo(0L));
 
@@ -142,7 +172,7 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
         final Settings settings = Settings.builder()
             .put(DocumentSubsetBitsetCache.CACHE_TTL_SETTING.getKey(), "10ms")
             .build();
-        final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings);
+        final DocumentSubsetBitsetCache cache = newCache(settings);
         assertThat(cache.entryCount(), equalTo(0));
         assertThat(cache.ramBytesUsed(), equalTo(0L));
 
@@ -167,8 +197,131 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
         });
     }
 
+    public void testIndexLookupIsClearedWhenBitSetIsEvicted() throws Exception {
+        // This value is based on the internal implementation details of lucene's FixedBitSet
+        // If the implementation changes, this can be safely updated to match the new ram usage for a single bitset
+        final long expectedBytesPerBitSet = 56;
+
+        // Enough to hold slightly more than 1 bit-set in the cache
+        final long maxCacheBytes = expectedBytesPerBitSet + expectedBytesPerBitSet/2;
+        final Settings settings = Settings.builder()
+            .put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b")
+            .build();
+
+        final ExecutorService executor = mock(ExecutorService.class);
+        final AtomicReference<Runnable> runnableRef = new AtomicReference<>();
+        when(executor.submit(any(Runnable.class))).thenAnswer(inv -> {
+            final Runnable r = (Runnable) inv.getArguments()[0];
+            runnableRef.set(r);
+            return null;
+        });
+
+        final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings, executor);
+        assertThat(cache.entryCount(), equalTo(0));
+        assertThat(cache.ramBytesUsed(), equalTo(0L));
+
+        runTestOnIndex((shardContext, leafContext) -> {
+            final Query query1 = QueryBuilders.termQuery("field-1", "value-1").toQuery(shardContext);
+            final BitSet bitSet1 = cache.getBitSet(query1, leafContext);
+            assertThat(bitSet1, notNullValue());
+
+            final Query query2 = QueryBuilders.termQuery("field-2", "value-2").toQuery(shardContext);
+            final BitSet bitSet2 = cache.getBitSet(query2, leafContext);
+            assertThat(bitSet2, notNullValue());
+
+            // BitSet1 has been evicted now, run the cleanup...
+            final Runnable runnable1 = runnableRef.get();
+            assertThat(runnable1, notNullValue());
+            runnable1.run();
+            cache.verifyInternalConsistency();
+
+            // Check that the original bitset is no longer in the cache (a new instance is returned)
+            assertThat(cache.getBitSet(query1, leafContext), not(sameInstance(bitSet1)));
+
+            // BitSet2 has been evicted now, run the cleanup...
+            final Runnable runnable2 = runnableRef.get();
+            assertThat(runnable2, not(sameInstance(runnable1)));
+            runnable2.run();
+            cache.verifyInternalConsistency();
+        });
+    }
+
+    public void testCacheUnderConcurrentAccess() throws Exception {
+        // This value is based on the internal implementation details of lucene's FixedBitSet
+        // If the implementation changes, this can be safely updated to match the new ram usage for a single bitset
+        final long expectedBytesPerBitSet = 56;
+
+        final int concurrentThreads = randomIntBetween(5, 15);
+        final int numberOfIndices = randomIntBetween(3, 8);
+
+        // Force cache evictions by setting the size to be less than the number of distinct queries we search on.
+        final int maxCacheCount = randomIntBetween(FIELD_COUNT / 2, FIELD_COUNT * 3 / 4);
+        final long maxCacheBytes = expectedBytesPerBitSet * maxCacheCount;
+        final Settings settings = Settings.builder()
+            .put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b")
+            .build();
+
+        final ExecutorService threads = Executors.newFixedThreadPool(concurrentThreads + 1);
+        final ExecutorService cleanupExecutor = Mockito.mock(ExecutorService.class);
+        when(cleanupExecutor.submit(any(Runnable.class))).thenAnswer(inv -> {
+            final Runnable runnable = (Runnable) inv.getArguments()[0];
+            return threads.submit(() -> {
+                // Sleep for a small (random) length of time.
+                // This increases the likelihood that the cache could have been modified between the eviction & the cleanup
+                Thread.sleep(randomIntBetween(1, 10));
+                runnable.run();
+                return null;
+            });
+        });
+        try {
+            final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings, cleanupExecutor);
+            assertThat(cache.entryCount(), equalTo(0));
+            assertThat(cache.ramBytesUsed(), equalTo(0L));
+
+            runTestOnIndices(numberOfIndices, contexts -> {
+                final CountDownLatch start = new CountDownLatch(concurrentThreads);
+                final CountDownLatch end = new CountDownLatch(concurrentThreads);
+                final Set<BitSet> uniqueBitSets = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
+                for (int thread = 0; thread < concurrentThreads; thread++) {
+                    threads.submit(() -> {
+                        start.countDown();
+                        start.await(100, TimeUnit.MILLISECONDS);
+                        for (int loop = 0; loop < 15; loop++) {
+                            for (int field = 1; field <= FIELD_COUNT; field++) {
+                                final TermQueryBuilder queryBuilder = QueryBuilders.termQuery("field-" + field, "value-" + field);
+                                final TestIndexContext randomContext = randomFrom(contexts);
+                                final Query query = queryBuilder.toQuery(randomContext.queryShardContext);
+                                final BitSet bitSet = cache.getBitSet(query, randomContext.leafReaderContext);
+                                assertThat(bitSet, notNullValue());
+                                assertThat(bitSet.ramBytesUsed(), equalTo(expectedBytesPerBitSet));
+                                uniqueBitSets.add(bitSet);
+                            }
+                        }
+                        end.countDown();
+                        return null;
+                    });
+                }
+
+                assertTrue("Query threads did not complete in expected time", end.await(1, TimeUnit.SECONDS));
+
+                threads.shutdown();
+                assertTrue("Cleanup thread did not complete in expected time", threads.awaitTermination(3, TimeUnit.SECONDS));
+                cache.verifyInternalConsistency();
+
+                // Due to cache evictions, we must get more bitsets than fields
+                assertThat(uniqueBitSets.size(), Matchers.greaterThan(FIELD_COUNT));
+                // Due to cache evictions, we must have seen more bitsets than the cache currently holds
+                assertThat(uniqueBitSets.size(), Matchers.greaterThan(cache.entryCount()));
+                // Even under concurrent pressure, the cache should hit the expected size
+                assertThat(cache.entryCount(), is(maxCacheCount));
+                assertThat(cache.ramBytesUsed(), is(maxCacheBytes));
+            });
+        } finally {
+            threads.shutdown();
+        }
+    }
+
     public void testCacheIsPerIndex() throws Exception {
-        final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache cache = newCache(Settings.EMPTY);
         assertThat(cache.entryCount(), equalTo(0));
         assertThat(cache.ramBytesUsed(), equalTo(0L));
 
@@ -195,7 +348,7 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
     }
 
     public void testCacheClearEntriesWhenIndexIsClosed() throws Exception {
-        final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache cache = newCache(Settings.EMPTY);
         assertThat(cache.entryCount(), equalTo(0));
         assertThat(cache.ramBytesUsed(), equalTo(0L));
 
@@ -215,35 +368,106 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
     }
 
     private void runTestOnIndex(CheckedBiConsumer<QueryShardContext, LeafReaderContext, Exception> body) throws Exception {
+        runTestOnIndices(1, ctx -> {
+            final TestIndexContext indexContext = ctx.get(0);
+            body.accept(indexContext.queryShardContext, indexContext.leafReaderContext);
+        });
+    }
+
+    private static final class TestIndexContext implements Closeable {
+        private final Directory directory;
+        private final IndexWriter indexWriter;
+        private final DirectoryReader directoryReader;
+        private final QueryShardContext queryShardContext;
+        private final LeafReaderContext leafReaderContext;
+
+        private TestIndexContext(Directory directory, IndexWriter indexWriter, DirectoryReader directoryReader,
+                                 QueryShardContext queryShardContext, LeafReaderContext leafReaderContext) {
+            this.directory = directory;
+            this.indexWriter = indexWriter;
+            this.directoryReader = directoryReader;
+            this.queryShardContext = queryShardContext;
+            this.leafReaderContext = leafReaderContext;
+        }
+
+        @Override
+        public void close() throws IOException {
+            directoryReader.close();
+            indexWriter.close();
+            directory.close();
+        }
+    }
+
+    private TestIndexContext testIndex(MapperService mapperService, Client client) throws IOException {
+        TestIndexContext context = null;
+
+        final long nowInMillis = randomNonNegativeLong();
         final ShardId shardId = new ShardId("idx_" + randomAlphaOfLengthBetween(2, 8), randomAlphaOfLength(12), 0);
         final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(shardId.getIndex(), Settings.EMPTY);
-        final MapperService mapperService = mock(MapperService.class);
-        final long nowInMillis = randomNonNegativeLong();
+        final IndexWriterConfig writerConfig = new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE);
 
-        final Client client = mock(Client.class);
-        when(client.settings()).thenReturn(Settings.EMPTY);
+        Directory directory = null;
+        IndexWriter iw = null;
+        DirectoryReader directoryReader = null;
+        try {
+            directory = newDirectory();
 
-        final IndexWriterConfig writerConfig = new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE);
-        try (Directory directory = newDirectory();
-             IndexWriter iw = new IndexWriter(directory, writerConfig)) {
+            iw = new IndexWriter(directory, writerConfig);
             for (int i = 1; i <= 100; i++) {
                 Document document = new Document();
-                for (int j = 1; j <= 10; j++) {
+                for (int j = 1; j <= FIELD_COUNT; j++) {
                     document.add(new StringField("field-" + j, "value-" + i, Field.Store.NO));
                 }
                 iw.addDocument(document);
             }
             iw.commit();
 
-            try (DirectoryReader directoryReader = DirectoryReader.open(directory)) {
-                final LeafReaderContext leaf = directoryReader.leaves().get(0);
+            directoryReader = DirectoryReader.open(directory);
+            final LeafReaderContext leaf = directoryReader.leaves().get(0);
+
+            final QueryShardContext shardContext = new QueryShardContext(shardId.id(), indexSettings, BigArrays.NON_RECYCLING_INSTANCE,
+                null, null, mapperService, null, null, xContentRegistry(), writableRegistry(),
+                client, new IndexSearcher(directoryReader), () -> nowInMillis, null, null);
+
+            context = new TestIndexContext(directory, iw, directoryReader, shardContext, leaf);
+            return context;
+        } finally {
+            if (context == null) {
+                if (directoryReader != null) {
+                    directoryReader.close();
+                }
+                if (iw != null) {
+                    iw.close();
+                }
+                if (directory != null) {
+                    directory.close();
+                }
+            }
+        }
+    }
+
+    private void runTestOnIndices(int numberIndices, CheckedConsumer<List<TestIndexContext>, Exception> body) throws Exception {
+        final MapperService mapperService = mock(MapperService.class);
 
-                final QueryShardContext context = new QueryShardContext(shardId.id(), indexSettings, BigArrays.NON_RECYCLING_INSTANCE,
-                    null, null, mapperService, null, null, xContentRegistry(), writableRegistry(),
-                        client, new IndexSearcher(directoryReader), () -> nowInMillis, null, null);
-                body.accept(context, leaf);
+        final Client client = mock(Client.class);
+        when(client.settings()).thenReturn(Settings.EMPTY);
+
+        final List<TestIndexContext> context = new ArrayList<>(numberIndices);
+        try {
+            for (int i = 0; i < numberIndices; i++) {
+                context.add(testIndex(mapperService, client));
+            }
+
+            body.accept(context);
+        } finally {
+            for (TestIndexContext indexContext : context) {
+                indexContext.close();
             }
         }
     }
 
+    private DocumentSubsetBitsetCache newCache(Settings settings) {
+        return new DocumentSubsetBitsetCache(settings, singleThreadExecutor);
+    }
+
 }
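
A note on the test strategy: testIndexLookupIsClearedWhenBitSetIsEvicted makes the asynchronous cleanup deterministic by mocking the ExecutorService and capturing each submitted Runnable instead of executing it. Extracted into a reusable helper, the technique looks roughly like this (Mockito assumed; the helper name is hypothetical):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.atomic.AtomicReference;

    import static org.mockito.Matchers.any;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    public class CapturingExecutor {
        // Returns a mock executor that parks each submitted task in `slot`
        // so the test can run it synchronously at a point of its choosing.
        public static ExecutorService capturing(AtomicReference<Runnable> slot) {
            ExecutorService executor = mock(ExecutorService.class);
            when(executor.submit(any(Runnable.class))).thenAnswer(inv -> {
                slot.set((Runnable) inv.getArguments()[0]);
                return null; // the cache ignores the returned Future
            });
            return executor;
        }
    }

A test can then trigger an eviction, run slot.get() at a known point, and call verifyInternalConsistency() to assert that the cache and the lookup map agree.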

+ 2 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetReaderTests.java

@@ -32,6 +32,7 @@ import org.junit.After;
 import org.junit.Before;
 
 import java.io.IOException;
+import java.util.concurrent.Executors;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -50,7 +51,7 @@ public class DocumentSubsetReaderTests extends ESTestCase {
         assertTrue(DocumentSubsetReader.NUM_DOCS_CACHE.toString(),
                 DocumentSubsetReader.NUM_DOCS_CACHE.isEmpty());
         directory = newDirectory();
-        bitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        bitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY, Executors.newSingleThreadExecutor());
     }
 
     @After

+ 3 - 2
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/SecurityIndexReaderWrapperIntegrationTests.java

@@ -48,6 +48,7 @@ import org.elasticsearch.xpack.core.security.user.User;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Executors;
 
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonMap;
@@ -80,7 +81,7 @@ public class SecurityIndexReaderWrapperIntegrationTests extends AbstractBuilderT
                 null, null, mapperService, null, null, xContentRegistry(), writableRegistry(),
                 client, null, () -> nowInMillis, null, null);
         QueryShardContext queryShardContext = spy(realQueryShardContext);
-        DocumentSubsetBitsetCache bitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        DocumentSubsetBitsetCache bitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY, Executors.newSingleThreadExecutor());
         XPackLicenseState licenseState = mock(XPackLicenseState.class);
         when(licenseState.isDocumentAndFieldLevelSecurityAllowed()).thenReturn(true);
 
@@ -202,7 +203,7 @@ public class SecurityIndexReaderWrapperIntegrationTests extends AbstractBuilderT
                 null, null, mapperService, null, null, xContentRegistry(), writableRegistry(),
                 client, null, () -> nowInMillis, null, null);
         QueryShardContext queryShardContext = spy(realQueryShardContext);
-        DocumentSubsetBitsetCache bitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        DocumentSubsetBitsetCache bitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY, Executors.newSingleThreadExecutor());
 
         XPackLicenseState licenseState = mock(XPackLicenseState.class);
         when(licenseState.isDocumentAndFieldLevelSecurityAllowed()).thenReturn(true);

+ 1 - 1
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java

@@ -412,7 +412,7 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
         final NativePrivilegeStore privilegeStore = new NativePrivilegeStore(settings, client, securityIndex.get());
         components.add(privilegeStore);
 
-        dlsBitsetCache.set(new DocumentSubsetBitsetCache(settings));
+        dlsBitsetCache.set(new DocumentSubsetBitsetCache(settings, threadPool));
         final FieldPermissionsCache fieldPermissionsCache = new FieldPermissionsCache(settings);
         final FileRolesStore fileRolesStore = new FileRolesStore(settings, env, resourceWatcherService, getLicenseState(),
             xContentRegistry);

+ 22 - 17
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java

@@ -28,6 +28,7 @@ import org.elasticsearch.license.License.OperationMode;
 import org.elasticsearch.license.TestUtils.UpdatableLicenseState;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequest.Empty;
 import org.elasticsearch.xpack.core.XPackSettings;
@@ -152,7 +153,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
         when(fileRolesStore.roleDescriptors(Collections.singleton("fls_dls"))).thenReturn(Collections.singleton(flsDlsRole));
         when(fileRolesStore.roleDescriptors(Collections.singleton("no_fls_dls"))).thenReturn(Collections.singleton(noFlsDlsRole));
         final AtomicReference<Collection<RoleDescriptor>> effectiveRoleDescriptors = new AtomicReference<Collection<RoleDescriptor>>();
-        final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache();
         CompositeRolesStore compositeRolesStore = new CompositeRolesStore(Settings.EMPTY, fileRolesStore, nativeRolesStore,
                 reservedRolesStore, mock(NativePrivilegeStore.class), Collections.emptyList(),
                 new ThreadContext(Settings.EMPTY), licenseState, cache, mock(ApiKeyService.class), documentSubsetBitsetCache,
@@ -228,7 +229,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
         when(fileRolesStore.roleDescriptors(Collections.singleton("fls_dls"))).thenReturn(Collections.singleton(flsDlsRole));
         when(fileRolesStore.roleDescriptors(Collections.singleton("no_fls_dls"))).thenReturn(Collections.singleton(noFlsDlsRole));
         final AtomicReference<Collection<RoleDescriptor>> effectiveRoleDescriptors = new AtomicReference<Collection<RoleDescriptor>>();
-        final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache();
         CompositeRolesStore compositeRolesStore = new CompositeRolesStore(Settings.EMPTY, fileRolesStore, nativeRolesStore,
                 reservedRolesStore, mock(NativePrivilegeStore.class), Collections.emptyList(),
                 new ThreadContext(Settings.EMPTY), licenseState, cache, mock(ApiKeyService.class), documentSubsetBitsetCache,
@@ -280,7 +281,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
         }).when(nativePrivilegeStore).getPrivileges(isA(Set.class), isA(Set.class), any(ActionListener.class));
 
         final AtomicReference<Collection<RoleDescriptor>> effectiveRoleDescriptors = new AtomicReference<Collection<RoleDescriptor>>();
-        final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache();
         final CompositeRolesStore compositeRolesStore =
                 new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore,
                         nativePrivilegeStore, Collections.emptyList(), new ThreadContext(SECURITY_ENABLED_SETTINGS),
@@ -343,7 +344,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
             .put("xpack.security.authz.store.roles.negative_lookup_cache.max_size", 0)
             .build();
         final AtomicReference<Collection<RoleDescriptor>> effectiveRoleDescriptors = new AtomicReference<Collection<RoleDescriptor>>();
-        final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache();
         final CompositeRolesStore compositeRolesStore = new CompositeRolesStore(settings, fileRolesStore, nativeRolesStore,
             reservedRolesStore, mock(NativePrivilegeStore.class), Collections.emptyList(), new ThreadContext(settings),
             new XPackLicenseState(settings), cache, mock(ApiKeyService.class), documentSubsetBitsetCache,
@@ -381,7 +382,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
         final ReservedRolesStore reservedRolesStore = spy(new ReservedRolesStore());
 
         final AtomicReference<Collection<RoleDescriptor>> effectiveRoleDescriptors = new AtomicReference<Collection<RoleDescriptor>>();
-        final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache();
         final CompositeRolesStore compositeRolesStore =
             new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore,
                 mock(NativePrivilegeStore.class), Collections.emptyList(), new ThreadContext(SECURITY_ENABLED_SETTINGS),
@@ -421,6 +422,10 @@ public class CompositeRolesStoreTests extends ESTestCase {
         verifyNoMoreInteractions(fileRolesStore, reservedRolesStore, nativeRolesStore);
     }
 
+    private DocumentSubsetBitsetCache buildBitsetCache() {
+        return new DocumentSubsetBitsetCache(Settings.EMPTY, mock(ThreadPool.class));
+    }
+
     public void testCustomRolesProviders() {
         final FileRolesStore fileRolesStore = mock(FileRolesStore.class);
         doCallRealMethod().when(fileRolesStore).accept(any(Set.class), any(ActionListener.class));
@@ -467,7 +472,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
         }));
 
         final AtomicReference<Collection<RoleDescriptor>> effectiveRoleDescriptors = new AtomicReference<Collection<RoleDescriptor>>();
-        final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache();
         final CompositeRolesStore compositeRolesStore =
                 new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore,
                                 mock(NativePrivilegeStore.class), Arrays.asList(inMemoryProvider1, inMemoryProvider2),
@@ -696,7 +701,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
             (roles, listener) -> listener.onFailure(new Exception("fake failure"));
 
         final AtomicReference<Collection<RoleDescriptor>> effectiveRoleDescriptors = new AtomicReference<Collection<RoleDescriptor>>();
-        final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache();
         final CompositeRolesStore compositeRolesStore =
             new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore,
                 mock(NativePrivilegeStore.class), Arrays.asList(inMemoryProvider1, failingProvider),
@@ -744,7 +749,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
         // these licenses don't allow custom role providers
         xPackLicenseState.update(randomFrom(OperationMode.BASIC, OperationMode.GOLD, OperationMode.STANDARD), true, null);
         final AtomicReference<Collection<RoleDescriptor>> effectiveRoleDescriptors = new AtomicReference<Collection<RoleDescriptor>>();
-        final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache();
         CompositeRolesStore compositeRolesStore = new CompositeRolesStore(
             Settings.EMPTY, fileRolesStore, nativeRolesStore, reservedRolesStore, mock(NativePrivilegeStore.class),
             Arrays.asList(inMemoryProvider), new ThreadContext(Settings.EMPTY), xPackLicenseState, cache,
@@ -808,7 +813,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
         doCallRealMethod().when(reservedRolesStore).accept(any(Set.class), any(ActionListener.class));
         NativeRolesStore nativeRolesStore = mock(NativeRolesStore.class);
         doCallRealMethod().when(nativeRolesStore).accept(any(Set.class), any(ActionListener.class));
-        final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache();
         CompositeRolesStore compositeRolesStore = new CompositeRolesStore(
                 Settings.EMPTY, fileRolesStore, nativeRolesStore, reservedRolesStore,
                 mock(NativePrivilegeStore.class), Collections.emptyList(), new ThreadContext(Settings.EMPTY),
@@ -862,7 +867,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
         doCallRealMethod().when(reservedRolesStore).accept(any(Set.class), any(ActionListener.class));
         NativeRolesStore nativeRolesStore = mock(NativeRolesStore.class);
         doCallRealMethod().when(nativeRolesStore).accept(any(Set.class), any(ActionListener.class));
-        final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache();
         CompositeRolesStore compositeRolesStore = new CompositeRolesStore(SECURITY_ENABLED_SETTINGS,
                 fileRolesStore, nativeRolesStore, reservedRolesStore,
                 mock(NativePrivilegeStore.class), Collections.emptyList(), new ThreadContext(SECURITY_ENABLED_SETTINGS),
@@ -894,7 +899,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
         }).when(nativeRolesStore).getRoleDescriptors(isA(Set.class), any(ActionListener.class));
         final ReservedRolesStore reservedRolesStore = spy(new ReservedRolesStore());
 
-        final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache();
         final CompositeRolesStore compositeRolesStore =
             new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore,
                 mock(NativePrivilegeStore.class), Collections.emptyList(), new ThreadContext(SECURITY_ENABLED_SETTINGS),
@@ -935,7 +940,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
         }).when(nativeRolesStore).getRoleDescriptors(isA(Set.class), any(ActionListener.class));
         final ReservedRolesStore reservedRolesStore = spy(new ReservedRolesStore());
 
-        final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache();
         final CompositeRolesStore compositeRolesStore =
             new CompositeRolesStore(settings, fileRolesStore, nativeRolesStore, reservedRolesStore,
                 mock(NativePrivilegeStore.class), Collections.emptyList(), new ThreadContext(settings),
@@ -963,7 +968,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
         }).when(nativeRolesStore).getRoleDescriptors(isA(Set.class), any(ActionListener.class));
         final ReservedRolesStore reservedRolesStore = spy(new ReservedRolesStore());
 
-        final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache();
         final AtomicReference<Collection<RoleDescriptor>> effectiveRoleDescriptors = new AtomicReference<Collection<RoleDescriptor>>();
         final CompositeRolesStore compositeRolesStore =
             new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore,
@@ -994,7 +999,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
         }).when(nativeRolesStore).getRoleDescriptors(isA(Set.class), any(ActionListener.class));
         final ReservedRolesStore reservedRolesStore = spy(new ReservedRolesStore());
 
-        final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache();
         final AtomicReference<Collection<RoleDescriptor>> effectiveRoleDescriptors = new AtomicReference<Collection<RoleDescriptor>>();
         final CompositeRolesStore compositeRolesStore =
             new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore,
@@ -1030,7 +1035,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
             return Void.TYPE;
         }).when(nativePrivStore).getPrivileges(any(Collection.class), any(Collection.class), any(ActionListener.class));
 
-        final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache();
         final AtomicReference<Collection<RoleDescriptor>> effectiveRoleDescriptors = new AtomicReference<Collection<RoleDescriptor>>();
         final CompositeRolesStore compositeRolesStore =
             new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore,
@@ -1076,7 +1081,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
             return Void.TYPE;
         }).when(nativePrivStore).getPrivileges(any(Collection.class), any(Collection.class), any(ActionListener.class));
 
-        final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache();
         final AtomicReference<Collection<RoleDescriptor>> effectiveRoleDescriptors = new AtomicReference<Collection<RoleDescriptor>>();
         final CompositeRolesStore compositeRolesStore =
             new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore,
@@ -1117,7 +1122,7 @@ public class CompositeRolesStoreTests extends ESTestCase {
         }).when(nativeRolesStore).usageStats(any(ActionListener.class));
         final ReservedRolesStore reservedRolesStore = spy(new ReservedRolesStore());
 
-        final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY);
+        final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache();
 
         final CompositeRolesStore compositeRolesStore =
             new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore,