Kaynağa Gözat

Keep commits and translog up to the global checkpoint (#27606)

We need to keep index commits and translog operations up to the current
global checkpoint so that we can throw away unsafe operations and
increase the chance of an operation-based recovery. This is achieved by
a new index deletion policy.

Relates #10708
Nhat Nguyen 7 yıl önce
ebeveyn
işleme
57fc705d5e

+ 62 - 25
core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java

@@ -21,43 +21,48 @@ package org.elasticsearch.index.engine;
 
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexDeletionPolicy;
-import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogDeletionPolicy;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
+import java.util.function.LongSupplier;
 
 /**
  * An {@link IndexDeletionPolicy} that coordinates between Lucene's commits and the retention of translog generation files,
  * making sure that all translog files that are needed to recover from the Lucene commit are not deleted.
+ * <p>
+ * In particular, this policy will delete index commits whose max sequence number is at most
+ * the current global checkpoint except the index commit which has the highest max sequence number among those.
  */
-class CombinedDeletionPolicy extends IndexDeletionPolicy {
-
+final class CombinedDeletionPolicy extends IndexDeletionPolicy {
     private final TranslogDeletionPolicy translogDeletionPolicy;
     private final EngineConfig.OpenMode openMode;
+    private final LongSupplier globalCheckpointSupplier;
 
-    private final SnapshotDeletionPolicy indexDeletionPolicy;
-
-    CombinedDeletionPolicy(SnapshotDeletionPolicy indexDeletionPolicy, TranslogDeletionPolicy translogDeletionPolicy,
-                           EngineConfig.OpenMode openMode) {
-        this.indexDeletionPolicy = indexDeletionPolicy;
-        this.translogDeletionPolicy = translogDeletionPolicy;
+    CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
+                           LongSupplier globalCheckpointSupplier) {
         this.openMode = openMode;
+        this.translogDeletionPolicy = translogDeletionPolicy;
+        this.globalCheckpointSupplier = globalCheckpointSupplier;
     }
 
     @Override
     public void onInit(List<? extends IndexCommit> commits) throws IOException {
-        indexDeletionPolicy.onInit(commits);
         switch (openMode) {
             case CREATE_INDEX_AND_TRANSLOG:
+                assert commits.isEmpty() : "index is created, but we have commits";
                 break;
             case OPEN_INDEX_CREATE_TRANSLOG:
                 assert commits.isEmpty() == false : "index is opened, but we have no commits";
+                // When an engine starts with OPEN_INDEX_CREATE_TRANSLOG, a new fresh index commit will be created immediately.
+                // We therefore can simply skip processing here as `onCommit` will be called right after with a new commit.
                 break;
             case OPEN_INDEX_AND_TRANSLOG:
                 assert commits.isEmpty() == false : "index is opened, but we have no commits";
-                setLastCommittedTranslogGeneration(commits);
+                onCommit(commits);
                 break;
             default:
                 throw new IllegalArgumentException("unknown openMode [" + openMode + "]");
@@ -66,24 +71,56 @@ class CombinedDeletionPolicy extends IndexDeletionPolicy {
 
     @Override
     public void onCommit(List<? extends IndexCommit> commits) throws IOException {
-        indexDeletionPolicy.onCommit(commits);
-        setLastCommittedTranslogGeneration(commits);
+        final int keptPosition = indexOfKeptCommits(commits);
+        for (int i = 0; i < keptPosition; i++) {
+            commits.get(i).delete();
+        }
+        updateTranslogDeletionPolicy(commits.get(keptPosition), commits.get(commits.size() - 1));
     }
 
-    private void setLastCommittedTranslogGeneration(List<? extends IndexCommit> commits) throws IOException {
-        // when opening an existing lucene index, we currently always open the last commit.
-        // we therefore use the translog gen as the one that will be required for recovery
-        final IndexCommit indexCommit = commits.get(commits.size() - 1);
-        assert indexCommit.isDeleted() == false : "last commit is deleted";
-        long minGen = Long.parseLong(indexCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
-        translogDeletionPolicy.setMinTranslogGenerationForRecovery(minGen);
-    }
+    private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, final IndexCommit lastCommit) throws IOException {
+        assert minRequiredCommit.isDeleted() == false : "The minimum required commit must not be deleted";
+        final long minRequiredGen = Long.parseLong(minRequiredCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
+
+        assert lastCommit.isDeleted() == false : "The last commit must not be deleted";
+        final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
 
-    public SnapshotDeletionPolicy getIndexDeletionPolicy() {
-        return indexDeletionPolicy;
+        assert minRequiredGen <= lastGen : "minRequiredGen must not be greater than lastGen";
+        translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen);
+        translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);
     }
 
-    public TranslogDeletionPolicy getTranslogDeletionPolicy() {
-        return translogDeletionPolicy;
+    /**
+     * Finds the position of the newest safe index commit, i.e. the newest one whose max sequence number is at most the global checkpoint.
+     * Index commits whose translog UUID differs from that of the last commit are filtered out as they don't belong to this engine.
+     */
+    private int indexOfKeptCommits(List<? extends IndexCommit> commits) throws IOException {
+        final long currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong();
+        final String expectedTranslogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);
+
+        // Commits are sorted by age (the 0th one is the oldest commit), so scan from the newest commit backwards.
+        for (int i = commits.size() - 1; i >= 0; i--) {
+            final Map<String, String> commitUserData = commits.get(i).getUserData();
+            // A different translog UUID means this commit is not ours; the oldest commit to keep is the next newer one.
+            if (expectedTranslogUUID.equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) {
+                return i + 1;
+            }
+            // 5.x commits do not contain MAX_SEQ_NO; stop at such a commit and keep it.
+            if (commitUserData.containsKey(SequenceNumbers.MAX_SEQ_NO) == false) {
+                return i;
+            }
+            final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
+            if (maxSeqNoFromCommit <= currentGlobalCheckpoint) {
+                return i;
+            }
+        }
+        /*
+         * We may reach this point in these cases:
+         * 1. Previous 6.x versions kept only the last commit, which is likely not a safe commit if writes are in progress.
+         * Thus, after upgrading, we may not find a safe commit until we can retain one.
+         * 2. In peer recovery, if a file-based recovery happens, a replica receives the latest commit from the primary.
+         * However, that commit may not be a safe commit if writes are in progress on the primary.
+         */
+        return 0;
     }
 }

+ 31 - 28
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -25,7 +25,6 @@ import org.apache.lucene.index.IndexFormatTooOldException;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LiveIndexWriterConfig;
 import org.apache.lucene.index.MergePolicy;
@@ -128,7 +127,7 @@ public class InternalEngine extends Engine {
 
     private final String uidField;
 
-    private final CombinedDeletionPolicy deletionPolicy;
+    private final SnapshotDeletionPolicy snapshotDeletionPolicy;
 
     // How many callers are currently requesting index throttling.  Currently there are only two situations where we do this: when merges
     // are falling behind and when writing indexing buffer to disk is too slow.  When this is 0, there is no throttling, else we throttling
@@ -167,8 +166,6 @@ public class InternalEngine extends Engine {
                 engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
                 engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
         );
-        this.deletionPolicy = new CombinedDeletionPolicy(
-                new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode);
         store.incRef();
         IndexWriter writer = null;
         Translog translog = null;
@@ -182,30 +179,19 @@ public class InternalEngine extends Engine {
             mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
             throttle = new IndexThrottle();
             try {
-                final SeqNoStats seqNoStats;
-                switch (openMode) {
-                    case OPEN_INDEX_AND_TRANSLOG:
-                        writer = createWriter(false);
-                        final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
-                        seqNoStats = store.loadSeqNoStats(globalCheckpoint);
-                        break;
-                    case OPEN_INDEX_CREATE_TRANSLOG:
-                        writer = createWriter(false);
-                        seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
-                        break;
-                    case CREATE_INDEX_AND_TRANSLOG:
-                        writer = createWriter(true);
-                        seqNoStats = new SeqNoStats(
-                                SequenceNumbers.NO_OPS_PERFORMED,
-                                SequenceNumbers.NO_OPS_PERFORMED,
-                                SequenceNumbers.UNASSIGNED_SEQ_NO);
-                        break;
-                    default:
-                        throw new IllegalArgumentException(openMode.toString());
-                }
+                final SeqNoStats seqNoStats = loadSeqNoStats(openMode);
                 logger.trace("recovered [{}]", seqNoStats);
-                seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
+                this.seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
+                this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(
+                    new CombinedDeletionPolicy(openMode, translogDeletionPolicy, seqNoService::getGlobalCheckpoint)
+                );
+                writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
                 updateMaxUnsafeAutoIdTimestampFromWriter(writer);
+                assert engineConfig.getForceNewHistoryUUID() == false
+                    || openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG
+                    || openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
+                    : "OpenMode must be either CREATE_INDEX_AND_TRANSLOG or OPEN_INDEX_CREATE_TRANSLOG if forceNewHistoryUUID; " +
+                    "openMode [" + openMode + "], forceNewHistoryUUID [" + engineConfig.getForceNewHistoryUUID() + "]";
                 historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID());
                 Objects.requireNonNull(historyUUID, "history uuid should not be null");
                 indexWriter = writer;
@@ -380,6 +366,23 @@ public class InternalEngine extends Engine {
             seqNoStats.getGlobalCheckpoint());
     }
 
+    private SeqNoStats loadSeqNoStats(EngineConfig.OpenMode openMode) throws IOException {
+        switch (openMode) {
+            case OPEN_INDEX_AND_TRANSLOG:
+                final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
+                return store.loadSeqNoStats(globalCheckpoint);
+            case OPEN_INDEX_CREATE_TRANSLOG:
+                return store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
+            case CREATE_INDEX_AND_TRANSLOG:
+                return new SeqNoStats(
+                    SequenceNumbers.NO_OPS_PERFORMED,
+                    SequenceNumbers.NO_OPS_PERFORMED,
+                    SequenceNumbers.UNASSIGNED_SEQ_NO);
+            default:
+                throw new IllegalArgumentException(openMode.toString());
+        }
+    }
+
     @Override
     public InternalEngine recoverFromTranslog() throws IOException {
         flushLock.lock();
@@ -1607,7 +1610,7 @@ public class InternalEngine extends Engine {
         }
         try (ReleasableLock lock = readLock.acquire()) {
             logger.trace("pulling snapshot");
-            return new IndexCommitRef(deletionPolicy.getIndexDeletionPolicy());
+            return new IndexCommitRef(snapshotDeletionPolicy);
         } catch (IOException e) {
             throw new SnapshotFailedEngineException(shardId, e);
         }
@@ -1788,7 +1791,7 @@ public class InternalEngine extends Engine {
         final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
         iwc.setCommitOnClose(false); // we by default don't commit on close
         iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
-        iwc.setIndexDeletionPolicy(deletionPolicy);
+        iwc.setIndexDeletionPolicy(snapshotDeletionPolicy);
         // with tests.verbose, lucene sets this up: plumb to align with filesystem stream
         boolean verbose = false;
         try {

+ 2 - 2
core/src/main/java/org/elasticsearch/index/translog/Translog.java

@@ -372,14 +372,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
      * Returns the number of operations in the translog files that aren't committed to lucene.
      */
     public int uncommittedOperations() {
-        return totalOperations(deletionPolicy.getMinTranslogGenerationForRecovery());
+        return totalOperations(deletionPolicy.getTranslogGenerationOfLastCommit());
     }
 
     /**
      * Returns the size in bytes of the translog files that aren't committed to lucene.
      */
     public long uncommittedSizeInBytes() {
-        return sizeInBytesByMinGen(deletionPolicy.getMinTranslogGenerationForRecovery());
+        return sizeInBytesByMinGen(deletionPolicy.getTranslogGenerationOfLastCommit());
     }
 
     /**

+ 27 - 3
core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java

@@ -54,6 +54,11 @@ public class TranslogDeletionPolicy {
      */
     private long minTranslogGenerationForRecovery = 1;
 
+    /**
+     * This translog generation is used to calculate the number of uncommitted operations since the last index commit.
+     */
+    private long translogGenerationOfLastCommit = 1;
+
     private long retentionSizeInBytes;
 
     private long retentionAgeInMillis;
@@ -69,13 +74,24 @@ public class TranslogDeletionPolicy {
     }
 
     public synchronized void setMinTranslogGenerationForRecovery(long newGen) {
-        if (newGen < minTranslogGenerationForRecovery) {
-            throw new IllegalArgumentException("minTranslogGenerationForRecovery can't go backwards. new [" + newGen + "] current [" +
-                minTranslogGenerationForRecovery + "]");
+        if (newGen < minTranslogGenerationForRecovery || newGen > translogGenerationOfLastCommit) {
+            throw new IllegalArgumentException("Invalid minTranslogGenerationForRecovery can't go backwards; new [" + newGen + "]," +
+                "current [" + minTranslogGenerationForRecovery + "], lastGen [" + translogGenerationOfLastCommit + "]");
         }
         minTranslogGenerationForRecovery = newGen;
     }
 
+    /**
+     * Sets the translog generation of the last index commit; it must not go backwards or drop below the recovery minimum.
+     */
+    public synchronized void setTranslogGenerationOfLastCommit(long lastGen) {
+        if (lastGen < translogGenerationOfLastCommit || lastGen < minTranslogGenerationForRecovery) {
+            throw new IllegalArgumentException("Invalid translogGenerationOfLastCommit; new [" + lastGen + "]," +
+                "current [" + translogGenerationOfLastCommit + "], minRequiredGen [" + minTranslogGenerationForRecovery + "]");
+        }
+        translogGenerationOfLastCommit = lastGen;
+    }
+
     public synchronized void setRetentionSizeInBytes(long bytes) {
         retentionSizeInBytes = bytes;
     }
@@ -193,6 +209,14 @@ public class TranslogDeletionPolicy {
         return minTranslogGenerationForRecovery;
     }
 
+    /**
+     * Returns a translog generation that will be used to calculate the number of uncommitted operations since the last index commit.
+     * See {@link Translog#uncommittedOperations()} and {@link Translog#uncommittedSizeInBytes()}
+     */
+    public synchronized long getTranslogGenerationOfLastCommit() {
+        return translogGenerationOfLastCommit;
+    }
+
     synchronized long getTranslogRefCount(long gen) {
         final Counter counter = translogRefCounts.get(gen);
         return counter == null ? 0 : counter.get();

+ 147 - 35
core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java

@@ -19,66 +19,178 @@
 
 package org.elasticsearch.index.engine;
 
+import com.carrotsearch.hppc.LongArrayList;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogDeletionPolicy;
 import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
+import static java.util.Collections.singletonList;
+import static org.elasticsearch.index.engine.EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
+import static org.elasticsearch.index.engine.EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG;
 import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class CombinedDeletionPolicyTests extends ESTestCase {
 
-    public void testPassThrough() throws IOException {
-        SnapshotDeletionPolicy indexDeletionPolicy = mock(SnapshotDeletionPolicy.class);
-        CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, createTranslogDeletionPolicy(),
-            EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
-        List<IndexCommit> commitList = new ArrayList<>();
-        long count = randomIntBetween(1, 3);
-        for (int i = 0; i < count; i++) {
-            commitList.add(mockIndexCommitWithTranslogGen(randomNonNegativeLong()));
+    public void testKeepCommitsAfterGlobalCheckpoint() throws Exception {
+        final AtomicLong globalCheckpoint = new AtomicLong();
+        TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
+        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get);
+
+        final LongArrayList maxSeqNoList = new LongArrayList();
+        final LongArrayList translogGenList = new LongArrayList();
+        final List<IndexCommit> commitList = new ArrayList<>();
+        int totalCommits = between(2, 20);
+        long lastMaxSeqNo = 0;
+        long lastTranslogGen = 0;
+        final UUID translogUUID = UUID.randomUUID();
+        for (int i = 0; i < totalCommits; i++) {
+            lastMaxSeqNo += between(1, 10000);
+            lastTranslogGen += between(1, 100);
+            commitList.add(mockIndexCommit(lastMaxSeqNo, translogUUID, lastTranslogGen));
+            maxSeqNoList.add(lastMaxSeqNo);
+            translogGenList.add(lastTranslogGen);
+        }
+
+        int keptIndex = randomInt(commitList.size() - 1);
+        final long lower = maxSeqNoList.get(keptIndex);
+        final long upper = keptIndex == commitList.size() - 1 ?
+            Long.MAX_VALUE : Math.max(maxSeqNoList.get(keptIndex), maxSeqNoList.get(keptIndex + 1) - 1);
+        globalCheckpoint.set(randomLongBetween(lower, upper));
+        indexPolicy.onCommit(commitList);
+
+        for (int i = 0; i < commitList.size(); i++) {
+            if (i < keptIndex) {
+                verify(commitList.get(i), times(1)).delete();
+            } else {
+                verify(commitList.get(i), never()).delete();
+            }
         }
-        combinedDeletionPolicy.onInit(commitList);
-        verify(indexDeletionPolicy, times(1)).onInit(commitList);
-        combinedDeletionPolicy.onCommit(commitList);
-        verify(indexDeletionPolicy, times(1)).onCommit(commitList);
+        assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(translogGenList.get(keptIndex)));
+        assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
+    }
+
+    public void testIgnoreSnapshottingCommits() throws Exception {
+        final AtomicLong globalCheckpoint = new AtomicLong();
+        final UUID translogUUID = UUID.randomUUID();
+        TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
+        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get);
+
+        long firstMaxSeqNo = randomLongBetween(0, Long.MAX_VALUE - 1);
+        long secondMaxSeqNo = randomLongBetween(firstMaxSeqNo + 1, Long.MAX_VALUE);
+
+        long lastTranslogGen = randomNonNegativeLong();
+        final IndexCommit firstCommit = mockIndexCommit(firstMaxSeqNo, translogUUID, randomLongBetween(0, lastTranslogGen));
+        final IndexCommit secondCommit = mockIndexCommit(secondMaxSeqNo, translogUUID, lastTranslogGen);
+        SnapshotDeletionPolicy snapshotDeletionPolicy = new SnapshotDeletionPolicy(indexPolicy);
+
+        snapshotDeletionPolicy.onInit(Arrays.asList(firstCommit));
+        snapshotDeletionPolicy.snapshot();
+        assertThat(snapshotDeletionPolicy.getSnapshots(), contains(firstCommit));
+
+        // SnapshotPolicy prevents the first commit from being deleted, but CombinedPolicy does not retain its translog.
+        globalCheckpoint.set(randomLongBetween(secondMaxSeqNo, Long.MAX_VALUE));
+        snapshotDeletionPolicy.onCommit(Arrays.asList(firstCommit, secondCommit));
+        verify(firstCommit, never()).delete();
+        verify(secondCommit, never()).delete();
+        assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
+        assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
+    }
+
+    public void testLegacyIndex() throws Exception {
+        final AtomicLong globalCheckpoint = new AtomicLong();
+        final UUID translogUUID = UUID.randomUUID();
+
+        TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
+        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get);
+
+        long legacyTranslogGen = randomNonNegativeLong();
+        IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen);
+        indexPolicy.onInit(singletonList(legacyCommit));
+        verify(legacyCommit, never()).delete();
+        assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(legacyTranslogGen));
+        assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(legacyTranslogGen));
+
+        long safeTranslogGen = randomLongBetween(legacyTranslogGen, Long.MAX_VALUE);
+        long maxSeqNo = randomLongBetween(1, Long.MAX_VALUE);
+        final IndexCommit freshCommit = mockIndexCommit(maxSeqNo, translogUUID, safeTranslogGen);
+
+        globalCheckpoint.set(randomLongBetween(0, maxSeqNo - 1));
+        indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
+        verify(legacyCommit, times(0)).delete();
+        verify(freshCommit, times(0)).delete();
+        assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(legacyTranslogGen));
+        assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));
+
+        // Make the fresh commit safe.
+        globalCheckpoint.set(randomLongBetween(maxSeqNo, Long.MAX_VALUE));
+        indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
+        verify(legacyCommit, times(1)).delete();
+        verify(freshCommit, times(0)).delete();
+        assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen));
+        assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));
     }
 
-    public void testSettingMinTranslogGen() throws IOException {
-        SnapshotDeletionPolicy indexDeletionPolicy = mock(SnapshotDeletionPolicy.class);
-        final TranslogDeletionPolicy translogDeletionPolicy = mock(TranslogDeletionPolicy.class);
-        CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, translogDeletionPolicy,
-            EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
-        List<IndexCommit> commitList = new ArrayList<>();
-        long count = randomIntBetween(10, 20);
-        long lastGen = 0;
-        for (int i = 0; i < count; i++) {
-            lastGen += randomIntBetween(10, 20000);
-            commitList.add(mockIndexCommitWithTranslogGen(lastGen));
+    public void testDeleteInvalidCommits() throws Exception {
+        final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
+        TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
+        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_CREATE_TRANSLOG, translogPolicy, globalCheckpoint::get);
+
+        final int invalidCommits = between(1, 10);
+        final List<IndexCommit> commitList = new ArrayList<>();
+        for (int i = 0; i < invalidCommits; i++) {
+            commitList.add(mockIndexCommit(randomNonNegativeLong(), UUID.randomUUID(), randomNonNegativeLong()));
         }
-        combinedDeletionPolicy.onInit(commitList);
-        verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(lastGen);
-        commitList.clear();
-        for (int i = 0; i < count; i++) {
-            lastGen += randomIntBetween(10, 20000);
-            commitList.add(mockIndexCommitWithTranslogGen(lastGen));
+
+        final UUID expectedTranslogUUID = UUID.randomUUID();
+        long lastTranslogGen = 0;
+        final int validCommits = between(1, 10);
+        for (int i = 0; i < validCommits; i++) {
+            lastTranslogGen += between(1, 1000);
+            commitList.add(mockIndexCommit(randomNonNegativeLong(), expectedTranslogUUID, lastTranslogGen));
+        }
+
+        // We should never keep invalid commits regardless of the value of the global checkpoint.
+        indexPolicy.onCommit(commitList);
+        for (int i = 0; i < invalidCommits - 1; i++) {
+            verify(commitList.get(i), times(1)).delete();
         }
-        combinedDeletionPolicy.onCommit(commitList);
-        verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(lastGen);
     }
 
-    IndexCommit mockIndexCommitWithTranslogGen(long gen) throws IOException {
-        IndexCommit commit = mock(IndexCommit.class);
-        when(commit.getUserData()).thenReturn(Collections.singletonMap(Translog.TRANSLOG_GENERATION_KEY, Long.toString(gen)));
+    IndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen) throws IOException {
+        final Map<String, String> userData = new HashMap<>();
+        userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
+        userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString());
+        userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen));
+        final IndexCommit commit = mock(IndexCommit.class);
+        when(commit.getUserData()).thenReturn(userData);
+        return commit;
+    }
+
+    IndexCommit mockLegacyIndexCommit(UUID translogUUID, long translogGen) throws IOException {
+        final Map<String, String> userData = new HashMap<>();
+        userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString());
+        userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen));
+        final IndexCommit commit = mock(IndexCommit.class);
+        when(commit.getUserData()).thenReturn(userData);
         return commit;
     }
 }

+ 117 - 6
core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -79,6 +79,7 @@ import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
@@ -113,6 +114,7 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardUtils;
 import org.elasticsearch.index.store.DirectoryUtils;
 import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.translog.SnapshotMatchers;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogConfig;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@@ -165,6 +167,7 @@ import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
@@ -945,17 +948,47 @@ public class InternalEngineTests extends EngineTestCase {
     }
 
     public void testCommitAdvancesMinTranslogForRecovery() throws IOException {
+        // Replace the default engine with one whose global checkpoint is controlled by the inSync flag below.
+        IOUtils.close(engine, store);
+        final Path translogPath = createTempDir();
+        store = createStore();
+        final AtomicBoolean inSync = new AtomicBoolean(randomBoolean());
+        final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> seqNoServiceSupplier = (config, seqNoStats) ->
+            new SequenceNumbersService(
+                config.getShardId(),
+                config.getAllocationId(),
+                config.getIndexSettings(),
+                seqNoStats.getMaxSeqNo(),
+                seqNoStats.getLocalCheckpoint(),
+                seqNoStats.getGlobalCheckpoint()) {
+                @Override
+                public long getGlobalCheckpoint() {
+                    // In sync: report the local checkpoint as the global checkpoint; otherwise report it as unassigned.
+                    return inSync.get() ? getLocalCheckpoint() : SequenceNumbers.UNASSIGNED_SEQ_NO;
+                }
+            };
+        engine = new InternalEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null), seqNoServiceSupplier);
         ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
         engine.index(indexForDoc(doc));
+
+        // First flush: the last commit points at generation 2; the generation required for recovery only
+        // advances to 2 when in sync and is expected to stay at 1 otherwise.
         engine.flush();
         assertThat(engine.getTranslog().currentFileGeneration(), equalTo(2L));
-        assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(2L));
+        assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync.get() ? 2L : 1L));
+        assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(2L));
+
+        // A second, non-forced flush is expected to leave all generations unchanged.
         engine.flush();
         assertThat(engine.getTranslog().currentFileGeneration(), equalTo(2L));
-        assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(2L));
+        assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync.get() ? 2L : 1L));
+        assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(2L));
+
+        // A forced flush moves the current generation and the last-commit generation to 3.
         engine.flush(true, true);
         assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L));
-        assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(3L));
+        assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync.get() ? 3L : 1L));
+        assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(3L));
+
+        // Once in sync, the next forced flush is expected to advance the generation required for recovery as well.
+        inSync.set(true);
+        engine.flush(true, true);
+        assertThat(engine.getTranslog().currentFileGeneration(), equalTo(4L));
+        assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(4L));
+        assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(4L));
     }
 
     public void testSyncedFlush() throws IOException {
@@ -2359,10 +2392,26 @@ public class InternalEngineTests extends EngineTestCase {
         );
         indexSettings.updateIndexMetaData(builder.build());
 
+        final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> seqNoServiceSupplier = (config, seqNoStats) ->
+            new SequenceNumbersService(
+                config.getShardId(),
+                config.getAllocationId(),
+                config.getIndexSettings(),
+                seqNoStats.getMaxSeqNo(),
+                seqNoStats.getLocalCheckpoint(),
+                seqNoStats.getGlobalCheckpoint()) {
+                @Override
+                public long getGlobalCheckpoint() {
+                    return getLocalCheckpoint();
+                }
+            };
+
         try (Store store = createStore()) {
             AtomicBoolean throwErrorOnCommit = new AtomicBoolean();
             final Path translogPath = createTempDir();
-            try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null)) {
+            try (InternalEngine engine =
+                     new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null), seqNoServiceSupplier) {
+
                 @Override
                 protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
                     super.commitIndexWriter(writer, translog, syncId);
@@ -2377,7 +2426,8 @@ public class InternalEngineTests extends EngineTestCase {
                 FlushFailedEngineException e = expectThrows(FlushFailedEngineException.class, engine::flush);
                 assertThat(e.getCause().getMessage(), equalTo("power's out"));
             }
-            try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null))) {
+            try (InternalEngine engine =
+                     new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null), seqNoServiceSupplier)) {
                 engine.recoverFromTranslog();
                 assertVisibleCount(engine, 1);
                 final long committedGen = Long.valueOf(
@@ -2608,13 +2658,16 @@ public class InternalEngineTests extends EngineTestCase {
         EngineConfig config = engine.config();
 
         EngineConfig newConfig = new EngineConfig(
-            randomBoolean() ? EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG,
+            randomBoolean() ? EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG,
             shardId, allocationId.getId(),
             threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
             new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
             IndexSearcher.getDefaultQueryCachingPolicy(), true, config.getTranslogConfig(), TimeValue.timeValueMinutes(5),
             config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
             new NoneCircuitBreakerService());
+        if (newConfig.getOpenMode() == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG) {
+            Lucene.cleanLuceneIndex(store.directory());
+        }
         engine = new InternalEngine(newConfig);
         if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
             engine.recoverFromTranslog();
@@ -4116,4 +4169,62 @@ public class InternalEngineTests extends EngineTestCase {
         }
     }
 
+    /**
+     * Indexes documents while randomly advancing the global checkpoint and flushing, and after every flush checks
+     * which index commits remain and that the translog still holds the operations after the oldest commit's local checkpoint.
+     */
+    public void testKeepTranslogAfterGlobalCheckpoint() throws Exception {
+        IOUtils.close(engine, store);
+        // The global checkpoint reported by the sequence number service is driven directly by this test.
+        final AtomicLong globalCheckpoint = new AtomicLong(0);
+        final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> seqNoServiceSupplier = (config, seqNoStats) ->
+            new SequenceNumbersService(
+                config.getShardId(),
+                config.getAllocationId(),
+                config.getIndexSettings(),
+                seqNoStats.getMaxSeqNo(),
+                seqNoStats.getLocalCheckpoint(),
+                seqNoStats.getGlobalCheckpoint()) {
+                @Override
+                public long getGlobalCheckpoint() {
+                    return globalCheckpoint.get();
+                }
+            };
+
+        // Randomize the translog retention age and size settings.
+        final IndexSettings indexSettings = new IndexSettings(defaultSettings.getIndexMetaData(), defaultSettings.getNodeSettings(),
+            defaultSettings.getScopedSettings());
+        IndexMetaData.Builder builder = IndexMetaData.builder(indexSettings.getIndexMetaData())
+            .settings(Settings.builder().put(indexSettings.getSettings())
+                .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomFrom("-1", "100micros", "30m"))
+                .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), randomFrom("-1", "512b", "1gb")));
+        indexSettings.updateIndexMetaData(builder.build());
+
+        store = createStore();
+        try (InternalEngine engine
+                 = createEngine(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, seqNoServiceSupplier)) {
+            int numDocs = scaledRandomIntBetween(10, 100);
+            for (int docId = 0; docId < numDocs; docId++) {
+                ParseContext.Document document = testDocumentWithTextField();
+                document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
+                engine.index(indexForDoc(testParsedDocument(Integer.toString(docId), null, document, B_1, null)));
+                if (frequently()) {
+                    // Move the global checkpoint to a random value between its current value and the local checkpoint.
+                    globalCheckpoint.set(randomIntBetween(
+                        Math.toIntExact(engine.seqNoService().getGlobalCheckpoint()),
+                        Math.toIntExact(engine.seqNoService().getLocalCheckpoint())));
+                }
+                if (frequently()) {
+                    engine.flush(randomBoolean(), true);
+                    final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
+                    // Keep only one safe commit as the oldest commit.
+                    final IndexCommit safeCommit = commits.get(0);
+                    assertThat(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
+                        lessThanOrEqualTo(globalCheckpoint.get()));
+                    // Every other remaining commit must have a max sequence number above the global checkpoint.
+                    for (int i = 1; i < commits.size(); i++) {
+                        assertThat(Long.parseLong(commits.get(i).getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
+                            greaterThan(globalCheckpoint.get()));
+                    }
+                    // Make sure we keep all translog operations after the local checkpoint of the safe commit.
+                    long localCheckpointFromSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
+                    try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) {
+                        // The upper bound is docId; presumably the i-th indexed document receives sequence number i.
+                        assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(localCheckpointFromSafeCommit + 1, docId));
+                    }
+                }
+            }
+        }
+    }
 }

+ 11 - 2
core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -1051,8 +1051,9 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(indexShard);
     }
 
-    public void testAcquireIndexCommit() throws IOException {
-        final IndexShard shard = newStartedShard();
+    public void testAcquireIndexCommit() throws Exception {
+        boolean isPrimary = randomBoolean();
+        final IndexShard shard = newStartedShard(isPrimary);
         int numDocs = randomInt(20);
         for (int i = 0; i < numDocs; i++) {
             indexDoc(shard, "type", "id_" + i);
@@ -1069,6 +1070,14 @@ public class IndexShardTests extends IndexShardTestCase {
             assertThat(reader.numDocs(), equalTo(flushFirst ? numDocs : 0));
         }
         commit.close();
+        // Make the global checkpoint in sync with the local checkpoint.
+        if (isPrimary) {
+            final String allocationId = shard.shardRouting.allocationId().getId();
+            shard.updateLocalCheckpointForShard(allocationId, numDocs + moreDocs - 1);
+            shard.updateGlobalCheckpointForShard(allocationId, shard.getLocalCheckpoint());
+        } else {
+            shard.updateGlobalCheckpointOnReplica(numDocs + moreDocs - 1, "test");
+        }
         flushShard(shard, true);
 
         // check it's clean up

+ 50 - 0
core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java

@@ -19,6 +19,8 @@
 
 package org.elasticsearch.index.translog;
 
+import com.carrotsearch.hppc.LongHashSet;
+import com.carrotsearch.hppc.LongSet;
 import org.elasticsearch.ElasticsearchException;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
@@ -61,6 +63,13 @@ public final class SnapshotMatchers {
         return new ContainingInAnyOrderMatcher(expectedOperations);
     }
 
+    /**
+     * Creates a matcher that consumes a snapshot and verifies that its operations cover every sequence number
+     * from {@code minSeqNo} (inclusive) to {@code maxSeqNo} (inclusive).
+     *
+     * @param minSeqNo the lowest sequence number that must be present
+     * @param maxSeqNo the highest sequence number that must be present
+     * @return a matcher for {@link Translog.Snapshot}
+     */
+    public static Matcher<Translog.Snapshot> containsSeqNoRange(long minSeqNo, long maxSeqNo) {
+        final Matcher<Translog.Snapshot> rangeMatcher = new ContainingSeqNoRangeMatcher(minSeqNo, maxSeqNo);
+        return rangeMatcher;
+    }
+
     public static class SizeMatcher extends TypeSafeMatcher<Translog.Snapshot> {
 
         private final int size;
@@ -190,4 +199,45 @@ public final class SnapshotMatchers {
                 .appendText(" in any order.");
         }
     }
+
+    /**
+     * Matcher that consumes a {@link Translog.Snapshot} and checks that its operations include every sequence number
+     * between {@code minSeqNo} and {@code maxSeqNo} (both inclusive). Additional sequence numbers are ignored.
+     */
+    static class ContainingSeqNoRangeMatcher extends TypeSafeMatcher<Translog.Snapshot> {
+        private final long minSeqNo;
+        private final long maxSeqNo;
+        // Sequence numbers of the expected range missing from the most recently matched snapshot; recorded during
+        // matching so that the mismatch description does not have to read the (already consumed) snapshot again.
+        private final List<Long> notFoundSeqNo = new ArrayList<>();
+
+        ContainingSeqNoRangeMatcher(long minSeqNo, long maxSeqNo) {
+            this.minSeqNo = minSeqNo;
+            this.maxSeqNo = maxSeqNo;
+        }
+
+        @Override
+        protected boolean matchesSafely(Translog.Snapshot snapshot) {
+            // Reset the state of any previous invocation; otherwise a reused matcher would keep stale entries and
+            // report a mismatch even for a snapshot that contains the whole range.
+            notFoundSeqNo.clear();
+            try {
+                final LongSet seqNoList = new LongHashSet();
+                Translog.Operation op;
+                while ((op = snapshot.next()) != null) {
+                    seqNoList.add(op.seqNo());
+                }
+                for (long i = minSeqNo; i <= maxSeqNo; i++) {
+                    if (seqNoList.contains(i) == false) {
+                        notFoundSeqNo.add(i);
+                    }
+                }
+                return notFoundSeqNo.isEmpty();
+            } catch (IOException ex) {
+                throw new ElasticsearchException("failed to read snapshot content", ex);
+            }
+        }
+
+        @Override
+        protected void describeMismatchSafely(Translog.Snapshot snapshot, Description mismatchDescription) {
+            mismatchDescription
+                .appendText("not found seqno ").appendValueList("[", ", ", "]", notFoundSeqNo);
+        }
+
+        @Override
+        public void describeTo(Description description) {
+            description.appendText("snapshot contains all seqno from [" + minSeqNo + " to " + maxSeqNo + "]");
+        }
+    }
 }

+ 3 - 0
core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java

@@ -53,6 +53,7 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
             assertMinGenRequired(deletionPolicy, readersAndWriter, 1L);
             final int committedReader = randomIntBetween(0, allGens.size() - 1);
             final long committedGen = allGens.get(committedReader).generation;
+            deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(committedGen, Long.MAX_VALUE));
             deletionPolicy.setMinTranslogGenerationForRecovery(committedGen);
             assertMinGenRequired(deletionPolicy, readersAndWriter, committedGen);
         } finally {
@@ -109,6 +110,7 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
         allGens.add(readersAndWriter.v2());
         try {
             TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE);
+            deletionPolicy.setTranslogGenerationOfLastCommit(Long.MAX_VALUE);
             deletionPolicy.setMinTranslogGenerationForRecovery(Long.MAX_VALUE);
             int selectedReader = randomIntBetween(0, allGens.size() - 1);
             final long selectedGenerationByAge = allGens.get(selectedReader).generation;
@@ -122,6 +124,7 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
             // make a new policy as committed gen can't go backwards (for now)
             deletionPolicy = new MockDeletionPolicy(now, size, maxAge);
             long committedGen = randomFrom(allGens).generation;
+            deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(committedGen, Long.MAX_VALUE));
             deletionPolicy.setMinTranslogGenerationForRecovery(committedGen);
             assertMinGenRequired(deletionPolicy, readersAndWriter,
                 Math.min(committedGen, Math.max(selectedGenerationByAge, selectedGenerationBySize)));

+ 51 - 16
core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

@@ -153,17 +153,20 @@ public class TranslogTests extends ESTestCase {
     }
 
     private void markCurrentGenAsCommitted(Translog translog) throws IOException {
-        commit(translog, translog.currentFileGeneration());
+        // Commit the current generation, retaining from a random generation between the policy's current
+        // minimum generation for recovery and the generation being committed.
+        long genToCommit = translog.currentFileGeneration();
+        long genToRetain = randomLongBetween(translog.getDeletionPolicy().getMinTranslogGenerationForRecovery(), genToCommit);
+        commit(translog, genToRetain, genToCommit);
     }
     }
 
     private void rollAndCommit(Translog translog) throws IOException {
         translog.rollGeneration();
-        commit(translog, translog.currentFileGeneration());
+        // Commit the generation that was just rolled to.
+        markCurrentGenAsCommitted(translog);
     }
 
-    private void commit(Translog translog, long genToCommit) throws IOException {
+    private void commit(Translog translog, long genToRetain, long genToCommit) throws IOException {
         final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
-        deletionPolicy.setMinTranslogGenerationForRecovery(genToCommit);
+        deletionPolicy.setTranslogGenerationOfLastCommit(genToCommit);
+        deletionPolicy.setMinTranslogGenerationForRecovery(genToRetain);
         long minGenRequired = deletionPolicy.minTranslogGenRequired(translog.getReaders(), translog.getCurrent());
         translog.trimUnreferencedReaders();
         assertThat(minGenRequired, equalTo(translog.getMinFileGeneration()));
@@ -440,6 +443,31 @@ public class TranslogTests extends ESTestCase {
         }
     }
 
+    /**
+     * Checks that {@code uncommittedOperations()} counts every operation added since the last commit and that,
+     * right after the current generation is marked as committed, it equals the number of operations in that generation.
+     */
+    public void testUncommittedOperations() throws Exception {
+        final TranslogDeletionPolicy policy = translog.getDeletionPolicy();
+        policy.setRetentionAgeInMillis(randomLong());
+        policy.setRetentionSizeInBytes(randomLong());
+
+        final int totalOps = scaledRandomIntBetween(10, 100);
+        int expectedUncommitted = 0;
+        int opsInCurrentGen = 0;
+        for (int opIndex = 0; opIndex < totalOps; opIndex++) {
+            final Translog.Index indexOp = new Translog.Index("test", Integer.toString(opIndex), opIndex, new byte[]{1});
+            translog.add(indexOp);
+            expectedUncommitted += 1;
+            opsInCurrentGen += 1;
+            if (rarely()) {
+                // A new generation starts empty.
+                translog.rollGeneration();
+                opsInCurrentGen = 0;
+            }
+            assertThat(translog.uncommittedOperations(), equalTo(expectedUncommitted));
+            if (frequently()) {
+                markCurrentGenAsCommitted(translog);
+                // After the commit only the operations of the current generation remain uncommitted.
+                expectedUncommitted = opsInCurrentGen;
+                assertThat(translog.uncommittedOperations(), equalTo(expectedUncommitted));
+            }
+        }
+    }
+
     public void testTotalTests() {
         final TranslogStats total = new TranslogStats();
         final int n = randomIntBetween(0, 16);
@@ -824,6 +852,7 @@ public class TranslogTests extends ESTestCase {
                                 translog.rollGeneration();
                                 // expose the new checkpoint (simulating a commit), before we trim the translog
                                 lastCommittedLocalCheckpoint.set(localCheckpoint);
+                                deletionPolicy.setTranslogGenerationOfLastCommit(translog.currentFileGeneration());
                                 deletionPolicy.setMinTranslogGenerationForRecovery(
                                     translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration);
                                 translog.trimUnreferencedReaders();
@@ -1822,6 +1851,7 @@ public class TranslogTests extends ESTestCase {
         translog.close();
         TranslogConfig config = translog.getConfig();
         final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1);
+        deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE));
         deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration);
         translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
         assertThat(translog.getMinFileGeneration(), equalTo(1L));
@@ -1867,6 +1897,7 @@ public class TranslogTests extends ESTestCase {
                     translog.rollGeneration();
                 }
             }
+            deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, translog.currentFileGeneration()));
             deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration);
             fail.failRandomly();
             try {
@@ -1876,6 +1907,7 @@ public class TranslogTests extends ESTestCase {
             }
         }
         final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1);
+        deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE));
         deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration);
         try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO)) {
             // we don't know when things broke exactly
@@ -2413,8 +2445,9 @@ public class TranslogTests extends ESTestCase {
         for (int i = 0; i <= rolls; i++) {
             assertFileIsPresent(translog, generation + i);
         }
-        commit(translog, generation + rolls);
-        assertThat(translog.currentFileGeneration(), equalTo(generation + rolls ));
+        long minGenForRecovery = randomLongBetween(generation, generation + rolls);
+        commit(translog, minGenForRecovery, generation + rolls);
+        assertThat(translog.currentFileGeneration(), equalTo(generation + rolls));
         assertThat(translog.uncommittedOperations(), equalTo(0));
         if (longRetention) {
             for (int i = 0; i <= rolls; i++) {
@@ -2423,17 +2456,19 @@ public class TranslogTests extends ESTestCase {
             deletionPolicy.setRetentionAgeInMillis(randomBoolean() ? 100 : -1);
             assertBusy(() -> {
                 translog.trimUnreferencedReaders();
-                for (int i = 0; i < rolls; i++) {
-                    assertFileDeleted(translog, generation + i);
+                for (long i = 0; i < minGenForRecovery; i++) {
+                    assertFileDeleted(translog, i);
                 }
             });
         } else {
             // immediate cleanup
-            for (int i = 0; i < rolls; i++) {
-                assertFileDeleted(translog, generation + i);
+            for (long i = 0; i < minGenForRecovery; i++) {
+                assertFileDeleted(translog, i);
             }
         }
-        assertFileIsPresent(translog, generation + rolls);
+        for (long i = minGenForRecovery; i < generation + rolls; i++) {
+            assertFileIsPresent(translog, i);
+        }
     }
 
     public void testMinSeqNoBasedAPI() throws IOException {
@@ -2516,10 +2551,8 @@ public class TranslogTests extends ESTestCase {
                 translog.rollGeneration();
             }
         }
-
-        final long generation =
-                randomIntBetween(1, Math.toIntExact(translog.currentFileGeneration()));
-        commit(translog, generation);
+        long lastGen = randomLongBetween(1, translog.currentFileGeneration());
+        commit(translog, randomLongBetween(1, lastGen), lastGen);
     }
 
     public void testAcquiredLockIsPassedToDeletionPolicy() throws IOException {
@@ -2531,7 +2564,9 @@ public class TranslogTests extends ESTestCase {
                 translog.rollGeneration();
             }
             if (rarely()) {
-                commit(translog, randomLongBetween(deletionPolicy.getMinTranslogGenerationForRecovery(), translog.currentFileGeneration()));
+                long lastGen = randomLongBetween(deletionPolicy.getTranslogGenerationOfLastCommit(), translog.currentFileGeneration());
+                long minGen = randomLongBetween(deletionPolicy.getMinTranslogGenerationForRecovery(), lastGen);
+                commit(translog, minGen, lastGen);
             }
             if (frequently()) {
                 long minGen;

+ 33 - 12
core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java

@@ -22,6 +22,7 @@ package org.elasticsearch.indices.recovery;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.NoMergePolicy;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.UUIDs;
@@ -98,20 +99,41 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
     }
 
     public void testRecoveryWithOutOfOrderDelete() throws Exception {
+        /*
+         * The flow of this test:
+         * - delete #1
+         * - roll generation (to create gen 2)
+         * - index #0
+         * - index #3
+         * - flush (commit point has max_seqno 3, and local checkpoint 1 -> points at gen 2, previous commit point is maintained)
+         * - index #2
+         * - index #5
+         * - If flushed with translog retention disabled, delete #1 is removed while index #0 is still retained and replayed.
+         */
         try (ReplicationGroup shards = createGroup(1)) {
             shards.startAll();
             // create out of order delete and index op on replica
             final IndexShard orgReplica = shards.getReplicas().get(0);
+            final String indexName = orgReplica.shardId().getIndexName();
+
+            // delete #1
             orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id", VersionType.EXTERNAL, u -> {});
             orgReplica.getTranslog().rollGeneration(); // isolate the delete in it's own generation
+            // index #0
             orgReplica.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
-                SourceToParse.source(orgReplica.shardId().getIndexName(), "type", "id", new BytesArray("{}"), XContentType.JSON),
-                u -> {});
-
-            // index a second item into the second generation, skipping seq# 2. Local checkpoint is now 1, which will make this generation
-            // stick around
+                SourceToParse.source(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON), u -> {});
+            // index #3
             orgReplica.applyIndexOperationOnReplica(3, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
-                SourceToParse.source(orgReplica.shardId().getIndexName(), "type", "id2", new BytesArray("{}"), XContentType.JSON), u -> {});
+                SourceToParse.source(indexName, "type", "id-3", new BytesArray("{}"), XContentType.JSON), u -> {});
+            // Flushing a new commit with local checkpoint=1 allows the translog gen #1 to be deleted.
+            orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true));
+            // index #2
+            orgReplica.applyIndexOperationOnReplica(2, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
+                SourceToParse.source(indexName, "type", "id-2", new BytesArray("{}"), XContentType.JSON), u -> {});
+            orgReplica.updateGlobalCheckpointOnReplica(3L, "test");
+            // index #5 -> force NoOp #4.
+            orgReplica.applyIndexOperationOnReplica(5, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
+                SourceToParse.source(indexName, "type", "id-5", new BytesArray("{}"), XContentType.JSON), u -> {});
 
             final int translogOps;
             if (randomBoolean()) {
@@ -120,18 +142,17 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
                     IndexMetaData.Builder builder = IndexMetaData.builder(orgReplica.indexSettings().getIndexMetaData());
                     builder.settings(Settings.builder().put(orgReplica.indexSettings().getSettings())
                         .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
-                        .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
-                    );
+                        .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1"));
                     orgReplica.indexSettings().updateIndexMetaData(builder.build());
                     orgReplica.onSettingsChanged();
-                    translogOps = 3; // 2 ops + seqno gaps
+                    translogOps = 5; // 4 ops + seqno gaps (delete #1 is removed but index #0 will be replayed).
                 } else {
                     logger.info("--> flushing shard (translog will be retained)");
-                    translogOps = 4; // 3 ops + seqno gaps
+                    translogOps = 6; // 5 ops + seqno gaps
                 }
                 flushShard(orgReplica);
             } else {
-                translogOps = 4; // 3 ops + seqno gaps
+                translogOps = 6; // 5 ops + seqno gaps
             }
 
             final IndexShard orgPrimary = shards.getPrimary();
@@ -139,7 +160,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
 
             IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId());
             shards.recoverReplica(newReplica);
-            shards.assertAllEqual(1);
+            shards.assertAllEqual(3);
 
             assertThat(newReplica.getTranslog().totalOperations(), equalTo(translogOps));
         }

+ 7 - 0
test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

@@ -461,6 +461,13 @@ public abstract class ESTestCase extends LuceneTestCase {
         return RandomNumbers.randomIntBetween(random(), min, max);
     }
 
+    /**
+     * Returns a random long drawn from the range {@code min} (inclusive) to {@code max} (inclusive),
+     * using the test's random source.
+     *
+     * @param min the smallest value that may be returned
+     * @param max the largest value that may be returned
+     */
+    public static long randomLongBetween(long min, long max) {
+        final long picked = RandomNumbers.randomLongBetween(random(), min, max);
+        return picked;
+    }
+
     /**
      * Returns a "scaled" number of iterations for loops which can have a variable
      * iteration count. This method is effectively