|
@@ -733,6 +733,11 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
|
|
|
"failed to notify channel of error message for action [{}]", action), inner);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return "processing of [" + requestId + "][" + action + "]: " + request;
|
|
|
+ }
|
|
|
});
|
|
|
}
|
|
|
|
|
@@ -946,7 +951,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
|
|
|
assert responseHandlers.contains(requestId) == false;
|
|
|
TimeoutInfoHolder timeoutInfoHolder = timeoutInfoHandlers.remove(requestId);
|
|
|
if (timeoutInfoHolder != null) {
|
|
|
- long time = System.currentTimeMillis();
|
|
|
+ long time = threadPool.relativeTimeInMillis();
|
|
|
logger.warn("Received response for a request that has timed out, sent [{}ms] ago, timed out [{}ms] ago, " +
|
|
|
"action [{}], node [{}], id [{}]", time - timeoutInfoHolder.sentTime(), time - timeoutInfoHolder.timeoutTime(),
|
|
|
timeoutInfoHolder.action(), timeoutInfoHolder.node(), requestId);
|
|
@@ -1009,7 +1014,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
|
|
|
final class TimeoutHandler implements Runnable {
|
|
|
|
|
|
private final long requestId;
|
|
|
- private final long sentTime = System.currentTimeMillis();
|
|
|
+ private final long sentTime = threadPool.relativeTimeInMillis();
|
|
|
private final String action;
|
|
|
private final DiscoveryNode node;
|
|
|
volatile ScheduledFuture future;
|
|
@@ -1023,7 +1028,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
|
|
|
@Override
|
|
|
public void run() {
|
|
|
if (responseHandlers.contains(requestId)) {
|
|
|
- long timeoutTime = System.currentTimeMillis();
|
|
|
+ long timeoutTime = threadPool.relativeTimeInMillis();
|
|
|
timeoutInfoHandlers.put(requestId, new TimeoutInfoHolder(node, action, sentTime, timeoutTime));
|
|
|
// now that we have the information visible via timeoutInfoHandlers, we try to remove the request id
|
|
|
final Transport.ResponseContext holder = responseHandlers.remove(requestId);
|
|
@@ -1049,6 +1054,11 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
|
|
|
"cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers";
|
|
|
FutureUtils.cancel(future);
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return "timeout handler for [" + requestId + "][" + action + "]";
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static class TimeoutInfoHolder {
|
|
@@ -1176,7 +1186,17 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
|
|
|
if (ThreadPool.Names.SAME.equals(executor)) {
|
|
|
processResponse(handler, response);
|
|
|
} else {
|
|
|
- threadPool.executor(executor).execute(() -> processResponse(handler, response));
|
|
|
+ threadPool.executor(executor).execute(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ processResponse(handler, response);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return "delivery of response to [" + requestId + "][" + action + "]: " + response;
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1201,7 +1221,17 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
|
|
|
if (ThreadPool.Names.SAME.equals(executor)) {
|
|
|
processException(handler, rtx);
|
|
|
} else {
|
|
|
- threadPool.executor(handler.executor()).execute(() -> processException(handler, rtx));
|
|
|
+ threadPool.executor(handler.executor()).execute(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ processException(handler, rtx);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return "delivery of failure response to [" + requestId + "][" + action + "]: " + exception;
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
}
|
|
|
}
|