|
|
@@ -32,9 +32,7 @@ import org.elasticsearch.common.Strings;
|
|
|
import org.elasticsearch.common.breaker.CircuitBreaker;
|
|
|
import org.elasticsearch.common.bytes.BytesArray;
|
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
|
-import org.elasticsearch.common.collect.MapBuilder;
|
|
|
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
|
|
-import org.elasticsearch.common.component.Lifecycle;
|
|
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
|
|
import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.lease.Releasable;
|
|
|
@@ -52,10 +50,8 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
import org.elasticsearch.common.util.BigArrays;
|
|
|
import org.elasticsearch.common.util.PageCacheRecycler;
|
|
|
-import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
|
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|
|
import org.elasticsearch.common.util.concurrent.CountDown;
|
|
|
-import org.elasticsearch.common.util.concurrent.ThreadContext;
|
|
|
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
|
|
import org.elasticsearch.monitor.jvm.JvmInfo;
|
|
|
import org.elasticsearch.node.Node;
|
|
|
@@ -107,15 +103,11 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|
|
private static final BytesReference EMPTY_BYTES_REFERENCE = new BytesArray(new byte[0]);
|
|
|
|
|
|
protected final Settings settings;
|
|
|
- private final CircuitBreakerService circuitBreakerService;
|
|
|
protected final ThreadPool threadPool;
|
|
|
- protected final BigArrays bigArrays;
|
|
|
protected final PageCacheRecycler pageCacheRecycler;
|
|
|
protected final NetworkService networkService;
|
|
|
protected final Set<ProfileSettings> profileSettings;
|
|
|
|
|
|
- private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;
|
|
|
-
|
|
|
private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
|
|
|
private final Map<String, List<TcpServerChannel>> serverChannels = newConcurrentMap();
|
|
|
private final Set<TcpChannel> acceptedChannels = ConcurrentCollections.newConcurrentSet();
|
|
|
@@ -125,14 +117,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|
|
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
|
|
|
private volatile BoundTransportAddress boundAddress;
|
|
|
|
|
|
- private final MeanMetric readBytesMetric = new MeanMetric();
|
|
|
- private volatile Map<String, RequestHandlerRegistry<? extends TransportRequest>> requestHandlers = Collections.emptyMap();
|
|
|
- private final ResponseHandlers responseHandlers = new ResponseHandlers();
|
|
|
- private final TransportLogger transportLogger;
|
|
|
private final TransportHandshaker handshaker;
|
|
|
private final TransportKeepAlive keepAlive;
|
|
|
- private final InboundMessage.Reader reader;
|
|
|
private final OutboundHandler outboundHandler;
|
|
|
+ private final InboundHandler inboundHandler;
|
|
|
|
|
|
public TcpTransport(Settings settings, Version version, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
|
|
|
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
|
|
|
@@ -140,11 +128,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|
|
this.settings = settings;
|
|
|
this.profileSettings = getProfileSettings(settings);
|
|
|
this.threadPool = threadPool;
|
|
|
- this.bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.IN_FLIGHT_REQUESTS);
|
|
|
this.pageCacheRecycler = pageCacheRecycler;
|
|
|
- this.circuitBreakerService = circuitBreakerService;
|
|
|
this.networkService = networkService;
|
|
|
- this.transportLogger = new TransportLogger();
|
|
|
+ TransportLogger transportLogger = new TransportLogger();
|
|
|
String nodeName = Node.NODE_NAME_SETTING.get(settings);
|
|
|
final Settings defaultFeatures = TransportSettings.DEFAULT_FEATURES_SETTING.get(settings);
|
|
|
String[] features;
|
|
|
@@ -159,16 +145,19 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|
|
// use a sorted set to present the features in a consistent order
|
|
|
features = new TreeSet<>(defaultFeatures.names()).toArray(new String[defaultFeatures.names().size()]);
|
|
|
}
|
|
|
- this.outboundHandler = new OutboundHandler(nodeName, version, features, threadPool, bigArrays, transportLogger);
|
|
|
+ BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.IN_FLIGHT_REQUESTS);
|
|
|
|
|
|
+ this.outboundHandler = new OutboundHandler(nodeName, version, features, threadPool, bigArrays, transportLogger);
|
|
|
this.handshaker = new TransportHandshaker(version, threadPool,
|
|
|
(node, channel, requestId, v) -> outboundHandler.sendRequest(node, channel, requestId,
|
|
|
TransportHandshaker.HANDSHAKE_ACTION_NAME, new TransportHandshaker.HandshakeRequest(version),
|
|
|
TransportRequestOptions.EMPTY, v, false, true),
|
|
|
(v, features1, channel, response, requestId) -> outboundHandler.sendResponse(v, features1, channel, requestId,
|
|
|
TransportHandshaker.HANDSHAKE_ACTION_NAME, response, false, true));
|
|
|
+ InboundMessage.Reader reader = new InboundMessage.Reader(version, namedWriteableRegistry, threadPool.getThreadContext());
|
|
|
this.keepAlive = new TransportKeepAlive(threadPool, this.outboundHandler::sendBytes);
|
|
|
- this.reader = new InboundMessage.Reader(version, namedWriteableRegistry, threadPool.getThreadContext());
|
|
|
+ this.inboundHandler = new InboundHandler(threadPool, outboundHandler, reader, circuitBreakerService, transportLogger, handshaker,
|
|
|
+ keepAlive);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@@ -177,26 +166,13 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|
|
|
|
|
@Override
|
|
|
public synchronized void setMessageListener(TransportMessageListener listener) {
|
|
|
- if (messageListener == TransportMessageListener.NOOP_LISTENER) {
|
|
|
- messageListener = listener;
|
|
|
- outboundHandler.setMessageListener(listener);
|
|
|
- } else {
|
|
|
- throw new IllegalStateException("Cannot set message listener twice");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public CircuitBreaker getInFlightRequestBreaker() {
|
|
|
- // We always obtain a fresh breaker to reflect changes to the breaker configuration.
|
|
|
- return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
|
|
|
+ outboundHandler.setMessageListener(listener);
|
|
|
+ inboundHandler.setMessageListener(listener);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public synchronized <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
|
|
|
- if (requestHandlers.containsKey(reg.getAction())) {
|
|
|
- throw new IllegalArgumentException("transport handlers for action " + reg.getAction() + " is already registered");
|
|
|
- }
|
|
|
- requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
|
|
|
+ inboundHandler.registerRequestHandler(reg);
|
|
|
}
|
|
|
|
|
|
public final class NodeChannels extends CloseableConnection {
|
|
|
@@ -665,14 +641,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|
|
*/
|
|
|
public void inboundMessage(TcpChannel channel, BytesReference message) {
|
|
|
try {
|
|
|
- channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
|
|
|
- transportLogger.logInboundMessage(channel, message);
|
|
|
- // Message length of 0 is a ping
|
|
|
- if (message.length() != 0) {
|
|
|
- messageReceived(message, channel);
|
|
|
- } else {
|
|
|
- keepAlive.receiveKeepAlive(channel);
|
|
|
- }
|
|
|
+ inboundHandler.inboundMessage(channel, message);
|
|
|
} catch (Exception e) {
|
|
|
onException(channel, e);
|
|
|
}
|
|
|
@@ -820,200 +789,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * This method handles the message receive part for both request and responses
|
|
|
- */
|
|
|
- public final void messageReceived(BytesReference reference, TcpChannel channel) throws IOException {
|
|
|
- readBytesMetric.inc(reference.length() + TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE);
|
|
|
- InetSocketAddress remoteAddress = channel.getRemoteAddress();
|
|
|
-
|
|
|
- ThreadContext threadContext = threadPool.getThreadContext();
|
|
|
- try (ThreadContext.StoredContext existing = threadContext.stashContext();
|
|
|
- InboundMessage message = reader.deserialize(reference)) {
|
|
|
- // Place the context with the headers from the message
|
|
|
- message.getStoredContext().restore();
|
|
|
- threadContext.putTransient("_remote_address", remoteAddress);
|
|
|
- if (message.isRequest()) {
|
|
|
- handleRequest(channel, (InboundMessage.Request) message, reference.length());
|
|
|
- } else {
|
|
|
- final TransportResponseHandler<?> handler;
|
|
|
- long requestId = message.getRequestId();
|
|
|
- if (message.isHandshake()) {
|
|
|
- handler = handshaker.removeHandlerForHandshake(requestId);
|
|
|
- } else {
|
|
|
- TransportResponseHandler<? extends TransportResponse> theHandler =
|
|
|
- responseHandlers.onResponseReceived(requestId, messageListener);
|
|
|
- if (theHandler == null && message.isError()) {
|
|
|
- handler = handshaker.removeHandlerForHandshake(requestId);
|
|
|
- } else {
|
|
|
- handler = theHandler;
|
|
|
- }
|
|
|
- }
|
|
|
- // ignore if its null, the service logs it
|
|
|
- if (handler != null) {
|
|
|
- if (message.isError()) {
|
|
|
- handlerResponseError(message.getStreamInput(), handler);
|
|
|
- } else {
|
|
|
- handleResponse(remoteAddress, message.getStreamInput(), handler);
|
|
|
- }
|
|
|
- // Check the entire message has been read
|
|
|
- final int nextByte = message.getStreamInput().read();
|
|
|
- // calling read() is useful to make sure the message is fully read, even if there is an EOS marker
|
|
|
- if (nextByte != -1) {
|
|
|
- throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler ["
|
|
|
- + handler + "], error [" + message.isError() + "]; resetting");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private <T extends TransportResponse> void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream,
|
|
|
- final TransportResponseHandler<T> handler) {
|
|
|
- final T response;
|
|
|
- try {
|
|
|
- response = handler.read(stream);
|
|
|
- response.remoteAddress(new TransportAddress(remoteAddress));
|
|
|
- } catch (Exception e) {
|
|
|
- handleException(handler, new TransportSerializationException(
|
|
|
- "Failed to deserialize response from handler [" + handler.getClass().getName() + "]", e));
|
|
|
- return;
|
|
|
- }
|
|
|
- threadPool.executor(handler.executor()).execute(new AbstractRunnable() {
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e) {
|
|
|
- handleException(handler, new ResponseHandlerFailureTransportException(e));
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- protected void doRun() throws Exception {
|
|
|
- handler.handleResponse(response);
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Executed for a received response error
|
|
|
- */
|
|
|
- private void handlerResponseError(StreamInput stream, final TransportResponseHandler handler) {
|
|
|
- Exception error;
|
|
|
- try {
|
|
|
- error = stream.readException();
|
|
|
- } catch (Exception e) {
|
|
|
- error = new TransportSerializationException("Failed to deserialize exception response from stream", e);
|
|
|
- }
|
|
|
- handleException(handler, error);
|
|
|
- }
|
|
|
-
|
|
|
- private void handleException(final TransportResponseHandler handler, Throwable error) {
|
|
|
- if (!(error instanceof RemoteTransportException)) {
|
|
|
- error = new RemoteTransportException(error.getMessage(), error);
|
|
|
- }
|
|
|
- final RemoteTransportException rtx = (RemoteTransportException) error;
|
|
|
- threadPool.executor(handler.executor()).execute(() -> {
|
|
|
- try {
|
|
|
- handler.handleException(rtx);
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error(() -> new ParameterizedMessage("failed to handle exception response [{}]", handler), e);
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- protected void handleRequest(TcpChannel channel, InboundMessage.Request message, int messageLengthBytes) throws IOException {
|
|
|
- final Set<String> features = message.getFeatures();
|
|
|
- final String profileName = channel.getProfile();
|
|
|
- final String action = message.getActionName();
|
|
|
- final long requestId = message.getRequestId();
|
|
|
- final StreamInput stream = message.getStreamInput();
|
|
|
- final Version version = message.getVersion();
|
|
|
- messageListener.onRequestReceived(requestId, action);
|
|
|
- TransportChannel transportChannel = null;
|
|
|
- try {
|
|
|
- if (message.isHandshake()) {
|
|
|
- handshaker.handleHandshake(version, features, channel, requestId, stream);
|
|
|
- } else {
|
|
|
- final RequestHandlerRegistry reg = getRequestHandler(action);
|
|
|
- if (reg == null) {
|
|
|
- throw new ActionNotFoundTransportException(action);
|
|
|
- }
|
|
|
- if (reg.canTripCircuitBreaker()) {
|
|
|
- getInFlightRequestBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
|
|
|
- } else {
|
|
|
- getInFlightRequestBreaker().addWithoutBreaking(messageLengthBytes);
|
|
|
- }
|
|
|
- transportChannel = new TcpTransportChannel(outboundHandler, channel, action, requestId, version, features,
|
|
|
- circuitBreakerService, messageLengthBytes, message.isCompress());
|
|
|
- final TransportRequest request = reg.newRequest(stream);
|
|
|
- request.remoteAddress(new TransportAddress(channel.getRemoteAddress()));
|
|
|
- // in case we throw an exception, i.e. when the limit is hit, we don't want to verify
|
|
|
- validateRequest(stream, requestId, action);
|
|
|
- threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- // the circuit breaker tripped
|
|
|
- if (transportChannel == null) {
|
|
|
- transportChannel = new TcpTransportChannel(outboundHandler, channel, action, requestId, version, features,
|
|
|
- circuitBreakerService, 0, message.isCompress());
|
|
|
- }
|
|
|
- try {
|
|
|
- transportChannel.sendResponse(e);
|
|
|
- } catch (IOException inner) {
|
|
|
- inner.addSuppressed(e);
|
|
|
- logger.warn(() -> new ParameterizedMessage("Failed to send error message back to client for action [{}]", action), inner);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // This template method is needed to inject custom error checking logic in tests.
|
|
|
- protected void validateRequest(StreamInput stream, long requestId, String action) throws IOException {
|
|
|
- final int nextByte = stream.read();
|
|
|
- // calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker
|
|
|
- if (nextByte != -1) {
|
|
|
- throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" + action
|
|
|
- + "], available [" + stream.available() + "]; resetting");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- class RequestHandler extends AbstractRunnable {
|
|
|
- private final RequestHandlerRegistry reg;
|
|
|
- private final TransportRequest request;
|
|
|
- private final TransportChannel transportChannel;
|
|
|
-
|
|
|
- RequestHandler(RequestHandlerRegistry reg, TransportRequest request, TransportChannel transportChannel) {
|
|
|
- this.reg = reg;
|
|
|
- this.request = request;
|
|
|
- this.transportChannel = transportChannel;
|
|
|
- }
|
|
|
-
|
|
|
- @SuppressWarnings({"unchecked"})
|
|
|
- @Override
|
|
|
- protected void doRun() throws Exception {
|
|
|
- reg.processMessageReceived(request, transportChannel);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public boolean isForceExecution() {
|
|
|
- return reg.isForceExecution();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e) {
|
|
|
- if (lifecycleState() == Lifecycle.State.STARTED) {
|
|
|
- // we can only send a response transport is started....
|
|
|
- try {
|
|
|
- transportChannel.sendResponse(e);
|
|
|
- } catch (Exception inner) {
|
|
|
- inner.addSuppressed(e);
|
|
|
- logger.warn(() -> new ParameterizedMessage(
|
|
|
- "Failed to send error message back to client for action [{}]", reg.getAction()), inner);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, ActionListener<Version> listener) {
|
|
|
- handshaker.sendHandshake(responseHandlers.newRequestId(), node, channel, profile.getHandshakeTimeout(), listener);
|
|
|
+ long requestId = inboundHandler.getResponseHandlers().newRequestId();
|
|
|
+ handshaker.sendHandshake(requestId, node, channel, profile.getHandshakeTimeout(), listener);
|
|
|
}
|
|
|
|
|
|
final TransportKeepAlive getKeepAlive() {
|
|
|
@@ -1037,7 +815,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|
|
*
|
|
|
* @throws IllegalStateException if the transport is not started / open
|
|
|
*/
|
|
|
- protected final void ensureOpen() {
|
|
|
+ private void ensureOpen() {
|
|
|
if (lifecycle.started() == false) {
|
|
|
throw new IllegalStateException("transport has been stopped");
|
|
|
}
|
|
|
@@ -1046,7 +824,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|
|
@Override
|
|
|
public final TransportStats getStats() {
|
|
|
MeanMetric transmittedBytes = outboundHandler.getTransmittedBytes();
|
|
|
- return new TransportStats(acceptedChannels.size(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytes.count(),
|
|
|
+ MeanMetric readBytes = inboundHandler.getReadBytes();
|
|
|
+ return new TransportStats(acceptedChannels.size(), readBytes.count(), readBytes.sum(), transmittedBytes.count(),
|
|
|
transmittedBytes.sum());
|
|
|
}
|
|
|
|
|
|
@@ -1107,12 +886,12 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|
|
|
|
|
@Override
|
|
|
public final ResponseHandlers getResponseHandlers() {
|
|
|
- return responseHandlers;
|
|
|
+ return inboundHandler.getResponseHandlers();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public final RequestHandlerRegistry<? extends TransportRequest> getRequestHandler(String action) {
|
|
|
- return requestHandlers.get(action);
|
|
|
+ return inboundHandler.getRequestHandler(action);
|
|
|
}
|
|
|
|
|
|
private final class ChannelsConnectedListener implements ActionListener<Void> {
|