Renew retention leases while following (#39335)

This commit is the final piece of the integration of CCR with retention
leases. Namely, we periodically renew retention leases and advance the
retaining sequence number while following.
Jason Tedor · 6 years ago · commit 176c157615
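
In outline, the change schedules a background task on the follower that periodically renews the leader-side retention lease, advancing its retaining sequence number to the follower global checkpoint. A minimal sketch of the idea, assuming a thread pool, a renew interval, and a renewRemotely callback standing in for the remote renew request (all hypothetical names; the real wiring is in the diffs below):

    // hedged sketch of the renewal loop this commit introduces
    final Scheduler.Cancellable renewable = threadPool.scheduleWithFixedDelay(
            () -> {
                // the lease retains operations from the follower global checkpoint onwards
                final long retainingSequenceNumber = followerGlobalCheckpoint.getAsLong();
                renewRemotely(retainingSequenceNumber); // stand-in for the remote renew request
            },
            renewInterval,
            Ccr.CCR_THREAD_POOL_NAME);
    // cancelled when the shard follow node task is cancelled (see ShardFollowNodeTask#onCancelled below)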

+ 6 - 2
test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

@@ -540,12 +540,16 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
             new SyncRetentionLeases(request, ReplicationGroup.this, wrappedListener).execute();
         }
 
-        public RetentionLease addRetentionLease(String id, long retainingSequenceNumber, String source,
+        public synchronized RetentionLease addRetentionLease(String id, long retainingSequenceNumber, String source,
                                                 ActionListener<ReplicationResponse> listener) {
             return getPrimary().addRetentionLease(id, retainingSequenceNumber, source, listener);
         }
 
-        public void removeRetentionLease(String id, ActionListener<ReplicationResponse> listener) {
+        public synchronized RetentionLease renewRetentionLease(String id, long retainingSequenceNumber, String source) {
+            return getPrimary().renewRetentionLease(id, retainingSequenceNumber, source);
+        }
+
+        public synchronized void removeRetentionLease(String id, ActionListener<ReplicationResponse> listener) {
             getPrimary().removeRetentionLease(id, listener);
         }
 
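A test built on this harness might drive the new helpers roughly as follows (a hedged sketch; the lease id, sequence numbers, and source are illustrative):

    // hypothetical usage of the synchronized lease helpers on ReplicationGroup
    try (ReplicationGroup shards = createGroup(1)) {
        shards.startAll();
        final PlainActionFuture<ReplicationResponse> added = new PlainActionFuture<>();
        shards.addRetentionLease("test-lease", 0L, "test-source", added);
        added.actionGet();
        // renewal is synchronous on the primary and returns the updated lease
        final RetentionLease renewed = shards.renewRetentionLease("test-lease", 8L, "test-source");
    }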

+ 38 - 22
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.ccr;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.seqno.RetentionLeaseActions;
@@ -18,11 +19,18 @@ import org.elasticsearch.index.shard.ShardId;
 
 import java.util.Locale;
 import java.util.Optional;
-
-import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
+import java.util.concurrent.TimeUnit;
 
 public class CcrRetentionLeases {
 
+    // this setting is intentionally not registered, it is only used in tests
+    public static final Setting<TimeValue> RETENTION_LEASE_RENEW_INTERVAL_SETTING =
+            Setting.timeSetting(
+                    "index.ccr.retention_lease.renew_interval",
+                    new TimeValue(5, TimeUnit.MINUTES),
+                    new TimeValue(0, TimeUnit.MILLISECONDS),
+                    Setting.Property.NodeScope);
+
     /**
      * The retention lease ID used by followers.
      *
@@ -52,20 +60,22 @@ public class CcrRetentionLeases {
      * Synchronously requests to add a retention lease with the specified retention lease ID on the specified leader shard using the given
      * remote client. Note that this method will block up to the specified timeout.
      *
-     * @param leaderShardId    the leader shard ID
-     * @param retentionLeaseId the retention lease ID
-     * @param remoteClient     the remote client on which to execute this request
-     * @param timeout          the timeout
+     * @param leaderShardId           the leader shard ID
+     * @param retentionLeaseId        the retention lease ID
+     * @param retainingSequenceNumber the retaining sequence number
+     * @param remoteClient            the remote client on which to execute this request
+     * @param timeout                 the timeout
      * @return an optional exception indicating whether or not the retention lease already exists
      */
     public static Optional<RetentionLeaseAlreadyExistsException> syncAddRetentionLease(
             final ShardId leaderShardId,
             final String retentionLeaseId,
+            final long retainingSequenceNumber,
             final Client remoteClient,
             final TimeValue timeout) {
         try {
             final PlainActionFuture<RetentionLeaseActions.Response> response = new PlainActionFuture<>();
-            asyncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response);
+            asyncAddRetentionLease(leaderShardId, retentionLeaseId, retainingSequenceNumber, remoteClient, response);
             response.actionGet(timeout);
             return Optional.empty();
         } catch (final RetentionLeaseAlreadyExistsException e) {
@@ -78,18 +88,20 @@ public class CcrRetentionLeases {
      * remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a response
      * or failure.
      *
-     * @param leaderShardId    the leader shard ID
-     * @param retentionLeaseId the retention lease ID
-     * @param remoteClient     the remote client on which to execute this request
-     * @param listener         the listener
+     * @param leaderShardId           the leader shard ID
+     * @param retentionLeaseId        the retention lease ID
+     * @param retainingSequenceNumber the retaining sequence number
+     * @param remoteClient            the remote client on which to execute this request
+     * @param listener                the listener
      */
     public static void asyncAddRetentionLease(
             final ShardId leaderShardId,
             final String retentionLeaseId,
+            final long retainingSequenceNumber,
             final Client remoteClient,
             final ActionListener<RetentionLeaseActions.Response> listener) {
         final RetentionLeaseActions.AddRequest request =
-                new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr");
+                new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, retainingSequenceNumber, "ccr");
         remoteClient.execute(RetentionLeaseActions.Add.INSTANCE, request, listener);
     }
 
@@ -97,20 +109,22 @@ public class CcrRetentionLeases {
      * Synchronously requests to renew a retention lease with the specified retention lease ID on the specified leader shard using the given
      * remote client. Note that this method will block up to the specified timeout.
      *
-     * @param leaderShardId    the leader shard ID
-     * @param retentionLeaseId the retention lease ID
-     * @param remoteClient     the remote client on which to execute this request
-     * @param timeout          the timeout
+     * @param leaderShardId           the leader shard ID
+     * @param retentionLeaseId        the retention lease ID
+     * @param retainingSequenceNumber the retaining sequence number
+     * @param remoteClient            the remote client on which to execute this request
+     * @param timeout                 the timeout
      * @return an optional exception indicating whether or not the retention lease was not found
      */
     public static Optional<RetentionLeaseNotFoundException> syncRenewRetentionLease(
             final ShardId leaderShardId,
             final String retentionLeaseId,
+            final long retainingSequenceNumber,
             final Client remoteClient,
             final TimeValue timeout) {
         try {
             final PlainActionFuture<RetentionLeaseActions.Response> response = new PlainActionFuture<>();
-            asyncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response);
+            asyncRenewRetentionLease(leaderShardId, retentionLeaseId, retainingSequenceNumber, remoteClient, response);
             response.actionGet(timeout);
             return Optional.empty();
         } catch (final RetentionLeaseNotFoundException e) {
@@ -123,18 +137,20 @@ public class CcrRetentionLeases {
      * given remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a
      * response or failure.
      *
-     * @param leaderShardId    the leader shard ID
-     * @param retentionLeaseId the retention lease ID
-     * @param remoteClient     the remote client on which to execute this request
-     * @param listener         the listener
+     * @param leaderShardId           the leader shard ID
+     * @param retentionLeaseId        the retention lease ID
+     * @param retainingSequenceNumber the retaining sequence number
+     * @param remoteClient            the remote client on which to execute this request
+     * @param listener                the listener
      */
     public static void asyncRenewRetentionLease(
             final ShardId leaderShardId,
             final String retentionLeaseId,
+            final long retainingSequenceNumber,
             final Client remoteClient,
             final ActionListener<RetentionLeaseActions.Response> listener) {
         final RetentionLeaseActions.RenewRequest request =
-                new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr");
+                new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, retainingSequenceNumber, "ccr");
         remoteClient.execute(RetentionLeaseActions.Renew.INSTANCE, request, listener);
     }
 
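Callers of these helpers now pass an explicit retaining sequence number instead of an implicit RETAIN_ALL. A hedged sketch of the synchronous add-then-renew pattern (identifiers illustrative; CcrRepository below uses the same shape during restore):

    // if the lease already exists on the leader, fall back to renewing it
    final Optional<RetentionLeaseAlreadyExistsException> alreadyExists =
            CcrRetentionLeases.syncAddRetentionLease(
                    leaderShardId, retentionLeaseId, RetentionLeaseActions.RETAIN_ALL, remoteClient, timeout);
    alreadyExists.ifPresent(e ->
            CcrRetentionLeases.syncRenewRetentionLease(
                    leaderShardId, retentionLeaseId, RetentionLeaseActions.RETAIN_ALL, remoteClient, timeout));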

+ 23 - 2
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

@@ -30,9 +30,10 @@ import org.elasticsearch.indices.IndexClosedException;
 import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.persistent.AllocatedPersistentTask;
 import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.transport.ConnectTransportException;
-import org.elasticsearch.xpack.ccr.Ccr;
 import org.elasticsearch.transport.NoSuchRemoteClusterException;
+import org.elasticsearch.xpack.ccr.Ccr;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
 import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
 
@@ -94,6 +95,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
 
     private volatile ElasticsearchException fatalException;
 
+    private Scheduler.Cancellable renewable;
+
+    synchronized Scheduler.Cancellable getRenewable() {
+        return renewable;
+    }
+
     ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
                         ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) {
         super(id, type, action, description, parentTask, headers);
@@ -121,7 +128,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
         final long followerMaxSeqNo) {
         /*
          * While this should only ever be called once and before any other threads can touch these fields, we use synchronization here to
-         * avoid the need to declare these fields as volatile. That is, we are ensuring thesefields are always accessed under the same lock.
+         * avoid the need to declare these fields as volatile. That is, we are ensuring these fields are always accessed under the same
+         * lock.
          */
         synchronized (this) {
             this.followerHistoryUUID = followerHistoryUUID;
@@ -130,6 +138,11 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
             this.followerGlobalCheckpoint = followerGlobalCheckpoint;
             this.followerMaxSeqNo = followerMaxSeqNo;
             this.lastRequestedSeqNo = followerGlobalCheckpoint;
+            renewable = scheduleBackgroundRetentionLeaseRenewal(() -> {
+                synchronized (ShardFollowNodeTask.this) {
+                    return this.followerGlobalCheckpoint;
+                }
+            });
         }
 
         // updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical
@@ -507,8 +520,16 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
     protected abstract void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
                                                          Consumer<Exception> errorHandler);
 
+    protected abstract Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(LongSupplier followerGlobalCheckpoint);
+
     @Override
     protected void onCancelled() {
+        synchronized (this) {
+            if (renewable != null) {
+                renewable.cancel();
+                renewable = null;
+            }
+        }
         markAsCompleted();
     }
 
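Concrete subclasses must now implement the renewal hook. Outside of ShardFollowTasksExecutor (next diff), a unit test might stub it with a no-op cancellable, for example (hypothetical):

    @Override
    protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) {
        // tests that do not exercise renewal can return an already-cancelled no-op
        return new Scheduler.Cancellable() {

            @Override
            public boolean cancel() {
                return true;
            }

            @Override
            public boolean isCancelled() {
                return true;
            }

        };
    }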

+ 101 - 0
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

@@ -9,6 +9,8 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.store.AlreadyClosedException;
+import org.elasticsearch.ElasticsearchSecurityException;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@@ -32,10 +34,14 @@ import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsModule;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.seqno.RetentionLeaseActions;
+import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
+import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardNotFoundException;
@@ -45,9 +51,11 @@ import org.elasticsearch.persistent.PersistentTaskState;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.persistent.PersistentTasksExecutor;
 import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.NoSuchRemoteClusterException;
 import org.elasticsearch.xpack.ccr.Ccr;
+import org.elasticsearch.xpack.ccr.CcrRetentionLeases;
 import org.elasticsearch.xpack.ccr.CcrSettings;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
@@ -60,6 +68,7 @@ import java.util.Optional;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.LongConsumer;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
 import static org.elasticsearch.xpack.ccr.CcrLicenseChecker.wrapClient;
@@ -73,6 +82,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
     private final ThreadPool threadPool;
     private final ClusterService clusterService;
     private final IndexScopedSettings indexScopedSettings;
+    private final TimeValue retentionLeaseRenewInterval;
     private volatile TimeValue waitForMetadataTimeOut;
 
     public ShardFollowTasksExecutor(Client client,
@@ -84,6 +94,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
         this.threadPool = threadPool;
         this.clusterService = clusterService;
         this.indexScopedSettings = settingsModule.getIndexScopedSettings();
+        this.retentionLeaseRenewInterval = CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(settingsModule.getSettings());
         this.waitForMetadataTimeOut = CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.get(settingsModule.getSettings());
         clusterService.getClusterSettings().addSettingsUpdateConsumer(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT,
             newVal -> this.waitForMetadataTimeOut = newVal);
@@ -245,6 +256,96 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
                     errorHandler.accept(e);
                 }
             }
+
+            @Override
+            protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) {
+                final String retentionLeaseId = CcrRetentionLeases.retentionLeaseId(
+                        clusterService.getClusterName().value(),
+                        params.getFollowShardId().getIndex(),
+                        params.getRemoteCluster(),
+                        params.getLeaderShardId().getIndex());
+
+                /*
+                 * We are going to attempt to renew the retention lease. If this fails it is either because the retention lease does not
+                 * exist, or something else happened. If the retention lease does not exist, we will attempt to add the retention lease
+                 * again. If that fails, it had better not be because the retention lease already exists. Either way, we will attempt to
+                 * renew again on the next scheduled execution.
+                 */
+                final ActionListener<RetentionLeaseActions.Response> listener = ActionListener.wrap(
+                        r -> {},
+                        e -> {
+                            /*
+                             * We have to guard against the possibility that the shard follow node task has been stopped and the retention
+                             * lease deliberately removed via the act of unfollowing. Note that the order of operations is important in
+                             * TransportUnfollowAction. There, we first stop the shard follow node task, and then remove the retention
+                             * leases on the leader. This means that if we end up here with the retention lease not existing because of an
+                             * unfollow action, then we know that the unfollow action has already stopped the shard follow node task and
+                             * there is no race condition with the unfollow action.
+                             */
+                            if (isCancelled() || isCompleted()) {
+                                return;
+                            }
+                            final Throwable cause = ExceptionsHelper.unwrapCause(e);
+                            logRetentionLeaseFailure(retentionLeaseId, cause);
+                            // noinspection StatementWithEmptyBody
+                            if (cause instanceof RetentionLeaseNotFoundException) {
+                                // note that we do not need to mark as system context here as that is restored from the original renew
+                                logger.trace(
+                                        "{} background adding retention lease [{}] while following",
+                                        params.getFollowShardId(),
+                                        retentionLeaseId);
+                                CcrRetentionLeases.asyncAddRetentionLease(
+                                        params.getLeaderShardId(),
+                                        retentionLeaseId,
+                                        followerGlobalCheckpoint.getAsLong(),
+                                        remoteClient(params),
+                                        ActionListener.wrap(
+                                                r -> {},
+                                                inner -> {
+                                                    /*
+                                                     * If this fails because the retention lease already exists, something highly unusual
+                                                     * is going on. Log it, and renew again after another renew interval has passed.
+                                                     */
+                                                    final Throwable innerCause = ExceptionsHelper.unwrapCause(inner);
+                                                    assert innerCause instanceof RetentionLeaseAlreadyExistsException == false;
+                                                    logRetentionLeaseFailure(retentionLeaseId, innerCause);
+                                                }));
+                            } else {
+                                 // if something else happened, we will attempt to renew again after another renew interval has passed
+                            }
+                        });
+
+                return threadPool.scheduleWithFixedDelay(
+                        () -> {
+                            final ThreadContext threadContext = threadPool.getThreadContext();
+                            try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
+                                // we have to execute under the system context so that if security is enabled the management is authorized
+                                threadContext.markAsSystemContext();
+                                logger.trace(
+                                        "{} background renewing retention lease [{}] while following",
+                                        params.getFollowShardId(),
+                                        retentionLeaseId);
+                                CcrRetentionLeases.asyncRenewRetentionLease(
+                                        params.getLeaderShardId(),
+                                        retentionLeaseId,
+                                        followerGlobalCheckpoint.getAsLong(),
+                                        remoteClient(params),
+                                        listener);
+                            }
+                        },
+                        retentionLeaseRenewInterval,
+                        Ccr.CCR_THREAD_POOL_NAME);
+            }
+
+            private void logRetentionLeaseFailure(final String retentionLeaseId, final Throwable cause) {
+                assert cause instanceof ElasticsearchSecurityException == false : cause;
+                logger.warn(new ParameterizedMessage(
+                                "{} background management of retention lease [{}] failed while following",
+                                params.getFollowShardId(),
+                                retentionLeaseId),
+                        cause);
+            }
+
         };
     }
 
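For reference, the stash-and-mark pattern used in the scheduled runnable above, in isolation (a hedged sketch; the try-with-resources restores the caller's context on exit, which is why the response listener does not need to mark the system context again):

    final ThreadContext threadContext = threadPool.getThreadContext();
    try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
        // requests sent from here carry the system context, so security authorizes them
        threadContext.markAsSystemContext();
        // ... send the remote renew request ...
    }
    // the original thread context is restored when the block exits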

+ 6 - 15
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -35,7 +35,6 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.metrics.CounterMetric;
-import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
@@ -89,11 +88,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.LongConsumer;
 import java.util.function.Supplier;
 
+import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
 import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
 import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId;
 import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncAddRetentionLease;
@@ -330,6 +329,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
                         CcrRetentionLeases.asyncRenewRetentionLease(
                                 leaderShardId,
                                 retentionLeaseId,
+                                RETAIN_ALL,
                                 remoteClient,
                                 ActionListener.wrap(
                                         r -> {},
@@ -343,7 +343,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
                                         }));
                     }
                 },
-                RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(indexShard.indexSettings().getSettings()),
+                CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(indexShard.indexSettings().getNodeSettings()),
                 Ccr.CCR_THREAD_POOL_NAME);
 
         // TODO: There should be some local timeout. And if the remote cluster returns an unknown session
@@ -380,7 +380,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
                 () -> new ParameterizedMessage("{} requesting leader to add retention lease [{}]", shardId, retentionLeaseId));
         final TimeValue timeout = ccrSettings.getRecoveryActionTimeout();
         final Optional<RetentionLeaseAlreadyExistsException> maybeAddAlready =
-                syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout);
+                syncAddRetentionLease(leaderShardId, retentionLeaseId, RETAIN_ALL, remoteClient, timeout);
         maybeAddAlready.ifPresent(addAlready -> {
             logger.trace(() -> new ParameterizedMessage(
                             "{} retention lease [{}] already exists, requesting a renewal",
@@ -388,7 +388,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
                             retentionLeaseId),
                     addAlready);
             final Optional<RetentionLeaseNotFoundException> maybeRenewNotFound =
-                    syncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout);
+                    syncRenewRetentionLease(leaderShardId, retentionLeaseId, RETAIN_ALL, remoteClient, timeout);
             maybeRenewNotFound.ifPresent(renewNotFound -> {
                 logger.trace(() -> new ParameterizedMessage(
                                 "{} retention lease [{}] not found while attempting to renew, requesting a final add",
@@ -396,7 +396,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
                                 retentionLeaseId),
                         renewNotFound);
                 final Optional<RetentionLeaseAlreadyExistsException> maybeFallbackAddAlready =
-                        syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout);
+                        syncAddRetentionLease(leaderShardId, retentionLeaseId, RETAIN_ALL, remoteClient, timeout);
                 maybeFallbackAddAlready.ifPresent(fallbackAddAlready -> {
                     /*
                      * At this point we tried to add the lease and the retention lease already existed. By the time we tried to renew the
@@ -409,15 +409,6 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
         });
     }
 
-    // this setting is intentionally not registered, it is only used in tests
-    public static final Setting<TimeValue> RETENTION_LEASE_RENEW_INTERVAL_SETTING =
-            Setting.timeSetting(
-                    "index.ccr.retention_lease.renew_interval",
-                    new TimeValue(5, TimeUnit.MINUTES),
-                    new TimeValue(0, TimeUnit.MILLISECONDS),
-                    Setting.Property.Dynamic,
-                    Setting.Property.IndexScope);
-
     @Override
     public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId leaderShardId) {
         throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
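
Because the renew interval is now node-scoped (and intentionally unregistered), tests shorten it through node settings rather than index settings; a hedged sketch of the kind of override the integration tests below apply:

    // hypothetical node-level override of the test-only renew interval
    final Settings nodeSettings = Settings.builder()
            .put(CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200))
            .build();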

+ 16 - 3
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java

@@ -131,6 +131,14 @@ public abstract class CcrIntegTestCase extends ESTestCase {
         return Collections.emptyList();
     }
 
+    protected Settings leaderClusterSettings() {
+        return Settings.EMPTY;
+    }
+
+    protected Settings followerClusterSettings() {
+        return Settings.EMPTY;
+    }
+
     @Before
     public final void startClusters() throws Exception {
         if (clusterGroup != null && reuseClusters()) {
@@ -145,7 +153,7 @@ public abstract class CcrIntegTestCase extends ESTestCase {
             MockNioTransportPlugin.class, InternalSettingsPlugin.class);
 
         InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
-            numberOfNodesPerCluster(), "leader_cluster", createNodeConfigurationSource(null), 0, "leader", mockPlugins,
+            numberOfNodesPerCluster(), "leader_cluster", createNodeConfigurationSource(null, true), 0, "leader", mockPlugins,
             Function.identity());
         leaderCluster.beforeTest(random(), 0.0D);
         leaderCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster());
@@ -156,7 +164,7 @@ public abstract class CcrIntegTestCase extends ESTestCase {
 
         String address = leaderCluster.getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
         InternalTestCluster followerCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
-            numberOfNodesPerCluster(), "follower_cluster", createNodeConfigurationSource(address), 0, "follower",
+            numberOfNodesPerCluster(), "follower_cluster", createNodeConfigurationSource(address, false), 0, "follower",
             mockPlugins, Function.identity());
         clusterGroup = new ClusterGroup(leaderCluster, followerCluster);
 
@@ -203,7 +211,7 @@ public abstract class CcrIntegTestCase extends ESTestCase {
         }
     }
 
-    private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedAddress) {
+    private NodeConfigurationSource createNodeConfigurationSource(final String leaderSeedAddress, final boolean leaderCluster) {
         Settings.Builder builder = Settings.builder();
         builder.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE);
         // Default the watermarks to absurdly low to prevent the tests
@@ -225,6 +233,11 @@ public abstract class CcrIntegTestCase extends ESTestCase {
         builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
         // Let cluster state api return quickly in order to speed up auto follow tests:
         builder.put(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
+        if (leaderCluster) {
+            builder.put(leaderClusterSettings());
+        } else {
+            builder.put(followerClusterSettings());
+        }
         if (configureRemoteClusterViaNodeSettings() && leaderSeedAddress != null) {
             builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress);
         }

+ 452 - 51
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java

@@ -21,6 +21,7 @@ import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
@@ -33,6 +34,7 @@ import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.seqno.RetentionLease;
 import org.elasticsearch.index.seqno.RetentionLeaseActions;
+import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
 import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardClosedException;
@@ -40,14 +42,19 @@ import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.snapshots.RestoreInfo;
 import org.elasticsearch.snapshots.RestoreService;
+import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.RemoteTransportException;
+import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportActionProxy;
+import org.elasticsearch.transport.TransportMessageListener;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.CcrIntegTestCase;
 import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
 import org.elasticsearch.xpack.ccr.repository.CcrRepository;
 import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
+import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
 import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
 
 import java.io.IOException;
@@ -83,7 +90,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
 
         @Override
         public List<Setting<?>> getSettings() {
-            return Collections.singletonList(CcrRepository.RETENTION_LEASE_RENEW_INTERVAL_SETTING);
+            return Collections.singletonList(CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING);
         }
 
     }
@@ -105,6 +112,13 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
                 .collect(Collectors.toList());
     }
 
+    @Override
+    protected Settings followerClusterSettings() {
+        return Settings.builder()
+                .put(CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200))
+                .build();
+    }
+
     private final IndicesOptions indicesOptions = IndicesOptions.strictSingleIndexNoExpandForbidClosed();
 
     private RestoreSnapshotRequest setUpRestoreSnapshotRequest(
@@ -140,7 +154,6 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
 
         final Settings.Builder settingsBuilder = Settings.builder()
                 .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex)
-                .put(CcrRepository.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200))
                 .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
         return new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST)
                 .indexSettings(settingsBuilder)
@@ -227,42 +240,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
         restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
 
         try {
-            // ensure that a retention lease has been put in place on each shard, and grab a copy of them
-            final List<RetentionLeases> retentionLeases = new ArrayList<>();
-            assertBusy(() -> {
-                retentionLeases.clear();
-                final IndicesStatsResponse stats =
-                        leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
-                assertNotNull(stats.getShards());
-                assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
-                final List<ShardStats> shardsStats = getShardsStats(stats);
-                for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
-                    final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
-                    assertThat(currentRetentionLeases.leases(), hasSize(1));
-                    final RetentionLease retentionLease =
-                            currentRetentionLeases.leases().iterator().next();
-                    assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex)));
-                    retentionLeases.add(currentRetentionLeases);
-                }
-            });
-
-            // now ensure that the retention leases are being renewed
-            assertBusy(() -> {
-                final IndicesStatsResponse stats =
-                        leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
-                assertNotNull(stats.getShards());
-                assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
-                final List<ShardStats> shardsStats = getShardsStats(stats);
-                for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
-                    final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
-                    assertThat(currentRetentionLeases.leases(), hasSize(1));
-                    final RetentionLease retentionLease =
-                            currentRetentionLeases.leases().iterator().next();
-                    assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex)));
-                    // we assert that retention leases are being renewed by an increase in the timestamp
-                    assertThat(retentionLease.timestamp(), greaterThan(retentionLeases.get(i).leases().iterator().next().timestamp()));
-                }
-            });
+            assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex);
             latch.countDown();
         } finally {
             for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getDataNodes().values()) {
@@ -354,15 +332,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
          * After we wake up, it should be the case that the retention leases are the same (same timestamp) as that indicates that they were
          * not renewed while we were sleeping.
          */
-        final TimeValue renewIntervalSetting = CcrRepository.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(
-                followerClient()
-                        .admin()
-                        .indices()
-                        .prepareGetSettings(followerIndex)
-                        .get()
-                        .getIndexToSettings()
-                        .get(followerIndex));
-
+        final TimeValue renewIntervalSetting = CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(followerClusterSettings());
         final long renewEnd = System.nanoTime();
         Thread.sleep(Math.max(0, randomIntBetween(2, 4) * renewIntervalSetting.millis() - TimeUnit.NANOSECONDS.toMillis(renewEnd - start)));
 
@@ -404,6 +374,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
         final String leaderIndexSettings =
                 getIndexSettings(numberOfShards, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
         assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
+        ensureLeaderYellow(leaderIndex);
         final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
         followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
 
@@ -469,11 +440,8 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
                         });
             }
 
-
-
-
             pauseFollow(followerIndex);
-            followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet();
+            assertAcked(followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet());
             assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet());
 
             final IndicesStatsResponse afterUnfollowStats =
@@ -498,6 +466,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
         final String leaderIndexSettings =
                 getIndexSettings(numberOfShards, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
         assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
+        ensureLeaderYellow(leaderIndex);
         final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
         followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
 
@@ -560,6 +529,438 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
         }
     }
 
+    public void testRetentionLeaseRenewedWhileFollowing() throws Exception {
+        final String leaderIndex = "leader";
+        final String followerIndex = "follower";
+        final int numberOfShards = randomIntBetween(1, 4);
+        final int numberOfReplicas = randomIntBetween(0, 1);
+        final Map<String, String> additionalIndexSettings = new HashMap<>();
+        additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true));
+        additionalIndexSettings.put(
+                IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(),
+                TimeValue.timeValueMillis(200).getStringRep());
+        final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings);
+        assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
+        ensureLeaderYellow(leaderIndex);
+        final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
+        followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
+
+        ensureFollowerGreen(true, followerIndex);
+        assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex);
+    }
+
+    public void testRetentionLeaseAdvancesWhileFollowing() throws Exception {
+        final String leaderIndex = "leader";
+        final String followerIndex = "follower";
+        final int numberOfShards = randomIntBetween(1, 4);
+        final int numberOfReplicas = randomIntBetween(0, 1);
+        final Map<String, String> additionalIndexSettings = new HashMap<>();
+        additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true));
+        additionalIndexSettings.put(
+                IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(),
+                TimeValue.timeValueMillis(200).getStringRep());
+        final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings);
+        assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
+        ensureLeaderYellow(leaderIndex);
+        final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
+        followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
+
+        ensureFollowerGreen(true, followerIndex);
+
+        final int numberOfDocuments = randomIntBetween(128, 2048);
+        logger.debug("indexing [{}] docs", numberOfDocuments);
+        for (int i = 0; i < numberOfDocuments; i++) {
+            final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
+            leaderClient().prepareIndex(leaderIndex, "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
+            if (rarely()) {
+                leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get();
+            }
+        }
+
+        // wait until the follower global checkpoints have caught up to the leader
+        assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex);
+
+        final List<ShardStats> leaderShardsStats = getShardsStats(leaderClient().admin().indices().prepareStats(leaderIndex).get());
+        final Map<Integer, Long> leaderGlobalCheckpoints = new HashMap<>();
+        for (final ShardStats leaderShardStats : leaderShardsStats) {
+            final ShardRouting routing = leaderShardStats.getShardRouting();
+            if (routing.primary() == false) {
+                continue;
+            }
+            leaderGlobalCheckpoints.put(routing.id(), leaderShardStats.getSeqNoStats().getGlobalCheckpoint());
+        }
+
+        // now assert that the retention leases have advanced to the global checkpoints
+        assertBusy(() -> {
+            final IndicesStatsResponse stats =
+                    leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
+            assertNotNull(stats.getShards());
+            assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
+            final List<ShardStats> shardsStats = getShardsStats(stats);
+            for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
+                final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
+                assertThat(currentRetentionLeases.leases(), hasSize(1));
+                final RetentionLease retentionLease =
+                        currentRetentionLeases.leases().iterator().next();
+                assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex)));
+                // we assert that retention leases are being advanced
+                assertThat(
+                        retentionLease.retainingSequenceNumber(),
+                        equalTo(leaderGlobalCheckpoints.get(shardsStats.get(i).getShardRouting().id())));
+            }
+        });
+    }
+
+    @TestLogging(value = "org.elasticsearch.xpack.ccr:trace")
+    public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused() throws Exception {
+        final String leaderIndex = "leader";
+        final String followerIndex = "follower";
+        final int numberOfShards = randomIntBetween(1, 4);
+        final int numberOfReplicas = randomIntBetween(0, 1);
+        final Map<String, String> additionalIndexSettings = new HashMap<>();
+        additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true));
+        additionalIndexSettings.put(
+                IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(),
+                TimeValue.timeValueMillis(200).getStringRep());
+        final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings);
+        assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
+        ensureLeaderYellow(leaderIndex);
+        final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
+        followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
+
+        ensureFollowerGreen(true, followerIndex);
+
+        final long start = System.nanoTime();
+        pauseFollow(followerIndex);
+
+        /*
+         * We want to ensure that the retention leases have been synced to all shard copies, as otherwise they might sync between the two
+         * times that we sample the retention leases, which would cause our check to fail.
+         */
+        final TimeValue syncIntervalSetting = IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.get(
+                leaderClient()
+                        .admin()
+                        .indices()
+                        .prepareGetSettings(leaderIndex)
+                        .get()
+                        .getIndexToSettings()
+                        .get(leaderIndex));
+        final long syncEnd = System.nanoTime();
+        Thread.sleep(Math.max(0, randomIntBetween(2, 4) * syncIntervalSetting.millis() - TimeUnit.NANOSECONDS.toMillis(syncEnd - start)));
+
+        final ClusterStateResponse leaderIndexClusterState =
+                leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get();
+        final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID();
+
+        // sample the leases after pausing
+        final List<RetentionLeases> retentionLeases = new ArrayList<>();
+        assertBusy(() -> {
+            retentionLeases.clear();
+            final IndicesStatsResponse stats =
+                    leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
+            assertNotNull(stats.getShards());
+            assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
+            final List<ShardStats> shardsStats = getShardsStats(stats);
+            for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
+                final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
+                assertThat(currentRetentionLeases.leases(), hasSize(1));
+                final ClusterStateResponse followerIndexClusterState =
+                        followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get();
+                final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID();
+                final RetentionLease retentionLease =
+                        currentRetentionLeases.leases().iterator().next();
+                final String expectedRetentionLeaseId = retentionLeaseId(
+                        getFollowerCluster().getClusterName(),
+                        new Index(followerIndex, followerUUID),
+                        getLeaderCluster().getClusterName(),
+                        new Index(leaderIndex, leaderUUID));
+                assertThat(retentionLease.id(), equalTo(expectedRetentionLeaseId));
+                retentionLeases.add(currentRetentionLeases);
+            }
+        });
+
+        /*
+         * We want to ensure that the background renewal is cancelled after pausing. To do this, we will sleep a small multiple of the renew
+         * interval. If the renews are not cancelled, we expect that a renewal would have been sent while we were sleeping. After we wake
+         * up, it should be the case that the retention leases are the same (same timestamp) as that indicates that they were not renewed
+         * while we were sleeping.
+         */
+        final TimeValue renewIntervalSetting = CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(followerClusterSettings());
+        final long renewEnd = System.nanoTime();
+        Thread.sleep(Math.max(0, randomIntBetween(2, 4) * renewIntervalSetting.millis() - TimeUnit.NANOSECONDS.toMillis(renewEnd - start)));
+
+        // now ensure that the retention leases are the same
+        assertBusy(() -> {
+            final IndicesStatsResponse stats =
+                    leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
+            assertNotNull(stats.getShards());
+            assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
+            final List<ShardStats> shardsStats = getShardsStats(stats);
+            for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
+                if (shardsStats.get(i).getShardRouting().primary() == false) {
+                    continue;
+                }
+                final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
+                assertThat(currentRetentionLeases.leases(), hasSize(1));
+                final ClusterStateResponse followerIndexClusterState =
+                        followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get();
+                final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID();
+                final RetentionLease retentionLease =
+                        currentRetentionLeases.leases().iterator().next();
+                assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID)));
+                // we assert that retention leases are not being renewed by an unchanged timestamp
+                assertThat(retentionLease.timestamp(), equalTo(retentionLeases.get(i).leases().iterator().next().timestamp()));
+            }
+        });
+    }
+
+    public void testRetentionLeaseRenewalIsResumedWhenFollowingIsResumed() throws Exception {
+        final String leaderIndex = "leader";
+        final String followerIndex = "follower";
+        final int numberOfShards = randomIntBetween(1, 4);
+        final int numberOfReplicas = randomIntBetween(0, 1);
+        final Map<String, String> additionalIndexSettings = new HashMap<>();
+        additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true));
+        additionalIndexSettings.put(
+                IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(),
+                TimeValue.timeValueMillis(200).getStringRep());
+        final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings);
+        assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
+        ensureLeaderYellow(leaderIndex);
+        final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
+        followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
+
+        ensureFollowerGreen(true, followerIndex);
+
+        pauseFollow(followerIndex);
+
+        followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow(followerIndex)).actionGet();
+
+        ensureFollowerGreen(true, followerIndex);
+
+        assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex);
+    }
+
+    public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Exception {
+        final String leaderIndex = "leader";
+        final String followerIndex = "follower";
+        final int numberOfShards = 1;
+        final int numberOfReplicas = 1;
+        final Map<String, String> additionalIndexSettings = new HashMap<>();
+        additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true));
+        additionalIndexSettings.put(
+                IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(),
+                TimeValue.timeValueMillis(200).getStringRep());
+        final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings);
+        assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
+        ensureLeaderYellow(leaderIndex);
+        final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
+        followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
+
+        ensureFollowerGreen(true, followerIndex);
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get();
+        for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getNodes().values()) {
+            final MockTransportService senderTransportService =
+                    (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
+            senderTransportService.addSendBehavior(
+                    (connection, requestId, action, request, options) -> {
+                        if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action)
+                                || TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) {
+                            senderTransportService.clearAllRules();
+                            final RetentionLeaseActions.RenewRequest renewRequest = (RetentionLeaseActions.RenewRequest) request;
+                            final String primaryShardNodeId =
+                                    getLeaderCluster()
+                                            .clusterService()
+                                            .state()
+                                            .routingTable()
+                                            .index(leaderIndex)
+                                            .shard(renewRequest.getShardId().id())
+                                            .primaryShard()
+                                            .currentNodeId();
+                            final String primaryShardNodeName =
+                                    getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName();
+                            final IndexShard primary =
+                                    getLeaderCluster()
+                                            .getInstance(IndicesService.class, primaryShardNodeName)
+                                            .getShardOrNull(renewRequest.getShardId());
+                            final CountDownLatch innerLatch = new CountDownLatch(1);
+                            // this forces the background renewal performed while following to face a retention lease not found exception
+                            primary.removeRetentionLease(
+                                    getRetentionLeaseId(followerIndex, leaderIndex),
+                                    ActionListener.wrap(r -> innerLatch.countDown(), e -> fail(e.toString())));
+
+                            try {
+                                innerLatch.await();
+                            } catch (final InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                fail(e.toString());
+                            }
+
+                            latch.countDown();
+                        }
+                        connection.sendRequest(requestId, action, request, options);
+                    });
+        }
+
+        latch.await();
+
+        assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex);
+    }
+
+    /**
+     * This test is fairly evil. It ensures that we are protected against a race condition between unfollowing and a background
+     * renewal firing. The act of unfollowing removes the retention leases from the leader, so a background renewal in flight at
+     * that time will be met with a retention lease not found exception. That in turn triggers behavior that attempts to re-add
+     * the retention lease, which would leave us in a situation where we have unfollowed, yet the retention lease still remains
+     * on the leader. However, we have a guard against this in the callback invoked after the retention lease not found exception
+     * is thrown, which checks whether the shard follow node task is cancelled or completed.
+     *
+     * To test that this behavior is correct, we capture the call to renew the retention lease. Then we step in between and execute
+     * an unfollow request, which removes the retention lease on the leader. At this point we can unlatch the renew call, which will
+     * now be met with a retention lease not found exception. We cheat and wait for that response to come back, then synchronously
+     * trigger the listener, which checks whether the shard follow node task is cancelled or completed and, if not, adds the
+     * retention lease back. After that listener returns, we can check whether a retention lease exists on the leader.
+     *
+     * Note, this does mean that the listener will fire twice: once in our onResponseReceived hook, and once after our
+     * onResponseReceived callback returns. 🤷‍♀️
+     *
+     * @throws Exception if an exception occurs in the main test thread
+     */
+    public void testPeriodicRenewalDoesNotAddRetentionLeaseAfterUnfollow() throws Exception {
+        final String leaderIndex = "leader";
+        final String followerIndex = "follower";
+        final int numberOfShards = 1;
+        final int numberOfReplicas = 1;
+        final Map<String, String> additionalIndexSettings = new HashMap<>();
+        additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true));
+        additionalIndexSettings.put(
+                IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(),
+                TimeValue.timeValueMillis(200).getStringRep());
+        final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings);
+        assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
+        ensureLeaderYellow(leaderIndex);
+        final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
+        followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
+
+        ensureFollowerGreen(true, followerIndex);
+
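+        // removeLeaseLatch: signals that a renewal request has been captured and the main thread may proceed to unfollow
+        // unfollowLatch: holds the captured renewal until the unfollow request has completed
+        // responseLatch: signals that the synthetic retention lease not found response has been delivered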
+        final CountDownLatch removeLeaseLatch = new CountDownLatch(1);
+        final CountDownLatch unfollowLatch = new CountDownLatch(1);
+        final CountDownLatch responseLatch = new CountDownLatch(1);
+
+        final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get();
+
+        try {
+            for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getNodes().values()) {
+                final MockTransportService senderTransportService =
+                        (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
+                senderTransportService.addSendBehavior(
+                        (connection, requestId, action, request, options) -> {
+                            if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action)
+                                    || TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) {
+                                final String retentionLeaseId = getRetentionLeaseId(followerIndex, leaderIndex);
+                                try {
+                                    removeLeaseLatch.countDown();
+                                    unfollowLatch.await();
+
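+                                    // when the real response for this renewal arrives, replace it with a retention lease
+                                    // not found exception so that the re-add guard runs after the unfollow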
+                                    senderTransportService.transport().addMessageListener(new TransportMessageListener() {
+
+                                        @SuppressWarnings("rawtypes")
+                                        @Override
+                                        public void onResponseReceived(
+                                                final long responseRequestId,
+                                                final Transport.ResponseContext context) {
+                                            if (requestId == responseRequestId) {
+                                                final RetentionLeaseNotFoundException e =
+                                                        new RetentionLeaseNotFoundException(retentionLeaseId);
+                                                context.handler().handleException(new RemoteTransportException(e.getMessage(), e));
+                                                responseLatch.countDown();
+                                                senderTransportService.transport().removeMessageListener(this);
+                                            }
+                                        }
+
+                                    });
+
+                                } catch (final InterruptedException e) {
+                                    Thread.currentThread().interrupt();
+                                    fail(e.toString());
+                                }
+                            }
+                            connection.sendRequest(requestId, action, request, options);
+                        });
+            }
+
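+            // wait until a renewal request is in flight before unfollowing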
+            removeLeaseLatch.await();
+
+            pauseFollow(followerIndex);
+            assertAcked(followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet());
+            assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet());
+
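+            // the leases are now gone from the leader; release the captured renewal so it meets the not found exception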
+            unfollowLatch.countDown();
+
+            responseLatch.await();
+
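+            // the guard should have seen the completed shard follow node task and declined to re-add the lease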
+            final IndicesStatsResponse afterUnfollowStats =
+                    leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
+            final List<ShardStats> afterUnfollowShardsStats = getShardsStats(afterUnfollowStats);
+            for (final ShardStats shardStats : afterUnfollowShardsStats) {
+                assertThat(shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty());
+            }
+        } finally {
+            for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getDataNodes().values()) {
+                final MockTransportService senderTransportService =
+                        (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
+                senderTransportService.clearAllRules();
+            }
+        }
+    }
+
+    private void assertRetentionLeaseRenewal(
+            final int numberOfShards,
+            final int numberOfReplicas,
+            final String followerIndex,
+            final String leaderIndex) throws Exception {
+        // ensure that a retention lease has been put in place on each shard, and grab a copy of them
+        final List<RetentionLeases> retentionLeases = new ArrayList<>();
+        assertBusy(() -> {
+            retentionLeases.clear();
+            final IndicesStatsResponse stats =
+                    leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
+            assertNotNull(stats.getShards());
+            assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
+            final List<ShardStats> shardsStats = getShardsStats(stats);
+            for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
+                final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
+                assertThat(currentRetentionLeases.leases(), hasSize(1));
+                final RetentionLease retentionLease =
+                        currentRetentionLeases.leases().iterator().next();
+                assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex)));
+                retentionLeases.add(currentRetentionLeases);
+            }
+        });
+
+        // now ensure that the retention leases are being renewed
+        assertBusy(() -> {
+            final IndicesStatsResponse stats =
+                    leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
+            assertNotNull(stats.getShards());
+            assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
+            final List<ShardStats> shardsStats = getShardsStats(stats);
+            for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
+                final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
+                assertThat(currentRetentionLeases.leases(), hasSize(1));
+                final RetentionLease retentionLease =
+                        currentRetentionLeases.leases().iterator().next();
+                assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex)));
+                // we assert that the retention leases are being renewed by observing an increase in their timestamps
+                assertThat(retentionLease.timestamp(), greaterThan(retentionLeases.get(i).leases().iterator().next().timestamp()));
+            }
+        });
+    }
+
     /**
      * Extract the shard stats from an indices stats response, with the stats ordered by shard ID with primaries first. This is to have a
      * consistent ordering when comparing two responses.

+ 19 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
@@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.LongConsumer;
+import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -177,6 +179,23 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
                 threadPool.generic().execute(task);
             }
 
+            @Override
+            protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) {
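+                // background renewal is not exercised by these tests; return an already-cancelled no-op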
+                return new Scheduler.Cancellable() {
+
+                    @Override
+                    public boolean cancel() {
+                        return true;
+                    }
+
+                    @Override
+                    public boolean isCancelled() {
+                        return true;
+                    }
+
+                };
+            }
+
             @Override
             protected boolean isStopped() {
                 return stopped.get();

+ 74 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java

@@ -8,13 +8,16 @@ package org.elasticsearch.xpack.ccr.action;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardNotFoundException;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
 import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
 
@@ -27,12 +30,17 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.LongConsumer;
+import java.util.function.LongSupplier;
 
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.contains;
@@ -53,6 +61,9 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
 
     private Consumer<ShardFollowNodeTaskStatus> beforeSendShardChangesRequest = status -> {};
 
+    private AtomicBoolean scheduleRetentionLeaseRenewal = new AtomicBoolean();
+    private LongConsumer retentionLeaseRenewal = followerGlobalCheckpoint -> {};
+
     private AtomicBoolean simulateResponse = new AtomicBoolean();
 
     private Queue<Exception> readFailures;
@@ -936,6 +947,28 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         assertThat(ShardFollowNodeTask.computeDelay(1024, maxDelayInMillis), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo(1000L)));
     }
 
+    public void testRetentionLeaseRenewal() throws InterruptedException {
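+        // enable the scheduler-backed renewal override below; the latch trips when a renewal reports the expected checkpoint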
+        scheduleRetentionLeaseRenewal.set(true);
+        final CountDownLatch latch = new CountDownLatch(1);
+        final long expectedFollowerGlobalCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
+        retentionLeaseRenewal = followerGlobalCheckpoint -> {
+            assertThat(followerGlobalCheckpoint, equalTo(expectedFollowerGlobalCheckpoint));
+            latch.countDown();
+        };
+
+        final ShardFollowTaskParams params = new ShardFollowTaskParams();
+        final ShardFollowNodeTask task = createShardFollowTask(params);
+
+        try {
+            startTask(task, randomLongBetween(expectedFollowerGlobalCheckpoint, Long.MAX_VALUE), expectedFollowerGlobalCheckpoint);
+            latch.await();
+        } finally {
+            task.onCancelled();
+            scheduleRetentionLeaseRenewal.set(false);
+        }
+    }
+
     static final class ShardFollowTaskParams {
         private String remoteCluster = null;
         private ShardId followShardId = new ShardId("follow_index", "", 0);
@@ -1063,6 +1096,47 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
                 }
             }
 
+            @Override
+            protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) {
+                if (scheduleRetentionLeaseRenewal.get()) {
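+                    // drive the renewal callback on a dedicated scheduler with a short fixed delay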
+                    final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
+                    final ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(
+                            () -> retentionLeaseRenewal.accept(followerGlobalCheckpoint.getAsLong()),
+                            0,
+                            TimeValue.timeValueMillis(200).millis(),
+                            TimeUnit.MILLISECONDS);
+                    return new Scheduler.Cancellable() {
+
+                        @Override
+                        public boolean cancel() {
+                            final boolean cancel = future.cancel(true);
+                            scheduler.shutdown();
+                            return cancel;
+                        }
+
+                        @Override
+                        public boolean isCancelled() {
+                            return future.isCancelled();
+                        }
+
+                    };
+                } else {
+                    return new Scheduler.Cancellable() {
+
+                        @Override
+                        public boolean cancel() {
+                            return true;
+                        }
+
+                        @Override
+                        public boolean isCancelled() {
+                            return true;
+                        }
+
+                    };
+                }
+            }
+
             @Override
             protected boolean isStopped() {
                 return super.isStopped() || stopped.get();

+ 53 - 4
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

@@ -16,6 +16,7 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.action.support.replication.TransportWriteAction;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -47,7 +48,9 @@ import org.elasticsearch.indices.recovery.RecoveryTarget;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.snapshots.Snapshot;
 import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.ccr.CcrRetentionLeases;
 import org.elasticsearch.xpack.ccr.CcrSettings;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
@@ -69,6 +72,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.LongConsumer;
+import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyMap;
@@ -390,6 +394,28 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
         }
     }
 
+    public void testRetentionLeaseManagement() throws Exception {
+        try (ReplicationGroup leader = createLeaderGroup(0)) {
+            leader.startAll();
+            try (ReplicationGroup follower = createFollowGroup(leader, 0)) {
+                follower.startAll();
+                final ShardFollowNodeTask task = createShardFollowTask(leader, follower);
+                task.start(
+                        follower.getPrimary().getHistoryUUID(),
+                        leader.getPrimary().getGlobalCheckpoint(),
+                        leader.getPrimary().seqNoStats().getMaxSeqNo(),
+                        follower.getPrimary().getGlobalCheckpoint(),
+                        follower.getPrimary().seqNoStats().getMaxSeqNo());
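+                // starting the task must schedule the background renewal, and cancelling the task must cancel it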
+                final Scheduler.Cancellable renewable = task.getRenewable();
+                assertNotNull(renewable);
+                assertFalse(renewable.isCancelled());
+                task.onCancelled();
+                assertTrue(renewable.isCancelled());
+                assertNull(task.getRenewable());
+            }
+        }
+    }
+
     private ReplicationGroup createLeaderGroup(int replicas) throws IOException {
         Settings settings = Settings.builder()
             .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
@@ -399,10 +425,12 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
     }
 
     private ReplicationGroup createFollowGroup(ReplicationGroup leaderGroup, int replicas) throws IOException {
-        Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
-            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
-            .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB))
-            .build();
+        final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
+                .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
+                .put(
+                        IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),
+                        new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB))
+                .build();
         IndexMetaData indexMetaData = buildIndexMetaData(replicas, settings, indexMapping);
         return new ReplicationGroup(indexMetaData) {
             @Override
@@ -543,6 +571,27 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
                 threadPool.executor(ThreadPool.Names.GENERIC).execute(task);
             }
 
+            @Override
+            protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) {
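+                // mirror the production flow: add the retention lease once up front, then renew it on a fixed schedule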
+                final String retentionLeaseId = CcrRetentionLeases.retentionLeaseId(
+                        "follower",
+                        followerGroup.getPrimary().routingEntry().index(),
+                        "remote",
+                        leaderGroup.getPrimary().routingEntry().index());
+                final PlainActionFuture<ReplicationResponse> response = new PlainActionFuture<>();
+                leaderGroup.addRetentionLease(
+                        retentionLeaseId,
+                        followerGlobalCheckpoint.getAsLong(),
+                        "ccr",
+                        ActionListener.wrap(response::onResponse, e -> fail(e.toString())));
+                response.actionGet();
+                return threadPool.scheduleWithFixedDelay(
+                        () -> leaderGroup.renewRetentionLease(retentionLeaseId, followerGlobalCheckpoint.getAsLong(), "ccr"),
+                        CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(
+                                followerGroup.getPrimary().indexSettings().getSettings()),
+                        ThreadPool.Names.GENERIC);
+            }
+
             @Override
             protected boolean isStopped() {
                 return super.isStopped() || stopped.get();