Browse Source

Make peer recovery send file info step async (#43792)

Relates #36195
Nhat Nguyen 6 years ago
parent
commit
562d7980ff

+ 5 - 5
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

@@ -531,11 +531,11 @@ public class PeerRecoveryTargetService implements IndexEventListener {
 
         @Override
         public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel, Task task) throws Exception {
-            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
-            )) {
-                recoveryRef.target().receiveFileInfo(request.phase1FileNames, request.phase1FileSizes, request.phase1ExistingFileNames,
-                        request.phase1ExistingFileSizes, request.totalTranslogOps);
-                channel.sendResponse(TransportResponse.Empty.INSTANCE);
+            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
+                final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.FILES_INFO, request);
+                recoveryRef.target().receiveFileInfo(
+                    request.phase1FileNames, request.phase1FileSizes, request.phase1ExistingFileNames, request.phase1ExistingFileSizes,
+                    request.totalTranslogOps, ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
             }
         }
     }

+ 15 - 7
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -403,17 +403,25 @@ public class RecoverySourceHandler {
                 logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
                     phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes),
                     phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes));
-                cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(
-                    phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.getAsInt()));
-                sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
+                final StepListener<Void> sendFileInfoStep = new StepListener<>();
+                final StepListener<Void> cleanFilesStep = new StepListener<>();
+                cancellableThreads.execute(() ->
+                    recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
+                        phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep));
+
+                sendFileInfoStep.whenComplete(r -> {
+                    sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
+                    cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, cleanFilesStep);
+                }, listener::onFailure);
+
                 final long totalSize = totalSizeInBytes;
                 final long existingTotalSize = existingTotalSizeInBytes;
-                cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, ActionListener.map(listener, aVoid -> {
+                cleanFilesStep.whenComplete(r -> {
                     final TimeValue took = stopWatch.totalTime();
                     logger.trace("recovery [phase1]: took [{}]", took);
-                    return new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,
-                        phase1ExistingFileSizes, existingTotalSize, took);
-                }));
+                    listener.onResponse(new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,
+                        phase1ExistingFileSizes, existingTotalSize, took));
+                }, listener::onFailure);
             } else {
                 logger.trace("skipping [phase1]- identical sync id [{}] found on both source and target",
                     recoverySourceMetadata.getSyncId());

+ 14 - 11
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

@@ -377,17 +377,20 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
                                 List<Long> phase1FileSizes,
                                 List<String> phase1ExistingFileNames,
                                 List<Long> phase1ExistingFileSizes,
-                                int totalTranslogOps) {
-        final RecoveryState.Index index = state().getIndex();
-        for (int i = 0; i < phase1ExistingFileNames.size(); i++) {
-            index.addFileDetail(phase1ExistingFileNames.get(i), phase1ExistingFileSizes.get(i), true);
-        }
-        for (int i = 0; i < phase1FileNames.size(); i++) {
-            index.addFileDetail(phase1FileNames.get(i), phase1FileSizes.get(i), false);
-        }
-        state().getTranslog().totalOperations(totalTranslogOps);
-        state().getTranslog().totalOperationsOnStart(totalTranslogOps);
-
+                                int totalTranslogOps,
+                                ActionListener<Void> listener) {
+        ActionListener.completeWith(listener, () -> {
+            final RecoveryState.Index index = state().getIndex();
+            for (int i = 0; i < phase1ExistingFileNames.size(); i++) {
+                index.addFileDetail(phase1ExistingFileNames.get(i), phase1ExistingFileSizes.get(i), true);
+            }
+            for (int i = 0; i < phase1FileNames.size(); i++) {
+                index.addFileDetail(phase1FileNames.get(i), phase1FileSizes.get(i), false);
+            }
+            state().getTranslog().totalOperations(totalTranslogOps);
+            state().getTranslog().totalOperationsOnStart(totalTranslogOps);
+            return null;
+        });
     }
 
     @Override

+ 2 - 1
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

@@ -88,7 +88,8 @@ public interface RecoveryTargetHandler {
                          List<Long> phase1FileSizes,
                          List<String> phase1ExistingFileNames,
                          List<Long> phase1ExistingFileSizes,
-                         int totalTranslogOps);
+                         int totalTranslogOps,
+                         ActionListener<Void> listener);
 
     /**
      * After all source files has been sent over, this command is sent to the target so it can clean any local

+ 5 - 6
server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

@@ -129,14 +129,13 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
 
     @Override
     public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
-                                List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
-
+                                List<Long> phase1ExistingFileSizes, int totalTranslogOps, ActionListener<Void> listener) {
         RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(recoveryId, shardId,
-                phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps);
+            phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps);
         transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FILES_INFO, recoveryInfoFilesRequest,
-                TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
-                EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
-
+            TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
+            new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null),
+                in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
     }
 
     @Override

+ 3 - 1
server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java

@@ -145,11 +145,13 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
         final DiscoveryNode rNode = getFakeDiscoNode(targetShard.routingEntry().currentNodeId());
         targetShard.markAsRecovering("test-peer-recovery", new RecoveryState(targetShard.routingEntry(), rNode, pNode));
         final RecoveryTarget recoveryTarget = new RecoveryTarget(targetShard, null, null);
+        final PlainActionFuture<Void> receiveFileInfoFuture = new PlainActionFuture<>();
         recoveryTarget.receiveFileInfo(
             mdFiles.stream().map(StoreFileMetaData::name).collect(Collectors.toList()),
             mdFiles.stream().map(StoreFileMetaData::length).collect(Collectors.toList()),
-            Collections.emptyList(), Collections.emptyList(), 0
+            Collections.emptyList(), Collections.emptyList(), 0, receiveFileInfoFuture
         );
+        receiveFileInfoFuture.actionGet();
         List<RecoveryFileChunkRequest> requests = new ArrayList<>();
         for (StoreFileMetaData md : mdFiles) {
             try (IndexInput in = sourceShard.store().directory().openInput(md.name(), IOContext.READONCE)) {

+ 2 - 1
server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

@@ -753,7 +753,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
 
         @Override
         public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
-                                    List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
+                                    List<Long> phase1ExistingFileSizes, int totalTranslogOps, ActionListener<Void> listener) {
+
         }
 
         @Override

+ 3 - 2
test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java

@@ -69,8 +69,9 @@ public class AsyncRecoveryTarget implements RecoveryTargetHandler {
 
     @Override
     public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
-                                List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
-        target.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps);
+                                List<Long> phase1ExistingFileSizes, int totalTranslogOps, ActionListener<Void> listener) {
+        executor.execute(() -> target.receiveFileInfo(
+            phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps, listener));
     }
 
     @Override