فهرست منبع

Add a periodic cleanup thread for IndexFieldCache caches

Fixes #7010
Lee Hinman 11 سال پیش
والد
کامیت
89e03910f4

+ 4 - 1
src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java

@@ -42,6 +42,7 @@ import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
 import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
 import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -143,7 +144,9 @@ public class IndexFieldDataService extends AbstractIndexComponent {
 
     // public for testing
     public IndexFieldDataService(Index index, CircuitBreakerService circuitBreakerService) {
-        this(index, ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCache(ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCacheListener(circuitBreakerService)), circuitBreakerService, new IndicesFieldDataCacheListener(circuitBreakerService));
+        this(index, ImmutableSettings.Builder.EMPTY_SETTINGS,
+                new IndicesFieldDataCache(ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCacheListener(circuitBreakerService), new ThreadPool("testing-only")),
+                circuitBreakerService, new IndicesFieldDataCacheListener(circuitBreakerService));
     }
 
     // public for testing

+ 63 - 5
src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java

@@ -32,15 +32,13 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.Index;
-import org.elasticsearch.index.fielddata.AtomicFieldData;
-import org.elasticsearch.index.fielddata.FieldDataType;
-import org.elasticsearch.index.fielddata.IndexFieldData;
-import org.elasticsearch.index.fielddata.IndexFieldDataCache;
+import org.elasticsearch.index.fielddata.*;
 import org.elasticsearch.index.mapper.FieldMapper;
 import org.elasticsearch.index.service.IndexService;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardUtils;
 import org.elasticsearch.index.shard.service.IndexShard;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -51,12 +49,18 @@ import java.util.concurrent.TimeUnit;
  */
 public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener<IndicesFieldDataCache.Key, Accountable> {
 
+    public static final String FIELDDATA_CLEAN_INTERVAL_SETTING = "indices.fielddata.cache.cleanup_interval";
+
     private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;
     private final Cache<Key, Accountable> cache;
+    private final TimeValue cleanInterval;
+    private final ThreadPool threadPool;
+    private volatile boolean closed = false;
 
     @Inject
-    public IndicesFieldDataCache(Settings settings, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
+    public IndicesFieldDataCache(Settings settings, IndicesFieldDataCacheListener indicesFieldDataCacheListener, ThreadPool threadPool) {
         super(settings);
+        this.threadPool = threadPool;
         this.indicesFieldDataCacheListener = indicesFieldDataCacheListener;
         String size = componentSettings.get("size", "-1");
         long sizeInBytes = componentSettings.getAsMemory("size", "-1").bytes();
@@ -79,10 +83,16 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
         }
         logger.debug("using size [{}] [{}], expire [{}]", size, new ByteSizeValue(sizeInBytes), expire);
         cache = cacheBuilder.build();
+
+        this.cleanInterval = settings.getAsTime(FIELDDATA_CLEAN_INTERVAL_SETTING, TimeValue.timeValueMinutes(1));
+        // Start thread that will manage cleaning the field data cache periodically
+        threadPool.schedule(this.cleanInterval, ThreadPool.Names.SAME,
+                new FieldDataCacheCleaner(this.cache, this.logger, this.threadPool, this.cleanInterval));
     }
 
     public void close() {
         cache.invalidateAll();
+        this.closed = true;
     }
 
     public IndexFieldDataCache buildIndexFieldDataCache(IndexService indexService, Index index, FieldMapper.Names fieldNames, FieldDataType fieldDataType) {
@@ -146,6 +156,13 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
             assert indexService != null;
         }
 
+        /**
+         * Clean up the internal Guava cache
+         */
+        public void cleanUp() {
+            cache.cleanUp();
+        }
+
         @Override
         public <FD extends AtomicFieldData, IFD extends IndexFieldData<FD>> FD load(final AtomicReaderContext context, final IFD indexFieldData) throws Exception {
             final Key key = new Key(this, context.reader().getCoreCacheKey());
@@ -288,4 +305,45 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
             return result;
         }
     }
+
+    /**
+     * FieldDataCacheCleaner is a scheduled Runnable used to clean a Guava cache
+     * periodically. In this case it is the field data cache, because a cache that
+     * has an entry invalidated may not clean up the entry if it is not read from
+     * or written to after invalidation.
+     */
+    public class FieldDataCacheCleaner implements Runnable {
+
+        private final Cache<Key, Accountable> cache;
+        private final ESLogger logger;
+        private final ThreadPool threadPool;
+        private final TimeValue interval;
+
+        public FieldDataCacheCleaner(Cache cache, ESLogger logger, ThreadPool threadPool, TimeValue interval) {
+            this.cache = cache;
+            this.logger = logger;
+            this.threadPool = threadPool;
+            this.interval = interval;
+        }
+
+        @Override
+        public void run() {
+            long startTime = System.currentTimeMillis();
+            if (logger.isTraceEnabled()) {
+                logger.trace("running periodic field data cache cleanup");
+            }
+            try {
+                this.cache.cleanUp();
+            } catch (Exception e) {
+                logger.warn("Exception during periodic field data cache cleanup:", e);
+            }
+            if (logger.isTraceEnabled()) {
+                logger.trace("periodic field data cache cleanup finished in {} milliseconds", System.currentTimeMillis() - startTime);
+            }
+            // Reschedule itself to run again if not closed
+            if (closed == false) {
+                threadPool.schedule(interval, ThreadPool.Names.SAME, this);
+            }
+        }
+    }
 }