Don't block on peer recovery on the target side (#37076)

Today we block a generic thread on the target side until the
source side has fully executed the recovery. The source side still
runs the recovery in a blocking fashion, but there is no reason to
block on the target side. This releases generic threads early when
many recoveries run concurrently.

Relates to #36195
Simon Willnauer, 6 years ago
commit b4f113d3ea

+79 -51  server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
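
The heart of the change is visible in the diff below: instead of parking a generic thread on txGet() until the source side has finished the whole recovery, the target now registers a TransportResponseHandler and returns immediately. A minimal sketch of that shape, assuming it sits in the same package as the patched class so the recovery types resolve; StartRecoverySketch, onDone and onFailure are hypothetical placeholders, while submitRequest and the handler methods are the ones used in the patch:

package org.elasticsearch.indices.recovery;

import java.io.IOException;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

class StartRecoverySketch {

    // Before this commit the target parked a generic thread until the source finished:
    //   responseHolder.set(transportService.submitRequest(...).txGet());
    // Now the request is submitted with an asynchronous handler and the thread returns.
    void startRecovery(TransportService transportService, DiscoveryNode sourceNode, StartRecoveryRequest request) {
        transportService.submitRequest(sourceNode, PeerRecoverySourceService.Actions.START_RECOVERY, request,
            new TransportResponseHandler<RecoveryResponse>() {
                @Override
                public void handleResponse(RecoveryResponse response) {
                    onDone(response);        // hypothetical: mark the recovery as done
                }

                @Override
                public void handleException(TransportException e) {
                    onFailure(e);            // hypothetical: retry or fail the recovery
                }

                @Override
                public String executor() {
                    // response handling can be heavy (e.g. refreshes), so fork to GENERIC
                    return ThreadPool.Names.GENERIC;
                }

                @Override
                public RecoveryResponse read(StreamInput in) throws IOException {
                    RecoveryResponse response = new RecoveryResponse();
                    response.readFrom(in);
                    return response;
                }
            });
        // no blocking here: the calling generic thread is free as soon as the request is sent
    }

    void onDone(RecoveryResponse response) { /* hypothetical completion hook */ }

    void onFailure(Exception e) { /* hypothetical failure hook */ }
}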

@@ -59,17 +59,18 @@ import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.ConnectTransportException;
-import org.elasticsearch.transport.FutureTransportResponseHandler;
 import org.elasticsearch.transport.TransportChannel;
+import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportRequestHandler;
 import org.elasticsearch.transport.TransportResponse;
+import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;

 import java.io.IOException;
 import java.util.List;
 import java.util.StringJoiner;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;

 import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;

@@ -142,6 +143,8 @@ public class PeerRecoveryTargetService implements IndexEventListener {
     public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
         // create a new recovery status, and process...
         final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout());
+        // we fork off quickly here and go async but this is called from the cluster state applier thread too and that can cause
+        // assertions to trip if we executed it on the same thread, hence we fork off to the generic threadpool.
         threadPool.generic().execute(new RecoveryRunner(recoveryId));
     }

@@ -165,17 +168,16 @@ public class PeerRecoveryTargetService implements IndexEventListener {

     private void doRecovery(final long recoveryId) {
         final StartRecoveryRequest request;
-        final CancellableThreads cancellableThreads;
         final RecoveryState.Timer timer;
-
+        CancellableThreads cancellableThreads;
         try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {
             if (recoveryRef == null) {
                 logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);
                 return;
             }
             final RecoveryTarget recoveryTarget = recoveryRef.target();
-            cancellableThreads = recoveryTarget.cancellableThreads();
             timer = recoveryTarget.state().getTimer();
+            cancellableThreads = recoveryTarget.cancellableThreads();
             try {
                 assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
                 request = getStartRecoveryRequest(recoveryTarget);
@@ -189,51 +191,11 @@ public class PeerRecoveryTargetService implements IndexEventListener {
                 return;
             }
         }
-
-        try {
-            logger.trace("{} starting recovery from {}", request.shardId(), request.sourceNode());
-            final AtomicReference<RecoveryResponse> responseHolder = new AtomicReference<>();
-            cancellableThreads.execute(() -> responseHolder.set(
-                    transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,
-                            new FutureTransportResponseHandler<RecoveryResponse>() {
-                                @Override
-                                public RecoveryResponse read(StreamInput in) throws IOException {
-                                    RecoveryResponse recoveryResponse = new RecoveryResponse();
-                                    recoveryResponse.readFrom(in);
-                                    return recoveryResponse;
-                                }
-                            }).txGet()));
-            final RecoveryResponse recoveryResponse = responseHolder.get();
-            final TimeValue recoveryTime = new TimeValue(timer.time());
-            // do this through ongoing recoveries to remove it from the collection
-            onGoingRecoveries.markRecoveryAsDone(recoveryId);
-            if (logger.isTraceEnabled()) {
-                StringBuilder sb = new StringBuilder();
-                sb.append('[').append(request.shardId().getIndex().getName()).append(']').append('[').append(request.shardId().id())
-                        .append("] ");
-                sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(recoveryTime).append("]\n");
-                sb.append("   phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]").append(" with " +
-                        "total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]")
-                        .append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [").append
-                        (timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']')
-                        .append("\n");
-                sb.append("         : reusing_files   [").append(recoveryResponse.phase1ExistingFileNames.size()).append("] with " +
-                        "total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize)).append("]\n");
-                sb.append("   phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n");
-                sb.append("         : recovered [").append(recoveryResponse.phase2Operations).append("]").append(" transaction log " +
-                        "operations")
-                        .append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]")
-                        .append("\n");
-                logger.trace("{}", sb);
-            } else {
-                logger.debug("{} recovery done from [{}], took [{}]", request.shardId(), request.sourceNode(), recoveryTime);
-            }
-        } catch (CancellableThreads.ExecutionCancelledException e) {
-            logger.trace("recovery cancelled", e);
-        } catch (Exception e) {
+        Consumer<Exception> handleException = e -> {
             if (logger.isTraceEnabled()) {
                 logger.trace(() -> new ParameterizedMessage(
-                        "[{}][{}] Got exception on recovery", request.shardId().getIndex().getName(), request.shardId().id()), e);
+                    "[{}][{}] Got exception on recovery", request.shardId().getIndex().getName(),
+                    request.shardId().id()), e);
             }
             Throwable cause = ExceptionsHelper.unwrapCause(e);
             if (cause instanceof CancellableThreads.ExecutionCancelledException) {
@@ -267,14 +229,16 @@ public class PeerRecoveryTargetService implements IndexEventListener {
             }

             if (cause instanceof DelayRecoveryException) {
-                retryRecovery(recoveryId, cause, recoverySettings.retryDelayStateSync(), recoverySettings.activityTimeout());
+                retryRecovery(recoveryId, cause, recoverySettings.retryDelayStateSync(),
+                    recoverySettings.activityTimeout());
                 return;
             }

             if (cause instanceof ConnectTransportException) {
                 logger.debug("delaying recovery of {} for [{}] due to networking error [{}]", request.shardId(),
                     recoverySettings.retryDelayNetwork(), cause.getMessage());
-                retryRecovery(recoveryId, cause.getMessage(), recoverySettings.retryDelayNetwork(), recoverySettings.activityTimeout());
+                retryRecovery(recoveryId, cause.getMessage(), recoverySettings.retryDelayNetwork(),
+                    recoverySettings.activityTimeout());
                 return;
             }

@@ -285,6 +249,71 @@ public class PeerRecoveryTargetService implements IndexEventListener {
             }

             onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, e), true);
+        };
+
+        try {
+            logger.trace("{} starting recovery from {}", request.shardId(), request.sourceNode());
+            cancellableThreads.executeIO(() ->
+                // we still execute under cancellableThreads here to ensure we interrupt any blocking call to the network if any
+                // on the underlying transport. It's unclear if we need this here at all after moving to async execution but
+                // the issues that a missing call to this could cause are sneaky and hard to debug. If we don't need it on this
+                // call we can potentially remove it altogether, which we should only do in a major release with enough
+                // time to test. This should be done for 7.0 if possible.
+                transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,
+                    new TransportResponseHandler<RecoveryResponse>() {
+                        @Override
+                        public void handleResponse(RecoveryResponse recoveryResponse) {
+                            final TimeValue recoveryTime = new TimeValue(timer.time());
+                            // do this through ongoing recoveries to remove it from the collection
+                            onGoingRecoveries.markRecoveryAsDone(recoveryId);
+                            if (logger.isTraceEnabled()) {
+                                StringBuilder sb = new StringBuilder();
+                                sb.append('[').append(request.shardId().getIndex().getName()).append(']')
+                                    .append('[').append(request.shardId().id()).append("] ");
+                                sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(recoveryTime)
+                                    .append("]\n");
+                                sb.append("   phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]")
+                                    .append(" with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]")
+                                    .append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [")
+                                    .append(timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']').append("\n");
+                                sb.append("         : reusing_files   [").append(recoveryResponse.phase1ExistingFileNames.size())
+                                    .append("] with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize))
+                                    .append("]\n");
+                                sb.append("   phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n");
+                                sb.append("         : recovered [").append(recoveryResponse.phase2Operations).append("]")
+                                    .append(" transaction log operations")
+                                    .append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]")
+                                    .append("\n");
+                                logger.trace("{}", sb);
+                            } else {
+                                logger.debug("{} recovery done from [{}], took [{}]", request.shardId(), request.sourceNode(),
+                                    recoveryTime);
+                            }
+                        }
+
+                        @Override
+                        public void handleException(TransportException e) {
+                            handleException.accept(e);
+                        }
+
+                        @Override
+                        public String executor() {
+                            // we do some heavy work like refreshes in the response so fork off to the generic threadpool
+                            return ThreadPool.Names.GENERIC;
+                        }
+
+                        @Override
+                        public RecoveryResponse read(StreamInput in) throws IOException {
+                            RecoveryResponse recoveryResponse = new RecoveryResponse();
+                            recoveryResponse.readFrom(in);
+                            return recoveryResponse;
+                        }
+                    })
+            );
+        } catch (CancellableThreads.ExecutionCancelledException e) {
+            logger.trace("recovery cancelled", e);
+        } catch (Exception e) {
+            handleException.accept(e);
         }
     }

@@ -632,5 +661,4 @@ public class PeerRecoveryTargetService implements IndexEventListener {
             doRecovery(recoveryId);
         }
     }
-
 }
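
A side note on the structure above: the retry/fail handling is factored into a single Consumer<Exception>, so a failure thrown synchronously while submitting the request and a failure delivered asynchronously via handleException both funnel into the same code. A small, self-contained sketch of that pattern under hypothetical names (not code from the patch):

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class SharedFailureSketch {

    // One consumer handles failures from both the synchronous submit path and the
    // asynchronous completion callback, so retry/fail logic lives in exactly one place.
    static void start(CompletableFuture<String> response) {
        Consumer<Exception> handleException = e -> System.out.println("recovery failed: " + e.getMessage());

        try {
            response.whenComplete((result, error) -> {      // asynchronous completion
                if (error != null) {
                    handleException.accept(new RuntimeException(error));
                } else {
                    System.out.println("recovery done: " + result);
                }
            });
        } catch (Exception e) {                             // synchronous failure while submitting
            handleException.accept(e);
        }
    }

    public static void main(String[] args) {
        start(CompletableFuture.completedFuture("shard [index-0][0]"));

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("source node unavailable"));
        start(failed);
    }
}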