Browse Source

Avoid copying file chunks in peer covery (#56072)

A follow-up of #55353 to avoid copying file chunks before sending 
them to the network layer.

Relates #55353
Nhat Nguyen 5 years ago
parent
commit
601617a3fc

+ 2 - 6
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java

@@ -30,7 +30,6 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.shard.IndexEventListener;
@@ -65,17 +64,14 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
     private final TransportService transportService;
     private final IndicesService indicesService;
     private final RecoverySettings recoverySettings;
-    private final BigArrays bigArrays;
 
     final OngoingRecoveries ongoingRecoveries = new OngoingRecoveries();
 
     @Inject
-    public PeerRecoverySourceService(TransportService transportService, IndicesService indicesService,
-                                     RecoverySettings recoverySettings, BigArrays bigArrays) {
+    public PeerRecoverySourceService(TransportService transportService, IndicesService indicesService, RecoverySettings recoverySettings) {
         this.transportService = transportService;
         this.indicesService = indicesService;
         this.recoverySettings = recoverySettings;
-        this.bigArrays = bigArrays;
         transportService.registerRequestHandler(Actions.START_RECOVERY, ThreadPool.Names.GENERIC, StartRecoveryRequest::new,
             new StartRecoveryTransportRequestHandler());
     }
@@ -225,7 +221,7 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
             private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest request, IndexShard shard) {
                 RecoverySourceHandler handler;
                 final RemoteRecoveryTargetHandler recoveryTarget =
-                    new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService, bigArrays,
+                    new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService,
                         request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
                 handler = new RecoverySourceHandler(shard, recoveryTarget, shard.getThreadPool(), request,
                     Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks());

+ 18 - 5
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -77,9 +77,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Deque;
 import java.util.List;
 import java.util.Locale;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
@@ -822,23 +825,30 @@ public class RecoverySourceHandler {
                 '}';
     }
 
-    private static class FileChunk implements MultiFileTransfer.ChunkRequest {
+    private static class FileChunk implements MultiFileTransfer.ChunkRequest, Releasable {
         final StoreFileMetadata md;
         final BytesReference content;
         final long position;
         final boolean lastChunk;
+        final Releasable onClose;
 
-        FileChunk(StoreFileMetadata md, BytesReference content, long position, boolean lastChunk) {
+        FileChunk(StoreFileMetadata md, BytesReference content, long position, boolean lastChunk, Releasable onClose) {
             this.md = md;
             this.content = content;
             this.position = position;
             this.lastChunk = lastChunk;
+            this.onClose = onClose;
         }
 
         @Override
         public boolean lastChunk() {
             return lastChunk;
         }
+
+        @Override
+        public void close() {
+            onClose.close();
+        }
     }
 
     void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener<Void> listener) {
@@ -847,7 +857,7 @@ public class RecoverySourceHandler {
         final MultiFileTransfer<FileChunk> multiFileSender =
             new MultiFileTransfer<>(logger, threadPool.getThreadContext(), listener, maxConcurrentFileChunks, Arrays.asList(files)) {
 
-                final byte[] buffer = new byte[chunkSizeInBytes];
+                final Deque<byte[]> buffers = new ConcurrentLinkedDeque<>();
                 InputStreamIndexInput currentInput = null;
                 long offset = 0;
 
@@ -868,12 +878,14 @@ public class RecoverySourceHandler {
                 protected FileChunk nextChunkRequest(StoreFileMetadata md) throws IOException {
                     assert Transports.assertNotTransportThread("read file chunk");
                     cancellableThreads.checkForCancel();
+                    final byte[] buffer = Objects.requireNonNullElseGet(buffers.pollFirst(), () -> new byte[chunkSizeInBytes]);
                     final int bytesRead = currentInput.read(buffer);
                     if (bytesRead == -1) {
                         throw new CorruptIndexException("file truncated; length=" + md.length() + " offset=" + offset, md.name());
                     }
                     final boolean lastChunk = offset + bytesRead == md.length();
-                    final FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, bytesRead), offset, lastChunk);
+                    final FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, bytesRead), offset, lastChunk,
+                        () -> buffers.addFirst(buffer));
                     offset += bytesRead;
                     return chunk;
                 }
@@ -882,7 +894,8 @@ public class RecoverySourceHandler {
                 protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
                     cancellableThreads.checkForCancel();
                     recoveryTarget.writeFileChunk(
-                        request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener);
+                        request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(),
+                        ActionListener.runBefore(listener, request::close));
                 }
 
                 @Override

+ 9 - 28
server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

@@ -30,10 +30,8 @@ import org.elasticsearch.action.support.RetryableAction;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.CancellableThreads;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
@@ -66,7 +64,6 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
     private final ThreadPool threadPool;
     private final long recoveryId;
     private final ShardId shardId;
-    private final BigArrays bigArrays;
     private final DiscoveryNode targetNode;
     private final RecoverySettings recoverySettings;
     private final Map<Object, RetryableAction<?>> onGoingRetryableActions = ConcurrentCollections.newConcurrentMap();
@@ -79,13 +76,12 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
     private final Consumer<Long> onSourceThrottle;
     private volatile boolean isCancelled = false;
 
-    public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportService transportService, BigArrays bigArrays,
+    public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportService transportService,
                                        DiscoveryNode targetNode, RecoverySettings recoverySettings, Consumer<Long> onSourceThrottle) {
         this.transportService = transportService;
         this.threadPool = transportService.getThreadPool();
         this.recoveryId = recoveryId;
         this.shardId = shardId;
-        this.bigArrays = bigArrays;
         this.targetNode = targetNode;
         this.recoverySettings = recoverySettings;
         this.onSourceThrottle = onSourceThrottle;
@@ -209,29 +205,14 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
         }
 
         final String action = PeerRecoveryTargetService.Actions.FILE_CHUNK;
-        final ReleasableBytesStreamOutput output = new ReleasableBytesStreamOutput(content.length(), bigArrays);
-        boolean actionStarted = false;
-        try {
-            content.writeTo(output);
-            /* we send estimateTotalOperations with every request since we collect stats on the target and that way we can
-             * see how many translog ops we accumulate while copying files across the network. A future optimization
-             * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
-             */
-            final RecoveryFileChunkRequest request = new RecoveryFileChunkRequest(recoveryId, shardId, fileMetadata,
-                position, output.bytes(), lastChunk, totalTranslogOps, throttleTimeInNanos);
-            final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
-            final ActionListener<TransportResponse.Empty> responseListener = ActionListener.map(listener, r -> null);
-            final ActionListener<TransportResponse.Empty> releaseListener = ActionListener.runBefore(responseListener, output::close);
-            executeRetryableAction(action, request, fileChunkRequestOptions, releaseListener, reader);
-            actionStarted = true;
-        } catch (IOException e) {
-            // Since the content data is buffer in memory, we should never get an exception.
-            throw new AssertionError(e);
-        } finally {
-            if (actionStarted == false) {
-                output.close();
-            }
-        }
+        /* we send estimateTotalOperations with every request since we collect stats on the target and that way we can
+         * see how many translog ops we accumulate while copying files across the network. A future optimization
+         * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
+         */
+        final RecoveryFileChunkRequest request = new RecoveryFileChunkRequest(
+            recoveryId, shardId, fileMetadata, position, content, lastChunk, totalTranslogOps, throttleTimeInNanos);
+        final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
+        executeRetryableAction(action, request, fileChunkRequestOptions, ActionListener.map(listener, r -> null), reader);
     }
 
     @Override

+ 1 - 1
server/src/main/java/org/elasticsearch/node/Node.java

@@ -591,7 +591,7 @@ public class Node implements Closeable {
                         RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
                         processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
                         b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(transportService,
-                                indicesService, recoverySettings, bigArrays));
+                                indicesService, recoverySettings));
                         b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(threadPool,
                                 transportService, recoverySettings, clusterService));
                     }

+ 1 - 3
server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java

@@ -21,7 +21,6 @@ package org.elasticsearch.indices.recovery;
 
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
@@ -40,8 +39,7 @@ public class PeerRecoverySourceServiceTests extends IndexShardTestCase {
         IndexShard primary = newStartedShard(true);
         PeerRecoverySourceService peerRecoverySourceService = new PeerRecoverySourceService(
             mock(TransportService.class), mock(IndicesService.class),
-            new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
-            BigArrays.NON_RECYCLING_INSTANCE);
+            new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
         StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10),
             getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(),
             SequenceNumbers.UNASSIGNED_SEQ_NO);

+ 1 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -1476,7 +1476,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                     new NodeMappingRefreshAction(transportService, metadataMappingService),
                     repositoriesService,
                     mock(SearchService.class),
-                    new PeerRecoverySourceService(transportService, indicesService, recoverySettings, bigArrays),
+                    new PeerRecoverySourceService(transportService, indicesService, recoverySettings),
                     snapshotShardsService,
                     new PrimaryReplicaSyncer(
                         transportService,