소스 검색

Add additional test for sequence-number recovery

This commit adds a test for a scenario where a replica receives an extra
document that the promoted replica does not receive, misses the
primary/replica re-sync, and the recovers from the newly-promoted
primary.

Relates #25493
Jason Tedor 8 년 전
부모
커밋
dd93ef3f24

+ 1 - 0
core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

@@ -356,6 +356,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
             final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
             final SeqNoStats seqNoStats = recoveryTarget.store().loadSeqNoStats(globalCheckpoint);
             if (seqNoStats.getMaxSeqNo() <= seqNoStats.getGlobalCheckpoint()) {
+                assert seqNoStats.getLocalCheckpoint() <= seqNoStats.getGlobalCheckpoint();
                 /*
                  * Commit point is good for sequence-number based recovery as the maximum sequence number included in it is below the global
                  * checkpoint (i.e., it excludes any operations that may not be on the primary). Recovery will start at the first operation

+ 63 - 0
core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java

@@ -29,11 +29,15 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.engine.InternalEngineTests;
+import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
 import org.elasticsearch.index.store.Store;
@@ -147,6 +151,65 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
         }
     }
 
+    /*
+     * Simulate a scenario with two replicas where one of the replicas receives an extra document, the other replica is promoted on primary
+     * failure, the receiving replica misses the primary/replica re-sync and then recovers from the primary. We expect that a
+     * sequence-number based recovery is performed and the extra document does not remain after recovery.
+     */
+    public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception {
+        try (ReplicationGroup shards = createGroup(2)) {
+            shards.startAll();
+            final int docs = randomIntBetween(0, 16);
+            for (int i = 0; i < docs; i++) {
+                shards.index(
+                        new IndexRequest("index", "type", Integer.toString(i)).source("{}", XContentType.JSON));
+            }
+
+            shards.flush();
+            shards.syncGlobalCheckpoint();
+
+            final IndexShard oldPrimary = shards.getPrimary();
+            final IndexShard promotedReplica = shards.getReplicas().get(0);
+            final IndexShard remainingReplica = shards.getReplicas().get(1);
+            // slip the extra document into the replica
+            remainingReplica.applyIndexOperationOnReplica(
+                    remainingReplica.getLocalCheckpoint() + 1,
+                    remainingReplica.getPrimaryTerm(),
+                    1,
+                    VersionType.EXTERNAL,
+                    randomNonNegativeLong(),
+                    false,
+                    SourceToParse.source("index", "type", "replica", new BytesArray("{}"), XContentType.JSON),
+                    mapping -> {});
+            shards.promoteReplicaToPrimary(promotedReplica);
+            oldPrimary.close("demoted", randomBoolean());
+            oldPrimary.store().close();
+            shards.removeReplica(remainingReplica);
+            remainingReplica.close("disconnected", false);
+            remainingReplica.store().close();
+            // randomly introduce a conflicting document
+            final boolean extra = randomBoolean();
+            if (extra) {
+                promotedReplica.applyIndexOperationOnPrimary(
+                        Versions.MATCH_ANY,
+                        VersionType.INTERNAL,
+                        SourceToParse.source("index", "type", "primary", new BytesArray("{}"), XContentType.JSON),
+                        randomNonNegativeLong(),
+                        false,
+                        mapping -> {
+                        });
+            }
+            final IndexShard recoveredReplica =
+                    shards.addReplicaWithExistingPath(remainingReplica.shardPath(), remainingReplica.routingEntry().currentNodeId());
+            shards.recoverReplica(recoveredReplica);
+
+            assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), empty());
+            assertThat(recoveredReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(extra ? 1 : 0));
+
+            shards.assertAllEqual(docs + (extra ? 1 : 0));
+        }
+    }
+
     @TestLogging("org.elasticsearch.index.shard:TRACE,org.elasticsearch.indices.recovery:TRACE")
     public void testRecoveryAfterPrimaryPromotion() throws Exception {
         try (ReplicationGroup shards = createGroup(2)) {