|
@@ -227,18 +227,17 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
|
|
|
// ISE if we fail the handshake with an version incompatible node
|
|
|
if (seedNodes.hasNext()) {
|
|
|
logger.debug(() -> new ParameterizedMessage(
|
|
|
- "fetching nodes from external cluster [{}] failed moving to next node", clusterAlias), e);
|
|
|
+ "fetching nodes from external cluster [{}] failed moving to next seed node", clusterAlias), e);
|
|
|
collectRemoteNodes(seedNodes, listener);
|
|
|
return;
|
|
|
}
|
|
|
}
|
|
|
- logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster [{}] failed", clusterAlias), e);
|
|
|
+ logger.warn(new ParameterizedMessage("fetching nodes from external cluster [{}] failed", clusterAlias), e);
|
|
|
listener.onFailure(e);
|
|
|
};
|
|
|
|
|
|
final DiscoveryNode seedNode = seedNodes.next().get();
|
|
|
- logger.debug("[{}] opening connection to seed node: [{}] proxy address: [{}]", clusterAlias, seedNode,
|
|
|
- proxyAddress);
|
|
|
+ logger.trace("[{}] opening transient connection to seed node: [{}]", clusterAlias, seedNode);
|
|
|
final StepListener<Transport.Connection> openConnectionStep = new StepListener<>();
|
|
|
try {
|
|
|
connectionManager.openConnection(seedNode, null, openConnectionStep);
|
|
@@ -255,17 +254,21 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
|
|
|
|
|
|
final StepListener<Void> fullConnectionStep = new StepListener<>();
|
|
|
handshakeStep.whenComplete(handshakeResponse -> {
|
|
|
- final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode());
|
|
|
+ final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();
|
|
|
|
|
|
if (nodePredicate.test(handshakeNode) && shouldOpenMoreConnections()) {
|
|
|
- connectionManager.connectToNode(handshakeNode, null,
|
|
|
- transportService.connectionValidator(handshakeNode), fullConnectionStep);
|
|
|
+ logger.trace("[{}] opening managed connection to seed node: [{}] proxy address: [{}]", clusterAlias, handshakeNode,
|
|
|
+ proxyAddress);
|
|
|
+ final DiscoveryNode handshakeNodeWithProxy = maybeAddProxyAddress(proxyAddress, handshakeNode);
|
|
|
+ connectionManager.connectToNode(handshakeNodeWithProxy, null,
|
|
|
+ transportService.connectionValidator(handshakeNodeWithProxy), fullConnectionStep);
|
|
|
} else {
|
|
|
fullConnectionStep.onResponse(null);
|
|
|
}
|
|
|
}, e -> {
|
|
|
final Transport.Connection connection = openConnectionStep.result();
|
|
|
- logger.warn(new ParameterizedMessage("failed to connect to seed node [{}]", connection.getNode()), e);
|
|
|
+ final DiscoveryNode node = connection.getNode();
|
|
|
+ logger.debug(() -> new ParameterizedMessage("[{}] failed to handshake with seed node: [{}]", clusterAlias, node), e);
|
|
|
IOUtils.closeWhileHandlingException(connection);
|
|
|
onFailure.accept(e);
|
|
|
});
|
|
@@ -297,6 +300,10 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
|
|
|
responseHandler);
|
|
|
}
|
|
|
}, e -> {
|
|
|
+ final Transport.Connection connection = openConnectionStep.result();
|
|
|
+ final DiscoveryNode node = connection.getNode();
|
|
|
+ logger.debug(() -> new ParameterizedMessage(
|
|
|
+ "[{}] failed to open managed connection to seed node: [{}]", clusterAlias, node), e);
|
|
|
IOUtils.closeWhileHandlingException(openConnectionStep.result());
|
|
|
onFailure.accept(e);
|
|
|
});
|
|
@@ -331,9 +338,11 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
|
|
|
|
|
|
private void handleNodes(Iterator<DiscoveryNode> nodesIter) {
|
|
|
while (nodesIter.hasNext()) {
|
|
|
- final DiscoveryNode node = maybeAddProxyAddress(proxyAddress, nodesIter.next());
|
|
|
+ final DiscoveryNode node = nodesIter.next();
|
|
|
if (nodePredicate.test(node) && shouldOpenMoreConnections()) {
|
|
|
- connectionManager.connectToNode(node, null,
|
|
|
+ logger.trace("[{}] opening managed connection to node: [{}] proxy address: [{}]", clusterAlias, node, proxyAddress);
|
|
|
+ final DiscoveryNode nodeWithProxy = maybeAddProxyAddress(proxyAddress, node);
|
|
|
+ connectionManager.connectToNode(nodeWithProxy, null,
|
|
|
transportService.connectionValidator(node), new ActionListener<>() {
|
|
|
@Override
|
|
|
public void onResponse(Void aVoid) {
|
|
@@ -345,11 +354,12 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
|
|
|
if (e instanceof ConnectTransportException || e instanceof IllegalStateException) {
|
|
|
// ISE if we fail the handshake with an version incompatible node
|
|
|
// fair enough we can't connect just move on
|
|
|
- logger.debug(() -> new ParameterizedMessage("failed to connect to node {}", node), e);
|
|
|
+ logger.debug(() -> new ParameterizedMessage(
|
|
|
+ "[{}] failed to open managed connection to node [{}]", clusterAlias, node), e);
|
|
|
handleNodes(nodesIter);
|
|
|
} else {
|
|
|
- logger.warn(() ->
|
|
|
- new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), e);
|
|
|
+ logger.warn(new ParameterizedMessage(
|
|
|
+ "[{}] failed to open managed connection to node [{}]", clusterAlias, node), e);
|
|
|
IOUtils.closeWhileHandlingException(connection);
|
|
|
collectRemoteNodes(seedNodes, listener);
|
|
|
}
|
|
@@ -372,7 +382,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
|
|
|
|
|
|
@Override
|
|
|
public void handleException(TransportException exp) {
|
|
|
- logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), exp);
|
|
|
+ logger.warn(new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), exp);
|
|
|
try {
|
|
|
IOUtils.closeWhileHandlingException(connection);
|
|
|
} finally {
|