Browse Source

Update translog policy before the next safe commit (#54839)

IndexShardIT#testMaybeFlush relies on the assumption that the safe commit
and translog deletion policy have advanced after IndexShard#sync returns.
This assumption does not hold if there's a race with the global checkpoint sync.
Closes #52223
Nhat Nguyen 5 years ago
parent
commit
3bfcc60cce

+ 5 - 5
server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java

@@ -84,17 +84,17 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
             this.safeCommitInfo = SafeCommitInfo.EMPTY;
             this.lastCommit = commits.get(commits.size() - 1);
             this.safeCommit = commits.get(keptPosition);
-            if (keptPosition == commits.size() - 1) {
-                this.maxSeqNoOfNextSafeCommit = Long.MAX_VALUE;
-            } else {
-                this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
-            }
             for (int i = 0; i < keptPosition; i++) {
                 if (snapshottedCommits.containsKey(commits.get(i)) == false) {
                     deleteCommit(commits.get(i));
                 }
             }
             updateRetentionPolicy();
+            if (keptPosition == commits.size() - 1) {
+                this.maxSeqNoOfNextSafeCommit = Long.MAX_VALUE;
+            } else {
+                this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
+            }
             safeCommit = this.safeCommit;
         }
 

+ 39 - 0
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -977,6 +977,45 @@ public class InternalEngineTests extends EngineTestCase {
         assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(5L));
     }
 
+    public void testSyncTranslogConcurrently() throws Exception { // calls engine.syncTranslog() from several threads at once and checks engine state after each call
+        IOUtils.close(engine, store); // close the current engine/store fields before they are replaced below
+        final Path translogPath = createTempDir();
+        store = createStore();
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); // backs the globalCheckpoint::get supplier passed to config(...)
+        engine = createEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get));
+        List<Engine.Operation> ops = generateHistoryOnReplica(between(1, 50), false, randomBoolean(), randomBoolean());
+        applyOperations(engine, ops);
+        engine.flush(true, true); // flush before the concurrent syncs; arguments presumably (force, waitIfOngoing) - TODO confirm
+        final CheckedRunnable<IOException> checker = () -> { // shared assertions: run by each thread after its sync and once more at the end
+            assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(0)); // translog stats must report zero uncommitted operations
+            assertThat(engine.getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint.get())); // last synced global checkpoint must equal the value held by the AtomicLong
+            try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) { // commit ref is released by try-with-resources
+                SequenceNumbers.CommitInfo commitInfo =
+                    SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getIndexCommit().getUserData().entrySet());
+                assertThat(commitInfo.localCheckpoint, equalTo(engine.getProcessedLocalCheckpoint())); // safe commit's local checkpoint must equal the engine's processed local checkpoint
+            }
+        };
+        final Thread[] threads = new Thread[randomIntBetween(2, 4)];
+        final Phaser phaser = new Phaser(threads.length); // one registered party per thread
+        globalCheckpoint.set(engine.getProcessedLocalCheckpoint()); // raise the global checkpoint to the processed local checkpoint before any thread syncs
+        for (int i = 0; i < threads.length; i++) {
+            threads[i] = new Thread(() -> {
+                phaser.arriveAndAwaitAdvance(); // block until every thread has arrived so the syncs start together
+                try {
+                    engine.syncTranslog();
+                    checker.run();
+                } catch (IOException e) {
+                    throw new AssertionError(e); // Runnable cannot throw the checked IOException, so rethrow it unchecked
+                }
+            });
+            threads[i].start();
+        }
+        for (Thread thread : threads) {
+            thread.join(); // wait for every syncing thread to finish
+        }
+        checker.run(); // final verification on the calling thread after all syncs have completed
+    }
+
     public void testSyncedFlushSurvivesEngineRestart() throws IOException {
         final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
         IOUtils.close(store, engine);

+ 0 - 4
server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java

@@ -81,7 +81,6 @@ import org.elasticsearch.test.DummyShardLock;
 import org.elasticsearch.test.ESSingleNodeTestCase;
 import org.elasticsearch.test.IndexSettingsModule;
 import org.elasticsearch.test.InternalSettingsPlugin;
-import org.elasticsearch.test.junit.annotations.TestIssueLogging;
 import org.junit.Assert;
 
 import java.io.IOException;
@@ -319,9 +318,6 @@ public class IndexShardIT extends ESSingleNodeTestCase {
         assertPathHasBeenCleared(newIndexDataPath.toAbsolutePath());
     }
 
-    @TestIssueLogging(
-        value = "org.elasticsearch.index.engine:DEBUG",
-        issueUrl = "https://github.com/elastic/elasticsearch/issues/52223")
     public void testMaybeFlush() throws Exception {
         createIndex("test", Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
             .build());