Browse Source

Sending operations concurrently in peer recovery (#58018)

Today, we send operations in phase2 of peer recoveries batch by batch
sequentially. Normally that's okay as we should have a fairly small number of
operations in phase2 due to the file-based threshold. However, if
phase1 takes a lot of time and we are actively indexing, then phase2 can
have a lot of operations to replay.

With this change, we will send multiple batches concurrently (defaults 
to 1) to reduce the recovery time.
Nhat Nguyen 5 years ago
parent
commit
961db311f0

+ 10 - 0
docs/reference/modules/indices/recovery.asciidoc

@@ -48,3 +48,13 @@ sent in parallel for each recovery. Defaults to `2`.
 +
 You can increase the value of this setting when the recovery of a single shard
 is not reaching the traffic limit set by `indices.recovery.max_bytes_per_sec`.
+
+`indices.recovery.max_concurrent_operations`::
+(<<cluster-update-settings,Dynamic>>, Expert) Number of operation batches sent
+in parallel for each recovery. Defaults to `1`.
++
+Concurrently replaying operations during recovery can be very resource-intensive
+and may interfere with indexing, search, and other activities in your cluster.
+Do not increase this setting without carefully verifying that your cluster has
+the resources available to handle the extra load that will result.
+

+ 1 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -222,6 +222,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
             RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING,
             RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING,
             RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
+            RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
             ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
             ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
             ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,

+ 41 - 40
server/src/main/java/org/elasticsearch/indices/recovery/MultiFileTransfer.java → server/src/main/java/org/elasticsearch/indices/recovery/MultiChunkTransfer.java

@@ -28,7 +28,6 @@ import org.elasticsearch.common.util.concurrent.AsyncIOProcessor;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
-import org.elasticsearch.index.store.StoreFileMetadata;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -57,64 +56,64 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
  * one of the networking threads which receive/handle the responses of the current pending file chunk requests. This process will continue
  * until all chunk requests are sent/responded.
  */
-public abstract class MultiFileTransfer<Request extends MultiFileTransfer.ChunkRequest> implements Closeable {
+public abstract class MultiChunkTransfer<Source, Request extends MultiChunkTransfer.ChunkRequest> implements Closeable {
     private Status status = Status.PROCESSING;
     private final Logger logger;
     private final ActionListener<Void> listener;
     private final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
-    private final AsyncIOProcessor<FileChunkResponseItem> processor;
-    private final int maxConcurrentFileChunks;
-    private StoreFileMetadata currentFile = null;
-    private final Iterator<StoreFileMetadata> remainingFiles;
-    private Tuple<StoreFileMetadata, Request> readAheadRequest = null;
-
-    protected MultiFileTransfer(Logger logger, ThreadContext threadContext, ActionListener<Void> listener,
-                                int maxConcurrentFileChunks, List<StoreFileMetadata> files) {
+    private final AsyncIOProcessor<FileChunkResponseItem<Source>> processor;
+    private final int maxConcurrentChunks;
+    private Source currentSource = null;
+    private final Iterator<Source> remainingSources;
+    private Tuple<Source, Request> readAheadRequest = null;
+
+    protected MultiChunkTransfer(Logger logger, ThreadContext threadContext, ActionListener<Void> listener,
+                                 int maxConcurrentChunks, List<Source> sources) {
         this.logger = logger;
-        this.maxConcurrentFileChunks = maxConcurrentFileChunks;
+        this.maxConcurrentChunks = maxConcurrentChunks;
         this.listener = listener;
-        this.processor = new AsyncIOProcessor<>(logger, maxConcurrentFileChunks, threadContext) {
+        this.processor = new AsyncIOProcessor<>(logger, maxConcurrentChunks, threadContext) {
             @Override
-            protected void write(List<Tuple<FileChunkResponseItem, Consumer<Exception>>> items) {
+            protected void write(List<Tuple<FileChunkResponseItem<Source>, Consumer<Exception>>> items) {
                 handleItems(items);
             }
         };
-        this.remainingFiles = files.iterator();
+        this.remainingSources = sources.iterator();
     }
 
     public final void start() {
         addItem(UNASSIGNED_SEQ_NO, null, null); // put a dummy item to start the processor
     }
 
-    private void addItem(long requestSeqId, StoreFileMetadata md, Exception failure) {
-        processor.put(new FileChunkResponseItem(requestSeqId, md, failure), e -> { assert e == null : e; });
+    private void addItem(long requestSeqId, Source resource, Exception failure) {
+        processor.put(new FileChunkResponseItem<>(requestSeqId, resource, failure), e -> { assert e == null : e; });
     }
 
-    private void handleItems(List<Tuple<FileChunkResponseItem, Consumer<Exception>>> items) {
+    private void handleItems(List<Tuple<FileChunkResponseItem<Source>, Consumer<Exception>>> items) {
         if (status != Status.PROCESSING) {
             assert status == Status.FAILED : "must not receive any response after the transfer was completed";
             // These exceptions will be ignored as we record only the first failure, log them for debugging purpose.
             items.stream().filter(item -> item.v1().failure != null).forEach(item ->
-                logger.debug(new ParameterizedMessage("failed to transfer a file chunk request {}", item.v1().md), item.v1().failure));
+                logger.debug(new ParameterizedMessage("failed to transfer a chunk request {}", item.v1().source), item.v1().failure));
             return;
         }
         try {
-            for (Tuple<FileChunkResponseItem, Consumer<Exception>> item : items) {
-                final FileChunkResponseItem resp = item.v1();
+            for (Tuple<FileChunkResponseItem<Source>, Consumer<Exception>> item : items) {
+                final FileChunkResponseItem<Source> resp = item.v1();
                 if (resp.requestSeqId == UNASSIGNED_SEQ_NO) {
                     continue; // not an actual item
                 }
                 requestSeqIdTracker.markSeqNoAsProcessed(resp.requestSeqId);
                 if (resp.failure != null) {
-                    handleError(resp.md, resp.failure);
+                    handleError(resp.source, resp.failure);
                     throw resp.failure;
                 }
             }
-            while (requestSeqIdTracker.getMaxSeqNo() - requestSeqIdTracker.getProcessedCheckpoint() < maxConcurrentFileChunks) {
-                final Tuple<StoreFileMetadata, Request> request = readAheadRequest != null ? readAheadRequest : getNextRequest();
+            while (requestSeqIdTracker.getMaxSeqNo() - requestSeqIdTracker.getProcessedCheckpoint() < maxConcurrentChunks) {
+                final Tuple<Source, Request> request = readAheadRequest != null ? readAheadRequest : getNextRequest();
                 readAheadRequest = null;
                 if (request == null) {
-                    assert currentFile == null && remainingFiles.hasNext() == false;
+                    assert currentSource == null && remainingSources.hasNext() == false;
                     if (requestSeqIdTracker.getMaxSeqNo() == requestSeqIdTracker.getProcessedCheckpoint()) {
                         onCompleted(null);
                     }
@@ -149,48 +148,50 @@ public abstract class MultiFileTransfer<Request extends MultiFileTransfer.ChunkR
         listener.onResponse(null);
     }
 
-    private Tuple<StoreFileMetadata, Request> getNextRequest() throws Exception {
+    private Tuple<Source, Request> getNextRequest() throws Exception {
         try {
-            if (currentFile == null) {
-                if (remainingFiles.hasNext()) {
-                    currentFile = remainingFiles.next();
-                    onNewFile(currentFile);
+            if (currentSource == null) {
+                if (remainingSources.hasNext()) {
+                    currentSource = remainingSources.next();
+                    onNewResource(currentSource);
                 } else {
                     return null;
                 }
             }
-            final StoreFileMetadata md = currentFile;
+            final Source md = currentSource;
             final Request request = nextChunkRequest(md);
             if (request.lastChunk()) {
-                currentFile = null;
+                currentSource = null;
             }
             return Tuple.tuple(md, request);
         } catch (Exception e) {
-            handleError(currentFile, e);
+            handleError(currentSource, e);
             throw e;
         }
     }
 
     /**
-     * This method is called when starting sending/requesting a new file. Subclasses should override
+     * This method is called when starting sending/requesting a new source. Subclasses should override
      * this method to reset the file offset or close the previous file and open a new file if needed.
      */
-    protected abstract void onNewFile(StoreFileMetadata md) throws IOException;
+    protected void onNewResource(Source resource) throws IOException {
 
-    protected abstract Request nextChunkRequest(StoreFileMetadata md) throws IOException;
+    }
+
+    protected abstract Request nextChunkRequest(Source resource) throws IOException;
 
     protected abstract void executeChunkRequest(Request request, ActionListener<Void> listener);
 
-    protected abstract void handleError(StoreFileMetadata md, Exception e) throws Exception;
+    protected abstract void handleError(Source resource, Exception e) throws Exception;
 
-    private static class FileChunkResponseItem {
+    private static class FileChunkResponseItem<Source> {
         final long requestSeqId;
-        final StoreFileMetadata md;
+        final Source source;
         final Exception failure;
 
-        FileChunkResponseItem(long requestSeqId, StoreFileMetadata md, Exception failure) {
+        FileChunkResponseItem(long requestSeqId, Source source, Exception failure) {
             this.requestSeqId = requestSeqId;
-            this.md = md;
+            this.source = source;
             this.failure = failure;
         }
     }

+ 3 - 1
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java

@@ -320,7 +320,9 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
                     new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService,
                         request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
                 handler = new RecoverySourceHandler(shard, recoveryTarget, shard.getThreadPool(), request,
-                    Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks());
+                    Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
+                    recoverySettings.getMaxConcurrentFileChunks(),
+                    recoverySettings.getMaxConcurrentOperations());
                 return Tuple.tuple(handler, recoveryTarget);
             }
         }

+ 18 - 0
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java

@@ -45,6 +45,12 @@ public class RecoverySettings {
     public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING =
         Setting.intSetting("indices.recovery.max_concurrent_file_chunks", 2, 1, 5, Property.Dynamic, Property.NodeScope);
 
+    /**
+     * Controls the maximum number of operation chunk requests that can be sent concurrently from the source node to the target node.
+     */
+    public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING =
+        Setting.intSetting("indices.recovery.max_concurrent_operations", 1, 1, 4, Property.Dynamic, Property.NodeScope);
+
     /**
      * how long to wait before retrying after issues caused by cluster state syncing between nodes
      * i.e., local node is not yet known on remote node, remote shard not yet started etc.
@@ -91,6 +97,7 @@ public class RecoverySettings {
 
     private volatile ByteSizeValue maxBytesPerSec;
     private volatile int maxConcurrentFileChunks;
+    private volatile int maxConcurrentOperations;
     private volatile SimpleRateLimiter rateLimiter;
     private volatile TimeValue retryDelayStateSync;
     private volatile TimeValue retryDelayNetwork;
@@ -104,6 +111,7 @@ public class RecoverySettings {
     public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
         this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings);
         this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings);
+        this.maxConcurrentOperations = INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING.get(settings);
         // doesn't have to be fast as nodes are reconnected every 10s by default (see InternalClusterService.ReconnectToNodes)
         // and we want to give the master time to remove a faulty node
         this.retryDelayNetwork = INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.get(settings);
@@ -125,6 +133,8 @@ public class RecoverySettings {
 
         clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
         clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
+        clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
+            this::setMaxConcurrentOperations);
         clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, this::setRetryDelayStateSync);
         clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, this::setRetryDelayNetwork);
         clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING, this::setInternalActionTimeout);
@@ -209,4 +219,12 @@ public class RecoverySettings {
     private void setMaxConcurrentFileChunks(int maxConcurrentFileChunks) {
         this.maxConcurrentFileChunks = maxConcurrentFileChunks;
     }
+
+    public int getMaxConcurrentOperations() {
+        return maxConcurrentOperations;
+    }
+
+    private void setMaxConcurrentOperations(int maxConcurrentOperations) {
+        this.maxConcurrentOperations = maxConcurrentOperations;
+    }
 }

+ 116 - 103
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -40,7 +40,6 @@ import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.StopWatch;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -87,6 +86,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.IntSupplier;
 import java.util.stream.StreamSupport;
@@ -113,13 +113,15 @@ public class RecoverySourceHandler {
     private final int chunkSizeInBytes;
     private final RecoveryTargetHandler recoveryTarget;
     private final int maxConcurrentFileChunks;
+    private final int maxConcurrentOperations;
     private final ThreadPool threadPool;
     private final CancellableThreads cancellableThreads = new CancellableThreads();
     private final List<Closeable> resources = new CopyOnWriteArrayList<>();
     private final ListenableFuture<RecoveryResponse> future = new ListenableFuture<>();
 
     public RecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, ThreadPool threadPool,
-                                 StartRecoveryRequest request, int fileChunkSizeInBytes, int maxConcurrentFileChunks) {
+                                 StartRecoveryRequest request, int fileChunkSizeInBytes, int maxConcurrentFileChunks,
+                                 int maxConcurrentOperations) {
         this.shard = shard;
         this.recoveryTarget = recoveryTarget;
         this.threadPool = threadPool;
@@ -128,6 +130,7 @@ public class RecoverySourceHandler {
         this.logger = Loggers.getLogger(getClass(), request.shardId(), "recover to " + request.targetNode().getName());
         this.chunkSizeInBytes = fileChunkSizeInBytes;
         this.maxConcurrentFileChunks = maxConcurrentFileChunks;
+        this.maxConcurrentOperations = maxConcurrentOperations;
     }
 
     public StartRecoveryRequest getRequest() {
@@ -309,12 +312,6 @@ public class RecoverySourceHandler {
                 final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetadata().getMappingVersion();
                 phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
                     retentionLeases, mappingVersionOnPrimary, sendSnapshotStep);
-                sendSnapshotStep.whenComplete(
-                    r -> IOUtils.close(phase2Snapshot),
-                    e -> {
-                        IOUtils.closeWhileHandlingException(phase2Snapshot);
-                        onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e));
-                    });
 
             }, onFailure);
 
@@ -329,7 +326,7 @@ public class RecoverySourceHandler {
                 final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
                     sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
                     sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
-                    prepareEngineStep.result().millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
+                    prepareEngineStep.result().millis(), sendSnapshotResult.sentOperations, sendSnapshotResult.tookTime.millis());
                 try {
                     future.onResponse(response);
                 } finally {
@@ -661,106 +658,122 @@ public class RecoverySourceHandler {
             throw new IndexShardClosedException(request.shardId());
         }
         logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]");
-
-        final AtomicInteger skippedOps = new AtomicInteger();
-        final AtomicInteger totalSentOps = new AtomicInteger();
-        final AtomicInteger lastBatchCount = new AtomicInteger(); // used to estimate the count of the subsequent batch.
-        final CheckedSupplier<List<Translog.Operation>, IOException> readNextBatch = () -> {
-            // We need to synchronized Snapshot#next() because it's called by different threads through sendBatch.
-            // Even though those calls are not concurrent, Snapshot#next() uses non-synchronized state and is not multi-thread-compatible.
-            synchronized (snapshot) {
-                final List<Translog.Operation> ops = lastBatchCount.get() > 0 ? new ArrayList<>(lastBatchCount.get()) : new ArrayList<>();
-                long batchSizeInBytes = 0L;
-                Translog.Operation operation;
-                while ((operation = snapshot.next()) != null) {
-                    if (shard.state() == IndexShardState.CLOSED) {
-                        throw new IndexShardClosedException(request.shardId());
-                    }
-                    cancellableThreads.checkForCancel();
-                    final long seqNo = operation.seqNo();
-                    if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
-                        skippedOps.incrementAndGet();
-                        continue;
-                    }
-                    ops.add(operation);
-                    batchSizeInBytes += operation.estimateSize();
-                    totalSentOps.incrementAndGet();
-
-                    // check if this request is past bytes threshold, and if so, send it off
-                    if (batchSizeInBytes >= chunkSizeInBytes) {
-                        break;
-                    }
-                }
-                lastBatchCount.set(ops.size());
-                return ops;
-            }
-        };
-
         final StopWatch stopWatch = new StopWatch().start();
-        final ActionListener<Long> batchedListener = ActionListener.map(listener,
-            targetLocalCheckpoint -> {
-                assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get()
+        final StepListener<Void> sendListener = new StepListener<>();
+        final OperationBatchSender sender = new OperationBatchSender(startingSeqNo, endingSeqNo, snapshot, maxSeenAutoIdTimestamp,
+            maxSeqNoOfUpdatesOrDeletes, retentionLeases, mappingVersion, sendListener);
+        sendListener.whenComplete(
+            ignored -> {
+                final long skippedOps = sender.skippedOps.get();
+                final int totalSentOps = sender.sentOps.get();
+                final long targetLocalCheckpoint = sender.targetLocalCheckpoint.get();
+                assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps + totalSentOps
                     : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
-                    snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get());
+                    snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps, totalSentOps);
                 stopWatch.stop();
                 final TimeValue tookTime = stopWatch.totalTime();
                 logger.trace("recovery [phase2]: took [{}]", tookTime);
-                return new SendSnapshotResult(targetLocalCheckpoint, totalSentOps.get(), tookTime);
-            }
-        );
+                listener.onResponse(new SendSnapshotResult(targetLocalCheckpoint, totalSentOps, tookTime));
+            }, listener::onFailure);
+        sender.start();
+    }
 
-        sendBatch(
-                readNextBatch,
-                true,
-                SequenceNumbers.UNASSIGNED_SEQ_NO,
-                snapshot.totalOperations(),
-                maxSeenAutoIdTimestamp,
-                maxSeqNoOfUpdatesOrDeletes,
-                retentionLeases,
-                mappingVersion,
-                batchedListener);
+    private static class OperationChunkRequest implements MultiChunkTransfer.ChunkRequest {
+        final List<Translog.Operation> operations;
+        final boolean lastChunk;
+
+        OperationChunkRequest(List<Translog.Operation> operations, boolean lastChunk) {
+            this.operations = operations;
+            this.lastChunk = lastChunk;
+        }
+
+        @Override
+        public boolean lastChunk() {
+            return lastChunk;
+        }
     }
 
-    private void sendBatch(
-            final CheckedSupplier<List<Translog.Operation>, IOException> nextBatch,
-            final boolean firstBatch,
-            final long targetLocalCheckpoint,
-            final int totalTranslogOps,
-            final long maxSeenAutoIdTimestamp,
-            final long maxSeqNoOfUpdatesOrDeletes,
-            final RetentionLeases retentionLeases,
-            final long mappingVersionOnPrimary,
-            final ActionListener<Long> listener) throws IOException {
-        assert ThreadPool.assertCurrentMethodIsNotCalledRecursively();
-        assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[send translog]");
-        final List<Translog.Operation> operations = nextBatch.get();
-        // send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
-        if (operations.isEmpty() == false || firstBatch) {
+    private class OperationBatchSender extends MultiChunkTransfer<Translog.Snapshot, OperationChunkRequest> {
+        private final long startingSeqNo;
+        private final long endingSeqNo;
+        private final Translog.Snapshot snapshot;
+        private final long maxSeenAutoIdTimestamp;
+        private final long maxSeqNoOfUpdatesOrDeletes;
+        private final RetentionLeases retentionLeases;
+        private final long mappingVersion;
+        private int lastBatchCount = 0; // used to estimate the count of the subsequent batch.
+        private final AtomicInteger skippedOps = new AtomicInteger();
+        private final AtomicInteger sentOps = new AtomicInteger();
+        private final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+
+        OperationBatchSender(long startingSeqNo, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
+                             long maxSeqNoOfUpdatesOrDeletes, RetentionLeases retentionLeases, long mappingVersion,
+                             ActionListener<Void> listener) {
+            super(logger, threadPool.getThreadContext(), listener, maxConcurrentOperations, List.of(snapshot));
+            this.startingSeqNo = startingSeqNo;
+            this.endingSeqNo = endingSeqNo;
+            this.snapshot = snapshot;
+            this.maxSeenAutoIdTimestamp = maxSeenAutoIdTimestamp;
+            this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
+            this.retentionLeases = retentionLeases;
+            this.mappingVersion = mappingVersion;
+        }
+
+        @Override
+        protected synchronized OperationChunkRequest nextChunkRequest(Translog.Snapshot snapshot) throws IOException {
+            // We need to synchronize Snapshot#next() because it's called by different threads through sendBatch.
+            // Even though those calls are not concurrent, Snapshot#next() uses non-synchronized state and is not multi-thread-compatible.
+            assert Transports.assertNotTransportThread("[phase2]");
+            cancellableThreads.checkForCancel();
+            final List<Translog.Operation> ops = lastBatchCount > 0 ? new ArrayList<>(lastBatchCount) : new ArrayList<>();
+            long batchSizeInBytes = 0L;
+            Translog.Operation operation;
+            while ((operation = snapshot.next()) != null) {
+                if (shard.state() == IndexShardState.CLOSED) {
+                    throw new IndexShardClosedException(request.shardId());
+                }
+                final long seqNo = operation.seqNo();
+                if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
+                    skippedOps.incrementAndGet();
+                    continue;
+                }
+                ops.add(operation);
+                batchSizeInBytes += operation.estimateSize();
+                sentOps.incrementAndGet();
+
+                // check if this request is past bytes threshold, and if so, send it off
+                if (batchSizeInBytes >= chunkSizeInBytes) {
+                    break;
+                }
+            }
+            lastBatchCount = ops.size();
+            return new OperationChunkRequest(ops, operation == null);
+        }
+
+        @Override
+        protected void executeChunkRequest(OperationChunkRequest request, ActionListener<Void> listener) {
             cancellableThreads.checkForCancel();
             recoveryTarget.indexTranslogOperations(
-                operations,
-                totalTranslogOps,
+                request.operations,
+                snapshot.totalOperations(),
                 maxSeenAutoIdTimestamp,
                 maxSeqNoOfUpdatesOrDeletes,
                 retentionLeases,
-                mappingVersionOnPrimary,
-                ActionListener.wrap(
-                    newCheckpoint -> {
-                        sendBatch(
-                            nextBatch,
-                            false,
-                            SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
-                            totalTranslogOps,
-                            maxSeenAutoIdTimestamp,
-                            maxSeqNoOfUpdatesOrDeletes,
-                            retentionLeases,
-                            mappingVersionOnPrimary,
-                            listener);
-                    },
-                    listener::onFailure
-                ));
-        } else {
-            listener.onResponse(targetLocalCheckpoint);
+                mappingVersion,
+                ActionListener.delegateFailure(listener, (l, newCheckpoint) -> {
+                    targetLocalCheckpoint.updateAndGet(curr -> SequenceNumbers.max(curr, newCheckpoint));
+                    l.onResponse(null);
+                }));
+        }
+
+        @Override
+        protected void handleError(Translog.Snapshot snapshot, Exception e) {
+            throw new RecoveryEngineException(shard.shardId(), 2, "failed to send/replay operations", e);
+        }
+
+        @Override
+        public void close() throws IOException {
+            snapshot.close();
         }
     }
 
@@ -805,12 +818,12 @@ public class RecoverySourceHandler {
 
     static final class SendSnapshotResult {
         final long targetLocalCheckpoint;
-        final int totalOperations;
+        final int sentOperations;
         final TimeValue tookTime;
 
-        SendSnapshotResult(final long targetLocalCheckpoint, final int totalOperations, final TimeValue tookTime) {
+        SendSnapshotResult(final long targetLocalCheckpoint, final int sentOperations, final TimeValue tookTime) {
             this.targetLocalCheckpoint = targetLocalCheckpoint;
-            this.totalOperations = totalOperations;
+            this.sentOperations = sentOperations;
             this.tookTime = tookTime;
         }
     }
@@ -832,7 +845,7 @@ public class RecoverySourceHandler {
                 '}';
     }
 
-    private static class FileChunk implements MultiFileTransfer.ChunkRequest, Releasable {
+    private static class FileChunk implements MultiChunkTransfer.ChunkRequest, Releasable {
         final StoreFileMetadata md;
         final BytesReference content;
         final long position;
@@ -861,15 +874,15 @@ public class RecoverySourceHandler {
     void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener<Void> listener) {
         ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetadata::length)); // send smallest first
 
-        final MultiFileTransfer<FileChunk> multiFileSender =
-            new MultiFileTransfer<>(logger, threadPool.getThreadContext(), listener, maxConcurrentFileChunks, Arrays.asList(files)) {
+        final MultiChunkTransfer<StoreFileMetadata, FileChunk> multiFileSender =
+            new MultiChunkTransfer<>(logger, threadPool.getThreadContext(), listener, maxConcurrentFileChunks, Arrays.asList(files)) {
 
                 final Deque<byte[]> buffers = new ConcurrentLinkedDeque<>();
                 InputStreamIndexInput currentInput = null;
                 long offset = 0;
 
                 @Override
-                protected void onNewFile(StoreFileMetadata md) throws IOException {
+                protected void onNewResource(StoreFileMetadata md) throws IOException {
                     offset = 0;
                     IOUtils.close(currentInput, () -> currentInput = null);
                     final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);

+ 100 - 15
server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

@@ -42,6 +42,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Numbers;
+import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -53,10 +54,12 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.CancellableThreads;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.engine.RecoveryEngineException;
 import org.elasticsearch.index.engine.SegmentsStats;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.ParseContext;
@@ -88,14 +91,18 @@ import org.junit.Before;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
@@ -181,7 +188,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
             }
         };
         RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(target, recoveryExecutor),
-            threadPool, request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 5));
+            threadPool, request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 5), between(1, 5));
         PlainActionFuture<Void> sendFilesFuture = new PlainActionFuture<>();
         handler.sendFiles(store, metas.toArray(new StoreFileMetadata[0]), () -> 0, sendFilesFuture);
         sendFilesFuture.actionGet();
@@ -242,13 +249,13 @@ public class RecoverySourceHandlerTests extends ESTestCase {
             }
         };
         RecoverySourceHandler handler = new RecoverySourceHandler(shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
-            threadPool, request, fileChunkSizeInBytes, between(1, 10));
+            threadPool, request, fileChunkSizeInBytes, between(1, 10), between(1, 10));
         PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
         handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()),
             randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, randomNonNegativeLong(), future);
         final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1);
         RecoverySourceHandler.SendSnapshotResult result = future.actionGet();
-        assertThat(result.totalOperations, equalTo(expectedOps));
+        assertThat(result.sentOperations, equalTo(expectedOps));
         shippedOps.sort(Comparator.comparing(Translog.Operation::seqNo));
         assertThat(shippedOps.size(), equalTo(expectedOps));
         for (int i = 0; i < shippedOps.size(); i++) {
@@ -282,17 +289,76 @@ public class RecoverySourceHandlerTests extends ESTestCase {
             }
         };
         RecoverySourceHandler handler = new RecoverySourceHandler(shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
-            threadPool, request, fileChunkSizeInBytes, between(1, 10));
+            threadPool, request, fileChunkSizeInBytes, between(1, 10), between(1, 10));
         PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
         final long startingSeqNo = randomLongBetween(0, ops.size() - 1L);
         final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L);
         handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()),
             randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, randomNonNegativeLong(), future);
         if (wasFailed.get()) {
-            assertThat(expectThrows(RuntimeException.class, () -> future.actionGet()).getMessage(), equalTo("test - failed to index"));
+            final RecoveryEngineException error = expectThrows(RecoveryEngineException.class, future::actionGet);
+            assertThat(error.getMessage(), equalTo("Phase[2] failed to send/replay operations"));
+            assertThat(error.getCause().getMessage(), equalTo("test - failed to index"));
         }
     }
 
+    public void testSendOperationsConcurrently() throws Throwable {
+        final IndexShard shard = mock(IndexShard.class);
+        when(shard.state()).thenReturn(IndexShardState.STARTED);
+        Set<Long> receivedSeqNos = ConcurrentCollections.newConcurrentSet();
+        long maxSeenAutoIdTimestamp = randomBoolean() ? -1 : randomNonNegativeLong();
+        long maxSeqNoOfUpdatesOrDeletes = randomBoolean() ? -1 : randomNonNegativeLong();
+        RetentionLeases retentionLeases = new RetentionLeases(randomNonNegativeLong(), randomNonNegativeLong(), List.of());
+        long mappingVersion = randomNonNegativeLong();
+        AtomicLong localCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        int numOps = randomIntBetween(0, 1000);
+        AtomicBoolean received = new AtomicBoolean();
+        RecoveryTargetHandler target = new TestRecoveryTargetHandler() {
+            @Override
+            public void indexTranslogOperations(List<Translog.Operation> operations, int receivedTotalOps,
+                                                long receivedMaxSeenAutoIdTimestamp, long receivedMaxSeqNoOfUpdatesOrDeletes,
+                                                RetentionLeases receivedRetentionLease, long receivedMappingVersion,
+                                                ActionListener<Long> listener) {
+                received.set(true);
+                assertThat(receivedMaxSeenAutoIdTimestamp, equalTo(maxSeenAutoIdTimestamp));
+                assertThat(receivedMaxSeqNoOfUpdatesOrDeletes, equalTo(maxSeqNoOfUpdatesOrDeletes));
+                assertThat(receivedRetentionLease, equalTo(retentionLeases));
+                assertThat(receivedMappingVersion, equalTo(mappingVersion));
+                assertThat(receivedTotalOps, equalTo(numOps));
+                for (Translog.Operation operation : operations) {
+                    receivedSeqNos.add(operation.seqNo());
+                }
+                if (randomBoolean()) {
+                    localCheckpoint.addAndGet(randomIntBetween(1, 100));
+                }
+                listener.onResponse(localCheckpoint.get());
+            }
+        };
+
+        PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> sendFuture = new PlainActionFuture<>();
+        long startingSeqNo = randomIntBetween(0, 1000);
+        long endingSeqNo = startingSeqNo + randomIntBetween(0, 10000);
+        List<Translog.Operation> operations = generateOperations(numOps);
+        Randomness.shuffle(operations);
+        List<Translog.Operation> skipOperations = randomSubsetOf(operations);
+        Translog.Snapshot snapshot = newTranslogSnapshot(operations, skipOperations);
+        RecoverySourceHandler handler = new RecoverySourceHandler(shard, new AsyncRecoveryTarget(target, recoveryExecutor),
+            threadPool, getStartRecoveryRequest(), between(1, 10 * 1024), between(1, 5), between(1, 5));
+        handler.phase2(startingSeqNo, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases,
+            mappingVersion, sendFuture);
+        RecoverySourceHandler.SendSnapshotResult sendSnapshotResult = sendFuture.actionGet();
+        assertTrue(received.get());
+        assertThat(sendSnapshotResult.targetLocalCheckpoint, equalTo(localCheckpoint.get()));
+        assertThat(sendSnapshotResult.sentOperations, equalTo(receivedSeqNos.size()));
+        Set<Long> sentSeqNos = new HashSet<>();
+        for (Translog.Operation op : operations) {
+            if (startingSeqNo <= op.seqNo() && op.seqNo() <= endingSeqNo && skipOperations.contains(op) == false) {
+                sentSeqNos.add(op.seqNo());
+            }
+        }
+        assertThat(receivedSeqNos, equalTo(sentSeqNos));
+    }
+
     private Engine.Index getIndex(final String id) {
         final ParseContext.Document document = new ParseContext.Document();
         document.add(new TextField("test", "test", Field.Store.YES));
@@ -352,7 +418,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
             }
         };
         RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool,
-            request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 8)) {
+            request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 8), between(1, 8)) {
             @Override
             protected void failEngine(IOException cause) {
                 assertFalse(failedEngine.get());
@@ -409,7 +475,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
             }
         };
         RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool,
-            request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 10)) {
+            request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 10), between(1, 4)) {
             @Override
             protected void failEngine(IOException cause) {
                 assertFalse(failedEngine.get());
@@ -463,7 +529,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
                 threadPool,
                 request,
                 Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
-                between(1, 8)) {
+                between(1, 8), between(1, 8)) {
 
             @Override
             void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
@@ -540,7 +606,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
         final int maxConcurrentChunks = between(1, 8);
         final int chunkSize = between(1, 32);
         final RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, threadPool, getStartRecoveryRequest(),
-            chunkSize, maxConcurrentChunks);
+            chunkSize, maxConcurrentChunks, between(1, 10));
         Store store = newStore(createTempDir(), false);
         List<StoreFileMetadata> files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20));
         int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum();
@@ -598,7 +664,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
         final int maxConcurrentChunks = between(1, 4);
         final int chunkSize = between(1, 16);
         final RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(recoveryTarget, recoveryExecutor),
-            threadPool, getStartRecoveryRequest(), chunkSize, maxConcurrentChunks);
+            threadPool, getStartRecoveryRequest(), chunkSize, maxConcurrentChunks, between(1, 5));
         Store store = newStore(createTempDir(), false);
         List<StoreFileMetadata> files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20));
         int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum();
@@ -679,7 +745,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
         };
         final StartRecoveryRequest startRecoveryRequest = getStartRecoveryRequest();
         final RecoverySourceHandler handler = new RecoverySourceHandler(
-            shard, recoveryTarget, threadPool, startRecoveryRequest, between(1, 16), between(1, 4)) {
+            shard, recoveryTarget, threadPool, startRecoveryRequest, between(1, 16), between(1, 4), between(1, 4)) {
             @Override
             void createRetentionLease(long startingSeqNo, ActionListener<RetentionLease> listener) {
                 final String leaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(startRecoveryRequest.targetNode().getId());
@@ -708,7 +774,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
         IndexShard shard = mock(IndexShard.class);
         when(shard.state()).thenReturn(IndexShardState.STARTED);
         RecoverySourceHandler handler = new RecoverySourceHandler(
-            shard, new TestRecoveryTargetHandler(), threadPool, getStartRecoveryRequest(), between(1, 16), between(1, 4));
+            shard, new TestRecoveryTargetHandler(), threadPool, getStartRecoveryRequest(), between(1, 16), between(1, 4), between(1, 4));
 
         String syncId = UUIDs.randomBase64UUID();
         int numDocs = between(0, 1000);
@@ -823,8 +889,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
     }
 
     private Translog.Snapshot newTranslogSnapshot(List<Translog.Operation> operations, List<Translog.Operation> operationsToSkip) {
+        Iterator<Translog.Operation> iterator = operations.iterator();
         return new Translog.Snapshot() {
-            int index = 0;
             int skippedCount = 0;
 
             @Override
@@ -839,8 +905,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
 
             @Override
             public Translog.Operation next() {
-                while (index < operations.size()) {
-                    Translog.Operation op = operations.get(index++);
+                while (iterator.hasNext()) {
+                    Translog.Operation op = iterator.next();
                     if (operationsToSkip.contains(op)) {
                         skippedCount++;
                     } else {
@@ -856,4 +922,23 @@ public class RecoverySourceHandlerTests extends ESTestCase {
             }
         };
     }
+
+    private static List<Translog.Operation> generateOperations(int numOps) {
+        final List<Translog.Operation> operations = new ArrayList<>(numOps);
+        final byte[] source = "{}".getBytes(StandardCharsets.UTF_8);
+        final Set<Long> seqNos = new HashSet<>();
+        for (int i = 0; i < numOps; i++) {
+            final long seqNo = randomValueOtherThanMany(n -> seqNos.add(n) == false, ESTestCase::randomNonNegativeLong);
+            final Translog.Operation op;
+            if (randomBoolean()) {
+                op = new Translog.Index("id", seqNo, randomNonNegativeLong(), randomNonNegativeLong(), source, null, -1);
+            } else if (randomBoolean()) {
+                op = new Translog.Delete("id", seqNo, randomNonNegativeLong(), randomNonNegativeLong());
+            } else {
+                op = new Translog.NoOp(seqNo, randomNonNegativeLong(), "test");
+            }
+            operations.add(op);
+        }
+        return operations;
+    }
 }

+ 4 - 2
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -42,7 +42,6 @@ import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.core.internal.io.IOUtils;
@@ -73,6 +72,7 @@ import org.elasticsearch.indices.recovery.AsyncRecoveryTarget;
 import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
 import org.elasticsearch.indices.recovery.RecoveryFailedException;
 import org.elasticsearch.indices.recovery.RecoveryResponse;
+import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.indices.recovery.RecoverySourceHandler;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.indices.recovery.RecoveryTarget;
@@ -633,9 +633,11 @@ public abstract class IndexShardTestCase extends ESTestCase {
         final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint();
         final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(
             logger, rNode, recoveryTarget, startingSeqNo);
+        int fileChunkSizeInBytes = Math.toIntExact(
+            randomBoolean() ? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes() : randomIntBetween(1, 10 * 1024 * 1024));
         final RecoverySourceHandler recovery = new RecoverySourceHandler(primary,
             new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), threadPool,
-            request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8));
+            request, fileChunkSizeInBytes, between(1, 8), between(1, 8));
         primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null,
             currentClusterStateVersion.incrementAndGet(), inSyncIds, routingTable);
         try {

+ 4 - 1
test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

@@ -117,6 +117,7 @@ import org.elasticsearch.indices.IndicesQueryCache;
 import org.elasticsearch.indices.IndicesRequestCache;
 import org.elasticsearch.indices.store.IndicesStore;
 import org.elasticsearch.node.NodeMocksPlugin;
+import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
 import org.elasticsearch.plugins.NetworkPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.rest.RestStatus;
@@ -1901,7 +1902,9 @@ public abstract class ESIntegTestCase extends ESTestCase {
                 mocks.add(MockFieldFilterPlugin.class);
             }
         }
-
+        if (randomBoolean()) {
+            mocks.add(RecoverySettingsChunkSizePlugin.class);
+        }
         if (addMockTransportService()) {
             mocks.add(getTestTransportPlugin());
         }

+ 8 - 0
test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

@@ -99,6 +99,7 @@ import org.elasticsearch.node.MockNode;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.node.NodeService;
 import org.elasticsearch.node.NodeValidationException;
+import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.script.ScriptModule;
 import org.elasticsearch.script.ScriptService;
@@ -150,6 +151,7 @@ import static org.elasticsearch.discovery.DiscoveryModule.ZEN2_DISCOVERY_TYPE;
 import static org.elasticsearch.discovery.FileBasedSeedHostsProvider.UNICAST_HOSTS_FILE;
 import static org.elasticsearch.node.Node.INITIAL_STATE_TIMEOUT_SETTING;
 import static org.elasticsearch.test.ESTestCase.assertBusy;
+import static org.elasticsearch.test.ESTestCase.randomBoolean;
 import static org.elasticsearch.test.ESTestCase.randomFrom;
 import static org.elasticsearch.test.NodeRoles.dataOnlyNode;
 import static org.elasticsearch.test.NodeRoles.masterOnlyNode;
@@ -391,6 +393,12 @@ public final class InternalTestCluster extends TestCluster {
                 RandomNumbers.randomIntBetween(random, 20, 50)));
         builder.put(RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.getKey(),
             RandomNumbers.randomIntBetween(random, 1, 5));
+        builder.put(RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING.getKey(),
+            RandomNumbers.randomIntBetween(random, 1, 4));
+        if (mockPlugins.contains(RecoverySettingsChunkSizePlugin.class) && randomBoolean()) {
+            builder.put(RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING.getKey(),
+                new ByteSizeValue(RandomNumbers.randomIntBetween(random, 256, 10 * 1024 * 1024)));
+        }
         defaultSettings = builder.build();
         executor = EsExecutors.newScaling("internal_test_cluster_executor", 0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS,
                 EsExecutors.daemonThreadFactory("test_" + clusterName), new ThreadContext(Settings.EMPTY));

+ 4 - 4
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -55,7 +55,7 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.F
 import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetadata;
-import org.elasticsearch.indices.recovery.MultiFileTransfer;
+import org.elasticsearch.indices.recovery.MultiChunkTransfer;
 import org.elasticsearch.indices.recovery.MultiFileWriter;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.repositories.IndexId;
@@ -508,7 +508,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
         protected void restoreFiles(List<FileInfo> filesToRecover, Store store, ActionListener<Void> allFilesListener) {
             logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
             final List<StoreFileMetadata> mds = filesToRecover.stream().map(FileInfo::metadata).collect(Collectors.toList());
-            final MultiFileTransfer<FileChunk> multiFileTransfer = new MultiFileTransfer<>(
+            final MultiChunkTransfer<StoreFileMetadata, FileChunk> multiFileTransfer = new MultiChunkTransfer<>(
                 logger, threadPool.getThreadContext(), allFilesListener, ccrSettings.getMaxConcurrentFileChunks(), mds) {
 
                 final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {
@@ -516,7 +516,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
                 long offset = 0;
 
                 @Override
-                protected void onNewFile(StoreFileMetadata md) {
+                protected void onNewResource(StoreFileMetadata md) {
                     offset = 0;
                 }
 
@@ -587,7 +587,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
                 remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
         }
 
-        private static class FileChunk implements MultiFileTransfer.ChunkRequest {
+        private static class FileChunk implements MultiChunkTransfer.ChunkRequest {
             final StoreFileMetadata md;
             final int bytesRequested;
             final boolean lastChunk;