Browse Source

Relax maxSeqNoOfUpdates assertion in FollowingEngine (#47188)

We disable MSU optimization if the local checkpoint is smaller than
max_seq_no_of_updates. Hence, we need to relax the MSU assertion in
FollowingEngine for that scenario. Suppose the leader has three
operations: index-0, delete-1, and index-2 for the same doc Id. MSU on
the leader is 1 as index-2 is an append. If the follower applies index-0
then index-2, then the assertion is violated.

Closes #47137
Nhat Nguyen 6 years ago
parent
commit
fbeb5a3829

+ 8 - 4
test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java

@@ -23,6 +23,7 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.index.analysis.AnalyzerScope;
 import org.elasticsearch.index.analysis.IndexAnalyzers;
@@ -117,20 +118,23 @@ public class TranslogHandler implements Engine.TranslogRecoveryRunner {
         return opsRecovered;
     }
 
-    private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
+    public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
+        // If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type.
+        final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null;
         switch (operation.opType()) {
             case INDEX:
                 final Translog.Index index = (Translog.Index) operation;
                 final String indexName = mapperService.index().getName();
                 final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()),
                     new SourceToParse(indexName, index.type(), index.id(), index.source(), XContentHelper.xContentType(index.source()),
-                            index.routing()), index.seqNo(), index.primaryTerm(),
-                        index.version(), null, origin, index.getAutoGeneratedIdTimestamp(), true, SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
+                        index.routing()), index.seqNo(), index.primaryTerm(), index.version(), versionType, origin,
+                    index.getAutoGeneratedIdTimestamp(), true, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
                 return engineIndex;
             case DELETE:
                 final Translog.Delete delete = (Translog.Delete) operation;
                 final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(),
-                        delete.primaryTerm(), delete.version(), null, origin, System.nanoTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
+                    delete.primaryTerm(), delete.version(), versionType, origin, System.nanoTime(),
+                    SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
                 return engineDelete;
             case NO_OP:
                 final Translog.NoOp noOp = (Translog.NoOp) operation;

+ 7 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java

@@ -16,6 +16,7 @@ import org.apache.lucene.search.DocValuesFieldExistsQuery;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.TopDocs;
+import org.elasticsearch.Assertions;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.index.VersionType;
@@ -117,7 +118,12 @@ public final class FollowingEngine extends InternalEngine {
 
     @Override
     protected void advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(long seqNo) {
-        assert getMaxSeqNoOfUpdatesOrDeletes() >= seqNo : seqNo + " < " + getMaxSeqNoOfUpdatesOrDeletes();
+        if (Assertions.ENABLED) {
+            final long localCheckpoint = getProcessedLocalCheckpoint();
+            final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes();
+            assert localCheckpoint < maxSeqNoOfUpdates || maxSeqNoOfUpdates >= seqNo :
+                "maxSeqNoOfUpdates is not advanced local_checkpoint=" + localCheckpoint + " msu=" + maxSeqNoOfUpdates + " seq_no=" + seqNo;
+        }
         super.advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(seqNo); // extra safe in production code
     }
 

+ 7 - 2
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java

@@ -484,7 +484,7 @@ public class FollowingEngineTests extends ESTestCase {
                 for (Thread thread : threads) {
                     thread.join();
                 }
-                assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes()));
+                assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), greaterThanOrEqualTo(leader.getMaxSeqNoOfUpdatesOrDeletes()));
                 assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));
                 EngineTestCase.assertConsistentHistoryBetweenTranslogAndLuceneIndex(follower, createMapperService("test"));
                 EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber(follower);
@@ -535,7 +535,12 @@ public class FollowingEngineTests extends ESTestCase {
                     try (Translog.Snapshot snapshot =
                              shuffleSnapshot(leader.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true))) {
                         follower.advanceMaxSeqNoOfUpdatesOrDeletes(leader.getMaxSeqNoOfUpdatesOrDeletes());
-                        translogHandler.run(follower, snapshot);
+                        Translog.Operation op;
+                        while ((op = snapshot.next()) != null) {
+                            EngineTestCase.applyOperation(follower,
+                                translogHandler.convertToEngineOp(op, randomFrom(Engine.Operation.Origin.values())));
+                        }
+                        follower.syncTranslog();
                     }
                 }
             }