|
@@ -20,22 +20,19 @@ package org.elasticsearch.transport;
|
|
|
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
-import org.apache.logging.log4j.message.ParameterizedMessage;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
-import org.elasticsearch.common.lease.Releasable;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
|
|
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|
|
-import org.elasticsearch.common.util.concurrent.KeyedLock;
|
|
|
+import org.elasticsearch.common.util.concurrent.EsExecutors;
|
|
|
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
|
|
|
import org.elasticsearch.common.util.concurrent.RunOnce;
|
|
|
import org.elasticsearch.core.internal.io.IOUtils;
|
|
|
|
|
|
import java.io.Closeable;
|
|
|
-import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
import java.util.Iterator;
|
|
|
-import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
@@ -53,8 +50,7 @@ public class ConnectionManager implements Closeable {
|
|
|
private static final Logger logger = LogManager.getLogger(ConnectionManager.class);
|
|
|
|
|
|
private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
|
|
|
- private final KeyedLock<String> connectionLock = new KeyedLock<>(); // protects concurrent access to connectingNodes
|
|
|
- private final Map<DiscoveryNode, List<ActionListener<Void>>> connectingNodes = ConcurrentCollections.newConcurrentMap();
|
|
|
+ private final ConcurrentMap<DiscoveryNode, ListenableFuture<Void>> pendingConnections = ConcurrentCollections.newConcurrentMap();
|
|
|
private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") {
|
|
|
@Override
|
|
|
protected void closeInternal() {
|
|
@@ -122,40 +118,37 @@ public class ConnectionManager implements Closeable {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- try (Releasable lock = connectionLock.acquire(node.getId())) {
|
|
|
- Transport.Connection connection = connectedNodes.get(node);
|
|
|
- if (connection != null) {
|
|
|
- assert connectingNodes.containsKey(node) == false;
|
|
|
- lock.close();
|
|
|
- connectingRefCounter.decRef();
|
|
|
- listener.onResponse(null);
|
|
|
- return;
|
|
|
- }
|
|
|
+ if (connectedNodes.containsKey(node)) {
|
|
|
+ connectingRefCounter.decRef();
|
|
|
+ listener.onResponse(null);
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- final List<ActionListener<Void>> connectionListeners = connectingNodes.computeIfAbsent(node, n -> new ArrayList<>());
|
|
|
- connectionListeners.add(listener);
|
|
|
- if (connectionListeners.size() > 1) {
|
|
|
+ final ListenableFuture<Void> currentListener = new ListenableFuture<>();
|
|
|
+ final ListenableFuture<Void> existingListener = pendingConnections.putIfAbsent(node, currentListener);
|
|
|
+ if (existingListener != null) {
|
|
|
+ try {
|
|
|
// wait on previous entry to complete connection attempt
|
|
|
+ existingListener.addListener(listener, EsExecutors.newDirectExecutorService());
|
|
|
+ } finally {
|
|
|
connectingRefCounter.decRef();
|
|
|
- return;
|
|
|
}
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
- final RunOnce releaseOnce = new RunOnce(connectingRefCounter::decRef);
|
|
|
+ currentListener.addListener(listener, EsExecutors.newDirectExecutorService());
|
|
|
|
|
|
+ final RunOnce releaseOnce = new RunOnce(connectingRefCounter::decRef);
|
|
|
internalOpenConnection(node, resolvedProfile, ActionListener.wrap(conn -> {
|
|
|
connectionValidator.validate(conn, resolvedProfile, ActionListener.wrap(
|
|
|
ignored -> {
|
|
|
assert Transports.assertNotTransportThread("connection validator success");
|
|
|
- boolean success = false;
|
|
|
- List<ActionListener<Void>> listeners = null;
|
|
|
try {
|
|
|
- // we acquire a connection lock, so no way there is an existing connection
|
|
|
- try (Releasable ignored2 = connectionLock.acquire(node.getId())) {
|
|
|
- connectedNodes.put(node, conn);
|
|
|
- if (logger.isDebugEnabled()) {
|
|
|
- logger.debug("connected to node [{}]", node);
|
|
|
- }
|
|
|
+ if (connectedNodes.putIfAbsent(node, conn) != null) {
|
|
|
+ logger.debug("existing connection to node [{}], closing new redundant connection", node);
|
|
|
+ IOUtils.closeWhileHandlingException(conn);
|
|
|
+ } else {
|
|
|
+ logger.debug("connected to node [{}]", node);
|
|
|
try {
|
|
|
connectionListener.onNodeConnected(node);
|
|
|
} finally {
|
|
@@ -166,45 +159,21 @@ public class ConnectionManager implements Closeable {
|
|
|
connectionListener.onNodeDisconnected(node);
|
|
|
}));
|
|
|
}
|
|
|
- if (conn.isClosed()) {
|
|
|
- throw new NodeNotConnectedException(node, "connection concurrently closed");
|
|
|
- }
|
|
|
- success = true;
|
|
|
- listeners = connectingNodes.remove(node);
|
|
|
}
|
|
|
- } catch (ConnectTransportException e) {
|
|
|
- throw e;
|
|
|
- } catch (Exception e) {
|
|
|
- throw new ConnectTransportException(node, "general node connection failure", e);
|
|
|
} finally {
|
|
|
- if (success == false) { // close the connection if there is a failure
|
|
|
- logger.trace(() -> new ParameterizedMessage("failed to connect to [{}], cleaning dangling connections", node));
|
|
|
- IOUtils.closeWhileHandlingException(conn);
|
|
|
- } else {
|
|
|
- releaseOnce.run();
|
|
|
- ActionListener.onResponse(listeners, null);
|
|
|
- }
|
|
|
+ ListenableFuture<Void> future = pendingConnections.remove(node);
|
|
|
+ assert future == currentListener : "Listener in pending map is different than the expected listener";
|
|
|
+ releaseOnce.run();
|
|
|
+ future.onResponse(null);
|
|
|
}
|
|
|
}, e -> {
|
|
|
assert Transports.assertNotTransportThread("connection validator failure");
|
|
|
IOUtils.closeWhileHandlingException(conn);
|
|
|
- final List<ActionListener<Void>> listeners;
|
|
|
- try (Releasable ignored = connectionLock.acquire(node.getId())) {
|
|
|
- listeners = connectingNodes.remove(node);
|
|
|
- }
|
|
|
- releaseOnce.run();
|
|
|
- ActionListener.onFailure(listeners, e);
|
|
|
+ failConnectionListeners(node, releaseOnce, e, currentListener);
|
|
|
}));
|
|
|
}, e -> {
|
|
|
assert Transports.assertNotTransportThread("internalOpenConnection failure");
|
|
|
- final List<ActionListener<Void>> listeners;
|
|
|
- try (Releasable ignored = connectionLock.acquire(node.getId())) {
|
|
|
- listeners = connectingNodes.remove(node);
|
|
|
- }
|
|
|
- releaseOnce.run();
|
|
|
- if (listeners != null) {
|
|
|
- ActionListener.onFailure(listeners, e);
|
|
|
- }
|
|
|
+ failConnectionListeners(node, releaseOnce, e, currentListener);
|
|
|
}));
|
|
|
}
|
|
|
|
|
@@ -296,6 +265,15 @@ public class ConnectionManager implements Closeable {
|
|
|
}));
|
|
|
}
|
|
|
|
|
|
+ private void failConnectionListeners(DiscoveryNode node, RunOnce releaseOnce, Exception e, ListenableFuture<Void> expectedListener) {
|
|
|
+ ListenableFuture<Void> future = pendingConnections.remove(node);
|
|
|
+ releaseOnce.run();
|
|
|
+ if (future != null) {
|
|
|
+ assert future == expectedListener : "Listener in pending map is different than the expected listener";
|
|
|
+ future.onFailure(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
ConnectionProfile getConnectionProfile() {
|
|
|
return defaultProfile;
|
|
|
}
|