|
|
@@ -40,6 +40,7 @@ import org.elasticsearch.common.util.BigArrays;
|
|
|
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
|
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|
|
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
|
|
+import org.elasticsearch.mocksocket.MockServerSocket;
|
|
|
import org.elasticsearch.node.Node;
|
|
|
import org.elasticsearch.test.ESTestCase;
|
|
|
import org.elasticsearch.test.transport.MockTransportService;
|
|
|
@@ -1366,7 +1367,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
// all is well
|
|
|
}
|
|
|
|
|
|
- try (Transport.Connection connection = serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE)){
|
|
|
+ try (Transport.Connection connection = serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE)) {
|
|
|
serviceB.handshake(connection, 100);
|
|
|
fail("exception should be thrown");
|
|
|
} catch (IllegalStateException e) {
|
|
|
@@ -1424,7 +1425,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
// all is well
|
|
|
}
|
|
|
|
|
|
- try (Transport.Connection connection = serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE)){
|
|
|
+ try (Transport.Connection connection = serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE)) {
|
|
|
serviceB.handshake(connection, 100);
|
|
|
fail("exception should be thrown");
|
|
|
} catch (IllegalStateException e) {
|
|
|
@@ -1778,7 +1779,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
public void testTimeoutPerConnection() throws IOException {
|
|
|
assumeTrue("Works only on BSD network stacks and apparently windows",
|
|
|
Constants.MAC_OS_X || Constants.FREE_BSD || Constants.WINDOWS);
|
|
|
- try (ServerSocket socket = new ServerSocket()) {
|
|
|
+ try (ServerSocket socket = new MockServerSocket()) {
|
|
|
// note - this test uses backlog=1 which is implementation specific ie. it might not work on some TCP/IP stacks
|
|
|
// on linux (at least newer ones) the listen(addr, backlog=1) should just ignore new connections if the queue is full which
|
|
|
// means that once we received an ACK from the client we just drop the packet on the floor (which is what we want) and we run
|
|
|
@@ -1823,7 +1824,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
|
|
|
|
|
try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
|
|
- new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList())){
|
|
|
+ new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList())) {
|
|
|
@Override
|
|
|
protected String handleRequest(MockChannel mockChannel, String profileName, StreamInput stream, long requestId,
|
|
|
int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status)
|
|
|
@@ -1854,7 +1855,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testTcpHandshakeTimeout() throws IOException {
|
|
|
- try (ServerSocket socket = new ServerSocket()) {
|
|
|
+ try (ServerSocket socket = new MockServerSocket()) {
|
|
|
socket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0), 1);
|
|
|
socket.setReuseAddress(true);
|
|
|
DiscoveryNode dummy = new DiscoveryNode("TEST", new TransportAddress(socket.getInetAddress(),
|
|
|
@@ -1870,12 +1871,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
builder.setHandshakeTimeout(TimeValue.timeValueMillis(1));
|
|
|
ConnectTransportException ex = expectThrows(ConnectTransportException.class,
|
|
|
() -> serviceA.connectToNode(dummy, builder.build()));
|
|
|
- assertEquals("[][" + dummy.getAddress() +"] handshake_timeout[1ms]", ex.getMessage());
|
|
|
+ assertEquals("[][" + dummy.getAddress() + "] handshake_timeout[1ms]", ex.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void testTcpHandshakeConnectionReset() throws IOException, InterruptedException {
|
|
|
- try (ServerSocket socket = new ServerSocket()) {
|
|
|
+ try (ServerSocket socket = new MockServerSocket()) {
|
|
|
socket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0), 1);
|
|
|
socket.setReuseAddress(true);
|
|
|
DiscoveryNode dummy = new DiscoveryNode("TEST", new TransportAddress(socket.getInetAddress(),
|
|
|
@@ -1904,7 +1905,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
builder.setHandshakeTimeout(TimeValue.timeValueHours(1));
|
|
|
ConnectTransportException ex = expectThrows(ConnectTransportException.class,
|
|
|
() -> serviceA.connectToNode(dummy, builder.build()));
|
|
|
- assertEquals(ex.getMessage(), "[][" + dummy.getAddress() +"] general node connection failure");
|
|
|
+ assertEquals(ex.getMessage(), "[][" + dummy.getAddress() + "] general node connection failure");
|
|
|
assertThat(ex.getCause().getMessage(), startsWith("handshake failed"));
|
|
|
t.join();
|
|
|
}
|