Browse Source

Do not add noop from local translog to translog again (#29637)

Today we always add no-ops to translog regardless of its origin, thus a
noop may appear in the translog multiple times. This is not a big deal
as noops are small and rare to appear.

This commit ensures to add a noop to translog only if its origin is not
from local translog. This restriction has been applied for index and
delete.
Nhat Nguyen 7 years ago
parent
commit
52c50e353b

+ 4 - 2
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -1269,8 +1269,10 @@ public class InternalEngine extends Engine {
         final long seqNo = noOp.seqNo();
         try {
             final NoOpResult noOpResult = new NoOpResult(noOp.seqNo());
-            final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
-            noOpResult.setTranslogLocation(location);
+            if (noOp.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
+                final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
+                noOpResult.setTranslogLocation(location);
+            }
             noOpResult.setTook(System.nanoTime() - noOp.startTime());
             noOpResult.freeze();
             return noOpResult;

+ 7 - 9
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -3724,15 +3724,13 @@ public class InternalEngineTests extends EngineTestCase {
             noOpEngine.recoverFromTranslog();
             final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get());
             final String reason = randomAlphaOfLength(16);
-            noOpEngine.noOp(
-                    new Engine.NoOp(
-                            maxSeqNo + 1,
-                            primaryTerm.get(),
-                            randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY),
-                            System.nanoTime(),
-                            reason));
+            noOpEngine.noOp(new Engine.NoOp(maxSeqNo + 1, primaryTerm.get(), LOCAL_TRANSLOG_RECOVERY, System.nanoTime(), reason));
             assertThat(noOpEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo((long) (maxSeqNo + 1)));
-            assertThat(noOpEngine.getTranslog().stats().getUncommittedOperations(), equalTo(1 + gapsFilled));
+            assertThat(noOpEngine.getTranslog().stats().getUncommittedOperations(), equalTo(gapsFilled));
+            noOpEngine.noOp(
+                new Engine.NoOp(maxSeqNo + 2, primaryTerm.get(), randomFrom(PRIMARY, REPLICA, PEER_RECOVERY), System.nanoTime(), reason));
+            assertThat(noOpEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo((long) (maxSeqNo + 2)));
+            assertThat(noOpEngine.getTranslog().stats().getUncommittedOperations(), equalTo(gapsFilled + 1));
             // skip to the op that we added to the translog
             Translog.Operation op;
             Translog.Operation last = null;
@@ -3744,7 +3742,7 @@ public class InternalEngineTests extends EngineTestCase {
             assertNotNull(last);
             assertThat(last, instanceOf(Translog.NoOp.class));
             final Translog.NoOp noOp = (Translog.NoOp) last;
-            assertThat(noOp.seqNo(), equalTo((long) (maxSeqNo + 1)));
+            assertThat(noOp.seqNo(), equalTo((long) (maxSeqNo + 2)));
             assertThat(noOp.primaryTerm(), equalTo(primaryTerm.get()));
             assertThat(noOp.reason(), equalTo(reason));
         } finally {

+ 10 - 0
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -1661,6 +1661,16 @@ public class IndexShardTests extends IndexShardTestCase {
         IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
         assertDocCount(newShard, 1);
         assertDocCount(shard, 2);
+
+        for (int i = 0; i < 2; i++) {
+            newShard = reinitShard(newShard, ShardRoutingHelper.initWithSameId(primaryShardRouting,
+                RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE));
+            newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
+            assertTrue(newShard.recoverFromStore());
+            try (Translog.Snapshot snapshot = getTranslog(newShard).newSnapshot()) {
+                assertThat(snapshot.totalOperations(), equalTo(2));
+            }
+        }
         closeShards(newShard, shard);
     }