@@ -19,453 +19,302 @@

package org.elasticsearch.indices.cache.query;

-import com.carrotsearch.hppc.ObjectHashSet;
-import com.carrotsearch.hppc.ObjectSet;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.cache.Weigher;
-
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.util.Accountable;
-import org.apache.lucene.util.RamUsageEstimator;
-import org.elasticsearch.action.search.SearchType;
-import org.elasticsearch.cluster.ClusterService;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.common.bytes.BytesReference;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Explanation;
+import org.apache.lucene.search.LRUQueryCache;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.QueryCache;
+import org.apache.lucene.search.QueryCachingPolicy;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Weight;
+import org.apache.lucene.util.Bits;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.lucene.ShardCoreKeyMap;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.MemorySizeValue;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
-import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.IndexShardState;
-import org.elasticsearch.search.internal.SearchContext;
-import org.elasticsearch.search.internal.ShardSearchRequest;
-import org.elasticsearch.search.query.QueryPhase;
-import org.elasticsearch.search.query.QuerySearchResult;
-import org.elasticsearch.threadpool.ThreadPool;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Iterator;
+import org.elasticsearch.index.cache.query.QueryCacheStats;
+import org.elasticsearch.index.shard.ShardId;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
-import static org.elasticsearch.common.Strings.hasLength;
-
-/**
- * The indices query cache allows to cache a shard level query stage responses, helping with improving
- * similar requests that are potentially expensive (because of aggs for example). The cache is fully coherent
- * with the semantics of NRT (the index reader version is part of the cache key), and relies on size based
- * eviction to evict old reader associated cache entries as well as scheduler reaper to clean readers that
- * are no longer used or closed shards.
- * <p/>
- * Currently, the cache is only enabled for {@link SearchType#COUNT}, and can only be opted in on an index
- * level setting that can be dynamically changed and defaults to false.
- * <p/>
- * There are still several TODOs left in this class, some easily addressable, some more complex, but the support
- * is functional.
- */
-public class IndicesQueryCache extends AbstractComponent implements RemovalListener<IndicesQueryCache.Key, IndicesQueryCache.Value> {
-
-    /**
-     * A setting to enable or disable query caching on an index level. Its dynamic by default
-     * since we are checking on the cluster state IndexMetaData always.
-     */
-    public static final String INDEX_CACHE_QUERY_ENABLED = "index.cache.query.enable";
-    public static final String INDICES_CACHE_QUERY_CLEAN_INTERVAL = "indices.cache.query.clean_interval";
-
-    public static final String INDICES_CACHE_QUERY_SIZE = "indices.cache.query.size";
-    public static final String INDICES_CACHE_QUERY_EXPIRE = "indices.cache.query.expire";
-    public static final String INDICES_CACHE_QUERY_CONCURRENCY_LEVEL = "indices.cache.query.concurrency_level";
-
-    private static final Set<SearchType> CACHEABLE_SEARCH_TYPES = EnumSet.of(SearchType.QUERY_THEN_FETCH, SearchType.QUERY_AND_FETCH);
-
-    private final ThreadPool threadPool;
-    private final ClusterService clusterService;
-
-    private final TimeValue cleanInterval;
-    private final Reaper reaper;
+import java.util.concurrent.ConcurrentHashMap;

-    final ConcurrentMap<CleanupKey, Boolean> registeredClosedListeners = ConcurrentCollections.newConcurrentMap();
-    final Set<CleanupKey> keysToClean = ConcurrentCollections.newConcurrentSet();
+public class IndicesQueryCache extends AbstractComponent implements QueryCache, Closeable {

+    public static final String INDICES_CACHE_QUERY_SIZE = "indices.queries.cache.size";
+    @Deprecated
+    public static final String DEPRECATED_INDICES_CACHE_QUERY_SIZE = "indices.cache.filter.size";
+    public static final String INDICES_CACHE_QUERY_COUNT = "indices.queries.cache.count";

-    //TODO make these changes configurable on the cluster level
-    private final String size;
-    private final TimeValue expire;
-    private final int concurrencyLevel;
+    private final LRUQueryCache cache;
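+    // Maps segment core cache keys to the shards they belong to, so that
+    // cached entries and their memory usage can be attributed to shards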
+    private final ShardCoreKeyMap shardKeyMap = new ShardCoreKeyMap();
+    private final Map<ShardId, Stats> shardStats = new ConcurrentHashMap<>();
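+    // Memory used by the cached queries themselves rather than by the cached
+    // doc id sets; it is shared across shards and redistributed in getStats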
+    private volatile long sharedRamBytesUsed;

-    private volatile Cache<Key, Value> cache;
+    // This is a hack for the fact that the close listener for the
+    // ShardCoreKeyMap will be called before onDocIdSetEviction
+    // See onDocIdSetEviction for more info
+    private final Map<Object, StatsAndCount> stats2 = new IdentityHashMap<>();

    @Inject
-    public IndicesQueryCache(Settings settings, ClusterService clusterService, ThreadPool threadPool) {
+    public IndicesQueryCache(Settings settings) {
        super(settings);
-        this.clusterService = clusterService;
-        this.threadPool = threadPool;
-        this.cleanInterval = settings.getAsTime(INDICES_CACHE_QUERY_CLEAN_INTERVAL, TimeValue.timeValueSeconds(60));
-        // this cache can be very small yet still be very effective
-        this.size = settings.get(INDICES_CACHE_QUERY_SIZE, "1%");
-        this.expire = settings.getAsTime(INDICES_CACHE_QUERY_EXPIRE, null);
-        // defaults to 4, but this is a busy map for all indices, increase it a bit by default
-        this.concurrencyLevel = settings.getAsInt(INDICES_CACHE_QUERY_CONCURRENCY_LEVEL, 16);
-        if (concurrencyLevel <= 0) {
-            throw new IllegalArgumentException("concurrency_level must be > 0 but was: " + concurrencyLevel);
-        }
-        buildCache();
-
-        this.reaper = new Reaper();
-        threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, reaper);
-    }
-
-    private void buildCache() {
-        long sizeInBytes = MemorySizeValue.parseBytesSizeValueOrHeapRatio(size, INDICES_CACHE_QUERY_SIZE).bytes();
-
-        CacheBuilder<Key, Value> cacheBuilder = CacheBuilder.newBuilder()
-                .maximumWeight(sizeInBytes).weigher(new QueryCacheWeigher()).removalListener(this);
-        cacheBuilder.concurrencyLevel(concurrencyLevel);
-
-        if (expire != null) {
-            cacheBuilder.expireAfterAccess(expire.millis(), TimeUnit.MILLISECONDS);
-        }
-
-        cache = cacheBuilder.build();
-    }
-
-    private static class QueryCacheWeigher implements Weigher<Key, Value> {
-
-        @Override
-        public int weigh(Key key, Value value) {
-            return (int) (key.ramBytesUsed() + value.ramBytesUsed());
+        String sizeString = settings.get(INDICES_CACHE_QUERY_SIZE);
+        if (sizeString == null) {
+            sizeString = settings.get(DEPRECATED_INDICES_CACHE_QUERY_SIZE);
+            if (sizeString != null) {
+                deprecationLogger.deprecated("The [" + DEPRECATED_INDICES_CACHE_QUERY_SIZE
+                        + "] setting is now deprecated, use [" + INDICES_CACHE_QUERY_SIZE + "] instead");
+            }
        }
-    }
-
-    public void close() {
-        reaper.close();
-        cache.invalidateAll();
-    }
-
-    public void clear(IndexShard shard) {
-        if (shard == null) {
-            return;
+        if (sizeString == null) {
+            sizeString = "10%";
        }
-        keysToClean.add(new CleanupKey(shard, -1));
-        logger.trace("{} explicit cache clear", shard.shardId());
-        reaper.reap();
-    }
+        final ByteSizeValue size = MemorySizeValue.parseBytesSizeValueOrHeapRatio(sizeString, INDICES_CACHE_QUERY_SIZE);
+        final int count = settings.getAsInt(INDICES_CACHE_QUERY_COUNT, 1000);
+        logger.debug("using [node] query cache with size [{}], actual_size [{}], max filter count [{}]",
+                sizeString, size, count);
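+        // The cache itself is a Lucene LRUQueryCache; the anonymous subclass
+        // below only overrides the callbacks to maintain per-shard statistics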
+        cache = new LRUQueryCache(count, size.bytes()) {

-    @Override
-    public void onRemoval(RemovalNotification<Key, Value> notification) {
-        if (notification.getKey() == null) {
-            return;
-        }
-        notification.getKey().shard.queryCache().onRemoval(notification);
-    }
+            private Stats getStats(Object coreKey) {
+                final ShardId shardId = shardKeyMap.getShardId(coreKey);
+                if (shardId == null) {
+                    return null;
+                }
+                return shardStats.get(shardId);
+            }

-    /**
-     * Can the shard request be cached at all?
-     */
-    public boolean canCache(ShardSearchRequest request, SearchContext context) {
-        // TODO: for now, template is not supported, though we could use the generated bytes as the key
-        if (hasLength(request.templateSource())) {
-            return false;
-        }
+            private Stats getOrCreateStats(Object coreKey) {
+                final ShardId shardId = shardKeyMap.getShardId(coreKey);
+                Stats stats = shardStats.get(shardId);
+                if (stats == null) {
+                    stats = new Stats();
+                    shardStats.put(shardId, stats);
+                }
+                return stats;
+            }

-        // for now, only enable it for requests with no hits
-        if (context.size() != 0) {
-            return false;
-        }
+            // It's ok to not protect these callbacks by a lock since it is
+            // done in LRUQueryCache
+            @Override
+            protected void onClear() {
+                assert Thread.holdsLock(this);
+                super.onClear();
+                for (Stats stats : shardStats.values()) {
+                    // don't throw away hit/miss
+                    stats.cacheSize = 0;
+                    stats.ramBytesUsed = 0;
+                }
+                sharedRamBytesUsed = 0;
+            }

-        // We cannot cache with DFS because results depend not only on the content of the index but also
-        // on the overridden statistics. So if you ran two queries on the same index with different stats
-        // (because an other shard was updated) you would get wrong results because of the scores
-        // (think about top_hits aggs or scripts using the score)
-        if (!CACHEABLE_SEARCH_TYPES.contains(context.searchType())) {
-            return false;
-        }
+            @Override
+            protected void onQueryCache(Query filter, long ramBytesUsed) {
+                assert Thread.holdsLock(this);
+                super.onQueryCache(filter, ramBytesUsed);
+                sharedRamBytesUsed += ramBytesUsed;
+            }

-        IndexMetaData index = clusterService.state().getMetaData().index(request.index());
-        if (index == null) { // in case we didn't yet have the cluster state, or it just got deleted
-            return false;
-        }
-        // if not explicitly set in the request, use the index setting, if not, use the request
-        if (request.queryCache() == null) {
-            if (!index.settings().getAsBoolean(INDEX_CACHE_QUERY_ENABLED, Boolean.FALSE)) {
-                return false;
+            @Override
+            protected void onQueryEviction(Query filter, long ramBytesUsed) {
+                assert Thread.holdsLock(this);
+                super.onQueryEviction(filter, ramBytesUsed);
+                sharedRamBytesUsed -= ramBytesUsed;
            }
-        } else if (!request.queryCache()) {
-            return false;
-        }
-        // if the reader is not a directory reader, we can't get the version from it
-        if (!(context.searcher().getIndexReader() instanceof DirectoryReader)) {
-            return false;
-        }
-        // if now in millis is used (or in the future, a more generic "isDeterministic" flag
-        // then we can't cache based on "now" key within the search request, as it is not deterministic
-        if (context.nowInMillisUsed()) {
-            return false;
-        }
-        return true;
-    }

-    /**
-     * Loads the cache result, computing it if needed by executing the query phase and otherwise deserializing the cached
-     * value into the {@link SearchContext#queryResult() context's query result}. The combination of load + compute allows
-     * to have a single load operation that will cause other requests with the same key to wait till its loaded an reuse
-     * the same cache.
-     */
-    public void loadIntoContext(final ShardSearchRequest request, final SearchContext context, final QueryPhase queryPhase) throws Exception {
-        assert canCache(request, context);
-        Key key = buildKey(request, context);
-        Loader loader = new Loader(queryPhase, context, key);
-        Value value = cache.get(key, loader);
-        if (loader.isLoaded()) {
-            key.shard.queryCache().onMiss();
-            // see if its the first time we see this reader, and make sure to register a cleanup key
-            CleanupKey cleanupKey = new CleanupKey(context.indexShard(), ((DirectoryReader) context.searcher().getIndexReader()).getVersion());
-            if (!registeredClosedListeners.containsKey(cleanupKey)) {
-                Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE);
-                if (previous == null) {
-                    context.searcher().getIndexReader().addReaderClosedListener(cleanupKey);
+            @Override
+            protected void onDocIdSetCache(Object readerCoreKey, long ramBytesUsed) {
+                assert Thread.holdsLock(this);
+                super.onDocIdSetCache(readerCoreKey, ramBytesUsed);
+                final Stats shardStats = getOrCreateStats(readerCoreKey);
+                shardStats.cacheSize += 1;
+                shardStats.cacheCount += 1;
+                shardStats.ramBytesUsed += ramBytesUsed;
+
+                StatsAndCount statsAndCount = stats2.get(readerCoreKey);
+                if (statsAndCount == null) {
+                    statsAndCount = new StatsAndCount(shardStats);
+                    stats2.put(readerCoreKey, statsAndCount);
                }
+                statsAndCount.count += 1;
            }
-        } else {
-            key.shard.queryCache().onHit();
-            // restore the cached query result into the context
-            final QuerySearchResult result = context.queryResult();
-            result.readFromWithId(context.id(), value.reference.streamInput());
-            result.shardTarget(context.shardTarget());
-        }
-    }
-
-    private static class Loader implements Callable<Value> {

-        private final QueryPhase queryPhase;
-        private final SearchContext context;
-        private final IndicesQueryCache.Key key;
-        private boolean loaded;
-
-        Loader(QueryPhase queryPhase, SearchContext context, IndicesQueryCache.Key key) {
-            this.queryPhase = queryPhase;
-            this.context = context;
-            this.key = key;
-        }
+            @Override
+            protected void onDocIdSetEviction(Object readerCoreKey, int numEntries, long sumRamBytesUsed) {
+                assert Thread.holdsLock(this);
+                super.onDocIdSetEviction(readerCoreKey, numEntries, sumRamBytesUsed);
+                // We can't use ShardCoreKeyMap here because its core closed
+                // listener is called before the listener of the cache which
+                // triggers this eviction. So instead we use stats2, which we
+                // only evict when nothing is cached anymore on the segment,
+                // instead of relying on close listeners
+                final StatsAndCount statsAndCount = stats2.get(readerCoreKey);
+                final Stats shardStats = statsAndCount.stats;
+                shardStats.cacheSize -= numEntries;
+                shardStats.ramBytesUsed -= sumRamBytesUsed;
+                statsAndCount.count -= numEntries;
+                if (statsAndCount.count == 0) {
+                    stats2.remove(readerCoreKey);
+                }
+            }

-        public boolean isLoaded() {
-            return this.loaded;
-        }
+            @Override
+            protected void onHit(Object readerCoreKey, Query filter) {
+                assert Thread.holdsLock(this);
+                super.onHit(readerCoreKey, filter);
+                final Stats shardStats = getStats(readerCoreKey);
+                shardStats.hitCount += 1;
+            }

-        @Override
-        public Value call() throws Exception {
-            queryPhase.execute(context);
-
-            /* BytesStreamOutput allows to pass the expected size but by default uses
-             * BigArrays.PAGE_SIZE_IN_BYTES which is 16k. A common cached result ie.
-             * a date histogram with 3 buckets is ~100byte so 16k might be very wasteful
-             * since we don't shrink to the actual size once we are done serializing.
-             * By passing 512 as the expected size we will resize the byte array in the stream
-             * slowly until we hit the page size and don't waste too much memory for small query
-             * results.*/
-            final int expectedSizeInBytes = 512;
-            try (BytesStreamOutput out = new BytesStreamOutput(expectedSizeInBytes)) {
-                context.queryResult().writeToNoId(out);
-                // for now, keep the paged data structure, which might have unused bytes to fill a page, but better to keep
-                // the memory properly paged instead of having varied sized bytes
-                final BytesReference reference = out.bytes();
-                loaded = true;
-                Value value = new Value(reference, out.ramBytesUsed());
-                key.shard.queryCache().onCached(key, value);
-                return value;
+            @Override
+            protected void onMiss(Object readerCoreKey, Query filter) {
+                assert Thread.holdsLock(this);
+                super.onMiss(readerCoreKey, filter);
+                final Stats shardStats = getOrCreateStats(readerCoreKey);
+                shardStats.missCount += 1;
            }
-        }
+        };
+        sharedRamBytesUsed = 0;
    }

-    public static class Value implements Accountable {
-        final BytesReference reference;
-        final long ramBytesUsed;
-
-        public Value(BytesReference reference, long ramBytesUsed) {
-            this.reference = reference;
-            this.ramBytesUsed = ramBytesUsed;
-        }
-
-        @Override
-        public long ramBytesUsed() {
-            return ramBytesUsed;
-        }
+    /** Get usage statistics for the given shard. */
+    public QueryCacheStats getStats(ShardId shard) {
+        final Map<ShardId, QueryCacheStats> stats = new HashMap<>();
+        for (Map.Entry<ShardId, Stats> entry : shardStats.entrySet()) {
+            stats.put(entry.getKey(), entry.getValue().toQueryCacheStats());
+        }
+        QueryCacheStats shardStats = new QueryCacheStats();
+        QueryCacheStats info = stats.get(shard);
+        if (info == null) {
+            info = new QueryCacheStats();
+        }
+        shardStats.add(info);
+
+        // We also have some shared ram usage that we try to distribute
+        // proportionally to the number of cache entries of each shard
+        long totalSize = 0;
+        for (QueryCacheStats s : stats.values()) {
+            totalSize += s.getCacheSize();
+        }
+        final double weight = totalSize == 0
+                ? 1d / stats.size()
+                : shardStats.getCacheSize() / totalSize;
+        final long additionalRamBytesUsed = Math.round(weight * sharedRamBytesUsed);
+        shardStats.add(new QueryCacheStats(additionalRamBytesUsed, 0, 0, 0, 0));
+        return shardStats;
+    }

-        @Override
-        public Collection<Accountable> getChildResources() {
-            return Collections.emptyList();
-        }
+    @Override
+    public Weight doCache(Weight weight, QueryCachingPolicy policy) {
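+        // unwrap the weight first in case it was already wrapped by an earlier
+        // doCache call, so that readers do not get tracked twice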
+        while (weight instanceof CachingWeightWrapper) {
+            weight = ((CachingWeightWrapper) weight).in;
+        }
+        final Weight in = cache.doCache(weight, policy);
+        // We wrap the weight to track the readers it sees and map them with
+        // the shards they belong to
+        return new CachingWeightWrapper(in);
    }

-    public static class Key implements Accountable {
-        public final IndexShard shard; // use as identity equality
-        public final long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped
-        public final BytesReference value;
+    private class CachingWeightWrapper extends Weight {

-        Key(IndexShard shard, long readerVersion, BytesReference value) {
-            this.shard = shard;
-            this.readerVersion = readerVersion;
-            this.value = value;
-        }
+        private final Weight in;

-        @Override
-        public long ramBytesUsed() {
-            return RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_LONG + value.length();
+        protected CachingWeightWrapper(Weight in) {
+            super(in.getQuery());
+            this.in = in;
        }

        @Override
-        public Collection<Accountable> getChildResources() {
-            // TODO: more detailed ram usage?
-            return Collections.emptyList();
+        public void extractTerms(Set<Term> terms) {
+            in.extractTerms(terms);
        }

        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            Key key = (Key) o;
-            if (readerVersion != key.readerVersion) return false;
-            if (!shard.equals(key.shard)) return false;
-            if (!value.equals(key.value)) return false;
-            return true;
+        public Explanation explain(LeafReaderContext context, int doc) throws IOException {
+            shardKeyMap.add(context.reader());
+            return in.explain(context, doc);
        }

        @Override
-        public int hashCode() {
-            int result = shard.hashCode();
-            result = 31 * result + (int) (readerVersion ^ (readerVersion >>> 32));
-            result = 31 * result + value.hashCode();
-            return result;
+        public float getValueForNormalization() throws IOException {
+            return in.getValueForNormalization();
        }
-    }

-    private class CleanupKey implements IndexReader.ReaderClosedListener {
-        IndexShard indexShard;
-        long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped
-
-        private CleanupKey(IndexShard indexShard, long readerVersion) {
-            this.indexShard = indexShard;
-            this.readerVersion = readerVersion;
+        @Override
+        public void normalize(float norm, float topLevelBoost) {
+            in.normalize(norm, topLevelBoost);
        }

        @Override
-        public void onClose(IndexReader reader) {
-            Boolean remove = registeredClosedListeners.remove(this);
-            if (remove != null) {
-                keysToClean.add(this);
-            }
+        public Scorer scorer(LeafReaderContext context, Bits acceptDocs) throws IOException {
+            shardKeyMap.add(context.reader());
+            return in.scorer(context, acceptDocs);
        }
+    }

-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            CleanupKey that = (CleanupKey) o;
-            if (readerVersion != that.readerVersion) return false;
-            if (!indexShard.equals(that.indexShard)) return false;
-            return true;
+    /** Clear all entries that belong to the given index. */
+    public void clearIndex(String index) {
+        final Set<Object> coreCacheKeys = shardKeyMap.getCoreKeysForIndex(index);
+        for (Object coreKey : coreCacheKeys) {
+            cache.clearCoreCacheKey(coreKey);
        }

-        @Override
-        public int hashCode() {
-            int result = indexShard.hashCode();
-            result = 31 * result + (int) (readerVersion ^ (readerVersion >>> 32));
-            return result;
+        // This cache stores two things: filters, and doc id sets. Calling
+        // clear only removes the doc id sets, but if we reach the situation
+        // that the cache does not contain any DocIdSet anymore, then it
+        // probably means that the user wanted to remove everything.
+        if (cache.getCacheSize() == 0) {
+            cache.clear();
        }
    }

-    private class Reaper implements Runnable {
+    @Override
+    public void close() {
+        assert shardKeyMap.size() == 0 : shardKeyMap.size();
+        assert shardStats.isEmpty() : shardStats.keySet();
+        assert stats2.isEmpty() : stats2;
+        cache.clear();
+    }

-        private final ObjectSet<CleanupKey> currentKeysToClean = new ObjectHashSet<>();
-        private final ObjectSet<IndexShard> currentFullClean = new ObjectHashSet<>();
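+    // Per-shard statistics; fields are volatile so that they can be read
+    // without holding the lock under which the cache callbacks update them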
+    private static class Stats implements Cloneable {

-        private volatile boolean closed;
+        volatile long ramBytesUsed;
+        volatile long hitCount;
+        volatile long missCount;
+        volatile long cacheCount;
+        volatile long cacheSize;

-        void close() {
-            closed = true;
+        QueryCacheStats toQueryCacheStats() {
+            return new QueryCacheStats(ramBytesUsed, hitCount, missCount, cacheCount, cacheSize);
        }
+    }

-        @Override
-        public void run() {
-            if (closed) {
-                return;
-            }
-            if (keysToClean.isEmpty()) {
-                schedule();
-                return;
-            }
-            try {
-                threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        reap();
-                        schedule();
-                    }
-                });
-            } catch (EsRejectedExecutionException ex) {
-                logger.debug("Can not run ReaderCleaner - execution rejected", ex);
-            }
-        }
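+    // Number of doc id sets that are cached for a given segment, together
+    // with the stats of the shard that the segment belongs to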
+    private static class StatsAndCount {
+        int count;
+        final Stats stats;

-        private void schedule() {
-            try {
-                threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, this);
-            } catch (EsRejectedExecutionException ex) {
-                logger.debug("Can not schedule ReaderCleaner - execution rejected", ex);
-            }
+        StatsAndCount(Stats stats) {
+            this.stats = stats;
+            this.count = 0;
        }
+    }

-        synchronized void reap() {
-            currentKeysToClean.clear();
-            currentFullClean.clear();
-            for (Iterator<CleanupKey> iterator = keysToClean.iterator(); iterator.hasNext(); ) {
-                CleanupKey cleanupKey = iterator.next();
-                iterator.remove();
-                if (cleanupKey.readerVersion == -1 || cleanupKey.indexShard.state() == IndexShardState.CLOSED) {
-                    // -1 indicates full cleanup, as does a closed shard
-                    currentFullClean.add(cleanupKey.indexShard);
-                } else {
-                    currentKeysToClean.add(cleanupKey);
-                }
-            }
-
-            if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) {
-                CleanupKey lookupKey = new CleanupKey(null, -1);
-                for (Iterator<Key> iterator = cache.asMap().keySet().iterator(); iterator.hasNext(); ) {
-                    Key key = iterator.next();
-                    if (currentFullClean.contains(key.shard)) {
-                        iterator.remove();
-                    } else {
-                        lookupKey.indexShard = key.shard;
-                        lookupKey.readerVersion = key.readerVersion;
-                        if (currentKeysToClean.contains(lookupKey)) {
-                            iterator.remove();
-                        }
-                    }
-                }
-            }
-
-            cache.cleanUp();
-            currentKeysToClean.clear();
-            currentFullClean.clear();
+    private boolean empty(Stats stats) {
+        if (stats == null) {
+            return true;
        }
+        return stats.cacheSize == 0 && stats.ramBytesUsed == 0;
    }

-    private static Key buildKey(ShardSearchRequest request, SearchContext context) throws Exception {
-        // TODO: for now, this will create different keys for different JSON order
-        // TODO: tricky to get around this, need to parse and order all, which can be expensive
-        return new Key(context.indexShard(),
-                ((DirectoryReader) context.searcher().getIndexReader()).getVersion(),
-                request.cacheKey());
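+    // Remove the stats entry of the given shard; by the time this is called
+    // the shard is expected to have no cached entries left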
+    public void onClose(ShardId shardId) {
+        assert empty(shardStats.get(shardId));
+        shardStats.remove(shardId);
    }
}
|