Browse Source

Ensure external refreshes will also refresh internal searcher to minimize segment creation (#27253)

We cut over to internal and external IndexReader/IndexSearcher in #26972 which uses
two independent searcher managers. This has the downside that refreshes of the external
reader will never clear the internal version map which in-turn will trigger additional
and potentially unnecessary segment flushes since memory must be freed. Under heavy
indexing load with low refresh intervals this can cause excessive segment creation which
causes high GC activity and significantly increases the required segment merges.

This change adds a dedicated external reference manager that delegates refreshes to the
internal reference manager that then `steals` the refreshed reader from the internal
reference manager for external usage. This ensures that external and internal readers
are consistent on an external refresh. As a sideeffect this also releases old segments
referenced by the internal reference manager which can potentially hold on to already merged
away segments until it is refreshed due to a flush or indexing activity.
Simon Willnauer 8 years ago
parent
commit
a34c2f0b8d

+ 14 - 8
core/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -36,7 +36,7 @@ import org.apache.lucene.index.SegmentReader;
 import org.apache.lucene.index.SnapshotDeletionPolicy;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.search.ReferenceManager;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
@@ -170,7 +170,7 @@ public abstract class Engine implements Closeable {
         return IndexWriter.SOURCE_MERGE.equals(source);
     }
 
-    protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) {
+    protected Searcher newSearcher(String source, IndexSearcher searcher, ReferenceManager<IndexSearcher> manager) {
         return new EngineSearcher(source, searcher, manager, store, logger);
     }
 
@@ -531,7 +531,7 @@ public abstract class Engine implements Closeable {
           * the searcher is acquired. */
         store.incRef();
         try {
-            final SearcherManager manager = getSearcherManager(source, scope); // can never be null
+            final ReferenceManager<IndexSearcher> manager = getSearcherManager(source, scope); // can never be null
             /* This might throw NPE but that's fine we will run ensureOpen()
             *  in the catch block and throw the right exception */
             final IndexSearcher searcher = manager.acquire();
@@ -585,7 +585,7 @@ public abstract class Engine implements Closeable {
     /**
      * Read the last segments info from the commit pointed to by the searcher manager
      */
-    protected static SegmentInfos readLastCommittedSegmentInfos(final SearcherManager sm, final Store store) throws IOException {
+    protected static SegmentInfos readLastCommittedSegmentInfos(final ReferenceManager<IndexSearcher> sm, final Store store) throws IOException {
         IndexSearcher searcher = sm.acquire();
         try {
             IndexCommit latestCommit = ((DirectoryReader) searcher.getIndexReader()).getIndexCommit();
@@ -787,13 +787,19 @@ public abstract class Engine implements Closeable {
     public final boolean refreshNeeded() {
         if (store.tryIncRef()) {
             /*
-              we need to inc the store here since searcherManager.isSearcherCurrent()
-              acquires a searcher internally and that might keep a file open on the
+              we need to inc the store here since we acquire a searcher and that might keep a file open on the
               store. this violates the assumption that all files are closed when
               the store is closed so we need to make sure we increment it here
              */
             try {
-                return getSearcherManager("refresh_needed", SearcherScope.EXTERNAL).isSearcherCurrent() == false;
+                ReferenceManager<IndexSearcher> manager = getSearcherManager("refresh_needed", SearcherScope.EXTERNAL);
+                final IndexSearcher searcher =  manager.acquire();
+                try {
+                    final IndexReader r = searcher.getIndexReader();
+                    return ((DirectoryReader) r).isCurrent() == false;
+                } finally {
+                    manager.release(searcher);
+                }
             } catch (IOException e) {
                 logger.error("failed to access searcher manager", e);
                 failEngine("failed to access searcher manager", e);
@@ -1331,7 +1337,7 @@ public abstract class Engine implements Closeable {
         }
     }
 
-    protected abstract SearcherManager getSearcherManager(String source, SearcherScope scope);
+    protected abstract ReferenceManager<IndexSearcher> getSearcherManager(String source, SearcherScope scope);
 
     /**
      * Method to close the engine while the write lock is held.

+ 3 - 2
core/src/main/java/org/elasticsearch/index/engine/EngineSearcher.java

@@ -21,6 +21,7 @@ package org.elasticsearch.index.engine;
 
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.ReferenceManager;
 import org.apache.lucene.search.SearcherManager;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.index.store.Store;
@@ -32,12 +33,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * Searcher for an Engine
  */
 public class EngineSearcher extends Engine.Searcher {
-    private final SearcherManager manager;
+    private final ReferenceManager<IndexSearcher> manager;
     private final AtomicBoolean released = new AtomicBoolean(false);
     private final Store store;
     private final Logger logger;
 
-    public EngineSearcher(String source, IndexSearcher searcher, SearcherManager manager, Store store, Logger logger) {
+    public EngineSearcher(String source, IndexSearcher searcher, ReferenceManager<IndexSearcher> manager, Store store, Logger logger) {
         super(source, searcher);
         this.manager = manager;
         this.store = store;

+ 90 - 21
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -48,6 +48,7 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lucene.LoggerInfoStream;
@@ -57,7 +58,6 @@ import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
 import org.elasticsearch.common.metrics.CounterMetric;
-import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.KeyedLock;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
@@ -108,7 +108,7 @@ public class InternalEngine extends Engine {
 
     private final IndexWriter indexWriter;
 
-    private final SearcherManager externalSearcherManager;
+    private final ExternalSearcherManager externalSearcherManager;
     private final SearcherManager internalSearcherManager;
 
     private final Lock flushLock = new ReentrantLock();
@@ -172,7 +172,7 @@ public class InternalEngine extends Engine {
         store.incRef();
         IndexWriter writer = null;
         Translog translog = null;
-        SearcherManager externalSearcherManager = null;
+        ExternalSearcherManager externalSearcherManager = null;
         SearcherManager internalSearcherManager = null;
         EngineMergeScheduler scheduler = null;
         boolean success = false;
@@ -224,8 +224,8 @@ public class InternalEngine extends Engine {
                     throw e;
                 }
             }
-            internalSearcherManager = createSearcherManager(new SearcherFactory(), false);
-            externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig), true);
+            externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig));
+            internalSearcherManager = externalSearcherManager.internalSearcherManager;
             this.internalSearcherManager = internalSearcherManager;
             this.externalSearcherManager = externalSearcherManager;
             internalSearcherManager.addListener(versionMap);
@@ -238,7 +238,7 @@ public class InternalEngine extends Engine {
             success = true;
         } finally {
             if (success == false) {
-                IOUtils.closeWhileHandlingException(writer, translog, externalSearcherManager, internalSearcherManager, scheduler);
+                IOUtils.closeWhileHandlingException(writer, translog, internalSearcherManager, externalSearcherManager, scheduler);
                 if (isClosed.get() == false) {
                     // failure we need to dec the store reference
                     store.decRef();
@@ -248,6 +248,75 @@ public class InternalEngine extends Engine {
         logger.trace("created new InternalEngine");
     }
 
+    /**
+     * This reference manager delegates all it's refresh calls to another (internal) SearcherManager
+     * The main purpose for this is that if we have external refreshes happening we don't issue extra
+     * refreshes to clear version map memory etc. this can cause excessive segment creation if heavy indexing
+     * is happening and the refresh interval is low (ie. 1 sec)
+     *
+     * This also prevents segment starvation where an internal reader holds on to old segments literally forever
+     * since no indexing is happening and refreshes are only happening to the external reader manager, while with
+     * this specialized implementation an external refresh will immediately be reflected on the internal reader
+     * and old segments can be released in the same way previous version did this (as a side-effect of _refresh)
+     */
+    @SuppressForbidden(reason = "reference counting is required here")
+    private static final class ExternalSearcherManager extends ReferenceManager<IndexSearcher> {
+        private final SearcherFactory searcherFactory;
+        private final SearcherManager internalSearcherManager;
+
+        ExternalSearcherManager(SearcherManager internalSearcherManager, SearcherFactory searcherFactory) throws IOException {
+            IndexSearcher acquire = internalSearcherManager.acquire();
+            try {
+                IndexReader indexReader = acquire.getIndexReader();
+                assert indexReader instanceof ElasticsearchDirectoryReader:
+                    "searcher's IndexReader should be an ElasticsearchDirectoryReader, but got " + indexReader;
+                indexReader.incRef(); // steal the reader - getSearcher will decrement if it fails
+                current = SearcherManager.getSearcher(searcherFactory, indexReader, null);
+            } finally {
+                internalSearcherManager.release(acquire);
+            }
+            this.searcherFactory = searcherFactory;
+            this.internalSearcherManager = internalSearcherManager;
+        }
+
+        @Override
+        protected IndexSearcher refreshIfNeeded(IndexSearcher referenceToRefresh) throws IOException {
+            // we simply run a blocking refresh on the internal reference manager and then steal it's reader
+            // it's a save operation since we acquire the reader which incs it's reference but then down the road
+            // steal it by calling incRef on the "stolen" reader
+            internalSearcherManager.maybeRefreshBlocking();
+            IndexSearcher acquire = internalSearcherManager.acquire();
+            final IndexReader previousReader = referenceToRefresh.getIndexReader();
+            assert previousReader instanceof ElasticsearchDirectoryReader:
+                "searcher's IndexReader should be an ElasticsearchDirectoryReader, but got " + previousReader;
+            try {
+                final IndexReader newReader = acquire.getIndexReader();
+                if (newReader == previousReader) {
+                    // nothing has changed - both ref managers share the same instance so we can use reference equality
+                    return null;
+                } else {
+                    newReader.incRef(); // steal the reader - getSearcher will decrement if it fails
+                    return SearcherManager.getSearcher(searcherFactory, newReader, previousReader);
+                }
+            } finally {
+                internalSearcherManager.release(acquire);
+            }
+        }
+
+        @Override
+        protected boolean tryIncRef(IndexSearcher reference) {
+            return reference.getIndexReader().tryIncRef();
+        }
+
+        @Override
+        protected int getRefCount(IndexSearcher reference) {
+            return reference.getIndexReader().getRefCount();
+        }
+
+        @Override
+        protected void decRef(IndexSearcher reference) throws IOException { reference.getIndexReader().decRef(); }
+    }
+
     @Override
     public void restoreLocalCheckpointFromTranslog() throws IOException {
         try (ReleasableLock ignored = writeLock.acquire()) {
@@ -456,18 +525,18 @@ public class InternalEngine extends Engine {
         return uuid;
     }
 
-    private SearcherManager createSearcherManager(SearcherFactory searcherFactory, boolean readSegmentsInfo) throws EngineException {
+    private ExternalSearcherManager createSearcherManager(SearchFactory externalSearcherFactory) throws EngineException {
         boolean success = false;
-        SearcherManager searcherManager = null;
+        SearcherManager internalSearcherManager = null;
         try {
             try {
                 final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
-                searcherManager = new SearcherManager(directoryReader, searcherFactory);
-                if (readSegmentsInfo) {
-                    lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store);
-                }
+                internalSearcherManager = new SearcherManager(directoryReader, new SearcherFactory());
+                lastCommittedSegmentInfos = readLastCommittedSegmentInfos(internalSearcherManager, store);
+                ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager,
+                    externalSearcherFactory);
                 success = true;
-                return searcherManager;
+                return externalSearcherManager;
             } catch (IOException e) {
                 maybeFailEngine("start", e);
                 try {
@@ -479,7 +548,7 @@ public class InternalEngine extends Engine {
             }
         } finally {
             if (success == false) { // release everything we created on a failure
-                IOUtils.closeWhileHandlingException(searcherManager, indexWriter);
+                IOUtils.closeWhileHandlingException(internalSearcherManager, indexWriter);
             }
         }
     }
@@ -1229,24 +1298,24 @@ public class InternalEngine extends Engine {
     }
 
     final void refresh(String source, SearcherScope scope) throws EngineException {
-        long bytes = 0;
         // we obtain a read lock here, since we don't want a flush to happen while we are refreshing
         // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
+        // both refresh types will result in an internal refresh but only the external will also
+        // pass the new reader reference to the external reader manager.
+
+        // this will also cause version map ram to be freed hence we always account for it.
+        final long bytes = indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
+        writingBytes.addAndGet(bytes);
         try (ReleasableLock lock = readLock.acquire()) {
             ensureOpen();
-            bytes = indexWriter.ramBytesUsed();
             switch (scope) {
                 case EXTERNAL:
                     // even though we maintain 2 managers we really do the heavy-lifting only once.
                     // the second refresh will only do the extra work we have to do for warming caches etc.
-                    writingBytes.addAndGet(bytes);
                     externalSearcherManager.maybeRefreshBlocking();
                     // the break here is intentional we never refresh both internal / external together
                     break;
                 case INTERNAL:
-                    final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
-                    bytes += versionMapBytes;
-                    writingBytes.addAndGet(bytes);
                     internalSearcherManager.maybeRefreshBlocking();
                     break;
                 default:
@@ -1709,7 +1778,7 @@ public class InternalEngine extends Engine {
     }
 
     @Override
-    protected SearcherManager getSearcherManager(String source, SearcherScope scope) {
+    protected ReferenceManager<IndexSearcher> getSearcherManager(String source, SearcherScope scope) {
         switch (scope) {
             case INTERNAL:
                 return internalSearcherManager;

+ 32 - 3
core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -3882,7 +3882,7 @@ public class InternalEngineTests extends EngineTestCase {
         List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
         assertEquals(rightLeaves.size(), leftLeaves.size());
         for (int i = 0; i < leftLeaves.size(); i++) {
-            assertSame(leftLeaves.get(i).reader(), rightLeaves.get(0).reader());
+            assertSame(leftLeaves.get(i).reader(), rightLeaves.get(i).reader());
         }
     }
 
@@ -3891,7 +3891,7 @@ public class InternalEngineTests extends EngineTestCase {
         List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
         if (rightLeaves.size() == leftLeaves.size()) {
             for (int i = 0; i < leftLeaves.size(); i++) {
-                if (leftLeaves.get(i).reader() != rightLeaves.get(0).reader()) {
+                if (leftLeaves.get(i).reader() != rightLeaves.get(i).reader()) {
                     return; // all is well
                 }
             }
@@ -3919,7 +3919,6 @@ public class InternalEngineTests extends EngineTestCase {
             assertEquals(0, searchSearcher.reader().numDocs());
             assertNotSameReader(getSearcher, searchSearcher);
         }
-
         engine.refresh("test", Engine.SearcherScope.EXTERNAL);
 
         try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
@@ -3928,6 +3927,36 @@ public class InternalEngineTests extends EngineTestCase {
             assertEquals(10, searchSearcher.reader().numDocs());
             assertSameReader(getSearcher, searchSearcher);
         }
+
+        // now ensure external refreshes are reflected on the internal reader
+        final String docId = Integer.toString(10);
+        final ParsedDocument doc =
+            testParsedDocument(docId, null, testDocumentWithTextField(), SOURCE, null);
+        Engine.Index primaryResponse = indexForDoc(doc);
+        engine.index(primaryResponse);
+
+        engine.refresh("test", Engine.SearcherScope.EXTERNAL);
+
+        try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
+             Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
+            assertEquals(11, getSearcher.reader().numDocs());
+            assertEquals(11, searchSearcher.reader().numDocs());
+            assertSameReader(getSearcher, searchSearcher);
+        }
+
+        try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)){
+            engine.refresh("test", Engine.SearcherScope.INTERNAL);
+            try (Searcher nextSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)){
+                assertSame(searcher.searcher(), nextSearcher.searcher());
+            }
+        }
+
+        try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
+            engine.refresh("test", Engine.SearcherScope.EXTERNAL);
+            try (Searcher nextSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
+                assertSame(searcher.searcher(), nextSearcher.searcher());
+            }
+        }
     }
 
     public void testSeqNoGenerator() throws IOException {

+ 5 - 2
test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineSupport.java

@@ -26,6 +26,7 @@ import org.apache.lucene.search.AssertingIndexSearcher;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.QueryCache;
 import org.apache.lucene.search.QueryCachingPolicy;
+import org.apache.lucene.search.ReferenceManager;
 import org.apache.lucene.search.SearcherManager;
 import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.ElasticsearchException;
@@ -133,7 +134,8 @@ public final class MockEngineSupport {
         }
     }
 
-    public AssertingIndexSearcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
+    public AssertingIndexSearcher newSearcher(String source, IndexSearcher searcher,
+                                              ReferenceManager<IndexSearcher> manager) throws EngineException {
         IndexReader reader = searcher.getIndexReader();
         IndexReader wrappedReader = reader;
         assert reader != null;
@@ -182,7 +184,8 @@ public final class MockEngineSupport {
 
     }
 
-    public Engine.Searcher wrapSearcher(String source, Engine.Searcher engineSearcher, IndexSearcher searcher, SearcherManager manager) {
+    public Engine.Searcher wrapSearcher(String source, Engine.Searcher engineSearcher, IndexSearcher searcher,
+                                        ReferenceManager<IndexSearcher> manager) {
         final AssertingIndexSearcher assertingIndexSearcher = newSearcher(source, searcher, manager);
         assertingIndexSearcher.setSimilarity(searcher.getSimilarity(true));
         // pass the original searcher to the super.newSearcher() method to make sure this is the searcher that will

+ 2 - 1
test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java

@@ -20,6 +20,7 @@ package org.elasticsearch.test.engine;
 
 import org.apache.lucene.index.FilterDirectoryReader;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.ReferenceManager;
 import org.apache.lucene.search.SearcherManager;
 import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.engine.EngineException;
@@ -78,7 +79,7 @@ final class MockInternalEngine extends InternalEngine {
     }
 
     @Override
-    protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
+    protected Searcher newSearcher(String source, IndexSearcher searcher, ReferenceManager<IndexSearcher> manager) throws EngineException {
         final Searcher engineSearcher = super.newSearcher(source, searcher, manager);
         return support().wrapSearcher(source, engineSearcher, searcher, manager);
     }