瀏覽代碼

Lock down Engine.Searcher (#34363)

`Engine.Searcher` is non-final today which makes it error prone
in the case of wrapping the underlying reader or lucene `IndexSearcher`
like we do in `IndexSearcherWrapper`. Yet, there is no subclass of it yet
that would be dramatic to just drop on the floor. With the start of development
of frozen indices this changed since in #34357 functionality was added to
a subclass which would be dropped if a `IndexSearcherWrapper` is installed on an index.
This change locks down the `Engine.Searcher` to prevent such a functionality trap.
Simon Willnauer 7 年之前
父節點
當前提交
d43a1fac33

+ 27 - 36
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -43,7 +43,6 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.Accountables;
-import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.index.IndexRequest;
@@ -84,6 +83,7 @@ import org.elasticsearch.search.suggest.completion.CompletionStats;
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.file.NoSuchFileException;
 import java.util.Arrays;
 import java.util.Base64;
@@ -665,14 +665,23 @@ public abstract class Engine implements Closeable {
         Releasable releasable = store::decRef;
         try {
             ReferenceManager<IndexSearcher> referenceManager = getReferenceManager(scope);
-            Searcher engineSearcher = new Searcher(source, referenceManager.acquire(),
-                s -> {
-                  try {
-                      referenceManager.release(s);
-                  } finally {
-                      store.decRef();
-                  }
-              }, logger);
+            IndexSearcher acquire = referenceManager.acquire();
+            AtomicBoolean released = new AtomicBoolean(false);
+            Searcher engineSearcher = new Searcher(source, acquire,
+                () -> {
+                if (released.compareAndSet(false, true)) {
+                    try {
+                        referenceManager.release(acquire);
+                    } finally {
+                        store.decRef();
+                    }
+                } else {
+                    /* In general, searchers should never be released twice or this would break reference counting. There is one rare case
+                     * when it might happen though: when the request and the Reaper thread would both try to release it in a very short
+                     * amount of time, this is why we only log a warning instead of throwing an exception. */
+                    logger.warn("Searcher was released twice", new IllegalStateException("Double release"));
+                }
+              });
             releasable = null; // success - hand over the reference to the engine searcher
             return engineSearcher;
         } catch (AlreadyClosedException ex) {
@@ -1175,69 +1184,51 @@ public abstract class Engine implements Closeable {
         }
     }
 
-    public static class Searcher implements Releasable {
+    public static final class Searcher implements Releasable {
         private final String source;
         private final IndexSearcher searcher;
-        private final AtomicBoolean released = new AtomicBoolean(false);
-        private final Logger logger;
-        private final IOUtils.IOConsumer<IndexSearcher> onClose;
+        private final Closeable onClose;
 
-        public Searcher(String source, IndexSearcher searcher, Logger logger) {
-            this(source, searcher, s -> s.getIndexReader().close(), logger);
-        }
-
-        public Searcher(String source, IndexSearcher searcher, IOUtils.IOConsumer<IndexSearcher> onClose, Logger logger) {
+        public Searcher(String source, IndexSearcher searcher, Closeable onClose) {
             this.source = source;
             this.searcher = searcher;
             this.onClose = onClose;
-            this.logger = logger;
         }
 
         /**
          * The source that caused this searcher to be acquired.
          */
-        public final String source() {
+        public String source() {
             return source;
         }
 
-        public final IndexReader reader() {
+        public IndexReader reader() {
             return searcher.getIndexReader();
         }
 
-        public final DirectoryReader getDirectoryReader() {
+        public DirectoryReader getDirectoryReader() {
             if (reader() instanceof DirectoryReader) {
                 return (DirectoryReader) reader();
             }
             throw new IllegalStateException("Can't use " + reader().getClass() + " as a directory reader");
         }
 
-        public final IndexSearcher searcher() {
+        public IndexSearcher searcher() {
             return searcher;
         }
 
         @Override
         public void close() {
-            if (released.compareAndSet(false, true) == false) {
-                /* In general, searchers should never be released twice or this would break reference counting. There is one rare case
-                 * when it might happen though: when the request and the Reaper thread would both try to release it in a very short amount
-                 * of time, this is why we only log a warning instead of throwing an exception.
-                 */
-                logger.warn("Searcher was released twice", new IllegalStateException("Double release"));
-                return;
-            }
             try {
-                onClose.accept(searcher());
+                onClose.close();
             } catch (IOException e) {
-                throw new IllegalStateException("Cannot close", e);
+                throw new UncheckedIOException("failed to close", e);
             } catch (AlreadyClosedException e) {
                 // This means there's a bug somewhere: don't suppress it
                 throw new AssertionError(e);
             }
         }
 
-        public final Logger getLogger() {
-            return logger;
-        }
     }
 
     public abstract static class Operation {

+ 2 - 2
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -606,7 +606,7 @@ public class InternalEngine extends Engine {
                                     // in the case of a already pruned translog generation we might get null here - yet very unlikely
                                     TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig
                                         .getIndexSettings().getIndexVersionCreated());
-                                    return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), logger),
+                                    return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), reader::close),
                                         new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0));
                                 }
                             } catch (IOException e) {
@@ -2086,7 +2086,7 @@ public class InternalEngine extends Engine {
             if (warmer != null) {
                 try {
                     assert searcher.getIndexReader() instanceof ElasticsearchDirectoryReader : "this class needs an ElasticsearchDirectoryReader but got: " + searcher.getIndexReader().getClass();
-                    warmer.warm(new Searcher("top_reader_warming", searcher, s -> {}, logger));
+                    warmer.warm(new Searcher("top_reader_warming", searcher, () -> {}));
                 } catch (Exception e) {
                     if (isEngineClosed.get() == false) {
                         logger.warn("failed to prepare/warm", e);

+ 3 - 2
server/src/main/java/org/elasticsearch/index/shard/IndexSearcherWrapper.java

@@ -99,8 +99,9 @@ public class IndexSearcherWrapper {
         } else {
             // we close the reader to make sure wrappers can release resources if needed....
             // our NonClosingReaderWrapper makes sure that our reader is not closed
-            return new Engine.Searcher(engineSearcher.source(), indexSearcher, s -> IOUtils.close(s.getIndexReader(), engineSearcher),
-                engineSearcher.getLogger());
+            return new Engine.Searcher(engineSearcher.source(), indexSearcher, () ->
+                IOUtils.close(indexSearcher.getIndexReader(), // this will close the wrappers excluding the NonClosingReaderWrapper
+                engineSearcher)); // this will run the closeable on the wrapped engine searcher
         }
     }
 

+ 25 - 24
server/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java

@@ -42,6 +42,7 @@ import org.elasticsearch.test.ESTestCase;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class IndexSearcherWrapperTests extends ESTestCase {
@@ -73,20 +74,20 @@ public class IndexSearcherWrapperTests extends ESTestCase {
         final int sourceRefCount = open.getRefCount();
         final AtomicInteger count = new AtomicInteger();
         final AtomicInteger outerCount = new AtomicInteger();
-        try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, s -> {}, logger)) {
-            final Engine.Searcher wrap =  wrapper.wrap(engineSearcher);
-            assertEquals(1, wrap.reader().getRefCount());
-            ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> {
-                if (key == open.getReaderCacheHelper().getKey()) {
-                    count.incrementAndGet();
-                }
-                outerCount.incrementAndGet();
-            });
-            assertEquals(0, wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1).totalHits.value);
-            wrap.close();
-            assertFalse("wrapped reader is closed", wrap.reader().tryIncRef());
-            assertEquals(sourceRefCount, open.getRefCount());
-        }
+        final AtomicBoolean closeCalled = new AtomicBoolean(false);
+        final Engine.Searcher wrap =  wrapper.wrap(new Engine.Searcher("foo", searcher, () -> closeCalled.set(true)));
+        assertEquals(1, wrap.reader().getRefCount());
+        ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> {
+            if (key == open.getReaderCacheHelper().getKey()) {
+                count.incrementAndGet();
+            }
+            outerCount.incrementAndGet();
+        });
+        assertEquals(0, wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1).totalHits.value);
+        wrap.close();
+        assertFalse("wrapped reader is closed", wrap.reader().tryIncRef());
+        assertEquals(sourceRefCount, open.getRefCount());
+        assertTrue(closeCalled.get());
         assertEquals(1, closeCalls.get());
 
         IOUtils.close(open, writer, dir);
@@ -121,15 +122,15 @@ public class IndexSearcherWrapperTests extends ESTestCase {
             }
         };
         final ConcurrentHashMap<Object, TopDocs> cache = new ConcurrentHashMap<>();
-        try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, s -> {}, logger)) {
-            try (Engine.Searcher wrap = wrapper.wrap(engineSearcher)) {
-                ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> {
-                    cache.remove(key);
-                });
-                TopDocs search = wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1);
-                cache.put(wrap.reader().getReaderCacheHelper().getKey(), search);
-            }
+        AtomicBoolean closeCalled = new AtomicBoolean(false);
+        try (Engine.Searcher wrap = wrapper.wrap(new Engine.Searcher("foo", searcher, () -> closeCalled.set(true)))) {
+            ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> {
+                cache.remove(key);
+            });
+            TopDocs search = wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1);
+            cache.put(wrap.reader().getReaderCacheHelper().getKey(), search);
         }
+        assertTrue(closeCalled.get());
         assertEquals(1, closeCalls.get());
 
         assertEquals(1, cache.size());
@@ -151,11 +152,11 @@ public class IndexSearcherWrapperTests extends ESTestCase {
         assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits.value);
         searcher.setSimilarity(iwc.getSimilarity());
         IndexSearcherWrapper wrapper = new IndexSearcherWrapper();
-        try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, logger)) {
+        try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, open::close)) {
             final Engine.Searcher wrap = wrapper.wrap(engineSearcher);
             assertSame(wrap, engineSearcher);
         }
-        IOUtils.close(open, writer, dir);
+        IOUtils.close(writer, dir);
     }
 
     private static class FieldMaskingReader extends FilterDirectoryReader {

+ 1 - 1
server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java

@@ -110,7 +110,7 @@ public class DefaultSearchContextTests extends ESTestCase {
         try (Directory dir = newDirectory();
              RandomIndexWriter w = new RandomIndexWriter(random(), dir);
              IndexReader reader = w.getReader();
-             Engine.Searcher searcher = new Engine.Searcher("test", new IndexSearcher(reader), logger)) {
+             Engine.Searcher searcher = new Engine.Searcher("test", new IndexSearcher(reader), reader::close)) {
 
             DefaultSearchContext context1 = new DefaultSearchContext(1L, shardSearchRequest, null, searcher, null, indexService,
                 indexShard, bigArrays, null, timeout, null, null, Version.CURRENT);

+ 1 - 1
server/src/test/java/org/elasticsearch/search/profile/query/QueryProfilerTests.java

@@ -363,7 +363,7 @@ public class QueryProfilerTests extends ESTestCase {
 
     public void testApproximations() throws IOException {
         QueryProfiler profiler = new QueryProfiler();
-        Engine.Searcher engineSearcher = new Engine.Searcher("test", new IndexSearcher(reader), logger);
+        Engine.Searcher engineSearcher = new Engine.Searcher("test", new IndexSearcher(reader), reader::close);
         // disable query caching since we want to test approximations, which won't
         // be exposed on a cached entry
         ContextIndexSearcher searcher = new ContextIndexSearcher(engineSearcher, null, MAYBE_CACHE_POLICY);

+ 2 - 2
test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java

@@ -240,7 +240,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
     }
 
     protected SearchContext createSearchContext(IndexSearcher indexSearcher, IndexSettings indexSettings) {
-        Engine.Searcher searcher = new Engine.Searcher("aggregator_test", indexSearcher, logger);
+        Engine.Searcher searcher = new Engine.Searcher("aggregator_test", indexSearcher, () -> indexSearcher.getIndexReader().close());
         QueryCache queryCache = new DisabledQueryCache(indexSettings);
         QueryCachingPolicy queryCachingPolicy = new QueryCachingPolicy() {
             @Override
@@ -248,7 +248,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
             }
 
             @Override
-            public boolean shouldCache(Query query) throws IOException {
+            public boolean shouldCache(Query query) {
                 // never cache a query
                 return false;
             }

+ 0 - 87
test/framework/src/main/java/org/elasticsearch/test/engine/AssertingSearcher.java

@@ -1,87 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.test.engine;
-
-import org.apache.logging.log4j.Logger;
-import org.apache.lucene.search.IndexSearcher;
-import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.shard.ShardId;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * A searcher that asserts the IndexReader's refcount on close
- */
-class AssertingSearcher extends Engine.Searcher {
-    private final Engine.Searcher wrappedSearcher;
-    private final ShardId shardId;
-    private RuntimeException firstReleaseStack;
-    private final Object lock = new Object();
-    private final int initialRefCount;
-    private final Logger logger;
-    private final AtomicBoolean closed = new AtomicBoolean(false);
-
-    AssertingSearcher(IndexSearcher indexSearcher, final Engine.Searcher wrappedSearcher, ShardId shardId, Logger logger) {
-        super(wrappedSearcher.source(), indexSearcher, s -> {throw new AssertionError();}, logger);
-        // we only use the given index searcher here instead of the IS of the wrapped searcher. the IS might be a wrapped searcher
-        // with a wrapped reader.
-        this.wrappedSearcher = wrappedSearcher;
-        this.logger = logger;
-        this.shardId = shardId;
-        initialRefCount = wrappedSearcher.reader().getRefCount();
-        assert initialRefCount > 0 :
-                "IndexReader#getRefCount() was [" + initialRefCount + "] expected a value > [0] - reader is already closed";
-    }
-
-    @Override
-    public void close() {
-        synchronized (lock) {
-            if (closed.compareAndSet(false, true)) {
-                firstReleaseStack = new RuntimeException();
-                final int refCount = wrappedSearcher.reader().getRefCount();
-                /*
-                 * this assert seems to be paranoid but given LUCENE-5362 we
-                 * better add some assertions here to make sure we catch any
-                 * potential problems.
-                 */
-                assert refCount > 0 : "IndexReader#getRefCount() was [" + refCount + "] expected a value > [0] - reader is already "
-                        + " closed. Initial refCount was: [" + initialRefCount + "]";
-                try {
-                    wrappedSearcher.close();
-                } catch (RuntimeException ex) {
-                    logger.debug("Failed to release searcher", ex);
-                    throw ex;
-                }
-            } else {
-                AssertionError error = new AssertionError("Released Searcher more than once, source [" + wrappedSearcher.source() + "]");
-                error.initCause(firstReleaseStack);
-                throw error;
-            }
-        }
-    }
-
-    public ShardId shardId() {
-        return shardId;
-    }
-
-    public boolean isOpen() {
-        return closed.get() == false;
-    }
-}

+ 64 - 24
test/framework/src/main/java/org/elasticsearch/test/engine/MockEngineSupport.java

@@ -37,7 +37,6 @@ import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.engine.EngineException;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.engine.MockInternalEngine;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -71,7 +70,7 @@ public final class MockEngineSupport {
     private final ShardId shardId;
     private final QueryCache filterCache;
     private final QueryCachingPolicy filterCachingPolicy;
-    private final SearcherCloseable searcherCloseable;
+    private final InFlightSearchers inFlightSearchers;
     private final MockContext mockContext;
     private final boolean disableFlushOnClose;
 
@@ -107,8 +106,8 @@ public final class MockEngineSupport {
             logger.trace("Using [{}] for shard [{}] seed: [{}] wrapReader: [{}]", this.getClass().getName(), shardId, seed, wrapReader);
         }
         mockContext = new MockContext(random, wrapReader, wrapper, settings);
-        this.searcherCloseable = new SearcherCloseable();
-        LuceneTestCase.closeAfterSuite(searcherCloseable); // only one suite closeable per Engine
+        this.inFlightSearchers = new InFlightSearchers();
+        LuceneTestCase.closeAfterSuite(inFlightSearchers); // only one suite closeable per Engine
         this.disableFlushOnClose = DISABLE_FLUSH_ON_CLOSE.get(settings);
     }
 
@@ -188,7 +187,7 @@ public final class MockEngineSupport {
 
     }
 
-    public Engine.Searcher wrapSearcher(String source, Engine.Searcher engineSearcher) {
+    public Engine.Searcher wrapSearcher(Engine.Searcher engineSearcher) {
         final AssertingIndexSearcher assertingIndexSearcher = newSearcher(engineSearcher);
         assertingIndexSearcher.setSimilarity(engineSearcher.searcher().getSimilarity());
         /*
@@ -199,26 +198,16 @@ public final class MockEngineSupport {
          * early. - good news, stuff will fail all over the place if we don't
          * get this right here
          */
-        AssertingSearcher assertingSearcher = new AssertingSearcher(assertingIndexSearcher, engineSearcher, shardId, logger) {
-            @Override
-            public void close() {
-                try {
-                    searcherCloseable.remove(this);
-                } finally {
-                    super.close();
-                }
-            }
-        };
-        searcherCloseable.add(assertingSearcher, engineSearcher.source());
-        return assertingSearcher;
+        SearcherCloseable closeable = new SearcherCloseable(engineSearcher, logger, inFlightSearchers);
+        return new Engine.Searcher(engineSearcher.source(), assertingIndexSearcher, closeable);
     }
 
-    private static final class SearcherCloseable implements Closeable {
+    private static final class InFlightSearchers implements Closeable {
 
-        private final IdentityHashMap<AssertingSearcher, RuntimeException> openSearchers = new IdentityHashMap<>();
+        private final IdentityHashMap<Object, RuntimeException> openSearchers = new IdentityHashMap<>();
 
         @Override
-        public synchronized void close() throws IOException {
+        public synchronized void close() {
             if (openSearchers.isEmpty() == false) {
                 AssertionError error = new AssertionError("Unreleased searchers found");
                 for (RuntimeException ex : openSearchers.values()) {
@@ -228,15 +217,66 @@ public final class MockEngineSupport {
             }
         }
 
-        void add(AssertingSearcher searcher, String source) {
+        void add(Object key, String source) {
             final RuntimeException ex = new RuntimeException("Unreleased Searcher, source [" + source+ "]");
             synchronized (this) {
-                openSearchers.put(searcher, ex);
+                openSearchers.put(key, ex);
             }
         }
 
-        synchronized void remove(AssertingSearcher searcher) {
-            openSearchers.remove(searcher);
+        synchronized void remove(Object key) {
+            openSearchers.remove(key);
+        }
+    }
+
+    private static final class SearcherCloseable implements Closeable {
+        private final Engine.Searcher wrappedSearcher;
+        private final InFlightSearchers inFlightSearchers;
+        private RuntimeException firstReleaseStack;
+        private final Object lock = new Object();
+        private final int initialRefCount;
+        private final Logger logger;
+        private final AtomicBoolean closed = new AtomicBoolean(false);
+
+        SearcherCloseable(final Engine.Searcher wrappedSearcher, Logger logger, InFlightSearchers inFlightSearchers) {
+            // we only use the given index searcher here instead of the IS of the wrapped searcher. the IS might be a wrapped searcher
+            // with a wrapped reader.
+            this.wrappedSearcher = wrappedSearcher;
+            this.logger = logger;
+            initialRefCount = wrappedSearcher.reader().getRefCount();
+            this.inFlightSearchers = inFlightSearchers;
+            assert initialRefCount > 0 :
+                "IndexReader#getRefCount() was [" + initialRefCount + "] expected a value > [0] - reader is already closed";
+            inFlightSearchers.add(this, wrappedSearcher.source());
+        }
+
+        @Override
+        public void close() {
+            synchronized (lock) {
+                if (closed.compareAndSet(false, true)) {
+                    inFlightSearchers.remove(this);
+                    firstReleaseStack = new RuntimeException();
+                    final int refCount = wrappedSearcher.reader().getRefCount();
+                    /*
+                     * this assert seems to be paranoid but given LUCENE-5362 we
+                     * better add some assertions here to make sure we catch any
+                     * potential problems.
+                     */
+                    assert refCount > 0 : "IndexReader#getRefCount() was [" + refCount + "] expected a value > [0] - reader is already "
+                        + " closed. Initial refCount was: [" + initialRefCount + "]";
+                    try {
+                        wrappedSearcher.close();
+                    } catch (RuntimeException ex) {
+                        logger.debug("Failed to release searcher", ex);
+                        throw ex;
+                    }
+                } else {
+                    AssertionError error = new AssertionError("Released Searcher more than once, source [" + wrappedSearcher.source()
+                        + "]");
+                    error.initCause(firstReleaseStack);
+                    throw error;
+                }
+            }
         }
     }
 }

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java

@@ -78,6 +78,6 @@ final class MockInternalEngine extends InternalEngine {
     @Override
     public Searcher acquireSearcher(String source, SearcherScope scope) {
         final Searcher engineSearcher = super.acquireSearcher(source, scope);
-        return support().wrapSearcher(source, engineSearcher);
+        return support().wrapSearcher(engineSearcher);
     }
 }