|
@@ -12,29 +12,39 @@ package org.elasticsearch.transport;
|
|
|
import org.apache.logging.log4j.Level;
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
-import org.apache.lucene.store.AlreadyClosedException;
|
|
|
import org.apache.lucene.util.BytesRef;
|
|
|
import org.elasticsearch.TransportVersion;
|
|
|
import org.elasticsearch.TransportVersions;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
+import org.elasticsearch.common.Strings;
|
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
|
+import org.elasticsearch.common.bytes.CompositeBytesReference;
|
|
|
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
|
|
+import org.elasticsearch.common.compress.CompressorFactory;
|
|
|
+import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
|
|
|
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
|
|
|
+import org.elasticsearch.common.io.stream.StreamOutput;
|
|
|
+import org.elasticsearch.common.io.stream.Writeable;
|
|
|
import org.elasticsearch.common.network.CloseableChannel;
|
|
|
import org.elasticsearch.common.network.HandlingTimeTracker;
|
|
|
import org.elasticsearch.common.recycler.Recycler;
|
|
|
import org.elasticsearch.common.transport.NetworkExceptionHelper;
|
|
|
+import org.elasticsearch.common.util.concurrent.ThreadContext;
|
|
|
import org.elasticsearch.core.Nullable;
|
|
|
+import org.elasticsearch.core.RefCounted;
|
|
|
import org.elasticsearch.core.Releasable;
|
|
|
import org.elasticsearch.core.Releasables;
|
|
|
+import org.elasticsearch.core.Streams;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.util.function.Supplier;
|
|
|
|
|
|
import static org.elasticsearch.core.Strings.format;
|
|
|
|
|
|
-final class OutboundHandler {
|
|
|
+public final class OutboundHandler {
|
|
|
|
|
|
private static final Logger logger = LogManager.getLogger(OutboundHandler.class);
|
|
|
|
|
@@ -83,7 +93,7 @@ final class OutboundHandler {
|
|
|
* thread.
|
|
|
*/
|
|
|
void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener<Void> listener) {
|
|
|
- internalSend(channel, bytes, null, listener);
|
|
|
+ internalSend(channel, bytes, () -> "raw bytes", listener);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -102,26 +112,17 @@ final class OutboundHandler {
|
|
|
final boolean isHandshake
|
|
|
) throws IOException, TransportException {
|
|
|
assert assertValidTransportVersion(transportVersion);
|
|
|
- final OutboundMessage.Request message = new OutboundMessage.Request(
|
|
|
- threadPool.getThreadContext(),
|
|
|
- request,
|
|
|
- transportVersion,
|
|
|
+ sendMessage(
|
|
|
+ channel,
|
|
|
action,
|
|
|
+ request,
|
|
|
requestId,
|
|
|
isHandshake,
|
|
|
- compressionScheme
|
|
|
+ compressionScheme,
|
|
|
+ transportVersion,
|
|
|
+ ResponseStatsConsumer.NONE,
|
|
|
+ () -> messageListener.onRequestSent(node, requestId, action, request, options)
|
|
|
);
|
|
|
- if (request.tryIncRef() == false) {
|
|
|
- assert false : "request [" + request + "] has been released already";
|
|
|
- throw new AlreadyClosedException("request [" + request + "] has been released already");
|
|
|
- }
|
|
|
- sendMessage(channel, message, ResponseStatsConsumer.NONE, () -> {
|
|
|
- try {
|
|
|
- messageListener.onRequestSent(node, requestId, action, request, options);
|
|
|
- } finally {
|
|
|
- request.decRef();
|
|
|
- }
|
|
|
- });
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -141,23 +142,19 @@ final class OutboundHandler {
|
|
|
final ResponseStatsConsumer responseStatsConsumer
|
|
|
) {
|
|
|
assert assertValidTransportVersion(transportVersion);
|
|
|
- OutboundMessage.Response message = new OutboundMessage.Response(
|
|
|
- threadPool.getThreadContext(),
|
|
|
- response,
|
|
|
- transportVersion,
|
|
|
- requestId,
|
|
|
- isHandshake,
|
|
|
- compressionScheme
|
|
|
- );
|
|
|
- response.mustIncRef();
|
|
|
+ assert response.hasReferences();
|
|
|
try {
|
|
|
- sendMessage(channel, message, responseStatsConsumer, () -> {
|
|
|
- try {
|
|
|
- messageListener.onResponseSent(requestId, action);
|
|
|
- } finally {
|
|
|
- response.decRef();
|
|
|
- }
|
|
|
- });
|
|
|
+ sendMessage(
|
|
|
+ channel,
|
|
|
+ null,
|
|
|
+ response,
|
|
|
+ requestId,
|
|
|
+ isHandshake,
|
|
|
+ compressionScheme,
|
|
|
+ transportVersion,
|
|
|
+ responseStatsConsumer,
|
|
|
+ () -> messageListener.onResponseSent(requestId, action)
|
|
|
+ );
|
|
|
} catch (Exception ex) {
|
|
|
if (isHandshake) {
|
|
|
logger.error(
|
|
@@ -187,16 +184,19 @@ final class OutboundHandler {
|
|
|
final Exception error
|
|
|
) {
|
|
|
assert assertValidTransportVersion(transportVersion);
|
|
|
- OutboundMessage.Response message = new OutboundMessage.Response(
|
|
|
- threadPool.getThreadContext(),
|
|
|
- new RemoteTransportException(nodeName, channel.getLocalAddress(), action, error),
|
|
|
- transportVersion,
|
|
|
- requestId,
|
|
|
- false,
|
|
|
- null
|
|
|
- );
|
|
|
+ var msg = new RemoteTransportException(nodeName, channel.getLocalAddress(), action, error);
|
|
|
try {
|
|
|
- sendMessage(channel, message, responseStatsConsumer, () -> messageListener.onResponseSent(requestId, action, error));
|
|
|
+ sendMessage(
|
|
|
+ channel,
|
|
|
+ null,
|
|
|
+ msg,
|
|
|
+ requestId,
|
|
|
+ false,
|
|
|
+ null,
|
|
|
+ transportVersion,
|
|
|
+ responseStatsConsumer,
|
|
|
+ () -> messageListener.onResponseSent(requestId, action, error)
|
|
|
+ );
|
|
|
} catch (Exception sendException) {
|
|
|
sendException.addSuppressed(error);
|
|
|
logger.error(() -> format("Failed to send error response on channel [%s], closing channel", channel), sendException);
|
|
@@ -206,42 +206,183 @@ final class OutboundHandler {
|
|
|
|
|
|
private void sendMessage(
|
|
|
TcpChannel channel,
|
|
|
- OutboundMessage networkMessage,
|
|
|
+ @Nullable String requestAction,
|
|
|
+ Writeable writeable,
|
|
|
+ long requestId,
|
|
|
+ boolean isHandshake,
|
|
|
+ Compression.Scheme compressionScheme,
|
|
|
+ TransportVersion version,
|
|
|
ResponseStatsConsumer responseStatsConsumer,
|
|
|
Releasable onAfter
|
|
|
) throws IOException {
|
|
|
- final RecyclerBytesStreamOutput byteStreamOutput;
|
|
|
- boolean bufferSuccess = false;
|
|
|
- try {
|
|
|
- byteStreamOutput = new RecyclerBytesStreamOutput(recycler);
|
|
|
- bufferSuccess = true;
|
|
|
- } finally {
|
|
|
- if (bufferSuccess == false) {
|
|
|
- Releasables.closeExpectNoException(onAfter);
|
|
|
- }
|
|
|
- }
|
|
|
- final Releasable release = Releasables.wrap(byteStreamOutput, onAfter);
|
|
|
+ compressionScheme = writeable instanceof BytesTransportRequest ? null : compressionScheme;
|
|
|
final BytesReference message;
|
|
|
boolean serializeSuccess = false;
|
|
|
+ final boolean isError = writeable instanceof RemoteTransportException;
|
|
|
+ final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput(recycler);
|
|
|
try {
|
|
|
- message = networkMessage.serialize(byteStreamOutput);
|
|
|
+ message = serialize(
|
|
|
+ requestAction,
|
|
|
+ requestId,
|
|
|
+ isHandshake,
|
|
|
+ version,
|
|
|
+ isError,
|
|
|
+ compressionScheme,
|
|
|
+ writeable,
|
|
|
+ threadPool.getThreadContext(),
|
|
|
+ byteStreamOutput
|
|
|
+ );
|
|
|
serializeSuccess = true;
|
|
|
} catch (Exception e) {
|
|
|
- logger.warn(() -> "failed to serialize outbound message [" + networkMessage + "]", e);
|
|
|
+ logger.warn(() -> "failed to serialize outbound message [" + writeable + "]", e);
|
|
|
throw e;
|
|
|
} finally {
|
|
|
if (serializeSuccess == false) {
|
|
|
- release.close();
|
|
|
+ Releasables.close(byteStreamOutput, onAfter);
|
|
|
}
|
|
|
}
|
|
|
responseStatsConsumer.addResponseStats(message.length());
|
|
|
- internalSend(channel, message, networkMessage, ActionListener.running(release::close));
|
|
|
+ final var responseType = writeable.getClass();
|
|
|
+ final boolean compress = compressionScheme != null;
|
|
|
+ internalSend(
|
|
|
+ channel,
|
|
|
+ message,
|
|
|
+ requestAction == null
|
|
|
+ ? () -> "Response{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}{" + responseType + "}"
|
|
|
+ : () -> "Request{" + requestAction + "}{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}",
|
|
|
+ ActionListener.releasing(
|
|
|
+ message instanceof ReleasableBytesReference r
|
|
|
+ ? Releasables.wrap(byteStreamOutput, onAfter, r)
|
|
|
+ : Releasables.wrap(byteStreamOutput, onAfter)
|
|
|
+ )
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ // public for tests
|
|
|
+ public static BytesReference serialize(
|
|
|
+ @Nullable String requestAction,
|
|
|
+ long requestId,
|
|
|
+ boolean isHandshake,
|
|
|
+ TransportVersion version,
|
|
|
+ boolean isError,
|
|
|
+ Compression.Scheme compressionScheme,
|
|
|
+ Writeable writeable,
|
|
|
+ ThreadContext threadContext,
|
|
|
+ RecyclerBytesStreamOutput byteStreamOutput
|
|
|
+ ) throws IOException {
|
|
|
+ compressionScheme = compressionScheme == Compression.Scheme.LZ4 && version.before(Compression.Scheme.LZ4_VERSION)
|
|
|
+ ? null
|
|
|
+ : compressionScheme;
|
|
|
+ assert byteStreamOutput.position() == 0;
|
|
|
+ byteStreamOutput.setTransportVersion(version);
|
|
|
+ final int headerSize = TcpHeader.headerSize(version);
|
|
|
+ byteStreamOutput.skip(headerSize);
|
|
|
+ final int variableHeaderLength;
|
|
|
+ if (version.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
|
|
|
+ threadContext.writeTo(byteStreamOutput);
|
|
|
+ if (requestAction != null) {
|
|
|
+ if (version.before(TransportVersions.V_8_0_0)) {
|
|
|
+ // empty features array
|
|
|
+ byteStreamOutput.writeStringArray(Strings.EMPTY_ARRAY);
|
|
|
+ }
|
|
|
+ byteStreamOutput.writeString(requestAction);
|
|
|
+ }
|
|
|
+ variableHeaderLength = Math.toIntExact(byteStreamOutput.position() - headerSize);
|
|
|
+ } else {
|
|
|
+ variableHeaderLength = -1;
|
|
|
+ }
|
|
|
+ BytesReference message = serializeMessageBody(
|
|
|
+ writeable,
|
|
|
+ compressionScheme,
|
|
|
+ version,
|
|
|
+ byteStreamOutput,
|
|
|
+ variableHeaderLength,
|
|
|
+ threadContext,
|
|
|
+ requestAction
|
|
|
+ );
|
|
|
+ byte status = 0;
|
|
|
+ if (requestAction == null) {
|
|
|
+ status = TransportStatus.setResponse(status);
|
|
|
+ }
|
|
|
+ if (isHandshake) {
|
|
|
+ status = TransportStatus.setHandshake(status);
|
|
|
+ }
|
|
|
+ if (isError) {
|
|
|
+ status = TransportStatus.setError(status);
|
|
|
+ }
|
|
|
+ if (compressionScheme != null) {
|
|
|
+ status = TransportStatus.setCompress(status);
|
|
|
+ }
|
|
|
+ byteStreamOutput.seek(0);
|
|
|
+ TcpHeader.writeHeader(byteStreamOutput, requestId, status, version, message.length() - headerSize, variableHeaderLength);
|
|
|
+ return message;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static BytesReference serializeMessageBody(
|
|
|
+ Writeable writeable,
|
|
|
+ Compression.Scheme compressionScheme,
|
|
|
+ TransportVersion version,
|
|
|
+ RecyclerBytesStreamOutput byteStreamOutput,
|
|
|
+ int variableHeaderLength,
|
|
|
+ ThreadContext threadContext,
|
|
|
+ String requestAction
|
|
|
+ ) throws IOException {
|
|
|
+ // The compressible bytes stream will not close the underlying bytes stream
|
|
|
+ final StreamOutput stream = compressionScheme != null ? wrapCompressed(compressionScheme, byteStreamOutput) : byteStreamOutput;
|
|
|
+ final ReleasableBytesReference zeroCopyBuffer;
|
|
|
+ try {
|
|
|
+ stream.setTransportVersion(version);
|
|
|
+ if (variableHeaderLength == -1) {
|
|
|
+ threadContext.writeTo(stream);
|
|
|
+ if (requestAction != null) {
|
|
|
+ stream.writeStringArray(Strings.EMPTY_ARRAY);
|
|
|
+ stream.writeString(requestAction);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (writeable instanceof BytesTransportRequest bRequest) {
|
|
|
+ bRequest.writeThin(stream);
|
|
|
+ zeroCopyBuffer = bRequest.bytes;
|
|
|
+ } else if (writeable instanceof RemoteTransportException remoteTransportException) {
|
|
|
+ stream.writeException(remoteTransportException);
|
|
|
+ zeroCopyBuffer = ReleasableBytesReference.empty();
|
|
|
+ } else {
|
|
|
+ writeable.writeTo(stream);
|
|
|
+ zeroCopyBuffer = ReleasableBytesReference.empty();
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ // We have to close here before accessing the bytes when using compression to ensure that some marker bytes (EOS marker)
|
|
|
+ // are written.
|
|
|
+ if (compressionScheme != null) {
|
|
|
+ stream.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ final BytesReference msg = byteStreamOutput.bytes();
|
|
|
+ if (zeroCopyBuffer.length() == 0) {
|
|
|
+ return msg;
|
|
|
+ }
|
|
|
+ zeroCopyBuffer.mustIncRef();
|
|
|
+ return new ReleasableBytesReference(CompositeBytesReference.of(msg, zeroCopyBuffer), (RefCounted) zeroCopyBuffer);
|
|
|
+ }
|
|
|
+
|
|
|
+ // compressed stream wrapped bytes must be no-close wrapped since we need to close the compressed wrapper below to release
|
|
|
+ // resources and write EOS marker bytes but must not yet release the bytes themselves
|
|
|
+ private static StreamOutput wrapCompressed(Compression.Scheme compressionScheme, RecyclerBytesStreamOutput bytesStream)
|
|
|
+ throws IOException {
|
|
|
+ if (compressionScheme == Compression.Scheme.DEFLATE) {
|
|
|
+ return new OutputStreamStreamOutput(
|
|
|
+ CompressorFactory.COMPRESSOR.threadLocalOutputStream(org.elasticsearch.core.Streams.noCloseStream(bytesStream))
|
|
|
+ );
|
|
|
+ } else if (compressionScheme == Compression.Scheme.LZ4) {
|
|
|
+ return new OutputStreamStreamOutput(Compression.Scheme.lz4OutputStream(Streams.noCloseStream(bytesStream)));
|
|
|
+ } else {
|
|
|
+ throw new IllegalArgumentException("Invalid compression scheme: " + compressionScheme);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void internalSend(
|
|
|
TcpChannel channel,
|
|
|
BytesReference reference,
|
|
|
- @Nullable OutboundMessage message,
|
|
|
+ Supplier<String> messageDescription,
|
|
|
ActionListener<Void> listener
|
|
|
) {
|
|
|
final long startTime = threadPool.rawRelativeTimeInMillis();
|
|
@@ -281,7 +422,7 @@ final class OutboundHandler {
|
|
|
logger.warn(
|
|
|
"sending transport message [{}] of size [{}] on [{}] took [{}ms] which is above the warn "
|
|
|
+ "threshold of [{}ms] with success [{}]",
|
|
|
- message,
|
|
|
+ messageDescription.get(),
|
|
|
messageSize,
|
|
|
channel,
|
|
|
took,
|