|
@@ -1571,13 +1571,16 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
* Called once the channel is closed for instance due to a disconnect or a closed socket etc.
|
|
* Called once the channel is closed for instance due to a disconnect or a closed socket etc.
|
|
*/
|
|
*/
|
|
protected final void onChannelClosed(Channel channel) {
|
|
protected final void onChannelClosed(Channel channel) {
|
|
- Optional<Map.Entry<Long, HandshakeResponseHandler>> first = pendingHandshakes.entrySet().stream()
|
|
|
|
- .filter((entry) -> entry.getValue().channel == channel).findFirst();
|
|
|
|
|
|
+ final Optional<Long> first = pendingHandshakes.entrySet().stream()
|
|
|
|
+ .filter((entry) -> entry.getValue().channel == channel).map((e) -> e.getKey()).findFirst();
|
|
if(first.isPresent()) {
|
|
if(first.isPresent()) {
|
|
- final Long requestId = first.get().getKey();
|
|
|
|
- HandshakeResponseHandler handler = first.get().getValue();
|
|
|
|
- pendingHandshakes.remove(requestId);
|
|
|
|
- handler.handleException(new TransportException("connection reset"));
|
|
|
|
|
|
+ final Long requestId = first.get();
|
|
|
|
+ final HandshakeResponseHandler handler = pendingHandshakes.remove(requestId);
|
|
|
|
+ if (handler != null) {
|
|
|
|
+ // there might be a race removing this or this method might be called twice concurrently depending on how
|
|
|
|
+ // the channel is closed ie. due to connection reset or broken pipes
|
|
|
|
+ handler.handleException(new TransportException("connection reset"));
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|