Преглед изворни кода

Reduce recovery time with compress or secure transport (#36981)

Today file-chunks are sent sequentially one by one in peer-recovery. This is a
correct choice since the implementation is straightforward and recovery is
network bound in most of the time. However, if the connection is encrypted, we
might not be able to saturate the network pipe because encrypting/decrypting
are cpu bound rather than network-bound.

With this commit, a source node can send multiple (default to 2) file-chunks
without waiting for the acknowledgments from the target.

Below are the benchmark results for PMC and NYC_taxis.

- PMC (20.2 GB)

| Transport | Baseline | chunks=1 | chunks=2 | chunks=3 | chunks=4 |
| ----------| ---------| -------- | -------- | -------- | -------- |
| Plain     | 184s     | 137s     | 106s     | 105s     | 106s     |
| TLS       | 346s     | 294s     | 176s     | 153s     | 117s     |
| Compress  | 1556s    | 1407s    | 1193s    | 1183s    | 1211s    |

- NYC_Taxis (38.6GB)

| Transport | Baseline | chunks=1 | chunks=2 | chunks=3 | chunks=4 |
| ----------| ---------| ---------| ---------| ---------| -------- |
| Plain     | 321s     | 249s     | 191s     |  *       | *        |
| TLS       | 618s     | 539s     | 323s     | 290s     | 213s     |
| Compress  | 2622s    | 2421s    | 2018s    | 2029s    | n/a      |

Relates #33844
Nhat Nguyen пре 6 година
родитељ
комит
15aa3764a4

+ 9 - 0
docs/reference/modules/indices/recovery.asciidoc

@@ -20,5 +20,14 @@ peer recoveries:
     consume an excess of bandwidth (or other resources) which could destabilize
     the cluster. Defaults to `40mb`.
 
+`indices.recovery.max_concurrent_file_chunks`::
+    Controls the number of file chunk requests that can be sent in parallel per recovery.
+    As multiple recoveries are already running in parallel (controlled by
+    cluster.routing.allocation.node_concurrent_recoveries), increasing this expert-level
+    setting might only help in situations where peer recovery of a single shard is not
+    reaching the total inbound and outbound peer recovery traffic as configured by
+    indices.recovery.max_bytes_per_sec, but is CPU-bound instead, typically when using
+    transport-level security or compression. Defaults to `2`.
+
 This setting can be dynamically updated on a live cluster with the
 <<cluster-update-settings,cluster-update-settings>> API.

+ 1 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -214,6 +214,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
                     RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
                     RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING,
                     RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING,
+                    RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
                     ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
                     ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
                     ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,

+ 2 - 1
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java

@@ -176,7 +176,8 @@ public class PeerRecoverySourceService implements IndexEventListener {
                 final RemoteRecoveryTargetHandler recoveryTarget =
                     new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService,
                         request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
-                handler = new RecoverySourceHandler(shard, recoveryTarget, request, recoverySettings.getChunkSize().bytesAsInt());
+                handler = new RecoverySourceHandler(shard, recoveryTarget, request,
+                    Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks());
                 return handler;
             }
         }

+ 8 - 7
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

@@ -29,6 +29,8 @@ import org.apache.lucene.store.RateLimiter;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateObserver;
@@ -602,8 +604,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
 
         @Override
         public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception {
-            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
-            )) {
+            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
                 final RecoveryTarget recoveryTarget = recoveryRef.target();
                 final RecoveryState.Index indexState = recoveryTarget.state().getIndex();
                 if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) {
@@ -621,12 +622,12 @@ public class PeerRecoveryTargetService implements IndexEventListener {
                         recoveryTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos);
                     }
                 }
-
-                recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(),
-                        request.lastChunk(), request.totalTranslogOps()
-                );
+                final ActionListener<TransportResponse> listener =
+                    new HandledTransportAction.ChannelActionListener<>(channel, Actions.FILE_CHUNK, request);
+                recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(), request.lastChunk(),
+                    request.totalTranslogOps(),
+                    ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure));
             }
-            channel.sendResponse(TransportResponse.Empty.INSTANCE);
         }
     }
 

+ 17 - 0
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java

@@ -39,6 +39,12 @@ public class RecoverySettings {
         Setting.byteSizeSetting("indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB),
             Property.Dynamic, Property.NodeScope);
 
+    /**
+     * Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node.
+     */
+    public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING =
+        Setting.intSetting("indices.recovery.max_concurrent_file_chunks", 2, 1, 5, Property.Dynamic, Property.NodeScope);
+
     /**
      * how long to wait before retrying after issues cause by cluster state syncing between nodes
      * i.e., local node is not yet known on remote node, remote shard not yet started etc.
@@ -78,6 +84,7 @@ public class RecoverySettings {
     public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);
 
     private volatile ByteSizeValue maxBytesPerSec;
+    private volatile int maxConcurrentFileChunks;
     private volatile SimpleRateLimiter rateLimiter;
     private volatile TimeValue retryDelayStateSync;
     private volatile TimeValue retryDelayNetwork;
@@ -89,6 +96,7 @@ public class RecoverySettings {
 
     public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
         this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings);
+        this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings);
         // doesn't have to be fast as nodes are reconnected every 10s by default (see InternalClusterService.ReconnectToNodes)
         // and we want to give the master time to remove a faulty node
         this.retryDelayNetwork = INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.get(settings);
@@ -108,6 +116,7 @@ public class RecoverySettings {
         logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);
 
         clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
+        clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
         clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, this::setRetryDelayStateSync);
         clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, this::setRetryDelayNetwork);
         clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING, this::setInternalActionTimeout);
@@ -180,4 +189,12 @@ public class RecoverySettings {
             rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
         }
     }
+
+    public int getMaxConcurrentFileChunks() {
+        return maxConcurrentFileChunks;
+    }
+
+    private void setMaxConcurrentFileChunks(int maxConcurrentFileChunks) {
+        this.maxConcurrentFileChunks = maxConcurrentFileChunks;
+    }
 }

+ 71 - 72
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.StopWatch;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
@@ -44,7 +45,6 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.CancellableThreads;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.core.internal.io.IOUtils;
-import org.elasticsearch.core.internal.io.Streams;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.RecoveryEngineException;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@@ -59,10 +59,9 @@ import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.RemoteTransportException;
 
-import java.io.BufferedOutputStream;
 import java.io.Closeable;
 import java.io.IOException;
-import java.io.OutputStream;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -71,10 +70,12 @@ import java.util.Locale;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import java.util.stream.StreamSupport;
 
+import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
+
 /**
  * RecoverySourceHandler handles the three phases of shard recovery, which is
  * everything relating to copying the segment files as well as sending translog
@@ -96,17 +97,19 @@ public class RecoverySourceHandler {
     private final StartRecoveryRequest request;
     private final int chunkSizeInBytes;
     private final RecoveryTargetHandler recoveryTarget;
+    private final int maxConcurrentFileChunks;
     private final CancellableThreads cancellableThreads = new CancellableThreads();
 
-    public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget,
-                                 final StartRecoveryRequest request,
-                                 final int fileChunkSizeInBytes) {
+    public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget, final StartRecoveryRequest request,
+                                 final int fileChunkSizeInBytes, final int maxConcurrentFileChunks) {
         this.shard = shard;
         this.recoveryTarget = recoveryTarget;
         this.request = request;
         this.shardId = this.request.shardId().id();
         this.logger = Loggers.getLogger(getClass(), request.shardId(), "recover to " + request.targetNode().getName());
         this.chunkSizeInBytes = fileChunkSizeInBytes;
+        // if the target is on an old version, it won't be able to handle out-of-order file chunks.
+        this.maxConcurrentFileChunks = request.targetNode().getVersion().onOrAfter(Version.V_7_0_0) ? maxConcurrentFileChunks : 1;
     }
 
     public StartRecoveryRequest getRequest() {
@@ -407,10 +410,7 @@ public class RecoverySourceHandler {
                     phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
                 cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(
                     phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.get()));
-                // How many bytes we've copied since we last called RateLimiter.pause
-                final Function<StoreFileMetaData, OutputStream> outputStreamFactories =
-                        md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogOps), chunkSizeInBytes);
-                sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
+                sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
                 // Send the CLEAN_FILES request, which takes all of the files that
                 // were transferred and renames them from their temporary file
                 // names to the actual file names. It also writes checksums for
@@ -649,73 +649,72 @@ public class RecoverySourceHandler {
                 '}';
     }
 
-
-    final class RecoveryOutputStream extends OutputStream {
-        private final StoreFileMetaData md;
-        private final Supplier<Integer> translogOps;
-        private long position = 0;
-
-        RecoveryOutputStream(StoreFileMetaData md, Supplier<Integer> translogOps) {
-            this.md = md;
-            this.translogOps = translogOps;
-        }
-
-        @Override
-        public void write(int b) throws IOException {
-            throw new UnsupportedOperationException("we can't send single bytes over the wire");
+    void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) throws Exception {
+        ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
+        final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
+        final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
+        final byte[] buffer = new byte[chunkSizeInBytes];
+        for (final StoreFileMetaData md : files) {
+            if (error.get() != null) {
+                break;
+            }
+            try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
+                 InputStream in = new InputStreamIndexInput(indexInput, md.length())) {
+                long position = 0;
+                int bytesRead;
+                while ((bytesRead = in.read(buffer, 0, buffer.length)) != -1) {
+                    final BytesArray content = new BytesArray(buffer, 0, bytesRead);
+                    final boolean lastChunk = position + content.length() == md.length();
+                    final long requestSeqId = requestSeqIdTracker.generateSeqNo();
+                    cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqId - maxConcurrentFileChunks));
+                    cancellableThreads.checkForCancel();
+                    if (error.get() != null) {
+                        break;
+                    }
+                    final long requestFilePosition = position;
+                    cancellableThreads.executeIO(() ->
+                        recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.get(),
+                            ActionListener.wrap(
+                                r -> requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId),
+                                e -> {
+                                    error.compareAndSet(null, Tuple.tuple(md, e));
+                                    requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
+                                }
+                            )));
+                    position += content.length();
+                }
+            } catch (Exception e) {
+                error.compareAndSet(null, Tuple.tuple(md, e));
+                break;
+            }
         }
-
-        @Override
-        public void write(byte[] b, int offset, int length) throws IOException {
-            sendNextChunk(position, new BytesArray(b, offset, length), md.length() == position + length);
-            position += length;
-            assert md.length() >= position : "length: " + md.length() + " but positions was: " + position;
+        // When we terminate exceptionally, we don't wait for the outstanding requests as we don't use their results anyway.
+        // This allows us to end quickly and eliminate the complexity of handling requestSeqIds in case of error.
+        if (error.get() == null) {
+            cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo()));
         }
-
-        private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException {
-            // Actually send the file chunk to the target node, waiting for it to complete
-            cancellableThreads.executeIO(() ->
-                    recoveryTarget.writeFileChunk(md, position, content, lastChunk, translogOps.get())
-            );
-            if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
-                throw new IndexShardClosedException(request.shardId());
-            }
+        if (error.get() != null) {
+            handleErrorOnSendFiles(store, error.get().v1(), error.get().v2());
         }
     }
 
-    void sendFiles(Store store, StoreFileMetaData[] files, Function<StoreFileMetaData, OutputStream> outputStreamFactory) throws Exception {
-        store.incRef();
-        try {
-            ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
-            for (int i = 0; i < files.length; i++) {
-                final StoreFileMetaData md = files[i];
-                try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
-                    // it's fine that we are only having the indexInput in the try/with block. The copy methods handles
-                    // exceptions during close correctly and doesn't hide the original exception.
-                    Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStreamFactory.apply(md));
-                } catch (Exception e) {
-                    final IOException corruptIndexException;
-                    if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
-                        if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
-                            logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
-                            failEngine(corruptIndexException);
-                            throw corruptIndexException;
-                        } else { // corruption has happened on the way to replica
-                            RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but " +
-                                    "checksums are ok", null);
-                            exception.addSuppressed(e);
-                            logger.warn(() -> new ParameterizedMessage(
-                                    "{} Remote file corruption on node {}, recovering {}. local checksum OK",
-                                    shardId, request.targetNode(), md), corruptIndexException);
-                            throw exception;
-                        }
-                    } else {
-                        throw e;
-                    }
-                }
+    private void handleErrorOnSendFiles(Store store, StoreFileMetaData md, Exception e) throws Exception {
+        final IOException corruptIndexException;
+        if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
+            if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
+                logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
+                failEngine(corruptIndexException);
+                throw corruptIndexException;
+            } else { // corruption has happened on the way to replica
+                RemoteTransportException exception = new RemoteTransportException(
+                    "File corruption occurred on recovery but checksums are ok", null);
+                exception.addSuppressed(e);
+                logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK",
+                    shardId, request.targetNode(), md), corruptIndexException);
+                throw exception;
             }
-        } finally {
-            store.decRef();
+        } else {
+            throw e;
         }
     }
 

+ 66 - 4
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

@@ -31,6 +31,7 @@ import org.apache.lucene.util.BytesRefIterator;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -55,10 +56,12 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -89,6 +92,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
     private final AtomicBoolean finished = new AtomicBoolean();
 
     private final ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
+    private final ConcurrentMap<String, FileChunkWriter> fileChunkWriters = ConcurrentCollections.newConcurrentMap();
     private final CancellableThreads cancellableThreads;
 
     // last time this status was accessed
@@ -340,6 +344,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
             }
         } finally {
             // free store. increment happens in constructor
+            fileChunkWriters.clear();
             store.decRef();
             indexShard.recoveryStats().decCurrentAsTarget();
             closedLatch.countDown();
@@ -487,12 +492,10 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
         }
     }
 
-    @Override
-    public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
-                               boolean lastChunk, int totalTranslogOps) throws IOException {
+    private void innerWriteFileChunk(StoreFileMetaData fileMetaData, long position,
+                                     BytesReference content, boolean lastChunk) throws IOException {
         final Store store = store();
         final String name = fileMetaData.name();
-        state().getTranslog().totalOperations(totalTranslogOps);
         final RecoveryState.Index indexState = state().getIndex();
         IndexOutput indexOutput;
         if (position == 0) {
@@ -500,6 +503,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
         } else {
             indexOutput = getOpenIndexOutput(name);
         }
+        assert indexOutput.getFilePointer() == position : "file-pointer " + indexOutput.getFilePointer() + " != " + position;
         BytesRefIterator iterator = content.iterator();
         BytesRef scratch;
         while((scratch = iterator.next()) != null) { // we iterate over all pages - this is a 0-copy for all core impls
@@ -522,6 +526,64 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
         }
     }
 
+    @Override
+    public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
+                               boolean lastChunk, int totalTranslogOps, ActionListener<Void> listener) {
+        try {
+            state().getTranslog().totalOperations(totalTranslogOps);
+            final FileChunkWriter writer = fileChunkWriters.computeIfAbsent(fileMetaData.name(), name -> new FileChunkWriter());
+            writer.writeChunk(new FileChunk(fileMetaData, content, position, lastChunk));
+            listener.onResponse(null);
+        } catch (Exception e) {
+            listener.onFailure(e);
+        }
+    }
+
+    private static final class FileChunk {
+        final StoreFileMetaData md;
+        final BytesReference content;
+        final long position;
+        final boolean lastChunk;
+        FileChunk(StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) {
+            this.md = md;
+            this.content = content;
+            this.position = position;
+            this.lastChunk = lastChunk;
+        }
+    }
+
+    private final class FileChunkWriter {
+        // chunks can be delivered out of order, we need to buffer chunks if there's a gap between them.
+        final PriorityQueue<FileChunk> pendingChunks = new PriorityQueue<>(Comparator.comparing(fc -> fc.position));
+        long lastPosition = 0;
+
+        void writeChunk(FileChunk newChunk) throws IOException {
+            synchronized (this) {
+                pendingChunks.add(newChunk);
+            }
+            while (true) {
+                final FileChunk chunk;
+                synchronized (this) {
+                    chunk = pendingChunks.peek();
+                    if (chunk == null || chunk.position != lastPosition) {
+                        return;
+                    }
+                    pendingChunks.remove();
+                }
+                innerWriteFileChunk(chunk.md, chunk.position, chunk.content, chunk.lastChunk);
+                synchronized (this) {
+                    assert lastPosition == chunk.position : "last_position " + lastPosition + " != chunk_position " + chunk.position;
+                    lastPosition += chunk.content.length();
+                    if (chunk.lastChunk) {
+                        assert pendingChunks.isEmpty() == true : "still have pending chunks [" + pendingChunks + "]";
+                        fileChunkWriters.remove(chunk.md.name());
+                        assert fileChunkWriters.containsValue(this) == false : "chunk writer [" + newChunk.md + "] was not removed";
+                    }
+                }
+            }
+        }
+    }
+
     Path translogLocation() {
         return indexShard().shardPath().resolveTranslog();
     }

+ 2 - 2
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.indices.recovery;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.index.seqno.ReplicationTracker;
 import org.elasticsearch.index.store.Store;
@@ -27,7 +28,6 @@ import org.elasticsearch.index.translog.Translog;
 import java.io.IOException;
 import java.util.List;
 
-
 public interface RecoveryTargetHandler {
 
     /**
@@ -90,6 +90,6 @@ public interface RecoveryTargetHandler {
 
     /** writes a partial file chunk to the target store */
     void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
-                        boolean lastChunk, int totalTranslogOps) throws IOException;
+                        boolean lastChunk, int totalTranslogOps, ActionListener<Void> listener);
 
 }

+ 7 - 3
server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

@@ -21,6 +21,8 @@ package org.elasticsearch.indices.recovery;
 
 import org.apache.lucene.store.RateLimiter;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.index.seqno.ReplicationTracker;
@@ -31,6 +33,7 @@ import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.transport.EmptyTransportResponseHandler;
 import org.elasticsearch.transport.TransportFuture;
 import org.elasticsearch.transport.TransportRequestOptions;
+import org.elasticsearch.transport.TransportResponse;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
@@ -142,8 +145,8 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
     }
 
     @Override
-    public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean
-            lastChunk, int totalTranslogOps) throws IOException {
+    public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
+                               boolean lastChunk, int totalTranslogOps, ActionListener<Void> listener) {
         // Pause using the rate limiter, if desired, to throttle the recovery
         final long throttleTimeInNanos;
         // always fetch the ratelimiter - it might be updated in real-time on the recovery settings
@@ -173,7 +176,8 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
                  * see how many translog ops we accumulate while copying files across the network. A future optimization
                  * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
                  */
-                throttleTimeInNanos), fileChunkRequestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+                throttleTimeInNanos), fileChunkRequestOptions, new ActionListenerResponseHandler<>(
+                    ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure), in -> TransportResponse.Empty.INSTANCE));
     }
 
 }

+ 86 - 0
server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java

@@ -24,17 +24,33 @@ import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.NoMergePolicy;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.index.translog.Translog;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CyclicBarrier;
+import java.util.stream.Collectors;
 
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 
 public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
@@ -110,4 +126,74 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
             closeShards(replica);
         }
     }
+
+    public void testWriteFileChunksConcurrently() throws Exception {
+        IndexShard sourceShard = newStartedShard(true);
+        int numDocs = between(20, 100);
+        for (int i = 0; i < numDocs; i++) {
+            indexDoc(sourceShard, "_doc", Integer.toString(i));
+        }
+        sourceShard.flush(new FlushRequest());
+        Store.MetadataSnapshot sourceSnapshot = sourceShard.store().getMetadata(null);
+        List<StoreFileMetaData> mdFiles = new ArrayList<>();
+        for (StoreFileMetaData md : sourceSnapshot) {
+            mdFiles.add(md);
+        }
+        final IndexShard targetShard = newShard(false);
+        final DiscoveryNode pNode = getFakeDiscoNode(sourceShard.routingEntry().currentNodeId());
+        final DiscoveryNode rNode = getFakeDiscoNode(targetShard.routingEntry().currentNodeId());
+        targetShard.markAsRecovering("test-peer-recovery", new RecoveryState(targetShard.routingEntry(), rNode, pNode));
+        final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null, null);
+        recoveryTarget.receiveFileInfo(
+            mdFiles.stream().map(StoreFileMetaData::name).collect(Collectors.toList()),
+            mdFiles.stream().map(StoreFileMetaData::length).collect(Collectors.toList()),
+            Collections.emptyList(), Collections.emptyList(), 0
+        );
+        List<RecoveryFileChunkRequest> requests = new ArrayList<>();
+        for (StoreFileMetaData md : mdFiles) {
+            try (IndexInput in = sourceShard.store().directory().openInput(md.name(), IOContext.READONCE)) {
+                int pos = 0;
+                while (pos < md.length()) {
+                    int length = between(1, Math.toIntExact(md.length() - pos));
+                    byte[] buffer = new byte[length];
+                    in.readBytes(buffer, 0, length);
+                    requests.add(new RecoveryFileChunkRequest(0, sourceShard.shardId(), md, pos, new BytesArray(buffer),
+                        pos + length == md.length(), 1, 1));
+                    pos += length;
+                }
+            }
+        }
+        Randomness.shuffle(requests);
+        BlockingQueue<RecoveryFileChunkRequest> queue = new ArrayBlockingQueue<>(requests.size());
+        queue.addAll(requests);
+        Thread[] senders = new Thread[between(1, 4)];
+        CyclicBarrier barrier = new CyclicBarrier(senders.length);
+        for (int i = 0; i < senders.length; i++) {
+            senders[i] = new Thread(() -> {
+                try {
+                    barrier.await();
+                    RecoveryFileChunkRequest r;
+                    while ((r = queue.poll()) != null) {
+                        recoveryTarget.writeFileChunk(r.metadata(), r.position(), r.content(), r.lastChunk(), r.totalTranslogOps(),
+                            ActionListener.wrap(ignored -> {},
+                                e -> {
+                                    throw new AssertionError(e);
+                                }));
+                    }
+                } catch (Exception e) {
+                    throw new AssertionError(e);
+                }
+            });
+            senders[i].start();
+        }
+        for (Thread sender : senders) {
+            sender.join();
+        }
+        recoveryTarget.renameAllTempFiles();
+        recoveryTarget.decRef();
+        Store.MetadataSnapshot targetSnapshot = targetShard.snapshotStoreMetadata();
+        Store.RecoveryDiff diff = sourceSnapshot.recoveryDiff(targetSnapshot);
+        assertThat(diff.different, empty());
+        closeShards(sourceShard, targetShard);
+    }
 }

+ 286 - 50
server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

@@ -32,12 +32,15 @@ import org.apache.lucene.index.Term;
 import org.apache.lucene.store.BaseDirectoryWrapper;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefIterator;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.Numbers;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -59,6 +62,7 @@ import org.elasticsearch.index.mapper.ParseContext;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.Uid;
+import org.elasticsearch.index.seqno.ReplicationTracker;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
@@ -75,19 +79,29 @@ import org.elasticsearch.test.IndexSettingsModule;
 import org.mockito.ArgumentCaptor;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.IntSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.zip.CRC32;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.core.IsNull.notNullValue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyObject;
@@ -109,8 +123,6 @@ public class RecoverySourceHandlerTests extends ESTestCase {
         final RecoverySettings recoverySettings = new RecoverySettings(settings, service);
         final StartRecoveryRequest request = getStartRecoveryRequest();
         Store store = newStore(createTempDir());
-        RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request,
-            recoverySettings.getChunkSize().bytesAsInt());
         Directory dir = store.directory();
         RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig());
         int numDocs = randomIntBetween(10, 100);
@@ -129,19 +141,38 @@ public class RecoverySourceHandlerTests extends ESTestCase {
             metas.add(md);
         }
         Store targetStore = newStore(createTempDir());
-        handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> {
-            try {
-                return new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) {
-                    @Override
-                    public void close() throws IOException {
-                        super.close();
-                        targetStore.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it
+        RecoveryTargetHandler target = new TestRecoveryTargetHandler() {
+            IndexOutputOutputStream out;
+            @Override
+            public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk,
+                                       int totalTranslogOps, ActionListener<Void> listener) {
+                try {
+                    if (position == 0) {
+                        out = new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) {
+                            @Override
+                            public void close() throws IOException {
+                                super.close();
+                                targetStore.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it
+                            }
+                        };
+                    }
+                    final BytesRefIterator iterator = content.iterator();
+                    BytesRef scratch;
+                    while ((scratch = iterator.next()) != null) {
+                        out.write(scratch.bytes, scratch.offset, scratch.length);
                     }
-                };
-            } catch (IOException e) {
-                throw new RuntimeException(e);
+                    if (lastChunk) {
+                        out.close();
+                    }
+                    listener.onResponse(null);
+                } catch (Exception e) {
+                    listener.onFailure(e);
+                }
             }
-        });
+        };
+        RecoverySourceHandler handler = new RecoverySourceHandler(null, target, request,
+            Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 5));
+        handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0);
         Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata(null);
         Store.RecoveryDiff recoveryDiff = targetStoreMetadata.recoveryDiff(metadata);
         assertEquals(metas.size(), recoveryDiff.identical.size());
@@ -176,7 +207,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
         when(shard.state()).thenReturn(IndexShardState.STARTED);
         final RecoveryTargetHandler recoveryTarget = mock(RecoveryTargetHandler.class);
         final RecoverySourceHandler handler =
-            new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes);
+            new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
         final List<Translog.Operation> operations = new ArrayList<>();
         final int initialNumberOfDocs = randomIntBetween(16, 64);
         for (int i = 0; i < initialNumberOfDocs; i++) {
@@ -283,14 +314,6 @@ public class RecoverySourceHandlerTests extends ESTestCase {
         Path tempDir = createTempDir();
         Store store = newStore(tempDir, false);
         AtomicBoolean failedEngine = new AtomicBoolean(false);
-        RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request,
-                recoverySettings.getChunkSize().bytesAsInt()) {
-            @Override
-            protected void failEngine(IOException cause) {
-                assertFalse(failedEngine.get());
-                failedEngine.set(true);
-            }
-        };
         Directory dir = store.directory();
         RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig());
         int numDocs = randomIntBetween(10, 100);
@@ -313,20 +336,46 @@ public class RecoverySourceHandlerTests extends ESTestCase {
             (p.getFileName().toString().equals("write.lock") ||
                 p.getFileName().toString().startsWith("extra")) == false));
         Store targetStore = newStore(createTempDir(), false);
-        try {
-            handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> {
+        RecoveryTargetHandler target = new TestRecoveryTargetHandler() {
+            IndexOutputOutputStream out;
+             @Override
+            public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk,
+                                       int totalTranslogOps, ActionListener<Void> listener) {
                 try {
-                    return new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) {
-                        @Override
-                        public void close() throws IOException {
-                            super.close();
-                            store.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it
-                        }
-                    };
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
+                    if (position == 0) {
+                        out = new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) {
+                            @Override
+                            public void close() throws IOException {
+                                super.close();
+                                targetStore.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it
+                            }
+                        };
+                    }
+                    final BytesRefIterator iterator = content.iterator();
+                    BytesRef scratch;
+                    while ((scratch = iterator.next()) != null) {
+                        out.write(scratch.bytes, scratch.offset, scratch.length);
+                    }
+                    if (lastChunk) {
+                        out.close();
+                    }
+                    listener.onResponse(null);
+                } catch (Exception e) {
+                    IOUtils.closeWhileHandlingException(out, () -> listener.onFailure(e));
                 }
-            });
+            }
+        };
+        RecoverySourceHandler handler = new RecoverySourceHandler(null, target, request,
+            Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 8)) {
+            @Override
+            protected void failEngine(IOException cause) {
+                assertFalse(failedEngine.get());
+                failedEngine.set(true);
+            }
+        };
+
+        try {
+            handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0);
             fail("corrupted index");
         } catch (IOException ex) {
             assertNotNull(ExceptionsHelper.unwrapCorruption(ex));
@@ -342,14 +391,6 @@ public class RecoverySourceHandlerTests extends ESTestCase {
         Path tempDir = createTempDir();
         Store store = newStore(tempDir, false);
         AtomicBoolean failedEngine = new AtomicBoolean(false);
-        RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request,
-                recoverySettings.getChunkSize().bytesAsInt()) {
-            @Override
-            protected void failEngine(IOException cause) {
-                assertFalse(failedEngine.get());
-                failedEngine.set(true);
-            }
-        };
         Directory dir = store.directory();
         RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig());
         int numDocs = randomIntBetween(10, 100);
@@ -368,15 +409,27 @@ public class RecoverySourceHandlerTests extends ESTestCase {
             metas.add(md);
         }
         final boolean throwCorruptedIndexException = randomBoolean();
-        Store targetStore = newStore(createTempDir(), false);
-        try {
-            handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> {
+        RecoveryTargetHandler target = new TestRecoveryTargetHandler() {
+            @Override
+            public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk,
+                                       int totalTranslogOps, ActionListener<Void> listener) {
                 if (throwCorruptedIndexException) {
-                    throw new RuntimeException(new CorruptIndexException("foo", "bar"));
+                    listener.onFailure(new RuntimeException(new CorruptIndexException("foo", "bar")));
                 } else {
-                    throw new RuntimeException("boom");
+                    listener.onFailure(new RuntimeException("boom"));
                 }
-            });
+            }
+        };
+        RecoverySourceHandler handler = new RecoverySourceHandler(null, target, request,
+            Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 10)) {
+            @Override
+            protected void failEngine(IOException cause) {
+                assertFalse(failedEngine.get());
+                failedEngine.set(true);
+            }
+        };
+        try {
+            handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0);
             fail("exception index");
         } catch (RuntimeException ex) {
             assertNull(ExceptionsHelper.unwrapCorruption(ex));
@@ -389,7 +442,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
             fail("not expected here");
         }
         assertFalse(failedEngine.get());
-        IOUtils.close(store, targetStore);
+        IOUtils.close(store);
     }
 
     public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOException {
@@ -411,7 +464,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
                 shard,
                 mock(RecoveryTargetHandler.class),
                 request,
-                recoverySettings.getChunkSize().bytesAsInt()) {
+                Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
+                between(1, 8)) {
 
             @Override
             public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer> translogOps) {
@@ -468,9 +522,128 @@ public class RecoverySourceHandlerTests extends ESTestCase {
         assertBusy(() -> assertTrue(freed.get()));
     }
 
+    public void testSendFileChunksConcurrently() throws Exception {
+        final IndexShard shard = mock(IndexShard.class);
+        when(shard.state()).thenReturn(IndexShardState.STARTED);
+        final List<FileChunkResponse> unrepliedChunks = new CopyOnWriteArrayList<>();
+        final AtomicInteger sentChunks = new AtomicInteger();
+        final TestRecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
+            final AtomicLong chunkNumberGenerator = new AtomicLong();
+            @Override
+            public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk,
+                                       int totalTranslogOps, ActionListener<Void> listener) {
+                final long chunkNumber = chunkNumberGenerator.getAndIncrement();
+                logger.info("--> write chunk name={} seq={}, position={}", md.name(), chunkNumber, position);
+                unrepliedChunks.add(new FileChunkResponse(chunkNumber, listener));
+                sentChunks.incrementAndGet();
+            }
+        };
+        final int maxConcurrentChunks = between(1, 8);
+        final int chunkSize = between(1, 32);
+        final RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, getStartRecoveryRequest(),
+            chunkSize, maxConcurrentChunks);
+        Store store = newStore(createTempDir(), false);
+        List<StoreFileMetaData> files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20));
+        int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum();
+        Thread sender = new Thread(() -> {
+            try {
+                handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0);
+            } catch (Exception ex) {
+                throw new AssertionError(ex);
+            }
+        });
+        sender.start();
+        assertBusy(() -> {
+            assertThat(sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks)));
+            assertThat(unrepliedChunks, hasSize(sentChunks.get()));
+        });
+
+        List<FileChunkResponse> ackedChunks = new ArrayList<>();
+        while (sentChunks.get() < totalChunks || unrepliedChunks.isEmpty() == false) {
+            List<FileChunkResponse> chunksToAck = randomSubsetOf(between(1, unrepliedChunks.size()), unrepliedChunks);
+            unrepliedChunks.removeAll(chunksToAck);
+            ackedChunks.addAll(chunksToAck);
+            ackedChunks.sort(Comparator.comparing(c -> c.chunkNumber));
+            int checkpoint = -1;
+            for (int i = 0; i < ackedChunks.size(); i++) {
+                if (i != ackedChunks.get(i).chunkNumber) {
+                    break;
+                } else {
+                    checkpoint = i;
+                }
+            }
+            int chunksToSend = Math.min(
+                totalChunks - sentChunks.get(),                             // limited by the remaining chunks
+                maxConcurrentChunks - (sentChunks.get() - 1 - checkpoint)); // limited by the buffering chunks
+
+            int expectedSentChunks = sentChunks.get() + chunksToSend;
+            int expectedUnrepliedChunks = unrepliedChunks.size() + chunksToSend;
+            chunksToAck.forEach(c -> c.listener.onResponse(null));
+            assertBusy(() -> {
+                assertThat(sentChunks.get(), equalTo(expectedSentChunks));
+                assertThat(unrepliedChunks, hasSize(expectedUnrepliedChunks));
+            });
+        }
+        sender.join();
+        store.close();
+    }
+
+    public void testSendFileChunksStopOnError() throws Exception {
+        final IndexShard shard = mock(IndexShard.class);
+        when(shard.state()).thenReturn(IndexShardState.STARTED);
+        final List<FileChunkResponse> unrepliedChunks = new CopyOnWriteArrayList<>();
+        final AtomicInteger sentChunks = new AtomicInteger();
+        final TestRecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
+            final AtomicLong chunkNumberGenerator = new AtomicLong();
+            @Override
+            public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk,
+                                       int totalTranslogOps, ActionListener<Void> listener) {
+                final long chunkNumber = chunkNumberGenerator.getAndIncrement();
+                logger.info("--> write chunk name={} seq={}, position={}", md.name(), chunkNumber, position);
+                unrepliedChunks.add(new FileChunkResponse(chunkNumber, listener));
+                sentChunks.incrementAndGet();
+            }
+        };
+        final int maxConcurrentChunks = between(1, 4);
+        final int chunkSize = between(1, 16);
+        final RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, getStartRecoveryRequest(),
+            chunkSize, maxConcurrentChunks);
+        Store store = newStore(createTempDir(), false);
+        List<StoreFileMetaData> files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20));
+        int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum();
+        AtomicReference<Exception> error = new AtomicReference<>();
+        Thread sender = new Thread(() -> {
+            try {
+                handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0);
+            } catch (Exception ex) {
+                error.set(ex);
+            }
+        });
+        sender.start();
+        assertBusy(() -> assertThat(sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks))));
+        List<FileChunkResponse> failedChunks = randomSubsetOf(between(1, unrepliedChunks.size()), unrepliedChunks);
+        failedChunks.forEach(c -> c.listener.onFailure(new RuntimeException("test chunk exception")));
+        unrepliedChunks.removeAll(failedChunks);
+        unrepliedChunks.forEach(c -> {
+            if (randomBoolean()) {
+                c.listener.onFailure(new RuntimeException("test"));
+            } else {
+                c.listener.onResponse(null);
+            }
+        });
+        assertBusy(() -> {
+            assertThat(error.get(), notNullValue());
+            assertThat(error.get().getMessage(), containsString("test chunk exception"));
+        });
+        assertThat("no more chunks should be sent", sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks)));
+        sender.join();
+        store.close();
+    }
+
     private Store newStore(Path path) throws IOException {
         return newStore(path, true);
     }
+
     private Store newStore(Path path, boolean checkIndex) throws IOException {
         BaseDirectoryWrapper baseDirectoryWrapper = RecoverySourceHandlerTests.newFSDirectory(path);
         if (checkIndex == false) {
@@ -479,5 +652,68 @@ public class RecoverySourceHandlerTests extends ESTestCase {
         return new Store(shardId,  INDEX_SETTINGS, baseDirectoryWrapper, new DummyShardLock(shardId));
     }
 
+    static final class FileChunkResponse {
+        final long chunkNumber;
+        final ActionListener<Void> listener;
+
+        FileChunkResponse(long chunkNumber, ActionListener<Void> listener) {
+            this.chunkNumber = chunkNumber;
+            this.listener = listener;
+        }
+    }
+
+    private List<StoreFileMetaData> generateFiles(Store store, int numFiles, IntSupplier fileSizeSupplier) throws IOException {
+        List<StoreFileMetaData> files = new ArrayList<>();
+        for (int i = 0; i < numFiles; i++) {
+            byte[] buffer = randomByteArrayOfLength(fileSizeSupplier.getAsInt());
+            CRC32 digest = new CRC32();
+            digest.update(buffer, 0, buffer.length);
+            StoreFileMetaData md = new StoreFileMetaData("test-" + i, buffer.length + 8,
+                Store.digestToString(digest.getValue()), org.apache.lucene.util.Version.LATEST);
+            try (OutputStream out = new IndexOutputOutputStream(store.createVerifyingOutput(md.name(), md, IOContext.DEFAULT))) {
+                out.write(buffer);
+                out.write(Numbers.longToBytes(digest.getValue()));
+            }
+            store.directory().sync(Collections.singleton(md.name()));
+            files.add(md);
+        }
+        return files;
+    }
+
+    class TestRecoveryTargetHandler implements RecoveryTargetHandler {
+        @Override
+        public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) {
+        }
+
+        @Override
+        public void finalizeRecovery(long globalCheckpoint) {
+        }
+
+        @Override
+        public void ensureClusterStateVersion(long clusterStateVersion) {
+        }
+
+        @Override
+        public void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryContext) {
+        }
+
+        @Override
+        public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp, long msu) {
+            return 0;
+        }
 
+        @Override
+        public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
+                                    List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
+        }
+
+        @Override
+        public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) {
+        }
+
+        @Override
+        public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk,
+                                   int totalTranslogOps, ActionListener<Void> listener) {
+        }
+    }
 }

+ 1 - 1
server/src/test/java/org/elasticsearch/recovery/RelocationIT.java

@@ -385,7 +385,7 @@ public class RelocationIT extends ESIntegTestCase {
         assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").setWaitForGreenStatus().get().isTimedOut());
         flush();
 
-        int allowedFailures = randomIntBetween(3, 10);
+        int allowedFailures = randomIntBetween(3, 5); // the default of the `index.allocation.max_retries` is 5.
         logger.info("--> blocking recoveries from primary (allowed failures: [{}])", allowedFailures);
         CountDownLatch corruptionCount = new CountDownLatch(allowedFailures);
         ClusterService clusterService = internalCluster().getInstance(ClusterService.class, p_node);

+ 1 - 4
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -610,10 +610,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
         final StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), targetAllocationId,
             pNode, rNode, snapshot, replica.routingEntry().primary(), 0, startingSeqNo);
         final RecoverySourceHandler recovery = new RecoverySourceHandler(
-                primary,
-                recoveryTarget,
-                request,
-                (int) ByteSizeUnit.MB.toBytes(1));
+            primary, recoveryTarget, request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8));
         primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null,
             currentClusterStateVersion.incrementAndGet(), inSyncIds, routingTable, Collections.emptySet());
 

+ 2 - 0
test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

@@ -390,6 +390,8 @@ public final class InternalTestCluster extends TestCluster {
         // always reduce this - it can make tests really slow
         builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), TimeValue.timeValueMillis(
                 RandomNumbers.randomIntBetween(random, 20, 50)));
+        builder.put(RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.getKey(),
+            RandomNumbers.randomIntBetween(random, 1, 5));
         defaultSettings = builder.build();
         executor = EsExecutors.newScaling("internal_test_cluster_executor", 0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS,
                 EsExecutors.daemonThreadFactory("test_" + clusterName), new ThreadContext(Settings.EMPTY));