فهرست منبع

Fix Local Translog Recovery not Updating Safe Commit in Edge Case (#57350)

In case the local checkpoint in the latest commit is less
than the last processed local checkpoint we would recover
0 ops and hence not commit again.
This would lead to the logic in `IndexShard#recoverLocallyUpToGlobalCheckpoint`
not seeing the latest local checkpoint when it reload the safe commit from the store
and thus cause inefficient recoveries because the recoveries would work from a
lower than possible local checkpoint.

Closes #57010
Armin Braun 5 سال پیش
والد
کامیت
2ef82cd7f9

+ 10 - 18
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.index.engine;
 
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.index.DirectoryReader;
@@ -440,7 +441,6 @@ public class InternalEngine extends Engine {
 
     @Override
     public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
-        flushLock.lock();
         try (ReleasableLock lock = readLock.acquire()) {
             ensureOpen();
             if (pendingTranslogRecovery.get() == false) {
@@ -457,8 +457,6 @@ public class InternalEngine extends Engine {
                 }
                 throw e;
             }
-        } finally {
-            flushLock.unlock();
         }
         return this;
     }
@@ -485,13 +483,10 @@ public class InternalEngine extends Engine {
         // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
         assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
         pendingTranslogRecovery.set(false); // we are good - now we can commit
-        if (opsRecovered > 0) {
-            logger.trace("flushing post recovery from translog: ops recovered [{}], current translog generation [{}]",
-                opsRecovered, translog.currentFileGeneration());
-            commitIndexWriter(indexWriter, translog);
-            refreshLastCommittedSegmentInfos();
-            refresh("translog_recovery");
-        }
+        logger.trace(() -> new ParameterizedMessage(
+                "flushing post recovery from translog: ops recovered [{}], current translog generation [{}]",
+                opsRecovered, translog.currentFileGeneration()));
+        flush(false, true);
         translog.trimUnreferencedReaders();
     }
 
@@ -1633,12 +1628,6 @@ public class InternalEngine extends Engine {
             throw new IllegalArgumentException(
                 "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing);
         }
-        /*
-         * Unfortunately the lock order is important here. We have to acquire the readlock first otherwise
-         * if we are flushing at the end of the recovery while holding the write lock we can deadlock if:
-         *  Thread 1: flushes via API and gets the flush lock but blocks on the readlock since Thread 2 has the writeLock
-         *  Thread 2: flushes at the end of the recovery holding the writeLock and blocks on the flushLock owned by Thread 1
-         */
         try (ReleasableLock lock = readLock.acquire()) {
             ensureOpen();
             if (flushLock.tryLock() == false) {
@@ -1654,10 +1643,13 @@ public class InternalEngine extends Engine {
             }
             try {
                 // Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the
-                // newly created commit points to a different translog generation (can free translog)
+                // newly created commit points to a different translog generation (can free translog),
+                // or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries.
                 boolean hasUncommittedChanges = indexWriter.hasUncommittedChanges();
                 boolean shouldPeriodicallyFlush = shouldPeriodicallyFlush();
-                if (hasUncommittedChanges || force || shouldPeriodicallyFlush) {
+                if (hasUncommittedChanges || force || shouldPeriodicallyFlush
+                        || getProcessedLocalCheckpoint() > Long.parseLong(
+                                lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY))) {
                     ensureCanFlush();
                     try {
                         translog.rollGeneration();

+ 4 - 2
server/src/main/java/org/elasticsearch/index/translog/Translog.java

@@ -1615,13 +1615,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     }
 
     /**
-     * Roll the current translog generation into a new generation. This does not commit the
-     * translog.
+     * Roll the current translog generation into a new generation if it's not empty. This does not commit the translog.
      *
      * @throws IOException if an I/O exception occurred during any file operations
      */
     public void rollGeneration() throws IOException {
         syncBeforeRollGeneration();
+        if (current.totalOperations() == 0 && primaryTermSupplier.getAsLong() == current.getPrimaryTerm()) {
+            return;
+        }
         try (Releasable ignored = writeLock.acquire()) {
             ensureOpen();
             try {

+ 8 - 7
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -568,6 +568,7 @@ public class InternalEngineTests extends EngineTestCase {
         }
         try (Engine recoveringEngine = new InternalEngine(engine.config())) {
             recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+            recoveringEngine.refresh("test");
             try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
                 final TotalHitCountCollector collector = new TotalHitCountCollector();
                 searcher.search(new MatchAllDocsQuery(), collector);
@@ -636,6 +637,7 @@ public class InternalEngineTests extends EngineTestCase {
             initialEngine.close();
             recoveringEngine = new InternalEngine(initialEngine.config());
             recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+            recoveringEngine.refresh("test");
             try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
                 TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), docs);
                 assertEquals(docs, topDocs.totalHits.value);
@@ -961,13 +963,13 @@ public class InternalEngineTests extends EngineTestCase {
         assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L));
 
         engine.flush(true, true);
-        assertThat(engine.getTranslog().currentFileGeneration(), equalTo(4L));
-        assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 4L : 2L));
+        assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L));
+        assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L));
 
         globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
         engine.flush(true, true);
-        assertThat(engine.getTranslog().currentFileGeneration(), equalTo(5L));
-        assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(5L));
+        assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L));
+        assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(3L));
     }
 
     public void testSyncTranslogConcurrently() throws Exception {
@@ -2790,11 +2792,10 @@ public class InternalEngineTests extends EngineTestCase {
         assertEquals(flush ? 1 : 2, translogHandler.appliedOperations());
         engine.delete(new Engine.Delete(Integer.toString(randomId), newUid(doc), primaryTerm.get()));
         if (randomBoolean()) {
-            engine.refresh("test");
-        } else {
             engine.close();
             engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
         }
+        engine.refresh("test");
         try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
             TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), numDocs);
             assertThat(topDocs.totalHits.value, equalTo((long) numDocs));
@@ -2856,7 +2857,7 @@ public class InternalEngineTests extends EngineTestCase {
         expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig));
 
         engine = createEngine(store, primaryTranslogDir); // and recover again!
-        assertVisibleCount(engine, numDocs, false);
+        assertVisibleCount(engine, numDocs, true);
     }
 
     public void testShardNotAvailableExceptionWhenEngineClosedConcurrently() throws IOException, InterruptedException {

+ 1 - 0
server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

@@ -3190,6 +3190,7 @@ public class TranslogTests extends ESTestCase {
 
         try (Translog brokenTranslog = create(filterFileSystemProvider.getPath(path.toUri()))) {
             failOnCopy.set(true);
+            primaryTerm.incrementAndGet(); // increment primary term to force rolling generation
             assertThat(expectThrows(IOException.class, brokenTranslog::rollGeneration).getMessage(), equalTo(expectedExceptionMessage));
             assertFalse(brokenTranslog.isOpen());