浏览代码

Introducing a translog deletion policy (#24950)

Currently, the decisions regarding which translog generation files to delete are hard coded in the interaction between the `InternalEngine` and the `Translog` classes. This PR extracts it to a dedicated class called `TranslogDeletionPolicy`, for two main reasons:

1) Simplicity - the code is easier to read and understand (no more two phase commit on the translog, the Engine can just commit and the translog will respond)
2) Preparing for future plans to extend the logic we need - i.e., retain multiple lucene commits and also introduce a size based retention logic, allowing people to always keep a certain amount of translog files around. The latter is useful to increase the chance of an ops based recovery.
Boaz Leskes 8 年之前
父节点
当前提交
1775e4253e

+ 86 - 0
core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java

@@ -0,0 +1,86 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.index.translog.TranslogDeletionPolicy;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * An {@link IndexDeletionPolicy} that coordinates between Lucene's commits and the retention of translog generation files,
+ * making sure that all translog files that are needed to recover from the Lucene commit are not deleted.
+ */
+class CombinedDeletionPolicy extends IndexDeletionPolicy {
+
+    private final TranslogDeletionPolicy translogDeletionPolicy; // receives the min translog generation required for recovery
+    private final EngineConfig.OpenMode openMode; // selects what onInit expects of the commit list
+
+    private final SnapshotDeletionPolicy indexDeletionPolicy; // Lucene-side delegate; onInit/onCommit are forwarded to it first
+
+    CombinedDeletionPolicy(SnapshotDeletionPolicy indexDeletionPolicy, TranslogDeletionPolicy translogDeletionPolicy,
+                           EngineConfig.OpenMode openMode) {
+        this.indexDeletionPolicy = indexDeletionPolicy;
+        this.translogDeletionPolicy = translogDeletionPolicy;
+        this.openMode = openMode;
+    }
+
+    @Override
+    public void onInit(List<? extends IndexCommit> commits) throws IOException {
+        indexDeletionPolicy.onInit(commits); // delegate runs before any translog bookkeeping
+        switch (openMode) {
+            case CREATE_INDEX_AND_TRANSLOG:
+                assert commits.isEmpty() : "index is being created but we already have commits";
+                break; // no commit to read a translog generation from
+            case OPEN_INDEX_CREATE_TRANSLOG:
+                assert commits.isEmpty() == false : "index is opened, but we have no commits";
+                break; // existing commits are not consulted; the translog policy is left untouched here
+            case OPEN_INDEX_AND_TRANSLOG:
+                assert commits.isEmpty() == false : "index is opened, but we have no commits";
+                setLastCommittedTranslogGeneration(commits); // generation stored in the last commit becomes the recovery minimum
+                break;
+            default:
+                throw new IllegalArgumentException("unknown openMode [" + openMode + "]");
+        }
+    }
+
+    @Override
+    public void onCommit(List<? extends IndexCommit> commits) throws IOException {
+        indexDeletionPolicy.onCommit(commits); // delegate processes the commit list first
+        setLastCommittedTranslogGeneration(commits); // then the translog policy is updated from the last commit
+    }
+
+    private void setLastCommittedTranslogGeneration(List<? extends IndexCommit> commits) throws IOException {
+        // when opening an existing lucene index, we currently always open the last commit.
+        // we therefore use the translog gen as the one that will be required for recovery
+        final IndexCommit indexCommit = commits.get(commits.size() - 1); // callers only assert non-emptiness; no runtime check here
+        assert indexCommit.isDeleted() == false : "last commit is deleted";
+        long minGen = Long.parseLong(indexCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); // NOTE(review): NumberFormatException if key is absent - confirm
+        translogDeletionPolicy.setMinTranslogGenerationForRecovery(minGen);
+    }
+
+    public SnapshotDeletionPolicy getIndexDeletionPolicy() { // exposes the wrapped Lucene snapshot policy to callers
+        return indexDeletionPolicy;
+    }
+}

+ 24 - 37
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -75,6 +75,7 @@ import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogConfig;
 import org.elasticsearch.index.translog.TranslogCorruptedException;
+import org.elasticsearch.index.translog.TranslogDeletionPolicy;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
@@ -127,7 +128,7 @@ public class InternalEngine extends Engine {
 
     private final String uidField;
 
-    private final SnapshotDeletionPolicy deletionPolicy;
+    private final CombinedDeletionPolicy deletionPolicy;
 
     // How many callers are currently requesting index throttling.  Currently there are only two situations where we do this: when merges
     // are falling behind and when writing indexing buffer to disk is too slow.  When this is 0, there is no throttling, else we throttling
@@ -147,9 +148,11 @@ public class InternalEngine extends Engine {
         if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
             maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
         }
-        deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
         this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME;
         this.versionMap = new LiveVersionMap();
+        final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
+        this.deletionPolicy = new CombinedDeletionPolicy(
+            new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode);
         store.incRef();
         IndexWriter writer = null;
         Translog translog = null;
@@ -188,7 +191,7 @@ public class InternalEngine extends Engine {
                 seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
                 updateMaxUnsafeAutoIdTimestampFromWriter(writer);
                 indexWriter = writer;
-                translog = openTranslog(engineConfig, writer, () -> seqNoService().getGlobalCheckpoint());
+                translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService().getGlobalCheckpoint());
                 assert translog.getGeneration() != null;
             } catch (IOException | TranslogCorruptedException e) {
                 throw new EngineCreationFailureException(shardId, "failed to create engine", e);
@@ -320,29 +323,21 @@ public class InternalEngine extends Engine {
         }
     }
 
-    private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, LongSupplier globalCheckpointSupplier) throws IOException {
+    private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) throws IOException {
         assert openMode != null;
         final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
-        Translog.TranslogGeneration generation = null;
+        String translogUUID = null;
         if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
-            generation = loadTranslogIdFromCommit(writer);
+            translogUUID = loadTranslogUUIDFromCommit(writer);
             // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
-            if (generation == null) {
-                throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist");
-            }
-            if (generation.translogUUID == null) {
+            if (translogUUID == null) {
                 throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
             }
         }
-        final Translog translog = new Translog(translogConfig, generation, globalCheckpointSupplier);
-        if (generation == null || generation.translogUUID == null) {
+        final Translog translog = new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier);
+        if (translogUUID == null) {
             assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be "
                 + EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
-            if (generation == null) {
-                logger.debug("no translog ID present in the current generation - creating one");
-            } else if (generation.translogUUID == null) {
-                logger.debug("upgraded translog to pre 2.0 format, associating translog with index - writing translog UUID");
-            }
             boolean success = false;
             try {
                 commitIndexWriter(writer, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
@@ -368,22 +363,18 @@ public class InternalEngine extends Engine {
      * translog id into lucene and returns null.
      */
     @Nullable
-    private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer) throws IOException {
+    private String loadTranslogUUIDFromCommit(IndexWriter writer) throws IOException {
         // commit on a just opened writer will commit even if there are no changes done to it
         // we rely on that for the commit data translog id key
         final Map<String, String> commitUserData = commitDataAsMap(writer);
-        if (commitUserData.containsKey("translog_id")) {
-            assert commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY) == false : "legacy commit contains translog UUID";
-            return new Translog.TranslogGeneration(null, Long.parseLong(commitUserData.get("translog_id")));
-        } else if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY)) {
-            if (commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY) == false) {
-                throw new IllegalStateException("commit doesn't contain translog UUID");
+        if (commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY)) {
+            if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
+                throw new IllegalStateException("commit doesn't contain translog generation id");
             }
-            final String translogUUID = commitUserData.get(Translog.TRANSLOG_UUID_KEY);
-            final long translogGen = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY));
-            return new Translog.TranslogGeneration(translogUUID, translogGen);
+            return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
+        } else {
+            return null;
         }
-        return null;
     }
 
     private SearcherManager createSearcherManager() throws EngineException {
@@ -1269,14 +1260,13 @@ public class InternalEngine extends Engine {
                 if (indexWriter.hasUncommittedChanges() || force) {
                     ensureCanFlush();
                     try {
-                        translog.prepareCommit();
+                        translog.rollGeneration();
                         logger.trace("starting commit for flush; commitTranslog=true");
-                        final long committedGeneration = commitIndexWriter(indexWriter, translog, null);
+                        commitIndexWriter(indexWriter, translog, null);
                         logger.trace("finished commit for flush");
                         // we need to refresh in order to clear older version values
                         refresh("version_table_flush");
-                        // after refresh documents can be retrieved from the index so we can now commit the translog
-                        translog.commit(committedGeneration);
+                        translog.trimUnreferencedReaders();
                     } catch (Exception e) {
                         throw new FlushFailedEngineException(shardId, e);
                     }
@@ -1428,9 +1418,8 @@ public class InternalEngine extends Engine {
             logger.trace("finish flush for snapshot");
         }
         try (ReleasableLock lock = readLock.acquire()) {
-            ensureOpen();
             logger.trace("pulling snapshot");
-            return new IndexCommitRef(deletionPolicy);
+            return new IndexCommitRef(deletionPolicy.getIndexDeletionPolicy());
         } catch (IOException e) {
             throw new SnapshotFailedEngineException(shardId, e);
         }
@@ -1781,10 +1770,9 @@ public class InternalEngine extends Engine {
      * @param writer   the index writer to commit
      * @param translog the translog
      * @param syncId   the sync flush ID ({@code null} if not committing a synced flush)
-     * @return the minimum translog generation for the local checkpoint committed with the specified index writer
      * @throws IOException if an I/O exception occurs committing the specified writer
      */
-    private long commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
+    private void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
         ensureCanFlush();
         try {
             final long localCheckpoint = seqNoService().getLocalCheckpoint();
@@ -1817,7 +1805,6 @@ public class InternalEngine extends Engine {
             });
 
             writer.commit();
-            return translogGeneration.translogFileGeneration;
         } catch (final Exception ex) {
             try {
                 failEngine("lucene commit failed", ex);

+ 1 - 2
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -256,8 +256,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         logger.debug("state: [CREATED]");
 
         this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
-        this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings,
-            bigArrays);
+        this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
         // the query cache is a node-level thing, however we want the most popular filters
         // to be computed on a per-shard basis
         if (IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.get(settings)) {

+ 59 - 113
core/src/main/java/org/elasticsearch/index/translog/Translog.java

@@ -37,8 +37,8 @@ import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.seqno.SequenceNumbersService;
@@ -54,10 +54,9 @@ import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Locale;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -76,19 +75,17 @@ import java.util.stream.Stream;
  * between the lucene index and the transaction log file. This UUID is used to prevent accidental recovery from a transaction log that belongs to a
  * different engine.
  * <p>
- * Each Translog has only one translog file open at any time referenced by a translog generation ID. This ID is written to a <tt>translog.ckp</tt> file that is designed
- * to fit in a single disk block such that a write of the file is atomic. The checkpoint file is written on each fsync operation of the translog and records the number of operations
- * written, the current translogs file generation and it's fsynced offset in bytes.
+ * Each Translog has only one translog file open for writes at any time referenced by a translog generation ID. This ID is written to a
+ * <tt>translog.ckp</tt> file that is designed to fit in a single disk block such that a write of the file is atomic. The checkpoint file
+ * is written on each fsync operation of the translog and records the number of operations written, the current translog's file generation,
+ * its fsynced offset in bytes, and other important statistics.
  * </p>
  * <p>
- * When a translog is opened the checkpoint is use to retrieve the latest translog file generation and subsequently to open the last written file to recovery operations.
- * The {@link org.elasticsearch.index.translog.Translog.TranslogGeneration}, given when the translog is opened / constructed is compared against
- * the latest generation and all consecutive translog files singe the given generation and the last generation in the checkpoint will be recovered and preserved until the next
- * generation is committed using {@link Translog#commit(long)}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are
- * the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit(long)}. In such a case
- * the currently being committed translog file will not be deleted since it's commit was not successful. Yet, a new/current translog file is already opened at that point such that there is more than
- * one translog file present. Such an uncommitted translog file always has a <tt>translog-${gen}.ckp</tt> associated with it which is an fsynced copy of the it's last <tt>translog.ckp</tt> such that in
- * disaster recovery last fsynced offsets, number of operation etc. are still preserved.
+ * When the current translog file reaches a certain size ({@link IndexSettings#INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING}), or when
+ * a clear separation between old and new operations is needed (upon change in primary term), the current file is reopened for read only and a new
+ * write only file is created. Any non-current, read only translog file always has a <tt>translog-${gen}.ckp</tt> associated with it
+ * which is an fsynced copy of its last <tt>translog.ckp</tt> such that in disaster recovery last fsynced offsets, number of
+ * operations etc. are still preserved.
  * </p>
  */
 public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {
@@ -111,23 +108,17 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
     // the list of translog readers is guaranteed to be in order of translog generation
     private final List<TranslogReader> readers = new ArrayList<>();
-    // this is a concurrent set and is not protected by any of the locks. The main reason
-    // is that is being accessed by two separate classes (additions & reading are done by Translog, remove by View when closed)
-    private final Set<View> outstandingViews = ConcurrentCollections.newConcurrentSet();
     private BigArrays bigArrays;
     protected final ReleasableLock readLock;
     protected final ReleasableLock writeLock;
     private final Path location;
     private TranslogWriter current;
 
-    private static final long NOT_SET_GENERATION = -1; // -1 is safe as it will not cause a translog deletion.
-
-    private volatile long currentCommittingGeneration = NOT_SET_GENERATION;
-    private volatile long lastCommittedTranslogFileGeneration = NOT_SET_GENERATION;
     private final AtomicBoolean closed = new AtomicBoolean();
     private final TranslogConfig config;
     private final LongSupplier globalCheckpointSupplier;
     private final String translogUUID;
+    private final TranslogDeletionPolicy deletionPolicy;
 
     /**
      * Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogGeneration} is
@@ -137,20 +128,22 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
      * translog file referenced by this generation. The translog creation will fail if this generation can't be opened.
      *
      * @param config                   the configuration of this translog
-     * @param translogGeneration       the translog generation to open
+     * @param expectedTranslogUUID     the translog uuid to open, null for a new translog
+     * @param deletionPolicy           an instance of {@link TranslogDeletionPolicy} that controls when a translog file can be safely
+     *                                 deleted
      * @param globalCheckpointSupplier a supplier for the global checkpoint
      */
     public Translog(
-        final TranslogConfig config,
-        final TranslogGeneration translogGeneration,
+        final TranslogConfig config, final String expectedTranslogUUID, TranslogDeletionPolicy deletionPolicy,
         final LongSupplier globalCheckpointSupplier) throws IOException {
         super(config.getShardId(), config.getIndexSettings());
         this.config = config;
         this.globalCheckpointSupplier = globalCheckpointSupplier;
-        if (translogGeneration == null || translogGeneration.translogUUID == null) { // legacy case
+        this.deletionPolicy = deletionPolicy;
+        if (expectedTranslogUUID == null) {
             translogUUID = UUIDs.randomBase64UUID();
         } else {
-            translogUUID = translogGeneration.translogUUID;
+            translogUUID = expectedTranslogUUID;
         }
         bigArrays = config.getBigArrays();
         ReadWriteLock rwl = new ReentrantReadWriteLock();
@@ -160,7 +153,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         Files.createDirectories(this.location);
 
         try {
-            if (translogGeneration != null) {
+            if (expectedTranslogUUID != null) {
                 final Checkpoint checkpoint = readCheckpoint(location);
                 final Path nextTranslogFile = location.resolve(getFilename(checkpoint.generation + 1));
                 final Path currentCheckpointFile = location.resolve(getCommitCheckpointFileName(checkpoint.generation));
@@ -172,19 +165,18 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
                 //
                 // For this to happen we must have already copied the translog.ckp file into translog-gen.ckp so we first check if that file exists
                 // if not we don't even try to clean it up and wait until we fail creating it
-                assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogWriter.getHeaderLength(translogUUID) : "unexpected translog file: [" + nextTranslogFile + "]";
+                assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogWriter.getHeaderLength(expectedTranslogUUID) : "unexpected translog file: [" + nextTranslogFile + "]";
                 if (Files.exists(currentCheckpointFile) // current checkpoint is already copied
                         && Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning
                     logger.warn("deleted previously created, but not yet committed, next generation [{}]. This can happen due to a tragic exception when creating a new generation", nextTranslogFile.getFileName());
                 }
-                this.readers.addAll(recoverFromFiles(translogGeneration, checkpoint));
+                this.readers.addAll(recoverFromFiles(deletionPolicy.getMinTranslogGenerationForRecovery(), checkpoint));
                 if (readers.isEmpty()) {
                     throw new IllegalStateException("at least one reader must be recovered");
                 }
                 boolean success = false;
                 try {
                     current = createWriter(checkpoint.generation + 1);
-                    this.lastCommittedTranslogFileGeneration = translogGeneration.translogFileGeneration;
                     success = true;
                 } finally {
                     // we have to close all the recovered ones otherwise we leak file handles here
@@ -196,15 +188,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
                 }
             } else {
                 IOUtils.rm(location);
-                logger.debug("wipe translog location - creating new translog");
+                // start from whatever generation lucene points to
+                final long generation = deletionPolicy.getMinTranslogGenerationForRecovery();
+                logger.debug("wipe translog location - creating new translog, starting generation [{}]", generation);
                 Files.createDirectories(location);
-                final long generation = 1;
                 final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, globalCheckpointSupplier.getAsLong());
                 final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME);
                 Checkpoint.write(getChannelFactory(), checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
                 IOUtils.fsync(checkpointFile, false);
                 current = createWriter(generation);
-                this.lastCommittedTranslogFileGeneration = NOT_SET_GENERATION;
 
             }
             // now that we know which files are there, create a new current one.
@@ -217,7 +209,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     }
 
     /** recover all translog files found on disk */
-    private ArrayList<TranslogReader> recoverFromFiles(TranslogGeneration translogGeneration, Checkpoint checkpoint) throws IOException {
+    private ArrayList<TranslogReader> recoverFromFiles(long translogFileGeneration, Checkpoint checkpoint) throws IOException {
         boolean success = false;
         ArrayList<TranslogReader> foundTranslogs = new ArrayList<>();
         final Path tempFile = Files.createTempFile(location, TRANSLOG_FILE_PREFIX, TRANSLOG_FILE_SUFFIX); // a temp file to copy checkpoint to - note it must be in on the same FS otherwise atomic move won't work
@@ -225,16 +217,21 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         try (ReleasableLock lock = writeLock.acquire()) {
             logger.debug("open uncommitted translog checkpoint {}", checkpoint);
             final String checkpointTranslogFile = getFilename(checkpoint.generation);
-            for (long i = translogGeneration.translogFileGeneration; i < checkpoint.generation; i++) {
+            // we open files in reverse order in order to validate the translog uuid before we start traversing the translog based on
+            // the generation id we found in the lucene commit. This gives better error messages if the wrong
+            // translog was found.
+            foundTranslogs.add(openReader(location.resolve(checkpointTranslogFile), checkpoint));
+            for (long i = checkpoint.generation - 1; i >= translogFileGeneration; i--) {
                 Path committedTranslogFile = location.resolve(getFilename(i));
                 if (Files.exists(committedTranslogFile) == false) {
-                    throw new IllegalStateException("translog file doesn't exist with generation: " + i + " lastCommitted: " + lastCommittedTranslogFileGeneration + " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive");
+                    throw new IllegalStateException("translog file doesn't exist with generation: " + i + " recovering from: " +
+                        translogFileGeneration + " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive");
                 }
                 final TranslogReader reader = openReader(committedTranslogFile, Checkpoint.read(location.resolve(getCommitCheckpointFileName(i))));
                 foundTranslogs.add(reader);
                 logger.debug("recovered local translog from checkpoint {}", checkpoint);
             }
-            foundTranslogs.add(openReader(location.resolve(checkpointTranslogFile), checkpoint));
+            Collections.reverse(foundTranslogs);
             Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(checkpoint.generation));
             if (Files.exists(commitCheckpoint)) {
                 Checkpoint checkpointFromDisk = Checkpoint.read(commitCheckpoint);
@@ -339,14 +336,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
      * Returns the number of operations in the transaction files that aren't committed to lucene.
      */
     public int totalOperations() {
-        return totalOperations(lastCommittedTranslogFileGeneration);
+        return totalOperations(deletionPolicy.getMinTranslogGenerationForRecovery());
     }
 
     /**
      * Returns the size in bytes of the translog files that aren't committed to lucene.
      */
     public long sizeInBytes() {
-        return sizeInBytes(lastCommittedTranslogFileGeneration);
+        return sizeInBytes(deletionPolicy.getMinTranslogGenerationForRecovery());
     }
 
     /**
@@ -517,9 +514,13 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     public Translog.View newView() {
         try (ReleasableLock lock = readLock.acquire()) {
             ensureOpen();
-            View view = new View(lastCommittedTranslogFileGeneration);
-            outstandingViews.add(view);
-            return view;
+            final long viewGen = deletionPolicy.acquireTranslogGenForView();
+            try {
+                return new View(viewGen);
+            } catch (Exception e) {
+                deletionPolicy.releaseTranslogGenView(viewGen);
+                throw e;
+            }
         }
     }
 
@@ -628,6 +629,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         return config;
     }
 
+    // public for testing
+    public TranslogDeletionPolicy getDeletionPolicy() {
+        return deletionPolicy;
+    }
+
     /**
      * a view into the translog, capturing all translog file at the moment of creation
      * and updated with any future translog.
@@ -679,9 +685,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         @Override
         public void close() throws IOException {
             if (closed.getAndSet(true) == false) {
-                logger.trace("closing view starting at translog [{}]", minTranslogGeneration());
-                boolean removed = outstandingViews.remove(this);
-                assert removed : "View was never set but was supposed to be removed";
+                logger.trace("closing view starting at translog [{}]", minGeneration);
+                deletionPolicy.releaseTranslogGenView(minGeneration);
                 trimUnreferencedReaders();
                 closeFilesIfNoPendingViews();
             }
@@ -1429,72 +1434,20 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     }
 
     /**
-     * Prepares a translog commit by setting the current committing generation and rolling the translog generation.
-     *
-     * @throws IOException if an I/O exception occurred while rolling the translog generation
+     * Trims unreferenced translog generations by asking {@link TranslogDeletionPolicy} for the minimum
+     * required generation
      */
-    public void prepareCommit() throws IOException {
-        try (ReleasableLock ignored = writeLock.acquire()) {
-            ensureOpen();
-            if (currentCommittingGeneration != NOT_SET_GENERATION) {
-                final String message =
-                        String.format(Locale.ROOT, "already committing a translog with generation [%d]", currentCommittingGeneration);
-                throw new IllegalStateException(message);
-            }
-            currentCommittingGeneration = current.getGeneration();
-            rollGeneration();
-        }
-    }
-
-    /**
-     * Commits the translog and sets the last committed translog generation to the specified generation. The specified committed generation
-     * will be used when trimming unreferenced translog generations such that generations from the committed generation will be preserved.
-     *
-     * If {@link Translog#prepareCommit()} was not called before calling commit, this method will be invoked too causing the translog
-     * generation to be rolled.
-     *
-     * @param committedGeneration the minimum translog generation to preserve after trimming unreferenced generations
-     * @throws IOException if an I/O exception occurred preparing the translog commit
-     */
-    public void commit(final long committedGeneration) throws IOException {
-        try (ReleasableLock ignored = writeLock.acquire()) {
-            ensureOpen();
-            assert assertCommittedGenerationIsInValidRange(committedGeneration);
-            if (currentCommittingGeneration == NOT_SET_GENERATION) {
-                prepareCommit();
-            }
-            assert currentCommittingGeneration != NOT_SET_GENERATION;
-            assert readers.stream().anyMatch(r -> r.getGeneration() == currentCommittingGeneration)
-                    : "readers missing committing generation [" + currentCommittingGeneration + "]";
-            // set the last committed generation otherwise old files will not be cleaned up
-            lastCommittedTranslogFileGeneration = committedGeneration;
-            currentCommittingGeneration = NOT_SET_GENERATION;
-            trimUnreferencedReaders();
-        }
-    }
-
-    private boolean assertCommittedGenerationIsInValidRange(final long committedGeneration) {
-        assert committedGeneration <= current.generation
-                : "tried to commit generation [" + committedGeneration + "] after current generation [" + current.generation + "]";
-        final long min = readers.stream().map(TranslogReader::getGeneration).min(Long::compareTo).orElse(Long.MIN_VALUE);
-        assert committedGeneration >= min
-                : "tried to commit generation [" + committedGeneration + "] before minimum generation [" + min + "]";
-        return true;
-    }
-
-    /**
-     * Trims unreferenced translog generations. The guarantee here is that translog generations will be preserved for all outstanding views
-     * and from the last committed translog generation defined by {@link Translog#lastCommittedTranslogFileGeneration}.
-     */
-    void trimUnreferencedReaders() {
+    public void trimUnreferencedReaders() {
         try (ReleasableLock ignored = writeLock.acquire()) {
             if (closed.get()) {
                 // we're shutdown potentially on some tragic event, don't delete anything
                 return;
             }
-            long minReferencedGen = Math.min(
-                    lastCommittedTranslogFileGeneration,
-                    outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE));
+            long minReferencedGen = deletionPolicy.minTranslogGenRequired();
+            final long minExistingGen = readers.isEmpty() ? current.getGeneration() : readers.get(0).getGeneration();
+            assert minReferencedGen >= minExistingGen :
+                "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is ["
+                    + minExistingGen + "]";
             final List<TranslogReader> unreferenced =
                     readers.stream().filter(r -> r.getGeneration() < minReferencedGen).collect(Collectors.toList());
             for (final TranslogReader unreferencedReader : unreferenced) {
@@ -1510,7 +1463,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
     void closeFilesIfNoPendingViews() throws IOException {
         try (ReleasableLock ignored = writeLock.acquire()) {
-            if (closed.get() && outstandingViews.isEmpty()) {
+            if (closed.get() && deletionPolicy.pendingViewsCount() == 0) {
                 logger.trace("closing files. translog is closed and there are no pending views");
                 ArrayList<Closeable> toClose = new ArrayList<>(readers);
                 toClose.add(current);
@@ -1567,13 +1520,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         }
     }
 
-    /**
-     * The number of currently open views
-     */
-    int getNumOpenViews() {
-        return outstandingViews.size();
-    }
-
     ChannelFactory getChannelFactory() {
         return FileChannel::open;
     }

+ 87 - 0
core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java

@@ -0,0 +1,87 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.translog;
+
+import org.apache.lucene.util.Counter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class TranslogDeletionPolicy { // tracks which translog generations must be retained for recovery and for open views
+
+    /**
+     * Records how many views are held against each translog generation (key: generation, value: number of
+     * outstanding views). An entry is removed as soon as its count drops back to zero, and the map is only
+     * touched from the synchronized methods of this class.
+     */
+    private final Map<Long, Counter> translogRefCounts = new HashMap<>();
+
+    /**
+     * the translog generation that is required to properly recover from the oldest non deleted
+     * {@link org.apache.lucene.index.IndexCommit}. Starts at 1 and is only changed through
+     * {@link #setMinTranslogGenerationForRecovery(long)}, which refuses to lower it.
+     */
+    private long minTranslogGenerationForRecovery = 1;
+
+    public synchronized void setMinTranslogGenerationForRecovery(long newGen) {
+        if (newGen < minTranslogGenerationForRecovery) { // the recovery generation may stay the same or advance, never decrease
+            throw new IllegalArgumentException("minTranslogGenerationForRecovery can't go backwards. new [" + newGen + "] current [" +
+                minTranslogGenerationForRecovery+ "]");
+        }
+        minTranslogGenerationForRecovery = newGen;
+    }
+
+    /**
+     * acquires the basis generation for a new view. Any translog generation above, and including, the returned generation
+     * will not be deleted until a corresponding call to {@link #releaseTranslogGenView(long)} is made. The returned
+     * generation is the current minimum generation for recovery; every acquisition increments its reference count by one.
+     */
+    synchronized long acquireTranslogGenForView() {
+        translogRefCounts.computeIfAbsent(minTranslogGenerationForRecovery, l -> Counter.newCounter(false)).addAndGet(1);
+        return minTranslogGenerationForRecovery;
+    }
+
+    /**
+     * returns the number of distinct translog generations that are currently acquired for views. Views sharing a
+     * generation are counted once, so this is zero exactly when no view is outstanding.
+     */
+    synchronized int pendingViewsCount() {
+        return translogRefCounts.size();
+    }
+
+    /**
+     * releases a generation that was acquired by {@link #acquireTranslogGenForView()}. Once the last reference to a
+     * generation is released its entry is dropped.
+     *
+     * @throws IllegalArgumentException if the given generation has no outstanding acquisition
+     */
+    synchronized void releaseTranslogGenView(long translogGen) {
+        Counter current = translogRefCounts.get(translogGen);
+        if (current == null || current.get() <= 0) {
+            throw new IllegalArgumentException("translog gen [" + translogGen + "] wasn't acquired");
+        }
+        if (current.addAndGet(-1) == 0) {
+            translogRefCounts.remove(translogGen);
+        }
+    }
+
+    /**
+     * returns the minimum translog generation that is still required by the system. Any generation below
+     * the returned value may be safely deleted. This is the smaller of the lowest generation held by a view and the
+     * minimum generation for recovery; without views only the recovery generation counts.
+     */
+    synchronized long minTranslogGenRequired() {
+        long viewRefs = translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE);
+        return Math.min(viewRefs, minTranslogGenerationForRecovery);
+    }
+
+    /** returns the translog generation that will be used as a basis of a future store/peer recovery */
+    public synchronized long getMinTranslogGenerationForRecovery() {
+        return minTranslogGenerationForRecovery;
+    }
+}

+ 83 - 0
core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java

@@ -0,0 +1,83 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.index.translog.TranslogDeletionPolicy;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CombinedDeletionPolicyTests extends ESTestCase { // unit tests for CombinedDeletionPolicy
+
+    public void testPassThrough() throws IOException { // onInit/onCommit must reach the wrapped index deletion policy
+        SnapshotDeletionPolicy indexDeletionPolicy = mock(SnapshotDeletionPolicy.class);
+        CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, new TranslogDeletionPolicy(),
+            EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
+        List<IndexCommit> commitList = new ArrayList<>();
+        long count = randomIntBetween(1, 3);
+        for (int i = 0; i < count; i++) {
+            commitList.add(mockIndexCommitWithTranslogGen(randomNonNegativeLong()));
+        }
+        combinedDeletionPolicy.onInit(commitList);
+        verify(indexDeletionPolicy, times(1)).onInit(commitList);
+        combinedDeletionPolicy.onCommit(commitList);
+        verify(indexDeletionPolicy, times(1)).onCommit(commitList);
+    }
+
+    public void testSettingMinTranslogGen() throws IOException { // the last commit's generation must reach the translog policy
+        SnapshotDeletionPolicy indexDeletionPolicy = mock(SnapshotDeletionPolicy.class);
+        final TranslogDeletionPolicy translogDeletionPolicy = mock(TranslogDeletionPolicy.class);
+        CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, translogDeletionPolicy,
+            EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
+        List<IndexCommit> commitList = new ArrayList<>();
+        long count = randomIntBetween(10, 20);
+        long lastGen = 0; // generation of the most recently added mock commit
+        for (int i = 0; i < count; i++) {
+            lastGen += randomIntBetween(10, 20000); // strictly increasing generations
+            commitList.add(mockIndexCommitWithTranslogGen(lastGen));
+        }
+        combinedDeletionPolicy.onInit(commitList); // expected to hand the last commit's generation to the translog policy
+        verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(lastGen);
+        commitList.clear(); // start over with a fresh list whose generations keep increasing
+        for (int i = 0; i < count; i++) {
+            lastGen += randomIntBetween(10, 20000);
+            commitList.add(mockIndexCommitWithTranslogGen(lastGen));
+        }
+        combinedDeletionPolicy.onCommit(commitList); // same expectation as for onInit, now with the new last generation
+        verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(lastGen);
+    }
+
+    IndexCommit mockIndexCommitWithTranslogGen(long gen) throws IOException { // mock commit whose user data carries gen
+        IndexCommit commit = mock(IndexCommit.class);
+        when(commit.getUserData()).thenReturn(Collections.singletonMap(Translog.TRANSLOG_GENERATION_KEY, Long.toString(gen)));
+        return commit;
+    }
+}

+ 63 - 59
core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -126,6 +126,7 @@ import org.elasticsearch.index.store.DirectoryUtils;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogConfig;
+import org.elasticsearch.index.translog.TranslogDeletionPolicy;
 import org.elasticsearch.indices.IndicesModule;
 import org.elasticsearch.indices.mapper.MapperRegistry;
 import org.elasticsearch.test.DummyShardLock;
@@ -165,6 +166,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -259,8 +261,8 @@ public class InternalEngineTests extends ESTestCase {
         return new EngineConfig(openMode, config.getShardId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(),
             config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
             new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(), config.getQueryCache(),
-            config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(),
-            config.getIndexSort());
+            config.getQueryCachingPolicy(), config.getTranslogConfig(),
+            config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort());
     }
 
     @Override
@@ -331,13 +333,18 @@ public class InternalEngineTests extends ESTestCase {
 
     protected Translog createTranslog(Path translogPath) throws IOException {
         TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE);
-        return new Translog(translogConfig, null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
+        return new Translog(translogConfig, null, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
     }
 
     protected InternalEngine createEngine(Store store, Path translogPath) throws IOException {
         return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null);
     }
 
+    protected InternalEngine createEngine(Store store, Path translogPath,
+                                          Function<EngineConfig, SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
+        return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier);
+    }
+
     protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException {
         return createEngine(indexSettings, store, translogPath, mergePolicy, null);
 
@@ -353,7 +360,7 @@ public class InternalEngineTests extends ESTestCase {
         Path translogPath,
         MergePolicy mergePolicy,
         @Nullable IndexWriterFactory indexWriterFactory,
-        @Nullable Supplier<SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
+        @Nullable Function<EngineConfig, SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
         return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, null);
     }
 
@@ -363,7 +370,7 @@ public class InternalEngineTests extends ESTestCase {
         Path translogPath,
         MergePolicy mergePolicy,
         @Nullable IndexWriterFactory indexWriterFactory,
-        @Nullable Supplier<SequenceNumbersService> sequenceNumbersServiceSupplier,
+        @Nullable Function<EngineConfig, SequenceNumbersService> sequenceNumbersServiceSupplier,
         @Nullable Sort indexSort) throws IOException {
         EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, null, indexSort);
         InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, config);
@@ -380,7 +387,7 @@ public class InternalEngineTests extends ESTestCase {
     }
 
     public static InternalEngine createInternalEngine(@Nullable final IndexWriterFactory indexWriterFactory,
-                                                      @Nullable final Supplier<SequenceNumbersService> sequenceNumbersServiceSupplier,
+                                                      @Nullable final Function<EngineConfig, SequenceNumbersService> sequenceNumbersServiceSupplier,
                                                       final EngineConfig config) {
         return new InternalEngine(config) {
                 @Override
@@ -392,7 +399,7 @@ public class InternalEngineTests extends ESTestCase {
 
                 @Override
                 public SequenceNumbersService seqNoService() {
-                    return (sequenceNumbersServiceSupplier != null) ? sequenceNumbersServiceSupplier.get() : super.seqNoService();
+                    return (sequenceNumbersServiceSupplier != null) ? sequenceNumbersServiceSupplier.apply(config) : super.seqNoService();
                 }
             };
     }
@@ -696,25 +703,18 @@ public class InternalEngineTests extends ESTestCase {
     }
 
     public void testCommitStats() throws IOException {
-        InternalEngine engine = null;
-        try {
-            this.engine.close();
-
-            final AtomicLong maxSeqNo = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED);
-            final AtomicLong localCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED);
-            final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbersService.UNASSIGNED_SEQ_NO);
-
-            engine = new InternalEngine(copy(this.engine.config(), this.engine.config().getOpenMode())) {
-                @Override
-                public SequenceNumbersService seqNoService() {
-                    return new SequenceNumbersService(
-                        this.config().getShardId(),
-                        this.config().getIndexSettings(),
+        final AtomicLong maxSeqNo = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED);
+        final AtomicLong localCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED);
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbersService.UNASSIGNED_SEQ_NO);
+        try (
+            Store store = createStore();
+            InternalEngine engine = createEngine(store, createTempDir(), (config) -> new SequenceNumbersService(
+                        config.getShardId(),
+                        config.getIndexSettings(),
                         maxSeqNo.get(),
                         localCheckpoint.get(),
-                        globalCheckpoint.get());
-                }
-            };
+                        globalCheckpoint.get())
+            )) {
             CommitStats stats1 = engine.commitStats();
             assertThat(stats1.getGeneration(), greaterThan(0L));
             assertThat(stats1.getId(), notNullValue());
@@ -751,8 +751,6 @@ public class InternalEngineTests extends ESTestCase {
             assertThat(Long.parseLong(stats2.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint.get()));
             assertThat(stats2.getUserData(), hasKey(SequenceNumbers.MAX_SEQ_NO));
             assertThat(Long.parseLong(stats2.getUserData().get(SequenceNumbers.MAX_SEQ_NO)), equalTo(maxSeqNo.get()));
-        } finally {
-            IOUtils.close(engine);
         }
     }
 
@@ -877,26 +875,24 @@ public class InternalEngineTests extends ESTestCase {
         final int docs = randomIntBetween(1, 4096);
         final List<Long> seqNos = LongStream.range(0, docs).boxed().collect(Collectors.toList());
         Randomness.shuffle(seqNos);
-        engine.close();
         Engine initialEngine = null;
+        Engine recoveringEngine = null;
+        Store store = createStore();
+        final AtomicInteger counter = new AtomicInteger();
         try {
-            final AtomicInteger counter = new AtomicInteger();
-            initialEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG)) {
-                @Override
-                public SequenceNumbersService seqNoService() {
-                    return new SequenceNumbersService(
-                            engine.shardId,
-                            engine.config().getIndexSettings(),
-                            SequenceNumbersService.NO_OPS_PERFORMED,
-                            SequenceNumbersService.NO_OPS_PERFORMED,
-                            SequenceNumbersService.UNASSIGNED_SEQ_NO) {
-                        @Override
-                        public long generateSeqNo() {
-                            return seqNos.get(counter.getAndIncrement());
-                        }
-                    };
+            initialEngine = createEngine(store, createTempDir(), (config) ->
+                new SequenceNumbersService(
+                    config.getShardId(),
+                    config.getIndexSettings(),
+                    SequenceNumbersService.NO_OPS_PERFORMED,
+                    SequenceNumbersService.NO_OPS_PERFORMED,
+                    SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+                    @Override
+                    public long generateSeqNo() {
+                        return seqNos.get(counter.getAndIncrement());
+                    }
                 }
-            };
+            );
             for (int i = 0; i < docs; i++) {
                 final String id = Integer.toString(i);
                 final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
@@ -907,12 +903,7 @@ public class InternalEngineTests extends ESTestCase {
                     initialEngine.flush();
                 }
             }
-        } finally {
-            IOUtils.close(initialEngine);
-        }
-
-        Engine recoveringEngine = null;
-        try {
+            initialEngine.close();
             recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
             recoveringEngine.recoverFromTranslog();
             try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
@@ -920,9 +911,8 @@ public class InternalEngineTests extends ESTestCase {
                 assertEquals(docs, topDocs.totalHits);
             }
         } finally {
-            IOUtils.close(recoveringEngine);
+            IOUtils.close(initialEngine, recoveringEngine, store);
         }
-
     }
 
     public void testConcurrentGetAndFlush() throws Exception {
@@ -1148,6 +1138,20 @@ public class InternalEngineTests extends ESTestCase {
         searchResult.close();
     }
 
+    public void testCommitAdvancesMinTranslogForRecovery() throws IOException {
+        ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
+        engine.index(indexForDoc(doc));
+        engine.flush(); // first flush after indexing: the assertions below expect generation 2 for both values
+        assertThat(engine.getTranslog().currentFileGeneration(), equalTo(2L));
+        assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(2L));
+        engine.flush(); // nothing was indexed since the last flush: both values are expected to stay at 2
+        assertThat(engine.getTranslog().currentFileGeneration(), equalTo(2L));
+        assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(2L));
+        engine.flush(true, true); // presumably a forced flush (confirm argument meaning): generation 3 is expected
+        assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L));
+        assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(3L));
+    }
+
     public void testSyncedFlush() throws IOException {
         try (Store store = createStore();
             Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null))) {
@@ -2479,13 +2483,11 @@ public class InternalEngineTests extends ESTestCase {
         }
         assertVisibleCount(engine, numDocs);
         engine.close();
-        engine = new InternalEngine(engine.config());
-
+        engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG));
         try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
             TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
             assertThat(topDocs.totalHits, equalTo(0));
         }
-
     }
 
     private Mapping dynamicUpdate() {
@@ -2713,15 +2715,15 @@ public class InternalEngineTests extends ESTestCase {
 
         Translog translog = new Translog(
             new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE),
-            null,
-            () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
+            null, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
         translog.add(new Translog.Index("test", "SomeBogusId", 0, "{}".getBytes(Charset.forName("UTF-8"))));
         assertEquals(generation.translogFileGeneration, translog.currentFileGeneration());
         translog.close();
 
         EngineConfig config = engine.config();
         /* create a TranslogConfig that has been created with a different UUID */
-        TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), BigArrays.NON_RECYCLING_INSTANCE);
+        TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(),
+            BigArrays.NON_RECYCLING_INSTANCE);
 
         EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, threadPool,
                 config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(),
@@ -2822,6 +2824,8 @@ public class InternalEngineTests extends ESTestCase {
                     assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
                     assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
                     expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
+                    assertEquals(1, engine.getTranslog().currentFileGeneration());
+                    assertEquals(0L, engine.getTranslog().totalOperations());
                 }
             }
 
@@ -3539,7 +3543,7 @@ public class InternalEngineTests extends ESTestCase {
             final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED);
             final List<Thread> threads = new ArrayList<>();
             final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint);
-            initialEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService);
+            initialEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, (config) -> seqNoService);
             final InternalEngine finalInitialEngine = initialEngine;
             for (int i = 0; i < docs; i++) {
                 final String id = Integer.toString(i);
@@ -3767,7 +3771,7 @@ public class InternalEngineTests extends ESTestCase {
             final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED);
             final Map<Thread, CountDownLatch> threads = new LinkedHashMap<>();
             final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint);
-            actualEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService);
+            actualEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, (config) -> seqNoService);
             final InternalEngine finalActualEngine = actualEngine;
             final Translog translog = finalActualEngine.getTranslog();
             final long generation = finalActualEngine.getTranslog().currentFileGeneration();

+ 1 - 1
core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

@@ -108,7 +108,7 @@ public class RefreshListenersTests extends ESTestCase {
         store = new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
         IndexWriterConfig iwc = newIndexWriterConfig();
         TranslogConfig translogConfig = new TranslogConfig(shardId, createTempDir("translog"), indexSettings,
-                BigArrays.NON_RECYCLING_INSTANCE);
+            BigArrays.NON_RECYCLING_INSTANCE);
         Engine.EventListener eventListener = new Engine.EventListener() {
             @Override
             public void onFailedEngine(String reason, @Nullable Exception e) {

+ 115 - 85
core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

@@ -126,7 +126,8 @@ public class TranslogTests extends ESTestCase {
 
         if (translog.isOpen()) {
             if (translog.currentFileGeneration() > 1) {
-                translog.commit(translog.currentFileGeneration());
+                markCurrentGenAsCommitted(translog);
+                translog.trimUnreferencedReaders();
                 assertFileDeleted(translog, translog.currentFileGeneration() - 1);
             }
             translog.close();
@@ -136,6 +137,24 @@ public class TranslogTests extends ESTestCase {
 
     }
 
+    protected Translog createTranslog(TranslogConfig config, String translogUUID) throws IOException { // uses a fresh deletion policy
+        return new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
+    }
+
+    private void markCurrentGenAsCommitted(Translog translog) {
+        commit(translog, translog.currentFileGeneration()); // no roll: the current generation becomes the recovery minimum
+    }
+
+    private void rollAndCommit(Translog translog) throws IOException {
+        translog.rollGeneration(); // roll first, then mark the generation that is current afterwards as the recovery minimum
+        commit(translog, translog.currentFileGeneration());
+    }
+
+    private void commit(Translog translog, long genToCommit) {
+        translog.getDeletionPolicy().setMinTranslogGenerationForRecovery(genToCommit); // raise the recovery minimum
+        translog.trimUnreferencedReaders(); // then let the translog drop generations the policy no longer requires
+    }
+
     @Override
     @Before
     public void setUp() throws Exception {
@@ -149,7 +168,7 @@ public class TranslogTests extends ESTestCase {
     @After
     public void tearDown() throws Exception {
         try {
-            assertEquals("there are still open views", 0, translog.getNumOpenViews());
+            assertEquals("there are still open views", 0, translog.getDeletionPolicy().pendingViewsCount());
             translog.close();
         } finally {
             super.tearDown();
@@ -158,7 +177,7 @@ public class TranslogTests extends ESTestCase {
 
     private Translog create(Path path) throws IOException {
         globalCheckpoint = new AtomicLong(SequenceNumbersService.UNASSIGNED_SEQ_NO);
-        return new Translog(getTranslogConfig(path), null, () -> globalCheckpoint.get());
+        return new Translog(getTranslogConfig(path), null, new TranslogDeletionPolicy(), () -> globalCheckpoint.get()); // null UUID and a fresh deletion policy; the supplier reads the globalCheckpoint field that was reset on the line above
     }
 
     private TranslogConfig getTranslogConfig(final Path path) {
@@ -182,7 +201,7 @@ public class TranslogTests extends ESTestCase {
         return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize);
     }
 
-    protected void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) throws IOException {
+    private void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) throws IOException { // records op in the list first and then writes it to the translog, so op stays in the list even if translog.add throws
         list.add(op);
         translog.add(op);
     }
@@ -282,14 +301,14 @@ public class TranslogTests extends ESTestCase {
         assertNull(snapshot.next());
 
         long firstId = translog.currentFileGeneration();
-        translog.prepareCommit();
+        translog.rollGeneration();
         assertThat(translog.currentFileGeneration(), Matchers.not(equalTo(firstId)));
 
         snapshot = translog.newSnapshot();
         assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
         assertThat(snapshot.totalOperations(), equalTo(ops.size()));
 
-        translog.commit(translog.currentFileGeneration());
+        markCurrentGenAsCommitted(translog);
         snapshot = translog.newSnapshot();
         assertThat(snapshot, SnapshotMatchers.size(0));
         assertThat(snapshot.totalOperations(), equalTo(0));
@@ -346,7 +365,7 @@ public class TranslogTests extends ESTestCase {
         }
 
         final long expectedSizeInBytes = 266L;
-        translog.prepareCommit();
+        translog.rollGeneration();
         {
             final TranslogStats stats = stats();
             assertThat(stats.estimatedNumberOfOperations(), equalTo(4L));
@@ -373,7 +392,7 @@ public class TranslogTests extends ESTestCase {
             }
         }
 
-        translog.commit(translog.currentFileGeneration());
+        markCurrentGenAsCommitted(translog);
         {
             final TranslogStats stats = stats();
             assertThat(stats.estimatedNumberOfOperations(), equalTo(0L));
@@ -441,12 +460,12 @@ public class TranslogTests extends ESTestCase {
 
         assertThat(snapshot1, SnapshotMatchers.equalsTo(ops.get(0)));
 
-        translog.prepareCommit();
+        translog.rollGeneration();
         addToTranslogAndList(translog, ops, new Translog.Index("test", "3", 2, new byte[]{3}));
 
         try (Translog.View view = translog.newView()) {
             Translog.Snapshot snapshot2 = translog.newSnapshot();
-            translog.commit(translog.currentFileGeneration());
+            markCurrentGenAsCommitted(translog);
             assertThat(snapshot2, SnapshotMatchers.equalsTo(ops));
             assertThat(snapshot2.totalOperations(), equalTo(ops.size()));
         }
@@ -517,7 +536,7 @@ public class TranslogTests extends ESTestCase {
             threads[i].join(60 * 1000);
         }
 
-        List<LocationOperation> collect = writtenOperations.stream().collect(Collectors.toList());
+        List<LocationOperation> collect = new ArrayList<>(writtenOperations);
         Collections.sort(collect);
         Translog.Snapshot snapshot = translog.newSnapshot();
         for (LocationOperation locationOperation : collect) {
@@ -581,7 +600,7 @@ public class TranslogTests extends ESTestCase {
                 corruptionsCaught.incrementAndGet();
             }
         }
-        expectThrows(TranslogCorruptedException.class, () -> snapshot.next());
+        expectThrows(TranslogCorruptedException.class, snapshot::next);
         assertThat("at least one corruption was caused and caught", corruptionsCaught.get(), greaterThanOrEqualTo(1));
     }
 
@@ -725,8 +744,8 @@ public class TranslogTests extends ESTestCase {
                         if (id % flushEveryOps == 0) {
                             synchronized (flushMutex) {
                                 // we need not do this concurrently as we need to make sure that the generation
-                                // we're committing - translog.currentFileGeneration() - is still present when we're committing
-                                translog.commit(translog.currentFileGeneration());
+                                // we're committing - is still present when we're committing
+                                rollAndCommit(translog);
                             }
                         }
                         if (id % 7 == 0) {
@@ -872,7 +891,7 @@ public class TranslogTests extends ESTestCase {
                 assertTrue("we only synced a previous operation yet", translog.syncNeeded());
             }
             if (rarely()) {
-                translog.commit(translog.currentFileGeneration());
+                rollAndCommit(translog);
                 assertFalse("location is from a previous translog - already synced", translog.ensureSynced(location)); // not syncing now
                 assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
             }
@@ -892,7 +911,7 @@ public class TranslogTests extends ESTestCase {
             ArrayList<Location> locations = new ArrayList<>();
             for (int op = 0; op < translogOperations; op++) {
                 if (rarely()) {
-                    translog.commit(translog.currentFileGeneration()); // do this first so that there is at least one pending tlog entry
+                    rollAndCommit(translog); // do this first so that there is at least one pending tlog entry
                 }
                 final Translog.Location location =
                     translog.add(new Translog.Index("test", "" + op, op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
@@ -904,7 +923,7 @@ public class TranslogTests extends ESTestCase {
                 assertTrue("this operation has not been synced", translog.ensureSynced(locations.stream()));
                 assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); // we are the last location so everything should be synced
             } else if (rarely()) {
-                translog.commit(translog.currentFileGeneration());
+                rollAndCommit(translog);
                 assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream())); // not syncing now
                 assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
             } else {
@@ -925,7 +944,7 @@ public class TranslogTests extends ESTestCase {
             locations.add(
                 translog.add(new Translog.Index("test", "" + op, op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))));
             if (rarely() && translogOperations > op + 1) {
-                translog.commit(translog.currentFileGeneration());
+                rollAndCommit(translog);
             }
         }
         Collections.shuffle(locations, random());
@@ -1091,7 +1110,7 @@ public class TranslogTests extends ESTestCase {
             locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
             final boolean commit = commitOften ? frequently() : rarely();
             if (commit && op < translogOperations - 1) {
-                translog.commit(translog.currentFileGeneration());
+                rollAndCommit(translog);
                 minUncommittedOp = op + 1;
                 translogGeneration = translog.getGeneration();
             }
@@ -1100,14 +1119,15 @@ public class TranslogTests extends ESTestCase {
         TranslogConfig config = translog.getConfig();
 
         translog.close();
-        translog = new Translog(config, translogGeneration,() -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
         if (translogGeneration == null) {
+            translog = createTranslog(config, null);
             assertEquals(0, translog.stats().estimatedNumberOfOperations());
             assertEquals(1, translog.currentFileGeneration());
             assertFalse(translog.syncNeeded());
             Translog.Snapshot snapshot = translog.newSnapshot();
             assertNull(snapshot.next());
         } else {
+            translog = new Translog(config, translogGeneration.translogUUID, translog.getDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
             assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, translog.currentFileGeneration());
             assertFalse(translog.syncNeeded());
             Translog.Snapshot snapshot = translog.newSnapshot();
@@ -1130,7 +1150,7 @@ public class TranslogTests extends ESTestCase {
             locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
             if (op == prepareOp) {
                 translogGeneration = translog.getGeneration();
-                translog.prepareCommit();
+                translog.rollGeneration();
                 assertEquals("expected this to be the first commit", 1L, translogGeneration.translogFileGeneration);
                 assertNotNull(translogGeneration.translogUUID);
             }
@@ -1141,7 +1161,9 @@ public class TranslogTests extends ESTestCase {
         // we intentionally don't close the tlog that is in the prepareCommit stage since we try to recovery the uncommitted
         // translog here as well.
         TranslogConfig config = translog.getConfig();
-        try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
+        final String translogUUID = translog.getTranslogUUID();
+        final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
+        try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
             assertNotNull(translogGeneration);
             assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
             assertFalse(translog.syncNeeded());
@@ -1154,7 +1176,7 @@ public class TranslogTests extends ESTestCase {
             }
         }
         if (randomBoolean()) { // recover twice
-            try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
+            try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
                 assertNotNull(translogGeneration);
                 assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
                 assertFalse(translog.syncNeeded());
@@ -1180,7 +1202,7 @@ public class TranslogTests extends ESTestCase {
             locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
             if (op == prepareOp) {
                 translogGeneration = translog.getGeneration();
-                translog.prepareCommit();
+                translog.rollGeneration();
                 assertEquals("expected this to be the first commit", 1L, translogGeneration.translogFileGeneration);
                 assertNotNull(translogGeneration.translogUUID);
             }
@@ -1195,7 +1217,9 @@ public class TranslogTests extends ESTestCase {
         Checkpoint read = Checkpoint.read(ckp);
         Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)));
 
-        try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
+        final String translogUUID = translog.getTranslogUUID();
+        final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
+        try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
             assertNotNull(translogGeneration);
             assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
             assertFalse(translog.syncNeeded());
@@ -1210,7 +1234,7 @@ public class TranslogTests extends ESTestCase {
         }
 
         if (randomBoolean()) { // recover twice
-            try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
+            try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
                 assertNotNull(translogGeneration);
                 assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
                 assertFalse(translog.syncNeeded());
@@ -1235,7 +1259,7 @@ public class TranslogTests extends ESTestCase {
             locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
             if (op == prepareOp) {
                 translogGeneration = translog.getGeneration();
-                translog.prepareCommit();
+                translog.rollGeneration();
                 assertEquals("expected this to be the first commit", 1L, translogGeneration.translogFileGeneration);
                 assertNotNull(translogGeneration.translogUUID);
             }
@@ -1248,7 +1272,9 @@ public class TranslogTests extends ESTestCase {
         Checkpoint read = Checkpoint.read(ckp);
         Checkpoint corrupted = Checkpoint.emptyTranslogCheckpoint(0, 0, SequenceNumbersService.UNASSIGNED_SEQ_NO);
         Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
-        try (Translog ignored = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
+        final String translogUUID = translog.getTranslogUUID();
+        final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
+        try (Translog ignored = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
             fail("corrupted");
         } catch (IllegalStateException ex) {
             assertEquals("Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, " +
@@ -1256,7 +1282,7 @@ public class TranslogTests extends ESTestCase {
                 "generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-2}", ex.getMessage());
         }
         Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
-        try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
+        try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
             assertNotNull(translogGeneration);
             assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
             assertFalse(translog.syncNeeded());
@@ -1319,23 +1345,25 @@ public class TranslogTests extends ESTestCase {
         for (int op = 0; op < translogOperations; op++) {
             locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
             if (randomBoolean()) {
-                translog.commit(translog.currentFileGeneration());
+                rollAndCommit(translog);
                 firstUncommitted = op + 1;
             }
         }
-        TranslogConfig config = translog.getConfig();
+        final TranslogConfig config = translog.getConfig();
+        final String translogUUID = translog.getTranslogUUID();
+        final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
         Translog.TranslogGeneration translogGeneration = translog.getGeneration();
         translog.close();
 
-        Translog.TranslogGeneration generation = new Translog.TranslogGeneration(randomRealisticUnicodeOfCodepointLengthBetween(1,
-            translogGeneration.translogUUID.length()), translogGeneration.translogFileGeneration);
+        final String foreignTranslog = randomRealisticUnicodeOfCodepointLengthBetween(1,
+            translogGeneration.translogUUID.length());
         try {
-            new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
+            new Translog(config, foreignTranslog, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
             fail("translog doesn't belong to this UUID");
         } catch (TranslogCorruptedException ex) {
 
         }
-        this.translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
+        this.translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
         Translog.Snapshot snapshot = this.translog.newSnapshot();
         for (int i = firstUncommitted; i < translogOperations; i++) {
             Translog.Operation next = snapshot.next();
@@ -1509,7 +1537,7 @@ public class TranslogTests extends ESTestCase {
         }
 
         try {
-            translog.commit(translog.currentFileGeneration());
+            rollAndCommit(translog);
             fail("already closed");
         } catch (AlreadyClosedException ex) {
             assertNotNull(ex.getCause());
@@ -1518,7 +1546,9 @@ public class TranslogTests extends ESTestCase {
 
         assertFalse(translog.isOpen());
         translog.close(); // we are closed
-        try (Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
+        final String translogUUID = translog.getTranslogUUID();
+        final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
+        try (Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
             assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration());
             assertFalse(tlog.syncNeeded());
 
@@ -1554,7 +1584,7 @@ public class TranslogTests extends ESTestCase {
         Path tempDir = createTempDir();
         final FailSwitch fail = new FailSwitch();
         TranslogConfig config = getTranslogConfig(tempDir);
-        Translog translog = getFailableTranslog(fail, config, false, true, null);
+        Translog translog = getFailableTranslog(fail, config, false, true, null, new TranslogDeletionPolicy());
         LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly
         translog.add(new Translog.Index("test", "1", 0, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
         fail.failAlways();
@@ -1583,6 +1613,7 @@ public class TranslogTests extends ESTestCase {
 
         TranslogConfig config = getTranslogConfig(tempDir);
         Translog translog = getFailableTranslog(fail, config);
+        final String translogUUID = translog.getTranslogUUID();
 
         final int threadCount = randomIntBetween(1, 5);
         Thread[] threads = new Thread[threadCount];
@@ -1648,7 +1679,7 @@ public class TranslogTests extends ESTestCase {
                     iterator.remove();
                 }
             }
-            try (Translog tlog = new Translog(config, translog.getGeneration(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
+            try (Translog tlog = new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
                 Translog.Snapshot snapshot = tlog.newSnapshot();
                 if (writtenOperations.size() != snapshot.totalOperations()) {
                     for (int i = 0; i < threadCount; i++) {
@@ -1669,7 +1700,7 @@ public class TranslogTests extends ESTestCase {
     }
 
     private Translog getFailableTranslog(FailSwitch fail, final TranslogConfig config) throws IOException {
-        return getFailableTranslog(fail, config, randomBoolean(), false, null);
+        return getFailableTranslog(fail, config, randomBoolean(), false, null, new TranslogDeletionPolicy()); // convenience overload: random partialWrites, throwUnknownException=false, null translogUUID, fresh deletion policy
     }
 
     private static class FailSwitch {
@@ -1702,8 +1733,10 @@ public class TranslogTests extends ESTestCase {
     }
 
 
-    private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException, Translog.TranslogGeneration generation) throws IOException {
-        return new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+    private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean partialWrites,
+                                         final boolean throwUnknownException, String translogUUID,
+                                         final TranslogDeletionPolicy deletionPolicy) throws IOException {
+        return new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) {
             @Override
             ChannelFactory getChannelFactory() {
                 final ChannelFactory factory = super.getChannelFactory();
@@ -1713,7 +1746,7 @@ public class TranslogTests extends ESTestCase {
                     boolean success = false;
                     try {
                         final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation
-                        ThrowingFileChannel throwingFileChannel = new ThrowingFileChannel(fail, isCkpFile ? false : paritalWrites, throwUnknownException, channel);
+                        ThrowingFileChannel throwingFileChannel = new ThrowingFileChannel(fail, isCkpFile ? false : partialWrites, throwUnknownException, channel);
                         success = true;
                         return throwingFileChannel;
                     } finally {
@@ -1815,12 +1848,11 @@ public class TranslogTests extends ESTestCase {
     public void testFailWhileCreateWriteWithRecoveredTLogs() throws IOException {
         Path tempDir = createTempDir();
         TranslogConfig config = getTranslogConfig(tempDir);
-        Translog translog = new Translog(config, null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
+        Translog translog = createTranslog(config, null);
         translog.add(new Translog.Index("test", "boom", 0, "boom".getBytes(Charset.forName("UTF-8"))));
-        Translog.TranslogGeneration generation = translog.getGeneration();
         translog.close();
         try {
-            new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+            new Translog(config, translog.getTranslogUUID(), new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) {
                 @Override
                 protected TranslogWriter createWriter(long fileGeneration) throws IOException {
                     throw new MockDirectoryWrapper.FakeIOException();
@@ -1835,7 +1867,6 @@ public class TranslogTests extends ESTestCase {
 
     public void testRecoverWithUnbackedNextGen() throws IOException {
         translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
-        Translog.TranslogGeneration translogGeneration = translog.getGeneration();
         translog.close();
         TranslogConfig config = translog.getConfig();
 
@@ -1843,8 +1874,7 @@ public class TranslogTests extends ESTestCase {
         Checkpoint read = Checkpoint.read(ckp);
         Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)));
         Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
-        try (Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
-            assertNotNull(translogGeneration);
+        try (Translog tlog = createTranslog(config, translog.getTranslogUUID())) {
             assertFalse(tlog.syncNeeded());
             Translog.Snapshot snapshot = tlog.newSnapshot();
             for (int i = 0; i < 1; i++) {
@@ -1854,8 +1884,7 @@ public class TranslogTests extends ESTestCase {
             }
             tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
         }
-        try (Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
-            assertNotNull(translogGeneration);
+        try (Translog tlog = createTranslog(config, translog.getTranslogUUID())) {
             assertFalse(tlog.syncNeeded());
             Translog.Snapshot snapshot = tlog.newSnapshot();
             for (int i = 0; i < 2; i++) {
@@ -1868,7 +1897,6 @@ public class TranslogTests extends ESTestCase {
 
     public void testRecoverWithUnbackedNextGenInIllegalState() throws IOException {
         translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
-        Translog.TranslogGeneration translogGeneration = translog.getGeneration();
         translog.close();
         TranslogConfig config = translog.getConfig();
         Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
@@ -1877,7 +1905,7 @@ public class TranslogTests extends ESTestCase {
         Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
 
         try {
-            Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
+            Translog tlog = new Translog(config, translog.getTranslogUUID(), translog.getDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
             fail("file already exists?");
         } catch (TranslogException ex) {
             // all is well
@@ -1888,9 +1916,10 @@ public class TranslogTests extends ESTestCase {
 
     public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException {
         translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
-        Translog.TranslogGeneration translogGeneration = translog.getGeneration();
         translog.close();
         TranslogConfig config = translog.getConfig();
+        final String translogUUID = translog.getTranslogUUID();
+        final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
 
         Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
         Checkpoint read = Checkpoint.read(ckp);
@@ -1898,8 +1927,7 @@ public class TranslogTests extends ESTestCase {
         Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
         // we add N+1 and N+2 to ensure we only delete the N+1 file and never jump ahead and wipe without the right condition
         Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 2) + ".tlog"));
-        try (Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
-            assertNotNull(translogGeneration);
+        try (Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
             assertFalse(tlog.syncNeeded());
             Translog.Snapshot snapshot = tlog.newSnapshot();
             for (int i = 0; i < 1; i++) {
@@ -1911,7 +1939,7 @@ public class TranslogTests extends ESTestCase {
         }
 
         try {
-            Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
+            Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
             fail("file already exists?");
         } catch (TranslogException ex) {
             // all is well
@@ -1933,14 +1961,15 @@ public class TranslogTests extends ESTestCase {
             fail.failRandomly();
             TranslogConfig config = getTranslogConfig(tempDir);
             final int numOps = randomIntBetween(100, 200);
+            long minGenForRecovery = 1;
             List<String> syncedDocs = new ArrayList<>();
             List<String> unsynced = new ArrayList<>();
             if (randomBoolean()) {
                 fail.onceFailedFailAlways();
             }
-            Translog.TranslogGeneration generation = null;
+            String generationUUID = null;
             try {
-                final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, generation);
+                final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, new TranslogDeletionPolicy());
                 try {
                     LineFileDocs lineFileDocs = new LineFileDocs(random()); //writes pretty big docs so we cross buffer boarders regularly
                     for (int opsAdded = 0; opsAdded < numOps; opsAdded++) {
@@ -1956,10 +1985,7 @@ public class TranslogTests extends ESTestCase {
                             failableTLog.sync(); // we have to sync here first otherwise we don't know if the sync succeeded if the commit fails
                             syncedDocs.addAll(unsynced);
                             unsynced.clear();
-                            if (randomBoolean()) {
-                                failableTLog.prepareCommit();
-                            }
-                            failableTLog.commit(translog.currentFileGeneration());
+                            rollAndCommit(failableTLog);
                             syncedDocs.clear();
                         }
                     }
@@ -1979,7 +2005,8 @@ public class TranslogTests extends ESTestCase {
                         syncedDocs.addAll(unsynced); // failed in fsync but got fully written
                         unsynced.clear();
                     }
-                    generation = failableTLog.getGeneration();
+                    generationUUID = failableTLog.getTranslogUUID();
+                    minGenForRecovery = failableTLog.getDeletionPolicy().getMinTranslogGenerationForRecovery();
                     IOUtils.closeWhileHandlingException(failableTLog);
                 }
             } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
@@ -1990,7 +2017,9 @@ public class TranslogTests extends ESTestCase {
             // now randomly open this failing tlog again just to make sure we can also recover from failing during recovery
             if (randomBoolean()) {
                 try {
-                    IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generation));
+                    TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy();
+                    deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery);
+                    IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, deletionPolicy));
                 } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
                     // failed - that's ok, we didn't even create it
                 } catch (IOException ex) {
@@ -1999,7 +2028,9 @@ public class TranslogTests extends ESTestCase {
             }
 
             fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file
-            try (Translog translog = new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
+            TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy();
+            deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery);
+            try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
                 Translog.Snapshot snapshot = translog.newSnapshot();
                 assertEquals(syncedDocs.size(), snapshot.totalOperations());
                 for (int i = 0; i < syncedDocs.size(); i++) {
@@ -2057,18 +2088,19 @@ public class TranslogTests extends ESTestCase {
      */
     public void testPendingDelete() throws IOException {
         // Write a first operation, then roll over to a new translog generation.
         translog.add(new Translog.Index("test", "1", 0, new byte[]{1}));
-        translog.prepareCommit();
-        Translog.TranslogGeneration generation = translog.getGeneration();
+        translog.rollGeneration();
         // Capture what is needed to reopen the translog after it has been closed. The deletion policy is a
         // fresh default instance (no minimum generation for recovery is set on it in this test) and the same
         // instance is passed to both reopen calls below.
         TranslogConfig config = translog.getConfig();
+        final String translogUUID = translog.getTranslogUUID();
+        final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy();
         // First close/reopen cycle: no view is open at this point.
         translog.close();
-        translog = new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
+        translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
         translog.add(new Translog.Index("test", "2", 1, new byte[]{2}));
-        translog.prepareCommit();
+        translog.rollGeneration();
         // Open a view and keep it open while one more operation is written.
         Translog.View view = translog.newView();
         translog.add(new Translog.Index("test", "3", 2, new byte[]{3}));
         // The translog is closed first and the view is only released afterwards.
         // NOTE(review): presumably this ordering is the "pending delete" situation the test name refers to -
         // confirm against the Javadoc above this method.
         translog.close();
         IOUtils.close(view);
         // Second reopen. There are no explicit assertions in this method, so the test passes as long as
         // none of the calls above or this constructor throws.
-        translog = new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
+        translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
     }
 
     public static Translog.Location randomTranslogLocation() {
@@ -2140,14 +2172,13 @@ public class TranslogTests extends ESTestCase {
         for (int i = 0; i <= rolls; i++) {
             assertFileIsPresent(translog, generation + i);
         }
-        translog.commit(generation + rolls);
-        assertThat(translog.currentFileGeneration(), equalTo(generation + rolls + 1));
+        commit(translog, generation + rolls);
+        assertThat(translog.currentFileGeneration(), equalTo(generation + rolls ));
         assertThat(translog.totalOperations(), equalTo(0));
         for (int i = 0; i < rolls; i++) {
             assertFileDeleted(translog, generation + i);
         }
         assertFileIsPresent(translog, generation + rolls);
-        assertFileIsPresent(translog, generation + rolls + 1);
     }
 
     public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException {
@@ -2172,7 +2203,7 @@ public class TranslogTests extends ESTestCase {
         }
 
         assertThat(translog.currentFileGeneration(), equalTo(generation + rollsBefore));
-        translog.prepareCommit();
+        translog.rollGeneration();
         assertThat(translog.currentFileGeneration(), equalTo(generation + rollsBefore + 1));
 
         for (int i = 0; i <= rollsBefore + 1; i++) {
@@ -2198,7 +2229,7 @@ public class TranslogTests extends ESTestCase {
             }
         }
 
-        translog.commit(generation + rollsBefore + 1);
+        commit(translog, generation + rollsBefore + 1);
 
         for (int i = 0; i <= rollsBefore; i++) {
             assertFileDeleted(translog, generation + i);
@@ -2210,7 +2241,6 @@ public class TranslogTests extends ESTestCase {
     }
 
     public void testMinGenerationForSeqNo() throws IOException {
-        final long initialGeneration = translog.getGeneration().translogFileGeneration;
         final int operations = randomIntBetween(1, 4096);
         final List<Long> shuffledSeqNos = LongStream.range(0, operations).boxed().collect(Collectors.toList());
         Randomness.shuffle(shuffledSeqNos);
@@ -2230,8 +2260,9 @@ public class TranslogTests extends ESTestCase {
         }
 
         Map<Long, Set<Tuple<Long, Long>>> generations = new HashMap<>();
-
-        translog.commit(initialGeneration);
+        // one extra roll to make sure that all ops so far are available via a reader and a translog-{gen}.ckp
+        // file in a consistent way, in order to simplify checking code.
+        translog.rollGeneration();
         for (long seqNo = 0; seqNo < operations; seqNo++) {
             final Set<Tuple<Long, Long>> seenSeqNos = new HashSet<>();
             final long generation = translog.getMinGenerationForSeqNo(seqNo).translogFileGeneration;
@@ -2271,7 +2302,7 @@ public class TranslogTests extends ESTestCase {
 
         final long generation =
                 randomIntBetween(1, Math.toIntExact(translog.currentFileGeneration()));
-        translog.commit(generation);
+        commit(translog, generation);
         for (long g = 0; g < generation; g++) {
             assertFileDeleted(translog, g);
         }
@@ -2288,13 +2319,13 @@ public class TranslogTests extends ESTestCase {
             translog.add(new Translog.NoOp(seqNo++, 0, "test"));
             if (rarely()) {
                 final long generation = translog.currentFileGeneration();
-                translog.prepareCommit();
+                translog.rollGeneration();
                 if (rarely()) {
                     // simulate generation filling up and rolling between preparing the commit and committing
                     translog.rollGeneration();
                 }
                 final int committedGeneration = randomIntBetween(Math.max(1, Math.toIntExact(last)), Math.toIntExact(generation));
-                translog.commit(committedGeneration);
+                commit(translog, committedGeneration);
                 last = committedGeneration;
                 for (long g = 0; g < committedGeneration; g++) {
                     assertFileDeleted(translog, g);
@@ -2315,11 +2346,11 @@ public class TranslogTests extends ESTestCase {
             if (rarely()) {
                 try (Translog.View ignored = translog.newView()) {
                     final long viewGeneration = lastCommittedGeneration;
-                    translog.prepareCommit();
+                    translog.rollGeneration();
                     final long committedGeneration = randomIntBetween(
                             Math.max(1, Math.toIntExact(lastCommittedGeneration)),
                             Math.toIntExact(translog.currentFileGeneration()));
-                    translog.commit(committedGeneration);
+                    commit(translog, committedGeneration);
                     lastCommittedGeneration = committedGeneration;
                     // with an open view, committing should preserve generations back to the last committed generation
                     for (long g = 1; g < Math.min(lastCommittedGeneration, viewGeneration); g++) {
@@ -2334,5 +2365,4 @@ public class TranslogTests extends ESTestCase {
             }
         }
     }
-
 }