Browse Source

Lift retention lease expiration to index shard (#38380)

This commit lifts the control of when retention leases are expired to
index shard. In this case, we move expiration to an explicit action
rather than a side-effect of calling
ReplicationTracker#getRetentionLeases. This explicit action is invoked
on a timer. If any retention leases expire, then we hard sync the
retention leases to the replicas. Otherwise, we proceed with a
background sync.
Jason Tedor 6 years ago
parent
commit
b03d138122

+ 11 - 11
server/src/main/java/org/elasticsearch/index/IndexService.java

@@ -121,7 +121,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
     private volatile AsyncRefreshTask refreshTask;
     private volatile AsyncTranslogFSync fsyncTask;
     private volatile AsyncGlobalCheckpointTask globalCheckpointTask;
-    private volatile AsyncRetentionLeaseBackgroundSyncTask retentionLeaseBackgroundSyncTask;
+    private volatile AsyncRetentionLeaseSyncTask retentionLeaseSyncTask;
 
     // don't convert to Setting<> and register... we only set this in tests and register via a plugin
     private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
@@ -198,7 +198,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
         this.refreshTask = new AsyncRefreshTask(this);
         this.trimTranslogTask = new AsyncTrimTranslogTask(this);
         this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
-        this.retentionLeaseBackgroundSyncTask = new AsyncRetentionLeaseBackgroundSyncTask(this);
+        this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
         rescheduleFsyncTask(indexSettings.getTranslogDurability());
     }
 
@@ -289,7 +289,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
                         fsyncTask,
                         trimTranslogTask,
                         globalCheckpointTask,
-                        retentionLeaseBackgroundSyncTask);
+                        retentionLeaseSyncTask);
             }
         }
     }
@@ -788,8 +788,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
         sync(is -> is.maybeSyncGlobalCheckpoint("background"), "global checkpoint");
     }
 
-    private void backgroundSyncRetentionLeases() {
-        sync(IndexShard::backgroundSyncRetentionLeases, "retention lease");
+    private void syncRetentionLeases() {
+        sync(IndexShard::syncRetentionLeases, "retention lease");
     }
 
     private void sync(final Consumer<IndexShard> sync, final String source) {
@@ -812,11 +812,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
                                                 && e instanceof IndexShardClosedException == false) {
                                             logger.warn(
                                                     new ParameterizedMessage(
-                                                            "{} failed to execute background {} sync", shard.shardId(), source), e);
+                                                            "{} failed to execute {} sync", shard.shardId(), source), e);
                                         }
                                     },
                                     ThreadPool.Names.SAME,
-                                    "background " + source + " sync");
+                                    source + " sync");
                         } catch (final AlreadyClosedException | IndexShardClosedException e) {
                             // the shard was closed concurrently, continue
                         }
@@ -957,15 +957,15 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
         }
     }
 
-    final class AsyncRetentionLeaseBackgroundSyncTask extends BaseAsyncTask {
+    final class AsyncRetentionLeaseSyncTask extends BaseAsyncTask {
 
-        AsyncRetentionLeaseBackgroundSyncTask(final IndexService indexService) {
+        AsyncRetentionLeaseSyncTask(final IndexService indexService) {
             super(indexService, RETENTION_LEASE_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings()));
         }
 
         @Override
         protected void runInternal() {
-            indexService.backgroundSyncRetentionLeases();
+            indexService.syncRetentionLeases();
         }
 
         @Override
@@ -975,7 +975,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
 
         @Override
         public String toString() {
-            return "retention_lease_background_sync";
+            return "retention_lease_sync";
         }
 
     }

+ 5 - 0
server/src/main/java/org/elasticsearch/index/IndexSettings.java

@@ -339,6 +339,10 @@ public final class IndexSettings {
         return retentionLeaseMillis;
     }
 
+    private void setRetentionLeaseMillis(final TimeValue retentionLease) {
+        this.retentionLeaseMillis = retentionLease.millis();
+    }
+
     private volatile boolean warmerEnabled;
     private volatile int maxResultWindow;
     private volatile int maxInnerResultWindow;
@@ -523,6 +527,7 @@ public final class IndexSettings {
         scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline);
         scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations);
         scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled);
+        scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING, this::setRetentionLeaseMillis);
     }
 
     private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }

+ 38 - 38
server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

@@ -28,6 +28,7 @@ import org.elasticsearch.cluster.routing.AllocationId;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -155,10 +156,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
     private final LongSupplier currentTimeMillisSupplier;
 
     /**
-     * A callback when a new retention lease is created or an existing retention lease expires. In practice, this callback invokes the
-     * retention lease sync action, to sync retention leases to replicas.
+     * A callback when a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync
+     * retention leases to replicas.
      */
-    private final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases;
+    private final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onAddRetentionLease;
 
     /**
      * This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
@@ -177,43 +178,42 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
     private RetentionLeases retentionLeases = RetentionLeases.EMPTY;
 
     /**
-     * Get all non-expired retention leases tracked on this shard. Note that only the primary shard calculates which leases are expired,
-     * and if any have expired, syncs the retention leases to any replicas.
+     * Get all retention leases tracked on this shard.
      *
      * @return the retention leases
      */
     public RetentionLeases getRetentionLeases() {
-        final boolean wasPrimaryMode;
-        final RetentionLeases nonExpiredRetentionLeases;
-        synchronized (this) {
-            if (primaryMode) {
-                // the primary calculates the non-expired retention leases and syncs them to replicas
-                final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
-                final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
-                final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases
-                        .leases()
-                        .stream()
-                        .collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis));
-                if (partitionByExpiration.get(true) == null) {
-                    // early out as no retention leases have expired
-                    return retentionLeases;
-                }
-                final Collection<RetentionLease> nonExpiredLeases =
-                        partitionByExpiration.get(false) != null ? partitionByExpiration.get(false) : Collections.emptyList();
-                retentionLeases = new RetentionLeases(operationPrimaryTerm, retentionLeases.version() + 1, nonExpiredLeases);
-            }
-            /*
-             * At this point, we were either in primary mode and have updated the non-expired retention leases into the tracking map, or
-             * we were in replica mode and merely need to copy the existing retention leases since a replica does not calculate the
-             * non-expired retention leases, instead receiving them on syncs from the primary.
-             */
-            wasPrimaryMode = primaryMode;
-            nonExpiredRetentionLeases = retentionLeases;
+        return getRetentionLeases(false).v2();
+    }
+
+    /**
+     * If the expire leases parameter is false, gets all retention leases tracked on this shard and otherwise first calculates
+     * expiration of existing retention leases, and then gets all non-expired retention leases tracked on this shard. Note that only the
+     * primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas. If the
+     * expire leases parameter is true, this replication tracker must be in primary mode.
+     *
+     * @return a tuple indicating whether or not any retention leases were expired, and the non-expired retention leases
+     */
+    public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boolean expireLeases) {
+        if (expireLeases == false) {
+            return Tuple.tuple(false, retentionLeases);
         }
-        if (wasPrimaryMode) {
-            onSyncRetentionLeases.accept(nonExpiredRetentionLeases, ActionListener.wrap(() -> {}));
+        assert primaryMode;
+        // the primary calculates the non-expired retention leases and syncs them to replicas
+        final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
+        final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
+        final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases
+                .leases()
+                .stream()
+                .collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis));
+        if (partitionByExpiration.get(true) == null) {
+            // early out as no retention leases have expired
+            return Tuple.tuple(false, retentionLeases);
         }
-        return nonExpiredRetentionLeases;
+        final Collection<RetentionLease> nonExpiredLeases =
+                partitionByExpiration.get(false) != null ? partitionByExpiration.get(false) : Collections.emptyList();
+        retentionLeases = new RetentionLeases(operationPrimaryTerm, retentionLeases.version() + 1, nonExpiredLeases);
+        return Tuple.tuple(true, retentionLeases);
     }
 
     /**
@@ -246,7 +246,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
                     Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList()));
             currentRetentionLeases = retentionLeases;
         }
-        onSyncRetentionLeases.accept(currentRetentionLeases, listener);
+        onAddRetentionLease.accept(currentRetentionLeases, listener);
         return retentionLease;
     }
 
@@ -563,7 +563,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
      * @param indexSettings         the index settings
      * @param operationPrimaryTerm  the current primary term
      * @param globalCheckpoint      the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
-     * @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires
+     * @param onAddRetentionLease a callback when a new retention lease is created or an existing retention lease expires
      */
     public ReplicationTracker(
             final ShardId shardId,
@@ -573,7 +573,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
             final long globalCheckpoint,
             final LongConsumer onGlobalCheckpointUpdated,
             final LongSupplier currentTimeMillisSupplier,
-            final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
+            final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onAddRetentionLease) {
         super(shardId, indexSettings);
         assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
         this.shardAllocationId = allocationId;
@@ -585,7 +585,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
         checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
         this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated);
         this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier);
-        this.onSyncRetentionLeases = Objects.requireNonNull(onSyncRetentionLeases);
+        this.onAddRetentionLease = Objects.requireNonNull(onAddRetentionLease);
         this.pendingInSync = new HashSet<>();
         this.routingTable = null;
         this.replicationGroup = null;

+ 22 - 4
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -1892,13 +1892,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }
 
     /**
-     * Get all non-expired retention leases tracked on this shard.
+     * Get all retention leases tracked on this shard.
      *
      * @return the retention leases
      */
     public RetentionLeases getRetentionLeases() {
+        return getRetentionLeases(false).v2();
+    }
+
+    /**
+     * If the expire leases parameter is false, gets all retention leases tracked on this shard and otherwise first calculates
+     * expiration of existing retention leases, and then gets all non-expired retention leases tracked on this shard. Note that only the
+     * primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas. If the
+     * expire leases parameter is true, this replication tracker must be in primary mode.
+     *
+     * @return a tuple indicating whether or not any retention leases were expired, and the non-expired retention leases
+     */
+    public Tuple<Boolean, RetentionLeases> getRetentionLeases(final boolean expireLeases) {
+        assert expireLeases == false || assertPrimaryMode();
         verifyNotClosed();
-        return replicationTracker.getRetentionLeases();
+        return replicationTracker.getRetentionLeases(expireLeases);
     }
 
     public RetentionLeaseStats getRetentionLeaseStats() {
@@ -1956,10 +1969,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     /**
      * Syncs the current retention leases to all replicas.
      */
-    public void backgroundSyncRetentionLeases() {
+    public void syncRetentionLeases() {
         assert assertPrimaryMode();
         verifyNotClosed();
-        retentionLeaseSyncer.backgroundSync(shardId, getRetentionLeases());
+        final Tuple<Boolean, RetentionLeases> retentionLeases = getRetentionLeases(true);
+        if (retentionLeases.v1()) {
+            retentionLeaseSyncer.sync(shardId, retentionLeases.v2(), ActionListener.wrap(() -> {}));
+        } else {
+            retentionLeaseSyncer.backgroundSync(shardId, retentionLeases.v2());
+        }
     }
 
     /**

+ 24 - 110
server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java

@@ -37,7 +37,6 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
@@ -46,7 +45,6 @@ import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
 public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTestCase {
 
@@ -78,7 +76,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
             minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
             replicationTracker.addRetentionLease(
                     Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
-            assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + i, true);
+            assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 1 + i, true, false);
         }
 
         for (int i = 0; i < length; i++) {
@@ -88,7 +86,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
             }
             minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE);
             replicationTracker.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
-            assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + length + i, true);
+            assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, primaryTerm, 1 + length + i, true, false);
         }
     }
 
@@ -193,7 +191,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
             assertThat(retentionLeases.leases(), hasSize(1));
             final RetentionLease retentionLease = retentionLeases.leases().iterator().next();
             assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
-            assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 1, primaryMode);
+            assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 1, primaryMode, false);
         }
 
         // renew the lease
@@ -215,108 +213,20 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
             assertThat(retentionLeases.leases(), hasSize(1));
             final RetentionLease retentionLease = retentionLeases.leases().iterator().next();
             assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
-            assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, primaryMode);
+            assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 2, primaryMode, false);
         }
 
         // now force the lease to expire
         currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get()));
         if (primaryMode) {
-            assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 3, true);
+            assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 2, true, false);
+            assertRetentionLeases(replicationTracker, 0, new long[0], primaryTerm, 3, true, true);
         } else {
             // leases do not expire on replicas until synced from the primary
-            assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, false);
+            assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 2, false, false);
         }
     }
 
-    public void testRetentionLeaseExpirationCausesRetentionLeaseSync() {
-        final AllocationId allocationId = AllocationId.newInitializing();
-        final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024));
-        final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
-        final Settings settings = Settings
-                .builder()
-                .put(
-                        IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(),
-                        TimeValue.timeValueMillis(retentionLeaseMillis))
-                .build();
-        final Map<String, Tuple<Long, Long>> retentionLeases = new HashMap<>();
-        final AtomicBoolean invoked = new AtomicBoolean();
-        final AtomicReference<ReplicationTracker> reference = new AtomicReference<>();
-        final ReplicationTracker replicationTracker = new ReplicationTracker(
-                new ShardId("test", "_na", 0),
-                allocationId.getId(),
-                IndexSettingsModule.newIndexSettings("test", settings),
-                randomNonNegativeLong(),
-                UNASSIGNED_SEQ_NO,
-                value -> {},
-                currentTimeMillis::get,
-                (leases, listener) -> {
-                    // we do not want to hold a lock on the replication tracker in the callback!
-                    assertFalse(Thread.holdsLock(reference.get()));
-                    invoked.set(true);
-                    assertThat(
-                            leases.leases()
-                                    .stream()
-                                    .collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)),
-                            equalTo(retentionLeases));
-                });
-        reference.set(replicationTracker);
-        replicationTracker.updateFromMaster(
-                randomNonNegativeLong(),
-                Collections.singleton(allocationId.getId()),
-                routingTable(Collections.emptySet(), allocationId),
-                Collections.emptySet());
-        replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
-
-        final int length = randomIntBetween(0, 8);
-        long version = 0;
-        for (int i = 0; i < length; i++) {
-            final String id = randomAlphaOfLength(8);
-            final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
-            retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get()));
-            replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {}));
-            version++;
-            assertThat(replicationTracker.getRetentionLeases().version(), equalTo(version));
-            // assert that the new retention lease callback was invoked
-            assertTrue(invoked.get());
-
-            // reset the invocation marker so that we can assert the callback was not invoked when renewing the lease
-            invoked.set(false);
-            currentTimeMillis.set(1 + currentTimeMillis.get());
-            retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get()));
-            replicationTracker.renewRetentionLease(id, retainingSequenceNumber, "test");
-            version++;
-            assertThat(replicationTracker.getRetentionLeases().version(), equalTo(version));
-
-            // reset the invocation marker so that we can assert the callback was invoked if any leases are expired
-            assertFalse(invoked.get());
-            // randomly expire some leases
-            final long currentTimeMillisIncrement = randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get());
-            // calculate the expired leases and update our tracking map
-            final List<String> expiredIds = retentionLeases.entrySet()
-                    .stream()
-                    .filter(r -> currentTimeMillis.get() + currentTimeMillisIncrement > r.getValue().v2() + retentionLeaseMillis)
-                    .map(Map.Entry::getKey)
-                    .collect(Collectors.toList());
-            expiredIds.forEach(retentionLeases::remove);
-            if (expiredIds.isEmpty() == false) {
-                version++;
-            }
-            currentTimeMillis.set(currentTimeMillis.get() + currentTimeMillisIncrement);
-            // getting the leases has the side effect of calculating which leases are expired and invoking the sync callback
-            final RetentionLeases current = replicationTracker.getRetentionLeases();
-            assertThat(current.version(), equalTo(version));
-            // the current leases should equal our tracking map
-            assertThat(
-                    current.leases()
-                            .stream()
-                            .collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)),
-                    equalTo(retentionLeases));
-            // the callback should only be invoked if there were expired leases
-            assertThat(invoked.get(), equalTo(expiredIds.isEmpty() == false));
-        }
-        assertThat(replicationTracker.getRetentionLeases().version(), equalTo(version));
-    }
-
     public void testReplicaIgnoresOlderRetentionLeasesVersion() {
         final AllocationId allocationId = AllocationId.newInitializing();
         final ReplicationTracker replicationTracker = new ReplicationTracker(
@@ -370,19 +280,29 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
         }
     }
 
-    private static Tuple<Long, Long> toTuple(final RetentionLease retentionLease) {
-        return Tuple.tuple(retentionLease.retainingSequenceNumber(), retentionLease.timestamp());
-    }
-
     private void assertRetentionLeases(
             final ReplicationTracker replicationTracker,
             final int size,
             final long[] minimumRetainingSequenceNumbers,
-            final LongSupplier currentTimeMillisSupplier,
             final long primaryTerm,
             final long version,
-            final boolean primaryMode) {
-        final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases();
+            final boolean primaryMode,
+            final boolean expireLeases) {
+        assertTrue(expireLeases == false || primaryMode);
+        final RetentionLeases retentionLeases;
+        if (expireLeases == false) {
+            if (randomBoolean()) {
+                retentionLeases = replicationTracker.getRetentionLeases();
+            } else {
+                final Tuple<Boolean, RetentionLeases> tuple = replicationTracker.getRetentionLeases(false);
+                assertFalse(tuple.v1());
+                retentionLeases = tuple.v2();
+            }
+        } else {
+            final Tuple<Boolean, RetentionLeases> tuple = replicationTracker.getRetentionLeases(true);
+            assertTrue(tuple.v1());
+            retentionLeases = tuple.v2();
+        }
         assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm));
         assertThat(retentionLeases.version(), equalTo(version));
         final Map<String, RetentionLease> idToRetentionLease = new HashMap<>();
@@ -395,12 +315,6 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
             assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i)));
             final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i));
             assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i]));
-            if (primaryMode) {
-                // retention leases can be expired on replicas, so we can only assert on primaries here
-                assertThat(
-                        currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(),
-                        lessThanOrEqualTo(replicationTracker.indexSettings().getRetentionLeaseMillis()));
-            }
             assertThat(retentionLease.source(), equalTo("test-" + i));
         }
     }

+ 54 - 18
server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java

@@ -20,33 +20,58 @@
 package org.elasticsearch.index.seqno;
 
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.Closeable;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasItem;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
 public class RetentionLeaseSyncIT extends ESIntegTestCase  {
 
+    public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin {
+
+        @Override
+        public List<Setting<?>> getSettings() {
+            return Collections.singletonList(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING);
+        }
+
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Stream.concat(
+                super.nodePlugins().stream(),
+                Stream.of(RetentionLeaseBackgroundSyncIT.RetentionLeaseSyncIntervalSettingPlugin.class))
+                .collect(Collectors.toList());
+    }
+
     public void testRetentionLeasesSyncedOnAdd() throws Exception {
         final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
         internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
@@ -99,7 +124,6 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase  {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37963")
     public void testRetentionLeasesSyncOnExpiration() throws Exception {
         final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
         internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
@@ -109,7 +133,7 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase  {
         final Settings settings = Settings.builder()
                 .put("index.number_of_shards", 1)
                 .put("index.number_of_replicas", numberOfReplicas)
-                .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), retentionLeaseTimeToLive)
+                .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1))
                 .build();
         createIndex("index", settings);
         ensureGreen("index");
@@ -121,6 +145,17 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase  {
         // we will add multiple retention leases, wait for some to expire, and assert a consistent view between the primary and the replicas
         final int length = randomIntBetween(1, 8);
         for (int i = 0; i < length; i++) {
+            // update the index for retention leases to live a long time
+            final AcknowledgedResponse longTtlResponse = client().admin()
+                    .indices()
+                    .prepareUpdateSettings("index")
+                    .setSettings(
+                            Settings.builder()
+                                    .putNull(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey())
+                                    .build())
+                    .get();
+            assertTrue(longTtlResponse.isAcknowledged());
+
             final String id = randomAlphaOfLength(8);
             final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
             final String source = randomAlphaOfLength(8);
@@ -137,19 +172,26 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase  {
                 final IndexShard replica = internalCluster()
                         .getInstance(IndicesService.class, replicaShardNodeName)
                         .getShardOrNull(new ShardId(resolveIndex("index"), 0));
-                assertThat(replica.getRetentionLeases().leases(), hasItem(currentRetentionLease));
+                assertThat(replica.getRetentionLeases().leases(), anyOf(empty(), contains(currentRetentionLease)));
             }
 
-            // sleep long enough that *possibly* the current retention lease has expired, and certainly that any previous have
+            // update the index for retention leases to short a long time, to force expiration
+            final AcknowledgedResponse shortTtlResponse = client().admin()
+                    .indices()
+                    .prepareUpdateSettings("index")
+                    .setSettings(
+                            Settings.builder()
+                                    .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), retentionLeaseTimeToLive)
+                                    .build())
+                    .get();
+            assertTrue(shortTtlResponse.isAcknowledged());
+
+            // sleep long enough that the current retention lease has expired
             final long later = System.nanoTime();
             Thread.sleep(Math.max(0, retentionLeaseTimeToLive.millis() - TimeUnit.NANOSECONDS.toMillis(later - now)));
-            final RetentionLeases currentRetentionLeases = primary.getRetentionLeases();
-            assertThat(currentRetentionLeases.leases(), anyOf(empty(), contains(currentRetentionLease)));
+            assertBusy(() -> assertThat(primary.getRetentionLeases().leases(), empty()));
 
-            /*
-             * Check that expiration of retention leases has been synced to all replicas. We have to assert busy since syncing happens in
-             * the background.
-             */
+            // now that all retention leases are expired should have been synced to all replicas
             assertBusy(() -> {
                 for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) {
                     final String replicaShardNodeId = replicaShard.currentNodeId();
@@ -157,13 +199,7 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase  {
                     final IndexShard replica = internalCluster()
                             .getInstance(IndicesService.class, replicaShardNodeName)
                             .getShardOrNull(new ShardId(resolveIndex("index"), 0));
-                    if (currentRetentionLeases.leases().isEmpty()) {
-                        assertThat(replica.getRetentionLeases().leases(), empty());
-                    } else {
-                        assertThat(
-                                replica.getRetentionLeases().leases(),
-                                contains(currentRetentionLeases.leases().toArray(new RetentionLease[0])));
-                    }
+                    assertThat(replica.getRetentionLeases().leases(), empty());
                 }
             });
         }

+ 38 - 31
server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java

@@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexSettings;
@@ -43,14 +44,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.LongSupplier;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -85,13 +84,20 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
                 indexShard.addRetentionLease(
                         Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
                 assertRetentionLeases(
-                        indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + i, true);
+                        indexShard, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 1 + i, true, false);
             }
 
             for (int i = 0; i < length; i++) {
                 minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE);
                 indexShard.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
-                assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + length + i, true);
+                assertRetentionLeases(
+                        indexShard,
+                        length,
+                        minimumRetainingSequenceNumbers,
+                        primaryTerm,
+                        1 + length + i,
+                        true,
+                        false);
             }
         } finally {
             closeShards(indexShard);
@@ -121,8 +127,7 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
             final long[] retainingSequenceNumbers = new long[1];
             retainingSequenceNumbers[0] = randomLongBetween(0, Long.MAX_VALUE);
             if (primary) {
-                indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {
-                }));
+                indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {}));
             } else {
                 final RetentionLeases retentionLeases = new RetentionLeases(
                         primaryTerm,
@@ -137,7 +142,7 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
                 assertThat(retentionLeases.leases(), hasSize(1));
                 final RetentionLease retentionLease = retentionLeases.leases().iterator().next();
                 assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
-                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 1, primary);
+                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 1, primary, false);
             }
 
             // renew the lease
@@ -159,16 +164,17 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
                 assertThat(retentionLeases.leases(), hasSize(1));
                 final RetentionLease retentionLease = retentionLeases.leases().iterator().next();
                 assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
-                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, primary);
+                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 2, primary, false);
             }
 
             // now force the lease to expire
             currentTimeMillis.set(
                     currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get()));
             if (primary) {
-                assertRetentionLeases(indexShard, 0, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 3, true);
+                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 2, true, false);
+                assertRetentionLeases(indexShard, 0, new long[0], primaryTerm, 3, true, true);
             } else {
-                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, false);
+                assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 2, false, false);
             }
         } finally {
             closeShards(indexShard);
@@ -191,8 +197,7 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
                 minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
                 currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(randomNonNegativeLong()));
                 indexShard.addRetentionLease(
-                        Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {
-                        }));
+                        Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
             }
 
             currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(Long.MAX_VALUE));
@@ -250,13 +255,10 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
             final RetentionLeaseStats stats = indexShard.getRetentionLeaseStats();
             assertRetentionLeases(
                     stats.retentionLeases(),
-                    indexShard.indexSettings().getRetentionLeaseMillis(),
                     length,
                     minimumRetainingSequenceNumbers,
-                    () -> 0L,
                     length == 0 ? RetentionLeases.EMPTY.primaryTerm() : indexShard.getOperationPrimaryTerm(),
-                    length,
-                    true);
+                    length);
         } finally {
             closeShards(indexShard);
         }
@@ -266,30 +268,39 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
             final IndexShard indexShard,
             final int size,
             final long[] minimumRetainingSequenceNumbers,
-            final LongSupplier currentTimeMillisSupplier,
             final long primaryTerm,
             final long version,
-            final boolean primary) {
+            final boolean primary,
+            final boolean expireLeases) {
+        assertTrue(expireLeases == false || primary);
+        final RetentionLeases retentionLeases;
+        if (expireLeases == false) {
+            if (randomBoolean()) {
+                retentionLeases = indexShard.getRetentionLeases();
+            } else {
+                final Tuple<Boolean, RetentionLeases> tuple = indexShard.getRetentionLeases(false);
+                assertFalse(tuple.v1());
+                retentionLeases = tuple.v2();
+            }
+        } else {
+            final Tuple<Boolean, RetentionLeases> tuple = indexShard.getRetentionLeases(true);
+            assertTrue(tuple.v1());
+            retentionLeases = tuple.v2();
+        }
         assertRetentionLeases(
-                indexShard.getEngine().config().retentionLeasesSupplier().get(),
-                indexShard.indexSettings().getRetentionLeaseMillis(),
+                retentionLeases,
                 size,
                 minimumRetainingSequenceNumbers,
-                currentTimeMillisSupplier,
                 primaryTerm,
-                version,
-                primary);
+                version);
     }
 
     private void assertRetentionLeases(
             final RetentionLeases retentionLeases,
-            final long retentionLeaseMillis,
             final int size,
             final long[] minimumRetainingSequenceNumbers,
-            final LongSupplier currentTimeMillisSupplier,
             final long primaryTerm,
-            final long version,
-            final boolean primary) {
+            final long version) {
         assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm));
         assertThat(retentionLeases.version(), equalTo(version));
         final Map<String, RetentionLease> idToRetentionLease = new HashMap<>();
@@ -302,10 +313,6 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
             assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i)));
             final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i));
             assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i]));
-            if (primary) {
-                // retention leases can be expired on replicas, so we can only assert on primaries here
-                assertThat(currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(), lessThanOrEqualTo(retentionLeaseMillis));
-            }
             assertThat(retentionLease.source(), equalTo("test-" + i));
         }
     }