@@ -43,10 +43,6 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.IndexSettingsModule;
 import org.elasticsearch.test.MockLog;
-import org.hamcrest.Matchers;
-import org.junit.After;
-import org.junit.Before;
-import org.mockito.Mockito;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -65,12 +61,13 @@ import java.util.concurrent.atomic.AtomicReference;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -80,17 +77,6 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
     // This value is based on the internal implementation details of lucene's FixedBitSet
     // If the implementation changes, this can be safely updated to match the new ram usage for a single bitset
     private static final long EXPECTED_BYTES_PER_BIT_SET = 56;
-    private ExecutorService singleThreadExecutor;
-
-    @Before
-    public void setUpExecutor() {
-        singleThreadExecutor = Executors.newSingleThreadExecutor();
-    }
-
-    @After
-    public void cleanUpExecutor() {
-        singleThreadExecutor.shutdown();
-    }
 
     public void testSameBitSetIsReturnedForIdenticalQuery() throws Exception {
         final DocumentSubsetBitsetCache cache = newCache(Settings.EMPTY);
@@ -103,7 +89,7 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
             final BitSet bitSet2 = cache.getBitSet(query2, leafContext);
             assertThat(bitSet2, notNullValue());
 
-            assertThat(bitSet2, Matchers.sameInstance(bitSet1));
+            assertThat(bitSet2, sameInstance(bitSet1));
         });
     }
 
@@ -272,7 +258,7 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
                 assertThat(bitSet2, notNullValue());
 
                 // Loop until the cache has less than 2 items, which means that something was evicted
-                assertThat(cache.entryCount(), Matchers.lessThan(2));
+                assertThat(cache.entryCount(), lessThan(2));
 
             }, 100, TimeUnit.MILLISECONDS);
@@ -288,42 +274,28 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
             .put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b")
             .build();
 
-        final ExecutorService executor = mock(ExecutorService.class);
-        final AtomicReference<Runnable> runnableRef = new AtomicReference<>();
-        when(executor.submit(any(Runnable.class))).thenAnswer(inv -> {
-            final Runnable r = (Runnable) inv.getArguments()[0];
-            runnableRef.set(r);
-            return null;
-        });
-
-        final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings, executor);
+        final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings);
         assertThat(cache.entryCount(), equalTo(0));
         assertThat(cache.ramBytesUsed(), equalTo(0L));
 
         runTestOnIndex((searchExecutionContext, leafContext) -> {
+            cache.verifyInternalConsistency();
+
             final Query query1 = QueryBuilders.termQuery("field-1", "value-1").toQuery(searchExecutionContext);
             final BitSet bitSet1 = cache.getBitSet(query1, leafContext);
             assertThat(bitSet1, notNullValue());
+            cache.verifyInternalConsistency();
 
             final Query query2 = QueryBuilders.termQuery("field-2", "value-2").toQuery(searchExecutionContext);
             final BitSet bitSet2 = cache.getBitSet(query2, leafContext);
             assertThat(bitSet2, notNullValue());
-
-            // BitSet1 has been evicted now, run the cleanup...
-            final Runnable runnable1 = runnableRef.get();
-            assertThat(runnable1, notNullValue());
-            runnable1.run();
             cache.verifyInternalConsistency();
 
-            // Check that the original bitset is no longer in the cache (a new instance is returned)
             assertThat(cache.getBitSet(query1, leafContext), not(sameInstance(bitSet1)));
-
-            // BitSet2 has been evicted now, run the cleanup...
-            final Runnable runnable2 = runnableRef.get();
-            assertThat(runnable2, not(sameInstance(runnable1)));
-            runnable2.run();
             cache.verifyInternalConsistency();
         });
+
+        cache.verifyInternalConsistency();
     }
 
     public void testCacheUnderConcurrentAccess() throws Exception {
@@ -337,23 +309,12 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
             .put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b")
             .build();
 
+        final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings);
+        assertThat(cache.entryCount(), equalTo(0));
+        assertThat(cache.ramBytesUsed(), equalTo(0L));
+
         final ExecutorService threads = Executors.newFixedThreadPool(concurrentThreads + 1);
-        final ExecutorService cleanupExecutor = Mockito.mock(ExecutorService.class);
-        when(cleanupExecutor.submit(any(Runnable.class))).thenAnswer(inv -> {
-            final Runnable runnable = (Runnable) inv.getArguments()[0];
-            return threads.submit(() -> {
-                // Sleep for a small (random) length of time.
-                // This increases the likelihood that cache could have been modified between the eviction & the cleanup
-                Thread.sleep(randomIntBetween(1, 10));
-                runnable.run();
-                return null;
-            });
-        });
         try {
-            final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings, cleanupExecutor);
-            assertThat(cache.entryCount(), equalTo(0));
-            assertThat(cache.ramBytesUsed(), equalTo(0L));
-
             runTestOnIndices(numberOfIndices, contexts -> {
                 final CountDownLatch start = new CountDownLatch(concurrentThreads);
                 final CountDownLatch end = new CountDownLatch(concurrentThreads);
@@ -394,12 +355,12 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
 
             threads.shutdown();
             assertTrue("Cleanup thread did not complete in expected time", threads.awaitTermination(3, TimeUnit.SECONDS));
-            cache.verifyInternalConsistency();
+            cache.verifyInternalConsistencyKeysToCache();
 
             // Due to cache evictions, we must get more bitsets than fields
-            assertThat(uniqueBitSets.size(), Matchers.greaterThan(FIELD_COUNT));
+            assertThat(uniqueBitSets.size(), greaterThan(FIELD_COUNT));
             // Due to cache evictions, we must have seen more bitsets than the cache currently holds
-            assertThat(uniqueBitSets.size(), Matchers.greaterThan(cache.entryCount()));
+            assertThat(uniqueBitSets.size(), greaterThan(cache.entryCount()));
             // Even under concurrent pressure, the cache should hit the expected size
             assertThat(cache.entryCount(), is(maxCacheCount));
             assertThat(cache.ramBytesUsed(), is(maxCacheBytes));
@@ -407,62 +368,41 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
         } finally {
             threads.shutdown();
         }
+
+        cache.verifyInternalConsistencyKeysToCache();
     }
 
-    public void testCleanupWorksWhenIndexIsClosing() throws Exception {
+    public void testCleanupWorksWhenIndexIsClosed() throws Exception {
         // Enough to hold slightly more than 1 bit-set in the cache
         final long maxCacheBytes = EXPECTED_BYTES_PER_BIT_SET + EXPECTED_BYTES_PER_BIT_SET / 2;
         final Settings settings = Settings.builder()
             .put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b")
             .build();
-        final ExecutorService threads = Executors.newFixedThreadPool(1);
-        final ExecutorService cleanupExecutor = Mockito.mock(ExecutorService.class);
-        final CountDownLatch cleanupReadyLatch = new CountDownLatch(1);
-        final CountDownLatch cleanupCompleteLatch = new CountDownLatch(1);
-        final CountDownLatch indexCloseLatch = new CountDownLatch(1);
-        final AtomicReference<Throwable> cleanupException = new AtomicReference<>();
-        when(cleanupExecutor.submit(any(Runnable.class))).thenAnswer(inv -> {
-            final Runnable runnable = (Runnable) inv.getArguments()[0];
-            return threads.submit(() -> {
-                try {
-                    cleanupReadyLatch.countDown();
-                    assertTrue("index close did not completed in expected time", indexCloseLatch.await(1, TimeUnit.SECONDS));
-                    runnable.run();
-                } catch (Throwable e) {
-                    logger.warn("caught error in cleanup thread", e);
-                    cleanupException.compareAndSet(null, e);
-                } finally {
-                    cleanupCompleteLatch.countDown();
-                }
-                return null;
-            });
-        });
 
-        final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings, cleanupExecutor);
+        final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings);
         assertThat(cache.entryCount(), equalTo(0));
         assertThat(cache.ramBytesUsed(), equalTo(0L));
 
-        try {
-            runTestOnIndex((searchExecutionContext, leafContext) -> {
-                final Query query1 = QueryBuilders.termQuery("field-1", "value-1").toQuery(searchExecutionContext);
-                final BitSet bitSet1 = cache.getBitSet(query1, leafContext);
-                assertThat(bitSet1, notNullValue());
+        runTestOnIndex((searchExecutionContext, leafContext) -> {
+            final Query query1 = QueryBuilders.termQuery("field-1", "value-1").toQuery(searchExecutionContext);
+            final BitSet bitSet1 = cache.getBitSet(query1, leafContext);
+            assertThat(bitSet1, notNullValue());
+            cache.verifyInternalConsistency();
 
-            // Second query should trigger a cache eviction
-            final Query query2 = QueryBuilders.termQuery("field-2", "value-2").toQuery(searchExecutionContext);
-            final BitSet bitSet2 = cache.getBitSet(query2, leafContext);
-            assertThat(bitSet2, notNullValue());
+            // Second query should trigger a cache eviction
+            final Query query2 = QueryBuilders.termQuery("field-2", "value-2").toQuery(searchExecutionContext);
+            final BitSet bitSet2 = cache.getBitSet(query2, leafContext);
+            assertThat(bitSet2, notNullValue());
+            cache.verifyInternalConsistency();
 
-            final IndexReader.CacheKey indexKey = leafContext.reader().getCoreCacheHelper().getKey();
-            assertTrue("cleanup did not trigger in expected time", cleanupReadyLatch.await(1, TimeUnit.SECONDS));
-            cache.onClose(indexKey);
-            indexCloseLatch.countDown();
-            assertTrue("cleanup did not complete in expected time", cleanupCompleteLatch.await(1, TimeUnit.SECONDS));
-            assertThat("caught error in cleanup thread: " + cleanupException.get(), cleanupException.get(), nullValue());
-            });
-        } finally {
-            threads.shutdown();
-        }
+            final IndexReader.CacheKey indexKey = leafContext.reader().getCoreCacheHelper().getKey();
+            cache.onClose(indexKey);
+            cache.verifyInternalConsistency();
+
+            // closing an index results in the associated entries being removed from the cache (at least when single threaded)
+            assertThat(cache.entryCount(), equalTo(0));
+            assertThat(cache.ramBytesUsed(), equalTo(0L));
+        });
     }
 
     public void testCacheIsPerIndex() throws Exception {
@@ -492,7 +432,7 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
         runTestOnIndex(consumer);
     }
 
-    public void testCacheClearEntriesWhenIndexIsClosed() throws Exception {
+    public void testCacheClearsEntriesWhenIndexIsClosed() throws Exception {
         final DocumentSubsetBitsetCache cache = newCache(Settings.EMPTY);
         assertThat(cache.entryCount(), equalTo(0));
         assertThat(cache.ramBytesUsed(), equalTo(0L));
@@ -504,9 +444,13 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
                 final BitSet bitSet = cache.getBitSet(query, leafContext);
                 assertThat(bitSet, notNullValue());
             }
+            cache.verifyInternalConsistency();
             assertThat(cache.entryCount(), not(equalTo(0)));
             assertThat(cache.ramBytesUsed(), not(equalTo(0L)));
         });
+        cache.verifyInternalConsistency();
+
+        // closing an index results in the associated entries being removed from the cache (at least when single threaded)
         assertThat(cache.entryCount(), equalTo(0));
         assertThat(cache.ramBytesUsed(), equalTo(0L));
     }
@@ -650,7 +594,7 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
     }
 
     private DocumentSubsetBitsetCache newCache(Settings settings) {
-        return new DocumentSubsetBitsetCache(settings, singleThreadExecutor);
+        return new DocumentSubsetBitsetCache(settings);
     }
 
 }