Explorar o código

Introduce retention leases versioning (#37951)

Because concurrent sync requests from a primary to its replicas could be
in flight, it can be the case that an older retention leases collection
arrives and is processed on the replica after a newer retention leases
collection has arrived and been processed. Without a defense, in this
case the replica would overwrite the newer retention leases with the
older retention leases. This commit addresses this issue by introducing
a versioning scheme to retention leases. This versioning scheme is used
to resolve out-of-order processing on the replica. We persist this
version into Lucene and restore it on recovery. The encoding of
retention leases is starting to get a little ugly. We can consider
addressing this in a follow-up.
Jason Tedor %!s(int64=6) %!d(string=hai) anos
pai
achega
f181e17038
Modificáronse 27 ficheiros con 729 adicións e 277 borrados
  1. 1 1
      build.gradle
  2. 2 2
      docs/reference/indices/flush.asciidoc
  3. 4 5
      server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
  4. 3 3
      server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
  5. 6 5
      server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java
  6. 39 31
      server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
  7. 4 48
      server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java
  8. 17 16
      server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseStats.java
  9. 6 7
      server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java
  10. 1 3
      server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java
  11. 253 0
      server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java
  12. 7 7
      server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
  13. 6 6
      server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java
  14. 44 15
      server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
  15. 11 8
      server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java
  16. 119 25
      server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java
  17. 1 1
      server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java
  18. 1 1
      server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java
  19. 3 1
      server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsWireSerializingTests.java
  20. 5 11
      server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java
  21. 13 12
      server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java
  22. 0 25
      server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java
  23. 95 0
      server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java
  24. 55 30
      server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java
  25. 25 6
      server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
  26. 6 7
      test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
  27. 2 1
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java

+ 1 - 1
build.gradle

@@ -160,7 +160,7 @@ task verifyVersions {
  * after the backport of the backcompat code is complete.
  */
 final boolean bwc_tests_enabled = false
-final String bwc_tests_disabled_issue = "backport of#37977, #37857 and #37872" /* place a PR link here when committing bwc changes */
+final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/37951" /* place a PR link here when committing bwc changes */
 if (bwc_tests_enabled == false) {
   if (bwc_tests_disabled_issue.isEmpty()) {
     throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")

+ 2 - 2
docs/reference/indices/flush.asciidoc

@@ -104,7 +104,7 @@ which returns something similar to:
                      "sync_id" : "AVvFY-071siAOuFGEO9P", <1>
                      "max_unsafe_auto_id_timestamp" : "-1",
                      "min_retained_seq_no" : "0",
-                     "retention_leases" : "id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"
+                     "retention_leases" : "primary_term:1;version:1;id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"
                    },
                    "num_docs" : 0
                  }
@@ -119,7 +119,7 @@ which returns something similar to:
 // TESTRESPONSE[s/"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA"/"translog_uuid": $body.indices.twitter.shards.0.0.commit.user_data.translog_uuid/]
 // TESTRESPONSE[s/"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ"/"history_uuid": $body.indices.twitter.shards.0.0.commit.user_data.history_uuid/]
 // TESTRESPONSE[s/"sync_id" : "AVvFY-071siAOuFGEO9P"/"sync_id": $body.indices.twitter.shards.0.0.commit.user_data.sync_id/]
-// TESTRESPONSE[s/"retention_leases" : "id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"/"retention_leases": $body.indices.twitter.shards.0.0.commit.user_data.retention_leases/]
+// TESTRESPONSE[s/"retention_leases" : "primary_term:1;version:1;id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"/"retention_leases": $body.indices.twitter.shards.0.0.commit.user_data.retention_leases/]
 <1> the `sync id` marker
 
 [float]

+ 4 - 5
server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

@@ -35,7 +35,7 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.codec.CodecService;
 import org.elasticsearch.index.mapper.ParsedDocument;
-import org.elasticsearch.index.seqno.RetentionLease;
+import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.TranslogConfig;
@@ -43,7 +43,6 @@ import org.elasticsearch.indices.IndexingMemoryController;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.threadpool.ThreadPool;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.LongSupplier;
@@ -81,7 +80,7 @@ public final class EngineConfig {
     @Nullable
     private final CircuitBreakerService circuitBreakerService;
     private final LongSupplier globalCheckpointSupplier;
-    private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;
+    private final Supplier<RetentionLeases> retentionLeasesSupplier;
 
     /**
      * A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
@@ -89,7 +88,7 @@ public final class EngineConfig {
      *
      * @return a supplier of outstanding retention leases
      */
-    public Supplier<Collection<RetentionLease>> retentionLeasesSupplier() {
+    public Supplier<RetentionLeases> retentionLeasesSupplier() {
         return retentionLeasesSupplier;
     }
 
@@ -141,7 +140,7 @@ public final class EngineConfig {
                         List<ReferenceManager.RefreshListener> externalRefreshListener,
                         List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
                         CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier,
-                        Supplier<Collection<RetentionLease>> retentionLeasesSupplier,
+                        Supplier<RetentionLeases> retentionLeasesSupplier,
                         LongSupplier primaryTermSupplier,
                         TombstoneDocSupplier tombstoneDocSupplier) {
         this.shardId = shardId;

+ 3 - 3
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -75,7 +75,7 @@ import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.merge.OnGoingMerge;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
-import org.elasticsearch.index.seqno.RetentionLease;
+import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
@@ -2348,9 +2348,9 @@ public class InternalEngine extends Engine {
                      * We sample these from the policy (which occurs under a lock) to ensure that we have a consistent view of the minimum
                      * retained sequence number, and the retention leases.
                      */
-                    final Tuple<Long, Collection<RetentionLease>> retentionPolicy = softDeletesPolicy.getRetentionPolicy();
+                    final Tuple<Long, RetentionLeases> retentionPolicy = softDeletesPolicy.getRetentionPolicy();
                     commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1()));
-                    commitData.put(Engine.RETENTION_LEASES, RetentionLease.encodeRetentionLeases(retentionPolicy.v2()));
+                    commitData.put(Engine.RETENTION_LEASES, RetentionLeases.encodeRetentionLeases(retentionPolicy.v2()));
                 }
                 logger.trace("committing writer with commit data [{}]", commitData);
                 return commitData.entrySet().iterator();

+ 6 - 5
server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java

@@ -25,10 +25,10 @@ import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.seqno.RetentionLease;
+import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.translog.Translog;
 
-import java.util.Collection;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.LongSupplier;
@@ -47,13 +47,13 @@ final class SoftDeletesPolicy {
     // The min seq_no value that is retained - ops after this seq# should exist in the Lucene index.
     private long minRetainedSeqNo;
     // provides the retention leases used to calculate the minimum sequence number to retain
-    private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;
+    private final Supplier<RetentionLeases> retentionLeasesSupplier;
 
     SoftDeletesPolicy(
             final LongSupplier globalCheckpointSupplier,
             final long minRetainedSeqNo,
             final long retentionOperations,
-            final Supplier<Collection<RetentionLease>> retentionLeasesSupplier) {
+            final Supplier<RetentionLeases> retentionLeasesSupplier) {
         this.globalCheckpointSupplier = globalCheckpointSupplier;
         this.retentionOperations = retentionOperations;
         this.minRetainedSeqNo = minRetainedSeqNo;
@@ -110,12 +110,12 @@ final class SoftDeletesPolicy {
         return getRetentionPolicy().v1();
     }
 
-    public synchronized Tuple<Long, Collection<RetentionLease>> getRetentionPolicy() {
+    public synchronized Tuple<Long, RetentionLeases> getRetentionPolicy() {
         /*
          * When an engine is flushed, we need to provide it the latest collection of retention leases even when the soft deletes policy is
          * locked for peer recovery.
          */
-        final Collection<RetentionLease> retentionLeases = retentionLeasesSupplier.get();
+        final RetentionLeases retentionLeases = retentionLeasesSupplier.get();
         // do not advance if the retention lock is held
         if (retentionLockCount == 0) {
             /*
@@ -130,6 +130,7 @@ final class SoftDeletesPolicy {
 
             // calculate the minimum sequence number to retain based on retention leases
             final long minimumRetainingSequenceNumber = retentionLeases
+                    .leases()
                     .stream()
                     .mapToLong(RetentionLease::retainingSequenceNumber)
                     .min()

+ 39 - 31
server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

@@ -38,11 +38,11 @@ import org.elasticsearch.index.shard.ReplicationGroup;
 import org.elasticsearch.index.shard.ShardId;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalLong;
@@ -54,6 +54,7 @@ import java.util.function.LongSupplier;
 import java.util.function.ToLongFunction;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
+import java.util.stream.Stream;
 
 /**
  * This class is responsible for tracking the replication group with its progress and safety markers (local and global checkpoints).
@@ -157,7 +158,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
      * A callback when a new retention lease is created or an existing retention lease expires. In practice, this callback invokes the
      * retention lease sync action, to sync retention leases to replicas.
      */
-    private final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onSyncRetentionLeases;
+    private final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases;
 
     /**
      * This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
@@ -170,12 +171,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
      */
     volatile ReplicationGroup replicationGroup;
 
-    private final Map<String, RetentionLease> retentionLeases = new HashMap<>();
-
-    private Collection<RetentionLease> copyRetentionLeases() {
-        assert Thread.holdsLock(this);
-        return Collections.unmodifiableCollection(new ArrayList<>(retentionLeases.values()));
-    }
+    /**
+     * The current retention leases.
+     */
+    private RetentionLeases retentionLeases = RetentionLeases.EMPTY;
 
     /**
      * Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned. Note that only
@@ -183,27 +182,25 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
      *
      * @return the retention leases
      */
-    public Collection<RetentionLease> getRetentionLeases() {
+    public RetentionLeases getRetentionLeases() {
         final boolean wasPrimaryMode;
-        final Collection<RetentionLease> nonExpiredRetentionLeases;
+        final RetentionLeases nonExpiredRetentionLeases;
         synchronized (this) {
             if (primaryMode) {
                 // the primary calculates the non-expired retention leases and syncs them to replicas
                 final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
                 final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
-                final Collection<RetentionLease> expiredRetentionLeases = retentionLeases
-                        .values()
+                final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases
+                        .leases()
                         .stream()
-                        .filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() > retentionLeaseMillis)
-                        .collect(Collectors.toList());
-                if (expiredRetentionLeases.isEmpty()) {
+                        .collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis));
+                if (partitionByExpiration.get(true) == null) {
                     // early out as no retention leases have expired
-                    return copyRetentionLeases();
-                }
-                // clean up the expired retention leases
-                for (final RetentionLease expiredRetentionLease : expiredRetentionLeases) {
-                    retentionLeases.remove(expiredRetentionLease.id());
+                    return retentionLeases;
                 }
+                final Collection<RetentionLease> nonExpiredLeases =
+                        partitionByExpiration.get(false) != null ? partitionByExpiration.get(false) : Collections.emptyList();
+                retentionLeases = new RetentionLeases(operationPrimaryTerm, retentionLeases.version() + 1, nonExpiredLeases);
             }
             /*
              * At this point, we were either in primary mode and have updated the non-expired retention leases into the tracking map, or
@@ -211,7 +208,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
              * non-expired retention leases, instead receiving them on syncs from the primary.
              */
             wasPrimaryMode = primaryMode;
-            nonExpiredRetentionLeases = copyRetentionLeases();
+            nonExpiredRetentionLeases = retentionLeases;
         }
         if (wasPrimaryMode) {
             onSyncRetentionLeases.accept(nonExpiredRetentionLeases, ActionListener.wrap(() -> {}));
@@ -236,15 +233,18 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
             final ActionListener<ReplicationResponse> listener) {
         Objects.requireNonNull(listener);
         final RetentionLease retentionLease;
-        final Collection<RetentionLease> currentRetentionLeases;
+        final RetentionLeases currentRetentionLeases;
         synchronized (this) {
             assert primaryMode;
-            if (retentionLeases.containsKey(id)) {
+            if (retentionLeases.contains(id)) {
                 throw new IllegalArgumentException("retention lease with ID [" + id + "] already exists");
             }
             retentionLease = new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
-            retentionLeases.put(id, retentionLease);
-            currentRetentionLeases = copyRetentionLeases();
+            retentionLeases = new RetentionLeases(
+                    operationPrimaryTerm,
+                    retentionLeases.version() + 1,
+                    Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList()));
+            currentRetentionLeases = retentionLeases;
         }
         onSyncRetentionLeases.accept(currentRetentionLeases, listener);
         return retentionLease;
@@ -261,18 +261,25 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
      */
     public synchronized RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
         assert primaryMode;
-        if (retentionLeases.containsKey(id) == false) {
+        if (retentionLeases.contains(id) == false) {
             throw new IllegalArgumentException("retention lease with ID [" + id + "] does not exist");
         }
         final RetentionLease retentionLease =
                 new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
-        final RetentionLease existingRetentionLease = retentionLeases.put(id, retentionLease);
+        final RetentionLease existingRetentionLease = retentionLeases.get(id);
         assert existingRetentionLease != null;
         assert existingRetentionLease.retainingSequenceNumber() <= retentionLease.retainingSequenceNumber() :
                 "retention lease renewal for [" + id + "]"
                         + " from [" + source + "]"
                         + " renewed a lower retaining sequence number [" + retentionLease.retainingSequenceNumber() + "]"
                         + " than the current lease retaining sequence number [" + existingRetentionLease.retainingSequenceNumber() + "]";
+        retentionLeases = new RetentionLeases(
+                operationPrimaryTerm,
+                retentionLeases.version() + 1,
+                Stream.concat(
+                        retentionLeases.leases().stream().filter(lease -> lease.id().equals(id) == false),
+                        Stream.of(retentionLease))
+                        .collect(Collectors.toList()));
         return retentionLease;
     }
 
@@ -281,10 +288,11 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
      *
      * @param retentionLeases the retention leases
      */
-    public synchronized void updateRetentionLeasesOnReplica(final Collection<RetentionLease> retentionLeases) {
+    public synchronized void updateRetentionLeasesOnReplica(final RetentionLeases retentionLeases) {
         assert primaryMode == false;
-        this.retentionLeases.clear();
-        this.retentionLeases.putAll(retentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity())));
+        if (retentionLeases.supersedes(this.retentionLeases)) {
+            this.retentionLeases = retentionLeases;
+        }
     }
 
     public static class CheckpointState implements Writeable {
@@ -565,7 +573,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
             final long globalCheckpoint,
             final LongConsumer onGlobalCheckpointUpdated,
             final LongSupplier currentTimeMillisSupplier,
-            final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
+            final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
         super(shardId, indexSettings);
         assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
         this.shardAllocationId = allocationId;

+ 4 - 48
server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java

@@ -25,13 +25,8 @@ import org.elasticsearch.common.io.stream.Writeable;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Locale;
-import java.util.Map;
 import java.util.Objects;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
 /**
  * A "shard history retention lease" (or "retention lease" for short) is conceptually a marker containing a retaining sequence number such
@@ -162,22 +157,10 @@ public final class RetentionLease implements Writeable {
         return String.format(
                 Locale.ROOT,
                 "id:%s;retaining_seq_no:%d;timestamp:%d;source:%s",
-                retentionLease.id(),
-                retentionLease.retainingSequenceNumber(),
-                retentionLease.timestamp(),
-                retentionLease.source());
-    }
-
-    /**
-     * Encodes a collection of retention leases as a string. This encoding can be decoed by {@link #decodeRetentionLeases(String)}. The
-     * encoding is a comma-separated encoding of each retention lease as encoded by {@link #encodeRetentionLease(RetentionLease)}.
-     *
-     * @param retentionLeases the retention leases
-     * @return the encoding of the retention leases
-     */
-    public static String encodeRetentionLeases(final Collection<RetentionLease> retentionLeases) {
-        Objects.requireNonNull(retentionLeases);
-        return retentionLeases.stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(","));
+                retentionLease.id,
+                retentionLease.retainingSequenceNumber,
+                retentionLease.timestamp,
+                retentionLease.source);
     }
 
     /**
@@ -201,23 +184,6 @@ public final class RetentionLease implements Writeable {
         return new RetentionLease(id, retainingSequenceNumber, timestamp, source);
     }
 
-    /**
-     * Decodes retention leases encoded by {@link #encodeRetentionLeases(Collection)}.
-     *
-     * @param encodedRetentionLeases an encoded collection of retention leases
-     * @return the decoded retention leases
-     */
-    public static Collection<RetentionLease> decodeRetentionLeases(final String encodedRetentionLeases) {
-        Objects.requireNonNull(encodedRetentionLeases);
-        if (encodedRetentionLeases.isEmpty()) {
-            return Collections.emptyList();
-        }
-        assert Arrays.stream(encodedRetentionLeases.split(","))
-                .allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+"))
-                : encodedRetentionLeases;
-        return Arrays.stream(encodedRetentionLeases.split(",")).map(RetentionLease::decodeRetentionLease).collect(Collectors.toList());
-    }
-
     @Override
     public boolean equals(final Object o) {
         if (this == o) return true;
@@ -244,14 +210,4 @@ public final class RetentionLease implements Writeable {
                 '}';
     }
 
-    /**
-     * A utility method to convert a collection of retention leases to a map from retention lease ID to retention lease.
-     *
-     * @param leases the leases
-     * @return the map from retention lease ID to retention lease
-     */
-    static Map<String, RetentionLease> toMap(final Collection<RetentionLease> leases) {
-        return leases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity()));
-    }
-
 }

+ 17 - 16
server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseStats.java

@@ -26,7 +26,6 @@ import org.elasticsearch.common.xcontent.ToXContentFragment;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Objects;
 
 /**
@@ -34,24 +33,24 @@ import java.util.Objects;
  */
 public final class RetentionLeaseStats implements ToXContentFragment, Writeable {
 
-    private final Collection<RetentionLease> leases;
+    private final RetentionLeases retentionLeases;
 
     /**
-     * The underlying retention leases backing this stats object.
+     * The underlying retention lease collection backing this stats object.
      *
-     * @return the leases
+     * @return the retention lease collection
      */
-    public Collection<RetentionLease> leases() {
-        return leases;
+    public RetentionLeases retentionLeases() {
+        return retentionLeases;
     }
 
     /**
-     * Constructs a new retention lease stats object from the specified leases.
+     * Constructs a new retention lease stats object from the specified retention lease collection.
      *
-     * @param leases the leases
+     * @param retentionLeases the retention lease collection
      */
-    public RetentionLeaseStats(final Collection<RetentionLease> leases) {
-        this.leases = Objects.requireNonNull(leases);
+    public RetentionLeaseStats(final RetentionLeases retentionLeases) {
+        this.retentionLeases = Objects.requireNonNull(retentionLeases);
     }
 
     /**
@@ -62,7 +61,7 @@ public final class RetentionLeaseStats implements ToXContentFragment, Writeable
      * @throws IOException if an I/O exception occurs reading from the stream
      */
     public RetentionLeaseStats(final StreamInput in) throws IOException {
-        leases = in.readList(RetentionLease::new);
+        retentionLeases = new RetentionLeases(in);
     }
 
     /**
@@ -74,7 +73,7 @@ public final class RetentionLeaseStats implements ToXContentFragment, Writeable
      */
     @Override
     public void writeTo(final StreamOutput out) throws IOException {
-        out.writeCollection(leases);
+        retentionLeases.writeTo(out);
     }
 
     /**
@@ -82,16 +81,18 @@ public final class RetentionLeaseStats implements ToXContentFragment, Writeable
      *
      * @param builder the builder
      * @param params  the params
-     * @return the builder that these retention leases were converted to {@link org.elasticsearch.common.xcontent.XContent} into
+     * @return the builder that this retention lease collection was converted to {@link org.elasticsearch.common.xcontent.XContent} into
      * @throws IOException if an I/O exception occurs writing to the builder
      */
     @Override
     public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
         builder.startObject("retention_leases");
         {
+            builder.field("primary_term", retentionLeases.primaryTerm());
+            builder.field("version", retentionLeases.version());
             builder.startArray("leases");
             {
-                for (final RetentionLease retentionLease : leases) {
+                for (final RetentionLease retentionLease : retentionLeases.leases()) {
                     builder.startObject();
                     {
                         builder.field("id", retentionLease.id());
@@ -113,12 +114,12 @@ public final class RetentionLeaseStats implements ToXContentFragment, Writeable
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         final RetentionLeaseStats that = (RetentionLeaseStats) o;
-        return Objects.equals(leases, that.leases);
+        return Objects.equals(retentionLeases, that.retentionLeases);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(leases);
+        return Objects.hash(retentionLeases);
     }
 
 }

+ 6 - 7
server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java

@@ -47,7 +47,6 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Objects;
 
 /**
@@ -99,7 +98,7 @@ public class RetentionLeaseSyncAction extends
      */
     public void syncRetentionLeasesForShard(
             final ShardId shardId,
-            final Collection<RetentionLease> retentionLeases,
+            final RetentionLeases retentionLeases,
             final ActionListener<ReplicationResponse> listener) {
         Objects.requireNonNull(shardId);
         Objects.requireNonNull(retentionLeases);
@@ -149,9 +148,9 @@ public class RetentionLeaseSyncAction extends
 
     public static final class Request extends ReplicatedWriteRequest<Request> {
 
-        private Collection<RetentionLease> retentionLeases;
+        private RetentionLeases retentionLeases;
 
-        public Collection<RetentionLease> getRetentionLeases() {
+        public RetentionLeases getRetentionLeases() {
             return retentionLeases;
         }
 
@@ -159,7 +158,7 @@ public class RetentionLeaseSyncAction extends
 
         }
 
-        public Request(final ShardId shardId, final Collection<RetentionLease> retentionLeases) {
+        public Request(final ShardId shardId, final RetentionLeases retentionLeases) {
             super(Objects.requireNonNull(shardId));
             this.retentionLeases = Objects.requireNonNull(retentionLeases);
         }
@@ -167,13 +166,13 @@ public class RetentionLeaseSyncAction extends
         @Override
         public void readFrom(final StreamInput in) throws IOException {
             super.readFrom(in);
-            retentionLeases = in.readList(RetentionLease::new);
+            retentionLeases = new RetentionLeases(in);
         }
 
         @Override
         public void writeTo(final StreamOutput out) throws IOException {
             super.writeTo(Objects.requireNonNull(out));
-            out.writeCollection(retentionLeases);
+            retentionLeases.writeTo(out);
         }
 
         @Override

+ 1 - 3
server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java

@@ -23,8 +23,6 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.index.shard.ShardId;
 
-import java.util.Collection;
-
 /**
  * A functional interface that represents a method for syncing retention leases to replica shards after a new retention lease is added on
  * the primary.
@@ -42,7 +40,7 @@ public interface RetentionLeaseSyncer {
      */
     void syncRetentionLeasesForShard(
             ShardId shardId,
-            Collection<RetentionLease> retentionLeases,
+            RetentionLeases retentionLeases,
             ActionListener<ReplicationResponse> listener);
 
 }

+ 253 - 0
server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java

@@ -0,0 +1,253 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.seqno;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Represents a versioned collection of retention leases. We version the collection of retention leases to ensure that sync requests that
+ * arrive out of order on the replica, using the version to ensure that older sync requests are rejected.
+ */
+public class RetentionLeases implements Writeable {
+
+    private final long primaryTerm;
+
+    /**
+     * The primary term of this retention lease collection.
+     *
+     * @return the primary term
+     */
+    public long primaryTerm() {
+        return primaryTerm;
+    }
+
+    private final long version;
+
+    /**
+     * The version of this retention lease collection. The version is managed on the primary and incremented any time that a retention lease
+     * is added, renewed, or when retention leases expire.
+     *
+     * @return the version of this retention lease collection
+     */
+    public long version() {
+        return version;
+    }
+
+    /**
+     * Checks if this retention leases collection supersedes the specified retention leases collection. A retention leases collection
+     * supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher
+     *
+     * @param that the retention leases collection to test against
+     * @return true if this retention leases collection supercedes the specified retention lease collection, otherwise false
+     */
+    public boolean supersedes(final RetentionLeases that) {
+        return primaryTerm > that.primaryTerm || primaryTerm == that.primaryTerm && version > that.version;
+    }
+
+    private final Map<String, RetentionLease> leases;
+
+    /**
+     * The underlying collection of retention leases
+     *
+     * @return the retention leases
+     */
+    public Collection<RetentionLease> leases() {
+        return Collections.unmodifiableCollection(leases.values());
+    }
+
+    /**
+     * Checks if this retention lease collection contains a retention lease with the specified {@link RetentionLease#id()}.
+     *
+     * @param id the retention lease ID
+     * @return true if this retention lease collection contains a retention lease with the specified ID, otherwise false
+     */
+    public boolean contains(final String id) {
+        return leases.containsKey(id);
+    }
+
+    /**
+     * Returns the retention lease with the specified ID, or null if no such retention lease exists.
+     *
+     * @param id the retention lease ID
+     * @return the retention lease, or null if no retention lease with the specified ID exists
+     */
+    public RetentionLease get(final String id) {
+        return leases.get(id);
+    }
+
+    /**
+     * Represents an empty an un-versioned retention lease collection. This is used when no retention lease collection is found in the
+     * commit point
+     */
+    public static RetentionLeases EMPTY = new RetentionLeases(1, 0, Collections.emptyList());
+
+    /**
+     * Constructs a new retention lease collection with the specified version and underlying collection of retention leases.
+     *
+     * @param primaryTerm the primary term under which this retention lease collection was created
+     * @param version the version of this retention lease collection
+     * @param leases  the retention leases
+     */
+    public RetentionLeases(final long primaryTerm, final long version, final Collection<RetentionLease> leases) {
+        if (primaryTerm <= 0) {
+            throw new IllegalArgumentException("primary term must be positive but was [" + primaryTerm + "]");
+        }
+        if (version < 0) {
+            throw new IllegalArgumentException("version must be non-negative but was [" + version + "]");
+        }
+        Objects.requireNonNull(leases);
+        this.primaryTerm = primaryTerm;
+        this.version = version;
+        this.leases = Collections.unmodifiableMap(toMap(leases));
+    }
+
+    /**
+     * Constructs a new retention lease collection from a stream. The retention lease collection should have been written via
+     * {@link #writeTo(StreamOutput)}.
+     *
+     * @param in the stream to construct the retention lease collection from
+     * @throws IOException if an I/O exception occurs reading from the stream
+     */
+    public RetentionLeases(final StreamInput in) throws IOException {
+        primaryTerm = in.readVLong();
+        version = in.readVLong();
+        leases = Collections.unmodifiableMap(toMap(in.readList(RetentionLease::new)));
+    }
+
+    /**
+     * Writes a retention lease collection to a stream in a manner suitable for later reconstruction via
+     * {@link #RetentionLeases(StreamInput)} (StreamInput)}.
+     *
+     * @param out the stream to write the retention lease collection to
+     * @throws IOException if an I/O exception occurs writing to the stream
+     */
+    @Override
+    public void writeTo(final StreamOutput out) throws IOException {
+        out.writeVLong(primaryTerm);
+        out.writeVLong(version);
+        out.writeCollection(leases.values());
+    }
+
+    /**
+     * Encodes a retention lease collection as a string. This encoding can be decoded by
+     * {@link RetentionLeases#decodeRetentionLeases(String)}. The encoding is a comma-separated encoding of each retention lease as encoded
+     * by {@link RetentionLease#encodeRetentionLease(RetentionLease)}, prefixed by the version of the retention lease collection.
+     *
+     * @param retentionLeases the retention lease collection
+     * @return the encoding of the retention lease collection
+     */
+    public static String encodeRetentionLeases(final RetentionLeases retentionLeases) {
+        Objects.requireNonNull(retentionLeases);
+        return String.format(
+                Locale.ROOT,
+                "primary_term:%d;version:%d;%s",
+                retentionLeases.primaryTerm,
+                retentionLeases.version,
+                retentionLeases.leases.values().stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(",")));
+    }
+
+    /**
+     * Decodes retention leases encoded by {@link #encodeRetentionLeases(RetentionLeases)}.
+     *
+     * @param encodedRetentionLeases an encoded retention lease collection
+     * @return the decoded retention lease collection
+     */
+    public static RetentionLeases decodeRetentionLeases(final String encodedRetentionLeases) {
+        Objects.requireNonNull(encodedRetentionLeases);
+        if (encodedRetentionLeases.isEmpty()) {
+            return EMPTY;
+        }
+        assert encodedRetentionLeases.matches("primary_term:\\d+;version:\\d+;.*") : encodedRetentionLeases;
+        final int firstSemicolon = encodedRetentionLeases.indexOf(";");
+        final long primaryTerm = Long.parseLong(encodedRetentionLeases.substring("primary_term:".length(), firstSemicolon));
+        final int secondSemicolon = encodedRetentionLeases.indexOf(";", firstSemicolon + 1);
+        final long version = Long.parseLong(encodedRetentionLeases.substring(firstSemicolon + 1 + "version:".length(), secondSemicolon));
+        final Collection<RetentionLease> leases;
+        if (secondSemicolon + 1 == encodedRetentionLeases.length()) {
+            leases = Collections.emptyList();
+        } else {
+            assert Arrays.stream(encodedRetentionLeases.substring(secondSemicolon + 1).split(","))
+                    .allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+"))
+                    : encodedRetentionLeases;
+            leases = Arrays.stream(encodedRetentionLeases.substring(secondSemicolon + 1).split(","))
+                    .map(RetentionLease::decodeRetentionLease)
+                    .collect(Collectors.toList());
+        }
+
+        return new RetentionLeases(primaryTerm, version, leases);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final RetentionLeases that = (RetentionLeases) o;
+        return primaryTerm == that.primaryTerm &&
+                version == that.version &&
+                Objects.equals(leases, that.leases);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(primaryTerm, version, leases);
+    }
+
+    @Override
+    public String toString() {
+        return "RetentionLeases{" +
+                "primaryTerm=" + primaryTerm +
+                ", version=" + version +
+                ", leases=" + leases +
+                '}';
+    }
+
+    /**
+     * A utility method to convert retention leases to a map from retention lease ID to retention lease.
+     *
+     * @param leases the retention leases
+     * @return the map from retention lease ID to retention lease
+     */
+    private static Map<String, RetentionLease> toMap(final Collection<RetentionLease> leases) {
+        return leases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity()));
+    }
+
+    /**
+     * A utility method to convert a retention lease collection to a map from retention lease ID to retention lease.
+     *
+     * @param retentionLeases the retention lease collection
+     * @return the map from retention lease ID to retention lease
+     */
+    static Map<String, RetentionLease> toMap(final RetentionLeases retentionLeases) {
+        return retentionLeases.leases;
+    }
+
+}

+ 7 - 7
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -109,6 +109,7 @@ import org.elasticsearch.index.search.stats.ShardSearchStats;
 import org.elasticsearch.index.seqno.ReplicationTracker;
 import org.elasticsearch.index.seqno.RetentionLease;
 import org.elasticsearch.index.seqno.RetentionLeaseStats;
+import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
@@ -143,7 +144,6 @@ import java.io.UncheckedIOException;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
@@ -267,7 +267,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             final List<SearchOperationListener> searchOperationListener,
             final List<IndexingOperationListener> listeners,
             final Runnable globalCheckpointSyncer,
-            final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> retentionLeaseSyncer,
+            final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> retentionLeaseSyncer,
             final CircuitBreakerService circuitBreakerService) throws IOException {
         super(shardRouting.shardId(), indexSettings);
         assert shardRouting.initializing();
@@ -1444,12 +1444,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
     }
 
-    static Collection<RetentionLease> getRetentionLeases(final SegmentInfos segmentInfos) {
+    static RetentionLeases getRetentionLeases(final SegmentInfos segmentInfos) {
         final String committedRetentionLeases = segmentInfos.getUserData().get(Engine.RETENTION_LEASES);
         if (committedRetentionLeases == null) {
-            return Collections.emptyList();
+            return RetentionLeases.EMPTY;
         }
-        return RetentionLease.decodeRetentionLeases(committedRetentionLeases);
+        return RetentionLeases.decodeRetentionLeases(committedRetentionLeases);
     }
 
     private void trimUnsafeCommits() throws IOException {
@@ -1892,7 +1892,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      *
      * @return the retention leases
      */
-    public Collection<RetentionLease> getRetentionLeases() {
+    public RetentionLeases getRetentionLeases() {
         verifyNotClosed();
         return replicationTracker.getRetentionLeases();
     }
@@ -1943,7 +1943,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      *
      * @param retentionLeases the retention leases
      */
-    public void updateRetentionLeasesOnReplica(final Collection<RetentionLease> retentionLeases) {
+    public void updateRetentionLeasesOnReplica(final RetentionLeases retentionLeases) {
         assert assertReplicationTarget();
         verifyNotClosed();
         replicationTracker.updateRetentionLeasesOnReplica(retentionLeases);

+ 6 - 6
server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java

@@ -22,6 +22,7 @@ package org.elasticsearch.index.engine;
 import com.carrotsearch.hppc.LongArrayList;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.store.Directory;
+import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogDeletionPolicy;
@@ -30,7 +31,6 @@ import org.elasticsearch.test.ESTestCase;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -55,7 +55,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
         final AtomicLong globalCheckpoint = new AtomicLong();
         final int extraRetainedOps = between(0, 100);
         final SoftDeletesPolicy softDeletesPolicy =
-                new SoftDeletesPolicy(globalCheckpoint::get, NO_OPS_PERFORMED, extraRetainedOps, Collections::emptyList);
+                new SoftDeletesPolicy(globalCheckpoint::get, NO_OPS_PERFORMED, extraRetainedOps, () -> RetentionLeases.EMPTY);
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
         CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
 
@@ -101,7 +101,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
         final AtomicLong globalCheckpoint = new AtomicLong();
         final int extraRetainedOps = between(0, 100);
         final SoftDeletesPolicy softDeletesPolicy =
-                new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps, Collections::emptyList);
+                new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps, () -> RetentionLeases.EMPTY);
         final UUID translogUUID = UUID.randomUUID();
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
         CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
@@ -182,7 +182,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
 
     public void testLegacyIndex() throws Exception {
         final AtomicLong globalCheckpoint = new AtomicLong();
-        final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, Collections::emptyList);
+        final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY);
         final UUID translogUUID = UUID.randomUUID();
 
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
@@ -217,7 +217,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
 
     public void testDeleteInvalidCommits() throws Exception {
         final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
-        final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, Collections::emptyList);
+        final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY);
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
         CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
 
@@ -251,7 +251,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
 
     public void testCheckUnreferencedCommits() throws Exception {
         final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
-        final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, Collections::emptyList);
+        final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY);
         final UUID translogUUID = UUID.randomUUID();
         final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
         CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);

+ 44 - 15
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -117,6 +117,7 @@ import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.seqno.ReplicationTracker;
 import org.elasticsearch.index.seqno.RetentionLease;
+import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexSearcherWrapper;
@@ -141,7 +142,6 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -3052,12 +3052,29 @@ public class InternalEngineTests extends EngineTestCase {
         TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(),
             BigArrays.NON_RECYCLING_INSTANCE);
 
-        EngineConfig brokenConfig = new EngineConfig(shardId, allocationId.getId(),
-                threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
-                new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
-                IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5),
-                config.getExternalRefreshListener(), config.getInternalRefreshListener(), null,
-                new NoneCircuitBreakerService(), () -> UNASSIGNED_SEQ_NO, Collections::emptyList, primaryTerm::get,
+        EngineConfig brokenConfig = new EngineConfig(
+                shardId,
+                allocationId.getId(),
+                threadPool,
+                config.getIndexSettings(),
+                null,
+                store,
+                newMergePolicy(),
+                config.getAnalyzer(),
+                config.getSimilarity(),
+                new CodecService(null, logger),
+                config.getEventListener(),
+                IndexSearcher.getDefaultQueryCache(),
+                IndexSearcher.getDefaultQueryCachingPolicy(),
+                translogConfig,
+                TimeValue.timeValueMinutes(5),
+                config.getExternalRefreshListener(),
+                config.getInternalRefreshListener(),
+                null,
+                new NoneCircuitBreakerService(),
+                () -> UNASSIGNED_SEQ_NO,
+                () -> RetentionLeases.EMPTY,
+                primaryTerm::get,
                 tombstoneDocSupplier());
         expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig));
 
@@ -5287,14 +5304,23 @@ public class InternalEngineTests extends EngineTestCase {
         final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
         final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
         final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
-        final AtomicReference<Collection<RetentionLease>> leasesHolder = new AtomicReference<>(Collections.emptyList());
+        final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
+        final AtomicLong retentionLeasesVersion = new AtomicLong();
+        final AtomicReference<RetentionLeases> retentionLeasesHolder = new AtomicReference<>(RetentionLeases.EMPTY);
         final List<Engine.Operation> operations = generateSingleDocHistory(true,
             randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 10, 300, "2");
         Randomness.shuffle(operations);
         Set<Long> existingSeqNos = new HashSet<>();
         store = createStore();
-        engine = createEngine(
-                config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, leasesHolder::get));
+        engine = createEngine(config(
+                indexSettings,
+                store,
+                createTempDir(),
+                newMergePolicy(),
+                null,
+                null,
+                globalCheckpoint::get,
+                retentionLeasesHolder::get));
         assertThat(engine.getMinRetainedSeqNo(), equalTo(0L));
         long lastMinRetainedSeqNo = engine.getMinRetainedSeqNo();
         for (Engine.Operation op : operations) {
@@ -5309,6 +5335,7 @@ public class InternalEngineTests extends EngineTestCase {
                 globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getCheckpoint()));
             }
             if (randomBoolean()) {
+                retentionLeasesVersion.incrementAndGet();
                 final int length = randomIntBetween(0, 8);
                 final List<RetentionLease> leases = new ArrayList<>(length);
                 for (int i = 0; i < length; i++) {
@@ -5318,7 +5345,7 @@ public class InternalEngineTests extends EngineTestCase {
                     final String source = randomAlphaOfLength(8);
                     leases.add(new RetentionLease(id, retainingSequenceNumber, timestamp, source));
                 }
-                leasesHolder.set(leases);
+                retentionLeasesHolder.set(new RetentionLeases(primaryTerm, retentionLeasesVersion.get(), leases));
             }
             if (rarely()) {
                 settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10));
@@ -5332,13 +5359,15 @@ public class InternalEngineTests extends EngineTestCase {
                 engine.flush(true, true);
                 assertThat(Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(Engine.MIN_RETAINED_SEQNO)),
                     equalTo(engine.getMinRetainedSeqNo()));
-                final Collection<RetentionLease> leases = leasesHolder.get();
-                if (leases.isEmpty()) {
-                    assertThat(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES), equalTo(""));
+                final RetentionLeases leases = retentionLeasesHolder.get();
+                if (leases.leases().isEmpty()) {
+                    assertThat(
+                            engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES),
+                            equalTo("primary_term:" + primaryTerm + ";version:" + retentionLeasesVersion.get() + ";"));
                 } else {
                     assertThat(
                             engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES),
-                            equalTo(RetentionLease.encodeRetentionLeases(leases)));
+                            equalTo(RetentionLeases.encodeRetentionLeases(leases)));
                 }
             }
             if (rarely()) {

+ 11 - 8
server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java

@@ -24,15 +24,14 @@ import org.apache.lucene.search.PointRangeQuery;
 import org.apache.lucene.search.Query;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.index.seqno.RetentionLease;
+import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.test.ESTestCase;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 
@@ -54,13 +53,13 @@ public class SoftDeletesPolicyTests extends ESTestCase  {
         for (int i = 0; i < retainingSequenceNumbers.length; i++) {
             retainingSequenceNumbers[i] = new AtomicLong();
         }
-        final Supplier<Collection<RetentionLease>> retentionLeasesSupplier =
+        final Supplier<RetentionLeases> retentionLeasesSupplier =
                 () -> {
-                    final Set<RetentionLease> leases = new HashSet<>(retainingSequenceNumbers.length);
+                    final List<RetentionLease> leases = new ArrayList<>(retainingSequenceNumbers.length);
                     for (int i = 0; i < retainingSequenceNumbers.length; i++) {
                         leases.add(new RetentionLease(Integer.toString(i), retainingSequenceNumbers[i].get(), 0L, "test"));
                     }
-                    return leases;
+                    return new RetentionLeases(1, 1, leases);
                 };
         long safeCommitCheckpoint = globalCheckpoint.get();
         SoftDeletesPolicy policy = new SoftDeletesPolicy(globalCheckpoint::get, between(1, 10000), retainedOps, retentionLeasesSupplier);
@@ -126,16 +125,20 @@ public class SoftDeletesPolicyTests extends ESTestCase  {
         for (int i = 0; i < numLeases; i++) {
             leases.add(new RetentionLease(Integer.toString(i), randomLongBetween(0, 1000), randomNonNegativeLong(), "test"));
         }
-        final Supplier<Collection<RetentionLease>> leasesSupplier = () -> Collections.unmodifiableCollection(new ArrayList<>(leases));
+        final Supplier<RetentionLeases> leasesSupplier =
+                () -> new RetentionLeases(
+                        randomNonNegativeLong(),
+                        randomNonNegativeLong(),
+                        Collections.unmodifiableCollection(new ArrayList<>(leases)));
         final SoftDeletesPolicy policy =
                 new SoftDeletesPolicy(globalCheckpoint::get, randomIntBetween(1, 1000), randomIntBetween(0, 1000), leasesSupplier);
         if (randomBoolean()) {
             policy.acquireRetentionLock();
         }
         if (numLeases == 0) {
-            assertThat(policy.getRetentionPolicy().v2(), empty());
+            assertThat(policy.getRetentionPolicy().v2().leases(), empty());
         } else {
-            assertThat(policy.getRetentionPolicy().v2(), contains(leases.toArray(new RetentionLease[0])));
+            assertThat(policy.getRetentionPolicy().v2().leases(), contains(leases.toArray(new RetentionLease[0])));
         }
     }
 }

+ 119 - 25
server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java

@@ -28,6 +28,7 @@ import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.IndexSettingsModule;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -40,6 +41,8 @@ import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasSize;
@@ -49,11 +52,12 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
 
     public void testAddOrRenewRetentionLease() {
         final AllocationId allocationId = AllocationId.newInitializing();
+        long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
         final ReplicationTracker replicationTracker = new ReplicationTracker(
                 new ShardId("test", "_na", 0),
                 allocationId.getId(),
                 IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
-                randomNonNegativeLong(),
+                primaryTerm,
                 UNASSIGNED_SEQ_NO,
                 value -> {},
                 () -> 0L,
@@ -70,19 +74,27 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
             minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
             replicationTracker.addRetentionLease(
                     Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
-            assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L, true);
+            if (rarely() && primaryTerm < Long.MAX_VALUE) {
+                primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
+                replicationTracker.setOperationPrimaryTerm(primaryTerm);
+            }
+            assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + i, true);
         }
 
         for (int i = 0; i < length; i++) {
             minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE);
             replicationTracker.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
-            assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L, true);
+            if (rarely() && primaryTerm < Long.MAX_VALUE) {
+                primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
+                replicationTracker.setOperationPrimaryTerm(primaryTerm);
+            }
+            assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + length + i, true);
         }
     }
 
     public void testAddRetentionLeaseCausesRetentionLeaseSync() {
         final AllocationId allocationId = AllocationId.newInitializing();
-        final Map<String, Long> retentionLeases = new HashMap<>();
+        final Map<String, Long> retainingSequenceNumbers = new HashMap<>();
         final AtomicBoolean invoked = new AtomicBoolean();
         final AtomicReference<ReplicationTracker> reference = new AtomicReference<>();
         final ReplicationTracker replicationTracker = new ReplicationTracker(
@@ -98,8 +110,10 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
                     assertFalse(Thread.holdsLock(reference.get()));
                     invoked.set(true);
                     assertThat(
-                            leases.stream().collect(Collectors.toMap(RetentionLease::id, RetentionLease::retainingSequenceNumber)),
-                            equalTo(retentionLeases));
+                            leases.leases()
+                                    .stream()
+                                    .collect(Collectors.toMap(RetentionLease::id, RetentionLease::retainingSequenceNumber)),
+                            equalTo(retainingSequenceNumbers));
                 });
         reference.set(replicationTracker);
         replicationTracker.updateFromMaster(
@@ -113,7 +127,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
         for (int i = 0; i < length; i++) {
             final String id = randomAlphaOfLength(8);
             final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
-            retentionLeases.put(id, retainingSequenceNumber);
+            retainingSequenceNumbers.put(id, retainingSequenceNumber);
             replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {}));
             // assert that the new retention lease callback was invoked
             assertTrue(invoked.get());
@@ -141,11 +155,12 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
                 .builder()
                 .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis))
                 .build();
+        final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
         final ReplicationTracker replicationTracker = new ReplicationTracker(
                 new ShardId("test", "_na", 0),
                 allocationId.getId(),
                 IndexSettingsModule.newIndexSettings("test", settings),
-                randomNonNegativeLong(),
+                primaryTerm,
                 UNASSIGNED_SEQ_NO,
                 value -> {},
                 currentTimeMillis::get,
@@ -163,16 +178,20 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
         if (primaryMode) {
             replicationTracker.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {}));
         } else {
-            replicationTracker.updateRetentionLeasesOnReplica(
+            final RetentionLeases retentionLeases = new RetentionLeases(
+                    primaryTerm,
+                    1,
                     Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
+            replicationTracker.updateRetentionLeasesOnReplica(retentionLeases);
         }
 
         {
-            final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
-            assertThat(retentionLeases, hasSize(1));
-            final RetentionLease retentionLease = retentionLeases.iterator().next();
+            final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases();
+            assertThat(retentionLeases.version(), equalTo(1L));
+            assertThat(retentionLeases.leases(), hasSize(1));
+            final RetentionLease retentionLease = retentionLeases.leases().iterator().next();
             assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
-            assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryMode);
+            assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 1, primaryMode);
         }
 
         // renew the lease
@@ -181,25 +200,29 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
         if (primaryMode) {
             replicationTracker.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0");
         } else {
-            replicationTracker.updateRetentionLeasesOnReplica(
+            final RetentionLeases retentionLeases = new RetentionLeases(
+                    primaryTerm,
+                    2,
                     Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
+            replicationTracker.updateRetentionLeasesOnReplica(retentionLeases);
         }
 
         {
-            final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
-            assertThat(retentionLeases, hasSize(1));
-            final RetentionLease retentionLease = retentionLeases.iterator().next();
+            final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases();
+            assertThat(retentionLeases.version(), equalTo(2L));
+            assertThat(retentionLeases.leases(), hasSize(1));
+            final RetentionLease retentionLease = retentionLeases.leases().iterator().next();
             assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
-            assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryMode);
+            assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, primaryMode);
         }
 
         // now force the lease to expire
         currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get()));
         if (primaryMode) {
-            assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get, true);
+            assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 3, true);
         } else {
             // leases do not expire on replicas until synced from the primary
-            assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, false);
+            assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, false);
         }
     }
 
@@ -227,7 +250,9 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
                     assertFalse(Thread.holdsLock(reference.get()));
                     invoked.set(true);
                     assertThat(
-                            leases.stream().collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)),
+                            leases.leases()
+                                    .stream()
+                                    .collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)),
                             equalTo(retentionLeases));
                 });
         reference.set(replicationTracker);
@@ -239,11 +264,14 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
         replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
 
         final int length = randomIntBetween(0, 8);
+        long version = 0;
         for (int i = 0; i < length; i++) {
             final String id = randomAlphaOfLength(8);
             final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
             retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get()));
             replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {}));
+            version++;
+            assertThat(replicationTracker.getRetentionLeases().version(), equalTo(version));
             // assert that the new retention lease callback was invoked
             assertTrue(invoked.get());
 
@@ -252,6 +280,8 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
             currentTimeMillis.set(1 + currentTimeMillis.get());
             retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get()));
             replicationTracker.renewRetentionLease(id, retainingSequenceNumber, "test");
+            version++;
+            assertThat(replicationTracker.getRetentionLeases().version(), equalTo(version));
 
             // reset the invocation marker so that we can assert the callback was invoked if any leases are expired
             assertFalse(invoked.get());
@@ -264,16 +294,76 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
                     .map(Map.Entry::getKey)
                     .collect(Collectors.toList());
             expiredIds.forEach(retentionLeases::remove);
+            if (expiredIds.isEmpty() == false) {
+                version++;
+            }
             currentTimeMillis.set(currentTimeMillis.get() + currentTimeMillisIncrement);
             // getting the leases has the side effect of calculating which leases are expired and invoking the sync callback
-            final Collection<RetentionLease> current = replicationTracker.getRetentionLeases();
+            final RetentionLeases current = replicationTracker.getRetentionLeases();
+            assertThat(current.version(), equalTo(version));
             // the current leases should equal our tracking map
             assertThat(
-                    current.stream().collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)),
+                    current.leases()
+                            .stream()
+                            .collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)),
                     equalTo(retentionLeases));
             // the callback should only be invoked if there were expired leases
             assertThat(invoked.get(), equalTo(expiredIds.isEmpty() == false));
         }
+        assertThat(replicationTracker.getRetentionLeases().version(), equalTo(version));
+    }
+
+    public void testReplicaIgnoresOlderRetentionLeasesVersion() {
+        final AllocationId allocationId = AllocationId.newInitializing();
+        final ReplicationTracker replicationTracker = new ReplicationTracker(
+                new ShardId("test", "_na", 0),
+                allocationId.getId(),
+                IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
+                randomNonNegativeLong(),
+                UNASSIGNED_SEQ_NO,
+                value -> {},
+                () -> 0L,
+                (leases, listener) -> {});
+        replicationTracker.updateFromMaster(
+                randomNonNegativeLong(),
+                Collections.singleton(allocationId.getId()),
+                routingTable(Collections.emptySet(), allocationId),
+                Collections.emptySet());
+        final int length = randomIntBetween(0, 8);
+        final List<RetentionLeases> retentionLeasesCollection = new ArrayList<>(length);
+        long primaryTerm = 1;
+        long version = 0;
+        for (int i = 0; i < length; i++) {
+            final int innerLength = randomIntBetween(0, 8);
+            final Collection<RetentionLease> leases = new ArrayList<>();
+            for (int j = 0; j < innerLength; j++) {
+                leases.add(
+                        new RetentionLease(i + "-" + j, randomNonNegativeLong(), randomNonNegativeLong(), randomAlphaOfLength(8)));
+                version++;
+            }
+            if (rarely()) {
+                primaryTerm++;
+            }
+            retentionLeasesCollection.add(new RetentionLeases(primaryTerm, version, leases));
+        }
+        final Collection<RetentionLease> expectedLeases;
+        if (length == 0 || retentionLeasesCollection.get(length - 1).leases().isEmpty()) {
+            expectedLeases = Collections.emptyList();
+        } else {
+            expectedLeases = retentionLeasesCollection.get(length - 1).leases();
+        }
+        Collections.shuffle(retentionLeasesCollection, random());
+        for (final RetentionLeases retentionLeases : retentionLeasesCollection) {
+            replicationTracker.updateRetentionLeasesOnReplica(retentionLeases);
+        }
+        assertThat(replicationTracker.getRetentionLeases().version(), equalTo(version));
+        if (expectedLeases.isEmpty()) {
+            assertThat(replicationTracker.getRetentionLeases().leases(), empty());
+        } else {
+            assertThat(
+                    replicationTracker.getRetentionLeases().leases(),
+                    contains(expectedLeases.toArray(new RetentionLease[0])));
+        }
     }
 
     private static Tuple<Long, Long> toTuple(final RetentionLease retentionLease) {
@@ -285,10 +375,14 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
             final int size,
             final long[] minimumRetainingSequenceNumbers,
             final LongSupplier currentTimeMillisSupplier,
+            final long primaryTerm,
+            final long version,
             final boolean primaryMode) {
-        final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
+        final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases();
+        assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm));
+        assertThat(retentionLeases.version(), equalTo(version));
         final Map<String, RetentionLease> idToRetentionLease = new HashMap<>();
-        for (final RetentionLease retentionLease : retentionLeases) {
+        for (final RetentionLease retentionLease : retentionLeases.leases()) {
             idToRetentionLease.put(retentionLease.id(), retentionLease);
         }
 

+ 1 - 1
server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java

@@ -687,7 +687,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
         final LongConsumer onUpdate = updatedGlobalCheckpoint -> {};
         final long primaryTerm = randomNonNegativeLong();
         final long globalCheckpoint = UNASSIGNED_SEQ_NO;
-        final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onNewRetentionLease =
+        final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onNewRetentionLease =
                 (leases, listener) -> {};
         ReplicationTracker oldPrimary = new ReplicationTracker(
                 shardId, aId.getId(), indexSettings, primaryTerm, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease);

+ 1 - 1
server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsTests.java

@@ -61,7 +61,7 @@ public class RetentionLeaseStatsTests extends ESSingleNodeTestCase {
         final IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("index").execute().actionGet();
         assertThat(indicesStats.getShards(), arrayWithSize(1));
         final RetentionLeaseStats retentionLeaseStats = indicesStats.getShards()[0].getRetentionLeaseStats();
-        assertThat(RetentionLease.toMap(retentionLeaseStats.leases()), equalTo(currentRetentionLeases));
+        assertThat(RetentionLeases.toMap(retentionLeaseStats.retentionLeases()), equalTo(currentRetentionLeases));
     }
 
 }

+ 3 - 1
server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseStatsWireSerializingTests.java

@@ -30,6 +30,8 @@ public class RetentionLeaseStatsWireSerializingTests extends AbstractWireSeriali
 
     @Override
     protected RetentionLeaseStats createTestInstance() {
+        final long primaryTerm = randomNonNegativeLong();
+        final long version = randomNonNegativeLong();
         final int length = randomIntBetween(0, 8);
         final Collection<RetentionLease> leases;
         if (length == 0) {
@@ -44,7 +46,7 @@ public class RetentionLeaseStatsWireSerializingTests extends AbstractWireSeriali
                 leases.add(new RetentionLease(id, retainingSequenceNumber, timestamp, source));
             }
         }
-        return new RetentionLeaseStats(leases);
+        return new RetentionLeaseStats(new RetentionLeases(primaryTerm, version, leases));
     }
 
     @Override

+ 5 - 11
server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java

@@ -43,7 +43,6 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.mockito.ArgumentCaptor;
 
-import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -114,10 +113,8 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
                 shardStateAction,
                 new ActionFilters(Collections.emptySet()),
                 new IndexNameExpressionResolver());
-        @SuppressWarnings("unchecked") final Collection<RetentionLease> retentionLeases =
-                (Collection<RetentionLease>) mock(Collection.class);
-        final RetentionLeaseSyncAction.Request request =
-                new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
+        final RetentionLeases retentionLeases = mock(RetentionLeases.class);
+        final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
 
         final TransportWriteAction.WritePrimaryResult<RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Response> result =
                 action.shardOperationOnPrimary(request, indexShard);
@@ -155,10 +152,8 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
                 shardStateAction,
                 new ActionFilters(Collections.emptySet()),
                 new IndexNameExpressionResolver());
-        @SuppressWarnings("unchecked") final Collection<RetentionLease> retentionLeases =
-                (Collection<RetentionLease>) mock(Collection.class);
-        final RetentionLeaseSyncAction.Request request =
-                new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
+        final RetentionLeases retentionLeases = mock(RetentionLeases.class);
+        final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
 
         final TransportWriteAction.WriteReplicaResult result = action.shardOperationOnReplica(request, indexShard);
         // the retention leases on the shard should be updated
@@ -190,8 +185,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
 
         final Logger retentionLeaseSyncActionLogger = mock(Logger.class);
 
-        @SuppressWarnings("unchecked") final Collection<RetentionLease> retentionLeases =
-                (Collection<RetentionLease>) mock(Collection.class);
+        final RetentionLeases retentionLeases = mock(RetentionLeases.class);
         final AtomicBoolean invoked = new AtomicBoolean();
         final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction(
                 Settings.EMPTY,

+ 13 - 12
server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java

@@ -33,7 +33,6 @@ import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.Closeable;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -78,9 +77,9 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase  {
             retentionLock.close();
 
             // check retention leases have been committed on the primary
-            final Collection<RetentionLease> primaryCommittedRetentionLeases = RetentionLease.decodeRetentionLeases(
+            final RetentionLeases primaryCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
                     primary.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES));
-            assertThat(currentRetentionLeases, equalTo(RetentionLease.toMap(primaryCommittedRetentionLeases)));
+            assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primaryCommittedRetentionLeases)));
 
             // check current retention leases have been synced to all replicas
             for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) {
@@ -89,13 +88,13 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase  {
                 final IndexShard replica = internalCluster()
                         .getInstance(IndicesService.class, replicaShardNodeName)
                         .getShardOrNull(new ShardId(resolveIndex("index"), 0));
-                final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLease.toMap(replica.getRetentionLeases());
+                final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases());
                 assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases));
 
                 // check retention leases have been committed on the replica
-                final Collection<RetentionLease> replicaCommittedRetentionLeases = RetentionLease.decodeRetentionLeases(
+                final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
                         replica.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES));
-                assertThat(currentRetentionLeases, equalTo(RetentionLease.toMap(replicaCommittedRetentionLeases)));
+                assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replicaCommittedRetentionLeases)));
             }
         }
     }
@@ -138,14 +137,14 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase  {
                 final IndexShard replica = internalCluster()
                         .getInstance(IndicesService.class, replicaShardNodeName)
                         .getShardOrNull(new ShardId(resolveIndex("index"), 0));
-                assertThat(replica.getRetentionLeases(), hasItem(currentRetentionLease));
+                assertThat(replica.getRetentionLeases().leases(), hasItem(currentRetentionLease));
             }
 
             // sleep long enough that *possibly* the current retention lease has expired, and certainly that any previous have
             final long later = System.nanoTime();
             Thread.sleep(Math.max(0, retentionLeaseTimeToLive.millis() - TimeUnit.NANOSECONDS.toMillis(later - now)));
-            final Collection<RetentionLease> currentRetentionLeases = primary.getRetentionLeases();
-            assertThat(currentRetentionLeases, anyOf(empty(), contains(currentRetentionLease)));
+            final RetentionLeases currentRetentionLeases = primary.getRetentionLeases();
+            assertThat(currentRetentionLeases.leases(), anyOf(empty(), contains(currentRetentionLease)));
 
             /*
              * Check that expiration of retention leases has been synced to all replicas. We have to assert busy since syncing happens in
@@ -158,10 +157,12 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase  {
                     final IndexShard replica = internalCluster()
                             .getInstance(IndicesService.class, replicaShardNodeName)
                             .getShardOrNull(new ShardId(resolveIndex("index"), 0));
-                    if (currentRetentionLeases.isEmpty()) {
-                        assertThat(replica.getRetentionLeases(), empty());
+                    if (currentRetentionLeases.leases().isEmpty()) {
+                        assertThat(replica.getRetentionLeases().leases(), empty());
                     } else {
-                        assertThat(replica.getRetentionLeases(), contains(currentRetentionLeases.toArray(new RetentionLease[0])));
+                        assertThat(
+                                replica.getRetentionLeases().leases(),
+                                contains(currentRetentionLeases.leases().toArray(new RetentionLease[0])));
                     }
                 }
             });

+ 0 - 25
server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java

@@ -24,13 +24,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
 
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasToString;
 
@@ -107,24 +102,4 @@ public class RetentionLeaseTests extends ESTestCase {
         assertThat(RetentionLease.decodeRetentionLease(RetentionLease.encodeRetentionLease(retentionLease)), equalTo(retentionLease));
     }
 
-    public void testRetentionLeasesEncoding() {
-        final int length = randomIntBetween(0, 8);
-        final List<RetentionLease> retentionLeases = new ArrayList<>(length);
-        for (int i = 0; i < length; i++) {
-            final String id = randomAlphaOfLength(8);
-            final long retainingSequenceNumber = randomNonNegativeLong();
-            final long timestamp = randomNonNegativeLong();
-            final String source = randomAlphaOfLength(8);
-            final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source);
-            retentionLeases.add(retentionLease);
-        }
-        final Collection<RetentionLease> decodedRetentionLeases =
-                RetentionLease.decodeRetentionLeases(RetentionLease.encodeRetentionLeases(retentionLeases));
-        if (length == 0) {
-            assertThat(decodedRetentionLeases, empty());
-        } else {
-            assertThat(decodedRetentionLeases, contains(retentionLeases.toArray(new RetentionLease[0])));
-        }
-    }
-
 }

+ 95 - 0
server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java

@@ -0,0 +1,95 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.seqno;
+
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasToString;
+
+public class RetentionLeasesTests extends ESTestCase {
+
+    public void testPrimaryTermOutOfRange() {
+        final long primaryTerm = randomLongBetween(Long.MIN_VALUE, 0);
+        final IllegalArgumentException e = expectThrows(
+                IllegalArgumentException.class,
+                () -> new RetentionLeases(primaryTerm, randomNonNegativeLong(), Collections.emptyList()));
+        assertThat(e, hasToString(containsString("primary term must be positive but was [" + primaryTerm + "]")));
+    }
+
+    public void testVersionOutOfRange() {
+        final long version = randomLongBetween(Long.MIN_VALUE, -1);
+        final IllegalArgumentException e = expectThrows(
+                IllegalArgumentException.class,
+                () -> new RetentionLeases(randomLongBetween(1, Long.MAX_VALUE), version, Collections.emptyList()));
+        assertThat(e, hasToString(containsString("version must be non-negative but was [" + version + "]")));
+    }
+
+    public void testRetentionLeasesEncoding() {
+        final long primaryTerm = randomNonNegativeLong();
+        final long version = randomNonNegativeLong();
+        final int length = randomIntBetween(0, 8);
+        final List<RetentionLease> retentionLeases = new ArrayList<>(length);
+        for (int i = 0; i < length; i++) {
+            final String id = randomAlphaOfLength(8);
+            final long retainingSequenceNumber = randomNonNegativeLong();
+            final long timestamp = randomNonNegativeLong();
+            final String source = randomAlphaOfLength(8);
+            final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source);
+            retentionLeases.add(retentionLease);
+        }
+        final RetentionLeases decodedRetentionLeases =
+                RetentionLeases.decodeRetentionLeases(
+                        RetentionLeases.encodeRetentionLeases(new RetentionLeases(primaryTerm, version, retentionLeases)));
+        assertThat(decodedRetentionLeases.version(), equalTo(version));
+        if (length == 0) {
+            assertThat(decodedRetentionLeases.leases(), empty());
+        } else {
+            assertThat(decodedRetentionLeases.leases(), containsInAnyOrder(retentionLeases.toArray(new RetentionLease[0])));
+        }
+    }
+
+    public void testSupersedesByPrimaryTerm() {
+        final long lowerPrimaryTerm = randomLongBetween(1, Long.MAX_VALUE);
+        final RetentionLeases left = new RetentionLeases(lowerPrimaryTerm, randomLongBetween(1, Long.MAX_VALUE), Collections.emptyList());
+        final long higherPrimaryTerm = randomLongBetween(lowerPrimaryTerm + 1, Long.MAX_VALUE);
+        final RetentionLeases right = new RetentionLeases(higherPrimaryTerm, randomLongBetween(1, Long.MAX_VALUE), Collections.emptyList());
+        assertTrue(right.supersedes(left));
+        assertFalse(left.supersedes(right));
+    }
+
+    public void testSupersedesByVersion() {
+        final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
+        final long lowerVersion = randomLongBetween(1, Long.MAX_VALUE);
+        final long higherVersion = randomLongBetween(lowerVersion + 1, Long.MAX_VALUE);
+        final RetentionLeases left = new RetentionLeases(primaryTerm, lowerVersion, Collections.emptyList());
+        final RetentionLeases right = new RetentionLeases(primaryTerm, higherVersion, Collections.emptyList());
+        assertTrue(right.supersedes(left));
+        assertFalse(left.supersedes(right));
+    }
+
+}

+ 55 - 30
server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java

@@ -31,11 +31,11 @@ import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.InternalEngineFactory;
 import org.elasticsearch.index.seqno.RetentionLease;
 import org.elasticsearch.index.seqno.RetentionLeaseStats;
+import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -76,21 +76,22 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
 
     public void testAddOrRenewRetentionLease() throws IOException {
         final IndexShard indexShard = newStartedShard(true);
+        final long primaryTerm = indexShard.getOperationPrimaryTerm();
         try {
             final int length = randomIntBetween(0, 8);
             final long[] minimumRetainingSequenceNumbers = new long[length];
             for (int i = 0; i < length; i++) {
                 minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
                 indexShard.addRetentionLease(
-                        Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {
-                        }));
-                assertRetentionLeases(indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L, true);
+                        Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
+                assertRetentionLeases(
+                        indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + i, true);
             }
 
             for (int i = 0; i < length; i++) {
                 minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE);
                 indexShard.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
-                assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers, () -> 0L, true);
+                assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + length + i, true);
             }
         } finally {
             closeShards(indexShard);
@@ -113,6 +114,7 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
                 .build();
         // current time is mocked through the thread pool
         final IndexShard indexShard = newStartedShard(primary, settings, new InternalEngineFactory());
+        final long primaryTerm = indexShard.getOperationPrimaryTerm();
         try {
             final long[] retainingSequenceNumbers = new long[1];
             retainingSequenceNumbers[0] = randomLongBetween(0, Long.MAX_VALUE);
@@ -120,16 +122,20 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
                 indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {
                 }));
             } else {
-                indexShard.updateRetentionLeasesOnReplica(
+                final RetentionLeases retentionLeases = new RetentionLeases(
+                        primaryTerm,
+                        1,
                         Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
+                indexShard.updateRetentionLeasesOnReplica(retentionLeases);
             }
 
             {
-                final Collection<RetentionLease> retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
-                assertThat(retentionLeases, hasSize(1));
-                final RetentionLease retentionLease = retentionLeases.iterator().next();
+                final RetentionLeases retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
+                assertThat(retentionLeases.version(), equalTo(1L));
+                assertThat(retentionLeases.leases(), hasSize(1));
+                final RetentionLease retentionLease = retentionLeases.leases().iterator().next();
                 assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
-                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primary);
+                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 1, primary);
             }
 
             // renew the lease
@@ -138,25 +144,29 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
             if (primary) {
                 indexShard.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0");
             } else {
-                indexShard.updateRetentionLeasesOnReplica(
+                final RetentionLeases retentionLeases = new RetentionLeases(
+                        primaryTerm,
+                        2,
                         Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
+                indexShard.updateRetentionLeasesOnReplica(retentionLeases);
             }
 
             {
-                final Collection<RetentionLease> retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
-                assertThat(retentionLeases, hasSize(1));
-                final RetentionLease retentionLease = retentionLeases.iterator().next();
+                final RetentionLeases retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
+                assertThat(retentionLeases.version(), equalTo(2L));
+                assertThat(retentionLeases.leases(), hasSize(1));
+                final RetentionLease retentionLease = retentionLeases.leases().iterator().next();
                 assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
-                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primary);
+                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, primary);
             }
 
             // now force the lease to expire
             currentTimeMillis.set(
                     currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get()));
             if (primary) {
-                assertRetentionLeases(indexShard, 0, retainingSequenceNumbers, currentTimeMillis::get, true);
+                assertRetentionLeases(indexShard, 0, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 3, true);
             } else {
-                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, false);
+                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, false);
             }
         } finally {
             closeShards(indexShard);
@@ -191,11 +201,14 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
             // the committed retention leases should equal our current retention leases
             final SegmentInfos segmentCommitInfos = indexShard.store().readLastCommittedSegmentsInfo();
             assertTrue(segmentCommitInfos.getUserData().containsKey(Engine.RETENTION_LEASES));
-            final Collection<RetentionLease> retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
-            if (retentionLeases.isEmpty()) {
-                assertThat(IndexShard.getRetentionLeases(segmentCommitInfos), empty());
+            final RetentionLeases retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
+            final RetentionLeases committedRetentionLeases = IndexShard.getRetentionLeases(segmentCommitInfos);
+            if (retentionLeases.leases().isEmpty()) {
+                assertThat(committedRetentionLeases.version(), equalTo(0L));
+                assertThat(committedRetentionLeases.leases(), empty());
             } else {
-                assertThat(IndexShard.getRetentionLeases(segmentCommitInfos), contains(retentionLeases.toArray(new RetentionLease[0])));
+                assertThat(committedRetentionLeases.version(), equalTo((long) length));
+                assertThat(retentionLeases.leases(), contains(retentionLeases.leases().toArray(new RetentionLease[0])));
             }
 
             // when we recover, we should recover the retention leases
@@ -204,12 +217,15 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
                     ShardRoutingHelper.initWithSameId(indexShard.routingEntry(), RecoverySource.ExistingStoreRecoverySource.INSTANCE));
             try {
                 recoverShardFromStore(recoveredShard);
-                if (retentionLeases.isEmpty()) {
-                    assertThat(recoveredShard.getEngine().config().retentionLeasesSupplier().get(), empty());
+                final RetentionLeases recoveredRetentionLeases = recoveredShard.getEngine().config().retentionLeasesSupplier().get();
+                if (retentionLeases.leases().isEmpty()) {
+                    assertThat(recoveredRetentionLeases.version(), equalTo(0L));
+                    assertThat(recoveredRetentionLeases.leases(), empty());
                 } else {
+                    assertThat(recoveredRetentionLeases.version(), equalTo((long) length));
                     assertThat(
-                            recoveredShard.getEngine().config().retentionLeasesSupplier().get(),
-                            contains(retentionLeases.toArray(new RetentionLease[0])));
+                            recoveredRetentionLeases.leases(),
+                            contains(retentionLeases.leases().toArray(new RetentionLease[0])));
                 }
             } finally {
                 closeShards(recoveredShard);
@@ -227,16 +243,17 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
             for (int i = 0; i < length; i++) {
                 minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
                 indexShard.addRetentionLease(
-                        Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {
-                        }));
+                        Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
             }
             final RetentionLeaseStats stats = indexShard.getRetentionLeaseStats();
             assertRetentionLeases(
-                    stats.leases(),
+                    stats.retentionLeases(),
                     indexShard.indexSettings().getRetentionLeaseMillis(),
                     length,
                     minimumRetainingSequenceNumbers,
                     () -> 0L,
+                    length == 0 ? RetentionLeases.EMPTY.primaryTerm() : indexShard.getOperationPrimaryTerm(),
+                    length,
                     true);
         } finally {
             closeShards(indexShard);
@@ -248,6 +265,8 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
             final int size,
             final long[] minimumRetainingSequenceNumbers,
             final LongSupplier currentTimeMillisSupplier,
+            final long primaryTerm,
+            final long version,
             final boolean primary) {
         assertRetentionLeases(
                 indexShard.getEngine().config().retentionLeasesSupplier().get(),
@@ -255,18 +274,24 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
                 size,
                 minimumRetainingSequenceNumbers,
                 currentTimeMillisSupplier,
+                primaryTerm,
+                version,
                 primary);
     }
 
     private void assertRetentionLeases(
-            final Collection<RetentionLease> retentionLeases,
+            final RetentionLeases retentionLeases,
             final long retentionLeaseMillis,
             final int size,
             final long[] minimumRetainingSequenceNumbers,
             final LongSupplier currentTimeMillisSupplier,
+            final long primaryTerm,
+            final long version,
             final boolean primary) {
+        assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm));
+        assertThat(retentionLeases.version(), equalTo(version));
         final Map<String, RetentionLease> idToRetentionLease = new HashMap<>();
-        for (final RetentionLease retentionLease : retentionLeases) {
+        for (final RetentionLease retentionLease : retentionLeases.leases()) {
             idToRetentionLease.put(retentionLease.id(), retentionLease);
         }
 

+ 25 - 6
server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

@@ -50,6 +50,7 @@ import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.ParseContext.Document;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
+import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.Translog;
@@ -122,12 +123,30 @@ public class RefreshListenersTests extends ESTestCase {
         final String translogUUID =
             Translog.createEmptyTranslog(translogConfig.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm);
         store.associateIndexWithNewTranslog(translogUUID);
-        EngineConfig config = new EngineConfig(shardId, allocationId, threadPool,
-            indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger),
-            eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
-            TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null,
-            new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, Collections::emptyList,
-                () -> primaryTerm, EngineTestCase.tombstoneDocSupplier());
+        EngineConfig config = new EngineConfig(
+                shardId,
+                allocationId,
+                threadPool,
+                indexSettings,
+                null,
+                store,
+                newMergePolicy(),
+                iwc.getAnalyzer(),
+                iwc.getSimilarity(),
+                new CodecService(null, logger),
+                eventListener,
+                IndexSearcher.getDefaultQueryCache(),
+                IndexSearcher.getDefaultQueryCachingPolicy(),
+                translogConfig,
+                TimeValue.timeValueMinutes(5),
+                Collections.singletonList(listeners),
+                Collections.emptyList(),
+                null,
+                new NoneCircuitBreakerService(),
+                () -> SequenceNumbers.NO_OPS_PERFORMED,
+                () -> RetentionLeases.EMPTY,
+                () -> primaryTerm,
+                EngineTestCase.tombstoneDocSupplier());
         engine = new InternalEngine(config);
         engine.initializeMaxSeqNoOfUpdatesOrDeletes();
         engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE);

+ 6 - 7
test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -84,7 +84,7 @@ import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.mapper.VersionFieldMapper;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.seqno.ReplicationTracker;
-import org.elasticsearch.index.seqno.RetentionLease;
+import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.Store;
@@ -105,7 +105,6 @@ import java.nio.charset.Charset;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -586,7 +585,7 @@ public abstract class EngineTestCase extends ESTestCase {
                 refreshListener,
                 indexSort,
                 globalCheckpointSupplier,
-                globalCheckpointSupplier == null ? null : Collections::emptyList);
+                globalCheckpointSupplier == null ? null : () -> RetentionLeases.EMPTY);
     }
 
     public EngineConfig config(
@@ -597,7 +596,7 @@ public abstract class EngineTestCase extends ESTestCase {
             final ReferenceManager.RefreshListener refreshListener,
             final Sort indexSort,
             final LongSupplier globalCheckpointSupplier,
-            final Supplier<Collection<RetentionLease>> retentionLeasesSupplier) {
+            final Supplier<RetentionLeases> retentionLeasesSupplier) {
         return config(
                 indexSettings,
                 store,
@@ -625,7 +624,7 @@ public abstract class EngineTestCase extends ESTestCase {
                 internalRefreshListener,
                 indexSort,
                 maybeGlobalCheckpointSupplier,
-                maybeGlobalCheckpointSupplier == null ? null : Collections::emptyList,
+                maybeGlobalCheckpointSupplier == null ? null : () -> RetentionLeases.EMPTY,
                 breakerService);
     }
 
@@ -638,7 +637,7 @@ public abstract class EngineTestCase extends ESTestCase {
             final ReferenceManager.RefreshListener internalRefreshListener,
             final Sort indexSort,
             final @Nullable LongSupplier maybeGlobalCheckpointSupplier,
-            final @Nullable Supplier<Collection<RetentionLease>> maybeRetentionLeasesSupplier,
+            final @Nullable Supplier<RetentionLeases> maybeRetentionLeasesSupplier,
             final CircuitBreakerService breakerService) {
         final IndexWriterConfig iwc = newIndexWriterConfig();
         final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
@@ -648,7 +647,7 @@ public abstract class EngineTestCase extends ESTestCase {
         final List<ReferenceManager.RefreshListener> intRefreshListenerList =
                 internalRefreshListener == null ? emptyList() : Collections.singletonList(internalRefreshListener);
         final LongSupplier globalCheckpointSupplier;
-        final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;
+        final Supplier<RetentionLeases> retentionLeasesSupplier;
         if (maybeGlobalCheckpointSupplier == null) {
             assert maybeRetentionLeasesSupplier == null;
             final ReplicationTracker replicationTracker = new ReplicationTracker(

+ 2 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java

@@ -32,6 +32,7 @@ import org.elasticsearch.index.engine.InternalEngine;
 import org.elasticsearch.index.engine.TranslogHandler;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.Store;
@@ -270,7 +271,7 @@ public class FollowingEngineTests extends ESTestCase {
                 null,
                 new NoneCircuitBreakerService(),
                 globalCheckpoint::longValue,
-                Collections::emptyList,
+                () -> RetentionLeases.EMPTY,
                 () -> primaryTerm.get(),
                 EngineTestCase.tombstoneDocSupplier());
     }