|
@@ -49,6 +49,7 @@ import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.Comparator;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
@@ -282,15 +283,44 @@ public class RestClient implements Closeable {
|
|
|
public void performRequestAsync(String method, String endpoint, Map<String, String> params,
|
|
|
HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
|
|
|
ResponseListener responseListener, Header... headers) {
|
|
|
- URI uri = buildUri(pathPrefix, endpoint, params);
|
|
|
+ Objects.requireNonNull(params, "params must not be null");
|
|
|
+ Map<String, String> requestParams = new HashMap<>(params);
|
|
|
+ //ignore is a special parameter supported by the clients, shouldn't be sent to es
|
|
|
+ String ignoreString = requestParams.remove("ignore");
|
|
|
+ Set<Integer> ignoreErrorCodes;
|
|
|
+ if (ignoreString == null) {
|
|
|
+ if (HttpHead.METHOD_NAME.equals(method)) {
|
|
|
+ //404 never causes error if returned for a HEAD request
|
|
|
+ ignoreErrorCodes = Collections.singleton(404);
|
|
|
+ } else {
|
|
|
+ ignoreErrorCodes = Collections.emptySet();
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ String[] ignoresArray = ignoreString.split(",");
|
|
|
+ ignoreErrorCodes = new HashSet<>();
|
|
|
+ if (HttpHead.METHOD_NAME.equals(method)) {
|
|
|
+ //404 never causes error if returned for a HEAD request
|
|
|
+ ignoreErrorCodes.add(404);
|
|
|
+ }
|
|
|
+ for (String ignoreCode : ignoresArray) {
|
|
|
+ try {
|
|
|
+ ignoreErrorCodes.add(Integer.valueOf(ignoreCode));
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
+ throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ URI uri = buildUri(pathPrefix, endpoint, requestParams);
|
|
|
HttpRequestBase request = createHttpRequest(method, uri, entity);
|
|
|
setHeaders(request, headers);
|
|
|
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener);
|
|
|
long startTime = System.nanoTime();
|
|
|
- performRequestAsync(startTime, nextHost().iterator(), request, httpAsyncResponseConsumerFactory, failureTrackingResponseListener);
|
|
|
+ performRequestAsync(startTime, nextHost().iterator(), request, ignoreErrorCodes, httpAsyncResponseConsumerFactory,
|
|
|
+ failureTrackingResponseListener);
|
|
|
}
|
|
|
|
|
|
private void performRequestAsync(final long startTime, final Iterator<HttpHost> hosts, final HttpRequestBase request,
|
|
|
+ final Set<Integer> ignoreErrorCodes,
|
|
|
final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
|
|
|
final FailureTrackingResponseListener listener) {
|
|
|
final HttpHost host = hosts.next();
|
|
@@ -304,7 +334,7 @@ public class RestClient implements Closeable {
|
|
|
RequestLogger.logResponse(logger, request, host, httpResponse);
|
|
|
int statusCode = httpResponse.getStatusLine().getStatusCode();
|
|
|
Response response = new Response(request.getRequestLine(), host, httpResponse);
|
|
|
- if (isSuccessfulResponse(request.getMethod(), statusCode)) {
|
|
|
+ if (isSuccessfulResponse(statusCode) || ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) {
|
|
|
onResponse(host);
|
|
|
listener.onSuccess(response);
|
|
|
} else {
|
|
@@ -312,7 +342,7 @@ public class RestClient implements Closeable {
|
|
|
if (isRetryStatus(statusCode)) {
|
|
|
//mark host dead and retry against next one
|
|
|
onFailure(host);
|
|
|
- retryIfPossible(responseException, hosts, request);
|
|
|
+ retryIfPossible(responseException);
|
|
|
} else {
|
|
|
//mark host alive and don't retry, as the error should be a request problem
|
|
|
onResponse(host);
|
|
@@ -329,13 +359,13 @@ public class RestClient implements Closeable {
|
|
|
try {
|
|
|
RequestLogger.logFailedRequest(logger, request, host, failure);
|
|
|
onFailure(host);
|
|
|
- retryIfPossible(failure, hosts, request);
|
|
|
+ retryIfPossible(failure);
|
|
|
} catch(Exception e) {
|
|
|
listener.onDefinitiveFailure(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void retryIfPossible(Exception exception, Iterator<HttpHost> hosts, HttpRequestBase request) {
|
|
|
+ private void retryIfPossible(Exception exception) {
|
|
|
if (hosts.hasNext()) {
|
|
|
//in case we are retrying, check whether maxRetryTimeout has been reached
|
|
|
long timeElapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
|
|
@@ -347,7 +377,7 @@ public class RestClient implements Closeable {
|
|
|
} else {
|
|
|
listener.trackFailure(exception);
|
|
|
request.reset();
|
|
|
- performRequestAsync(startTime, hosts, request, httpAsyncResponseConsumerFactory, listener);
|
|
|
+ performRequestAsync(startTime, hosts, request, ignoreErrorCodes, httpAsyncResponseConsumerFactory, listener);
|
|
|
}
|
|
|
} else {
|
|
|
listener.onDefinitiveFailure(exception);
|
|
@@ -452,8 +482,8 @@ public class RestClient implements Closeable {
|
|
|
client.close();
|
|
|
}
|
|
|
|
|
|
- private static boolean isSuccessfulResponse(String method, int statusCode) {
|
|
|
- return statusCode < 300 || (HttpHead.METHOD_NAME.equals(method) && statusCode == 404);
|
|
|
+ private static boolean isSuccessfulResponse(int statusCode) {
|
|
|
+ return statusCode < 300;
|
|
|
}
|
|
|
|
|
|
private static boolean isRetryStatus(int statusCode) {
|
|
@@ -510,7 +540,6 @@ public class RestClient implements Closeable {
|
|
|
}
|
|
|
|
|
|
private static URI buildUri(String pathPrefix, String path, Map<String, String> params) {
|
|
|
- Objects.requireNonNull(params, "params must not be null");
|
|
|
Objects.requireNonNull(path, "path must not be null");
|
|
|
try {
|
|
|
String fullPath;
|