[ENGINE] Close Engine immediately if a tragic event strikes.

Until recently we couldn't close the engine on a tragic event because
of the lock order and all its complications. Now that the engine is
much simpler, with a single IndexWriter etc., we don't necessarily
need the write lock on close anymore and can simply close and
continue.
Simon Willnauer 10 years ago
parent
commit
de7461efd0
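
The change leans on Lucene's tragic-event reporting: when an unrecoverable
error (out of memory, disk full during an fsync, ...) hits the IndexWriter,
Lucene closes the writer itself and records the cause. A minimal sketch of
the detection this commit builds on, using the same IndexWriter methods the
diff calls below; the helper class and method names are hypothetical:

    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.store.AlreadyClosedException;

    final class TragicEvents {
        // true if the writer was closed by a tragic event, so the engine
        // should be failed right away instead of treating t as retryable
        static boolean wasTragic(IndexWriter writer, Throwable t) {
            if (t instanceof AlreadyClosedException) {
                // the writer may have been closed by a tragedy earlier;
                // double-check before failing the engine
                return writer.isOpen() == false && writer.getTragicException() != null;
            }
            // we may be on the very thread that hit the tragic exception
            return t != null && writer.isOpen() == false && writer.getTragicException() == t;
        }
    }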

+ 98 - 64
src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -104,7 +104,6 @@ public class InternalEngine extends Engine {
     private final SearcherManager searcherManager;
 
     private final AtomicBoolean isClosed = new AtomicBoolean(false);
-    private volatile boolean closedOrFailed = false;
     private final AtomicBoolean optimizeMutex = new AtomicBoolean();
     // we use flushNeeded here, since if there are no changes, then the commit
     // will not really happen, and then the commitUserData and the new translog will not be reflected
@@ -119,7 +118,7 @@ public class InternalEngine extends Engine {
 
     private final Object[] dirtyLocks;
     private volatile Throwable failedEngine = null;
-    private final Lock failEngineLock = new ReentrantLock();
+    private final ReentrantLock failEngineLock = new ReentrantLock();
     private final FailedEngineListener failedEngineListener;
 
     private final AtomicLong translogIdGenerator = new AtomicLong();
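
(The field type changes from Lock to ReentrantLock because the new
closeNoLock() assert further down calls failEngineLock.isHeldByCurrentThread(),
which the plain Lock interface does not expose.)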
@@ -181,8 +180,10 @@ public class InternalEngine extends Engine {
             if (success == false) {
                 IOUtils.closeWhileHandlingException(writer, manager);
                 versionMap.clear();
-                // on failure we need to dec the store reference
-                store.decRef();
+                if (isClosed.get() == false) {
+                    // on failure we need to dec the store reference
+                    store.decRef();
+                }
             }
         }
     }
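
The new isClosed guard keeps the store's reference count balanced: if a
tragic failure during startup already ran closeNoLock(), the engine's store
reference has been released and the constructor's cleanup must not release
it again. A tiny, hypothetical sketch of that contract (the real Store
class is considerably more involved):

    import java.util.concurrent.atomic.AtomicInteger;

    final class RefCounted {
        private final AtomicInteger refCount = new AtomicInteger(1); // creator holds one ref

        void incRef() {
            int i;
            do {
                i = refCount.get();
                if (i <= 0) {
                    throw new IllegalStateException("already closed");
                }
            } while (refCount.compareAndSet(i, i + 1) == false);
        }

        void decRef() {
            // must run exactly once per incRef(); a second decRef() would
            // release the underlying resources while others still use them
            int i = refCount.decrementAndGet();
            assert i >= 0 : "ref count decremented below zero";
            if (i == 0) {
                // last reference gone: close files, release locks, etc.
            }
        }
    }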
@@ -227,7 +228,7 @@ public class InternalEngine extends Engine {
                 }
                if (mustCommitTranslogId) { // translog id is not in the metadata - fix this inconsistency; some code relies on it, and old indices might not have it.
                     indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
-                    indexWriter.commit();
+                    commitIndexWriter(indexWriter);
                 }
                 final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter, true), shardId);
                 searcherManager = new SearcherManager(directoryReader, searcherFactory);
@@ -238,7 +239,7 @@ public class InternalEngine extends Engine {
                 success = true;
                 return searcherManager;
             } catch (IOException e) {
-                maybeFailEngine(e, "start");
+                maybeFailEngine("start", e);
                 try {
                     indexWriter.rollback();
                 } catch (IOException e1) { // iw is closed below
@@ -254,7 +255,7 @@ public class InternalEngine extends Engine {
     }
 
     private void updateSettings() {
-        if (closedOrFailed == false) {
+        if (isClosed.get() == false) {
             final LiveIndexWriterConfig iwc = indexWriter.getConfig();
             iwc.setUseCompoundFile(engineConfig.isCompoundOnFlush());
         }
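
Every read of the removed closedOrFailed flag is redirected to the isClosed
AtomicBoolean in the hunks that follow. One flag instead of two removes any
window where the engine counts as failed but not yet closed, and
compareAndSet guarantees the close body runs at most once even if close()
and failEngine() race. A minimal sketch of that one-shot pattern
(hypothetical class name; the real engine throws EngineClosedException):

    import java.util.concurrent.atomic.AtomicBoolean;

    final class OneShotClose {
        private final AtomicBoolean isClosed = new AtomicBoolean(false);

        void close() {
            if (isClosed.compareAndSet(false, true)) {
                // exactly one caller wins the CAS and releases resources here
            }
        }

        void ensureOpen() {
            if (isClosed.get()) {
                throw new IllegalStateException("closed");
            }
        }
    }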
@@ -332,7 +333,7 @@ public class InternalEngine extends Engine {
             }
             flushNeeded = true;
         } catch (OutOfMemoryError | IllegalStateException | IOException t) {
-            maybeFailEngine(t, "create");
+            maybeFailEngine("create", t);
             throw new CreateFailedEngineException(shardId, create, t);
         }
         checkVersionMapRefresh();
@@ -438,7 +439,7 @@ public class InternalEngine extends Engine {
             }
             flushNeeded = true;
         } catch (OutOfMemoryError | IllegalStateException | IOException t) {
-            maybeFailEngine(t, "index");
+            maybeFailEngine("index", t);
             throw new IndexFailedEngineException(shardId, index, t);
         }
         checkVersionMapRefresh();
@@ -451,7 +452,7 @@ public class InternalEngine extends Engine {
         // TODO: we force refresh when versionMap is using > 25% of IW's RAM buffer; should we make this separately configurable?
         if (versionMap.ramBytesUsedForRefresh() > 0.25 * engineConfig.getIndexingBufferSize().bytes() && versionMapRefreshPending.getAndSet(true) == false) {
             try {
-                if (closedOrFailed) {
+                if (isClosed.get()) {
                     // no point...
                     return;
                 }
@@ -530,7 +531,7 @@ public class InternalEngine extends Engine {
             innerDelete(delete);
             flushNeeded = true;
         } catch (OutOfMemoryError | IllegalStateException | IOException t) {
-            maybeFailEngine(t, "delete");
+            maybeFailEngine("delete", t);
             throw new DeleteFailedEngineException(shardId, delete, t);
         }
 
@@ -609,7 +610,7 @@ public class InternalEngine extends Engine {
             translog.add(new Translog.DeleteByQuery(delete));
             flushNeeded = true;
         } catch (Throwable t) {
-            maybeFailEngine(t, "delete_by_query");
+            maybeFailEngine("delete_by_query", t);
             throw new DeleteByQueryFailedEngineException(shardId, delete, t);
         }
 
@@ -684,6 +685,7 @@ public class InternalEngine extends Engine {
             searcherManager.maybeRefreshBlocking();
         } catch (AlreadyClosedException e) {
             ensureOpen();
+            maybeFailEngine("refresh", e);
         } catch (EngineClosedException e) {
             throw e;
         } catch (Throwable t) {
@@ -737,7 +739,7 @@ public class InternalEngine extends Engine {
                             long translogId = translogIdGenerator.incrementAndGet();
                             translog.newTransientTranslog(translogId);
                             indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
-                            indexWriter.commit();
+                            commitIndexWriter(indexWriter);
                             // we need to refresh in order to clear older version values
                             refresh("version_table_flush");
                             // we need to move transient to current only after we refresh
@@ -774,7 +776,7 @@ public class InternalEngine extends Engine {
                     try {
                         long translogId = translog.currentId();
                         indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
-                        indexWriter.commit();
+                        commitIndexWriter(indexWriter);
                     } catch (Throwable e) {
                         throw new FlushFailedEngineException(shardId, e);
                     }
@@ -788,19 +790,22 @@ public class InternalEngine extends Engine {
             }
 
             // reread the last committed segment infos
+            store.incRef();
             try (ReleasableLock _ = readLock.acquire()) {
                 ensureOpen();
                 lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
             } catch (Throwable e) {
-                if (closedOrFailed == false) {
+                if (isClosed.get() == false) {
                     logger.warn("failed to read latest segment infos on flush", e);
                     if (Lucene.isCorruptionException(e)) {
                         throw new FlushFailedEngineException(shardId, e);
                     }
                 }
+            } finally {
+                store.decRef();
             }
         } catch (FlushFailedEngineException ex) {
-            maybeFailEngine(ex, "flush");
+            maybeFailEngine("flush", ex);
             throw ex;
         } finally {
             flushLock.unlock();
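
Pinning the store with the incRef()/decRef() pair above mirrors the
constructor change earlier: now that a tragic event can close the engine
concurrently, the underlying directory could otherwise be released while
flush is still reading the last commit point.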
@@ -809,7 +814,7 @@ public class InternalEngine extends Engine {
     }
 
     private void ensureOpen() {
-        if (closedOrFailed) {
+        if (isClosed.get()) {
             throw new EngineClosedException(shardId, failedEngine);
         }
     }
@@ -886,7 +891,7 @@ public class InternalEngine extends Engine {
                     indexWriter.forceMerge(maxNumSegments, false);
                 }
             } catch (Throwable t) {
-                maybeFailEngine(t, "optimize");
+                maybeFailEngine("optimize", t);
                 throw new OptimizeFailedEngineException(shardId, t);
             } finally {
                 optimizeMutex.set(false);
@@ -939,7 +944,7 @@ public class InternalEngine extends Engine {
         try {
             phase1Snapshot = deletionPolicy.snapshot();
         } catch (Throwable e) {
-            maybeFailEngine(e, "recovery");
+            maybeFailEngine("recovery", e);
             Releasables.closeWhileHandlingException(onGoingRecoveries);
             throw new RecoveryEngineException(shardId, 1, "Snapshot failed", e);
         }
@@ -947,7 +952,7 @@ public class InternalEngine extends Engine {
         try {
             recoveryHandler.phase1(phase1Snapshot);
         } catch (Throwable e) {
-            maybeFailEngine(e, "recovery phase 1");
+            maybeFailEngine("recovery phase 1", e);
             Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot);
             throw new RecoveryEngineException(shardId, 1, "Execution failed", wrapIfClosed(e));
         }
@@ -956,14 +961,14 @@ public class InternalEngine extends Engine {
         try {
             phase2Snapshot = translog.snapshot();
         } catch (Throwable e) {
-            maybeFailEngine(e, "snapshot recovery");
+            maybeFailEngine("snapshot recovery", e);
             Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot);
             throw new RecoveryEngineException(shardId, 2, "Snapshot failed", wrapIfClosed(e));
         }
         try {
             recoveryHandler.phase2(phase2Snapshot);
         } catch (Throwable e) {
-            maybeFailEngine(e, "recovery phase 2");
+            maybeFailEngine("recovery phase 2", e);
             Releasables.closeWhileHandlingException(onGoingRecoveries, phase1Snapshot, phase2Snapshot);
             throw new RecoveryEngineException(shardId, 2, "Execution failed", wrapIfClosed(e));
         }
@@ -977,7 +982,7 @@ public class InternalEngine extends Engine {
             recoveryHandler.phase3(phase3Snapshot);
             success = true;
         } catch (Throwable e) {
-            maybeFailEngine(e, "recovery phase 3");
+            maybeFailEngine("recovery phase 3", e);
             throw new RecoveryEngineException(shardId, 3, "Execution failed", wrapIfClosed(e));
         } finally {
             Releasables.close(success, onGoingRecoveries, writeLock, phase1Snapshot,
@@ -985,7 +990,7 @@ public class InternalEngine extends Engine {
         }
     }
 
-    private boolean maybeFailEngine(Throwable t, String source) {
+    private boolean maybeFailEngine(String source, Throwable t) {
         if (Lucene.isCorruptionException(t)) {
             if (engineConfig.isFailEngineOnCorruption()) {
                 failEngine("corrupt file detected source: [" + source + "]", t);
@@ -996,12 +1001,25 @@ public class InternalEngine extends Engine {
         } else if (ExceptionsHelper.isOOM(t)) {
             failEngine("out of memory", t);
             return true;
+        } else if (t instanceof AlreadyClosedException) {
+            // if we are already closed due to some tragic exception,
+            // we need to fail the engine; it might already have been failed before,
+            // but we double-check that it is failed and closed
+            if (indexWriter.isOpen() == false && indexWriter.getTragicException() != null) {
+                failEngine("already closed by tragic event", indexWriter.getTragicException());
+            }
+            return true;
+        } else if (t != null && indexWriter.isOpen() == false && indexWriter.getTragicException() == t) {
+            // this is spot on - we are handling the tragic event exception here, so we have to fail the engine
+            // right away
+            failEngine(source, t);
+            return true;
         }
         return false;
     }
 
     private Throwable wrapIfClosed(Throwable t) {
-        if (closedOrFailed) {
+        if (isClosed.get()) {
             if (t != failedEngine && failedEngine != null) {
                 t.addSuppressed(failedEngine);
             }
@@ -1118,37 +1136,47 @@ public class InternalEngine extends Engine {
 
     @Override
     public void close() throws ElasticsearchException {
-        logger.debug("close now acquire writeLock");
-        try (ReleasableLock _ = writeLock.acquire()) {
-            logger.debug("close acquired writeLock");
-            if (isClosed.compareAndSet(false, true)) {
+        if (isClosed.get() == false) { // don't acquire the write lock if we are already closed
+            logger.trace("close now acquire writeLock");
+            try (ReleasableLock _ = writeLock.acquire()) {
+                logger.trace("close now acquired writeLock");
+                closeNoLock("api");
+            }
+        }
+    }
+
+    /**
+     * Closes the engine without acquiring the write lock. This should only be
+     * called while the write lock is held, or in a disaster condition, i.e. if the engine
+     * is failed.
+     */
+    private void closeNoLock(String reason) throws ElasticsearchException {
+        if (isClosed.compareAndSet(false, true)) {
+            assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must currently be failing itself";
+            try {
+                this.versionMap.clear();
+                logger.trace("close searcherManager");
                 try {
-                    closedOrFailed = true;
-                    this.versionMap.clear();
-                    logger.debug("close searcherManager");
-                    try {
-                        IOUtils.close(searcherManager);
-                    } catch (Throwable t) {
-                        logger.warn("Failed to close SearcherManager", t);
-                    }
-                    // no need to commit in this case! we snapshot before we close the shard, so the translog and all is sync'ed
-                    if (indexWriter != null) {
-                        logger.debug("rollback indexWriter");
-                        try {
-                            indexWriter.rollback();
-                        } catch (AlreadyClosedException e) {
-                            // ignore
-                        }
-                        logger.debug("rollback indexWriter done");
-                    }
-                } catch (Throwable e) {
-                    logger.warn("failed to rollback writer on close", e);
-                } finally {
-                    store.decRef();
-                    this.mergeScheduler.removeListener(mergeSchedulerListener);
-                    this.mergeScheduler.removeFailureListener(mergeSchedulerFailureListener);
-                    engineConfig.getIndexSettingsService().removeListener(listener);
+                    IOUtils.close(searcherManager);
+                } catch (Throwable t) {
+                    logger.warn("Failed to close SearcherManager", t);
                 }
+                // no need to commit in this case! we snapshot before we close the shard, so the translog and all is sync'ed
+                logger.trace("rollback indexWriter");
+                try {
+                    indexWriter.rollback();
+                } catch (AlreadyClosedException e) {
+                    // ignore
+                }
+                logger.trace("rollback indexWriter done");
+            } catch (Throwable e) {
+                logger.warn("failed to rollback writer on close", e);
+            } finally {
+                store.decRef();
+                this.mergeScheduler.removeListener(mergeSchedulerListener);
+                this.mergeScheduler.removeFailureListener(mergeSchedulerFailureListener);
+                engineConfig.getIndexSettingsService().removeListener(listener);
+                logger.debug("engine closed [{}]", reason);
             }
         }
     }
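
closeNoLock() can now be reached on two paths, which is exactly what the
assert encodes: the regular close() path that holds the write lock, and the
failEngine() path that holds failEngineLock instead, so a tragic event never
has to wait on the write lock. A hedged sketch of the two entry paths (field
names follow the diff; the class is a stand-in for the engine):

    import java.util.concurrent.locks.ReentrantLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    final class ClosePaths {
        private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        private final ReentrantLock failEngineLock = new ReentrantLock();

        // path 1: regular close() takes the write lock first
        void close() {
            rwl.writeLock().lock();
            try {
                closeNoLock("api");
            } finally {
                rwl.writeLock().unlock();
            }
        }

        // path 2: failEngine() holds failEngineLock only, so a tragic event
        // never blocks on (or deadlocks against) the write lock
        void failEngine(String reason) {
            if (failEngineLock.tryLock()) {
                try {
                    closeNoLock("engine failed on: [" + reason + "]");
                } finally {
                    failEngineLock.unlock();
                }
            }
        }

        private void closeNoLock(String reason) {
            assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread();
            // release resources exactly once (guarded by an AtomicBoolean in the real code)
        }
    }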
@@ -1157,8 +1185,11 @@ public class InternalEngine extends Engine {
     public void failEngine(String reason, Throwable failure) {
         assert failure != null;
         if (failEngineLock.tryLock()) {
+            store.incRef();
             try {
                 try {
+                    // we just go and close this engine - no way to recover
+                    closeNoLock("engine failed on: [" + reason + "]");
                     // we first mark the store as corrupted before we notify any listeners
                     // this must happen first otherwise we might try to reallocate so quickly
                     // on the same node that we don't see the corrupted marker file when
@@ -1184,14 +1215,7 @@ public class InternalEngine extends Engine {
                 // don't bubble up these exceptions up
                 logger.warn("failEngine threw exception", t);
             } finally {
-                closedOrFailed = true;
-                try (ReleasableLock _ = readLock.acquire()) {
-                    // we take the readlock here to ensure nobody replaces this IW concurrently.
-                    indexWriter.rollback();
-                } catch (Throwable t) {
-                    logger.warn("Rolling back indexwriter on engine failure failed", t);
-                    // to be on the safe side we just rollback the IW
-                }
+                store.decRef();
             }
         } else {
             logger.debug("tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure);
@@ -1252,7 +1276,7 @@ public class InternalEngine extends Engine {
                         }
                     } catch (Throwable t) {
                         // Don't fail a merge if the warm-up failed
-                        if (closedOrFailed == false) {
+                        if (isClosed.get() == false) {
                             logger.warn("Warm-up failed", t);
                         }
                         if (t instanceof Error) {
@@ -1324,7 +1348,7 @@ public class InternalEngine extends Engine {
                     }
                     warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new Searcher("warmer", searcher)));
                 } catch (Throwable e) {
-                    if (closedOrFailed == false) {
+                    if (isClosed.get() == false) {
                         logger.warn("failed to prepare/warm", e);
                     }
                 } finally {
@@ -1402,4 +1426,14 @@ public class InternalEngine extends Engine {
     EngineConfig config() {
         return engineConfig;
     }
+
+
+    private void commitIndexWriter(IndexWriter writer) throws IOException {
+        try {
+            writer.commit();
+        } catch (Throwable ex) {
+            failEngine("lucene commit failed", ex);
+            throw ex;
+        }
+    }
 }
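
Routing every IndexWriter.commit() through this helper is the other half of
the change: a commit can itself hit the tragic path (for example a disk-full
error during fsync), and failing the engine at the point of the failed
commit immediately transitions it to closed instead of leaving a
half-committed writer behind.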

+ 5 - 10
src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -1084,17 +1084,12 @@ public class IndexShard extends AbstractIndexShardComponent {
         // called by the current engine
         @Override
         public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable failure) {
-            try {
-                for (Engine.FailedEngineListener listener : delegates) {
-                    try {
-                        listener.onFailedEngine(shardId, reason, failure);
-                    } catch (Exception e) {
-                        logger.warn("exception while notifying engine failure", e);
-                    }
+            for (Engine.FailedEngineListener listener : delegates) {
+                try {
+                    listener.onFailedEngine(shardId, reason, failure);
+                } catch (Exception e) {
+                    logger.warn("exception while notifying engine failure", e);
                 }
-            } finally {
-                // close the engine all bets are off... don't use engine() here it can throw an exception
-                IOUtils.closeWhileHandlingException(currentEngineReference.get());
             }
         }
     }
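
The shard-level listener no longer closes the engine itself: failEngine()
now does that via closeNoLock() while holding only failEngineLock, which
presumably removes the lock-order complication the commit message refers to.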

+ 0 - 2
src/test/java/org/elasticsearch/search/basic/SearchWithRandomExceptionsTests.java

@@ -22,7 +22,6 @@ package org.elasticsearch.search.basic;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.util.English;
-import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
@@ -52,7 +51,6 @@ import java.util.concurrent.ExecutionException;
 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
 
-@LuceneTestCase.AwaitsFix(bugUrl = "Boaz Leskes: disabling this until further discussion. Recent failures probably relate to #9211 & #8720 (+ friends)")
 public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTest {
 
     @Test