瀏覽代碼

Introduce retention lease persistence (#37375)

This commit introduces the persistence of retention leases by persisting
them in index commits and recovering them when recovering a shard from
store.
Jason Tedor 6 年之前
父節點
當前提交
03be4dbaca

+ 3 - 1
docs/reference/indices/flush.asciidoc

@@ -102,7 +102,8 @@ which returns something similar to:
                      "max_seq_no" : "-1",
                      "sync_id" : "AVvFY-071siAOuFGEO9P", <1>
                      "max_unsafe_auto_id_timestamp" : "-1",
-                     "min_retained_seq_no": "0"
+                     "min_retained_seq_no" : "0",
+                     "retention_leases" : "id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"
                    },
                    "num_docs" : 0
                  }
@@ -117,6 +118,7 @@ which returns something similar to:
 // TESTRESPONSE[s/"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA"/"translog_uuid": $body.indices.twitter.shards.0.0.commit.user_data.translog_uuid/]
 // TESTRESPONSE[s/"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ"/"history_uuid": $body.indices.twitter.shards.0.0.commit.user_data.history_uuid/]
 // TESTRESPONSE[s/"sync_id" : "AVvFY-071siAOuFGEO9P"/"sync_id": $body.indices.twitter.shards.0.0.commit.user_data.sync_id/]
+// TESTRESPONSE[s/"retention_leases" : "id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"/"retention_leases": $body.indices.twitter.shards.0.0.commit.user_data.retention_leases/]
 <1> the `sync id` marker
 
 [float]

+ 1 - 0
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -113,6 +113,7 @@ public abstract class Engine implements Closeable {
     public static final String SYNC_COMMIT_ID = "sync_id";
     public static final String HISTORY_UUID_KEY = "history_uuid";
     public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
+    public static final String RETENTION_LEASES = "retention_leases";
     public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
 
     protected final ShardId shardId;

+ 9 - 1
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -51,6 +51,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lucene.LoggerInfoStream;
 import org.elasticsearch.common.lucene.Lucene;
@@ -74,6 +75,7 @@ import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.merge.OnGoingMerge;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
+import org.elasticsearch.index.seqno.RetentionLease;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
@@ -2336,7 +2338,13 @@ public class InternalEngine extends Engine {
                 commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
                 commitData.put(HISTORY_UUID_KEY, historyUUID);
                 if (softDeleteEnabled) {
-                    commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
+                    /*
+                     * We sample these from the policy (which occurs under a lock) to ensure that we have a consistent view of the minimum
+                     * retained sequence number, and the retention leases.
+                     */
+                    final Tuple<Long, Collection<RetentionLease>> retentionPolicy = softDeletesPolicy.getRetentionPolicy();
+                    commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1()));
+                    commitData.put(Engine.RETENTION_LEASES, RetentionLease.encodeRetentionLeases(retentionPolicy.v2()));
                 }
                 logger.trace("committing writer with commit data [{}]", commitData);
                 return commitData.entrySet().iterator();

+ 11 - 4
server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java

@@ -21,6 +21,7 @@ package org.elasticsearch.index.engine;
 
 import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.search.Query;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.seqno.RetentionLease;
@@ -45,6 +46,7 @@ final class SoftDeletesPolicy {
     private long retentionOperations;
     // The min seq_no value that is retained - ops after this seq# should exist in the Lucene index.
     private long minRetainedSeqNo;
+    private Collection<RetentionLease> retentionLeases;
     // provides the retention leases used to calculate the minimum sequence number to retain
     private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;
 
@@ -57,6 +59,7 @@ final class SoftDeletesPolicy {
         this.retentionOperations = retentionOperations;
         this.minRetainedSeqNo = minRetainedSeqNo;
         this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
+        retentionLeases = retentionLeasesSupplier.get();
         this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
         this.retentionLockCount = 0;
     }
@@ -106,7 +109,11 @@ final class SoftDeletesPolicy {
      * Operations whose seq# is least this value should exist in the Lucene index.
      */
     synchronized long getMinRetainedSeqNo() {
-        // Do not advance if the retention lock is held
+        return getRetentionPolicy().v1();
+    }
+
+    public synchronized Tuple<Long, Collection<RetentionLease>> getRetentionPolicy() {
+        // do not advance if the retention lock is held
         if (retentionLockCount == 0) {
             /*
              * This policy retains operations for two purposes: peer-recovery and querying changes history.
@@ -119,8 +126,8 @@ final class SoftDeletesPolicy {
              */
 
             // calculate the minimum sequence number to retain based on retention leases
-            final long minimumRetainingSequenceNumber = retentionLeasesSupplier
-                    .get()
+            retentionLeases = retentionLeasesSupplier.get();
+            final long minimumRetainingSequenceNumber = retentionLeases
                     .stream()
                     .mapToLong(RetentionLease::retainingSequenceNumber)
                     .min()
@@ -139,7 +146,7 @@ final class SoftDeletesPolicy {
              */
             minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain);
         }
-        return minRetainedSeqNo;
+        return Tuple.tuple(minRetainedSeqNo, retentionLeases);
     }
 
     /**

+ 11 - 0
server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

@@ -185,6 +185,17 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
         retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source));
     }
 
+    /**
+     * Updates retention leases on a replica.
+     *
+     * @param retentionLeases the retention leases
+     */
+    public synchronized void updateRetentionLeasesOnReplica(final Collection<RetentionLease> retentionLeases) {
+        assert primaryMode == false;
+        this.retentionLeases.clear();
+        this.retentionLeases.putAll(retentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity())));
+    }
+
     public static class CheckpointState implements Writeable {
 
         /**

+ 107 - 0
server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java

@@ -19,6 +19,13 @@
 
 package org.elasticsearch.index.seqno;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
 /**
  * A "shard history retention lease" (or "retention lease" for short) is conceptually a marker containing a retaining sequence number such
  * that all operations with sequence number at least that retaining sequence number will be retained during merge operations (which could
@@ -81,18 +88,118 @@ public final class RetentionLease {
      * @param source                  the source of the retention lease
      */
     public RetentionLease(final String id, final long retainingSequenceNumber, final long timestamp, final String source) {
+        Objects.requireNonNull(id);
+        if (id.isEmpty()) {
+            throw new IllegalArgumentException("retention lease ID can not be empty");
+        }
+        if (id.contains(":") || id.contains(";") || id.contains(",")) {
+            // retention lease IDs can not contain these characters because they are used in encoding retention leases
+            throw new IllegalArgumentException("retention lease ID can not contain any of [:;,] but was [" + id + "]");
+        }
         if (retainingSequenceNumber < SequenceNumbers.UNASSIGNED_SEQ_NO) {
             throw new IllegalArgumentException("retention lease retaining sequence number [" + retainingSequenceNumber + "] out of range");
         }
         if (timestamp < 0) {
             throw new IllegalArgumentException("retention lease timestamp [" + timestamp + "] out of range");
         }
+        Objects.requireNonNull(source);
+        if (source.isEmpty()) {
+            throw new IllegalArgumentException("retention lease source can not be empty");
+        }
+        if (source.contains(":") || source.contains(";") || source.contains(",")) {
+            // retention lease sources can not contain these characters because they are used in encoding retention leases
+            throw new IllegalArgumentException("retention lease source can not contain any of [:;,] but was [" + source + "]");
+        }
         this.id = id;
         this.retainingSequenceNumber = retainingSequenceNumber;
         this.timestamp = timestamp;
         this.source = source;
     }
 
+    /**
+     * Encodes a retention lease as a string. This encoding can be decoded by {@link #decodeRetentionLease(String)}. The retention lease is
+     * encoded in the format <code>id:{id};retaining_seq_no:{retainingSequenecNumber};timestamp:{timestamp};source:{source}</code>.
+     *
+     * @param retentionLease the retention lease
+     * @return the encoding of the retention lease
+     */
+    static String encodeRetentionLease(final RetentionLease retentionLease) {
+        Objects.requireNonNull(retentionLease);
+        return String.format(
+                Locale.ROOT,
+                "id:%s;retaining_seq_no:%d;timestamp:%d;source:%s",
+                retentionLease.id(),
+                retentionLease.retainingSequenceNumber(),
+                retentionLease.timestamp(),
+                retentionLease.source());
+    }
+
+    /**
+     * Encodes a collection of retention leases as a string. This encoding can be decoed by {@link #decodeRetentionLeases(String)}. The
+     * encoding is a comma-separated encoding of each retention lease as encoded by {@link #encodeRetentionLease(RetentionLease)}.
+     *
+     * @param retentionLeases the retention leases
+     * @return the encoding of the retention leases
+     */
+    public static String encodeRetentionLeases(final Collection<RetentionLease> retentionLeases) {
+        Objects.requireNonNull(retentionLeases);
+        return retentionLeases.stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(","));
+    }
+
+    /**
+     * Decodes a retention lease encoded by {@link #encodeRetentionLease(RetentionLease)}.
+     *
+     * @param encodedRetentionLease an encoded retention lease
+     * @return the decoded retention lease
+     */
+    static RetentionLease decodeRetentionLease(final String encodedRetentionLease) {
+        Objects.requireNonNull(encodedRetentionLease);
+        final String[] fields = encodedRetentionLease.split(";");
+        assert fields.length == 4 : Arrays.toString(fields);
+        assert fields[0].matches("id:[^:;,]+") : fields[0];
+        final String id = fields[0].substring("id:".length());
+        assert fields[1].matches("retaining_seq_no:\\d+") : fields[1];
+        final long retainingSequenceNumber = Long.parseLong(fields[1].substring("retaining_seq_no:".length()));
+        assert fields[2].matches("timestamp:\\d+") : fields[2];
+        final long timestamp = Long.parseLong(fields[2].substring("timestamp:".length()));
+        assert fields[3].matches("source:[^:;,]+") : fields[3];
+        final String source = fields[3].substring("source:".length());
+        return new RetentionLease(id, retainingSequenceNumber, timestamp, source);
+    }
+
+    /**
+     * Decodes retention leases encoded by {@link #encodeRetentionLeases(Collection)}.
+     *
+     * @param encodedRetentionLeases an encoded collection of retention leases
+     * @return the decoded retention leases
+     */
+    public static Collection<RetentionLease> decodeRetentionLeases(final String encodedRetentionLeases) {
+        Objects.requireNonNull(encodedRetentionLeases);
+        if (encodedRetentionLeases.isEmpty()) {
+            return Collections.emptyList();
+        }
+        assert Arrays.stream(encodedRetentionLeases.split(","))
+                .allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+"))
+                : encodedRetentionLeases;
+        return Arrays.stream(encodedRetentionLeases.split(",")).map(RetentionLease::decodeRetentionLease).collect(Collectors.toList());
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final RetentionLease that = (RetentionLease) o;
+        return Objects.equals(id, that.id) &&
+                retainingSequenceNumber == that.retainingSequenceNumber &&
+                timestamp == that.timestamp &&
+                Objects.equals(source, that.source);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, retainingSequenceNumber, timestamp, source);
+    }
+
     @Override
     public String toString() {
         return "RetentionLease{" +

+ 11 - 0
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -106,6 +106,7 @@ import org.elasticsearch.index.refresh.RefreshStats;
 import org.elasticsearch.index.search.stats.SearchStats;
 import org.elasticsearch.index.search.stats.ShardSearchStats;
 import org.elasticsearch.index.seqno.ReplicationTracker;
+import org.elasticsearch.index.seqno.RetentionLease;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
@@ -140,6 +141,7 @@ import java.io.UncheckedIOException;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
@@ -1416,6 +1418,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
         final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
         replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
+        replicationTracker.updateRetentionLeasesOnReplica(getRetentionLeases(store.readLastCommittedSegmentsInfo()));
         trimUnsafeCommits();
         synchronized (mutex) {
             verifyNotClosed();
@@ -1435,6 +1438,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
     }
 
+    static Collection<RetentionLease> getRetentionLeases(final SegmentInfos segmentInfos) {
+        final String committedRetentionLeases = segmentInfos.getUserData().get(Engine.RETENTION_LEASES);
+        if (committedRetentionLeases == null) {
+            return Collections.emptyList();
+        }
+        return RetentionLease.decodeRetentionLeases(committedRetentionLeases);
+    }
+
     private void trimUnsafeCommits() throws IOException {
         assert currentEngineReference.get() == null || currentEngineReference.get() instanceof ReadOnlyEngine : "a write engine is running";
         final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);

+ 25 - 2
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -116,6 +116,7 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.seqno.ReplicationTracker;
+import org.elasticsearch.index.seqno.RetentionLease;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexSearcherWrapper;
@@ -140,6 +141,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -5241,13 +5243,14 @@ public class InternalEngineTests extends EngineTestCase {
         final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
         final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
         final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        final AtomicReference<Collection<RetentionLease>> leasesHolder = new AtomicReference<>(Collections.emptyList());
         final List<Engine.Operation> operations = generateSingleDocHistory(true,
             randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 10, 300, "2");
         Randomness.shuffle(operations);
         Set<Long> existingSeqNos = new HashSet<>();
         store = createStore();
-        engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null,
-            globalCheckpoint::get));
+        engine = createEngine(
+                config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, leasesHolder::get));
         assertThat(engine.getMinRetainedSeqNo(), equalTo(0L));
         long lastMinRetainedSeqNo = engine.getMinRetainedSeqNo();
         for (Engine.Operation op : operations) {
@@ -5261,6 +5264,18 @@ public class InternalEngineTests extends EngineTestCase {
             if (randomBoolean()) {
                 globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getCheckpoint()));
             }
+            if (randomBoolean()) {
+                final int length = randomIntBetween(0, 8);
+                final List<RetentionLease> leases = new ArrayList<>(length);
+                for (int i = 0; i < length; i++) {
+                    final String id = randomAlphaOfLength(8);
+                    final long retainingSequenceNumber = randomLongBetween(0L, Math.max(0L, globalCheckpoint.get()));
+                    final long timestamp = randomLongBetween(0L, Long.MAX_VALUE);
+                    final String source = randomAlphaOfLength(8);
+                    leases.add(new RetentionLease(id, retainingSequenceNumber, timestamp, source));
+                }
+                leasesHolder.set(leases);
+            }
             if (rarely()) {
                 settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10));
                 indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
@@ -5273,6 +5288,14 @@ public class InternalEngineTests extends EngineTestCase {
                 engine.flush(true, true);
                 assertThat(Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(Engine.MIN_RETAINED_SEQNO)),
                     equalTo(engine.getMinRetainedSeqNo()));
+                final Collection<RetentionLease> leases = leasesHolder.get();
+                if (leases.isEmpty()) {
+                    assertThat(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES), equalTo(""));
+                } else {
+                    assertThat(
+                            engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES),
+                            equalTo(RetentionLease.encodeRetentionLeases(leases)));
+                }
             }
             if (rarely()) {
                 engine.forceMerge(randomBoolean());

+ 67 - 3
server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java

@@ -21,12 +21,34 @@ package org.elasticsearch.index.seqno;
 
 import org.elasticsearch.test.ESTestCase;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasToString;
 
 public class RetentionLeaseTests extends ESTestCase {
 
+    public void testInvalidId() {
+        final String id = "id" + randomFrom(":", ";", ",");
+        final IllegalArgumentException e = expectThrows(
+                IllegalArgumentException.class,
+                () -> new RetentionLease(id, randomNonNegativeLong(), randomNonNegativeLong(), "source"));
+        assertThat(e, hasToString(containsString("retention lease ID can not contain any of [:;,] but was [" + id + "]")));
+    }
+
+    public void testEmptyId() {
+        final IllegalArgumentException e = expectThrows(
+                IllegalArgumentException.class,
+                () -> new RetentionLease("", randomNonNegativeLong(), randomNonNegativeLong(), "source"));
+        assertThat(e, hasToString(containsString("retention lease ID can not be empty")));
+    }
+
     public void testRetainingSequenceNumberOutOfRange() {
         final long retainingSequenceNumber = randomLongBetween(Long.MIN_VALUE, UNASSIGNED_SEQ_NO - 1);
         final IllegalArgumentException e = expectThrows(
@@ -42,9 +64,51 @@ public class RetentionLeaseTests extends ESTestCase {
         final IllegalArgumentException e = expectThrows(
                 IllegalArgumentException.class,
                 () -> new RetentionLease("id", randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE), timestamp, "source"));
-        assertThat(
-                e,
-                hasToString(containsString("retention lease timestamp [" + timestamp + "] out of range")));
+        assertThat(e, hasToString(containsString("retention lease timestamp [" + timestamp + "] out of range")));
+    }
+
+    public void testInvalidSource() {
+        final String source = "source" + randomFrom(":", ";", ",");
+        final IllegalArgumentException e = expectThrows(
+                IllegalArgumentException.class,
+                () -> new RetentionLease("id", randomNonNegativeLong(), randomNonNegativeLong(), source));
+        assertThat(e, hasToString(containsString("retention lease source can not contain any of [:;,] but was [" + source + "]")));
+    }
+
+    public void testEmptySource() {
+        final IllegalArgumentException e = expectThrows(
+                IllegalArgumentException.class,
+                () -> new RetentionLease("id", randomNonNegativeLong(), randomNonNegativeLong(), ""));
+        assertThat(e, hasToString(containsString("retention lease source can not be empty")));
+    }
+
+    public void testRetentionLeaseEncoding() {
+        final String id = randomAlphaOfLength(8);
+        final long retainingSequenceNumber = randomNonNegativeLong();
+        final long timestamp = randomNonNegativeLong();
+        final String source = randomAlphaOfLength(8);
+        final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source);
+        assertThat(RetentionLease.decodeRetentionLease(RetentionLease.encodeRetentionLease(retentionLease)), equalTo(retentionLease));
+    }
+
+    public void testRetentionLeasesEncoding() {
+        final int length = randomIntBetween(0, 8);
+        final List<RetentionLease> retentionLeases = new ArrayList<>(length);
+        for (int i = 0; i < length; i++) {
+            final String id = randomAlphaOfLength(8);
+            final long retainingSequenceNumber = randomNonNegativeLong();
+            final long timestamp = randomNonNegativeLong();
+            final String source = randomAlphaOfLength(8);
+            final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source);
+            retentionLeases.add(retentionLease);
+        }
+        final Collection<RetentionLease> decodedRetentionLeases =
+                RetentionLease.decodeRetentionLeases(RetentionLease.encodeRetentionLeases(retentionLeases));
+        if (length == 0) {
+            assertThat(decodedRetentionLeases, empty());
+        } else {
+            assertThat(decodedRetentionLeases, contains(retentionLeases.toArray(new RetentionLease[0])));
+        }
     }
 
 }

+ 53 - 0
server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java

@@ -19,9 +19,14 @@
 
 package org.elasticsearch.index.shard;
 
+import org.apache.lucene.index.SegmentInfos;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.ShardRoutingHelper;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.InternalEngineFactory;
 import org.elasticsearch.index.seqno.RetentionLease;
 import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -33,9 +38,11 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.LongSupplier;
 
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasSize;
@@ -127,6 +134,52 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
         }
     }
 
+    public void testCommit() throws IOException {
+        final Settings settings = Settings.builder()
+                .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
+                .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), Long.MAX_VALUE, TimeUnit.NANOSECONDS)
+                .build();
+        final IndexShard indexShard = newStartedShard(
+                true,
+                settings,
+                new InternalEngineFactory());
+        try {
+            final int length = randomIntBetween(0, 8);
+            final long[] minimumRetainingSequenceNumbers = new long[length];
+            for (int i = 0; i < length; i++) {
+                minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
+                currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(randomNonNegativeLong()));
+                indexShard.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
+            }
+
+            currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(Long.MAX_VALUE));
+
+            // force a commit
+            indexShard.flush(new FlushRequest().force(true));
+
+            // the committed retention leases should equal our current retention leases
+            final SegmentInfos segmentCommitInfos = indexShard.store().readLastCommittedSegmentsInfo();
+            assertTrue(segmentCommitInfos.getUserData().containsKey(Engine.RETENTION_LEASES));
+            final Collection<RetentionLease> retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
+            assertThat(IndexShard.getRetentionLeases(segmentCommitInfos), contains(retentionLeases.toArray(new RetentionLease[0])));
+
+            // when we recover, we should recover the retention leases
+            final IndexShard recoveredShard = reinitShard(
+                    indexShard,
+                    ShardRoutingHelper.initWithSameId(indexShard.routingEntry(), RecoverySource.ExistingStoreRecoverySource.INSTANCE));
+            try {
+                recoverShardFromStore(recoveredShard);
+                assertThat(
+                        recoveredShard.getEngine().config().retentionLeasesSupplier().get(),
+                        contains(retentionLeases.toArray(new RetentionLease[0])));
+            } finally {
+                closeShards(recoveredShard);
+            }
+        } finally {
+            closeShards(indexShard);
+        }
+    }
+
     private void assertRetentionLeases(
             final IndexShard indexShard,
             final int size,

+ 88 - 20
test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -578,44 +578,112 @@ public abstract class EngineTestCase extends ESTestCase {
 
     public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
                                ReferenceManager.RefreshListener refreshListener, Sort indexSort, LongSupplier globalCheckpointSupplier) {
-        return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, indexSort, globalCheckpointSupplier,
-            new NoneCircuitBreakerService());
+        return config(
+                indexSettings,
+                store,
+                translogPath,
+                mergePolicy,
+                refreshListener,
+                indexSort,
+                globalCheckpointSupplier,
+                globalCheckpointSupplier == null ? null : Collections::emptyList);
+    }
+
+    public EngineConfig config(
+            final IndexSettings indexSettings,
+            final Store store,
+            final Path translogPath,
+            final MergePolicy mergePolicy,
+            final ReferenceManager.RefreshListener refreshListener,
+            final Sort indexSort,
+            final LongSupplier globalCheckpointSupplier,
+            final Supplier<Collection<RetentionLease>> retentionLeasesSupplier) {
+        return config(
+                indexSettings,
+                store,
+                translogPath,
+                mergePolicy,
+                refreshListener,
+                null,
+                indexSort,
+                globalCheckpointSupplier,
+                retentionLeasesSupplier,
+                new NoneCircuitBreakerService());
     }
 
     public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
                                ReferenceManager.RefreshListener externalRefreshListener,
                                ReferenceManager.RefreshListener internalRefreshListener,
-                               Sort indexSort, @Nullable final LongSupplier maybeGlobalCheckpointSupplier,
+                               Sort indexSort, @Nullable LongSupplier maybeGlobalCheckpointSupplier,
                                CircuitBreakerService breakerService) {
-            IndexWriterConfig iwc = newIndexWriterConfig();
-        TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
-        Engine.EventListener listener = new Engine.EventListener() {
-            @Override
-            public void onFailedEngine(String reason, @Nullable Exception e) {
-                // we don't need to notify anybody in this test
-            }
-        };
+        return config(
+                indexSettings,
+                store,
+                translogPath,
+                mergePolicy,
+                externalRefreshListener,
+                internalRefreshListener,
+                indexSort,
+                maybeGlobalCheckpointSupplier,
+                maybeGlobalCheckpointSupplier == null ? null : Collections::emptyList,
+                breakerService);
+    }
+
+    public EngineConfig config(
+            final IndexSettings indexSettings,
+            final Store store,
+            final Path translogPath,
+            final MergePolicy mergePolicy,
+            final ReferenceManager.RefreshListener externalRefreshListener,
+            final ReferenceManager.RefreshListener internalRefreshListener,
+            final Sort indexSort,
+            final @Nullable LongSupplier maybeGlobalCheckpointSupplier,
+            final @Nullable Supplier<Collection<RetentionLease>> maybeRetentionLeasesSupplier,
+            final CircuitBreakerService breakerService) {
+        final IndexWriterConfig iwc = newIndexWriterConfig();
+        final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
+        final Engine.EventListener listener = new Engine.EventListener() {}; // we don't need to notify anybody in this test
         final List<ReferenceManager.RefreshListener> extRefreshListenerList =
-            externalRefreshListener == null ? emptyList() : Collections.singletonList(externalRefreshListener);
+                externalRefreshListener == null ? emptyList() : Collections.singletonList(externalRefreshListener);
         final List<ReferenceManager.RefreshListener> intRefreshListenerList =
-            internalRefreshListener == null ? emptyList() : Collections.singletonList(internalRefreshListener);
+                internalRefreshListener == null ? emptyList() : Collections.singletonList(internalRefreshListener);
         final LongSupplier globalCheckpointSupplier;
         final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;
         if (maybeGlobalCheckpointSupplier == null) {
+            assert maybeRetentionLeasesSupplier == null;
             final ReplicationTracker replicationTracker = new ReplicationTracker(
                     shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}, () -> 0L);
             globalCheckpointSupplier = replicationTracker;
             retentionLeasesSupplier = replicationTracker::getRetentionLeases;
         } else {
+            assert maybeRetentionLeasesSupplier != null;
             globalCheckpointSupplier = maybeGlobalCheckpointSupplier;
-            retentionLeasesSupplier = Collections::emptyList;
+            retentionLeasesSupplier = maybeRetentionLeasesSupplier;
         }
-        EngineConfig config = new EngineConfig(shardId, allocationId.getId(), threadPool, indexSettings, null, store,
-                mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
-                IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
-                TimeValue.timeValueMinutes(5), extRefreshListenerList, intRefreshListenerList, indexSort,
-                breakerService, globalCheckpointSupplier, retentionLeasesSupplier, primaryTerm::get, tombstoneDocSupplier());
-        return config;
+        return new EngineConfig(
+                shardId,
+                allocationId.getId(),
+                threadPool,
+                indexSettings,
+                null,
+                store,
+                mergePolicy,
+                iwc.getAnalyzer(),
+                iwc.getSimilarity(),
+                new CodecService(null, logger),
+                listener,
+                IndexSearcher.getDefaultQueryCache(),
+                IndexSearcher.getDefaultQueryCachingPolicy(),
+                translogConfig,
+                TimeValue.timeValueMinutes(5),
+                extRefreshListenerList,
+                intRefreshListenerList,
+                indexSort,
+                breakerService,
+                globalCheckpointSupplier,
+                retentionLeasesSupplier,
+                primaryTerm::get,
+                tombstoneDocSupplier());
     }
 
     protected static final BytesReference B_1 = new BytesArray(new byte[]{1});