|
@@ -26,11 +26,15 @@ import com.sun.net.httpserver.HttpServer;
|
|
|
import org.apache.http.Consts;
|
|
|
import org.apache.http.Header;
|
|
|
import org.apache.http.HttpHost;
|
|
|
+import org.apache.http.HttpResponse;
|
|
|
import org.apache.http.auth.AuthScope;
|
|
|
import org.apache.http.auth.UsernamePasswordCredentials;
|
|
|
+import org.apache.http.client.methods.HttpGet;
|
|
|
+import org.apache.http.client.methods.HttpRequestBase;
|
|
|
import org.apache.http.entity.ContentType;
|
|
|
import org.apache.http.impl.client.BasicCredentialsProvider;
|
|
|
import org.apache.http.impl.client.TargetAuthenticationStrategy;
|
|
|
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
|
|
|
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
|
|
|
import org.apache.http.message.BasicHeader;
|
|
|
import org.apache.http.nio.entity.NStringEntity;
|
|
@@ -49,16 +53,22 @@ import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
+import java.util.concurrent.CancellationException;
|
|
|
import java.util.concurrent.CopyOnWriteArrayList;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.Future;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
import static org.elasticsearch.client.RestClientTestUtil.getAllStatusCodes;
|
|
|
import static org.elasticsearch.client.RestClientTestUtil.getHttpMethods;
|
|
|
+import static org.elasticsearch.client.RestClientTestUtil.randomHttpMethod;
|
|
|
import static org.elasticsearch.client.RestClientTestUtil.randomStatusCode;
|
|
|
+import static org.hamcrest.Matchers.instanceOf;
|
|
|
import static org.hamcrest.Matchers.nullValue;
|
|
|
import static org.hamcrest.Matchers.startsWith;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
+import static org.junit.Assert.assertFalse;
|
|
|
import static org.junit.Assert.assertThat;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
import static org.junit.Assert.fail;
|
|
@@ -73,6 +83,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
|
|
private RestClient restClient;
|
|
|
private String pathPrefix;
|
|
|
private Header[] defaultHeaders;
|
|
|
+ private WaitForCancelHandler waitForCancelHandler;
|
|
|
|
|
|
@Before
|
|
|
public void startHttpServer() throws Exception {
|
|
@@ -89,9 +100,31 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
|
|
for (int statusCode : getAllStatusCodes()) {
|
|
|
httpServer.createContext(pathPrefix + "/" + statusCode, new ResponseHandler(statusCode));
|
|
|
}
|
|
|
+ waitForCancelHandler = new WaitForCancelHandler();
|
|
|
+ httpServer.createContext(pathPrefix + "/wait", waitForCancelHandler);
|
|
|
return httpServer;
|
|
|
}
|
|
|
|
|
|
+ private class WaitForCancelHandler implements HttpHandler {
|
|
|
+
|
|
|
+ private final CountDownLatch cancelHandlerLatch = new CountDownLatch(1);
|
|
|
+
|
|
|
+ void cancelDone() {
|
|
|
+ cancelHandlerLatch.countDown();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void handle(HttpExchange exchange) throws IOException {
|
|
|
+ try {
|
|
|
+ cancelHandlerLatch.await();
|
|
|
+ } catch (InterruptedException ignore) {
|
|
|
+ } finally {
|
|
|
+ exchange.sendResponseHeaders(200, 0);
|
|
|
+ exchange.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private static class ResponseHandler implements HttpHandler {
|
|
|
private final int statusCode;
|
|
|
|
|
@@ -201,6 +234,75 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void testCancelAsyncRequest() throws Exception {
|
|
|
+ Request request = new Request(randomHttpMethod(getRandom()), "/wait");
|
|
|
+ CountDownLatch requestLatch = new CountDownLatch(1);
|
|
|
+ AtomicReference<Exception> error = new AtomicReference<>();
|
|
|
+ Cancellable cancellable = restClient.performRequestAsync(request, new ResponseListener() {
|
|
|
+ @Override
|
|
|
+ public void onSuccess(Response response) {
|
|
|
+ throw new AssertionError("onResponse called unexpectedly");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception exception) {
|
|
|
+ error.set(exception);
|
|
|
+ requestLatch.countDown();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ cancellable.cancel();
|
|
|
+ waitForCancelHandler.cancelDone();
|
|
|
+ assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
|
|
|
+ assertThat(error.get(), instanceOf(CancellationException.class));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This test verifies some assumptions that we rely upon around the way the async http client works when reusing the same request
|
|
|
+ * throughout multiple retries, and the use of the {@link HttpRequestBase#abort()} method.
|
|
|
+ */
|
|
|
+ public void testRequestResetAndAbort() throws Exception {
|
|
|
+ try (CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create().build()) {
|
|
|
+ client.start();
|
|
|
+ HttpHost httpHost = new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort());
|
|
|
+ HttpGet httpGet = new HttpGet(pathPrefix + "/200");
|
|
|
+
|
|
|
+ //calling abort before the request is sent is a no-op
|
|
|
+ httpGet.abort();
|
|
|
+ assertTrue(httpGet.isAborted());
|
|
|
+
|
|
|
+ {
|
|
|
+ httpGet.reset();
|
|
|
+ assertFalse(httpGet.isAborted());
|
|
|
+ httpGet.abort();//this has no effect on the next call (although isAborted will return true until the next reset)
|
|
|
+ Future<HttpResponse> future = client.execute(httpHost, httpGet, null);
|
|
|
+ assertEquals(200, future.get().getStatusLine().getStatusCode());
|
|
|
+ assertFalse(future.isCancelled());
|
|
|
+ }
|
|
|
+ {
|
|
|
+ httpGet.reset();
|
|
|
+ Future<HttpResponse> future = client.execute(httpHost, httpGet, null);
|
|
|
+ assertFalse(httpGet.isAborted());
|
|
|
+ httpGet.abort();
|
|
|
+ assertTrue(httpGet.isAborted());
|
|
|
+ try {
|
|
|
+ assertTrue(future.isCancelled());
|
|
|
+ future.get();
|
|
|
+ throw new AssertionError("exception should have been thrown");
|
|
|
+ } catch(CancellationException e) {
|
|
|
+ //expected
|
|
|
+ }
|
|
|
+ }
|
|
|
+ {
|
|
|
+ httpGet.reset();
|
|
|
+ assertFalse(httpGet.isAborted());
|
|
|
+ Future<HttpResponse> future = client.execute(httpHost, httpGet, null);
|
|
|
+ assertFalse(httpGet.isAborted());
|
|
|
+ assertEquals(200, future.get().getStatusLine().getStatusCode());
|
|
|
+ assertFalse(future.isCancelled());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* End to end test for headers. We test it explicitly against a real http client as there are different ways
|
|
|
* to set/add headers to the {@link org.apache.http.client.HttpClient}.
|
|
@@ -356,7 +458,6 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
|
|
assertThat(response200.getHeader("Authorization"), startsWith("Basic"));
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
public void testUrlWithoutLeadingSlash() throws Exception {
|