|
|
@@ -174,7 +174,6 @@ public class PeerRecoveryTargetService implements IndexEventListener {
|
|
|
final TransportRequest requestToSend;
|
|
|
final StartRecoveryRequest startRequest;
|
|
|
final RecoveryState.Timer timer;
|
|
|
- CancellableThreads cancellableThreads;
|
|
|
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {
|
|
|
if (recoveryRef == null) {
|
|
|
logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);
|
|
|
@@ -182,7 +181,6 @@ public class PeerRecoveryTargetService implements IndexEventListener {
|
|
|
}
|
|
|
final RecoveryTarget recoveryTarget = recoveryRef.target();
|
|
|
timer = recoveryTarget.state().getTimer();
|
|
|
- cancellableThreads = recoveryTarget.cancellableThreads();
|
|
|
if (preExistingRequest == null) {
|
|
|
try {
|
|
|
final IndexShard indexShard = recoveryTarget.indexShard();
|
|
|
@@ -211,22 +209,8 @@ public class PeerRecoveryTargetService implements IndexEventListener {
|
|
|
logger.trace("{} reestablishing recovery from {}", startRequest.shardId(), startRequest.sourceNode());
|
|
|
}
|
|
|
}
|
|
|
- RecoveryResponseHandler responseHandler = new RecoveryResponseHandler(startRequest, timer);
|
|
|
-
|
|
|
- try {
|
|
|
- cancellableThreads.executeIO(() ->
|
|
|
- // we still execute under cancelableThreads here to ensure we interrupt any blocking call to the network if any
|
|
|
- // on the underlying transport. It's unclear if we need this here at all after moving to async execution but
|
|
|
- // the issues that a missing call to this could cause are sneaky and hard to debug. If we don't need it on this
|
|
|
- // call we can potentially remove it altogether which we should do it in a major release only with enough
|
|
|
- // time to test. This shoudl be done for 7.0 if possible
|
|
|
- transportService.sendRequest(startRequest.sourceNode(), actionName, requestToSend, responseHandler)
|
|
|
- );
|
|
|
- } catch (CancellableThreads.ExecutionCancelledException e) {
|
|
|
- logger.trace("recovery cancelled", e);
|
|
|
- } catch (Exception e) {
|
|
|
- responseHandler.onException(e);
|
|
|
- }
|
|
|
+ transportService.sendRequest(startRequest.sourceNode(), actionName, requestToSend,
|
|
|
+ new RecoveryResponseHandler(startRequest, timer));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -583,10 +567,6 @@ public class PeerRecoveryTargetService implements IndexEventListener {
|
|
|
|
|
|
@Override
|
|
|
public void handleException(TransportException e) {
|
|
|
- onException(e);
|
|
|
- }
|
|
|
-
|
|
|
- private void onException(Exception e) {
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
logger.trace(() -> new ParameterizedMessage(
|
|
|
"[{}][{}] Got exception on recovery", request.shardId().getIndex().getName(),
|