Browse Source

Migrate peer recovery from translog to retention lease (#49448)

Since 7.4, we have switched from the translog to Lucene as the source of
history for peer recoveries. However, this reduces the likelihood of
operation-based recoveries when performing a full cluster restart from
pre-7.4 because existing copies do not have peer recovery retention
leases (PRRL).

To remedy this issue, we fall back to using the translog in peer
recoveries if the recovering replica does not have a peer recovery
retention lease and the replication group hasn't fully migrated to PRRL.

Relates #45136
Nhat Nguyen 5 years ago
parent
commit
b9fbc8dc74
20 changed files with 439 additions and 114 deletions
  1. 66 0
      qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java
  2. 54 3
      qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java
  3. 15 9
      server/src/main/java/org/elasticsearch/index/IndexSettings.java
  4. 17 7
      server/src/main/java/org/elasticsearch/index/engine/Engine.java
  5. 51 37
      server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
  6. 8 5
      server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
  7. 7 2
      server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
  8. 57 13
      server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
  9. 4 1
      server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
  10. 22 15
      server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
  11. 3 1
      server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
  12. 1 1
      server/src/test/java/org/elasticsearch/index/IndexSettingsTests.java
  13. 19 11
      server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
  14. 6 3
      server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
  15. 22 0
      server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java
  16. 4 3
      server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java
  17. 5 1
      server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
  18. 1 1
      server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
  19. 75 0
      test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
  20. 2 1
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java

+ 66 - 0
qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java

@@ -34,6 +34,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.seqno.RetentionLeaseUtils;
 import org.elasticsearch.test.NotEqualMessageBuilder;
 import org.elasticsearch.test.rest.ESRestTestCase;
@@ -1168,6 +1169,12 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
         }
     }
 
+    private void indexDocument(String id) throws IOException {
+        final Request indexRequest = new Request("POST", "/" + index + "/" + "_doc/" + id);
+        indexRequest.setJsonEntity(Strings.toString(JsonXContent.contentBuilder().startObject().field("f", "v").endObject()));
+        assertOK(client().performRequest(indexRequest));
+    }
+
     private int countOfIndexedRandomDocuments() throws IOException {
         return Integer.parseInt(loadInfoDocument(index + "_count"));
     }
@@ -1248,4 +1255,63 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
             RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index);
         }
     }
+
+    /**
+     * Tests that with or without soft-deletes, we should perform an operation-based recovery if there were some
+     * but not too many uncommitted documents (i.e., less than 10% of committed documents or the extra translog)
+     * before we restart the cluster. This is important when we move from translog based to retention leases based
+     * peer recoveries.
+     */
+    public void testOperationBasedRecovery() throws Exception {
+        if (isRunningAgainstOldCluster()) {
+            createIndex(index, Settings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+                .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean())
+                .build());
+            ensureGreen(index);
+            int committedDocs = randomIntBetween(100, 200);
+            for (int i = 0; i < committedDocs; i++) {
+                indexDocument(Integer.toString(i));
+                if (rarely()) {
+                    flush(index, randomBoolean());
+                }
+            }
+            flush(index, true);
+            ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
+            // less than 10% of the committed docs (see IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING).
+            int uncommittedDocs = randomIntBetween(0, (int) (committedDocs * 0.1));
+            for (int i = 0; i < uncommittedDocs; i++) {
+                final String id = Integer.toString(randomIntBetween(1, 100));
+                indexDocument(id);
+            }
+        } else {
+            ensureGreen(index);
+            assertNoFileBasedRecovery(index, n -> true);
+        }
+    }
+
+    /**
+     * Verifies that once all shard copies are on the new version, we should turn off the translog retention for indices with soft-deletes.
+     */
+    public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
+        if (isRunningAgainstOldCluster()) {
+            createIndex(index, Settings.builder()
+                .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
+                .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
+            ensureGreen(index);
+            int numDocs = randomIntBetween(10, 100);
+            for (int i = 0; i < numDocs; i++) {
+                indexDocument(Integer.toString(randomIntBetween(1, 100)));
+                if (rarely()) {
+                    flush(index, randomBoolean());
+                }
+            }
+        } else {
+            ensureGreen(index);
+            flush(index, true);
+            assertEmptyTranslog(index);
+        }
+    }
 }

+ 54 - 3
qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java

@@ -487,10 +487,10 @@ public class RecoveryIT extends AbstractRollingTestCase {
                 switch (CLUSTER_TYPE) {
                     case OLD: break;
                     case MIXED:
-                        assertNoFileBasedRecovery(indexName, s -> s.startsWith(CLUSTER_NAME + "-0"));
+                        assertNoopRecoveries(indexName, s -> s.startsWith(CLUSTER_NAME + "-0"));
                         break;
                     case UPGRADED:
-                        assertNoFileBasedRecovery(indexName, s -> s.startsWith(CLUSTER_NAME));
+                        assertNoopRecoveries(indexName, s -> s.startsWith(CLUSTER_NAME));
                         break;
                 }
             }
@@ -647,7 +647,7 @@ public class RecoveryIT extends AbstractRollingTestCase {
         }
     }
 
-    private void assertNoFileBasedRecovery(String indexName, Predicate<String> targetNode) throws IOException {
+    private void assertNoopRecoveries(String indexName, Predicate<String> targetNode) throws IOException {
         Map<String, Object> recoveries = entityAsMap(client()
             .performRequest(new Request("GET", indexName + "/_recovery?detailed=true")));
 
@@ -678,4 +678,55 @@ public class RecoveryIT extends AbstractRollingTestCase {
 
         assertTrue("must find replica", foundReplica);
     }
+
+    /**
+     * Tests that with or without soft-deletes, we should perform an operation-based recovery if there were some
+     * but not too many uncommitted documents (i.e., less than 10% of committed documents or the extra translog)
+     * before we upgrade each node. This is important when we move from translog based to retention leases based
+     * peer recoveries.
+     */
+    public void testOperationBasedRecovery() throws Exception {
+        final String index = "test_operation_based_recovery";
+        if (CLUSTER_TYPE == ClusterType.OLD) {
+            createIndex(index, Settings.builder()
+                .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2)
+                .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()).build());
+            ensureGreen(index);
+            indexDocs(index, 0, randomIntBetween(100, 200));
+            flush(index, randomBoolean());
+            ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
+            // uncommitted docs must be less than 10% of committed docs (see IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING).
+            indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3));
+        } else {
+            ensureGreen(index);
+            assertNoFileBasedRecovery(index, nodeName ->
+                   CLUSTER_TYPE == ClusterType.UPGRADED
+                || nodeName.startsWith(CLUSTER_NAME + "-0")
+                || (nodeName.startsWith(CLUSTER_NAME + "-1") && Booleans.parseBoolean(System.getProperty("tests.first_round")) == false));
+            indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3));
+        }
+    }
+
+    /**
+     * Verifies that once all shard copies are on the new version, we should turn off the translog retention for indices with soft-deletes.
+     */
+    public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
+        final String index = "turn_off_translog_retention";
+        if (CLUSTER_TYPE == ClusterType.OLD) {
+            createIndex(index, Settings.builder()
+                .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), randomIntBetween(0, 2))
+                .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
+            ensureGreen(index);
+            indexDocs(index, 0, randomIntBetween(100, 200));
+            flush(index, randomBoolean());
+            indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 100));
+        }
+        if (CLUSTER_TYPE == ClusterType.UPGRADED) {
+            ensureGreen(index);
+            flush(index, true);
+            assertEmptyTranslog(index);
+        }
+    }
 }

+ 15 - 9
server/src/main/java/org/elasticsearch/index/IndexSettings.java

@@ -245,21 +245,22 @@ public final class IndexSettings {
      * Controls how long translog files that are no longer needed for persistence reasons
      * will be kept around before being deleted. Keeping more files is useful to increase
      * the chance of ops based recoveries for indices with soft-deletes disabled.
-     * This setting will be ignored if soft-deletes is enabled.
+     * This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4).
      **/
     public static final Setting<TimeValue> INDEX_TRANSLOG_RETENTION_AGE_SETTING =
         Setting.timeSetting("index.translog.retention.age",
-            settings -> INDEX_SOFT_DELETES_SETTING.get(settings) ? TimeValue.MINUS_ONE : TimeValue.timeValueHours(12), TimeValue.MINUS_ONE,
-            Property.Dynamic, Property.IndexScope);
+            settings -> shouldDisableTranslogRetention(settings) ? TimeValue.MINUS_ONE : TimeValue.timeValueHours(12),
+            TimeValue.MINUS_ONE, Property.Dynamic, Property.IndexScope);
 
     /**
      * Controls how many translog files that are no longer needed for persistence reasons
      * will be kept around before being deleted. Keeping more files is useful to increase
      * the chance of ops based recoveries for indices with soft-deletes disabled.
-     * This setting will be ignored if soft-deletes is enabled.
+     * This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4).
      **/
     public static final Setting<ByteSizeValue> INDEX_TRANSLOG_RETENTION_SIZE_SETTING =
-        Setting.byteSizeSetting("index.translog.retention.size", settings -> INDEX_SOFT_DELETES_SETTING.get(settings) ? "-1" : "512MB",
+        Setting.byteSizeSetting("index.translog.retention.size",
+            settings -> shouldDisableTranslogRetention(settings) ? "-1" : "512MB",
             Property.Dynamic, Property.IndexScope);
 
     /**
@@ -577,7 +578,7 @@ public final class IndexSettings {
     }
 
     private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
-        if (softDeleteEnabled && byteSizeValue.getBytes() >= 0) {
+        if (shouldDisableTranslogRetention(settings) && byteSizeValue.getBytes() >= 0) {
             // ignore the translog retention settings if soft-deletes enabled
             this.translogRetentionSize = new ByteSizeValue(-1);
         } else {
@@ -586,7 +587,7 @@ public final class IndexSettings {
     }
 
     private void setTranslogRetentionAge(TimeValue age) {
-        if (softDeleteEnabled && age.millis() >= 0) {
+        if (shouldDisableTranslogRetention(settings) && age.millis() >= 0) {
             // ignore the translog retention settings if soft-deletes enabled
             this.translogRetentionAge = TimeValue.MINUS_ONE;
         } else {
@@ -774,7 +775,7 @@ public final class IndexSettings {
      * Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries
      */
     public ByteSizeValue getTranslogRetentionSize() {
-        assert softDeleteEnabled == false || translogRetentionSize.getBytes() == -1L : translogRetentionSize;
+        assert shouldDisableTranslogRetention(settings) == false || translogRetentionSize.getBytes() == -1L : translogRetentionSize;
         return translogRetentionSize;
     }
 
@@ -783,7 +784,7 @@ public final class IndexSettings {
      * around
      */
     public TimeValue getTranslogRetentionAge() {
-        assert softDeleteEnabled == false || translogRetentionAge.millis() == -1L : translogRetentionSize;
+        assert shouldDisableTranslogRetention(settings) == false || translogRetentionAge.millis() == -1L : translogRetentionSize;
         return translogRetentionAge;
     }
 
@@ -795,6 +796,11 @@ public final class IndexSettings {
         return INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.get(getSettings());
     }
 
+    private static boolean shouldDisableTranslogRetention(Settings settings) {
+        return INDEX_SOFT_DELETES_SETTING.get(settings)
+            && IndexMetaData.SETTING_INDEX_VERSION_CREATED.get(settings).onOrAfter(Version.V_7_4_0);
+    }
+
     /**
      * Returns the generation threshold size. As sequence numbers can cause multiple generations to
      * be preserved for rollback purposes, we want to keep the size of individual generations from

+ 17 - 7
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -66,6 +66,7 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
 import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.regex.Regex;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.index.VersionType;
@@ -729,7 +730,7 @@ public abstract class Engine implements Closeable {
     /**
      * Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
      */
-    public abstract Closeable acquireRetentionLock();
+    public abstract Closeable acquireHistoryRetentionLock(HistorySource historySource);
 
     /**
      * Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive).
@@ -742,19 +743,20 @@ public abstract class Engine implements Closeable {
      * Creates a new history snapshot for reading operations since {@code startingSeqNo} (inclusive).
      * The returned snapshot can be retrieved from either Lucene index or translog files.
      */
-    public abstract Translog.Snapshot readHistoryOperations(String source,
-                                                                MapperService mapperService, long startingSeqNo) throws IOException;
+    public abstract Translog.Snapshot readHistoryOperations(String reason, HistorySource historySource,
+                                                            MapperService mapperService, long startingSeqNo) throws IOException;
 
     /**
      * Returns the estimated number of history operations whose seq# at least {@code startingSeqNo}(inclusive) in this engine.
      */
-    public abstract int estimateNumberOfHistoryOperations(String source,
-                                                                MapperService mapperService, long startingSeqNo) throws IOException;
+    public abstract int estimateNumberOfHistoryOperations(String reason, HistorySource historySource,
+                                                          MapperService mapperService, long startingSeqNo) throws IOException;
 
     /**
      * Checks if this engine has every operations since  {@code startingSeqNo}(inclusive) in its history (either Lucene or translog)
      */
-    public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;
+    public abstract boolean hasCompleteOperationHistory(String reason, HistorySource historySource,
+                                                        MapperService mapperService, long startingSeqNo) throws IOException;
 
     /**
      * Gets the minimum retained sequence number for this engine.
@@ -1795,7 +1797,8 @@ public abstract class Engine implements Closeable {
         }
     }
 
-    public void onSettingsChanged() {
+    public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
+
     }
 
     /**
@@ -1929,4 +1932,11 @@ public abstract class Engine implements Closeable {
      * to advance this marker to at least the given sequence number.
      */
     public abstract void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary);
+
+    /**
+     * Whether we should read history operations from translog or Lucene index
+     */
+    public enum HistorySource {
+        TRANSLOG, INDEX
+    }
 }

+ 51 - 37
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -68,6 +68,8 @@ import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
 import org.elasticsearch.common.metrics.CounterMetric;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.KeyedLock;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
@@ -532,27 +534,31 @@ public class InternalEngine extends Engine {
      * The returned snapshot can be retrieved from either Lucene index or translog files.
      */
     @Override
-    public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
-        if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
-            return newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
+    public Translog.Snapshot readHistoryOperations(String reason, HistorySource historySource,
+                                                   MapperService mapperService, long startingSeqNo) throws IOException {
+        if (historySource == HistorySource.INDEX) {
+            ensureSoftDeletesEnabled();
+            return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
+        } else {
+            return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
         }
-
-        return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
     }
 
     /**
      * Returns the estimated number of history operations whose seq# at least the provided seq# in this engine.
      */
     @Override
-    public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
-        if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
-            try (Translog.Snapshot snapshot = newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo),
+    public int estimateNumberOfHistoryOperations(String reason, HistorySource historySource,
+                                                 MapperService mapperService, long startingSeqNo) throws IOException {
+        if (historySource == HistorySource.INDEX) {
+            ensureSoftDeletesEnabled();
+            try (Translog.Snapshot snapshot = newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo),
                 Long.MAX_VALUE, false)) {
                 return snapshot.totalOperations();
             }
+        } else {
+            return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo);
         }
-
-        return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo);
     }
 
     @Override
@@ -2479,15 +2485,15 @@ public class InternalEngine extends Engine {
         }
     }
 
-    public void onSettingsChanged() {
+    @Override
+    public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
         mergeScheduler.refreshConfig();
         // config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:
         maybePruneDeletes();
         final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
-        final IndexSettings indexSettings = engineConfig.getIndexSettings();
-        translogDeletionPolicy.setRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis());
-        translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes());
-        softDeletesPolicy.setRetentionOperations(indexSettings.getSoftDeleteRetentionOperations());
+        translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
+        translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());
+        softDeletesPolicy.setRetentionOperations(softDeletesRetentionOps);
     }
 
     public MergeStats getMergeStats() {
@@ -2594,12 +2600,17 @@ public class InternalEngine extends Engine {
         return numDocUpdates.count();
     }
 
+    private void ensureSoftDeletesEnabled() {
+        if (softDeleteEnabled == false) {
+            assert false : "index " + shardId.getIndex() + " does not have soft-deletes enabled";
+            throw new IllegalStateException("index " + shardId.getIndex() + " does not have soft-deletes enabled");
+        }
+    }
+
     @Override
     public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
                                                 long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
-        if (softDeleteEnabled == false) {
-            throw new IllegalStateException("accessing changes snapshot requires soft-deletes enabled");
-        }
+        ensureSoftDeletesEnabled();
         ensureOpen();
         refreshIfNeeded(source, toSeqNo);
         Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
@@ -2621,26 +2632,28 @@ public class InternalEngine extends Engine {
     }
 
     @Override
-    public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
-        if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
+    public boolean hasCompleteOperationHistory(String reason, HistorySource historySource,
+                                               MapperService mapperService, long startingSeqNo) throws IOException {
+        if (historySource == HistorySource.INDEX) {
+            ensureSoftDeletesEnabled();
             return getMinRetainedSeqNo() <= startingSeqNo;
-        }
-
-        final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
-        // avoid scanning translog if not necessary
-        if (startingSeqNo > currentLocalCheckpoint) {
-            return true;
-        }
-        final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
-        try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
-            Translog.Operation operation;
-            while ((operation = snapshot.next()) != null) {
-                if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
-                    tracker.markSeqNoAsProcessed(operation.seqNo());
+        } else {
+            final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
+            // avoid scanning translog if not necessary
+            if (startingSeqNo > currentLocalCheckpoint) {
+                return true;
+            }
+            final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
+            try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
+                Translog.Operation operation;
+                while ((operation = snapshot.next()) != null) {
+                    if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
+                        tracker.markSeqNoAsProcessed(operation.seqNo());
+                    }
                 }
             }
+            return tracker.getProcessedCheckpoint() >= currentLocalCheckpoint;
         }
-        return tracker.getProcessedCheckpoint() >= currentLocalCheckpoint;
     }
 
     /**
@@ -2648,13 +2661,14 @@ public class InternalEngine extends Engine {
      * Operations whose seq# are at least this value should exist in the Lucene index.
      */
     public final long getMinRetainedSeqNo() {
-        assert softDeleteEnabled : Thread.currentThread().getName();
+        ensureSoftDeletesEnabled();
         return softDeletesPolicy.getMinRetainedSeqNo();
     }
 
     @Override
-    public Closeable acquireRetentionLock() {
-        if (softDeleteEnabled) {
+    public Closeable acquireHistoryRetentionLock(HistorySource historySource) {
+        if (historySource == HistorySource.INDEX) {
+            ensureSoftDeletesEnabled();
             return softDeletesPolicy.acquireRetentionLock();
         } else {
             return translog.acquireRetentionLock();

+ 8 - 5
server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

@@ -307,7 +307,7 @@ public class ReadOnlyEngine extends Engine {
     }
 
     @Override
-    public Closeable acquireRetentionLock() {
+    public Closeable acquireHistoryRetentionLock(HistorySource historySource) {
         return () -> {};
     }
 
@@ -317,21 +317,24 @@ public class ReadOnlyEngine extends Engine {
         if (engineConfig.getIndexSettings().isSoftDeleteEnabled() == false) {
             throw new IllegalStateException("accessing changes snapshot requires soft-deletes enabled");
         }
-        return readHistoryOperations(source, mapperService, fromSeqNo);
+        return newEmptySnapshot();
     }
 
     @Override
-    public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
+    public Translog.Snapshot readHistoryOperations(String reason, HistorySource historySource,
+                                                   MapperService mapperService, long startingSeqNo) {
         return newEmptySnapshot();
     }
 
     @Override
-    public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
+    public int estimateNumberOfHistoryOperations(String reason, HistorySource historySource,
+                                                 MapperService mapperService, long startingSeqNo) {
         return 0;
     }
 
     @Override
-    public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
+    public boolean hasCompleteOperationHistory(String reason, HistorySource historySource,
+                                               MapperService mapperService, long startingSeqNo) {
         // we can do operation-based recovery if we don't have to replay any operation.
         return startingSeqNo > seqNoStats.getMaxSeqNo();
     }

+ 7 - 2
server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

@@ -895,9 +895,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
         this.pendingInSync = new HashSet<>();
         this.routingTable = null;
         this.replicationGroup = null;
-        this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0) ||
+        this.hasAllPeerRecoveryRetentionLeases = indexSettings.isSoftDeleteEnabled() &&
+            (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0) ||
             (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) &&
-                indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN);
+             indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN));
         this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
         this.safeCommitInfoSupplier = safeCommitInfoSupplier;
         assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
@@ -1348,6 +1349,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
         assert invariant();
     }
 
+    public synchronized boolean hasAllPeerRecoveryRetentionLeases() {
+        return hasAllPeerRecoveryRetentionLeases;
+    }
+
     /**
      * Create any required peer-recovery retention leases that do not currently exist because we just did a rolling upgrade from a version
      * prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases.

+ 57 - 13
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -67,6 +67,7 @@ import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.metrics.MeanMetric;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -157,6 +158,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -268,6 +270,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
 
     private final AtomicLong lastSearcherAccess = new AtomicLong();
     private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
+    private volatile boolean useRetentionLeasesInPeerRecovery;
 
     public IndexShard(
             final ShardRouting shardRouting,
@@ -364,6 +367,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         refreshListeners = buildRefreshListeners();
         lastSearcherAccess.set(threadPool.relativeTimeInMillis());
         persistMetadata(path, indexSettings, shardRouting, null, logger);
+        this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
     }
 
     public ThreadPool getThreadPool() {
@@ -600,6 +604,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         if (newRouting.equals(currentRouting) == false) {
             indexEventListener.shardRoutingChanged(this, currentRouting, newRouting);
         }
+
+        if (indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery == false) {
+            final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases();
+            final Set<ShardRouting> shardRoutings = new HashSet<>(routingTable.getShards());
+            shardRoutings.addAll(routingTable.assignedShards()); // include relocation targets
+            if (shardRoutings.stream().allMatch(
+                shr -> shr.assignedToNode() && retentionLeases.contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(shr)))) {
+                useRetentionLeasesInPeerRecovery = true;
+                turnOffTranslogRetention();
+            }
+        }
     }
 
     /**
@@ -1877,38 +1892,63 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     public void onSettingsChanged() {
         Engine engineOrNull = getEngineOrNull();
         if (engineOrNull != null) {
-            engineOrNull.onSettingsChanged();
+            final boolean useRetentionLeasesInPeerRecovery = this.useRetentionLeasesInPeerRecovery;
+            engineOrNull.onSettingsChanged(
+                useRetentionLeasesInPeerRecovery ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(),
+                useRetentionLeasesInPeerRecovery ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(),
+                indexSettings.getSoftDeleteRetentionOperations()
+            );
         }
     }
 
+    private void turnOffTranslogRetention() {
+        logger.debug("turn off the translog retention for the replication group {} " +
+            "as it starts using retention leases exclusively in peer recoveries", shardId);
+        // Fork to the generic thread pool because pruning the delete tombstones can be expensive.
+        threadPool.generic().execute(new AbstractRunnable() {
+            @Override
+            public void onFailure(Exception e) {
+                if (state != IndexShardState.CLOSED) {
+                    logger.warn("failed to turn off translog retention", e);
+                }
+            }
+
+            @Override
+            protected void doRun() {
+                onSettingsChanged();
+                trimTranslog();
+            }
+        });
+    }
+
     /**
      * Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
      */
-    public Closeable acquireRetentionLock() {
-        return getEngine().acquireRetentionLock();
+    public Closeable acquireHistoryRetentionLock(Engine.HistorySource source) {
+        return getEngine().acquireHistoryRetentionLock(source);
     }
 
     /**
      * Returns the estimated number of history operations whose seq# is at least the provided seq# in this shard.
      */
-    public int estimateNumberOfHistoryOperations(String source, long startingSeqNo) throws IOException {
-        return getEngine().estimateNumberOfHistoryOperations(source, mapperService, startingSeqNo);
+    public int estimateNumberOfHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException {
+        return getEngine().estimateNumberOfHistoryOperations(reason, source, mapperService, startingSeqNo);
     }
 
     /**
      * Creates a new history snapshot for reading operations since the provided starting seqno (inclusive).
      * The returned snapshot can be retrieved from either Lucene index or translog files.
      */
-    public Translog.Snapshot getHistoryOperations(String source, long startingSeqNo) throws IOException {
-        return getEngine().readHistoryOperations(source, mapperService, startingSeqNo);
+    public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException {
+        return getEngine().readHistoryOperations(reason, source, mapperService, startingSeqNo);
     }
 
     /**
      * Checks if we have a complete history of operations since the given starting seqno (inclusive).
-     * This method should be called after acquiring the retention lock; See {@link #acquireRetentionLock()}
+     * This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock(Engine.HistorySource)}
      */
-    public boolean hasCompleteHistoryOperations(String source, long startingSeqNo) throws IOException {
-        return getEngine().hasCompleteOperationHistory(source, mapperService, startingSeqNo);
+    public boolean hasCompleteHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException {
+        return getEngine().hasCompleteOperationHistory(reason, source, mapperService, startingSeqNo);
     }
 
     /**
@@ -2097,9 +2137,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         assert assertPrimaryMode();
         verifyNotClosed();
         ensureSoftDeletesEnabled("retention leases");
-        try (Closeable ignore = acquireRetentionLock()) {
+        try (Closeable ignore = acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) {
             final long actualRetainingSequenceNumber =
-                    retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber;
+                retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber;
             return replicationTracker.addRetentionLease(id, actualRetainingSequenceNumber, source, listener);
         } catch (final IOException e) {
             throw new AssertionError(e);
@@ -2119,7 +2159,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         assert assertPrimaryMode();
         verifyNotClosed();
         ensureSoftDeletesEnabled("retention leases");
-        try (Closeable ignore = acquireRetentionLock()) {
+        try (Closeable ignore = acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) {
             final long actualRetainingSequenceNumber =
                     retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber;
             return replicationTracker.renewRetentionLease(id, actualRetainingSequenceNumber, source);
@@ -2600,6 +2640,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         return replicationTracker.getPeerRecoveryRetentionLeases();
     }
 
+    public boolean useRetentionLeasesInPeerRecovery() {
+        return useRetentionLeasesInPeerRecovery;
+    }
+
     private SafeCommitInfo getSafeCommitInfo() {
         final Engine engine = getEngineOrNull();
         return engine == null ? SafeCommitInfo.EMPTY : engine.getSafeCommitInfo();

+ 4 - 1
server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java

@@ -36,6 +36,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.tasks.Task;
@@ -90,7 +91,9 @@ public class PrimaryReplicaSyncer {
             // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
             // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
             // Also fail the resync early if the shard is shutting down
-            snapshot = indexShard.getHistoryOperations("resync", startingSeqNo);
+            snapshot = indexShard.getHistoryOperations("resync",
+                indexShard.indexSettings.isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG,
+                startingSeqNo);
             final Translog.Snapshot originalSnapshot = snapshot;
             final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
                 @Override

+ 22 - 15
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -169,22 +169,28 @@ public class RecoverySourceHandler {
                     ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) : null);
             }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ",
                 shard, cancellableThreads, logger);
-            final Closeable retentionLock = shard.acquireRetentionLock();
+            final Engine.HistorySource historySource;
+            if (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null) {
+                historySource = Engine.HistorySource.INDEX;
+            } else {
+                historySource = Engine.HistorySource.TRANSLOG;
+            }
+            final Closeable retentionLock = shard.acquireHistoryRetentionLock(historySource);
             resources.add(retentionLock);
             final long startingSeqNo;
             final boolean isSequenceNumberBasedRecovery
                 = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
                 && isTargetSameHistory()
-                && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo())
-                && (softDeletesEnabled == false
-                || (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo()));
+                && shard.hasCompleteHistoryOperations("peer-recovery", historySource, request.startingSeqNo())
+                && (historySource == Engine.HistorySource.TRANSLOG ||
+                   (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo()));
             // NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease,
             // because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's
             // possible there are other cases where we cannot satisfy all leases, because that's not a property we currently expect to hold.
             // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery
             // without having a complete history.
 
-            if (isSequenceNumberBasedRecovery && softDeletesEnabled) {
+            if (isSequenceNumberBasedRecovery && retentionLeaseRef.get() != null) {
                 // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock
                 retentionLock.close();
                 logger.trace("history is retained by {}", retentionLeaseRef.get());
@@ -203,7 +209,11 @@ public class RecoverySourceHandler {
             if (isSequenceNumberBasedRecovery) {
                 logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
                 startingSeqNo = request.startingSeqNo();
-                sendFileStep.onResponse(SendFileResult.EMPTY);
+                if (softDeletesEnabled && retentionLeaseRef.get() == null) {
+                    createRetentionLease(startingSeqNo, ActionListener.map(sendFileStep, ignored -> SendFileResult.EMPTY));
+                } else {
+                    sendFileStep.onResponse(SendFileResult.EMPTY);
+                }
             } else {
                 final Engine.IndexCommitRef safeCommitRef;
                 try {
@@ -229,7 +239,7 @@ public class RecoverySourceHandler {
                 logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);
 
                 try {
-                    final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
+                    final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo);
                     final Releasable releaseStore = acquireStore(shard.store());
                     resources.add(releaseStore);
                     sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
@@ -282,7 +292,8 @@ public class RecoverySourceHandler {
             sendFileStep.whenComplete(r -> {
                 assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");
                 // For a sequence based recovery, the target can keep its local translog
-                prepareTargetForTranslog(shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
+                prepareTargetForTranslog(
+                    shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo), prepareEngineStep);
             }, onFailure);
 
             prepareEngineStep.whenComplete(prepareEngineTime -> {
@@ -298,14 +309,10 @@ public class RecoverySourceHandler {
 
                 final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
                 logger.trace("snapshot translog for recovery; current size is [{}]",
-                    shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
-                final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
+                    shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo));
+                final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", historySource, startingSeqNo);
                 resources.add(phase2Snapshot);
-
-                if (softDeletesEnabled == false || isSequenceNumberBasedRecovery == false) {
-                    // we can release the retention lock here because the snapshot itself will retain the required operations.
-                    retentionLock.close();
-                }
+                retentionLock.close();
 
                 // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
                 // are at least as high as the corresponding values on the primary when any of these operations were executed on it.

+ 3 - 1
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

@@ -316,7 +316,9 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
 
     private boolean hasUncommittedOperations() throws IOException {
         long localCheckpointOfCommit = Long.parseLong(indexShard.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
-        return indexShard.estimateNumberOfHistoryOperations("peer-recovery", localCheckpointOfCommit + 1) > 0;
+        return indexShard.estimateNumberOfHistoryOperations("peer-recovery",
+            indexShard.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG,
+            localCheckpointOfCommit + 1) > 0;
     }
 
     @Override

+ 1 - 1
server/src/test/java/org/elasticsearch/index/IndexSettingsTests.java

@@ -549,7 +549,7 @@ public class IndexSettingsTests extends ESTestCase {
 
     public void testIgnoreTranslogRetentionSettingsIfSoftDeletesEnabled() {
         Settings.Builder settings = Settings.builder()
-            .put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomIndexCompatibleVersion(random()));
+            .put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomVersionBetween(random(), Version.V_7_4_0, Version.CURRENT));
         if (randomBoolean()) {
             settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomPositiveTimeValue());
         }

+ 19 - 11
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -376,7 +376,8 @@ public class InternalEngineTests extends EngineTestCase {
             assertThat(segments.get(1).getDeletedDocs(), equalTo(0));
             assertThat(segments.get(1).isCompound(), equalTo(true));
 
-            engine.onSettingsChanged();
+            engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(),
+                indexSettings.getSoftDeleteRetentionOperations());
             ParsedDocument doc4 = testParsedDocument("4", null, testDocumentWithTextField(), B_3, null);
             engine.index(indexForDoc(doc4));
             engine.refresh("test");
@@ -1623,7 +1624,8 @@ public class InternalEngineTests extends EngineTestCase {
             }
             settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0);
             indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
-            engine.onSettingsChanged();
+            engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(),
+                indexSettings.getSoftDeleteRetentionOperations());
             globalCheckpoint.set(localCheckpoint);
             engine.syncTranslog();
 
@@ -1714,7 +1716,8 @@ public class InternalEngineTests extends EngineTestCase {
             }
             settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0);
             indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
-            engine.onSettingsChanged();
+            engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(),
+                indexSettings.getSoftDeleteRetentionOperations());
             // If we already merged down to 1 segment, then the next force-merge will be a noop. We need to add an extra segment to make
             // merges happen so we can verify that _recovery_source are pruned. See: https://github.com/elastic/elasticsearch/issues/41628.
             final int numSegments;
@@ -5040,7 +5043,8 @@ public class InternalEngineTests extends EngineTestCase {
             .settings(Settings.builder().put(indexSettings.getSettings())
                 .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build();
         indexSettings.updateIndexMetaData(indexMetaData);
-        engine.onSettingsChanged();
+        engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(),
+            indexSettings.getSoftDeleteRetentionOperations());
         assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs));
         assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
         engine.flush();
@@ -5088,7 +5092,8 @@ public class InternalEngineTests extends EngineTestCase {
             .settings(Settings.builder().put(indexSettings.getSettings())
                 .put(IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING.getKey(),  "0b")).build();
         indexSettings.updateIndexMetaData(indexMetaData);
-        engine.onSettingsChanged();
+        engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(),
+            indexSettings.getSoftDeleteRetentionOperations());
         assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(1));
         assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
         doc = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null);
@@ -5113,7 +5118,8 @@ public class InternalEngineTests extends EngineTestCase {
                 .put(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey(), generationThreshold + "b")
                 .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build();
         indexSettings.updateIndexMetaData(indexMetaData);
-        engine.onSettingsChanged();
+        engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(),
+            indexSettings.getSoftDeleteRetentionOperations());
         final int numOps = scaledRandomIntBetween(100, 10_000);
         for (int i = 0; i < numOps; i++) {
             final long localCheckPoint = engine.getProcessedLocalCheckpoint();
@@ -5141,7 +5147,8 @@ public class InternalEngineTests extends EngineTestCase {
                     .settings(Settings.builder().put(indexSettings.getSettings())
                         .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), TimeValue.timeValueMillis(1))).build();
                 engine.engineConfig.getIndexSettings().updateIndexMetaData(indexMetaData);
-                engine.onSettingsChanged();
+                engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(),
+                    indexSettings.getSoftDeleteRetentionOperations());
                 ParsedDocument document = testParsedDocument(Integer.toString(0), null, testDocumentWithTextField(), SOURCE, null);
                 final Engine.Index doc = new Engine.Index(newUid(document), document, UNASSIGNED_SEQ_NO, 0,
                     Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(),
@@ -5411,7 +5418,8 @@ public class InternalEngineTests extends EngineTestCase {
             if (rarely()) {
                 settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10));
                 indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
-                engine.onSettingsChanged();
+                engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(),
+                    indexSettings.getSoftDeleteRetentionOperations());
             }
             if (rarely()) {
                 engine.refresh("test");
@@ -5424,7 +5432,7 @@ public class InternalEngineTests extends EngineTestCase {
             if (rarely()) {
                 engine.forceMerge(randomBoolean());
             }
-            try (Closeable ignored = engine.acquireRetentionLock()) {
+            try (Closeable ignored = engine.acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) {
                 long minRetainSeqNos = engine.getMinRetainedSeqNo();
                 assertThat(minRetainSeqNos, lessThanOrEqualTo(globalCheckpoint.get() + 1));
                 Long[] expectedOps = existingSeqNos.stream().filter(seqno -> seqno >= minRetainSeqNos).toArray(Long[]::new);
@@ -5705,9 +5713,9 @@ public class InternalEngineTests extends EngineTestCase {
                 IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(Settings.builder().
                     put(defaultSettings.getSettings()).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false)).build());
             try (InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))) {
-                IllegalStateException error = expectThrows(IllegalStateException.class,
+                AssertionError error = expectThrows(AssertionError.class,
                     () -> engine.newChangesSnapshot("test", createMapperService(), 0, randomNonNegativeLong(), randomBoolean()));
-                assertThat(error.getMessage(), equalTo("accessing changes snapshot requires soft-deletes enabled"));
+                assertThat(error.getMessage(), containsString("does not have soft-deletes enabled"));
             }
         }
     }

+ 6 - 3
server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java

@@ -470,7 +470,8 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
                         assertThat(snapshot.totalOperations(), equalTo(0));
                     }
                 }
-                try (Translog.Snapshot snapshot = shard.getHistoryOperations("test", 0)) {
+                try (Translog.Snapshot snapshot = shard.getHistoryOperations(
+                    "test", shard.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, 0)) {
                     assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
                 }
             }
@@ -488,7 +489,8 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
                         assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(Collections.singletonList(noop2)));
                     }
                 }
-                try (Translog.Snapshot snapshot = shard.getHistoryOperations("test", 0)) {
+                try (Translog.Snapshot snapshot = shard.getHistoryOperations(
+                    "test", shard.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, 0)) {
                     assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
                 }
             }
@@ -585,7 +587,8 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
             shards.promoteReplicaToPrimary(replica2).get();
             logger.info("--> Recover replica3 from replica2");
             recoverReplica(replica3, replica2, true);
-            try (Translog.Snapshot snapshot = replica3.getHistoryOperations("test", 0)) {
+            try (Translog.Snapshot snapshot = replica3.getHistoryOperations(
+                "test", replica3.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, 0)) {
                 assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
                 final List<Translog.Operation> expectedOps = new ArrayList<>(initOperations);
                 expectedOps.add(op2);

+ 22 - 0
server/src/test/java/org/elasticsearch/index/replication/RetentionLeasesReplicationTests.java

@@ -32,6 +32,7 @@ import org.elasticsearch.index.seqno.RetentionLeaseUtils;
 import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.test.VersionUtils;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -147,6 +148,27 @@ public class RetentionLeasesReplicationTests extends ESIndexLevelReplicationTest
         }
     }
 
+    public void testTurnOffTranslogRetentionAfterAllShardStarted() throws Exception {
+        final Settings.Builder settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
+        if (randomBoolean()) {
+            settings.put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomIndexCompatibleVersion(random()));
+        }
+        try (ReplicationGroup group = createGroup(between(1, 2), settings.build())) {
+            group.startAll();
+            group.indexDocs(randomIntBetween(1, 10));
+            for (IndexShard shard : group) {
+                shard.updateShardState(shard.routingEntry(), shard.getOperationPrimaryTerm(), null, 1L,
+                    group.getPrimary().getReplicationGroup().getInSyncAllocationIds(),
+                    group.getPrimary().getReplicationGroup().getRoutingTable());
+            }
+            group.syncGlobalCheckpoint();
+            group.flush();
+            for (IndexShard shard : group) {
+                assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(0));
+            }
+        }
+    }
+
     static final class SyncRetentionLeasesResponse extends ReplicationResponse {
         final RetentionLeaseSyncAction.Request syncRequest;
         SyncRetentionLeasesResponse(RetentionLeaseSyncAction.Request syncRequest) {

+ 4 - 3
server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java

@@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
@@ -110,7 +111,7 @@ public class RetentionLeaseIT extends ESIntegTestCase  {
             final CountDownLatch latch = new CountDownLatch(1);
             final ActionListener<ReplicationResponse> listener = countDownLatchListener(latch);
             // simulate a peer recovery which locks the soft deletes policy on the primary
-            final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {};
+            final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) : () -> {};
             currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener));
             latch.await();
             retentionLock.close();
@@ -160,7 +161,7 @@ public class RetentionLeaseIT extends ESIntegTestCase  {
             final CountDownLatch latch = new CountDownLatch(1);
             final ActionListener<ReplicationResponse> listener = countDownLatchListener(latch);
             // simulate a peer recovery which locks the soft deletes policy on the primary
-            final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {};
+            final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) : () -> {};
             currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener));
             latch.await();
             retentionLock.close();
@@ -171,7 +172,7 @@ public class RetentionLeaseIT extends ESIntegTestCase  {
             final CountDownLatch latch = new CountDownLatch(1);
             primary.removeRetentionLease(id, countDownLatchListener(latch));
             // simulate a peer recovery which locks the soft deletes policy on the primary
-            final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {};
+            final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) : () -> {};
             currentRetentionLeases.remove(id);
             latch.await();
             retentionLock.close();

+ 5 - 1
server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java

@@ -40,6 +40,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.translog.TestTranslog;
@@ -63,6 +64,7 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -210,7 +212,9 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
             operations.add(new Translog.Index(
                 Integer.toString(i), randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : i, primaryTerm, new byte[]{1}));
         }
-        doReturn(TestTranslog.newSnapshotFromOperations(operations)).when(shard).getHistoryOperations(anyString(), anyLong());
+        Engine.HistorySource source =
+            shard.indexSettings.isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG;
+        doReturn(TestTranslog.newSnapshotFromOperations(operations)).when(shard).getHistoryOperations(anyString(), eq(source), anyLong());
         TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
         List<Translog.Operation> sentOperations = new ArrayList<>();
         PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> {

+ 1 - 1
server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java

@@ -245,7 +245,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
             IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId());
             shards.recoverReplica(newReplica);
             shards.assertAllEqual(3);
-            try (Translog.Snapshot snapshot = newReplica.getHistoryOperations("test", 0)) {
+            try (Translog.Snapshot snapshot = newReplica.getHistoryOperations("test", Engine.HistorySource.INDEX, 0)) {
                 assertThat(snapshot, SnapshotMatchers.size(6));
             }
         }

+ 75 - 0
test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

@@ -55,6 +55,7 @@ import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.test.ESTestCase;
+import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -1062,4 +1063,78 @@ public abstract class ESRestTestCase extends ESTestCase {
             return false;
         }
     }
+
+    /**
+     * Sends a flush request for the given index and asserts that the response is OK.
+     * The request is always issued with {@code wait_if_ongoing=true}.
+     *
+     * @param index the name of the index to flush
+     * @param force the value passed as the {@code force} request parameter
+     * @throws IOException declared by the underlying request call
+     */
+    public void flush(String index, boolean force) throws IOException {
+        logger.info("flushing index {} force={}", index, force);
+        final String endpoint = "/" + index + "/_flush";
+        final Request request = new Request("POST", endpoint);
+        request.addParameter("force", String.valueOf(force));
+        request.addParameter("wait_if_ongoing", "true");
+        assertOK(client().performRequest(request));
+    }
+
+    /**
+     * Asserts that every replica whose recovery target node is accepted by {@code targetNode} has performed an
+     * operation-based recovery: its recovery stats must either report an empty list of file details or, when no
+     * details are reported, show that all files were reused. Fails if no matching replica is found.
+     *
+     * @param indexName  the index whose {@code _recovery} stats are inspected
+     * @param targetNode predicate tested against the {@code target.name} of each shard recovery
+     * @throws IOException declared by the underlying request call
+     */
+    public void assertNoFileBasedRecovery(String indexName, Predicate<String> targetNode) throws IOException {
+        Map<String, Object> recoveries = entityAsMap(client().performRequest(new Request("GET", indexName + "/_recovery?detailed=true")));
+        @SuppressWarnings("unchecked")
+        List<Map<String, ?>> shards = (List<Map<String, ?>>) XContentMapValues.extractValue(indexName + ".shards", recoveries);
+        assertNotNull(shards);
+        boolean foundReplica = false;
+        logger.info("index {} recovery stats {}", indexName, shards);
+        for (Map<String, ?> shard : shards) {
+            // Compare by value: an identity check against Boolean.FALSE only holds if the parser returns the canonical instance.
+            final boolean isReplica = Boolean.FALSE.equals(shard.get("primary"));
+            if (isReplica && targetNode.test((String) XContentMapValues.extractValue("target.name", shard))) {
+                List<?> details = (List<?>) XContentMapValues.extractValue("index.files.details", shard);
+                // once detailed recoveries work, remove this if.
+                if (details == null) {
+                    long totalFiles = ((Number) XContentMapValues.extractValue("index.files.total", shard)).longValue();
+                    long reusedFiles = ((Number) XContentMapValues.extractValue("index.files.reused", shard)).longValue();
+                    logger.info("total [{}] reused [{}]", totalFiles, reusedFiles);
+                    assertThat("must reuse all files, recoveries [" + recoveries + "]", totalFiles, equalTo(reusedFiles));
+                } else {
+                    // details is known to be non-null here, so only its emptiness needs checking
+                    assertThat(details, Matchers.empty());
+                }
+                foundReplica = true;
+            }
+        }
+        assertTrue("must find replica", foundReplica);
+    }
+
+    /**
+     * Verifies that the stats of the given index report zero uncommitted translog operations and zero translog
+     * operations in total, i.e., that no extra translog is retained (translog retention is turned off).
+     *
+     * @param index the index whose {@code _stats} are inspected
+     */
+    public void assertEmptyTranslog(String index) throws Exception {
+        final Request statsRequest = new Request("GET", index + "/_stats?level=shards");
+        final Map<String, Object> indexStats = entityAsMap(client().performRequest(statsRequest));
+        final Object uncommittedOperations =
+            XContentMapValues.extractValue("indices." + index + ".total.translog.uncommitted_operations", indexStats);
+        assertThat(uncommittedOperations, equalTo(0));
+        final Object totalOperations =
+            XContentMapValues.extractValue("indices." + index + ".total.translog.operations", indexStats);
+        assertThat(totalOperations, equalTo(0));
+    }
+
+    /**
+     * Waits (for up to 60 seconds) until, on every shard copy of the given index, each retention lease whose id
+     * starts with {@code peer_recovery/} retains the sequence number immediately after that copy's global checkpoint.
+     * Peer recovery retention leases are renewed and synced to replicas periodically (every 30 seconds), so this
+     * ensures that every PRRL has been renewed to the global checkpoint of its copy and synced to all copies.
+     * Copies that report no retention leases are ignored.
+     *
+     * @param index the index whose shard-level {@code _stats} are polled
+     */
+    public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index) throws Exception {
+        assertBusy(() -> {
+            final Request statsRequest = new Request("GET", index + "/_stats?level=shards");
+            final Map<String, Object> indexStats = entityAsMap(client().performRequest(statsRequest));
+            @SuppressWarnings("unchecked") final Map<String, List<Map<String, ?>>> shardCopies =
+                (Map<String, List<Map<String, ?>>>) XContentMapValues.extractValue("indices." + index + ".shards", indexStats);
+            for (List<Map<String, ?>> copies : shardCopies.values()) {
+                for (Map<String, ?> copy : copies) {
+                    final Integer globalCheckpoint = (Integer) XContentMapValues.extractValue("seq_no.global_checkpoint", copy);
+                    assertNotNull(globalCheckpoint);
+                    @SuppressWarnings("unchecked") final List<Map<String, ?>> leases =
+                        (List<Map<String, ?>>) XContentMapValues.extractValue("retention_leases.leases", copy);
+                    if (leases != null) {
+                        for (Map<String, ?> lease : leases) {
+                            final String leaseId = (String) lease.get("id");
+                            if (leaseId.startsWith("peer_recovery/")) {
+                                assertThat(lease.get("retaining_seq_no"), equalTo(globalCheckpoint + 1));
+                            }
+                        }
+                    }
+                }
+            }
+        }, 60, TimeUnit.SECONDS);
+    }
 }

+ 2 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
@@ -75,7 +76,7 @@ public class BulkShardOperationsTests extends IndexShardTestCase {
                     operations,
                 numOps - 1, followerPrimary, logger);
 
-        try (Translog.Snapshot snapshot = followerPrimary.getHistoryOperations("test", 0)) {
+        try (Translog.Snapshot snapshot = followerPrimary.getHistoryOperations("test", Engine.HistorySource.INDEX, 0)) {
             assertThat(snapshot.totalOperations(), equalTo(operations.size()));
             Translog.Operation operation;
             while ((operation = snapshot.next()) != null) {