|
@@ -33,8 +33,11 @@ import org.junit.Before;
|
|
|
|
|
|
import java.net.InetAddress;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.BrokenBarrierException;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.CyclicBarrier;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -124,16 +127,22 @@ public class ConnectionManagerTests extends ESTestCase {
|
|
|
assertEquals(1, nodeDisconnectedCount.get());
|
|
|
}
|
|
|
|
|
|
- @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/49903")
|
|
|
- public void testConcurrentConnectsAndDisconnects() throws BrokenBarrierException, InterruptedException {
|
|
|
+ public void testConcurrentConnects() throws Exception {
|
|
|
+ Set<Transport.Connection> connections = Collections.newSetFromMap(new ConcurrentHashMap<>());
|
|
|
+
|
|
|
DiscoveryNode node = new DiscoveryNode("", new TransportAddress(InetAddress.getLoopbackAddress(), 0), Version.CURRENT);
|
|
|
- Transport.Connection connection = new TestConnect(node);
|
|
|
doAnswer(invocationOnMock -> {
|
|
|
ActionListener<Transport.Connection> listener = (ActionListener<Transport.Connection>) invocationOnMock.getArguments()[2];
|
|
|
- if (rarely()) {
|
|
|
- listener.onResponse(connection);
|
|
|
- } else if (frequently()) {
|
|
|
- threadPool.generic().execute(() -> listener.onResponse(connection));
|
|
|
+
|
|
|
+ boolean success = randomBoolean();
|
|
|
+ if (success) {
|
|
|
+ Transport.Connection connection = new TestConnect(node);
|
|
|
+ connections.add(connection);
|
|
|
+ if (randomBoolean()) {
|
|
|
+ listener.onResponse(connection);
|
|
|
+ } else {
|
|
|
+ threadPool.generic().execute(() -> listener.onResponse(connection));
|
|
|
+ }
|
|
|
} else {
|
|
|
threadPool.generic().execute(() -> listener.onFailure(new IllegalStateException("dummy exception")));
|
|
|
}
|
|
@@ -143,10 +152,13 @@ public class ConnectionManagerTests extends ESTestCase {
|
|
|
assertFalse(connectionManager.nodeConnected(node));
|
|
|
|
|
|
ConnectionManager.ConnectionValidator validator = (c, p, l) -> {
|
|
|
- if (rarely()) {
|
|
|
- l.onResponse(null);
|
|
|
- } else if (frequently()) {
|
|
|
- threadPool.generic().execute(() -> l.onResponse(null));
|
|
|
+ boolean success = randomBoolean();
|
|
|
+ if (success) {
|
|
|
+ if (randomBoolean()) {
|
|
|
+ l.onResponse(null);
|
|
|
+ } else {
|
|
|
+ threadPool.generic().execute(() -> l.onResponse(null));
|
|
|
+ }
|
|
|
} else {
|
|
|
threadPool.generic().execute(() -> l.onFailure(new IllegalStateException("dummy exception")));
|
|
|
}
|
|
@@ -198,6 +210,24 @@ public class ConnectionManagerTests extends ESTestCase {
|
|
|
});
|
|
|
|
|
|
assertEquals(10, nodeConnectedCount.get() + nodeFailureCount.get());
|
|
|
+
|
|
|
+ int managedConnections = connectionManager.size();
|
|
|
+ if (managedConnections != 0) {
|
|
|
+ assertEquals(1, managedConnections);
|
|
|
+
|
|
|
+ // Only a single connection attempt should be open.
|
|
|
+ assertEquals(1, connections.stream().filter(c -> c.isClosed() == false).count());
|
|
|
+ } else {
|
|
|
+ // No connections succeeded
|
|
|
+ assertEquals(0, connections.stream().filter(c -> c.isClosed() == false).count());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ connectionManager.close();
|
|
|
+ // The connection manager will close all open connections
|
|
|
+ for (Transport.Connection connection : connections) {
|
|
|
+ assertTrue(connection.isClosed());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void testConnectFailsDuringValidation() {
|