|
|
@@ -40,7 +40,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
|
-final class OutboundHandler {
|
|
|
+public final class OutboundHandler {
|
|
|
|
|
|
private static final Logger logger = LogManager.getLogger(OutboundHandler.class);
|
|
|
|
|
|
@@ -61,12 +61,7 @@ final class OutboundHandler {
|
|
|
|
|
|
void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener<Void> listener) {
|
|
|
SendContext sendContext = new SendContext(channel, () -> bytes, listener);
|
|
|
- try {
|
|
|
- internalSend(channel, sendContext);
|
|
|
- } catch (IOException e) {
|
|
|
- // This should not happen as the bytes are already serialized
|
|
|
- throw new AssertionError(e);
|
|
|
- }
|
|
|
+ internalSend(channel, sendContext);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -119,11 +114,10 @@ final class OutboundHandler {
|
|
|
internalSend(channel, sendContext);
|
|
|
}
|
|
|
|
|
|
- private void internalSend(TcpChannel channel, SendContext sendContext) throws IOException {
|
|
|
+ private void internalSend(TcpChannel channel, SendContext sendContext) {
|
|
|
channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
|
|
|
- BytesReference reference = sendContext.get();
|
|
|
try {
|
|
|
- channel.sendMessage(reference, sendContext);
|
|
|
+ channel.sendMessage(sendContext);
|
|
|
} catch (RuntimeException ex) {
|
|
|
sendContext.onFailure(ex);
|
|
|
CloseableChannel.closeChannel(channel);
|
|
|
@@ -142,7 +136,7 @@ final class OutboundHandler {
|
|
|
|
|
|
private static class MessageSerializer implements CheckedSupplier<BytesReference, IOException>, Releasable {
|
|
|
|
|
|
- private final OutboundMessage message;
|
|
|
+ private OutboundMessage message;
|
|
|
private final BigArrays bigArrays;
|
|
|
private volatile ReleasableBytesStreamOutput bytesStreamOutput;
|
|
|
|
|
|
@@ -153,8 +147,12 @@ final class OutboundHandler {
|
|
|
|
|
|
@Override
|
|
|
public BytesReference get() throws IOException {
|
|
|
- bytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays);
|
|
|
- return message.serialize(bytesStreamOutput);
|
|
|
+ try {
|
|
|
+ bytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays);
|
|
|
+ return message.serialize(bytesStreamOutput);
|
|
|
+ } finally {
|
|
|
+ message = null;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@@ -163,10 +161,10 @@ final class OutboundHandler {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private class SendContext extends NotifyOnceListener<Void> implements CheckedSupplier<BytesReference, IOException> {
|
|
|
+ public class SendContext extends NotifyOnceListener<Void> implements CheckedSupplier<BytesReference, IOException> {
|
|
|
|
|
|
private final TcpChannel channel;
|
|
|
- private final CheckedSupplier<BytesReference, IOException> messageSupplier;
|
|
|
+ private CheckedSupplier<BytesReference, IOException> messageSupplier;
|
|
|
private final ActionListener<Void> listener;
|
|
|
private final Releasable optionalReleasable;
|
|
|
private long messageSize = -1;
|
|
|
@@ -184,10 +182,13 @@ final class OutboundHandler {
|
|
|
this.optionalReleasable = optionalReleasable;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
public BytesReference get() throws IOException {
|
|
|
BytesReference message;
|
|
|
try {
|
|
|
+ assert messageSupplier != null;
|
|
|
message = messageSupplier.get();
|
|
|
+ messageSupplier = null;
|
|
|
messageSize = message.length();
|
|
|
TransportLogger.logOutboundMessage(channel, message);
|
|
|
return message;
|
|
|
@@ -206,6 +207,7 @@ final class OutboundHandler {
|
|
|
|
|
|
@Override
|
|
|
protected void innerOnFailure(Exception e) {
|
|
|
+ messageSupplier = null;
|
|
|
if (NetworkExceptionHelper.isCloseConnectionException(e)) {
|
|
|
logger.debug(() -> new ParameterizedMessage("send message failed [channel: {}]", channel), e);
|
|
|
} else {
|