|
|
@@ -11,11 +11,13 @@ package org.elasticsearch.repositories.gcs;
|
|
|
import fixture.gcs.FakeOAuth2HttpHandler;
|
|
|
import fixture.gcs.GoogleCloudStorageHttpHandler;
|
|
|
|
|
|
+import com.google.api.client.http.HttpExecuteInterceptor;
|
|
|
+import com.google.api.client.http.HttpRequestInitializer;
|
|
|
import com.google.api.gax.retrying.RetrySettings;
|
|
|
+import com.google.cloud.ServiceOptions;
|
|
|
import com.google.cloud.http.HttpTransportOptions;
|
|
|
import com.google.cloud.storage.StorageException;
|
|
|
import com.google.cloud.storage.StorageOptions;
|
|
|
-import com.google.cloud.storage.StorageRetryStrategy;
|
|
|
import com.sun.net.httpserver.HttpHandler;
|
|
|
|
|
|
import org.apache.http.HttpStatus;
|
|
|
@@ -61,6 +63,7 @@ import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
import java.util.Optional;
|
|
|
import java.util.Queue;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
@@ -89,12 +92,19 @@ import static org.hamcrest.Matchers.notNullValue;
|
|
|
@SuppressForbidden(reason = "use a http server")
|
|
|
public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobContainerRetriesTestCase {
|
|
|
|
|
|
+ private final Map<String, AtomicInteger> requestCounters = new ConcurrentHashMap<>();
|
|
|
+ private String endpointUrlOverride;
|
|
|
+
|
|
|
private String httpServerUrl() {
|
|
|
assertThat(httpServer, notNullValue());
|
|
|
InetSocketAddress address = httpServer.getAddress();
|
|
|
return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
|
|
|
}
|
|
|
|
|
|
+ private String getEndpointUrl() {
|
|
|
+ return endpointUrlOverride != null ? endpointUrlOverride : httpServerUrl();
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected String downloadStorageEndpoint(BlobContainer container, String blob) {
|
|
|
return "/download/storage/v1/b/bucket/o/" + container.path().buildAsString() + blob;
|
|
|
@@ -120,7 +130,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
|
|
) {
|
|
|
final Settings.Builder clientSettings = Settings.builder();
|
|
|
final String client = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
|
|
|
- clientSettings.put(ENDPOINT_SETTING.getConcreteSettingForNamespace(client).getKey(), httpServerUrl());
|
|
|
+ clientSettings.put(ENDPOINT_SETTING.getConcreteSettingForNamespace(client).getKey(), getEndpointUrl());
|
|
|
clientSettings.put(TOKEN_URI_SETTING.getConcreteSettingForNamespace(client).getKey(), httpServerUrl() + "/token");
|
|
|
if (readTimeout != null) {
|
|
|
clientSettings.put(READ_TIMEOUT_SETTING.getConcreteSettingForNamespace(client).getKey(), readTimeout);
|
|
|
@@ -136,8 +146,33 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
|
|
final GoogleCloudStorageClientSettings gcsClientSettings,
|
|
|
final HttpTransportOptions httpTransportOptions
|
|
|
) {
|
|
|
- StorageOptions options = super.createStorageOptions(gcsClientSettings, httpTransportOptions);
|
|
|
- RetrySettings.Builder retrySettingsBuilder = RetrySettings.newBuilder()
|
|
|
+ final HttpTransportOptions requestCountingHttpTransportOptions = new HttpTransportOptions(
|
|
|
+ HttpTransportOptions.newBuilder()
|
|
|
+ .setConnectTimeout(httpTransportOptions.getConnectTimeout())
|
|
|
+ .setHttpTransportFactory(httpTransportOptions.getHttpTransportFactory())
|
|
|
+ .setReadTimeout(httpTransportOptions.getReadTimeout())
|
|
|
+ ) {
|
|
|
+ @Override
|
|
|
+ public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions<?, ?> serviceOptions) {
|
|
|
+ // Add initializer/interceptor without interfering with any pre-existing ones
|
|
|
+ HttpRequestInitializer httpRequestInitializer = super.getHttpRequestInitializer(serviceOptions);
|
|
|
+ return request -> {
|
|
|
+ if (httpRequestInitializer != null) {
|
|
|
+ httpRequestInitializer.initialize(request);
|
|
|
+ }
|
|
|
+ HttpExecuteInterceptor interceptor = request.getInterceptor();
|
|
|
+ request.setInterceptor(req -> {
|
|
|
+ if (interceptor != null) {
|
|
|
+ interceptor.intercept(req);
|
|
|
+ }
|
|
|
+ requestCounters.computeIfAbsent(request.getUrl().getRawPath(), (url) -> new AtomicInteger())
|
|
|
+ .incrementAndGet();
|
|
|
+ });
|
|
|
+ };
|
|
|
+ }
|
|
|
+ };
|
|
|
+ final StorageOptions options = super.createStorageOptions(gcsClientSettings, requestCountingHttpTransportOptions);
|
|
|
+ final RetrySettings.Builder retrySettingsBuilder = RetrySettings.newBuilder()
|
|
|
.setTotalTimeout(options.getRetrySettings().getTotalTimeout())
|
|
|
.setInitialRetryDelay(Duration.ofMillis(10L))
|
|
|
.setRetryDelayMultiplier(1.0d)
|
|
|
@@ -150,7 +185,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
|
|
retrySettingsBuilder.setMaxAttempts(maxRetries + 1);
|
|
|
}
|
|
|
return options.toBuilder()
|
|
|
- .setStorageRetryStrategy(StorageRetryStrategy.getLegacyStorageRetryStrategy())
|
|
|
+ .setStorageRetryStrategy(getRetryStrategy())
|
|
|
.setHost(options.getHost())
|
|
|
.setCredentials(options.getCredentials())
|
|
|
.setRetrySettings(retrySettingsBuilder.build())
|
|
|
@@ -173,6 +208,25 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
|
|
return new GoogleCloudStorageBlobContainer(randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), blobStore);
|
|
|
}
|
|
|
|
|
|
+ public void testShouldRetryOnConnectionRefused() {
|
|
|
+ // port 1 should never be open
|
|
|
+ endpointUrlOverride = "http://127.0.0.1:1";
|
|
|
+ executeListBlobsAndAssertRetries();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testShouldRetryOnUnresolvableHost() {
|
|
|
+ // https://www.rfc-editor.org/rfc/rfc2606.html#page-2
|
|
|
+ endpointUrlOverride = "http://unresolvable.invalid";
|
|
|
+ executeListBlobsAndAssertRetries();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void executeListBlobsAndAssertRetries() {
|
|
|
+ final int maxRetries = randomIntBetween(3, 5);
|
|
|
+ final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
|
|
|
+ expectThrows(StorageException.class, () -> blobContainer.listBlobs(randomPurpose()));
|
|
|
+ assertEquals(maxRetries + 1, requestCounters.get("/storage/v1/b/bucket/o").get());
|
|
|
+ }
|
|
|
+
|
|
|
public void testReadLargeBlobWithRetries() throws Exception {
|
|
|
final int maxRetries = randomIntBetween(2, 10);
|
|
|
final AtomicInteger countDown = new AtomicInteger(maxRetries);
|