Browse Source

Ignore Lucene index in peer recovery if translog corrupted (#49114)

If the translog on a replica is corrupt, we should not perform an 
operation-based recovery or utilize sync_id as we won't be able to open
an engine in the next step. This change adds an extra validation that
ensures translog is okay when preparing a peer recovery request.
Nhat Nguyen 5 years ago
parent
commit
5aa5d7b54b

+ 13 - 0
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

@@ -50,6 +50,8 @@ import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardNotFoundException;
 import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.index.translog.TranslogCorruptedException;
 import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -327,6 +329,17 @@ public class PeerRecoveryTargetService implements IndexEventListener {
         Store.MetadataSnapshot metadataSnapshot;
         try {
             metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
+            // Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene index.
+            try {
+                final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY);
+                final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);
+                assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint;
+            } catch (IOException | TranslogCorruptedException e) {
+                logger.warn(new ParameterizedMessage("error while reading global checkpoint from translog, " +
+                    "resetting the starting sequence number from {} to unassigned and recovering as if there are none", startingSeqNo), e);
+                metadataSnapshot = Store.MetadataSnapshot.EMPTY;
+                startingSeqNo = UNASSIGNED_SEQ_NO;
+            }
         } catch (final org.apache.lucene.index.IndexNotFoundException e) {
             // happens on an empty folder. no need to log
             assert startingSeqNo == UNASSIGNED_SEQ_NO : startingSeqNo;

+ 30 - 0
server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java

@@ -35,6 +35,7 @@ import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.NoOpEngine;
 import org.elasticsearch.index.mapper.SourceToParse;
@@ -60,6 +61,7 @@ import java.util.stream.LongStream;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.sameInstance;
 
 public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
 
@@ -286,4 +288,32 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
         recoveryTarget.decRef();
         closeShards(shard);
     }
+
+    public void testResetStartRequestIfTranslogIsCorrupted() throws Exception {
+        DiscoveryNode pNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
+            Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
+        DiscoveryNode rNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
+            Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
+        IndexShard shard = newStartedShard(false);
+        final SeqNoStats seqNoStats = populateRandomData(shard);
+        shard.close("test", false);
+        if (randomBoolean()) {
+            shard.store().associateIndexWithNewTranslog(UUIDs.randomBase64UUID());
+        } else if (randomBoolean()) {
+            Translog.createEmptyTranslog(
+                shard.shardPath().resolveTranslog(), seqNoStats.getGlobalCheckpoint(), shard.shardId(), shard.getOperationPrimaryTerm());
+        } else {
+            IOUtils.rm(shard.shardPath().resolveTranslog());
+        }
+        shard = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE));
+        shard.markAsRecovering("peer recovery", new RecoveryState(shard.routingEntry(), pNode, rNode));
+        shard.prepareForIndexRecovery();
+        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null);
+        StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(
+            logger, rNode, recoveryTarget, randomNonNegativeLong());
+        assertThat(request.startingSeqNo(), equalTo(UNASSIGNED_SEQ_NO));
+        assertThat(request.metadataSnapshot(), sameInstance(Store.MetadataSnapshot.EMPTY));
+        recoveryTarget.decRef();
+        closeShards(shard);
+    }
 }