|
@@ -25,8 +25,10 @@ import org.apache.logging.log4j.util.Supplier;
|
|
|
import org.apache.lucene.util.IOUtils;
|
|
|
import org.elasticsearch.ElasticsearchException;
|
|
|
import org.elasticsearch.Version;
|
|
|
+import org.elasticsearch.action.ActionFuture;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.NotifyOnceListener;
|
|
|
+import org.elasticsearch.action.support.PlainActionFuture;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.common.CheckedBiConsumer;
|
|
|
import org.elasticsearch.common.Nullable;
|
|
@@ -104,7 +106,6 @@ import java.util.function.Consumer;
|
|
|
import java.util.function.Function;
|
|
|
import java.util.regex.Matcher;
|
|
|
import java.util.regex.Pattern;
|
|
|
-import java.util.stream.Collectors;
|
|
|
|
|
|
import static java.util.Collections.emptyList;
|
|
|
import static java.util.Collections.unmodifiableMap;
|
|
@@ -117,7 +118,7 @@ import static org.elasticsearch.common.transport.NetworkExceptionHelper.isCloseC
|
|
|
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isConnectException;
|
|
|
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
|
|
|
|
|
|
-public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent implements Transport {
|
|
|
+public abstract class TcpTransport<Channel extends TcpChannel> extends AbstractLifecycleComponent implements Transport {
|
|
|
|
|
|
public static final String TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX = "transport_server_worker";
|
|
|
public static final String TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX = "transport_client_boss";
|
|
@@ -178,7 +179,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
public static final Setting.AffixSetting<List<String>> PUBLISH_HOST_PROFILE = affixKeySetting("transport.profiles.", "publish_host",
|
|
|
key -> listSetting(key, PUBLISH_HOST, Function.identity(), Setting.Property.NodeScope));
|
|
|
public static final Setting.AffixSetting<String> PORT_PROFILE = affixKeySetting("transport.profiles.", "port",
|
|
|
- key -> new Setting(key, PORT, Function.identity(), Setting.Property.NodeScope));
|
|
|
+ key -> new Setting<>(key, PORT, Function.identity(), Setting.Property.NodeScope));
|
|
|
public static final Setting.AffixSetting<Integer> PUBLISH_PORT_PROFILE = affixKeySetting("transport.profiles.", "publish_port",
|
|
|
key -> intSetting(key, -1, -1, Setting.Property.NodeScope));
|
|
|
|
|
@@ -197,8 +198,9 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
// node id to actual channel
|
|
|
protected final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
|
|
|
|
|
|
- protected final Map<String, List<Channel>> serverChannels = newConcurrentMap();
|
|
|
protected final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
|
|
|
+ private final Map<String, List<Channel>> serverChannels = newConcurrentMap();
|
|
|
+ private final Set<Channel> acceptedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
|
|
|
|
|
|
protected final KeyedLock<String> connectionLock = new KeyedLock<>();
|
|
|
private final NamedWriteableRegistry namedWriteableRegistry;
|
|
@@ -347,7 +349,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
|
|
|
@Override
|
|
|
protected void innerOnFailure(Exception e) {
|
|
|
- if (isOpen(channel)) {
|
|
|
+ if (channel.isOpen()) {
|
|
|
logger.debug(
|
|
|
(Supplier<?>) () -> new ParameterizedMessage("[{}] failed to send ping transport message", node), e);
|
|
|
failedPings.inc();
|
|
@@ -395,29 +397,22 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
|
|
|
public final class NodeChannels implements Connection {
|
|
|
private final Map<TransportRequestOptions.Type, ConnectionProfile.ConnectionTypeHandle> typeMapping;
|
|
|
- private final Channel[] channels;
|
|
|
+ private final List<Channel> channels;
|
|
|
private final DiscoveryNode node;
|
|
|
private final AtomicBoolean closed = new AtomicBoolean(false);
|
|
|
private final Version version;
|
|
|
|
|
|
- public NodeChannels(DiscoveryNode node, Channel[] channels, ConnectionProfile connectionProfile) {
|
|
|
+ NodeChannels(DiscoveryNode node, List<Channel> channels, ConnectionProfile connectionProfile, Version handshakeVersion) {
|
|
|
this.node = node;
|
|
|
- this.channels = channels;
|
|
|
- assert channels.length == connectionProfile.getNumConnections() : "expected channels size to be == "
|
|
|
- + connectionProfile.getNumConnections() + " but was: [" + channels.length + "]";
|
|
|
+ this.channels = Collections.unmodifiableList(channels);
|
|
|
+ assert channels.size() == connectionProfile.getNumConnections() : "expected channels size to be == "
|
|
|
+ + connectionProfile.getNumConnections() + " but was: [" + channels.size() + "]";
|
|
|
typeMapping = new EnumMap<>(TransportRequestOptions.Type.class);
|
|
|
for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile.getHandles()) {
|
|
|
for (TransportRequestOptions.Type type : handle.getTypes())
|
|
|
typeMapping.put(type, handle);
|
|
|
}
|
|
|
- version = node.getVersion();
|
|
|
- }
|
|
|
-
|
|
|
- NodeChannels(NodeChannels channels, Version handshakeVersion) {
|
|
|
- this.node = channels.node;
|
|
|
- this.channels = channels.channels;
|
|
|
- this.typeMapping = channels.typeMapping;
|
|
|
- this.version = handshakeVersion;
|
|
|
+ version = handshakeVersion;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -426,7 +421,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
}
|
|
|
|
|
|
public List<Channel> getChannels() {
|
|
|
- return Arrays.asList(channels);
|
|
|
+ return channels;
|
|
|
}
|
|
|
|
|
|
public Channel channel(TransportRequestOptions.Type type) {
|
|
@@ -437,12 +432,34 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
return connectionTypeHandle.getChannel(channels);
|
|
|
}
|
|
|
|
|
|
+ public boolean allChannelsOpen() {
|
|
|
+ return channels.stream().allMatch(TcpChannel::isOpen);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void close() throws IOException {
|
|
|
if (closed.compareAndSet(false, true)) {
|
|
|
try {
|
|
|
- closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false,
|
|
|
- lifecycle.stopped());
|
|
|
+ if (lifecycle.stopped()) {
|
|
|
+ /* We set SO_LINGER timeout to 0 to ensure that when we shutdown the node we don't
|
|
|
+ * have a gazillion connections sitting in TIME_WAIT to free up resources quickly.
|
|
|
+ * This is really the only part where we close the connection from the server side
|
|
|
+ * otherwise the client (node) initiates the TCP closing sequence which doesn't cause
|
|
|
+ * these issues. Setting this by default from the beginning can have unexpected
|
|
|
+ * side-effects an should be avoided, our protocol is designed in a way that clients
|
|
|
+ * close connection which is how it should be*/
|
|
|
+
|
|
|
+ channels.forEach(c -> {
|
|
|
+ try {
|
|
|
+ c.setSoLinger(0);
|
|
|
+ } catch (IOException e) {
|
|
|
+ logger.warn(new ParameterizedMessage("unexpected exception when setting SO_LINGER on channel {}", c), e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean block = lifecycle.stopped() && Transports.isTransportThread(Thread.currentThread()) == false;
|
|
|
+ TcpChannel.closeChannels(channels, block);
|
|
|
} finally {
|
|
|
transportService.onConnectionClosed(this);
|
|
|
}
|
|
@@ -478,7 +495,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
|
|
|
CheckedBiConsumer<Connection, ConnectionProfile, IOException> connectionValidator)
|
|
|
throws ConnectTransportException {
|
|
|
- connectionProfile = resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
|
|
|
+ connectionProfile = resolveConnectionProfile(connectionProfile);
|
|
|
if (node == null) {
|
|
|
throw new ConnectTransportException(null, "can't connect to a null node");
|
|
|
}
|
|
@@ -559,6 +576,10 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) {
|
|
|
+ return resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException {
|
|
|
if (node == null) {
|
|
@@ -566,40 +587,66 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
}
|
|
|
boolean success = false;
|
|
|
NodeChannels nodeChannels = null;
|
|
|
- connectionProfile = resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
|
|
|
+ connectionProfile = resolveConnectionProfile(connectionProfile);
|
|
|
closeLock.readLock().lock(); // ensure we don't open connections while we are closing
|
|
|
try {
|
|
|
ensureOpen();
|
|
|
try {
|
|
|
+ int numConnections = connectionProfile.getNumConnections();
|
|
|
+ assert numConnections > 0 : "A connection profile must be configured with at least one connection";
|
|
|
+ List<Channel> channels = new ArrayList<>(numConnections);
|
|
|
+ List<ActionFuture<Channel>> connectionFutures = new ArrayList<>(numConnections);
|
|
|
+ for (int i = 0; i < numConnections; ++i) {
|
|
|
+ try {
|
|
|
+ PlainActionFuture<Channel> connectFuture = PlainActionFuture.newFuture();
|
|
|
+ connectionFutures.add(connectFuture);
|
|
|
+ Channel channel = initiateChannel(node, connectionProfile.getConnectTimeout(), connectFuture);
|
|
|
+ channels.add(channel);
|
|
|
+ } catch (Exception e) {
|
|
|
+ // If there was an exception when attempting to instantiate the raw channels, we close all of the channels
|
|
|
+ TcpChannel.closeChannels(channels, false);
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // If we make it past the block above, we successfully instantiated all of the channels
|
|
|
+ try {
|
|
|
+ TcpChannel.awaitConnected(node, connectionFutures, connectionProfile.getConnectTimeout());
|
|
|
+ } catch (Exception ex) {
|
|
|
+ TcpChannel.closeChannels(channels, false);
|
|
|
+ throw ex;
|
|
|
+ }
|
|
|
+
|
|
|
+ // If we make it past the block above, we have successfully established connections for all of the channels
|
|
|
+ final Channel handshakeChannel = channels.get(0); // one channel is guaranteed by the connection profile
|
|
|
+ handshakeChannel.addCloseListener(ActionListener.wrap(() -> cancelHandshakeForChannel(handshakeChannel)));
|
|
|
+ Version version;
|
|
|
+ try {
|
|
|
+ version = executeHandshake(node, handshakeChannel, connectionProfile.getHandshakeTimeout());
|
|
|
+ } catch (Exception ex) {
|
|
|
+ TcpChannel.closeChannels(channels, false);
|
|
|
+ throw ex;
|
|
|
+ }
|
|
|
+
|
|
|
+ // If we make it past the block above, we have successfully completed the handshake and the connection is now open.
|
|
|
+ // At this point we should construct the connection, notify the transport service, and attach close listeners to the
|
|
|
+ // underlying channels.
|
|
|
+ nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
|
|
|
+ transportService.onConnectionOpened(nodeChannels);
|
|
|
+ final NodeChannels finalNodeChannels = nodeChannels;
|
|
|
final AtomicBoolean runOnce = new AtomicBoolean(false);
|
|
|
- final AtomicReference<NodeChannels> connectionRef = new AtomicReference<>();
|
|
|
Consumer<Channel> onClose = c -> {
|
|
|
- assert isOpen(c) == false : "channel is still open when onClose is called";
|
|
|
- try {
|
|
|
- onChannelClosed(c);
|
|
|
- } finally {
|
|
|
- // we only need to disconnect from the nodes once since all other channels
|
|
|
- // will also try to run this we protect it from running multiple times.
|
|
|
- if (runOnce.compareAndSet(false, true)) {
|
|
|
- NodeChannels connection = connectionRef.get();
|
|
|
- if (connection != null) {
|
|
|
- disconnectFromNodeCloseAndNotify(node, connection);
|
|
|
- }
|
|
|
- }
|
|
|
+ assert c.isOpen() == false : "channel is still open when onClose is called";
|
|
|
+ // we only need to disconnect from the nodes once since all other channels
|
|
|
+ // will also try to run this we protect it from running multiple times.
|
|
|
+ if (runOnce.compareAndSet(false, true)) {
|
|
|
+ disconnectFromNodeCloseAndNotify(node, finalNodeChannels);
|
|
|
}
|
|
|
};
|
|
|
- nodeChannels = connectToChannels(node, connectionProfile, onClose);
|
|
|
- final Channel channel = nodeChannels.getChannels().get(0); // one channel is guaranteed by the connection profile
|
|
|
- final TimeValue connectTimeout = connectionProfile.getConnectTimeout() == null ?
|
|
|
- defaultConnectionProfile.getConnectTimeout() :
|
|
|
- connectionProfile.getConnectTimeout();
|
|
|
- final TimeValue handshakeTimeout = connectionProfile.getHandshakeTimeout() == null ?
|
|
|
- connectTimeout : connectionProfile.getHandshakeTimeout();
|
|
|
- final Version version = executeHandshake(node, channel, handshakeTimeout);
|
|
|
- nodeChannels = new NodeChannels(nodeChannels, version); // clone the channels - we now have the correct version
|
|
|
- transportService.onConnectionOpened(nodeChannels);
|
|
|
- connectionRef.set(nodeChannels);
|
|
|
- if (Arrays.stream(nodeChannels.channels).allMatch(this::isOpen) == false) {
|
|
|
+
|
|
|
+ nodeChannels.channels.forEach(ch -> ch.addCloseListener(ActionListener.wrap(() -> onClose.accept(ch))));
|
|
|
+
|
|
|
+ if (nodeChannels.allChannelsOpen() == false) {
|
|
|
throw new ConnectTransportException(node, "a channel closed while connecting");
|
|
|
}
|
|
|
success = true;
|
|
@@ -637,19 +684,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Disconnects from a node if a channel is found as part of that nodes channels.
|
|
|
- */
|
|
|
- protected final void closeChannelWhileHandlingExceptions(final Channel channel) {
|
|
|
- if (isOpen(channel)) {
|
|
|
- try {
|
|
|
- closeChannels(Collections.singletonList(channel), false, false);
|
|
|
- } catch (IOException e) {
|
|
|
- logger.warn("failed to close channel", e);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
public NodeChannels getConnection(DiscoveryNode node) {
|
|
|
NodeChannels nodeChannels = connectedNodes.get(node);
|
|
@@ -904,12 +938,20 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
try {
|
|
|
// first stop to accept any incoming connections so nobody can connect to this transport
|
|
|
for (Map.Entry<String, List<Channel>> entry : serverChannels.entrySet()) {
|
|
|
- try {
|
|
|
- closeChannels(entry.getValue(), true, false);
|
|
|
- } catch (Exception e) {
|
|
|
- logger.warn(new ParameterizedMessage("Error closing serverChannel for profile [{}]", entry.getKey()), e);
|
|
|
- }
|
|
|
+ String profile = entry.getKey();
|
|
|
+ List<Channel> channels = entry.getValue();
|
|
|
+ ActionListener<TcpChannel> closeFailLogger = ActionListener.wrap(c -> {},
|
|
|
+ e -> logger.warn(() -> new ParameterizedMessage("Error closing serverChannel for profile [{}]", profile), e));
|
|
|
+ channels.forEach(c -> c.addCloseListener(closeFailLogger));
|
|
|
+ TcpChannel.closeChannels(channels, true);
|
|
|
}
|
|
|
+ serverChannels.clear();
|
|
|
+
|
|
|
+ // close all of the incoming channels. The closeChannels method takes a list so we must convert the set.
|
|
|
+ TcpChannel.closeChannels(new ArrayList<>(acceptedChannels), true);
|
|
|
+ acceptedChannels.clear();
|
|
|
+
|
|
|
+
|
|
|
// we are holding a write lock so nobody modifies the connectedNodes / openConnections map - it's safe to first close
|
|
|
// all instances and then clear them maps
|
|
|
Iterator<Map.Entry<DiscoveryNode, NodeChannels>> iterator = connectedNodes.entrySet().iterator();
|
|
@@ -940,7 +982,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
protected void onException(Channel channel, Exception e) {
|
|
|
if (!lifecycle.started()) {
|
|
|
// just close and ignore - we are already stopped and just need to make sure we release all resources
|
|
|
- closeChannelWhileHandlingExceptions(channel);
|
|
|
+ TcpChannel.closeChannel(channel, false);
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -951,15 +993,15 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
channel),
|
|
|
e);
|
|
|
// close the channel, which will cause a node to be disconnected if relevant
|
|
|
- closeChannelWhileHandlingExceptions(channel);
|
|
|
+ TcpChannel.closeChannel(channel, false);
|
|
|
} else if (isConnectException(e)) {
|
|
|
logger.trace((Supplier<?>) () -> new ParameterizedMessage("connect exception caught on transport layer [{}]", channel), e);
|
|
|
// close the channel as safe measure, which will cause a node to be disconnected if relevant
|
|
|
- closeChannelWhileHandlingExceptions(channel);
|
|
|
+ TcpChannel.closeChannel(channel, false);
|
|
|
} else if (e instanceof BindException) {
|
|
|
logger.trace((Supplier<?>) () -> new ParameterizedMessage("bind exception caught on transport layer [{}]", channel), e);
|
|
|
// close the channel as safe measure, which will cause a node to be disconnected if relevant
|
|
|
- closeChannelWhileHandlingExceptions(channel);
|
|
|
+ TcpChannel.closeChannel(channel, false);
|
|
|
} else if (e instanceof CancelledKeyException) {
|
|
|
logger.trace(
|
|
|
(Supplier<?>) () -> new ParameterizedMessage(
|
|
@@ -967,29 +1009,21 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
channel),
|
|
|
e);
|
|
|
// close the channel as safe measure, which will cause a node to be disconnected if relevant
|
|
|
- closeChannelWhileHandlingExceptions(channel);
|
|
|
+ TcpChannel.closeChannel(channel, false);
|
|
|
} else if (e instanceof TcpTransport.HttpOnTransportException) {
|
|
|
// in case we are able to return data, serialize the exception content and sent it back to the client
|
|
|
- if (isOpen(channel)) {
|
|
|
+ if (channel.isOpen()) {
|
|
|
BytesArray message = new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8));
|
|
|
final SendMetricListener<Channel> closeChannel = new SendMetricListener<Channel>(message.length()) {
|
|
|
@Override
|
|
|
protected void innerInnerOnResponse(Channel channel) {
|
|
|
- try {
|
|
|
- closeChannels(Collections.singletonList(channel), false, false);
|
|
|
- } catch (IOException e1) {
|
|
|
- logger.debug("failed to close httpOnTransport channel", e1);
|
|
|
- }
|
|
|
+ TcpChannel.closeChannel(channel, false);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected void innerOnFailure(Exception e) {
|
|
|
- try {
|
|
|
- closeChannels(Collections.singletonList(channel), false, false);
|
|
|
- } catch (IOException e1) {
|
|
|
- e.addSuppressed(e1);
|
|
|
- logger.debug("failed to close httpOnTransport channel", e1);
|
|
|
- }
|
|
|
+ logger.debug("failed to send message to httpOnTransport channel", e);
|
|
|
+ TcpChannel.closeChannel(channel, false);
|
|
|
}
|
|
|
};
|
|
|
internalSendMessage(channel, message, closeChannel);
|
|
@@ -998,10 +1032,16 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
logger.warn(
|
|
|
(Supplier<?>) () -> new ParameterizedMessage("exception caught on transport layer [{}], closing connection", channel), e);
|
|
|
// close the channel, which will cause a node to be disconnected if relevant
|
|
|
- closeChannelWhileHandlingExceptions(channel);
|
|
|
+ TcpChannel.closeChannel(channel, false);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ protected void serverAcceptedChannel(Channel channel) {
|
|
|
+ boolean addedOnThisCall = acceptedChannels.add(channel);
|
|
|
+ assert addedOnThisCall : "Channel should only be added to accept channel set once";
|
|
|
+ channel.addCloseListener(ActionListener.wrap(() -> acceptedChannels.remove(channel)));
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Returns the channels local address
|
|
|
*/
|
|
@@ -1015,44 +1055,34 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
*/
|
|
|
protected abstract Channel bind(String name, InetSocketAddress address) throws IOException;
|
|
|
|
|
|
- /**
|
|
|
- * Closes all channels in this list. If the blocking boolean is set to true, the channels must be
|
|
|
- * closed before the method returns. This should never be called with blocking set to true from a
|
|
|
- * network thread.
|
|
|
- *
|
|
|
- * @param channels the channels to close
|
|
|
- * @param blocking whether the channels should be closed synchronously
|
|
|
- * @param doNotLinger whether we abort the connection on RST instead of FIN
|
|
|
- */
|
|
|
- protected abstract void closeChannels(List<Channel> channels, boolean blocking, boolean doNotLinger) throws IOException;
|
|
|
-
|
|
|
/**
|
|
|
* Sends message to channel. The listener's onResponse method will be called when the send is complete unless an exception
|
|
|
* is thrown during the send. If an exception is thrown, the listener's onException method will be called.
|
|
|
- * @param channel the destination channel
|
|
|
+ *
|
|
|
+ * @param channel the destination channel
|
|
|
* @param reference the byte reference for the message
|
|
|
- * @param listener the listener to call when the operation has completed
|
|
|
+ * @param listener the listener to call when the operation has completed
|
|
|
*/
|
|
|
protected abstract void sendMessage(Channel channel, BytesReference reference, ActionListener<Channel> listener);
|
|
|
|
|
|
/**
|
|
|
- * Connect to the node with channels as defined by the specified connection profile. Implementations must invoke the specified channel
|
|
|
- * close callback when a channel is closed.
|
|
|
+ * Initiate a single tcp socket channel to a node. Implementations do not have to observe the connectTimeout.
|
|
|
+ * It is provided for synchronous connection implementations.
|
|
|
*
|
|
|
- * @param node the node to connect to
|
|
|
- * @param connectionProfile the connection profile
|
|
|
- * @param onChannelClose callback to invoke when a channel is closed
|
|
|
- * @return the channels
|
|
|
- * @throws IOException if an I/O exception occurs while opening channels
|
|
|
+ * @param node the node
|
|
|
+ * @param connectTimeout the connection timeout
|
|
|
+ * @param connectListener listener to be called when connection complete
|
|
|
+ * @return the pending connection
|
|
|
+ * @throws IOException if an I/O exception occurs while opening the channel
|
|
|
*/
|
|
|
- protected abstract NodeChannels connectToChannels(DiscoveryNode node,
|
|
|
- ConnectionProfile connectionProfile,
|
|
|
- Consumer<Channel> onChannelClose) throws IOException;
|
|
|
+ protected abstract Channel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Channel> connectListener)
|
|
|
+ throws IOException;
|
|
|
|
|
|
/**
|
|
|
* Called to tear down internal resources
|
|
|
*/
|
|
|
- protected void stopInternal() {}
|
|
|
+ protected void stopInternal() {
|
|
|
+ }
|
|
|
|
|
|
public boolean canCompress(TransportRequest request) {
|
|
|
return compress && (!(request instanceof BytesTransportRequest));
|
|
@@ -1118,10 +1148,10 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
* Sends back an error response to the caller via the given channel
|
|
|
*
|
|
|
* @param nodeVersion the caller node version
|
|
|
- * @param channel the channel to send the response to
|
|
|
- * @param error the error to return
|
|
|
- * @param requestId the request ID this response replies to
|
|
|
- * @param action the action this response replies to
|
|
|
+ * @param channel the channel to send the response to
|
|
|
+ * @param error the error to return
|
|
|
+ * @param requestId the request ID this response replies to
|
|
|
+ * @param action the action this response replies to
|
|
|
*/
|
|
|
public void sendErrorResponse(Version nodeVersion, Channel channel, final Exception error, final long requestId,
|
|
|
final String action) throws IOException {
|
|
@@ -1146,7 +1176,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
/**
|
|
|
* Sends the response to the given channel. This method should be used to send {@link TransportResponse} objects back to the caller.
|
|
|
*
|
|
|
- * @see #sendErrorResponse(Version, Object, Exception, long, String) for sending back errors to the caller
|
|
|
+ * @see #sendErrorResponse(Version, TcpChannel, Exception, long, String) for sending back errors to the caller
|
|
|
*/
|
|
|
public void sendResponse(Version nodeVersion, Channel channel, final TransportResponse response, final long requestId,
|
|
|
final String action, TransportResponseOptions options) throws IOException {
|
|
@@ -1154,7 +1184,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
}
|
|
|
|
|
|
private void sendResponse(Version nodeVersion, Channel channel, final TransportResponse response, final long requestId,
|
|
|
- final String action, TransportResponseOptions options, byte status) throws IOException {
|
|
|
+ final String action, TransportResponseOptions options, byte status) throws IOException {
|
|
|
if (compress) {
|
|
|
options = TransportResponseOptions.builder(options).withCompress(true).build();
|
|
|
}
|
|
@@ -1232,10 +1262,10 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
* Validates the first N bytes of the message header and returns <code>false</code> if the message is
|
|
|
* a ping message and has no payload ie. isn't a real user level message.
|
|
|
*
|
|
|
- * @throws IllegalStateException if the message is too short, less than the header or less that the header plus the message size
|
|
|
+ * @throws IllegalStateException if the message is too short, less than the header or less that the header plus the message size
|
|
|
* @throws HttpOnTransportException if the message has no valid header and appears to be a HTTP message
|
|
|
* @throws IllegalArgumentException if the message is greater that the maximum allowed frame size. This is dependent on the available
|
|
|
- * memory.
|
|
|
+ * memory.
|
|
|
*/
|
|
|
public static boolean validateMessageHeader(BytesReference buffer) throws IOException {
|
|
|
final int sizeHeaderLength = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE;
|
|
@@ -1246,23 +1276,23 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
if (buffer.get(offset) != 'E' || buffer.get(offset + 1) != 'S') {
|
|
|
// special handling for what is probably HTTP
|
|
|
if (bufferStartsWith(buffer, offset, "GET ") ||
|
|
|
- bufferStartsWith(buffer, offset, "POST ") ||
|
|
|
- bufferStartsWith(buffer, offset, "PUT ") ||
|
|
|
- bufferStartsWith(buffer, offset, "HEAD ") ||
|
|
|
- bufferStartsWith(buffer, offset, "DELETE ") ||
|
|
|
- bufferStartsWith(buffer, offset, "OPTIONS ") ||
|
|
|
- bufferStartsWith(buffer, offset, "PATCH ") ||
|
|
|
- bufferStartsWith(buffer, offset, "TRACE ")) {
|
|
|
+ bufferStartsWith(buffer, offset, "POST ") ||
|
|
|
+ bufferStartsWith(buffer, offset, "PUT ") ||
|
|
|
+ bufferStartsWith(buffer, offset, "HEAD ") ||
|
|
|
+ bufferStartsWith(buffer, offset, "DELETE ") ||
|
|
|
+ bufferStartsWith(buffer, offset, "OPTIONS ") ||
|
|
|
+ bufferStartsWith(buffer, offset, "PATCH ") ||
|
|
|
+ bufferStartsWith(buffer, offset, "TRACE ")) {
|
|
|
|
|
|
throw new HttpOnTransportException("This is not a HTTP port");
|
|
|
}
|
|
|
|
|
|
// we have 6 readable bytes, show 4 (should be enough)
|
|
|
throw new StreamCorruptedException("invalid internal transport message format, got ("
|
|
|
- + Integer.toHexString(buffer.get(offset) & 0xFF) + ","
|
|
|
- + Integer.toHexString(buffer.get(offset + 1) & 0xFF) + ","
|
|
|
- + Integer.toHexString(buffer.get(offset + 2) & 0xFF) + ","
|
|
|
- + Integer.toHexString(buffer.get(offset + 3) & 0xFF) + ")");
|
|
|
+ + Integer.toHexString(buffer.get(offset) & 0xFF) + ","
|
|
|
+ + Integer.toHexString(buffer.get(offset + 1) & 0xFF) + ","
|
|
|
+ + Integer.toHexString(buffer.get(offset + 2) & 0xFF) + ","
|
|
|
+ + Integer.toHexString(buffer.get(offset + 3) & 0xFF) + ")");
|
|
|
}
|
|
|
|
|
|
final int dataLen;
|
|
@@ -1322,8 +1352,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- protected abstract boolean isOpen(Channel channel);
|
|
|
-
|
|
|
/**
|
|
|
* This method handles the message receive part for both request and responses
|
|
|
*/
|
|
@@ -1410,7 +1438,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
final Version compatibilityVersion = isHandshake ? currentVersion.minimumCompatibilityVersion() : currentVersion;
|
|
|
if (version.isCompatible(compatibilityVersion) == false) {
|
|
|
final Version minCompatibilityVersion = isHandshake ? compatibilityVersion : compatibilityVersion.minimumCompatibilityVersion();
|
|
|
- String msg = "Received " + (isHandshake? "handshake " : "") + "message from unsupported version: [";
|
|
|
+ String msg = "Received " + (isHandshake ? "handshake " : "") + "message from unsupported version: [";
|
|
|
throw new IllegalStateException(msg + version + "] minimal compatible version is: [" + minCompatibilityVersion + "]");
|
|
|
}
|
|
|
}
|
|
@@ -1566,7 +1594,8 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
this.version = version;
|
|
|
}
|
|
|
|
|
|
- private VersionHandshakeResponse() {}
|
|
|
+ private VersionHandshakeResponse() {
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
public void readFrom(StreamInput in) throws IOException {
|
|
@@ -1591,7 +1620,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
pendingHandshakes.put(requestId, handler);
|
|
|
boolean success = false;
|
|
|
try {
|
|
|
- if (isOpen(channel) == false) {
|
|
|
+ if (channel.isOpen() == false) {
|
|
|
// we have to protect us here since sendRequestToChannel won't barf if the channel is closed.
|
|
|
// it's weird but to change it will cause a lot of impact on the exception handling code all over the codebase.
|
|
|
// yet, if we don't check the state here we might have registered a pending handshake handler but the close
|
|
@@ -1642,9 +1671,9 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
/**
|
|
|
* Called once the channel is closed for instance due to a disconnect or a closed socket etc.
|
|
|
*/
|
|
|
- private void onChannelClosed(Channel channel) {
|
|
|
+ private void cancelHandshakeForChannel(Channel channel) {
|
|
|
final Optional<Long> first = pendingHandshakes.entrySet().stream()
|
|
|
- .filter((entry) -> entry.getValue().channel == channel).map((e) -> e.getKey()).findFirst();
|
|
|
+ .filter((entry) -> entry.getValue().channel == channel).map(Map.Entry::getKey).findFirst();
|
|
|
if (first.isPresent()) {
|
|
|
final Long requestId = first.get();
|
|
|
final HandshakeResponseHandler handler = pendingHandshakes.remove(requestId);
|
|
@@ -1778,5 +1807,4 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|
|
PUBLISH_PORT_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
}
|