Browse Source

Restore local history from translog on promotion (#33616)

If a shard was serving as a replica when another shard was promoted to
primary, then its Lucene index was reset to the global checkpoint.
However, if the new primary fails before the primary/replica resync
completes and we are now being promoted, we have to restore the reverted
operations by replaying the translog to avoid losing acknowledged writes.

Relates #33473
Relates #32867
Nhat Nguyen 7 years ago
parent
commit
002f763c48

+ 0 - 2
qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java

@@ -111,7 +111,6 @@ public class RecoveryIT extends AbstractRollingTestCase {
         return future;
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/33616")
     public void testRecoveryWithConcurrentIndexing() throws Exception {
         final String index = "recovery_with_concurrent_indexing";
         Response response = client().performRequest(new Request("GET", "_nodes"));
@@ -184,7 +183,6 @@ public class RecoveryIT extends AbstractRollingTestCase {
     }
 
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/33616")
     public void testRelocationWithConcurrentIndexing() throws Exception {
         final String index = "relocation_with_concurrent_indexing";
         switch (CLUSTER_TYPE) {

+ 4 - 4
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -1720,12 +1720,12 @@ public abstract class Engine implements Closeable {
     public abstract void deactivateThrottling();
 
     /**
-     * Marks operations in the translog as completed. This is used to restore the state of the local checkpoint tracker on primary
-     * promotion.
+     * This method replays translog to restore the Lucene index which might be reverted previously.
+     * This ensures that all acknowledged writes are restored correctly when this engine is promoted.
      *
-     * @throws IOException if an I/O exception occurred reading the translog
+     * @return the number of translog operations have been recovered
      */
-    public abstract void restoreLocalCheckpointFromTranslog() throws IOException;
+    public abstract int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException;
 
     /**
      * Fills up the local checkpoints history with no-ops until the local checkpoint

+ 3 - 8
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -332,17 +332,12 @@ public class InternalEngine extends Engine {
     }
 
     @Override
-    public void restoreLocalCheckpointFromTranslog() throws IOException {
-        try (ReleasableLock ignored = writeLock.acquire()) {
+    public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException {
+        try (ReleasableLock ignored = readLock.acquire()) {
             ensureOpen();
             final long localCheckpoint = localCheckpointTracker.getCheckpoint();
             try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
-                Translog.Operation operation;
-                while ((operation = snapshot.next()) != null) {
-                    if (operation.seqNo() > localCheckpoint) {
-                        localCheckpointTracker.markSeqNoAsCompleted(operation.seqNo());
-                    }
-                }
+                return translogRecoveryRunner.run(this, snapshot);
             }
         }
     }

+ 2 - 1
server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

@@ -344,7 +344,8 @@ public final class ReadOnlyEngine extends Engine {
     }
 
     @Override
-    public void restoreLocalCheckpointFromTranslog() {
+    public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) {
+        return 0;
     }
 
     @Override

+ 12 - 11
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -494,17 +494,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                             try {
                                 replicationTracker.activatePrimaryMode(getLocalCheckpoint());
                                 /*
-                                 * If this shard was serving as a replica shard when another shard was promoted to primary then the state of
-                                 * its local checkpoint tracker was reset during the primary term transition. In particular, the local
-                                 * checkpoint on this shard was thrown back to the global checkpoint and the state of the local checkpoint
-                                 * tracker above the local checkpoint was destroyed. If the other shard that was promoted to primary
-                                 * subsequently fails before the primary/replica re-sync completes successfully and we are now being
-                                 * promoted, the local checkpoint tracker here could be left in a state where it would re-issue sequence
-                                 * numbers. To ensure that this is not the case, we restore the state of the local checkpoint tracker by
-                                 * replaying the translog and marking any operations there are completed.
+                                 * If this shard was serving as a replica shard when another shard was promoted to primary then
+                                 * its Lucene index was reset during the primary term transition. In particular, the Lucene index
+                                 * on this shard was reset to the global checkpoint and the operations above the local checkpoint
+                                 * were reverted. If the other shard that was promoted to primary subsequently fails before the
+                                 * primary/replica re-sync completes successfully and we are now being promoted, we have to restore
+                                 * the reverted operations on this shard by replaying the translog to avoid losing acknowledged writes.
                                  */
                                 final Engine engine = getEngine();
-                                engine.restoreLocalCheckpointFromTranslog();
+                                engine.restoreLocalHistoryFromTranslog((resettingEngine, snapshot) ->
+                                    runTranslogRecovery(resettingEngine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {}));
                                 /* Rolling the translog generation is not strictly needed here (as we will never have collisions between
                                  * sequence numbers in a translog generation in a new primary as it takes the last known sequence number
                                  * as a starting point), but it simplifies reasoning about the relationship between primary terms and
@@ -1452,9 +1451,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         } else {
             if (origin == Engine.Operation.Origin.PRIMARY) {
                 assert assertPrimaryMode();
-            } else {
-                assert origin == Engine.Operation.Origin.REPLICA || origin == Engine.Operation.Origin.LOCAL_RESET;
+            } else if (origin == Engine.Operation.Origin.REPLICA) {
                 assert assertReplicationTarget();
+            } else {
+                assert origin == Engine.Operation.Origin.LOCAL_RESET;
+                assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]";
             }
             if (writeAllowedStates.contains(state) == false) {
                 throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStates + ", origin [" + origin + "]");

+ 42 - 45
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -148,6 +148,7 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.seqno.ReplicationTracker;
+import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexSearcherWrapper;
 import org.elasticsearch.index.shard.ShardId;
@@ -4047,56 +4048,52 @@ public class InternalEngineTests extends EngineTestCase {
         }
     }
 
-    public void testRestoreLocalCheckpointFromTranslog() throws IOException {
-        engine.close();
-        InternalEngine actualEngine = null;
-        try {
-            final Set<Long> completedSeqNos = new HashSet<>();
-            final BiFunction<Long, Long, LocalCheckpointTracker> supplier = (maxSeqNo, localCheckpoint) -> new LocalCheckpointTracker(
-                    maxSeqNo,
-                    localCheckpoint) {
-                @Override
-                public void markSeqNoAsCompleted(long seqNo) {
-                    super.markSeqNoAsCompleted(seqNo);
-                    completedSeqNos.add(seqNo);
-                }
-            };
-            trimUnsafeCommits(engine.config());
-            actualEngine = new InternalEngine(engine.config(), supplier);
-            final int operations = randomIntBetween(0, 1024);
-            final Set<Long> expectedCompletedSeqNos = new HashSet<>();
-            for (int i = 0; i < operations; i++) {
-                if (rarely() && i < operations - 1) {
+    public void testRestoreLocalHistoryFromTranslog() throws IOException {
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        try (Store store = createStore()) {
+            final ArrayList<Long> seqNos = new ArrayList<>();
+            final int numOps = randomIntBetween(0, 1024);
+            for (int i = 0; i < numOps; i++) {
+                if (rarely()) {
                     continue;
                 }
-                expectedCompletedSeqNos.add((long) i);
+                seqNos.add((long) i);
             }
-
-            final ArrayList<Long> seqNos = new ArrayList<>(expectedCompletedSeqNos);
             Randomness.shuffle(seqNos);
-            for (final long seqNo : seqNos) {
-                final String id = Long.toString(seqNo);
-                final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
-                final Term uid = newUid(doc);
-                final long time = System.nanoTime();
-                actualEngine.index(new Engine.Index(uid, doc, seqNo, 1, 1, null, REPLICA, time, time, false));
-                if (rarely()) {
-                    actualEngine.rollTranslogGeneration();
+            final EngineConfig engineConfig;
+            final SeqNoStats prevSeqNoStats;
+            final List<DocIdSeqNoAndTerm> prevDocs;
+            final int totalTranslogOps;
+            try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
+                engineConfig = engine.config();
+                for (final long seqNo : seqNos) {
+                    final String id = Long.toString(seqNo);
+                    final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
+                    engine.index(replicaIndexForDoc(doc, 1, seqNo, false));
+                    if (rarely()) {
+                        engine.rollTranslogGeneration();
+                    }
+                    if (rarely()) {
+                        engine.flush();
+                    }
                 }
-            }
-            final long currentLocalCheckpoint = actualEngine.getLocalCheckpoint();
-            final long resetLocalCheckpoint =
-                    randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(currentLocalCheckpoint));
-            actualEngine.getLocalCheckpointTracker().resetCheckpoint(resetLocalCheckpoint);
-            completedSeqNos.clear();
-            actualEngine.restoreLocalCheckpointFromTranslog();
-            final Set<Long> intersection = new HashSet<>(expectedCompletedSeqNos);
-            intersection.retainAll(LongStream.range(resetLocalCheckpoint + 1, operations).boxed().collect(Collectors.toSet()));
-            assertThat(completedSeqNos, equalTo(intersection));
-            assertThat(actualEngine.getLocalCheckpoint(), equalTo(currentLocalCheckpoint));
-            assertThat(generateNewSeqNo(actualEngine), equalTo((long) operations));
-        } finally {
-            IOUtils.close(actualEngine);
+                globalCheckpoint.set(randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, engine.getLocalCheckpoint()));
+                engine.syncTranslog();
+                prevSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get());
+                prevDocs = getDocIds(engine, true);
+                totalTranslogOps = engine.getTranslog().totalOperations();
+            }
+            trimUnsafeCommits(engineConfig);
+            try (InternalEngine engine = new InternalEngine(engineConfig)) {
+                engine.recoverFromTranslog(translogHandler, globalCheckpoint.get());
+                engine.restoreLocalHistoryFromTranslog(translogHandler);
+                assertThat(getDocIds(engine, true), equalTo(prevDocs));
+                SeqNoStats seqNoStats = engine.getSeqNoStats(globalCheckpoint.get());
+                assertThat(seqNoStats.getLocalCheckpoint(), equalTo(prevSeqNoStats.getLocalCheckpoint()));
+                assertThat(seqNoStats.getMaxSeqNo(), equalTo(prevSeqNoStats.getMaxSeqNo()));
+                assertThat(engine.getTranslog().totalOperations(), equalTo(totalTranslogOps));
+            }
+            assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
         }
     }
 

+ 5 - 10
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -896,23 +896,17 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(replicaShard, primaryShard);
     }
 
-    public void testRestoreLocalCheckpointTrackerFromTranslogOnPromotion() throws IOException, InterruptedException {
+    public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException {
         final IndexShard indexShard = newStartedShard(false);
         final int operations = 1024 - scaledRandomIntBetween(0, 1024);
         indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));
 
         final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
-        final long globalCheckpointOnReplica = SequenceNumbers.UNASSIGNED_SEQ_NO;
-        randomIntBetween(
-                Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO),
-                Math.toIntExact(indexShard.getLocalCheckpoint()));
+        final long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
         indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test");
 
-        final int globalCheckpoint =
-                randomIntBetween(
-                        Math.toIntExact(SequenceNumbers.UNASSIGNED_SEQ_NO),
-                        Math.toIntExact(indexShard.getLocalCheckpoint()));
-
+        final long globalCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
+        final Set<String> docsBeforeRollback = getShardDocUIDs(indexShard);
         final CountDownLatch latch = new CountDownLatch(1);
         indexShard.acquireReplicaOperationPermit(
                 indexShard.getPendingPrimaryTerm() + 1,
@@ -946,6 +940,7 @@ public class IndexShardTests extends IndexShardTestCase {
         resyncLatch.await();
         assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo));
         assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo));
+        assertThat(getShardDocUIDs(indexShard), equalTo(docsBeforeRollback));
         closeShard(indexShard, false);
     }