|
@@ -27,7 +27,6 @@ import org.elasticsearch.mocksocket.MockHttpServer;
|
|
|
import org.junit.AfterClass;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.BeforeClass;
|
|
|
-import org.junit.Ignore;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.net.ConnectException;
|
|
@@ -75,6 +74,7 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
|
|
int numHttpServers = randomIntBetween(2, 4);
|
|
|
httpServers = new HttpServer[numHttpServers];
|
|
|
httpHosts = new HttpHost[numHttpServers];
|
|
|
+ waitForCancelHandler = new WaitForCancelHandler();
|
|
|
for (int i = 0; i < numHttpServers; i++) {
|
|
|
HttpServer httpServer = createHttpServer();
|
|
|
httpServers[i] = httpServer;
|
|
@@ -99,24 +99,30 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
|
|
for (int statusCode : getAllStatusCodes()) {
|
|
|
httpServer.createContext(pathPrefix + "/" + statusCode, new ResponseHandler(statusCode));
|
|
|
}
|
|
|
- waitForCancelHandler = new WaitForCancelHandler();
|
|
|
httpServer.createContext(pathPrefix + "/wait", waitForCancelHandler);
|
|
|
return httpServer;
|
|
|
}
|
|
|
|
|
|
private static class WaitForCancelHandler implements HttpHandler {
|
|
|
- private CountDownLatch cancelHandlerLatch;
|
|
|
+ private volatile CountDownLatch requestCameInLatch;
|
|
|
+ private volatile CountDownLatch cancelHandlerLatch;
|
|
|
|
|
|
void reset() {
|
|
|
cancelHandlerLatch = new CountDownLatch(1);
|
|
|
+ requestCameInLatch = new CountDownLatch(1);
|
|
|
}
|
|
|
|
|
|
void cancelDone() {
|
|
|
cancelHandlerLatch.countDown();
|
|
|
}
|
|
|
|
|
|
+ void awaitRequest() throws InterruptedException {
|
|
|
+ requestCameInLatch.await();
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void handle(HttpExchange exchange) throws IOException {
|
|
|
+ requestCameInLatch.countDown();
|
|
|
try {
|
|
|
cancelHandlerLatch.await();
|
|
|
} catch (InterruptedException ignore) {
|
|
@@ -225,16 +231,14 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Ignore("https://github.com/elastic/elasticsearch/issues/45577")
|
|
|
public void testCancelAsyncRequests() throws Exception {
|
|
|
int numRequests = randomIntBetween(5, 20);
|
|
|
- final CountDownLatch latch = new CountDownLatch(numRequests);
|
|
|
final List<Response> responses = new CopyOnWriteArrayList<>();
|
|
|
final List<Exception> exceptions = new CopyOnWriteArrayList<>();
|
|
|
for (int i = 0; i < numRequests; i++) {
|
|
|
+ CountDownLatch latch = new CountDownLatch(1);
|
|
|
waitForCancelHandler.reset();
|
|
|
- final String method = RestClientTestUtil.randomHttpMethod(getRandom());
|
|
|
- Cancellable cancellable = restClient.performRequestAsync(new Request(method, "/wait"), new ResponseListener() {
|
|
|
+ Cancellable cancellable = restClient.performRequestAsync(new Request("GET", "/wait"), new ResponseListener() {
|
|
|
@Override
|
|
|
public void onSuccess(Response response) {
|
|
|
responses.add(response);
|
|
@@ -247,10 +251,15 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
|
|
latch.countDown();
|
|
|
}
|
|
|
});
|
|
|
+ if (randomBoolean()) {
|
|
|
+ //we wait for the request to get to the server-side otherwise we almost always cancel
|
|
|
+ // the request artificially on the client-side before even sending it
|
|
|
+ waitForCancelHandler.awaitRequest();
|
|
|
+ }
|
|
|
cancellable.cancel();
|
|
|
waitForCancelHandler.cancelDone();
|
|
|
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
|
|
|
}
|
|
|
- assertTrue(latch.await(5, TimeUnit.SECONDS));
|
|
|
assertEquals(0, responses.size());
|
|
|
assertEquals(numRequests, exceptions.size());
|
|
|
for (Exception exception : exceptions) {
|