|  | @@ -8,6 +8,7 @@
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  package org.elasticsearch.transport.netty4;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +import org.apache.lucene.util.Constants;
 | 
	
		
			
				|  |  |  import org.elasticsearch.TransportVersion;
 | 
	
		
			
				|  |  |  import org.elasticsearch.Version;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.ActionListener;
 | 
	
	
		
			
				|  | @@ -19,7 +20,9 @@ import org.elasticsearch.common.settings.Settings;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.transport.TransportAddress;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.util.PageCacheRecycler;
 | 
	
		
			
				|  |  |  import org.elasticsearch.core.IOUtils;
 | 
	
		
			
				|  |  | +import org.elasticsearch.core.TimeValue;
 | 
	
		
			
				|  |  |  import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 | 
	
		
			
				|  |  | +import org.elasticsearch.mocksocket.MockServerSocket;
 | 
	
		
			
				|  |  |  import org.elasticsearch.test.transport.MockTransportService;
 | 
	
		
			
				|  |  |  import org.elasticsearch.test.transport.StubbableTransport;
 | 
	
		
			
				|  |  |  import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
 | 
	
	
		
			
				|  | @@ -31,10 +34,12 @@ import org.elasticsearch.transport.TcpTransport;
 | 
	
		
			
				|  |  |  import org.elasticsearch.transport.TestProfiles;
 | 
	
		
			
				|  |  |  import org.elasticsearch.transport.Transport;
 | 
	
		
			
				|  |  |  import org.elasticsearch.transport.TransportRequestOptions;
 | 
	
		
			
				|  |  | +import org.elasticsearch.transport.TransportService;
 | 
	
		
			
				|  |  |  import org.elasticsearch.transport.TransportSettings;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import java.io.IOException;
 | 
	
		
			
				|  |  |  import java.net.InetAddress;
 | 
	
		
			
				|  |  | +import java.net.ServerSocket;
 | 
	
		
			
				|  |  |  import java.net.UnknownHostException;
 | 
	
		
			
				|  |  |  import java.nio.channels.SocketChannel;
 | 
	
		
			
				|  |  |  import java.util.Collections;
 | 
	
	
		
			
				|  | @@ -183,4 +188,66 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
 | 
	
		
			
				|  |  |          assertNotNull(keepInterval);
 | 
	
		
			
				|  |  |          assertThat(keepInterval, lessThanOrEqualTo(500));
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    public void testTimeoutPerConnection() throws IOException {
 | 
	
		
			
				|  |  | +        assumeTrue("Works only on BSD network stacks", Constants.MAC_OS_X || Constants.FREE_BSD);
 | 
	
		
			
				|  |  | +        try (ServerSocket socket = new MockServerSocket()) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // note - this test uses backlog=1 which is implementation specific ie. it might not work on some TCP/IP stacks
 | 
	
		
			
				|  |  | +            // on linux (at least newer ones) the listen(addr, backlog=1) should just ignore new connections if the queue is full which
 | 
	
		
			
				|  |  | +            // means that once we received an ACK from the client we just drop the packet on the floor (which is what we want) and we run
 | 
	
		
			
				|  |  | +            // into a connection timeout quickly. Yet other implementations can for instance can terminate the connection within the 3 way
 | 
	
		
			
				|  |  | +            // handshake which I haven't tested yet.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // note - this test doesn't work with security enabled because it relies on connecting to the MockServerSocket and we are not
 | 
	
		
			
				|  |  | +            // set up to accept a TLS handshake on this socket
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            socket.bind(getLocalEphemeral(), 1);
 | 
	
		
			
				|  |  | +            socket.setReuseAddress(true);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            DiscoveryNode first = new DiscoveryNode(
 | 
	
		
			
				|  |  | +                "TEST",
 | 
	
		
			
				|  |  | +                new TransportAddress(socket.getInetAddress(), socket.getLocalPort()),
 | 
	
		
			
				|  |  | +                emptyMap(),
 | 
	
		
			
				|  |  | +                emptySet(),
 | 
	
		
			
				|  |  | +                version0
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +            DiscoveryNode second = new DiscoveryNode(
 | 
	
		
			
				|  |  | +                "TEST",
 | 
	
		
			
				|  |  | +                new TransportAddress(socket.getInetAddress(), socket.getLocalPort()),
 | 
	
		
			
				|  |  | +                emptyMap(),
 | 
	
		
			
				|  |  | +                emptySet(),
 | 
	
		
			
				|  |  | +                version0
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +            ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
 | 
	
		
			
				|  |  | +            builder.addConnections(
 | 
	
		
			
				|  |  | +                1,
 | 
	
		
			
				|  |  | +                TransportRequestOptions.Type.BULK,
 | 
	
		
			
				|  |  | +                TransportRequestOptions.Type.PING,
 | 
	
		
			
				|  |  | +                TransportRequestOptions.Type.RECOVERY,
 | 
	
		
			
				|  |  | +                TransportRequestOptions.Type.REG,
 | 
	
		
			
				|  |  | +                TransportRequestOptions.Type.STATE
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +            // connection with one connection and a large timeout -- should consume the one spot in the backlog queue
 | 
	
		
			
				|  |  | +            try (TransportService service = buildService("TS_TPC", Version.CURRENT, null, Settings.EMPTY, true, false)) {
 | 
	
		
			
				|  |  | +                IOUtils.close(openConnection(service, first, builder.build()));
 | 
	
		
			
				|  |  | +                builder.setConnectTimeout(TimeValue.timeValueMillis(1));
 | 
	
		
			
				|  |  | +                final ConnectionProfile profile = builder.build();
 | 
	
		
			
				|  |  | +                // now with the 1ms timeout we got and test that is it's applied
 | 
	
		
			
				|  |  | +                long startTime = System.nanoTime();
 | 
	
		
			
				|  |  | +                ConnectTransportException ex = expectThrows(
 | 
	
		
			
				|  |  | +                    ConnectTransportException.class,
 | 
	
		
			
				|  |  | +                    () -> openConnection(service, second, profile)
 | 
	
		
			
				|  |  | +                );
 | 
	
		
			
				|  |  | +                final long now = System.nanoTime();
 | 
	
		
			
				|  |  | +                final long timeTaken = TimeValue.nsecToMSec(now - startTime);
 | 
	
		
			
				|  |  | +                assertTrue(
 | 
	
		
			
				|  |  | +                    "test didn't timeout quick enough, time taken: [" + timeTaken + "]",
 | 
	
		
			
				|  |  | +                    timeTaken < TimeValue.timeValueSeconds(5).millis()
 | 
	
		
			
				|  |  | +                );
 | 
	
		
			
				|  |  | +                assertEquals(ex.getMessage(), "[][" + second.getAddress() + "] connect_timeout[1ms]");
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  }
 |