|
@@ -31,9 +31,11 @@ import org.junit.BeforeClass;
|
|
import org.junit.Ignore;
|
|
import org.junit.Ignore;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
|
+import java.io.OutputStream;
|
|
import java.net.ConnectException;
|
|
import java.net.ConnectException;
|
|
import java.net.InetAddress;
|
|
import java.net.InetAddress;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.InetSocketAddress;
|
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
@@ -41,6 +43,7 @@ import java.util.concurrent.CancellationException;
|
|
import java.util.concurrent.CopyOnWriteArrayList;
|
|
import java.util.concurrent.CopyOnWriteArrayList;
|
|
import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import static org.elasticsearch.client.RestClientTestUtil.getAllStatusCodes;
|
|
import static org.elasticsearch.client.RestClientTestUtil.getAllStatusCodes;
|
|
import static org.elasticsearch.client.RestClientTestUtil.randomErrorNoRetryStatusCode;
|
|
import static org.elasticsearch.client.RestClientTestUtil.randomErrorNoRetryStatusCode;
|
|
@@ -86,10 +89,17 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
|
}
|
|
}
|
|
|
|
|
|
private static RestClient buildRestClient(NodeSelector nodeSelector) {
|
|
private static RestClient buildRestClient(NodeSelector nodeSelector) {
|
|
|
|
+ return buildRestClient(nodeSelector, null);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static RestClient buildRestClient(NodeSelector nodeSelector, RestClient.FailureListener failureListener) {
|
|
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
|
|
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
|
|
if (pathPrefix.length() > 0) {
|
|
if (pathPrefix.length() > 0) {
|
|
restClientBuilder.setPathPrefix((randomBoolean() ? "/" : "") + pathPrefixWithoutLeadingSlash);
|
|
restClientBuilder.setPathPrefix((randomBoolean() ? "/" : "") + pathPrefixWithoutLeadingSlash);
|
|
}
|
|
}
|
|
|
|
+ if (failureListener != null) {
|
|
|
|
+ restClientBuilder.setFailureListener(failureListener);
|
|
|
|
+ }
|
|
restClientBuilder.setNodeSelector(nodeSelector);
|
|
restClientBuilder.setNodeSelector(nodeSelector);
|
|
return restClientBuilder.build();
|
|
return restClientBuilder.build();
|
|
}
|
|
}
|
|
@@ -101,6 +111,7 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
|
for (int statusCode : getAllStatusCodes()) {
|
|
for (int statusCode : getAllStatusCodes()) {
|
|
httpServer.createContext(pathPrefix + "/" + statusCode, new ResponseHandler(statusCode));
|
|
httpServer.createContext(pathPrefix + "/" + statusCode, new ResponseHandler(statusCode));
|
|
}
|
|
}
|
|
|
|
+ httpServer.createContext(pathPrefix + "/20bytes", new ResponseHandlerWithContent());
|
|
httpServer.createContext(pathPrefix + "/wait", waitForCancelHandler);
|
|
httpServer.createContext(pathPrefix + "/wait", waitForCancelHandler);
|
|
return httpServer;
|
|
return httpServer;
|
|
}
|
|
}
|
|
@@ -153,6 +164,18 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static class ResponseHandlerWithContent implements HttpHandler {
|
|
|
|
+ @Override
|
|
|
|
+ public void handle(HttpExchange httpExchange) throws IOException {
|
|
|
|
+ byte[] body = "01234567890123456789".getBytes(StandardCharsets.UTF_8);
|
|
|
|
+ httpExchange.sendResponseHeaders(200, body.length);
|
|
|
|
+ try (OutputStream out = httpExchange.getResponseBody()) {
|
|
|
|
+ out.write(body);
|
|
|
|
+ }
|
|
|
|
+ httpExchange.close();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
@AfterClass
|
|
@AfterClass
|
|
public static void stopHttpServers() throws IOException {
|
|
public static void stopHttpServers() throws IOException {
|
|
restClient.close();
|
|
restClient.close();
|
|
@@ -303,6 +326,34 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public void testNonRetryableException() throws Exception {
|
|
|
|
+ RequestOptions.Builder options = RequestOptions.DEFAULT.toBuilder();
|
|
|
|
+ options.setHttpAsyncResponseConsumerFactory(
|
|
|
|
+ // Limit to very short responses to trigger a ContentTooLongException
|
|
|
|
+ () -> new HeapBufferedAsyncResponseConsumer(10)
|
|
|
|
+ );
|
|
|
|
+
|
|
|
|
+ AtomicInteger failureCount = new AtomicInteger();
|
|
|
|
+ RestClient client = buildRestClient(NodeSelector.ANY, new RestClient.FailureListener() {
|
|
|
|
+ @Override
|
|
|
|
+ public void onFailure(Node node) {
|
|
|
|
+ failureCount.incrementAndGet();
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ failureCount.set(0);
|
|
|
|
+ Request request = new Request("POST", "/20bytes");
|
|
|
|
+ request.setOptions(options);
|
|
|
|
+ try {
|
|
|
|
+ RestClientSingleHostTests.performRequestSyncOrAsync(client, request);
|
|
|
|
+ fail("Request should not succeed");
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ assertEquals(stoppedFirstHost ? 2 : 1, failureCount.intValue());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ client.close();
|
|
|
|
+ }
|
|
|
|
+
|
|
private static class TestResponse {
|
|
private static class TestResponse {
|
|
private final String method;
|
|
private final String method;
|
|
private final int statusCode;
|
|
private final int statusCode;
|