Explorar o código

Wait for cluster state in recovery (#99193)

A peer recovery is triggered by the application of a cluster state on
the target node, which creates a local `IndexShard` instance and then
reaches out to the source node to coordinate the recovery. If the source
node has not applied the same cluster state yet then it will respond
with a `DelayRecoveryException` which tells the target node to wait a
little and then retry.

With this commit we now wait on the source node until a fresh enough
cluster state is applied, avoiding the need for the timed wait on the
target node (and another network round trip) before the recovery can
begin.
David Turner %!s(int64=2) %!d(string=hai) anos
pai
achega
2b4ce4d768
Modificáronse 30 ficheiros con 505 adicións e 165 borrados
  1. 5 0
      docs/changelog/99193.yaml
  2. 133 0
      server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
  3. 1 0
      server/src/main/java/org/elasticsearch/TransportVersions.java
  4. 3 2
      server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
  5. 4 4
      server/src/main/java/org/elasticsearch/indices/IndicesService.java
  6. 6 4
      server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
  7. 72 0
      server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceClusterStateDelay.java
  8. 25 2
      server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
  9. 10 2
      server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
  10. 2 0
      server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java
  11. 22 7
      server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
  12. 20 1
      server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java
  13. 19 3
      server/src/main/java/org/elasticsearch/indices/recovery/StatelessPrimaryRelocationAction.java
  14. 7 1
      server/src/main/java/org/elasticsearch/node/Node.java
  15. 2 2
      server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
  16. 3 3
      server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
  17. 109 91
      server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
  18. 4 7
      server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
  19. 2 0
      server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
  20. 9 9
      server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
  21. 2 1
      server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
  22. 2 2
      server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
  23. 16 14
      server/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java
  24. 18 6
      server/src/test/java/org/elasticsearch/indices/recovery/StatelessPrimaryRelocationActionTests.java
  25. 1 1
      server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java
  26. 1 0
      server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
  27. 1 1
      test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
  28. 1 1
      test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
  29. 4 0
      test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java
  30. 1 1
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

+ 5 - 0
docs/changelog/99193.yaml

@@ -0,0 +1,5 @@
+pr: 99193
+summary: Wait for cluster state in recovery
+area: Recovery
+type: enhancement
+issues: []

+ 133 - 0
server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java

@@ -24,6 +24,7 @@ import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
@@ -36,10 +37,15 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.ActiveShardCount;
+import org.elasticsearch.action.support.ChannelActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.coordination.ApplyCommitRequest;
+import org.elasticsearch.cluster.coordination.Coordinator;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -57,6 +63,7 @@ import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
 import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.lucene.search.Queries;
@@ -64,6 +71,10 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.CollectionUtils;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.core.AbstractRefCounted;
+import org.elasticsearch.core.RefCounted;
+import org.elasticsearch.core.Releasable;
 import org.elasticsearch.gateway.ReplicaShardAllocatorIT;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
@@ -104,6 +115,7 @@ import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.engine.MockEngineSupport;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.TestTransportChannel;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xcontent.XContentType;
 
@@ -136,6 +148,7 @@ import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
@@ -1614,6 +1627,126 @@ public class IndexRecoveryIT extends AbstractIndexRecoveryIntegTestCase {
         );
     }
 
+    public void testWaitForClusterStateToBeAppliedOnSourceNode() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        final var primaryNode = internalCluster().startDataOnlyNode();
+        String indexName = "test-index";
+        createIndex(indexName, indexSettings(1, 0).build());
+        ensureGreen(indexName);
+        final List<IndexRequestBuilder> indexRequests = IntStream.range(0, between(10, 500))
+            .mapToObj(n -> client().prepareIndex(indexName).setSource("foo", "bar"))
+            .toList();
+        indexRandom(randomBoolean(), true, true, indexRequests);
+        assertThat(indicesAdmin().prepareFlush(indexName).get().getFailedShards(), equalTo(0));
+
+        final var replicaNode = internalCluster().startDataOnlyNode();
+
+        final long initialClusterStateVersion = clusterService().state().version();
+
+        // Helper class to encapsulate the sync mechanism that delays applying cluster states on the primary node until the replica gives
+        // the go-ahead.
+        class ClusterStateSyncListeners implements Releasable {
+            private final Map<Long, SubscribableListener<Void>> clusterStateBarriers = ConcurrentCollections.newConcurrentMap();
+            private final SubscribableListener<Void> startRecoveryListener = new SubscribableListener<>();
+
+            private final CountDownLatch completeLatch = new CountDownLatch(1);
+            private final RefCounted refCounted = AbstractRefCounted.of(completeLatch::countDown);
+            private final List<Runnable> cleanup = new ArrayList<>(2);
+
+            @Override
+            public void close() {
+                refCounted.decRef();
+                safeAwait(completeLatch);
+                cleanup.forEach(Runnable::run);
+                clusterStateBarriers.values().forEach(l -> l.onResponse(null));
+            }
+
+            void addCleanup(Runnable runnable) {
+                cleanup.add(runnable);
+            }
+
+            SubscribableListener<Void> getStateApplyDelayListener(long clusterStateVersion) {
+                assertThat(clusterStateVersion, greaterThanOrEqualTo(initialClusterStateVersion));
+                if (refCounted.tryIncRef()) {
+                    try {
+                        return clusterStateBarriers.computeIfAbsent(clusterStateVersion, ignored -> new SubscribableListener<>());
+                    } finally {
+                        refCounted.decRef();
+                    }
+                } else {
+                    return SubscribableListener.newSucceeded(null);
+                }
+            }
+
+            void onStartRecovery() {
+                Thread.yield();
+                assertFalse(startRecoveryListener.isDone());
+                startRecoveryListener.onResponse(null);
+            }
+
+            public void delayUntilRecoveryStart(SubscribableListener<Void> listener) {
+                assertFalse(startRecoveryListener.isDone());
+                startRecoveryListener.addListener(listener);
+            }
+        }
+
+        try (var clusterStateSyncListeners = new ClusterStateSyncListeners()) {
+            final var primaryNodeTransportService = (MockTransportService) internalCluster().getInstance(
+                TransportService.class,
+                primaryNode
+            );
+            primaryNodeTransportService.addRequestHandlingBehavior(
+                Coordinator.COMMIT_STATE_ACTION_NAME,
+                (handler, request, channel, task) -> {
+                    assertThat(request, instanceOf(ApplyCommitRequest.class));
+                    clusterStateSyncListeners.getStateApplyDelayListener(((ApplyCommitRequest) request).getVersion())
+                        .addListener(
+                            ActionListener.wrap(ignored -> handler.messageReceived(request, channel, task), e -> fail(e, "unexpected"))
+                        );
+                }
+            );
+            primaryNodeTransportService.addRequestHandlingBehavior(
+                PeerRecoverySourceService.Actions.START_RECOVERY,
+                (handler, request, channel, task) -> {
+                    assertThat(request, instanceOf(StartRecoveryRequest.class));
+                    assertThat(((StartRecoveryRequest) request).clusterStateVersion(), greaterThan(initialClusterStateVersion));
+                    handler.messageReceived(
+                        request,
+                        new TestTransportChannel(
+                            new ChannelActionListener<>(channel).delegateResponse(
+                                (l, e) -> fail(e, "recovery should succeed on first attempt")
+                            )
+                        ),
+                        task
+                    );
+                    clusterStateSyncListeners.onStartRecovery();
+                }
+            );
+            clusterStateSyncListeners.addCleanup(primaryNodeTransportService::clearInboundRules);
+
+            final var replicaClusterService = internalCluster().getInstance(ClusterService.class, replicaNode);
+            final ClusterStateListener clusterStateListener = event -> {
+                final var primaryProceedListener = clusterStateSyncListeners.getStateApplyDelayListener(event.state().version());
+                final var indexRoutingTable = event.state().routingTable().index(indexName);
+                assertNotNull(indexRoutingTable);
+                final var indexShardRoutingTable = indexRoutingTable.shard(0);
+                if (indexShardRoutingTable.size() == 2 && indexShardRoutingTable.getAllInitializingShards().isEmpty() == false) {
+                    // this is the cluster state update which starts the recovery, so delay the primary node application until recovery
+                    // has started
+                    clusterStateSyncListeners.delayUntilRecoveryStart(primaryProceedListener);
+                } else {
+                    // this is some other cluster state update, so we must let it proceed now
+                    primaryProceedListener.onResponse(null);
+                }
+            };
+            replicaClusterService.addListener(clusterStateListener);
+            clusterStateSyncListeners.addCleanup(() -> replicaClusterService.removeListener(clusterStateListener));
+
+            updateIndexSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1), indexName);
+            ensureGreen(indexName);
+        }
+    }
+
     private void assertGlobalCheckpointIsStableAndSyncedInAllNodes(String indexName, List<String> nodes, int shard) throws Exception {
         assertThat(nodes, is(not(empty())));
 

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -142,6 +142,7 @@ public class TransportVersions {
     public static final TransportVersion NODE_INFO_INDEX_VERSION_ADDED = def(8_500_075);
     public static final TransportVersion FIRST_NEW_ID_LAYOUT = def(8_501_00_0);
     public static final TransportVersion COMMIT_PRIMARY_TERM_GENERATION = def(8_501_00_1);
+    public static final TransportVersion WAIT_FOR_CLUSTER_STATE_IN_RECOVERY_ADDED = def(8_502_00_0);
     /*
      * STOP! READ THIS FIRST! No, really,
      *        ____ _____ ___  ____  _        ____  _____    _    ____    _____ _   _ ___ ____    _____ ___ ____  ____ _____ _

+ 3 - 2
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -3090,7 +3090,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         PeerRecoveryTargetService.RecoveryListener recoveryListener,
         RepositoriesService repositoriesService,
         BiConsumer<MappingMetadata, ActionListener<Void>> mappingUpdateConsumer,
-        IndicesService indicesService
+        IndicesService indicesService,
+        long clusterStateVersion
     ) {
         // TODO: Create a proper object to encapsulate the recovery context
         // all of the current methods here follow a pattern of:
@@ -3114,7 +3115,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             case PEER -> {
                 try {
                     markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
-                    recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
+                    recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), clusterStateVersion, recoveryListener);
                 } catch (Exception e) {
                     failShard("corrupted preexisting index", e);
                     recoveryListener.onRecoveryFailure(new RecoveryFailedException(recoveryState, null, e), true);

+ 4 - 4
server/src/main/java/org/elasticsearch/indices/IndicesService.java

@@ -879,7 +879,7 @@ public class IndicesService extends AbstractLifecycleComponent
     }
 
     @Override
-    public IndexShard createShard(
+    public void createShard(
         final ShardRouting shardRouting,
         final PeerRecoveryTargetService recoveryTargetService,
         final PeerRecoveryTargetService.RecoveryListener recoveryListener,
@@ -888,7 +888,8 @@ public class IndicesService extends AbstractLifecycleComponent
         final GlobalCheckpointSyncer globalCheckpointSyncer,
         final RetentionLeaseSyncer retentionLeaseSyncer,
         final DiscoveryNode targetNode,
-        final DiscoveryNode sourceNode
+        final DiscoveryNode sourceNode,
+        long clusterStateVersion
     ) throws IOException {
         Objects.requireNonNull(retentionLeaseSyncer);
         ensureChangesAllowed();
@@ -911,8 +912,7 @@ public class IndicesService extends AbstractLifecycleComponent
                     .masterNodeTimeout(TimeValue.MAX_VALUE),
                 new ThreadedActionListener<>(threadPool.generic(), listener.map(ignored -> null))
             );
-        }, this);
-        return indexShard;
+        }, this, clusterStateVersion);
     }
 
     @Override

+ 6 - 4
server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

@@ -645,7 +645,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
                 this::updateGlobalCheckpointForShard,
                 retentionLeaseSyncer,
                 originalState.nodes().getLocalNode(),
-                sourceNode
+                sourceNode,
+                originalState.version()
             );
             listener.onResponse(true);
         } catch (ShardLockObtainFailedException e) {
@@ -1090,10 +1091,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
          * @param retentionLeaseSyncer   a callback when this shard syncs retention leases
          * @param targetNode             the node where this shard will be recovered
          * @param sourceNode             the source node to recover this shard from (it might be null)
-         * @return a new shard
+         * @param clusterStateVersion    the cluster state version in which the shard was created
          * @throws IOException if an I/O exception occurs when creating the shard
          */
-        T createShard(
+        void createShard(
             ShardRouting shardRouting,
             PeerRecoveryTargetService recoveryTargetService,
             PeerRecoveryTargetService.RecoveryListener recoveryListener,
@@ -1102,7 +1103,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
             GlobalCheckpointSyncer globalCheckpointSyncer,
             RetentionLeaseSyncer retentionLeaseSyncer,
             DiscoveryNode targetNode,
-            @Nullable DiscoveryNode sourceNode
+            @Nullable DiscoveryNode sourceNode,
+            long clusterStateVersion
         ) throws IOException;
 
         /**

+ 72 - 0
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceClusterStateDelay.java

@@ -0,0 +1,72 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.indices.recovery;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.SubscribableListener;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
+
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+public class PeerRecoverySourceClusterStateDelay {
+    private PeerRecoverySourceClusterStateDelay() {}
+
+    private static final Logger logger = LogManager.getLogger(PeerRecoverySourceClusterStateDelay.class);
+
+    /**
+     * Waits for the given cluster state version to be applied locally before proceeding with recovery
+     */
+    public static <T> void ensureClusterStateVersion(
+        long clusterStateVersion,
+        ClusterService clusterService,
+        Executor executor,
+        ThreadContext threadContext,
+        ActionListener<T> listener,
+        Consumer<ActionListener<T>> proceedWithRecovery
+    ) {
+        if (clusterStateVersion <= clusterService.state().version()) {
+            // either our locally-applied cluster state is already fresh enough, or request.clusterStateVersion() == 0 for bwc
+            proceedWithRecovery.accept(listener);
+        } else {
+            logger.debug("delaying {} until application of cluster state version {}", proceedWithRecovery, clusterStateVersion);
+            final var waitListener = new SubscribableListener<Void>();
+            final var clusterStateVersionListener = new ClusterStateListener() {
+                @Override
+                public void clusterChanged(ClusterChangedEvent event) {
+                    if (clusterStateVersion <= event.state().version()) {
+                        waitListener.onResponse(null);
+                    }
+                }
+
+                @Override
+                public String toString() {
+                    return "ClusterStateListener for " + proceedWithRecovery;
+                }
+            };
+            clusterService.addListener(clusterStateVersionListener);
+            waitListener.addListener(ActionListener.running(() -> clusterService.removeListener(clusterStateVersionListener)));
+            if (clusterStateVersion <= clusterService.state().version()) {
+                waitListener.onResponse(null);
+            }
+            waitListener.addListener(
+                listener.delegateFailureAndWrap((l, ignored) -> proceedWithRecovery.accept(l)),
+                executor,
+                threadContext
+            );
+            // NB no timeout. If we never apply the fresh cluster state then eventually we leave the cluster which removes the recovery
+            // from the routing table so the target shard will fail.
+        }
+    }
+}

+ 25 - 2
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java

@@ -21,7 +21,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
-import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.core.Nullable;
@@ -43,6 +42,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 
 /**
  * The source recovery accepts recovery requests from other peer shards and start the recovery process from this
@@ -59,20 +59,22 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
 
     private final TransportService transportService;
     private final IndicesService indicesService;
+    private final ClusterService clusterService;
     private final RecoverySettings recoverySettings;
     private final RecoveryPlannerService recoveryPlannerService;
 
     final OngoingRecoveries ongoingRecoveries = new OngoingRecoveries();
 
-    @Inject
     public PeerRecoverySourceService(
         TransportService transportService,
         IndicesService indicesService,
+        ClusterService clusterService,
         RecoverySettings recoverySettings,
         RecoveryPlannerService recoveryPlannerService
     ) {
         this.transportService = transportService;
         this.indicesService = indicesService;
+        this.clusterService = clusterService;
         this.recoverySettings = recoverySettings;
         this.recoveryPlannerService = recoveryPlannerService;
         // When the target node wants to start a peer recovery it sends a START_RECOVERY request to the source
@@ -132,6 +134,27 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
     }
 
     private void recover(StartRecoveryRequest request, Task task, ActionListener<RecoveryResponse> listener) {
+        PeerRecoverySourceClusterStateDelay.ensureClusterStateVersion(
+            request.clusterStateVersion(),
+            clusterService,
+            transportService.getThreadPool().generic(),
+            transportService.getThreadPool().getThreadContext(),
+            listener,
+            new Consumer<>() {
+                @Override
+                public void accept(ActionListener<RecoveryResponse> l) {
+                    recoverWithFreshClusterState(request, task, l);
+                }
+
+                @Override
+                public String toString() {
+                    return "recovery [" + request + "]";
+                }
+            }
+        );
+    }
+
+    private void recoverWithFreshClusterState(StartRecoveryRequest request, Task task, ActionListener<RecoveryResponse> listener) {
         final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
         final IndexShard shard = indexService.getShard(request.shardId().id());
 

+ 10 - 2
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

@@ -228,12 +228,18 @@ public class PeerRecoveryTargetService implements IndexEventListener {
         }
     }
 
-    public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
+    public void startRecovery(
+        final IndexShard indexShard,
+        final DiscoveryNode sourceNode,
+        final long clusterStateVersion,
+        final RecoveryListener listener
+    ) {
         final Releasable snapshotFileDownloadsPermit = tryAcquireSnapshotDownloadPermits();
         // create a new recovery status, and process...
         final long recoveryId = onGoingRecoveries.startRecovery(
             indexShard,
             sourceNode,
+            clusterStateVersion,
             snapshotFilesProvider,
             listener,
             recoverySettings.activityTimeout(),
@@ -319,7 +325,8 @@ public class PeerRecoveryTargetService implements IndexEventListener {
                         recoveryId,
                         indexShard.shardId(),
                         transportService.getLocalNode(),
-                        indexShard.routingEntry().allocationId().getId()
+                        indexShard.routingEntry().allocationId().getId(),
+                        recoveryTarget.clusterStateVersion()
                     ),
                     new ActionListener<>() {
                         @Override
@@ -455,6 +462,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
             recoveryTarget.indexShard().routingEntry().allocationId().getId(),
             recoveryTarget.sourceNode(),
             localNode,
+            recoveryTarget.clusterStateVersion(),
             metadataSnapshot,
             recoveryTarget.state().getPrimary(),
             recoveryTarget.recoveryId(),

+ 2 - 0
server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java

@@ -54,6 +54,7 @@ public class RecoveriesCollection {
     public long startRecovery(
         IndexShard indexShard,
         DiscoveryNode sourceNode,
+        long clusterStateVersion,
         SnapshotFilesProvider snapshotFilesProvider,
         PeerRecoveryTargetService.RecoveryListener listener,
         TimeValue activityTimeout,
@@ -62,6 +63,7 @@ public class RecoveriesCollection {
         RecoveryTarget recoveryTarget = new RecoveryTarget(
             indexShard,
             sourceNode,
+            clusterStateVersion,
             snapshotFilesProvider,
             snapshotFileDownloadsPermit,
             listener

+ 22 - 7
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

@@ -70,6 +70,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
     private final long recoveryId;
     private final IndexShard indexShard;
     private final DiscoveryNode sourceNode;
+    private final long clusterStateVersion;
     private final SnapshotFilesProvider snapshotFilesProvider;
     private volatile MultiFileWriter multiFileWriter;
     private final RecoveryRequestTracker requestTracker = new RecoveryRequestTracker();
@@ -94,16 +95,18 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
     /**
      * Creates a new recovery target object that represents a recovery to the provided shard.
      *
-     * @param indexShard                        local shard where we want to recover to
-     * @param sourceNode                        source node of the recovery where we recover from
-     * @param snapshotFileDownloadsPermit       a permit that allows to download files from a snapshot,
-     *                                          limiting the concurrent snapshot file downloads per node
-     *                                          preventing the exhaustion of repository resources.
-     * @param listener                          called when recovery is completed/failed
+     * @param indexShard                  local shard where we want to recover to
+     * @param sourceNode                  source node of the recovery where we recover from
+     * @param clusterStateVersion         version of the cluster state that initiated the recovery
+     * @param snapshotFileDownloadsPermit a permit that allows to download files from a snapshot,
+     *                                    limiting the concurrent snapshot file downloads per node
+     *                                    preventing the exhaustion of repository resources.
+     * @param listener                    called when recovery is completed/failed
      */
     public RecoveryTarget(
         IndexShard indexShard,
         DiscoveryNode sourceNode,
+        long clusterStateVersion,
         SnapshotFilesProvider snapshotFilesProvider,
         @Nullable Releasable snapshotFileDownloadsPermit,
         PeerRecoveryTargetService.RecoveryListener listener
@@ -114,6 +117,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
         this.logger = Loggers.getLogger(getClass(), indexShard.shardId());
         this.indexShard = indexShard;
         this.sourceNode = sourceNode;
+        this.clusterStateVersion = clusterStateVersion;
         this.snapshotFilesProvider = snapshotFilesProvider;
         this.snapshotFileDownloadsPermit = snapshotFileDownloadsPermit;
         this.shardId = indexShard.shardId();
@@ -149,7 +153,14 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
         // get released after the retry copy is created
         Releasable snapshotFileDownloadsPermitCopy = snapshotFileDownloadsPermit;
         snapshotFileDownloadsPermit = null;
-        return new RecoveryTarget(indexShard, sourceNode, snapshotFilesProvider, snapshotFileDownloadsPermitCopy, listener);
+        return new RecoveryTarget(
+            indexShard,
+            sourceNode,
+            clusterStateVersion,
+            snapshotFilesProvider,
+            snapshotFileDownloadsPermitCopy,
+            listener
+        );
     }
 
     @Nullable
@@ -174,6 +185,10 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
         return this.sourceNode;
     }
 
+    public long clusterStateVersion() {
+        return clusterStateVersion;
+    }
+
     public RecoveryState state() {
         return indexShard.recoveryState();
     }

+ 20 - 1
server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.indices.recovery;
 
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -29,6 +30,7 @@ public class StartRecoveryRequest extends TransportRequest {
     private final String targetAllocationId;
     private final DiscoveryNode sourceNode;
     private final DiscoveryNode targetNode;
+    private final long clusterStateVersion;
     private final Store.MetadataSnapshot metadataSnapshot;
     private final boolean primaryRelocation;
     private final long startingSeqNo;
@@ -41,6 +43,11 @@ public class StartRecoveryRequest extends TransportRequest {
         targetAllocationId = in.readString();
         sourceNode = new DiscoveryNode(in);
         targetNode = new DiscoveryNode(in);
+        if (in.getTransportVersion().onOrAfter(TransportVersions.WAIT_FOR_CLUSTER_STATE_IN_RECOVERY_ADDED)) {
+            clusterStateVersion = in.readVLong();
+        } else {
+            clusterStateVersion = 0L; // bwc: do not wait for cluster state to be applied
+        }
         metadataSnapshot = Store.MetadataSnapshot.readFrom(in);
         primaryRelocation = in.readBoolean();
         startingSeqNo = in.readLong();
@@ -58,6 +65,7 @@ public class StartRecoveryRequest extends TransportRequest {
      * @param targetAllocationId       the allocation id of the target shard
      * @param sourceNode               the source node to remover from
      * @param targetNode               the target node to recover to
+     * @param clusterStateVersion      the cluster state version which initiated the recovery
      * @param metadataSnapshot         the Lucene metadata
      * @param primaryRelocation        whether or not the recovery is a primary relocation
      * @param recoveryId               the recovery ID
@@ -69,12 +77,14 @@ public class StartRecoveryRequest extends TransportRequest {
         final String targetAllocationId,
         final DiscoveryNode sourceNode,
         final DiscoveryNode targetNode,
+        final long clusterStateVersion,
         final Store.MetadataSnapshot metadataSnapshot,
         final boolean primaryRelocation,
         final long recoveryId,
         final long startingSeqNo,
         final boolean canDownloadSnapshotFiles
     ) {
+        this.clusterStateVersion = clusterStateVersion;
         this.recoveryId = recoveryId;
         this.shardId = shardId;
         this.targetAllocationId = targetAllocationId;
@@ -108,6 +118,10 @@ public class StartRecoveryRequest extends TransportRequest {
         return targetNode;
     }
 
+    public long clusterStateVersion() {
+        return clusterStateVersion;
+    }
+
     public boolean isPrimaryRelocation() {
         return primaryRelocation;
     }
@@ -129,11 +143,13 @@ public class StartRecoveryRequest extends TransportRequest {
         return Strings.format(
             """
                 recovery of %s to %s \
-                [recoveryId=%d, targetAllocationId=%s, startingSeqNo=%d, primaryRelocation=%s, canDownloadSnapshotFiles=%s]""",
+                [recoveryId=%d, targetAllocationId=%s, clusterStateVersion=%d, startingSeqNo=%d, \
+                primaryRelocation=%s, canDownloadSnapshotFiles=%s]""",
             shardId,
             targetNode.descriptionWithoutAttributes(),
             recoveryId,
             targetAllocationId,
+            clusterStateVersion,
             startingSeqNo,
             primaryRelocation,
             canDownloadSnapshotFiles
@@ -148,6 +164,9 @@ public class StartRecoveryRequest extends TransportRequest {
         out.writeString(targetAllocationId);
         sourceNode.writeTo(out);
         targetNode.writeTo(out);
+        if (out.getTransportVersion().onOrAfter(TransportVersions.WAIT_FOR_CLUSTER_STATE_IN_RECOVERY_ADDED)) {
+            out.writeVLong(clusterStateVersion);
+        } // else bwc: just omit it, the receiver doesn't wait for a cluster state anyway
         metadataSnapshot.writeTo(out);
         out.writeBoolean(primaryRelocation);
         out.writeLong(startingSeqNo);

+ 19 - 3
server/src/main/java/org/elasticsearch/indices/recovery/StatelessPrimaryRelocationAction.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.indices.recovery;
 
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionResponse;
@@ -33,12 +34,14 @@ public class StatelessPrimaryRelocationAction {
         private final ShardId shardId;
         private final DiscoveryNode targetNode;
         private final String targetAllocationId;
+        private final long clusterStateVersion;
 
-        public Request(long recoveryId, ShardId shardId, DiscoveryNode targetNode, String targetAllocationId) {
+        public Request(long recoveryId, ShardId shardId, DiscoveryNode targetNode, String targetAllocationId, long clusterStateVersion) {
             this.recoveryId = recoveryId;
             this.shardId = shardId;
             this.targetNode = targetNode;
             this.targetAllocationId = targetAllocationId;
+            this.clusterStateVersion = clusterStateVersion;
         }
 
         public Request(StreamInput in) throws IOException {
@@ -47,6 +50,11 @@ public class StatelessPrimaryRelocationAction {
             shardId = new ShardId(in);
             targetNode = new DiscoveryNode(in);
             targetAllocationId = in.readString();
+            if (in.getTransportVersion().onOrAfter(TransportVersions.WAIT_FOR_CLUSTER_STATE_IN_RECOVERY_ADDED)) {
+                clusterStateVersion = in.readVLong();
+            } else {
+                clusterStateVersion = 0L; // temporary bwc: do not wait for cluster state to be applied
+            }
         }
 
         @Override
@@ -61,6 +69,9 @@ public class StatelessPrimaryRelocationAction {
             shardId.writeTo(out);
             targetNode.writeTo(out);
             out.writeString(targetAllocationId);
+            if (out.getTransportVersion().onOrAfter(TransportVersions.WAIT_FOR_CLUSTER_STATE_IN_RECOVERY_ADDED)) {
+                out.writeVLong(clusterStateVersion);
+            } // temporary bwc: just omit it, the receiver doesn't wait for a cluster state anyway
         }
 
         public long recoveryId() {
@@ -79,6 +90,10 @@ public class StatelessPrimaryRelocationAction {
             return targetAllocationId;
         }
 
+        public long clusterStateVersion() {
+            return clusterStateVersion;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
@@ -87,12 +102,13 @@ public class StatelessPrimaryRelocationAction {
             return recoveryId == request.recoveryId
                 && shardId.equals(request.shardId)
                 && targetNode.equals(request.targetNode)
-                && targetAllocationId.equals(request.targetAllocationId);
+                && targetAllocationId.equals(request.targetAllocationId)
+                && clusterStateVersion == request.clusterStateVersion;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(recoveryId, shardId, targetNode, targetAllocationId);
+            return Objects.hash(recoveryId, shardId, targetNode, targetAllocationId, clusterStateVersion);
         }
     }
 }

+ 7 - 1
server/src/main/java/org/elasticsearch/node/Node.java

@@ -1106,7 +1106,13 @@ public class Node implements Closeable {
                     final SnapshotFilesProvider snapshotFilesProvider = new SnapshotFilesProvider(repositoryService);
                     b.bind(PeerRecoverySourceService.class)
                         .toInstance(
-                            new PeerRecoverySourceService(transportService, indicesService, recoverySettings, recoveryPlannerService)
+                            new PeerRecoverySourceService(
+                                transportService,
+                                indicesService,
+                                clusterService,
+                                recoverySettings,
+                                recoveryPlannerService
+                            )
                         );
                     b.bind(PeerRecoveryTargetService.class)
                         .toInstance(

+ 2 - 2
server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java

@@ -116,7 +116,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
             IndexShard replica = shards.addReplica();
             Future<Void> future = shards.asyncRecoverReplica(
                 replica,
-                (indexShard, node) -> new RecoveryTarget(indexShard, node, null, null, recoveryListener) {
+                (indexShard, node) -> new RecoveryTarget(indexShard, node, 0L, null, null, recoveryListener) {
                     @Override
                     public void cleanFiles(
                         int totalTranslogOps,
@@ -199,7 +199,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
             IndexShard replica = shards.addReplica();
             Future<Void> fut = shards.asyncRecoverReplica(
                 replica,
-                (shard, node) -> new RecoveryTarget(shard, node, null, null, recoveryListener) {
+                (shard, node) -> new RecoveryTarget(shard, node, 0L, null, null, recoveryListener) {
                     @Override
                     public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {
                         try {

+ 3 - 3
server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java

@@ -452,7 +452,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
             AtomicBoolean recoveryDone = new AtomicBoolean(false);
             final Future<Void> recoveryFuture = shards.asyncRecoverReplica(newReplica, (indexShard, node) -> {
                 recoveryStart.countDown();
-                return new RecoveryTarget(indexShard, node, null, null, recoveryListener) {
+                return new RecoveryTarget(indexShard, node, 0L, null, null, recoveryListener) {
                     @Override
                     public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
                         recoveryDone.set(true);
@@ -506,7 +506,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
             final IndexShard replica = shards.addReplica();
             final Future<Void> recoveryFuture = shards.asyncRecoverReplica(
                 replica,
-                (indexShard, node) -> new RecoveryTarget(indexShard, node, null, null, recoveryListener) {
+                (indexShard, node) -> new RecoveryTarget(indexShard, node, 0L, null, null, recoveryListener) {
                     @Override
                     public void indexTranslogOperations(
                         final List<Translog.Operation> operations,
@@ -784,7 +784,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
             PeerRecoveryTargetService.RecoveryListener listener,
             Logger logger
         ) {
-            super(shard, sourceNode, null, null, listener);
+            super(shard, sourceNode, 0L, null, null, listener);
             this.recoveryBlocked = recoveryBlocked;
             this.releaseRecovery = releaseRecovery;
             this.stageToBlock = stageToBlock;

+ 109 - 91
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -2856,31 +2856,37 @@ public class IndexShardTests extends IndexShardTestCase {
 
         indexDoc(primary, "_doc", "0", "{\"foo\" : \"bar\"}");
         IndexShard replica = newShard(primary.shardId(), false, "n2", metadata, null);
-        recoverReplica(replica, primary, (shard, discoveryNode) -> new RecoveryTarget(shard, discoveryNode, null, null, recoveryListener) {
-            @Override
-            public void indexTranslogOperations(
-                final List<Translog.Operation> operations,
-                final int totalTranslogOps,
-                final long maxSeenAutoIdTimestamp,
-                final long maxSeqNoOfUpdatesOrDeletes,
-                final RetentionLeases retentionLeases,
-                final long mappingVersion,
-                final ActionListener<Long> listener
-            ) {
-                super.indexTranslogOperations(
-                    operations,
-                    totalTranslogOps,
-                    maxSeenAutoIdTimestamp,
-                    maxSeqNoOfUpdatesOrDeletes,
-                    retentionLeases,
-                    mappingVersion,
-                    listener.delegateFailureAndWrap((l, r) -> {
-                        assertFalse(replica.isSyncNeeded());
-                        l.onResponse(r);
-                    })
-                );
-            }
-        }, true, true);
+        recoverReplica(
+            replica,
+            primary,
+            (shard, discoveryNode) -> new RecoveryTarget(shard, discoveryNode, 0L, null, null, recoveryListener) {
+                @Override
+                public void indexTranslogOperations(
+                    final List<Translog.Operation> operations,
+                    final int totalTranslogOps,
+                    final long maxSeenAutoIdTimestamp,
+                    final long maxSeqNoOfUpdatesOrDeletes,
+                    final RetentionLeases retentionLeases,
+                    final long mappingVersion,
+                    final ActionListener<Long> listener
+                ) {
+                    super.indexTranslogOperations(
+                        operations,
+                        totalTranslogOps,
+                        maxSeenAutoIdTimestamp,
+                        maxSeqNoOfUpdatesOrDeletes,
+                        retentionLeases,
+                        mappingVersion,
+                        listener.delegateFailureAndWrap((l, r) -> {
+                            assertFalse(replica.isSyncNeeded());
+                            l.onResponse(r);
+                        })
+                    );
+                }
+            },
+            true,
+            true
+        );
 
         closeShards(primary, replica);
     }
@@ -2980,32 +2986,38 @@ public class IndexShardTests extends IndexShardTestCase {
         replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
         // Shard is still inactive since we haven't started recovering yet
         assertFalse(replica.isActive());
-        recoverReplica(replica, primary, (shard, discoveryNode) -> new RecoveryTarget(shard, discoveryNode, null, null, recoveryListener) {
-            @Override
-            public void indexTranslogOperations(
-                final List<Translog.Operation> operations,
-                final int totalTranslogOps,
-                final long maxAutoIdTimestamp,
-                final long maxSeqNoOfUpdatesOrDeletes,
-                final RetentionLeases retentionLeases,
-                final long mappingVersion,
-                final ActionListener<Long> listener
-            ) {
-                super.indexTranslogOperations(
-                    operations,
-                    totalTranslogOps,
-                    maxAutoIdTimestamp,
-                    maxSeqNoOfUpdatesOrDeletes,
-                    retentionLeases,
-                    mappingVersion,
-                    listener.delegateFailureAndWrap((l, checkpoint) -> {
-                        l.onResponse(checkpoint);
-                        // Shard should now be active since we did recover:
-                        assertTrue(replica.isActive());
-                    })
-                );
-            }
-        }, false, true);
+        recoverReplica(
+            replica,
+            primary,
+            (shard, discoveryNode) -> new RecoveryTarget(shard, discoveryNode, 0L, null, null, recoveryListener) {
+                @Override
+                public void indexTranslogOperations(
+                    final List<Translog.Operation> operations,
+                    final int totalTranslogOps,
+                    final long maxAutoIdTimestamp,
+                    final long maxSeqNoOfUpdatesOrDeletes,
+                    final RetentionLeases retentionLeases,
+                    final long mappingVersion,
+                    final ActionListener<Long> listener
+                ) {
+                    super.indexTranslogOperations(
+                        operations,
+                        totalTranslogOps,
+                        maxAutoIdTimestamp,
+                        maxSeqNoOfUpdatesOrDeletes,
+                        retentionLeases,
+                        mappingVersion,
+                        listener.delegateFailureAndWrap((l, checkpoint) -> {
+                            l.onResponse(checkpoint);
+                            // Shard should now be active since we did recover:
+                            assertTrue(replica.isActive());
+                        })
+                    );
+                }
+            },
+            false,
+            true
+        );
 
         closeShards(primary, replica);
     }
@@ -3033,48 +3045,54 @@ public class IndexShardTests extends IndexShardTestCase {
         DiscoveryNode localNode = DiscoveryNodeUtils.builder("foo").roles(emptySet()).build();
         replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
         assertListenerCalled.accept(replica);
-        recoverReplica(replica, primary, (shard, discoveryNode) -> new RecoveryTarget(shard, discoveryNode, null, null, recoveryListener) {
-            // we're only checking that listeners are called when the engine is open, before there is no point
-            @Override
-            public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {
-                super.prepareForTranslogOperations(totalTranslogOps, listener.delegateFailureAndWrap((l, r) -> {
-                    assertListenerCalled.accept(replica);
-                    l.onResponse(r);
-                }));
-            }
-
-            @Override
-            public void indexTranslogOperations(
-                final List<Translog.Operation> operations,
-                final int totalTranslogOps,
-                final long maxAutoIdTimestamp,
-                final long maxSeqNoOfUpdatesOrDeletes,
-                final RetentionLeases retentionLeases,
-                final long mappingVersion,
-                final ActionListener<Long> listener
-            ) {
-                super.indexTranslogOperations(
-                    operations,
-                    totalTranslogOps,
-                    maxAutoIdTimestamp,
-                    maxSeqNoOfUpdatesOrDeletes,
-                    retentionLeases,
-                    mappingVersion,
-                    listener.delegateFailureAndWrap((l, r) -> {
+        recoverReplica(
+            replica,
+            primary,
+            (shard, discoveryNode) -> new RecoveryTarget(shard, discoveryNode, 0L, null, null, recoveryListener) {
+                // we're only checking that listeners are called when the engine is open, before there is no point
+                @Override
+                public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {
+                    super.prepareForTranslogOperations(totalTranslogOps, listener.delegateFailureAndWrap((l, r) -> {
                         assertListenerCalled.accept(replica);
                         l.onResponse(r);
-                    })
-                );
-            }
+                    }));
+                }
 
-            @Override
-            public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
-                super.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, listener.delegateFailureAndWrap((l, r) -> {
-                    assertListenerCalled.accept(replica);
-                    l.onResponse(r);
-                }));
-            }
-        }, false, true);
+                @Override
+                public void indexTranslogOperations(
+                    final List<Translog.Operation> operations,
+                    final int totalTranslogOps,
+                    final long maxAutoIdTimestamp,
+                    final long maxSeqNoOfUpdatesOrDeletes,
+                    final RetentionLeases retentionLeases,
+                    final long mappingVersion,
+                    final ActionListener<Long> listener
+                ) {
+                    super.indexTranslogOperations(
+                        operations,
+                        totalTranslogOps,
+                        maxAutoIdTimestamp,
+                        maxSeqNoOfUpdatesOrDeletes,
+                        retentionLeases,
+                        mappingVersion,
+                        listener.delegateFailureAndWrap((l, r) -> {
+                            assertListenerCalled.accept(replica);
+                            l.onResponse(r);
+                        })
+                    );
+                }
+
+                @Override
+                public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
+                    super.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, listener.delegateFailureAndWrap((l, r) -> {
+                        assertListenerCalled.accept(replica);
+                        l.onResponse(r);
+                    }));
+                }
+            },
+            false,
+            true
+        );
 
         closeShards(primary, replica);
     }
@@ -4740,7 +4758,7 @@ public class IndexShardTests extends IndexShardTestCase {
                 assert false : "Unexpected failure";
             }
         };
-        recoverReplica(replicaShard, primary, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, null, null, recoveryListener) {
+        recoverReplica(replicaShard, primary, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, 0L, null, null, recoveryListener) {
             @Override
             public void indexTranslogOperations(
                 List<Translog.Operation> operations,

+ 4 - 7
server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java

@@ -229,7 +229,7 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
         }
 
         @Override
-        public MockIndexShard createShard(
+        public void createShard(
             final ShardRouting shardRouting,
             final PeerRecoveryTargetService recoveryTargetService,
             final PeerRecoveryTargetService.RecoveryListener recoveryListener,
@@ -238,21 +238,18 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
             final GlobalCheckpointSyncer globalCheckpointSyncer,
             final RetentionLeaseSyncer retentionLeaseSyncer,
             final DiscoveryNode targetNode,
-            final DiscoveryNode sourceNode
+            final DiscoveryNode sourceNode,
+            long clusterStateVersion
         ) throws IOException {
             failRandomly();
             RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode);
             MockIndexService indexService = indexService(recoveryState.getShardId().getIndex());
             MockIndexShard indexShard = indexService.createShard(shardRouting);
             indexShard.recoveryState = recoveryState;
-            return indexShard;
         }
 
         @Override
-        public void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeValue) throws IOException,
-            InterruptedException {
-
-        }
+        public void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeValue) {}
 
         private boolean hasIndex(Index index) {
             return indices.containsKey(index.getUUID());

+ 2 - 0
server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java

@@ -43,6 +43,7 @@ public class PeerRecoverySourceServiceTests extends IndexShardTestCase {
         PeerRecoverySourceService peerRecoverySourceService = new PeerRecoverySourceService(
             transportService,
             indicesService,
+            clusterService,
             new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
             mock(RecoveryPlannerService.class)
         );
@@ -51,6 +52,7 @@ public class PeerRecoverySourceServiceTests extends IndexShardTestCase {
             randomAlphaOfLength(10),
             getFakeDiscoNode("source"),
             getFakeDiscoNode("target"),
+            0L,
             Store.MetadataSnapshot.EMPTY,
             randomBoolean(),
             randomLong(),

+ 9 - 9
server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java

@@ -100,7 +100,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
         final DiscoveryNode pNode = getFakeDiscoNode(sourceShard.routingEntry().currentNodeId());
         final DiscoveryNode rNode = getFakeDiscoNode(targetShard.routingEntry().currentNodeId());
         targetShard.markAsRecovering("test-peer-recovery", new RecoveryState(targetShard.routingEntry(), rNode, pNode));
-        final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null, null, null);
+        final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, 0L, null, null, null);
         final PlainActionFuture<Void> receiveFileInfoFuture = new PlainActionFuture<>();
         recoveryTarget.receiveFileInfo(
             mdFiles.stream().map(StoreFileMetadata::name).toList(),
@@ -330,7 +330,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
         shard.prepareForIndexRecovery();
         long startingSeqNo = shard.recoverLocallyUpToGlobalCheckpoint();
         shard.store().markStoreCorrupted(new IOException("simulated"));
-        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null, null, null);
+        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, 0L, null, null, null);
         StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(logger, rNode, recoveryTarget, startingSeqNo);
         assertThat(request.startingSeqNo(), equalTo(UNASSIGNED_SEQ_NO));
         assertThat(request.metadataSnapshot().size(), equalTo(0));
@@ -348,7 +348,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
         shard.prepareForIndexRecovery();
 
         PlainActionFuture<Void> future = PlainActionFuture.newFuture();
-        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null, null, new PeerRecoveryTargetService.RecoveryListener() {
+        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, 0L, null, null, new PeerRecoveryTargetService.RecoveryListener() {
             @Override
             public void onRecoveryDone(RecoveryState state, ShardLongFieldRange timestampMillisFieldRange) {
                 future.onResponse(null);
@@ -388,7 +388,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
         shard = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE));
         shard.markAsRecovering("peer recovery", new RecoveryState(shard.routingEntry(), pNode, rNode));
         shard.prepareForIndexRecovery();
-        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null, null, null);
+        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, 0L, null, null, null);
         StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(
             logger,
             rNode,
@@ -456,7 +456,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
         recoveryStateIndex.addFileDetail(storeFileMetadata.name(), storeFileMetadata.length(), false);
         recoveryStateIndex.setFileDetailsComplete();
 
-        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, () -> {}, null);
+        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, 0L, snapshotFilesProvider, () -> {}, null);
 
         PlainActionFuture<Void> writeSnapshotFileFuture = PlainActionFuture.newFuture();
         recoveryTarget.restoreFileFromSnapshot(repositoryName, indexId, fileInfo, writeSnapshotFileFuture);
@@ -528,7 +528,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
         recoveryStateIndex.addFileDetail(storeFileMetadata.name(), storeFileMetadata.length(), false);
         recoveryStateIndex.setFileDetailsComplete();
 
-        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, () -> {}, null);
+        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, 0L, snapshotFilesProvider, () -> {}, null);
 
         String repositoryName = "repo";
         IndexId indexId = new IndexId("index", "uuid");
@@ -635,7 +635,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
             }
         };
 
-        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, () -> {}, null);
+        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, 0L, snapshotFilesProvider, () -> {}, null);
 
         String[] fileNamesBeforeRecoveringSnapshotFiles = directory.listAll();
 
@@ -701,7 +701,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
         recoveryStateIndex.addFileDetail(storeFileMetadata.name(), storeFileMetadata.length(), false);
         recoveryStateIndex.setFileDetailsComplete();
 
-        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, snapshotFilesProvider, () -> {}, null);
+        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, 0L, snapshotFilesProvider, () -> {}, null);
 
         String repository = "repo";
         IndexId indexId = new IndexId("index", "uuid");
@@ -749,7 +749,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
         Releasable snapshotFileDownloadsPermit = () -> {
             assertThat(snapshotFileDownloadsPermitFlag.compareAndSet(false, true), is(equalTo(true)));
         };
-        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null, snapshotFileDownloadsPermit, null);
+        RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, 0L, null, snapshotFileDownloadsPermit, null);
 
         recoveryTarget.decRef();
 

+ 2 - 1
server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

@@ -233,7 +233,7 @@ public class RecoverySourceHandlerTests extends MapperServiceTestCase {
         IOUtils.close(reader, store, multiFileWriter, targetStore);
     }
 
-    public StartRecoveryRequest getStartRecoveryRequest() throws IOException {
+    public StartRecoveryRequest getStartRecoveryRequest() {
         Store.MetadataSnapshot metadataSnapshot = randomBoolean()
             ? Store.MetadataSnapshot.EMPTY
             : new Store.MetadataSnapshot(
@@ -246,6 +246,7 @@ public class RecoverySourceHandlerTests extends MapperServiceTestCase {
             null,
             DiscoveryNodeUtils.builder("b").roles(emptySet()).build(),
             DiscoveryNodeUtils.builder("b").roles(emptySet()).build(),
+            0L,
             metadataSnapshot,
             randomBoolean(),
             randomNonNegativeLong(),

+ 2 - 2
server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java

@@ -311,7 +311,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
         }
         IndexShard replicaShard = newShard(primaryShard.shardId(), false);
         updateMappings(replicaShard, primaryShard.indexSettings().getIndexMetadata());
-        recoverReplica(replicaShard, primaryShard, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, null, null, recoveryListener) {
+        recoverReplica(replicaShard, primaryShard, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, 0L, null, null, recoveryListener) {
             @Override
             public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {
                 super.prepareForTranslogOperations(totalTranslogOps, listener);
@@ -432,7 +432,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
             allowShardFailures();
             IndexShard replica = group.addReplica();
             expectThrows(Exception.class, () -> group.recoverReplica(replica, (shard, sourceNode) -> {
-                return new RecoveryTarget(shard, sourceNode, null, null, new PeerRecoveryTargetService.RecoveryListener() {
+                return new RecoveryTarget(shard, sourceNode, 0L, null, null, new PeerRecoveryTargetService.RecoveryListener() {
                     @Override
                     public void onRecoveryDone(RecoveryState state, ShardLongFieldRange timestampMillisFieldRange) {
                         throw new AssertionError("recovery must fail");

+ 16 - 14
server/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java

@@ -9,11 +9,10 @@
 package org.elasticsearch.indices.recovery;
 
 import org.elasticsearch.TransportVersion;
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
 import org.elasticsearch.common.UUIDs;
-import org.elasticsearch.common.io.stream.InputStreamStreamInput;
-import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -22,8 +21,6 @@ import org.elasticsearch.index.store.Store;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.TransportVersionUtils;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.util.Collections;
 
 import static java.util.Collections.emptySet;
@@ -47,6 +44,7 @@ public class StartRecoveryRequestTests extends ESTestCase {
             UUIDs.randomBase64UUID(),
             DiscoveryNodeUtils.builder("a").roles(emptySet()).version(targetNodeVersion, IndexVersion.ZERO, IndexVersion.current()).build(),
             DiscoveryNodeUtils.builder("b").roles(emptySet()).version(targetNodeVersion, IndexVersion.ZERO, IndexVersion.current()).build(),
+            randomNonNegativeLong(),
             metadataSnapshot,
             randomBoolean(),
             randomNonNegativeLong(),
@@ -54,15 +52,12 @@ public class StartRecoveryRequestTests extends ESTestCase {
             randomBoolean()
         );
 
-        final ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
-        final OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
-        out.setTransportVersion(serializationVersion);
-        outRequest.writeTo(out);
-
-        final ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
-        InputStreamStreamInput in = new InputStreamStreamInput(inBuffer);
-        in.setTransportVersion(serializationVersion);
-        final StartRecoveryRequest inRequest = new StartRecoveryRequest(in);
+        final StartRecoveryRequest inRequest = copyWriteable(
+            outRequest,
+            writableRegistry(),
+            StartRecoveryRequest::new,
+            serializationVersion
+        );
 
         assertThat(outRequest.shardId(), equalTo(inRequest.shardId()));
         assertThat(outRequest.targetAllocationId(), equalTo(inRequest.targetAllocationId()));
@@ -72,6 +67,12 @@ public class StartRecoveryRequestTests extends ESTestCase {
         assertThat(outRequest.isPrimaryRelocation(), equalTo(inRequest.isPrimaryRelocation()));
         assertThat(outRequest.recoveryId(), equalTo(inRequest.recoveryId()));
         assertThat(outRequest.startingSeqNo(), equalTo(inRequest.startingSeqNo()));
+
+        if (serializationVersion.onOrAfter(TransportVersions.WAIT_FOR_CLUSTER_STATE_IN_RECOVERY_ADDED)) {
+            assertEquals(outRequest.clusterStateVersion(), inRequest.clusterStateVersion());
+        } else {
+            assertEquals(0L, inRequest.clusterStateVersion());
+        }
     }
 
     public void testDescription() {
@@ -79,13 +80,14 @@ public class StartRecoveryRequestTests extends ESTestCase {
         assertEquals(
             "recovery of [index][0] to "
                 + node.descriptionWithoutAttributes()
-                + " [recoveryId=1, targetAllocationId=allocationId, startingSeqNo=-2, "
+                + " [recoveryId=1, targetAllocationId=allocationId, clusterStateVersion=3, startingSeqNo=-2, "
                 + "primaryRelocation=false, canDownloadSnapshotFiles=true]",
             new StartRecoveryRequest(
                 new ShardId("index", "uuid", 0),
                 "allocationId",
                 null,
                 node,
+                3,
                 Store.MetadataSnapshot.EMPTY,
                 false,
                 1,

+ 18 - 6
server/src/test/java/org/elasticsearch/indices/recovery/StatelessPrimaryRelocationActionTests.java

@@ -32,7 +32,8 @@ public class StatelessPrimaryRelocationActionTests extends AbstractWireSerializi
             randomNonNegativeLong(),
             new ShardId(randomIdentifier(), UUIDs.randomBase64UUID(), randomIntBetween(0, 99)),
             newDiscoveryNode(),
-            UUIDs.randomBase64UUID()
+            UUIDs.randomBase64UUID(),
+            randomNonNegativeLong()
         );
     }
 
@@ -43,30 +44,41 @@ public class StatelessPrimaryRelocationActionTests extends AbstractWireSerializi
     @Override
     protected StatelessPrimaryRelocationAction.Request mutateInstance(StatelessPrimaryRelocationAction.Request instance)
         throws IOException {
-        return switch (between(1, 4)) {
+        return switch (between(1, 5)) {
             case 1 -> new StatelessPrimaryRelocationAction.Request(
                 randomValueOtherThan(instance.recoveryId(), ESTestCase::randomNonNegativeLong),
                 instance.shardId(),
                 instance.targetNode(),
-                instance.targetAllocationId()
+                instance.targetAllocationId(),
+                instance.clusterStateVersion()
             );
             case 2 -> new StatelessPrimaryRelocationAction.Request(
                 instance.recoveryId(),
                 ShardIdTests.mutate(instance.shardId()),
                 instance.targetNode(),
-                instance.targetAllocationId()
+                instance.targetAllocationId(),
+                instance.clusterStateVersion()
             );
             case 3 -> new StatelessPrimaryRelocationAction.Request(
                 instance.recoveryId(),
                 instance.shardId(),
                 randomValueOtherThan(instance.targetNode(), StatelessPrimaryRelocationActionTests::newDiscoveryNode),
-                instance.targetAllocationId()
+                instance.targetAllocationId(),
+                instance.clusterStateVersion()
             );
             case 4 -> new StatelessPrimaryRelocationAction.Request(
                 instance.recoveryId(),
                 instance.shardId(),
                 instance.targetNode(),
-                randomValueOtherThan(instance.targetAllocationId(), UUIDs::randomBase64UUID)
+                randomValueOtherThan(instance.targetAllocationId(), UUIDs::randomBase64UUID),
+                instance.clusterStateVersion()
+            );
+            case 5 -> new StatelessPrimaryRelocationAction.Request(
+                instance.recoveryId(),
+                instance.shardId(),
+                instance.targetNode(),
+                instance.targetAllocationId(),
+                randomValueOtherThan(instance.clusterStateVersion(), ESTestCase::randomNonNegativeLong)
             );
             default -> throw new AssertionError("impossible");
         };

+ 1 - 1
server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java

@@ -160,6 +160,6 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase {
         final DiscoveryNode rNode = getDiscoveryNode(indexShard.routingEntry().currentNodeId());
         indexShard.markAsRecovering("remote", new RecoveryState(indexShard.routingEntry(), sourceNode, rNode));
         indexShard.prepareForIndexRecovery();
-        return collection.startRecovery(indexShard, sourceNode, null, listener, timeValue, null);
+        return collection.startRecovery(indexShard, sourceNode, 0L, null, listener, timeValue, null);
     }
 }

+ 1 - 0
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -1829,6 +1829,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 peerRecoverySourceService = new PeerRecoverySourceService(
                     transportService,
                     indicesService,
+                    clusterService,
                     recoverySettings,
                     PeerOnlyRecoveryPlannerService.INSTANCE
                 );

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

@@ -451,7 +451,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
         }
 
         public void recoverReplica(IndexShard replica) throws IOException {
-            recoverReplica(replica, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, null, null, recoveryListener));
+            recoverReplica(replica, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, 0L, null, null, recoveryListener));
         }
 
         public void recoverReplica(IndexShard replica, BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier)

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -751,7 +751,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
         recoverReplica(
             replica,
             primary,
-            (r, sourceNode) -> new RecoveryTarget(r, sourceNode, null, null, recoveryListener),
+            (r, sourceNode) -> new RecoveryTarget(r, sourceNode, 0L, null, null, recoveryListener),
             true,
             startReplica
         );

+ 4 - 0
test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

@@ -2035,4 +2035,8 @@ public abstract class ESTestCase extends LuceneTestCase {
         return Locale.getDefault().getLanguage().equals(new Locale("tr").getLanguage())
             || Locale.getDefault().getLanguage().equals(new Locale("az").getLanguage());
     }
+
+    public static void fail(Throwable t, String msg, Object... args) {
+        throw new AssertionError(org.elasticsearch.common.Strings.format(msg, args), t);
+    }
 }

+ 1 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

@@ -433,7 +433,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
                     // operations between the local checkpoint and max_seq_no which the recovering replica is waiting for.
                     recoveryFuture = group.asyncRecoverReplica(
                         newReplica,
-                        (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, null, null, recoveryListener) {
+                        (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, 0L, null, null, recoveryListener) {
                         }
                     );
                 }