Browse Source

Do not send recovery requests with CancellableThreads (#46287)

Previously, we send recovery requests using CancellableThreads because
we send requests and wait for responses in a blocking manner. With async
recovery, we no longer need to do so. Moreover, if we fail to submit a
request, then we can release the Store using an interruptible thread
which can risk invalidating the node lock.

This PR is the first step to avoid forking when releasing the Store.

Relates #45409
Relates #46178
Nhat Nguyen 6 years ago
parent
commit
b38f464403

+ 36 - 34
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -517,9 +517,9 @@ public class RecoverySourceHandler {
                 final StepListener<Void> sendFilesStep = new StepListener<>();
                 final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();
                 final StepListener<Void> cleanFilesStep = new StepListener<>();
-                cancellableThreads.execute(() ->
-                    recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
-                        phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep));
+                cancellableThreads.checkForCancel();
+                recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
+                        phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep);
 
                 sendFileInfoStep.whenComplete(r ->
                     sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure);
@@ -634,8 +634,8 @@ public class RecoverySourceHandler {
         // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
         // garbage collection (not the JVM's GC!) of tombstone deletes.
         logger.trace("recovery [phase1]: prepare remote engine for translog");
-        cancellableThreads.execute(() ->
-            recoveryTarget.prepareForTranslogOperations(totalTranslogOps, wrappedListener));
+        cancellableThreads.checkForCancel();
+        recoveryTarget.prepareForTranslogOperations(totalTranslogOps, wrappedListener);
     }
 
     /**
@@ -741,30 +741,29 @@ public class RecoverySourceHandler {
         final List<Translog.Operation> operations = nextBatch.get();
         // send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
         if (operations.isEmpty() == false || firstBatch) {
-            cancellableThreads.execute(() -> {
-                recoveryTarget.indexTranslogOperations(
-                        operations,
-                        totalTranslogOps,
-                        maxSeenAutoIdTimestamp,
-                        maxSeqNoOfUpdatesOrDeletes,
-                        retentionLeases,
-                        mappingVersionOnPrimary,
-                        ActionListener.wrap(
-                                newCheckpoint -> {
-                                    sendBatch(
-                                            nextBatch,
-                                            false,
-                                            SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
-                                            totalTranslogOps,
-                                            maxSeenAutoIdTimestamp,
-                                            maxSeqNoOfUpdatesOrDeletes,
-                                            retentionLeases,
-                                            mappingVersionOnPrimary,
-                                            listener);
-                                },
-                                listener::onFailure
-                        ));
-            });
+            cancellableThreads.checkForCancel();
+            recoveryTarget.indexTranslogOperations(
+                operations,
+                totalTranslogOps,
+                maxSeenAutoIdTimestamp,
+                maxSeqNoOfUpdatesOrDeletes,
+                retentionLeases,
+                mappingVersionOnPrimary,
+                ActionListener.wrap(
+                    newCheckpoint -> {
+                        sendBatch(
+                            nextBatch,
+                            false,
+                            SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
+                            totalTranslogOps,
+                            maxSeenAutoIdTimestamp,
+                            maxSeqNoOfUpdatesOrDeletes,
+                            retentionLeases,
+                            mappingVersionOnPrimary,
+                            listener);
+                    },
+                    listener::onFailure
+                ));
         } else {
             listener.onResponse(targetLocalCheckpoint);
         }
@@ -787,7 +786,8 @@ public class RecoverySourceHandler {
             shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger);
         final long globalCheckpoint = shard.getLastKnownGlobalCheckpoint(); // this global checkpoint is persisted in finalizeRecovery
         final StepListener<Void> finalizeListener = new StepListener<>();
-        cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener));
+        cancellableThreads.checkForCancel();
+        recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener);
         finalizeListener.whenComplete(r -> {
             runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
                 shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);
@@ -894,8 +894,9 @@ public class RecoverySourceHandler {
 
                 @Override
                 protected void sendChunkRequest(FileChunk request, ActionListener<Void> listener) {
-                    cancellableThreads.execute(() -> recoveryTarget.writeFileChunk(
-                        request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener));
+                    cancellableThreads.checkForCancel();
+                    recoveryTarget.writeFileChunk(
+                        request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener);
                 }
 
                 @Override
@@ -922,13 +923,14 @@ public class RecoverySourceHandler {
         // Once the files have been renamed, any other files that are not
         // related to this recovery (out of date segments, for example)
         // are deleted
-        cancellableThreads.execute(() -> recoveryTarget.cleanFiles(translogOps.getAsInt(), globalCheckpoint, sourceMetadata,
+        cancellableThreads.checkForCancel();
+        recoveryTarget.cleanFiles(translogOps.getAsInt(), globalCheckpoint, sourceMetadata,
             ActionListener.delegateResponse(listener, (l, e) -> ActionListener.completeWith(l, () -> {
                 StoreFileMetaData[] mds = StreamSupport.stream(sourceMetadata.spliterator(), false).toArray(StoreFileMetaData[]::new);
                 ArrayUtil.timSort(mds, Comparator.comparingLong(StoreFileMetaData::length)); // check small files first
                 handleErrorOnSendFiles(store, e, mds);
                 throw e;
-            }))));
+            })));
     }
 
     private void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetaData[] mds) throws Exception {