Browse Source

Add a frozen engine implementation (#34357)

This change adds a `frozen` engine that allows lazily open a directory reader
on a read-only shard. The engine wraps general purpose searchers in a LazyDirectoryReader
that also allows to release and reset the underlying index readers after any and before
secondary search phases.

Relates to #34352
Simon Willnauer 7 years ago
parent
commit
0cc0fd2d15

+ 0 - 1
libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java

@@ -277,5 +277,4 @@ public final class IOUtils {
             throw ioe;
         }
     }
-
 }

+ 101 - 1
server/src/main/java/org/elasticsearch/common/lucene/Lucene.java

@@ -29,8 +29,12 @@ import org.apache.lucene.codecs.PostingsFormat;
 import org.apache.lucene.document.LatLonDocValuesField;
 import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.index.BinaryDocValues;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.Fields;
 import org.apache.lucene.index.FilterCodecReader;
 import org.apache.lucene.index.FilterDirectoryReader;
 import org.apache.lucene.index.FilterLeafReader;
@@ -40,13 +44,20 @@ import org.apache.lucene.index.IndexFormatTooNewException;
 import org.apache.lucene.index.IndexFormatTooOldException;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.LeafMetaData;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.NumericDocValues;
+import org.apache.lucene.index.PointValues;
 import org.apache.lucene.index.SegmentCommitInfo;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedNumericDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
+import org.apache.lucene.index.StoredFieldVisitor;
+import org.apache.lucene.index.Terms;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.Explanation;
 import org.apache.lucene.search.FieldDoc;
@@ -209,7 +220,7 @@ public class Lucene {
                 throw new IllegalStateException("no commit found in the directory");
             }
         }
-        final CommitPoint cp = new CommitPoint(si, directory);
+        final IndexCommit cp = getIndexCommit(si, directory);
         try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
                 .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
                 .setIndexCommit(cp)
@@ -221,6 +232,13 @@ public class Lucene {
         return si;
     }
 
+    /**
+     * Returns an index commit for the given {@link SegmentInfos} in the given directory.
+     */
+    public static IndexCommit getIndexCommit(SegmentInfos si, Directory directory) throws IOException {
+        return new CommitPoint(si, directory);
+    }
+
     /**
      * This method removes all lucene files from the given directory. It will first try to delete all commit points / segments
      * files to ensure broken commits or corrupted indices will not be opened in the future. If any of the segment files can't be deleted
@@ -973,6 +991,88 @@ public class Lucene {
         return new NumericDocValuesField(SOFT_DELETES_FIELD, 1);
     }
 
+    /**
+     * Returns an empty leaf reader with the given max docs. The reader will be fully deleted.
+     */
+    public static LeafReader emptyReader(final int maxDoc) {
+        return new LeafReader() {
+            final Bits liveDocs = new Bits.MatchNoBits(maxDoc);
+
+            public Terms terms(String field) {
+                return null;
+            }
+
+            public NumericDocValues getNumericDocValues(String field) {
+                return null;
+            }
+
+            public BinaryDocValues getBinaryDocValues(String field) {
+                return null;
+            }
+
+            public SortedDocValues getSortedDocValues(String field) {
+                return null;
+            }
+
+            public SortedNumericDocValues getSortedNumericDocValues(String field) {
+                return null;
+            }
+
+            public SortedSetDocValues getSortedSetDocValues(String field) {
+                return null;
+            }
+
+            public NumericDocValues getNormValues(String field) {
+                return null;
+            }
+
+            public FieldInfos getFieldInfos() {
+                return new FieldInfos(new FieldInfo[0]);
+            }
+
+            public Bits getLiveDocs() {
+                return this.liveDocs;
+            }
+
+            public PointValues getPointValues(String fieldName) {
+                return null;
+            }
+
+            public void checkIntegrity() {
+            }
+
+            public Fields getTermVectors(int docID) {
+                return null;
+            }
+
+            public int numDocs() {
+                return 0;
+            }
+
+            public int maxDoc() {
+                return maxDoc;
+            }
+
+            public void document(int docID, StoredFieldVisitor visitor) {
+            }
+
+            protected void doClose() {
+            }
+
+            public LeafMetaData getMetaData() {
+                return new LeafMetaData(Version.LATEST.major, Version.LATEST, (Sort)null);
+            }
+
+            public CacheHelper getCoreCacheHelper() {
+                return null;
+            }
+
+            public CacheHelper getReaderCacheHelper() {
+                return null;
+            }
+        };
+    }
+
     /**
      * Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) between {@code fromSeqNo}(inclusive) and {@code toSeqNo}(inclusive)
      * in the provided directory reader. This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found.

+ 1 - 1
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -793,7 +793,7 @@ public abstract class Engine implements Closeable {
     /**
      * Global stats on segments.
      */
-    public final SegmentsStats segmentsStats(boolean includeSegmentFileSizes) {
+    public SegmentsStats segmentsStats(boolean includeSegmentFileSizes) {
         ensureOpen();
         Set<String> segmentName = new HashSet<>();
         SegmentsStats stats = new SegmentsStats();

+ 5 - 1
server/src/main/java/org/elasticsearch/index/engine/RamAccountingSearcherFactory.java

@@ -48,6 +48,11 @@ final class RamAccountingSearcherFactory extends SearcherFactory {
 
     @Override
     public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException {
+        processReaders(reader, previousReader);
+        return super.newSearcher(reader, previousReader);
+    }
+
+    public void processReaders(IndexReader reader, IndexReader previousReader) {
         final CircuitBreaker breaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING);
 
         // Construct a list of the previous segment readers, we only want to track memory used
@@ -79,6 +84,5 @@ final class RamAccountingSearcherFactory extends SearcherFactory {
                 segmentReader.getCoreCacheHelper().addClosedListener(k -> breaker.addWithoutBreaking(-ramBytesUsed));
             }
         }
-        return super.newSearcher(reader, previousReader);
     }
 }

+ 18 - 11
server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

@@ -57,7 +57,7 @@ import java.util.stream.Stream;
  *
  * @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function)
  */
-public final class ReadOnlyEngine extends Engine {
+public class ReadOnlyEngine extends Engine {
 
     private final SegmentInfos lastCommittedSegmentInfos;
     private final SeqNoStats seqNoStats;
@@ -66,6 +66,7 @@ public final class ReadOnlyEngine extends Engine {
     private final IndexCommit indexCommit;
     private final Lock indexWriterLock;
     private final DocsStats docsStats;
+    protected final RamAccountingSearcherFactory searcherFactory;
 
     /**
      * Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened
@@ -82,6 +83,7 @@ public final class ReadOnlyEngine extends Engine {
     public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats translogStats, boolean obtainLock,
                    Function<DirectoryReader, DirectoryReader> readerWrapperFunction) {
         super(config);
+        this.searcherFactory = new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService());
         try {
             Store store = config.getStore();
             store.incRef();
@@ -96,14 +98,10 @@ public final class ReadOnlyEngine extends Engine {
                 this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
                 this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
                 this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats;
-                reader = ElasticsearchDirectoryReader.wrap(open(directory), config.getShardId());
-                if (config.getIndexSettings().isSoftDeleteEnabled()) {
-                    reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
-                }
-                reader = readerWrapperFunction.apply(reader);
-                this.indexCommit = reader.getIndexCommit();
-                this.searcherManager = new SearcherManager(reader,
-                    new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
+                this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory);
+                reader = open(indexCommit);
+                reader = wrapReader(reader, readerWrapperFunction);
+                searcherManager = new SearcherManager(reader, searcherFactory);
                 this.docsStats = docsStats(lastCommittedSegmentInfos);
                 this.indexWriterLock = indexWriterLock;
                 success = true;
@@ -117,8 +115,17 @@ public final class ReadOnlyEngine extends Engine {
         }
     }
 
-    protected DirectoryReader open(final Directory directory) throws IOException {
-        return DirectoryReader.open(directory);
+    protected final DirectoryReader wrapReader(DirectoryReader reader,
+                                                    Function<DirectoryReader, DirectoryReader> readerWrapperFunction) throws IOException {
+        reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId());
+        if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
+            reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
+        }
+        return readerWrapperFunction.apply(reader);
+    }
+
+    protected DirectoryReader open(IndexCommit commit) throws IOException {
+        return DirectoryReader.open(commit);
     }
 
     private DocsStats docsStats(final SegmentInfos lastCommittedSegmentInfos) {

+ 2 - 2
server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java

@@ -154,7 +154,7 @@ final class DefaultSearchContext extends SearchContext {
     private final Map<String, SearchExtBuilder> searchExtBuilders = new HashMap<>();
     private final Map<Class<?>, Collector> queryCollectors = new HashMap<>();
     private final QueryShardContext queryShardContext;
-    private FetchPhase fetchPhase;
+    private final FetchPhase fetchPhase;
 
     DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
                          Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService,
@@ -186,7 +186,7 @@ final class DefaultSearchContext extends SearchContext {
 
     @Override
     public void doClose() {
-        // clear and scope phase we  have
+        // clear and scope phase we have
         Releasables.close(searcher, engineSearcher);
     }
 

+ 4 - 0
server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java

@@ -217,4 +217,8 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
     public DirectoryReader getDirectoryReader() {
         return engineSearcher.getDirectoryReader();
     }
+
+    public Engine.Searcher getEngineSearcher() {
+        return engineSearcher;
+    }
 }

+ 3 - 3
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -5057,15 +5057,15 @@ public class InternalEngineTests extends EngineTestCase {
                  null,
                  new ReferenceManager.RefreshListener() {
                      @Override
-                     public void beforeRefresh() throws IOException {
+                     public void beforeRefresh() {
                          refreshCounter.incrementAndGet();
                      }
 
                      @Override
-                     public void afterRefresh(boolean didRefresh) throws IOException {
+                     public void afterRefresh(boolean didRefresh) {
 
                      }
-                 }, null, () -> SequenceNumbers.NO_OPS_PERFORMED))) {
+                 }, null, () -> SequenceNumbers.NO_OPS_PERFORMED, new NoneCircuitBreakerService()))) {
             for (long seqNo = 0; seqNo <= maxSeqNo; seqNo++) {
                 final ParsedDocument doc = testParsedDocument("id_" + seqNo, null, testDocumentWithTextField("test"),
                     new BytesArray("{}".getBytes(Charset.defaultCharset())), null);

+ 5 - 3
test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -88,6 +88,7 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogConfig;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.test.DummyShardLock;
 import org.elasticsearch.test.ESTestCase;
@@ -573,13 +574,14 @@ public abstract class EngineTestCase extends ESTestCase {
 
     public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
                                ReferenceManager.RefreshListener refreshListener, Sort indexSort, LongSupplier globalCheckpointSupplier) {
-        return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, indexSort, globalCheckpointSupplier);
+        return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, indexSort, globalCheckpointSupplier,
+            new NoneCircuitBreakerService());
     }
 
     public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
                                ReferenceManager.RefreshListener externalRefreshListener,
                                ReferenceManager.RefreshListener internalRefreshListener,
-                               Sort indexSort, LongSupplier globalCheckpointSupplier) {
+                               Sort indexSort, LongSupplier globalCheckpointSupplier, CircuitBreakerService breakerService) {
             IndexWriterConfig iwc = newIndexWriterConfig();
         TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
         Engine.EventListener listener = new Engine.EventListener() {
@@ -596,7 +598,7 @@ public abstract class EngineTestCase extends ESTestCase {
                 mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
                 IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
                 TimeValue.timeValueMinutes(5), extRefreshListenerList, intRefreshListenerList, indexSort,
-                new NoneCircuitBreakerService(),
+                breakerService,
                 globalCheckpointSupplier == null ?
                     new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) :
                     globalCheckpointSupplier, primaryTerm::get, tombstoneDocSupplier());

+ 518 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java

@@ -0,0 +1,518 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.index.BinaryDocValues;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.Fields;
+import org.apache.lucene.index.FilterDirectoryReader;
+import org.apache.lucene.index.FilterLeafReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.LeafMetaData;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.NumericDocValues;
+import org.apache.lucene.index.PointValues;
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedNumericDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
+import org.apache.lucene.index.StoredFieldVisitor;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.ReferenceManager;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.util.Bits;
+import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.core.internal.io.IOUtils;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * This is a stand-alone read-only engine that maintains a lazy loaded index reader that is opened on calls to
+ * {@link Engine#acquireSearcher(String)}. The index reader opened is maintained until there are no reference to it anymore and then
+ * releases itself from the engine. The readers returned from this engine are lazy which allows release after and reset before a search
+ * phase starts. This allows releasing references as soon as possible on the search layer.
+ *
+ * Internally this class uses a set of wrapper abstractions to allow a reader that is used inside the {@link Engine.Searcher} returned from
+ * {@link #acquireSearcher(String, SearcherScope)} to release and reset it's internal resources. This is necessary to for instance release
+ * all SegmentReaders after a search phase finishes and reopen them before the next search phase starts. This together with a throttled
+ * threadpool (search_throttled) guarantees that at most N frozen shards have a low level index reader open at the same time.
+ *
+ * In particular we have LazyDirectoryReader that wraps its LeafReaders (the actual segment readers) inside LazyLeafReaders. Each of the
+ * LazyLeafReader delegates to segment LeafReader that can be reset (it's reference decremented and nulled out) on a search phase is
+ * finished. Before the next search phase starts we can reopen the corresponding reader and reset the reference to execute the search phase.
+ * This allows the SearchContext to hold on to the same LazyDirectoryReader across its lifecycle but under the hood resources (memory) is
+ * released while the SearchContext phases are not executing.
+ *
+ * The internal reopen of readers is treated like a refresh and refresh listeners are called up-on reopen. This allows to consume refresh
+ * stats in order to obtain the number of reopens.
+ */
+public final class FrozenEngine extends ReadOnlyEngine {
+    private volatile DirectoryReader lastOpenedReader;
+
+    public FrozenEngine(EngineConfig config) {
+        super(config, null, null, true, Function.identity());
+    }
+
+    @Override
+    protected DirectoryReader open(IndexCommit indexCommit) throws IOException {
+        // we fake an empty DirectoryReader for the ReadOnlyEngine. this reader is only used
+        // to initialize the reference manager and to make the refresh call happy which is essentially
+        // a no-op now
+        return new DirectoryReader(indexCommit.getDirectory(), new LeafReader[0]) {
+            @Override
+            protected DirectoryReader doOpenIfChanged() {
+                return null;
+            }
+
+            @Override
+            protected DirectoryReader doOpenIfChanged(IndexCommit commit) {
+                return null;
+            }
+
+            @Override
+            protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) {
+                return null;
+            }
+
+            @Override
+            public long getVersion() {
+                return 0;
+            }
+
+            @Override
+            public boolean isCurrent() {
+                return true; // always current
+            }
+
+            @Override
+            public IndexCommit getIndexCommit() {
+                return indexCommit; // TODO maybe we can return an empty commit?
+            }
+
+            @Override
+            protected void doClose() {
+            }
+
+            @Override
+            public CacheHelper getReaderCacheHelper() {
+                return null;
+            }
+        };
+    }
+
+    @SuppressForbidden(reason = "we manage references explicitly here")
+    private synchronized void onReaderClosed(IndexReader.CacheKey key) {
+        // it might look awkward that we have to check here if the keys match but if we concurrently
+        // access the lastOpenedReader there might be 2 threads competing for the cached reference in
+        // a way that thread 1 counts down the lastOpenedReader reference and before thread 1 can execute
+        // the close listener we already open and assign a new reader to lastOpenedReader. In this case
+        // the cache key doesn't match and we just ignore it since we use this method only to null out the
+        // lastOpenedReader member to ensure resources can be GCed
+        if (lastOpenedReader != null && key == lastOpenedReader.getReaderCacheHelper().getKey()) {
+            assert lastOpenedReader.getRefCount() == 0;
+            lastOpenedReader = null;
+        }
+    }
+
+    private synchronized DirectoryReader getOrOpenReader() throws IOException {
+        DirectoryReader reader = null;
+        boolean success = false;
+        try {
+            reader = getReader();
+            if (reader == null) {
+                for (ReferenceManager.RefreshListener listeners : config ().getInternalRefreshListener()) {
+                    listeners.beforeRefresh();
+                }
+                reader = DirectoryReader.open(engineConfig.getStore().directory());
+                searcherFactory.processReaders(reader, null);
+                reader = lastOpenedReader = wrapReader(reader, Function.identity());
+                reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed);
+                for (ReferenceManager.RefreshListener listeners : config ().getInternalRefreshListener()) {
+                    listeners.afterRefresh(true);
+                }
+            }
+            success = true;
+            return reader;
+        } finally {
+            if (success == false) {
+                IOUtils.close(reader);
+            }
+        }
+    }
+
+    @SuppressForbidden(reason = "we manage references explicitly here")
+    private synchronized DirectoryReader getReader() throws IOException {
+        DirectoryReader reader = null;
+        boolean success = false;
+        try {
+            if (lastOpenedReader != null && lastOpenedReader.tryIncRef()) {
+                reader = lastOpenedReader;
+            }
+            success = true;
+            return reader;
+        } finally {
+            if (success == false) {
+                IOUtils.close(reader);
+            }
+        }
+    }
+
+    @Override
+    @SuppressWarnings("fallthrough")
+    @SuppressForbidden( reason = "we manage references explicitly here")
+    public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException {
+        store.incRef();
+        boolean releaseRefeference = true;
+        try  {
+            final boolean maybeOpenReader;
+            switch (source) {
+                case "load_seq_no":
+                case "load_version":
+                    assert false : "this is a read-only engine";
+                case "doc_stats":
+                    assert false : "doc_stats are overwritten";
+                case "segments":
+                case "segments_stats":
+                case "completion_stats":
+                case "refresh_needed":
+                    maybeOpenReader = false;
+                    break;
+                default:
+                    maybeOpenReader = true;
+            }
+            // special case we only want to report segment stats if we have a reader open. in that case we only get a reader if we still
+            // have one open at the time and can inc it's reference.
+            DirectoryReader reader = maybeOpenReader ? getOrOpenReader() : getReader();
+            if (reader == null) {
+                // we just hand out a searcher on top of an empty reader that we opened for the ReadOnlyEngine in the #open(IndexCommit)
+                // method. this is the case when we don't have a reader open right now and we get a stats call any other that falls in
+                // the category that doesn't trigger a reopen
+                return super.acquireSearcher(source, scope);
+            } else {
+                try {
+                    LazyDirectoryReader lazyDirectoryReader = new LazyDirectoryReader(reader, this);
+                    Searcher newSearcher = new Searcher(source, new IndexSearcher(lazyDirectoryReader),
+                        () -> IOUtils.close(lazyDirectoryReader, store::decRef));
+                    releaseRefeference = false;
+                    return newSearcher;
+                } finally {
+                    if (releaseRefeference) {
+                        reader.decRef(); // don't call close here we manage reference ourselves
+                    }
+                }
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        } finally {
+            if (releaseRefeference) {
+                store.decRef();
+            }
+        }
+    }
+
+    static LazyDirectoryReader unwrapLazyReader(DirectoryReader reader) {
+        while (reader instanceof FilterDirectoryReader) {
+            if (reader instanceof LazyDirectoryReader) {
+                return (LazyDirectoryReader) reader;
+            }
+            reader = ((FilterDirectoryReader) reader).getDelegate();
+        }
+        return null;
+    }
+
+    /**
+     * This class allows us to use the same high level reader across multiple search phases but replace the underpinnings
+     * on/after each search phase. This is really important otherwise we would hold on to multiple readers across phases.
+     *
+     * This reader and its leaf reader counterpart overrides FilterDirectory/LeafReader for convenience to be unwrapped but still
+     * overrides all it's delegate methods. We have tests to ensure we never miss an override but we need to in order to make sure
+     * the wrapper leaf readers don't register themself as close listeners on the wrapped ones otherwise we fail plugging in new readers
+     * on the next search phase.
+     */
+    static final class LazyDirectoryReader extends FilterDirectoryReader {
+
+        private final FrozenEngine engine;
+        private volatile DirectoryReader delegate; // volatile since it might be closed concurrently
+
+        private LazyDirectoryReader(DirectoryReader reader, FrozenEngine engine) throws IOException {
+            super(reader, new SubReaderWrapper() {
+                @Override
+                public LeafReader wrap(LeafReader reader) {
+                    return new LazyLeafReader(reader);
+                };
+            });
+            this.delegate = reader;
+            this.engine = engine;
+        }
+
+        @SuppressForbidden(reason = "we manage references explicitly here")
+        synchronized void release() throws IOException {
+            if (delegate != null) { // we are lenient here it's ok to double close
+                delegate.decRef();
+                delegate = null;
+                if (tryIncRef()) { // only do this if we are not closed already
+                    // we end up in this case when we are not closed but in an intermediate
+                    // state were we want to release all or the real leaf readers ie. in between search phases
+                    // but still want to keep this Lazy reference open. In oder to let the heavy real leaf
+                    // readers to be GCed we need to null our the references.
+                    try {
+                        for (LeafReaderContext leaf : leaves()) {
+                            LazyLeafReader reader = (LazyLeafReader) leaf.reader();
+                            reader.in = null;
+                        }
+                    } finally {
+                        decRef();
+                    }
+                }
+            }
+        }
+
+        void reset() throws IOException {
+            boolean success = false;
+            DirectoryReader reader = engine.getOrOpenReader();
+            try {
+                reset(reader);
+                success = true;
+            } finally {
+                if (success == false) {
+                    IOUtils.close(reader);
+                }
+            }
+        }
+
+        private synchronized void reset(DirectoryReader delegate) {
+            if (this.delegate != null) {
+                throw new AssertionError("lazy reader is not released");
+            }
+            assert (delegate instanceof LazyDirectoryReader) == false : "must not be a LazyDirectoryReader";
+            List<LeafReaderContext> leaves = delegate.leaves();
+            int ord = 0;
+            for (LeafReaderContext leaf : leaves()) {
+                LazyLeafReader reader = (LazyLeafReader) leaf.reader();
+                LeafReader newReader = leaves.get(ord++).reader();
+                assert reader.in == null;
+                reader.in = newReader;
+                assert reader.info.info.equals(Lucene.segmentReader(newReader).getSegmentInfo().info);
+            }
+            this.delegate = delegate;
+        }
+
+        @Override
+        protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) {
+            throw new UnsupportedOperationException();
+        }
+
+        void ensureOpenOrReset() {
+            // ensure we fail early and with good exceptions
+            ensureOpen();
+            if (delegate == null) {
+                throw new AlreadyClosedException("delegate is released");
+            }
+        }
+
+        @Override
+        public long getVersion() {
+            ensureOpenOrReset();
+            return delegate.getVersion();
+        }
+
+        @Override
+        public boolean isCurrent() throws IOException {
+            ensureOpenOrReset();
+            return delegate.isCurrent();
+        }
+
+        @Override
+        public IndexCommit getIndexCommit() throws IOException {
+            ensureOpenOrReset();
+            return delegate.getIndexCommit();
+        }
+
+        @Override
+        protected void doClose() throws IOException {
+            release();
+        }
+
+        @Override
+        public CacheHelper getReaderCacheHelper() {
+            ensureOpenOrReset();
+            return delegate.getReaderCacheHelper();
+        }
+
+        @Override
+        public DirectoryReader getDelegate() {
+            ensureOpenOrReset();
+            return delegate;
+        }
+    }
+
+    /**
+     * We basically duplicate a FilterLeafReader here since we don't want the
+     * incoming reader to register with this reader as a parent reader. This would mean we barf if the incoming
+     * reader is closed and that is what we actually doing on purpose.
+     */
+    static final class LazyLeafReader extends FilterLeafReader {
+
+        private volatile LeafReader in;
+        private final SegmentCommitInfo info;
+        private final int numDocs;
+        private final int maxDocs;
+
+        private LazyLeafReader(LeafReader in) {
+            super(Lucene.emptyReader(in.maxDoc())); // empty reader here to make FilterLeafReader happy
+            this.info = Lucene.segmentReader(in).getSegmentInfo();
+            this.in = in;
+            numDocs = in.numDocs();
+            maxDocs = in.maxDoc();
+            // don't register in reader as a subreader here.
+        }
+
+        private void ensureOpenOrReleased() {
+            ensureOpen();
+            if (in == null) {
+                throw new AlreadyClosedException("leaf is already released");
+            }
+        }
+
+        @Override
+        public Bits getLiveDocs() {
+            ensureOpenOrReleased();
+            return in.getLiveDocs();
+        }
+
+        @Override
+        public FieldInfos getFieldInfos() {
+            ensureOpenOrReleased();
+            return in.getFieldInfos();
+        }
+
+        @Override
+        public PointValues getPointValues(String field) throws IOException {
+            ensureOpenOrReleased();
+            return in.getPointValues(field);
+        }
+
+        @Override
+        public Fields getTermVectors(int docID)
+            throws IOException {
+            ensureOpenOrReleased();
+            return in.getTermVectors(docID);
+        }
+
+        @Override
+        public int numDocs() {
+            return numDocs;
+        }
+
+        @Override
+        public int maxDoc() {
+            return maxDocs;
+        }
+
+        @Override
+        public void document(int docID, StoredFieldVisitor visitor) throws IOException {
+            ensureOpenOrReleased();
+            in.document(docID, visitor);
+        }
+
+        @Override
+        protected void doClose() throws IOException {
+            in.close();
+        }
+
+        @Override
+        public CacheHelper getReaderCacheHelper() {
+            ensureOpenOrReleased();
+            return in.getReaderCacheHelper();
+        }
+
+        @Override
+        public CacheHelper getCoreCacheHelper() {
+            ensureOpenOrReleased();
+            return in.getCoreCacheHelper();
+        }
+
+        @Override
+        public Terms terms(String field) throws IOException {
+            ensureOpenOrReleased();
+            return in.terms(field);
+        }
+
+        @Override
+        public String toString() {
+            final StringBuilder buffer = new StringBuilder("LazyLeafReader(");
+            buffer.append(in);
+            buffer.append(')');
+            return buffer.toString();
+        }
+
+        @Override
+        public NumericDocValues getNumericDocValues(String field) throws IOException {
+            ensureOpenOrReleased();
+            return in.getNumericDocValues(field);
+        }
+
+        @Override
+        public BinaryDocValues getBinaryDocValues(String field) throws IOException {
+            ensureOpenOrReleased();
+            return in.getBinaryDocValues(field);
+        }
+
+        @Override
+        public SortedDocValues getSortedDocValues(String field) throws IOException {
+            ensureOpenOrReleased();
+            return in.getSortedDocValues(field);
+        }
+
+        @Override
+        public SortedNumericDocValues getSortedNumericDocValues(String field) throws IOException {
+            ensureOpenOrReleased();
+            return in.getSortedNumericDocValues(field);
+        }
+
+        @Override
+        public SortedSetDocValues getSortedSetDocValues(String field) throws IOException {
+            ensureOpenOrReleased();
+            return in.getSortedSetDocValues(field);
+        }
+
+        @Override
+        public NumericDocValues getNormValues(String field) throws IOException {
+            ensureOpenOrReleased();
+            return in.getNormValues(field);
+        }
+
+        @Override
+        public LeafMetaData getMetaData() {
+            ensureOpenOrReleased();
+            return in.getMetaData();
+        }
+
+        @Override
+        public void checkIntegrity() throws IOException {
+            ensureOpenOrReleased();
+            in.checkIntegrity();
+        }
+
+        @Override
+        public LeafReader getDelegate() {
+            return in;
+        }
+    }
+
+    synchronized boolean isReaderOpen() {
+        return lastOpenedReader != null;
+    } // this is mainly for tests
+}

+ 282 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java

@@ -0,0 +1,282 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.ReferenceManager;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class FrozenEngineTests extends EngineTestCase {
+
+    public void testAcquireReleaseReset() throws IOException {
+        IOUtils.close(engine, store);
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        try (Store store = createStore()) {
+            CountingRefreshListener listener = new CountingRefreshListener();
+            EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, listener, null,
+                globalCheckpoint::get, new NoneCircuitBreakerService());
+            try (InternalEngine engine = createEngine(config)) {
+                int numDocs = Math.min(10, addDocuments(globalCheckpoint, engine));
+                engine.flushAndClose();
+                listener.reset();
+                try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) {
+                    assertFalse(frozenEngine.isReaderOpen());
+                    Engine.Searcher searcher = frozenEngine.acquireSearcher("test");
+                    assertEquals(config.getShardId(), ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher
+                        .getDirectoryReader()).shardId());
+                    assertTrue(frozenEngine.isReaderOpen());
+                    TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), numDocs);
+                    assertEquals(search.scoreDocs.length, numDocs);
+                    assertEquals(1, listener.afterRefresh.get());
+                    FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release();
+                    assertFalse(frozenEngine.isReaderOpen());
+                    assertEquals(1, listener.afterRefresh.get());
+                    expectThrows(AlreadyClosedException.class, () -> searcher.searcher().search(new MatchAllDocsQuery(), numDocs));
+                    FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset();
+                    assertEquals(2, listener.afterRefresh.get());
+                    search = searcher.searcher().search(new MatchAllDocsQuery(), numDocs);
+                    assertEquals(search.scoreDocs.length, numDocs);
+                    searcher.close();
+                }
+            }
+        }
+    }
+
+    public void testAcquireReleaseResetTwoSearchers() throws IOException {
+        IOUtils.close(engine, store);
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        try (Store store = createStore()) {
+            CountingRefreshListener listener = new CountingRefreshListener();
+            EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, listener, null,
+                globalCheckpoint::get, new NoneCircuitBreakerService());
+            try (InternalEngine engine = createEngine(config)) {
+                int numDocs = Math.min(10, addDocuments(globalCheckpoint, engine));
+                engine.flushAndClose();
+                listener.reset();
+                try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) {
+                    assertFalse(frozenEngine.isReaderOpen());
+                    Engine.Searcher searcher1 = frozenEngine.acquireSearcher("test");
+                    assertTrue(frozenEngine.isReaderOpen());
+                    TopDocs search = searcher1.searcher().search(new MatchAllDocsQuery(), numDocs);
+                    assertEquals(search.scoreDocs.length, numDocs);
+                    assertEquals(1, listener.afterRefresh.get());
+                    FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader()).release();
+                    Engine.Searcher searcher2 = frozenEngine.acquireSearcher("test");
+                    search = searcher2.searcher().search(new MatchAllDocsQuery(), numDocs);
+                    assertEquals(search.scoreDocs.length, numDocs);
+                    assertTrue(frozenEngine.isReaderOpen());
+                    assertEquals(2, listener.afterRefresh.get());
+                    expectThrows(AlreadyClosedException.class, () -> searcher1.searcher().search(new MatchAllDocsQuery(), numDocs));
+                    FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader()).reset();
+                    assertEquals(2, listener.afterRefresh.get());
+                    search = searcher1.searcher().search(new MatchAllDocsQuery(), numDocs);
+                    assertEquals(search.scoreDocs.length, numDocs);
+                    searcher1.close();
+                    searcher2.close();
+                }
+            }
+        }
+    }
+
+    public void testSegmentStats() throws IOException {
+        IOUtils.close(engine, store);
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        try (Store store = createStore()) {
+            CountingRefreshListener listener = new CountingRefreshListener();
+            EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, listener, null,
+                globalCheckpoint::get, new NoneCircuitBreakerService());
+            try (InternalEngine engine = createEngine(config)) {
+                addDocuments(globalCheckpoint, engine);
+                engine.flushAndClose();
+                listener.reset();
+                try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) {
+                    Engine.Searcher searcher = frozenEngine.acquireSearcher("test");
+                    SegmentsStats segmentsStats = frozenEngine.segmentsStats(randomBoolean());
+                    assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount());
+                    FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release();
+                    assertEquals(1, listener.afterRefresh.get());
+                    segmentsStats = frozenEngine.segmentsStats(randomBoolean());
+                    assertEquals(0, segmentsStats.getCount());
+                    assertEquals(1, listener.afterRefresh.get());
+                    assertFalse(frozenEngine.isReaderOpen());
+                    FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset();
+                    segmentsStats = frozenEngine.segmentsStats(randomBoolean());
+                    assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount());
+                    searcher.close();
+                }
+            }
+        }
+    }
+
+    public void testCircuitBreakerAccounting() throws IOException {
+        IOUtils.close(engine, store);
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        try (Store store = createStore()) {
+            CountingRefreshListener listener = new CountingRefreshListener();
+            EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, listener, null,
+                globalCheckpoint::get, new HierarchyCircuitBreakerService(defaultSettings.getSettings(),
+                    new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
+            CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING);
+            long expectedUse;
+            try (InternalEngine engine = createEngine(config)) {
+                addDocuments(globalCheckpoint, engine);
+                engine.refresh("test"); // pull the reader
+                expectedUse = breaker.getUsed();
+                engine.flushAndClose();
+            }
+            assertTrue(expectedUse > 0);
+            assertEquals(0, breaker.getUsed());
+            listener.reset();
+            try (FrozenEngine frozenEngine = new FrozenEngine(config)) {
+                Engine.Searcher searcher = frozenEngine.acquireSearcher("test");
+                assertEquals(expectedUse, breaker.getUsed());
+                FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release();
+                assertEquals(1, listener.afterRefresh.get());
+                assertEquals(0, breaker.getUsed());
+                assertFalse(frozenEngine.isReaderOpen());
+                FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset();
+                assertEquals(expectedUse, breaker.getUsed());
+                searcher.close();
+                assertEquals(0, breaker.getUsed());
+            }
+        }
+    }
+
+    private int addDocuments(AtomicLong globalCheckpoint, InternalEngine engine) throws IOException {
+        int numDocs = scaledRandomIntBetween(10, 1000);
+        int numDocsAdded = 0;
+        for (int i = 0; i < numDocs; i++) {
+            numDocsAdded++;
+            ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
+            engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
+                System.nanoTime(), -1, false));
+            if (rarely()) {
+                engine.flush();
+            }
+            globalCheckpoint.set(engine.getLocalCheckpoint());
+        }
+        engine.syncTranslog();
+        return numDocsAdded;
+    }
+
+    public void testSearchConcurrently() throws IOException, InterruptedException {
+        // even though we don't want this to be searched concurrently we better make sure we release all resources etc.
+        IOUtils.close(engine, store);
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        try (Store store = createStore()) {
+            EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, null, globalCheckpoint::get,
+                new HierarchyCircuitBreakerService(defaultSettings.getSettings(),
+                    new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
+            CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING);
+            try (InternalEngine engine = createEngine(config)) {
+                int numDocsAdded = addDocuments(globalCheckpoint, engine);
+                engine.flushAndClose();
+                int numIters = randomIntBetween(100, 1000);
+                try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) {
+                    int numThreads = randomIntBetween(2, 4);
+                    Thread[] threads = new Thread[numThreads];
+                    CyclicBarrier barrier = new CyclicBarrier(numThreads);
+                    CountDownLatch latch = new CountDownLatch(numThreads);
+                    for (int i = 0; i < numThreads; i++) {
+                        threads[i] = new Thread(() -> {
+                            try (Engine.Searcher searcher = frozenEngine.acquireSearcher("test")) {
+                                barrier.await();
+                                FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release();
+                                for (int j = 0; j < numIters; j++) {
+                                    FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset();
+                                    assertTrue(frozenEngine.isReaderOpen());
+                                    TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), Math.min(10, numDocsAdded));
+                                    assertEquals(search.scoreDocs.length, Math.min(10, numDocsAdded));
+                                    FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release();
+                                }
+                                if (randomBoolean()) {
+                                    FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset();
+                                }
+                            } catch (Exception e) {
+                                throw new AssertionError(e);
+                            } finally {
+                                latch.countDown();
+                            }
+                        });
+                        threads[i].start();
+                    }
+                    latch.await();
+                    for (Thread t : threads) {
+                        t.join();
+                    }
+                    assertFalse(frozenEngine.isReaderOpen());
+                    assertEquals(0, breaker.getUsed());
+                }
+            }
+        }
+    }
+
+    private static void checkOverrideMethods(Class<?> clazz) throws NoSuchMethodException, SecurityException {
+        final Class<?> superClazz = clazz.getSuperclass();
+        for (Method m : superClazz.getMethods()) {
+            final int mods = m.getModifiers();
+            if (Modifier.isStatic(mods) || Modifier.isAbstract(mods) || Modifier.isFinal(mods) || m.isSynthetic()
+                || m.getName().equals("attributes") || m.getName().equals("getStats")) {
+                continue;
+            }
+            // The point of these checks is to ensure that methods from the super class
+            // are overwritten to make sure we never miss a method from FilterLeafReader / FilterDirectoryReader
+            final Method subM = clazz.getMethod(m.getName(), m.getParameterTypes());
+            if (subM.getDeclaringClass() == superClazz
+                && m.getDeclaringClass() != Object.class
+                && m.getDeclaringClass() == subM.getDeclaringClass()) {
+                fail(clazz + " doesn't override" + m + " although it has been declared by it's superclass");
+            }
+        }
+    }
+
+    // here we make sure we catch any change to their super classes FilterLeafReader / FilterDirectoryReader
+    public void testOverrideMethods() throws Exception {
+        checkOverrideMethods(FrozenEngine.LazyDirectoryReader.class);
+        checkOverrideMethods(FrozenEngine.LazyLeafReader.class);
+    }
+
+    private class CountingRefreshListener implements ReferenceManager.RefreshListener {
+
+        final AtomicInteger afterRefresh = new AtomicInteger(0);
+        private final AtomicInteger beforeRefresh = new AtomicInteger(0);
+
+        @Override
+        public void beforeRefresh() {
+            beforeRefresh.incrementAndGet();
+        }
+
+        @Override
+        public void afterRefresh(boolean didRefresh) {
+            afterRefresh.incrementAndGet();
+            assertEquals(beforeRefresh.get(), afterRefresh.get());
+        }
+
+        void reset() {
+            afterRefresh.set(0);
+            beforeRefresh.set(0);
+        }
+    }
+}