@@ -43,10 +43,6 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.IndexSettingsModule;
 import org.elasticsearch.test.MockLog;
-import org.hamcrest.Matchers;
-import org.junit.After;
-import org.junit.Before;
-import org.mockito.Mockito;

 import java.io.Closeable;
 import java.io.IOException;
@@ -70,11 +66,11 @@ import java.util.function.Supplier;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;

@@ -84,17 +80,6 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
     // This value is based on the internal implementation details of lucene's FixedBitSet
     // If the implementation changes, this can be safely updated to match the new ram usage for a single bitset
     private static final long EXPECTED_BYTES_PER_BIT_SET = 56;
-    private ExecutorService singleThreadExecutor;
-
-    @Before
-    public void setUpExecutor() {
-        singleThreadExecutor = Executors.newSingleThreadExecutor();
-    }
-
-    @After
-    public void cleanUpExecutor() {
-        singleThreadExecutor.shutdown();
-    }

     public void testSameBitSetIsReturnedForIdenticalQuery() throws Exception {
         final DocumentSubsetBitsetCache cache = newCache(Settings.EMPTY);
@@ -107,7 +92,7 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
             final BitSet bitSet2 = cache.getBitSet(query2, leafContext);
             assertThat(bitSet2, notNullValue());

-            assertThat(bitSet2, Matchers.sameInstance(bitSet1));
+            assertThat(bitSet2, sameInstance(bitSet1));
         });
     }

@@ -276,7 +261,7 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
                 assertThat(bitSet2, notNullValue());

                 // Loop until the cache has less than 2 items, which mean that something we evicted
-                assertThat(cache.entryCount(), Matchers.lessThan(2));
+                assertThat(cache.entryCount(), lessThan(2));

             }, 100, TimeUnit.MILLISECONDS);

@@ -292,42 +277,28 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
             .put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b")
             .build();

-        final ExecutorService executor = mock(ExecutorService.class);
-        final AtomicReference<Runnable> runnableRef = new AtomicReference<>();
-        when(executor.submit(any(Runnable.class))).thenAnswer(inv -> {
-            final Runnable r = (Runnable) inv.getArguments()[0];
-            runnableRef.set(r);
-            return null;
-        });
-
-        final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings, executor);
+        final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings);
         assertThat(cache.entryCount(), equalTo(0));
         assertThat(cache.ramBytesUsed(), equalTo(0L));

         runTestOnIndex((searchExecutionContext, leafContext) -> {
+            cache.verifyInternalConsistency();
+
             final Query query1 = QueryBuilders.termQuery("field-1", "value-1").toQuery(searchExecutionContext);
             final BitSet bitSet1 = cache.getBitSet(query1, leafContext);
             assertThat(bitSet1, notNullValue());
+            cache.verifyInternalConsistency();

             final Query query2 = QueryBuilders.termQuery("field-2", "value-2").toQuery(searchExecutionContext);
             final BitSet bitSet2 = cache.getBitSet(query2, leafContext);
             assertThat(bitSet2, notNullValue());
-
-            // BitSet1 has been evicted now, run the cleanup...
-            final Runnable runnable1 = runnableRef.get();
-            assertThat(runnable1, notNullValue());
-            runnable1.run();
             cache.verifyInternalConsistency();

-            // Check that the original bitset is no longer in the cache (a new instance is returned)
             assertThat(cache.getBitSet(query1, leafContext), not(sameInstance(bitSet1)));
-
-            // BitSet2 has been evicted now, run the cleanup...
-            final Runnable runnable2 = runnableRef.get();
-            assertThat(runnable2, not(sameInstance(runnable1)));
-            runnable2.run();
             cache.verifyInternalConsistency();
         });
+
+        cache.verifyInternalConsistency();
     }

     public void testCacheUnderConcurrentAccess() throws Exception {
@@ -341,23 +312,12 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
             .put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b")
             .build();

+        final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings);
+        assertThat(cache.entryCount(), equalTo(0));
+        assertThat(cache.ramBytesUsed(), equalTo(0L));
+
         final ExecutorService threads = Executors.newFixedThreadPool(concurrentThreads + 1);
-        final ExecutorService cleanupExecutor = Mockito.mock(ExecutorService.class);
-        when(cleanupExecutor.submit(any(Runnable.class))).thenAnswer(inv -> {
-            final Runnable runnable = (Runnable) inv.getArguments()[0];
-            return threads.submit(() -> {
-                // Sleep for a small (random) length of time.
-                // This increases the likelihood that cache could have been modified between the eviction & the cleanup
-                Thread.sleep(randomIntBetween(1, 10));
-                runnable.run();
-                return null;
-            });
-        });
         try {
-            final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings, cleanupExecutor);
-            assertThat(cache.entryCount(), equalTo(0));
-            assertThat(cache.ramBytesUsed(), equalTo(0L));
-
             runTestOnIndices(numberOfIndices, contexts -> {
                 final CountDownLatch start = new CountDownLatch(concurrentThreads);
                 final CountDownLatch end = new CountDownLatch(concurrentThreads);
@@ -398,7 +358,7 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {

                 threads.shutdown();
                 assertTrue("Cleanup thread did not complete in expected time", threads.awaitTermination(3, TimeUnit.SECONDS));
-                cache.verifyInternalConsistency();
+                cache.verifyInternalConsistencyKeysToCache();

                 // Due to cache evictions, we must get more bitsets than fields
                 assertThat(uniqueBitSets.size(), greaterThan(FIELD_COUNT));
@@ -411,62 +371,41 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
         } finally {
             threads.shutdown();
         }
+
+        cache.verifyInternalConsistencyKeysToCache();
     }

-    public void testCleanupWorksWhenIndexIsClosing() throws Exception {
+    public void testCleanupWorksWhenIndexIsClosed() throws Exception {
         // Enough to hold slightly more than 1 bit-set in the cache
         final long maxCacheBytes = EXPECTED_BYTES_PER_BIT_SET + EXPECTED_BYTES_PER_BIT_SET / 2;
         final Settings settings = Settings.builder()
             .put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b")
             .build();
-        final ExecutorService threads = Executors.newFixedThreadPool(1);
-        final ExecutorService cleanupExecutor = Mockito.mock(ExecutorService.class);
-        final CountDownLatch cleanupReadyLatch = new CountDownLatch(1);
-        final CountDownLatch cleanupCompleteLatch = new CountDownLatch(1);
-        final CountDownLatch indexCloseLatch = new CountDownLatch(1);
-        final AtomicReference<Throwable> cleanupException = new AtomicReference<>();
-        when(cleanupExecutor.submit(any(Runnable.class))).thenAnswer(inv -> {
-            final Runnable runnable = (Runnable) inv.getArguments()[0];
-            return threads.submit(() -> {
-                try {
-                    cleanupReadyLatch.countDown();
-                    assertTrue("index close did not completed in expected time", indexCloseLatch.await(1, TimeUnit.SECONDS));
-                    runnable.run();
-                } catch (Throwable e) {
-                    logger.warn("caught error in cleanup thread", e);
-                    cleanupException.compareAndSet(null, e);
-                } finally {
-                    cleanupCompleteLatch.countDown();
-                }
-                return null;
-            });
-        });

-        final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings, cleanupExecutor);
+        final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings);
         assertThat(cache.entryCount(), equalTo(0));
         assertThat(cache.ramBytesUsed(), equalTo(0L));

-        try {
-            runTestOnIndex((searchExecutionContext, leafContext) -> {
-                final Query query1 = QueryBuilders.termQuery("field-1", "value-1").toQuery(searchExecutionContext);
-                final BitSet bitSet1 = cache.getBitSet(query1, leafContext);
-                assertThat(bitSet1, notNullValue());
+        runTestOnIndex((searchExecutionContext, leafContext) -> {
+            final Query query1 = QueryBuilders.termQuery("field-1", "value-1").toQuery(searchExecutionContext);
+            final BitSet bitSet1 = cache.getBitSet(query1, leafContext);
+            assertThat(bitSet1, notNullValue());
+            cache.verifyInternalConsistency();

-                // Second query should trigger a cache eviction
-                final Query query2 = QueryBuilders.termQuery("field-2", "value-2").toQuery(searchExecutionContext);
-                final BitSet bitSet2 = cache.getBitSet(query2, leafContext);
-                assertThat(bitSet2, notNullValue());
+            // Second query should trigger a cache eviction
+            final Query query2 = QueryBuilders.termQuery("field-2", "value-2").toQuery(searchExecutionContext);
+            final BitSet bitSet2 = cache.getBitSet(query2, leafContext);
+            assertThat(bitSet2, notNullValue());
+            cache.verifyInternalConsistency();

-                final IndexReader.CacheKey indexKey = leafContext.reader().getCoreCacheHelper().getKey();
-                assertTrue("cleanup did not trigger in expected time", cleanupReadyLatch.await(1, TimeUnit.SECONDS));
-                cache.onClose(indexKey);
-                indexCloseLatch.countDown();
-                assertTrue("cleanup did not complete in expected time", cleanupCompleteLatch.await(1, TimeUnit.SECONDS));
-                assertThat("caught error in cleanup thread: " + cleanupException.get(), cleanupException.get(), nullValue());
-            });
-        } finally {
-            threads.shutdown();
-        }
+            final IndexReader.CacheKey indexKey = leafContext.reader().getCoreCacheHelper().getKey();
+            cache.onClose(indexKey);
+            cache.verifyInternalConsistency();
+
+            // closing an index results in the associated entries being removed from the cache (at least when single threaded)
+            assertThat(cache.entryCount(), equalTo(0));
+            assertThat(cache.ramBytesUsed(), equalTo(0L));
+        });
     }

     public void testCacheIsPerIndex() throws Exception {
@@ -496,7 +435,7 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
         runTestOnIndex(consumer);
     }

-    public void testCacheClearEntriesWhenIndexIsClosed() throws Exception {
+    public void testCacheClearsEntriesWhenIndexIsClosed() throws Exception {
         final DocumentSubsetBitsetCache cache = newCache(Settings.EMPTY);
         assertThat(cache.entryCount(), equalTo(0));
         assertThat(cache.ramBytesUsed(), equalTo(0L));
@@ -508,9 +447,13 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
                 final BitSet bitSet = cache.getBitSet(query, leafContext);
                 assertThat(bitSet, notNullValue());
             }
+            cache.verifyInternalConsistency();
             assertThat(cache.entryCount(), not(equalTo(0)));
             assertThat(cache.ramBytesUsed(), not(equalTo(0L)));
         });
+        cache.verifyInternalConsistency();
+
+        // closing an index results in the associated entries being removed from the cache (at least when single threaded)
         assertThat(cache.entryCount(), equalTo(0));
         assertThat(cache.ramBytesUsed(), equalTo(0L));
     }
@@ -569,20 +512,17 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
             final Query query2 = QueryBuilders.termQuery("field-2", "value-2").toQuery(searchExecutionContext);
             final BitSet bitSet2 = cache.getBitSet(query2, leafContext);
             assertThat(bitSet2, notNullValue());
-            // surprisingly, the eviction callback can call `get` on the cache (asynchronously) which causes another miss (or hit)
-            // so this assertion is about the current state of the code, rather than the expected or desired state.
-            // see https://github.com/elastic/elasticsearch/issues/132842
-            expectedStats.put("misses", 3L);
+            expectedStats.put("misses", 2L);
             expectedStats.put("evictions", 1L);
             // underlying Cache class tracks hits/misses, but timing is in DLS cache, which is why we have `2L` here,
             // because DLS cache is only hit once
             expectedStats.put("misses_time_in_millis", 2L);
-            assertBusy(() -> { assertThat(cache.usageStats(), equalTo(expectedStats)); }, 200, TimeUnit.MILLISECONDS);
+            assertThat(cache.usageStats(), equalTo(expectedStats));
         });

         final Map<String, Object> finalStats = emptyStatsSupplier.get();
         finalStats.put("hits", 1L);
-        finalStats.put("misses", 3L);
+        finalStats.put("misses", 2L);
         finalStats.put("evictions", 2L);
         finalStats.put("hits_time_in_millis", 1L);
         finalStats.put("misses_time_in_millis", 2L);
@@ -710,6 +650,6 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase {
     private DocumentSubsetBitsetCache newCache(Settings settings) {
         final AtomicLong increasingMillisTime = new AtomicLong();
         final LongSupplier relativeNanoTimeProvider = () -> TimeUnit.MILLISECONDS.toNanos(increasingMillisTime.getAndIncrement());
-        return new DocumentSubsetBitsetCache(settings, singleThreadExecutor, relativeNanoTimeProvider);
+        return new DocumentSubsetBitsetCache(settings, relativeNanoTimeProvider);
     }
 }