|
|
@@ -19,6 +19,8 @@
|
|
|
|
|
|
package org.elasticsearch.transport;
|
|
|
|
|
|
+import org.apache.logging.log4j.Level;
|
|
|
+import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
|
|
import org.apache.logging.log4j.util.Supplier;
|
|
|
import org.apache.lucene.util.CollectionUtil;
|
|
|
@@ -35,6 +37,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
|
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
|
|
import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
|
|
+import org.elasticsearch.common.logging.Loggers;
|
|
|
import org.elasticsearch.common.network.CloseableChannel;
|
|
|
import org.elasticsearch.common.network.NetworkService;
|
|
|
import org.elasticsearch.common.network.NetworkUtils;
|
|
|
@@ -53,7 +56,9 @@ import org.elasticsearch.mocksocket.MockServerSocket;
|
|
|
import org.elasticsearch.node.Node;
|
|
|
import org.elasticsearch.tasks.Task;
|
|
|
import org.elasticsearch.test.ESTestCase;
|
|
|
+import org.elasticsearch.test.MockLogAppender;
|
|
|
import org.elasticsearch.test.VersionUtils;
|
|
|
+import org.elasticsearch.test.junit.annotations.TestLogging;
|
|
|
import org.elasticsearch.test.transport.MockTransportService;
|
|
|
import org.elasticsearch.test.transport.StubbableTransport;
|
|
|
import org.elasticsearch.threadpool.TestThreadPool;
|
|
|
@@ -397,7 +402,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
assertThat(responseString.get(), equalTo("test"));
|
|
|
}
|
|
|
|
|
|
- public void testAdapterSendReceiveCallbacks() throws Exception {
|
|
|
+ public void testMessageListeners() throws Exception {
|
|
|
final TransportRequestHandler<TransportRequest.Empty> requestHandler = (request, channel, task) -> {
|
|
|
try {
|
|
|
if (randomBoolean()) {
|
|
|
@@ -416,56 +421,56 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
serviceB.registerRequestHandler(ACTION, TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
|
|
|
requestHandler);
|
|
|
|
|
|
-
|
|
|
- class CountingTracer extends MockTransportService.Tracer {
|
|
|
+ class CountingListener implements TransportMessageListener {
|
|
|
AtomicInteger requestsReceived = new AtomicInteger();
|
|
|
AtomicInteger requestsSent = new AtomicInteger();
|
|
|
AtomicInteger responseReceived = new AtomicInteger();
|
|
|
AtomicInteger responseSent = new AtomicInteger();
|
|
|
|
|
|
@Override
|
|
|
- public void receivedRequest(long requestId, String action) {
|
|
|
+ public void onRequestReceived(long requestId, String action) {
|
|
|
if (action.equals(ACTION)) {
|
|
|
requestsReceived.incrementAndGet();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void responseSent(long requestId, String action) {
|
|
|
+ public void onResponseSent(long requestId, String action, TransportResponse response) {
|
|
|
if (action.equals(ACTION)) {
|
|
|
responseSent.incrementAndGet();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void responseSent(long requestId, String action, Throwable t) {
|
|
|
+ public void onResponseSent(long requestId, String action, Exception error) {
|
|
|
if (action.equals(ACTION)) {
|
|
|
responseSent.incrementAndGet();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void receivedResponse(long requestId, DiscoveryNode sourceNode, String action) {
|
|
|
- if (action.equals(ACTION)) {
|
|
|
+ public void onResponseReceived(long requestId, Transport.ResponseContext context) {
|
|
|
+ if (context.action().equals(ACTION)) {
|
|
|
responseReceived.incrementAndGet();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
|
|
|
+ public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
|
|
|
+ TransportRequestOptions options) {
|
|
|
if (action.equals(ACTION)) {
|
|
|
requestsSent.incrementAndGet();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- final CountingTracer tracerA = new CountingTracer();
|
|
|
- final CountingTracer tracerB = new CountingTracer();
|
|
|
- serviceA.addTracer(tracerA);
|
|
|
- serviceB.addTracer(tracerB);
|
|
|
+
|
|
|
+ final CountingListener tracerA = new CountingListener();
|
|
|
+ final CountingListener tracerB = new CountingListener();
|
|
|
+ serviceA.transport().addMessageListener(tracerA);
|
|
|
+ serviceB.transport().addMessageListener(tracerB);
|
|
|
|
|
|
try {
|
|
|
- serviceA
|
|
|
- .submitRequest(nodeB, ACTION, TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME).get();
|
|
|
+ serviceA.submitRequest(nodeB, ACTION, TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME).get();
|
|
|
} catch (ExecutionException e) {
|
|
|
assertThat(e.getCause(), instanceOf(ElasticsearchException.class));
|
|
|
assertThat(ExceptionsHelper.unwrapCause(e.getCause()).getMessage(), equalTo("simulated"));
|
|
|
@@ -484,8 +489,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
});
|
|
|
|
|
|
try {
|
|
|
- serviceA
|
|
|
- .submitRequest(nodeA, ACTION, TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME).get();
|
|
|
+ serviceB.submitRequest(nodeA, ACTION, TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME).get();
|
|
|
} catch (ExecutionException e) {
|
|
|
assertThat(e.getCause(), instanceOf(ElasticsearchException.class));
|
|
|
assertThat(ExceptionsHelper.unwrapCause(e.getCause()).getMessage(), equalTo("simulated"));
|
|
|
@@ -494,12 +498,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
// use assert busy as call backs are sometime called after the response have been sent
|
|
|
assertBusy(() -> {
|
|
|
assertThat(tracerA.requestsReceived.get(), equalTo(1));
|
|
|
- assertThat(tracerA.requestsSent.get(), equalTo(2));
|
|
|
- assertThat(tracerA.responseReceived.get(), equalTo(2));
|
|
|
+ assertThat(tracerA.requestsSent.get(), equalTo(1));
|
|
|
+ assertThat(tracerA.responseReceived.get(), equalTo(1));
|
|
|
assertThat(tracerA.responseSent.get(), equalTo(1));
|
|
|
assertThat(tracerB.requestsReceived.get(), equalTo(1));
|
|
|
- assertThat(tracerB.requestsSent.get(), equalTo(0));
|
|
|
- assertThat(tracerB.responseReceived.get(), equalTo(0));
|
|
|
+ assertThat(tracerB.requestsSent.get(), equalTo(1));
|
|
|
+ assertThat(tracerB.responseReceived.get(), equalTo(1));
|
|
|
assertThat(tracerB.responseSent.get(), equalTo(1));
|
|
|
});
|
|
|
}
|
|
|
@@ -973,20 +977,17 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
assertTrue(inFlight.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
|
|
|
}
|
|
|
|
|
|
- public void testTracerLog() throws InterruptedException {
|
|
|
+ @TestLogging(value = "org.elasticsearch.transport.TransportService.tracer:trace")
|
|
|
+ public void testTracerLog() throws Exception {
|
|
|
TransportRequestHandler<TransportRequest> handler = (request, channel, task) -> channel.sendResponse(new StringMessageResponse(""));
|
|
|
- TransportRequestHandler<StringMessageRequest> handlerWithError = new TransportRequestHandler<StringMessageRequest>() {
|
|
|
- @Override
|
|
|
- public void messageReceived(StringMessageRequest request, TransportChannel channel, Task task) throws Exception {
|
|
|
- if (request.timeout() > 0) {
|
|
|
- Thread.sleep(request.timeout);
|
|
|
- }
|
|
|
- channel.sendResponse(new RuntimeException(""));
|
|
|
-
|
|
|
+ TransportRequestHandler<StringMessageRequest> handlerWithError = (request, channel, task) -> {
|
|
|
+ if (request.timeout() > 0) {
|
|
|
+ Thread.sleep(request.timeout);
|
|
|
}
|
|
|
+ channel.sendResponse(new RuntimeException(""));
|
|
|
+
|
|
|
};
|
|
|
|
|
|
- final Semaphore requestCompleted = new Semaphore(0);
|
|
|
TransportResponseHandler<StringMessageResponse> noopResponseHandler = new TransportResponseHandler<StringMessageResponse>() {
|
|
|
|
|
|
@Override
|
|
|
@@ -996,12 +997,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
|
|
|
@Override
|
|
|
public void handleResponse(StringMessageResponse response) {
|
|
|
- requestCompleted.release();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void handleException(TransportException exp) {
|
|
|
- requestCompleted.release();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@@ -1011,48 +1010,20 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
};
|
|
|
|
|
|
serviceA.registerRequestHandler("internal:test", StringMessageRequest::new, ThreadPool.Names.SAME, handler);
|
|
|
+ serviceA.registerRequestHandler("internal:testNotSeen", StringMessageRequest::new, ThreadPool.Names.SAME, handler);
|
|
|
serviceA.registerRequestHandler("internal:testError", StringMessageRequest::new, ThreadPool.Names.SAME, handlerWithError);
|
|
|
serviceB.registerRequestHandler("internal:test", StringMessageRequest::new, ThreadPool.Names.SAME, handler);
|
|
|
+ serviceB.registerRequestHandler("internal:testNotSeen", StringMessageRequest::new, ThreadPool.Names.SAME, handler);
|
|
|
serviceB.registerRequestHandler("internal:testError", StringMessageRequest::new, ThreadPool.Names.SAME, handlerWithError);
|
|
|
|
|
|
- final Tracer tracer = new Tracer(new HashSet<>(Arrays.asList("internal:test", "internal:testError")));
|
|
|
- // the tracer is invoked concurrently after the actual action is executed. that means a Tracer#requestSent can still be in-flight
|
|
|
- // from a handshake executed on connect in the setup method. this might confuse this test since it expects exact number of
|
|
|
- // invocations. To prevent any unrelated events messing with this test we filter on the actions we execute in this test.
|
|
|
- serviceA.addTracer(tracer);
|
|
|
- serviceB.addTracer(tracer);
|
|
|
-
|
|
|
- tracer.reset(4);
|
|
|
- boolean timeout = randomBoolean();
|
|
|
- TransportRequestOptions options = timeout ? TransportRequestOptions.builder().withTimeout(1).build() :
|
|
|
- TransportRequestOptions.EMPTY;
|
|
|
- serviceA.sendRequest(nodeB, "internal:test", new StringMessageRequest("", 10), options, noopResponseHandler);
|
|
|
- requestCompleted.acquire();
|
|
|
- tracer.expectedEvents.get().await();
|
|
|
- assertThat("didn't see request sent", tracer.sawRequestSent, equalTo(true));
|
|
|
- assertThat("didn't see request received", tracer.sawRequestReceived, equalTo(true));
|
|
|
- assertThat("didn't see response sent", tracer.sawResponseSent, equalTo(true));
|
|
|
- assertThat("didn't see response received", tracer.sawResponseReceived, equalTo(true));
|
|
|
- assertThat("saw error sent", tracer.sawErrorSent, equalTo(false));
|
|
|
-
|
|
|
- tracer.reset(4);
|
|
|
- serviceA.sendRequest(nodeB, "internal:testError", new StringMessageRequest(""), noopResponseHandler);
|
|
|
- requestCompleted.acquire();
|
|
|
- tracer.expectedEvents.get().await();
|
|
|
- assertThat("didn't see request sent", tracer.sawRequestSent, equalTo(true));
|
|
|
- assertThat("didn't see request received", tracer.sawRequestReceived, equalTo(true));
|
|
|
- assertThat("saw response sent", tracer.sawResponseSent, equalTo(false));
|
|
|
- assertThat("didn't see response received", tracer.sawResponseReceived, equalTo(true));
|
|
|
- assertThat("didn't see error sent", tracer.sawErrorSent, equalTo(true));
|
|
|
-
|
|
|
String includeSettings;
|
|
|
String excludeSettings;
|
|
|
if (randomBoolean()) {
|
|
|
// sometimes leave include empty (default)
|
|
|
includeSettings = randomBoolean() ? "*" : "";
|
|
|
- excludeSettings = "*Error";
|
|
|
+ excludeSettings = "internal:testNotSeen";
|
|
|
} else {
|
|
|
- includeSettings = "internal:test";
|
|
|
+ includeSettings = "internal:test,internal:testError";
|
|
|
excludeSettings = "DOESN'T_MATCH";
|
|
|
}
|
|
|
clusterSettingsA.applySettings(Settings.builder()
|
|
|
@@ -1060,97 +1031,78 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
.put(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.getKey(), excludeSettings)
|
|
|
.build());
|
|
|
|
|
|
- tracer.reset(4);
|
|
|
- serviceA.sendRequest(nodeB, "internal:test", new StringMessageRequest(""), noopResponseHandler);
|
|
|
- requestCompleted.acquire();
|
|
|
- tracer.expectedEvents.get().await();
|
|
|
- assertThat("didn't see request sent", tracer.sawRequestSent, equalTo(true));
|
|
|
- assertThat("didn't see request received", tracer.sawRequestReceived, equalTo(true));
|
|
|
- assertThat("didn't see response sent", tracer.sawResponseSent, equalTo(true));
|
|
|
- assertThat("didn't see response received", tracer.sawResponseReceived, equalTo(true));
|
|
|
- assertThat("saw error sent", tracer.sawErrorSent, equalTo(false));
|
|
|
-
|
|
|
- tracer.reset(2);
|
|
|
- serviceA.sendRequest(nodeB, "internal:testError", new StringMessageRequest(""), noopResponseHandler);
|
|
|
- requestCompleted.acquire();
|
|
|
- tracer.expectedEvents.get().await();
|
|
|
- assertThat("saw request sent", tracer.sawRequestSent, equalTo(false));
|
|
|
- assertThat("didn't see request received", tracer.sawRequestReceived, equalTo(true));
|
|
|
- assertThat("saw response sent", tracer.sawResponseSent, equalTo(false));
|
|
|
- assertThat("saw response received", tracer.sawResponseReceived, equalTo(false));
|
|
|
- assertThat("didn't see error sent", tracer.sawErrorSent, equalTo(true));
|
|
|
- }
|
|
|
-
|
|
|
- private static class Tracer extends MockTransportService.Tracer {
|
|
|
- private final Set<String> actions;
|
|
|
- public volatile boolean sawRequestSent;
|
|
|
- public volatile boolean sawRequestReceived;
|
|
|
- public volatile boolean sawResponseSent;
|
|
|
- public volatile boolean sawErrorSent;
|
|
|
- public volatile boolean sawResponseReceived;
|
|
|
-
|
|
|
- public AtomicReference<CountDownLatch> expectedEvents = new AtomicReference<>();
|
|
|
-
|
|
|
- Tracer(Set<String> actions) {
|
|
|
- this.actions = actions;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void receivedRequest(long requestId, String action) {
|
|
|
- super.receivedRequest(requestId, action);
|
|
|
- if (actions.contains(action)) {
|
|
|
- sawRequestReceived = true;
|
|
|
- expectedEvents.get().countDown();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
|
|
|
- super.requestSent(node, requestId, action, options);
|
|
|
- if (actions.contains(action)) {
|
|
|
- sawRequestSent = true;
|
|
|
- expectedEvents.get().countDown();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void responseSent(long requestId, String action) {
|
|
|
- super.responseSent(requestId, action);
|
|
|
- if (actions.contains(action)) {
|
|
|
- sawResponseSent = true;
|
|
|
- expectedEvents.get().countDown();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void responseSent(long requestId, String action, Throwable t) {
|
|
|
- super.responseSent(requestId, action, t);
|
|
|
- if (actions.contains(action)) {
|
|
|
- sawErrorSent = true;
|
|
|
- expectedEvents.get().countDown();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void receivedResponse(long requestId, DiscoveryNode sourceNode, String action) {
|
|
|
- super.receivedResponse(requestId, sourceNode, action);
|
|
|
- if (actions.contains(action)) {
|
|
|
- sawResponseReceived = true;
|
|
|
- expectedEvents.get().countDown();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void reset(int expectedCount) {
|
|
|
- sawRequestSent = false;
|
|
|
- sawRequestReceived = false;
|
|
|
- sawResponseSent = false;
|
|
|
- sawErrorSent = false;
|
|
|
- sawResponseReceived = false;
|
|
|
- expectedEvents.set(new CountDownLatch(expectedCount));
|
|
|
+ MockLogAppender appender = new MockLogAppender();
|
|
|
+ Loggers.addAppender(LogManager.getLogger("org.elasticsearch.transport.TransportService.tracer"), appender);
|
|
|
+ try {
|
|
|
+ appender.start();
|
|
|
+
|
|
|
+ final String requestSent = ".*\\[internal:test].*sent to.*\\{TS_B}.*";
|
|
|
+ final MockLogAppender.LoggingExpectation requestSentExpectation =
|
|
|
+ new MockLogAppender.PatternSeenEventExcpectation(
|
|
|
+ "sent request", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, requestSent);
|
|
|
+ final String requestReceived = ".*\\[internal:test].*received request.*";
|
|
|
+ final MockLogAppender.LoggingExpectation requestReceivedExpectation =
|
|
|
+ new MockLogAppender.PatternSeenEventExcpectation(
|
|
|
+ "received request", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, requestReceived);
|
|
|
+ final String responseSent = ".*\\[internal:test].*sent response.*";
|
|
|
+ final MockLogAppender.LoggingExpectation responseSentExpectation =
|
|
|
+ new MockLogAppender.PatternSeenEventExcpectation(
|
|
|
+ "sent response", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, responseSent);
|
|
|
+ final String responseReceived = ".*\\[internal:test].*received response from.*\\{TS_B}.*";
|
|
|
+ final MockLogAppender.LoggingExpectation responseReceivedExpectation =
|
|
|
+ new MockLogAppender.PatternSeenEventExcpectation(
|
|
|
+ "received response", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, responseReceived);
|
|
|
+
|
|
|
+ appender.addExpectation(requestSentExpectation);
|
|
|
+ appender.addExpectation(requestReceivedExpectation);
|
|
|
+ appender.addExpectation(responseSentExpectation);
|
|
|
+ appender.addExpectation(responseReceivedExpectation);
|
|
|
+
|
|
|
+ StringMessageRequest request = new StringMessageRequest("", 10);
|
|
|
+ serviceA.sendRequest(nodeB, "internal:test", request, TransportRequestOptions.EMPTY, noopResponseHandler);
|
|
|
+
|
|
|
+ assertBusy(appender::assertAllExpectationsMatched);
|
|
|
+
|
|
|
+ final String errorResponseSent = ".*\\[internal:testError].*sent error response.*";
|
|
|
+ final MockLogAppender.LoggingExpectation errorResponseSentExpectation =
|
|
|
+ new MockLogAppender.PatternSeenEventExcpectation(
|
|
|
+ "sent error response", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, errorResponseSent);
|
|
|
+
|
|
|
+ final String errorResponseReceived = ".*\\[internal:testError].*received response from.*\\{TS_B}.*";
|
|
|
+ final MockLogAppender.LoggingExpectation errorResponseReceivedExpectation =
|
|
|
+ new MockLogAppender.PatternSeenEventExcpectation(
|
|
|
+ "received error response", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, errorResponseReceived);
|
|
|
+
|
|
|
+ appender.addExpectation(errorResponseSentExpectation);
|
|
|
+ appender.addExpectation(errorResponseReceivedExpectation);
|
|
|
+
|
|
|
+ serviceA.sendRequest(nodeB, "internal:testError", new StringMessageRequest(""), noopResponseHandler);
|
|
|
+
|
|
|
+ assertBusy(appender::assertAllExpectationsMatched);
|
|
|
+
|
|
|
+ final String notSeenSent = "*[internal:testNotSeen]*sent to*";
|
|
|
+ final MockLogAppender.LoggingExpectation notSeenSentExpectation =
|
|
|
+ new MockLogAppender.UnseenEventExpectation(
|
|
|
+ "not seen request sent", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, notSeenSent);
|
|
|
+ final String notSeenReceived = ".*\\[internal:testNotSeen].*received request.*";
|
|
|
+ final MockLogAppender.LoggingExpectation notSeenReceivedExpectation =
|
|
|
+ new MockLogAppender.PatternSeenEventExcpectation(
|
|
|
+ "not seen request received", "org.elasticsearch.transport.TransportService.tracer", Level.TRACE, notSeenReceived);
|
|
|
+
|
|
|
+ appender.addExpectation(notSeenSentExpectation);
|
|
|
+ appender.addExpectation(notSeenReceivedExpectation);
|
|
|
+
|
|
|
+ PlainTransportFuture<StringMessageResponse> future = new PlainTransportFuture<>(noopResponseHandler);
|
|
|
+ serviceA.sendRequest(nodeB, "internal:testNotSeen", new StringMessageRequest(""), future);
|
|
|
+ future.txGet();
|
|
|
+
|
|
|
+ assertBusy(appender::assertAllExpectationsMatched);
|
|
|
+ } finally {
|
|
|
+ Loggers.removeAppender(LogManager.getLogger("org.elasticsearch.transport.TransportService.tracer"), appender);
|
|
|
+ appender.stop();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
public static class StringMessageRequest extends TransportRequest {
|
|
|
|
|
|
private String message;
|