|
@@ -19,6 +19,7 @@ import org.elasticsearch.TransportVersions;
|
|
|
import org.elasticsearch.Version;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.ActionListenerResponseHandler;
|
|
|
+import org.elasticsearch.action.ActionResponse;
|
|
|
import org.elasticsearch.action.support.ActionTestUtils;
|
|
|
import org.elasticsearch.action.support.ChannelActionListener;
|
|
|
import org.elasticsearch.action.support.PlainActionFuture;
|
|
@@ -518,7 +519,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
final TransportRequestHandler<EmptyRequest> requestHandler = (request, channel, task) -> {
|
|
|
try {
|
|
|
if (randomBoolean()) {
|
|
|
- channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
|
|
+ channel.sendResponse(ActionResponse.Empty.INSTANCE);
|
|
|
} else {
|
|
|
channel.sendResponse(new ElasticsearchException("simulated"));
|
|
|
}
|
|
@@ -652,7 +653,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
EmptyRequest::new,
|
|
|
(request, channel, task) -> {
|
|
|
try {
|
|
|
- channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
|
|
+ channel.sendResponse(ActionResponse.Empty.INSTANCE);
|
|
|
} catch (Exception e) {
|
|
|
logger.error("Unexpected failure", e);
|
|
|
fail(e.getMessage());
|
|
@@ -670,15 +671,15 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress);
|
|
|
connectToNode(serviceC, serviceA.getLocalNode(), connectionProfile);
|
|
|
|
|
|
- Future<TransportResponse.Empty> res = submitRequest(
|
|
|
+ Future<ActionResponse.Empty> res = submitRequest(
|
|
|
serviceC,
|
|
|
nodeA,
|
|
|
"internal:sayHello",
|
|
|
new EmptyRequest(),
|
|
|
new TransportResponseHandler<>() {
|
|
|
@Override
|
|
|
- public TransportResponse.Empty read(StreamInput in) {
|
|
|
- return TransportResponse.Empty.INSTANCE;
|
|
|
+ public ActionResponse.Empty read(StreamInput in) {
|
|
|
+ return ActionResponse.Empty.INSTANCE;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -687,7 +688,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void handleResponse(TransportResponse.Empty response) {}
|
|
|
+ public void handleResponse(ActionResponse.Empty response) {}
|
|
|
|
|
|
@Override
|
|
|
public void handleException(TransportException exp) {
|
|
@@ -1085,7 +1086,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
);
|
|
|
- Future<TransportResponse.Empty> foobar = submitRequest(
|
|
|
+ Future<ActionResponse.Empty> foobar = submitRequest(
|
|
|
serviceB,
|
|
|
nodeA,
|
|
|
"internal:foobar",
|
|
@@ -1974,7 +1975,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
TestRequest::new,
|
|
|
(request, channel, task) -> {
|
|
|
requestProcessed.set(true);
|
|
|
- channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
|
|
+ channel.sendResponse(ActionResponse.Empty.INSTANCE);
|
|
|
}
|
|
|
);
|
|
|
|
|
@@ -2499,7 +2500,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
if ("fail".equals(request.info)) {
|
|
|
throw new RuntimeException("boom");
|
|
|
} else {
|
|
|
- channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
|
|
+ channel.sendResponse(ActionResponse.Empty.INSTANCE);
|
|
|
}
|
|
|
}
|
|
|
);
|
|
@@ -2512,13 +2513,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
|
|
|
@Override
|
|
|
public TransportResponse read(StreamInput in) {
|
|
|
- return TransportResponse.Empty.INSTANCE;
|
|
|
+ return ActionResponse.Empty.INSTANCE;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void handleResponse(TransportResponse response) {
|
|
|
try {
|
|
|
- assertSame(response, TransportResponse.Empty.INSTANCE);
|
|
|
+ assertSame(response, ActionResponse.Empty.INSTANCE);
|
|
|
assertTrue(threadPool.getThreadContext().getResponseHeaders().containsKey("foo.bar"));
|
|
|
assertEquals(1, threadPool.getThreadContext().getResponseHeaders().get("foo.bar").size());
|
|
|
assertEquals("baz", threadPool.getThreadContext().getResponseHeaders().get("foo.bar").get(0));
|
|
@@ -2568,7 +2569,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
TransportResponseHandler<TransportResponse> transportResponseHandler = new TransportResponseHandler<TransportResponse>() {
|
|
|
@Override
|
|
|
public TransportResponse read(StreamInput in) {
|
|
|
- return TransportResponse.Empty.INSTANCE;
|
|
|
+ return ActionResponse.Empty.INSTANCE;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -2641,13 +2642,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
protected void doRun() throws Exception {
|
|
|
receivedLatch.countDown();
|
|
|
sendResponseLatch.await();
|
|
|
- channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
|
|
+ channel.sendResponse(ActionResponse.Empty.INSTANCE);
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
);
|
|
|
CountDownLatch responseLatch = new CountDownLatch(1);
|
|
|
- TransportResponseHandler<TransportResponse.Empty> transportResponseHandler = new TransportResponseHandler.Empty() {
|
|
|
+ TransportResponseHandler<ActionResponse.Empty> transportResponseHandler = new TransportResponseHandler.Empty() {
|
|
|
@Override
|
|
|
public Executor executor() {
|
|
|
return TransportResponseHandler.TRANSPORT_WORKER;
|
|
@@ -2709,13 +2710,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
protected void doRun() throws Exception {
|
|
|
receivedLatch.countDown();
|
|
|
sendResponseLatch.await();
|
|
|
- channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
|
|
+ channel.sendResponse(ActionResponse.Empty.INSTANCE);
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
);
|
|
|
CountDownLatch responseLatch = new CountDownLatch(1);
|
|
|
- TransportResponseHandler<TransportResponse.Empty> transportResponseHandler = new TransportResponseHandler.Empty() {
|
|
|
+ TransportResponseHandler<ActionResponse.Empty> transportResponseHandler = new TransportResponseHandler.Empty() {
|
|
|
@Override
|
|
|
public Executor executor() {
|
|
|
return TransportResponseHandler.TRANSPORT_WORKER;
|
|
@@ -2829,7 +2830,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
);
|
|
|
CountDownLatch responseLatch = new CountDownLatch(1);
|
|
|
AtomicReference<TransportException> receivedException = new AtomicReference<>(null);
|
|
|
- TransportResponseHandler<TransportResponse.Empty> transportResponseHandler = new TransportResponseHandler.Empty() {
|
|
|
+ TransportResponseHandler<ActionResponse.Empty> transportResponseHandler = new TransportResponseHandler.Empty() {
|
|
|
@Override
|
|
|
public Executor executor() {
|
|
|
return TransportResponseHandler.TRANSPORT_WORKER;
|
|
@@ -3179,7 +3180,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
)
|
|
|
);
|
|
|
assertThat(new ChannelActionListener<>(channel).toString(), containsString(channel.toString()));
|
|
|
- channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
|
|
+ channel.sendResponse(ActionResponse.Empty.INSTANCE);
|
|
|
});
|
|
|
serviceB.registerRequestHandler(ACTION, EsExecutors.DIRECT_EXECUTOR_SERVICE, EmptyRequest::new, (request, channel, task) -> {
|
|
|
assertThat(
|
|
@@ -3192,7 +3193,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
containsString(serviceB.getLocalNode().getAddress().toString())
|
|
|
)
|
|
|
);
|
|
|
- channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
|
|
+ channel.sendResponse(ActionResponse.Empty.INSTANCE);
|
|
|
});
|
|
|
|
|
|
safeAwait(
|
|
@@ -3203,7 +3204,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
new EmptyRequest(),
|
|
|
new ActionListenerResponseHandler<>(
|
|
|
listener,
|
|
|
- ignored -> TransportResponse.Empty.INSTANCE,
|
|
|
+ ignored -> ActionResponse.Empty.INSTANCE,
|
|
|
TransportResponseHandler.TRANSPORT_WORKER
|
|
|
)
|
|
|
)
|
|
@@ -3217,7 +3218,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
new EmptyRequest(),
|
|
|
new ActionListenerResponseHandler<>(
|
|
|
listener,
|
|
|
- ignored -> TransportResponse.Empty.INSTANCE,
|
|
|
+ ignored -> ActionResponse.Empty.INSTANCE,
|
|
|
TransportResponseHandler.TRANSPORT_WORKER
|
|
|
)
|
|
|
)
|
|
@@ -3331,7 +3332,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
serviceA.registerRequestHandler(actionName, EsExecutors.DIRECT_EXECUTOR_SERVICE, EmptyRequest::new, (request, channel, task) -> {
|
|
|
threadNameFuture.onResponse(Thread.currentThread().getName());
|
|
|
safeAwait(barrier);
|
|
|
- channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
|
|
+ channel.sendResponse(ActionResponse.Empty.INSTANCE);
|
|
|
});
|
|
|
|
|
|
final var responseLatch = new CountDownLatch(1);
|
|
@@ -3342,7 +3343,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|
|
new EmptyRequest(),
|
|
|
new ActionListenerResponseHandler<TransportResponse>(
|
|
|
ActionTestUtils.assertNoFailureListener(t -> responseLatch.countDown()),
|
|
|
- in -> TransportResponse.Empty.INSTANCE,
|
|
|
+ in -> ActionResponse.Empty.INSTANCE,
|
|
|
EsExecutors.DIRECT_EXECUTOR_SERVICE
|
|
|
)
|
|
|
);
|