|  | @@ -23,6 +23,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.util.concurrent.RunOnce;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.util.concurrent.ThreadContext;
 | 
	
		
			
				|  |  | +import org.elasticsearch.core.AbstractRefCounted;
 | 
	
		
			
				|  |  |  import org.elasticsearch.core.Releasable;
 | 
	
		
			
				|  |  |  import org.elasticsearch.core.Releasables;
 | 
	
		
			
				|  |  |  import org.elasticsearch.core.TimeValue;
 | 
	
	
		
			
				|  | @@ -116,15 +117,16 @@ public class ClusterConnectionManagerTests extends ESTestCase {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          assertFalse(connectionManager.nodeConnected(node));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        AtomicReference<Transport.Connection> connectionRef = new AtomicReference<>();
 | 
	
		
			
				|  |  | +        final var validatedConnectionRef = new AtomicReference<Transport.Connection>();
 | 
	
		
			
				|  |  |          ConnectionManager.ConnectionValidator validator = (c, p, l) -> {
 | 
	
		
			
				|  |  | -            connectionRef.set(c);
 | 
	
		
			
				|  |  | +            validatedConnectionRef.set(c);
 | 
	
		
			
				|  |  |              l.onResponse(null);
 | 
	
		
			
				|  |  |          };
 | 
	
		
			
				|  |  |          PlainActionFuture.get(fut -> connectionManager.connectToNode(node, connectionProfile, validator, fut.map(x -> null)));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          assertFalse(connection.isClosed());
 | 
	
		
			
				|  |  |          assertTrue(connectionManager.nodeConnected(node));
 | 
	
		
			
				|  |  | +        assertSame(connection, validatedConnectionRef.get());
 | 
	
		
			
				|  |  |          assertSame(connection, connectionManager.getConnection(node));
 | 
	
		
			
				|  |  |          assertEquals(1, connectionManager.size());
 | 
	
		
			
				|  |  |          assertEquals(1, nodeConnectedCount.get());
 | 
	
	
		
			
				|  | @@ -145,7 +147,7 @@ public class ClusterConnectionManagerTests extends ESTestCase {
 | 
	
		
			
				|  |  |          reason = "testing log messages emitted on disconnect",
 | 
	
		
			
				|  |  |          value = "org.elasticsearch.transport.ClusterConnectionManager:TRACE"
 | 
	
		
			
				|  |  |      )
 | 
	
		
			
				|  |  | -    public void testDisconnectLogging() throws IllegalAccessException {
 | 
	
		
			
				|  |  | +    public void testDisconnectLogging() {
 | 
	
		
			
				|  |  |          final Supplier<DiscoveryNode> nodeFactory = () -> new DiscoveryNode(
 | 
	
		
			
				|  |  |              randomAlphaOfLength(10),
 | 
	
		
			
				|  |  |              new TransportAddress(InetAddress.getLoopbackAddress(), 0),
 | 
	
	
		
			
				|  | @@ -495,14 +497,17 @@ public class ClusterConnectionManagerTests extends ESTestCase {
 | 
	
		
			
				|  |  |              });
 | 
	
		
			
				|  |  |          };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        final Semaphore pendingConnections = new Semaphore(between(1, 1000));
 | 
	
		
			
				|  |  | +        final int connectionCount = between(1, 1000);
 | 
	
		
			
				|  |  | +        final int disconnectionCount = randomFrom(connectionCount, connectionCount - 1, between(0, connectionCount - 1));
 | 
	
		
			
				|  |  | +        final var connectionPermits = new Semaphore(connectionCount);
 | 
	
		
			
				|  |  | +        final var disconnectionPermits = new Semaphore(disconnectionCount);
 | 
	
		
			
				|  |  |          final int threadCount = between(1, 10);
 | 
	
		
			
				|  |  | -        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
 | 
	
		
			
				|  |  | +        final var countDownLatch = new CountDownLatch(threadCount);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          final Runnable action = new Runnable() {
 | 
	
		
			
				|  |  |              @Override
 | 
	
		
			
				|  |  |              public void run() {
 | 
	
		
			
				|  |  | -                if (pendingConnections.tryAcquire()) {
 | 
	
		
			
				|  |  | +                if (connectionPermits.tryAcquire()) {
 | 
	
		
			
				|  |  |                      connectionManager.connectToNode(node, null, validator, new ActionListener<>() {
 | 
	
		
			
				|  |  |                          @Override
 | 
	
		
			
				|  |  |                          public void onResponse(Releasable releasable) {
 | 
	
	
		
			
				|  | @@ -510,20 +515,26 @@ public class ClusterConnectionManagerTests extends ESTestCase {
 | 
	
		
			
				|  |  |                                  final String description = releasable.toString();
 | 
	
		
			
				|  |  |                                  fail(description);
 | 
	
		
			
				|  |  |                              }
 | 
	
		
			
				|  |  | -                            Releasables.close(releasable);
 | 
	
		
			
				|  |  | -                            threadPool.generic().execute(() -> run());
 | 
	
		
			
				|  |  | +                            if (disconnectionPermits.tryAcquire()) {
 | 
	
		
			
				|  |  | +                                Releasables.close(releasable);
 | 
	
		
			
				|  |  | +                            }
 | 
	
		
			
				|  |  | +                            runAgain();
 | 
	
		
			
				|  |  |                          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |                          @Override
 | 
	
		
			
				|  |  |                          public void onFailure(Exception e) {
 | 
	
		
			
				|  |  |                              if (e instanceof ConnectTransportException
 | 
	
		
			
				|  |  |                                  && e.getMessage().contains("concurrently connecting and disconnecting")) {
 | 
	
		
			
				|  |  | -                                pendingConnections.release();
 | 
	
		
			
				|  |  | -                                threadPool.generic().execute(() -> run());
 | 
	
		
			
				|  |  | +                                connectionPermits.release();
 | 
	
		
			
				|  |  | +                                runAgain();
 | 
	
		
			
				|  |  |                              } else {
 | 
	
		
			
				|  |  |                                  throw new AssertionError("unexpected", e);
 | 
	
		
			
				|  |  |                              }
 | 
	
		
			
				|  |  |                          }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                        private void runAgain() {
 | 
	
		
			
				|  |  | +                            threadPool.generic().execute(() -> run());
 | 
	
		
			
				|  |  | +                        }
 | 
	
		
			
				|  |  |                      });
 | 
	
		
			
				|  |  |                  } else {
 | 
	
		
			
				|  |  |                      countDownLatch.countDown();
 | 
	
	
		
			
				|  | @@ -537,7 +548,116 @@ public class ClusterConnectionManagerTests extends ESTestCase {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          assertTrue("threads did not all complete", countDownLatch.await(10, TimeUnit.SECONDS));
 | 
	
		
			
				|  |  |          assertTrue("validatorPermits not all released", validatorPermits.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
 | 
	
		
			
				|  |  | -        assertFalse("node still connected", connectionManager.nodeConnected(node));
 | 
	
		
			
				|  |  | +        assertEquals("node still connected", disconnectionCount < connectionCount, connectionManager.nodeConnected(node));
 | 
	
		
			
				|  |  | +        connectionManager.close();
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    @TestLogging(reason = "ignore copious 'closed by remote' messages", value = "org.elasticsearch.transport.ClusterConnectionManager:WARN")
 | 
	
		
			
				|  |  | +    public void testConcurrentConnectsAndCloses() throws Exception {
 | 
	
		
			
				|  |  | +        final DiscoveryNode node = new DiscoveryNode("", new TransportAddress(InetAddress.getLoopbackAddress(), 0), Version.CURRENT);
 | 
	
		
			
				|  |  | +        doAnswer(invocationOnMock -> {
 | 
	
		
			
				|  |  | +            @SuppressWarnings("unchecked")
 | 
	
		
			
				|  |  | +            ActionListener<Transport.Connection> listener = (ActionListener<Transport.Connection>) invocationOnMock.getArguments()[2];
 | 
	
		
			
				|  |  | +            listener.onResponse(new TestConnect(node));
 | 
	
		
			
				|  |  | +            return null;
 | 
	
		
			
				|  |  | +        }).when(transport).openConnection(eq(node), any(), anyActionListener());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        final Semaphore validatorPermits = new Semaphore(Integer.MAX_VALUE);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        final ConnectionManager.ConnectionValidator validator = (c, p, l) -> {
 | 
	
		
			
				|  |  | +            assertTrue(validatorPermits.tryAcquire());
 | 
	
		
			
				|  |  | +            threadPool.executor(randomFrom(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME)).execute(() -> {
 | 
	
		
			
				|  |  | +                try {
 | 
	
		
			
				|  |  | +                    l.onResponse(null);
 | 
	
		
			
				|  |  | +                } finally {
 | 
	
		
			
				|  |  | +                    validatorPermits.release();
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            });
 | 
	
		
			
				|  |  | +        };
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        final var closePermits = new Semaphore(between(1, 1000));
 | 
	
		
			
				|  |  | +        final int connectThreadCount = between(1, 3);
 | 
	
		
			
				|  |  | +        final int closeThreadCount = between(1, 3);
 | 
	
		
			
				|  |  | +        final var countDownLatch = new CountDownLatch(connectThreadCount + closeThreadCount);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        final var cleanlyOpenedConnectionFuture = new PlainActionFuture<Boolean>();
 | 
	
		
			
				|  |  | +        final var closingRefs = AbstractRefCounted.of(
 | 
	
		
			
				|  |  | +            () -> connectionManager.connectToNode(
 | 
	
		
			
				|  |  | +                node,
 | 
	
		
			
				|  |  | +                null,
 | 
	
		
			
				|  |  | +                validator,
 | 
	
		
			
				|  |  | +                cleanlyOpenedConnectionFuture.map(r -> connectionManager.nodeConnected(node))
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  | +        );
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        final Runnable connectAction = new Runnable() {
 | 
	
		
			
				|  |  | +            private void runAgain() {
 | 
	
		
			
				|  |  | +                threadPool.generic().execute(this);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public void run() {
 | 
	
		
			
				|  |  | +                if (cleanlyOpenedConnectionFuture.isDone() == false) {
 | 
	
		
			
				|  |  | +                    connectionManager.connectToNode(node, null, validator, new ActionListener<>() {
 | 
	
		
			
				|  |  | +                        @Override
 | 
	
		
			
				|  |  | +                        public void onResponse(Releasable releasable) {
 | 
	
		
			
				|  |  | +                            runAgain();
 | 
	
		
			
				|  |  | +                        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                        @Override
 | 
	
		
			
				|  |  | +                        public void onFailure(Exception e) {
 | 
	
		
			
				|  |  | +                            if (e instanceof ConnectTransportException
 | 
	
		
			
				|  |  | +                                && e.getMessage().contains("concurrently connecting and disconnecting")) {
 | 
	
		
			
				|  |  | +                                runAgain();
 | 
	
		
			
				|  |  | +                            } else {
 | 
	
		
			
				|  |  | +                                throw new AssertionError("unexpected", e);
 | 
	
		
			
				|  |  | +                            }
 | 
	
		
			
				|  |  | +                        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    });
 | 
	
		
			
				|  |  | +                } else {
 | 
	
		
			
				|  |  | +                    countDownLatch.countDown();
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        };
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        final Runnable closeAction = new Runnable() {
 | 
	
		
			
				|  |  | +            private void runAgain() {
 | 
	
		
			
				|  |  | +                threadPool.generic().execute(this);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public void run() {
 | 
	
		
			
				|  |  | +                closingRefs.decRef();
 | 
	
		
			
				|  |  | +                if (closePermits.tryAcquire() && closingRefs.tryIncRef()) {
 | 
	
		
			
				|  |  | +                    try {
 | 
	
		
			
				|  |  | +                        var connection = connectionManager.getConnection(node);
 | 
	
		
			
				|  |  | +                        connection.addRemovedListener(ActionListener.wrap(this::runAgain));
 | 
	
		
			
				|  |  | +                        connection.close();
 | 
	
		
			
				|  |  | +                    } catch (NodeNotConnectedException e) {
 | 
	
		
			
				|  |  | +                        closePermits.release();
 | 
	
		
			
				|  |  | +                        runAgain();
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                } else {
 | 
	
		
			
				|  |  | +                    countDownLatch.countDown();
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        };
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        for (int i = 0; i < connectThreadCount; i++) {
 | 
	
		
			
				|  |  | +            connectAction.run();
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        for (int i = 0; i < closeThreadCount; i++) {
 | 
	
		
			
				|  |  | +            closingRefs.incRef();
 | 
	
		
			
				|  |  | +            closeAction.run();
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        closingRefs.decRef();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        assertTrue("threads did not all complete", countDownLatch.await(10, TimeUnit.SECONDS));
 | 
	
		
			
				|  |  | +        assertFalse(closingRefs.hasReferences());
 | 
	
		
			
				|  |  | +        assertTrue(cleanlyOpenedConnectionFuture.actionGet(0, TimeUnit.SECONDS));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        assertTrue("validatorPermits not all released", validatorPermits.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
 | 
	
		
			
				|  |  |          connectionManager.close();
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 |