|
@@ -2252,7 +2252,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
assertPendingConnections(0, serviceC.getOriginalTransport());
|
|
|
}
|
|
|
|
|
|
- public void testTransportStats() throws IOException, InterruptedException {
|
|
|
+ public void testTransportStats() throws Exception {
|
|
|
MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true);
|
|
|
CountDownLatch receivedLatch = new CountDownLatch(1);
|
|
|
CountDownLatch sendResponseLatch = new CountDownLatch(1);
|
|
@@ -2316,19 +2316,23 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
TransportRequestOptions.Type.REG,
|
|
|
TransportRequestOptions.Type.STATE);
|
|
|
try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) {
|
|
|
- stats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake
|
|
|
- assertEquals(1, stats.getRxCount());
|
|
|
- assertEquals(1, stats.getTxCount());
|
|
|
- assertEquals(25, stats.getRxSize().getBytes());
|
|
|
- assertEquals(45, stats.getTxSize().getBytes());
|
|
|
+ assertBusy(() -> { // netty for instance invokes this concurrently so we better use assert busy here
|
|
|
+ TransportStats transportStats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake
|
|
|
+ assertEquals(1, transportStats.getRxCount());
|
|
|
+ assertEquals(1, transportStats.getTxCount());
|
|
|
+ assertEquals(25, transportStats.getRxSize().getBytes());
|
|
|
+ assertEquals(45, transportStats.getTxSize().getBytes());
|
|
|
+ });
|
|
|
serviceC.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY,
|
|
|
transportResponseHandler);
|
|
|
receivedLatch.await();
|
|
|
- stats = serviceC.transport.getStats(); // request has ben send
|
|
|
- assertEquals(1, stats.getRxCount());
|
|
|
- assertEquals(2, stats.getTxCount());
|
|
|
- assertEquals(25, stats.getRxSize().getBytes());
|
|
|
- assertEquals(91, stats.getTxSize().getBytes());
|
|
|
+ assertBusy(() -> { // netty for instance invokes this concurrently so we better use assert busy here
|
|
|
+ TransportStats transportStats = serviceC.transport.getStats(); // request has ben send
|
|
|
+ assertEquals(1, transportStats.getRxCount());
|
|
|
+ assertEquals(2, transportStats.getTxCount());
|
|
|
+ assertEquals(25, transportStats.getRxSize().getBytes());
|
|
|
+ assertEquals(91, transportStats.getTxSize().getBytes());
|
|
|
+ });
|
|
|
sendResponseLatch.countDown();
|
|
|
responseLatch.await();
|
|
|
stats = serviceC.transport.getStats(); // response has been received
|
|
@@ -2345,7 +2349,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void testTransportStatsWithException() throws IOException, InterruptedException {
|
|
|
+ public void testTransportStatsWithException() throws Exception {
|
|
|
MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true);
|
|
|
CountDownLatch receivedLatch = new CountDownLatch(1);
|
|
|
CountDownLatch sendResponseLatch = new CountDownLatch(1);
|
|
@@ -2411,19 +2415,23 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
TransportRequestOptions.Type.REG,
|
|
|
TransportRequestOptions.Type.STATE);
|
|
|
try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) {
|
|
|
- stats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake
|
|
|
- assertEquals(1, stats.getRxCount());
|
|
|
- assertEquals(1, stats.getTxCount());
|
|
|
- assertEquals(25, stats.getRxSize().getBytes());
|
|
|
- assertEquals(45, stats.getTxSize().getBytes());
|
|
|
+ assertBusy(() -> { // netty for instance invokes this concurrently so we better use assert busy here
|
|
|
+ TransportStats transportStats = serviceC.transport.getStats(); // request has ben send
|
|
|
+ assertEquals(1, transportStats.getRxCount());
|
|
|
+ assertEquals(1, transportStats.getTxCount());
|
|
|
+ assertEquals(25, transportStats.getRxSize().getBytes());
|
|
|
+ assertEquals(45, transportStats.getTxSize().getBytes());
|
|
|
+ });
|
|
|
serviceC.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY,
|
|
|
transportResponseHandler);
|
|
|
receivedLatch.await();
|
|
|
- stats = serviceC.transport.getStats(); // request has ben send
|
|
|
- assertEquals(1, stats.getRxCount());
|
|
|
- assertEquals(2, stats.getTxCount());
|
|
|
- assertEquals(25, stats.getRxSize().getBytes());
|
|
|
- assertEquals(91, stats.getTxSize().getBytes());
|
|
|
+ assertBusy(() -> { // netty for instance invokes this concurrently so we better use assert busy here
|
|
|
+ TransportStats transportStats = serviceC.transport.getStats(); // request has ben send
|
|
|
+ assertEquals(1, transportStats.getRxCount());
|
|
|
+ assertEquals(2, transportStats.getTxCount());
|
|
|
+ assertEquals(25, transportStats.getRxSize().getBytes());
|
|
|
+ assertEquals(91, transportStats.getTxSize().getBytes());
|
|
|
+ });
|
|
|
sendResponseLatch.countDown();
|
|
|
responseLatch.await();
|
|
|
stats = serviceC.transport.getStats(); // exception response has been received
|