|
@@ -80,6 +80,7 @@ import java.util.concurrent.ConcurrentMap;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.concurrent.locks.ReadWriteLock;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
@@ -131,6 +132,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|
|
private final ResponseHandlers responseHandlers = new ResponseHandlers();
|
|
|
private final RequestHandlers requestHandlers = new RequestHandlers();
|
|
|
|
|
|
+ private final AtomicLong outboundConnectionCount = new AtomicLong(); // also used as a correlation ID for open/close logs
|
|
|
+
|
|
|
public TcpTransport(Settings settings, Version version, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
|
|
|
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
|
|
|
NetworkService networkService) {
|
|
@@ -830,7 +833,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|
|
final long messagesSent = statsTracker.getMessagesSent();
|
|
|
final long messagesReceived = statsTracker.getMessagesReceived();
|
|
|
final long bytesRead = statsTracker.getBytesRead();
|
|
|
- return new TransportStats(acceptedChannels.size(), messagesReceived, bytesRead, messagesSent, bytesWritten);
|
|
|
+ return new TransportStats(acceptedChannels.size(), outboundConnectionCount.get(),
|
|
|
+ messagesReceived, bytesRead, messagesSent, bytesWritten);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -928,6 +932,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|
|
final TcpChannel handshakeChannel = channels.get(0);
|
|
|
try {
|
|
|
executeHandshake(node, handshakeChannel, connectionProfile, ActionListener.wrap(version -> {
|
|
|
+ final long connectionId = outboundConnectionCount.incrementAndGet();
|
|
|
+ logger.debug("opened transport connection [{}] to [{}] using channels [{}]", connectionId, node, channels);
|
|
|
NodeChannels nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
|
|
|
long relativeMillisTime = threadPool.relativeTimeInMillis();
|
|
|
nodeChannels.channels.forEach(ch -> {
|
|
@@ -936,6 +942,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|
|
ch.addCloseListener(ActionListener.wrap(nodeChannels::close));
|
|
|
});
|
|
|
keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile);
|
|
|
+ nodeChannels.addCloseListener(new ChannelCloseLogger(node, connectionId, relativeMillisTime));
|
|
|
listener.onResponse(nodeChannels);
|
|
|
}, e -> closeAndFail(e instanceof ConnectTransportException ?
|
|
|
e : new ConnectTransportException(node, "general node connection failure", e))));
|
|
@@ -968,4 +975,28 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private class ChannelCloseLogger implements ActionListener<Void> {
|
|
|
+ private final DiscoveryNode node;
|
|
|
+ private final long connectionId;
|
|
|
+ private final long openTimeMillis;
|
|
|
+
|
|
|
+ ChannelCloseLogger(DiscoveryNode node, long connectionId, long openTimeMillis) {
|
|
|
+ this.node = node;
|
|
|
+ this.connectionId = connectionId;
|
|
|
+ this.openTimeMillis = openTimeMillis;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onResponse(Void ignored) {
|
|
|
+ long closeTimeMillis = threadPool.relativeTimeInMillis();
|
|
|
+ logger.debug("closed transport connection [{}] to [{}] with age [{}ms]", connectionId, node, closeTimeMillis - openTimeMillis);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e) {
|
|
|
+ assert false : e; // never called
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|