|  | @@ -19,6 +19,9 @@
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  package org.elasticsearch.repositories.gcs;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +import com.google.api.gax.retrying.RetrySettings;
 | 
	
		
			
				|  |  | +import com.google.cloud.http.HttpTransportOptions;
 | 
	
		
			
				|  |  | +import com.google.cloud.storage.StorageOptions;
 | 
	
		
			
				|  |  |  import com.sun.net.httpserver.HttpExchange;
 | 
	
		
			
				|  |  |  import com.sun.net.httpserver.HttpHandler;
 | 
	
		
			
				|  |  |  import com.sun.net.httpserver.HttpServer;
 | 
	
	
		
			
				|  | @@ -46,6 +49,7 @@ import org.junit.After;
 | 
	
		
			
				|  |  |  import org.junit.AfterClass;
 | 
	
		
			
				|  |  |  import org.junit.Before;
 | 
	
		
			
				|  |  |  import org.junit.BeforeClass;
 | 
	
		
			
				|  |  | +import org.threeten.bp.Duration;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import java.io.BufferedInputStream;
 | 
	
		
			
				|  |  |  import java.io.ByteArrayOutputStream;
 | 
	
	
		
			
				|  | @@ -66,6 +70,7 @@ import java.util.Map;
 | 
	
		
			
				|  |  |  import java.util.UUID;
 | 
	
		
			
				|  |  |  import java.util.concurrent.ConcurrentHashMap;
 | 
	
		
			
				|  |  |  import java.util.concurrent.ConcurrentMap;
 | 
	
		
			
				|  |  | +import java.util.concurrent.atomic.AtomicInteger;
 | 
	
		
			
				|  |  |  import java.util.regex.Matcher;
 | 
	
		
			
				|  |  |  import java.util.regex.Pattern;
 | 
	
		
			
				|  |  |  import java.util.stream.Collectors;
 | 
	
	
		
			
				|  | @@ -82,18 +87,24 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CL
 | 
	
		
			
				|  |  |  public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      private static HttpServer httpServer;
 | 
	
		
			
				|  |  | +    private static boolean randomServerErrors;
 | 
	
		
			
				|  |  |      private static byte[] serviceAccount;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @BeforeClass
 | 
	
		
			
				|  |  |      public static void startHttpServer() throws Exception {
 | 
	
		
			
				|  |  |          httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
 | 
	
		
			
				|  |  |          httpServer.start();
 | 
	
		
			
				|  |  | +        randomServerErrors = randomBoolean();
 | 
	
		
			
				|  |  |          serviceAccount = createServiceAccount();
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @Before
 | 
	
		
			
				|  |  |      public void setUpHttpServer() {
 | 
	
		
			
				|  |  | -        httpServer.createContext("/", new InternalHttpHandler());
 | 
	
		
			
				|  |  | +        HttpHandler handler = new InternalHttpHandler();
 | 
	
		
			
				|  |  | +        if (randomServerErrors) {
 | 
	
		
			
				|  |  | +            handler = new ErroneousHttpHandler(handler, randomIntBetween(2, 3));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        httpServer.createContext("/", handler);
 | 
	
		
			
				|  |  |          httpServer.createContext("/token", new FakeOAuth2HttpHandler());
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -125,7 +136,7 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @Override
 | 
	
		
			
				|  |  |      protected Collection<Class<? extends Plugin>> nodePlugins() {
 | 
	
		
			
				|  |  | -        return Collections.singletonList(GoogleCloudStoragePlugin.class);
 | 
	
		
			
				|  |  | +        return Collections.singletonList(TestGoogleCloudStoragePlugin.class);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @Override
 | 
	
	
		
			
				|  | @@ -183,6 +194,39 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos
 | 
	
		
			
				|  |  |          assertEquals("failed to parse value [101mb] for setting [chunk_size], must be <= [100mb]", e.getMessage());
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * GoogleCloudStoragePlugin that allows to set low values for the client retry policy
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    public static class TestGoogleCloudStoragePlugin extends GoogleCloudStoragePlugin {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        public TestGoogleCloudStoragePlugin(Settings settings) {
 | 
	
		
			
				|  |  | +            super(settings);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        @Override
 | 
	
		
			
				|  |  | +        protected GoogleCloudStorageService createStorageService() {
 | 
	
		
			
				|  |  | +            if (randomServerErrors) {
 | 
	
		
			
				|  |  | +                return new GoogleCloudStorageService() {
 | 
	
		
			
				|  |  | +                    @Override
 | 
	
		
			
				|  |  | +                    StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clientSettings,
 | 
	
		
			
				|  |  | +                                                        final HttpTransportOptions httpTransportOptions) {
 | 
	
		
			
				|  |  | +                        return super.createStorageOptions(clientSettings, httpTransportOptions)
 | 
	
		
			
				|  |  | +                            .toBuilder()
 | 
	
		
			
				|  |  | +                            .setRetrySettings(RetrySettings.newBuilder()
 | 
	
		
			
				|  |  | +                                .setMaxAttempts(10)
 | 
	
		
			
				|  |  | +                                .setInitialRetryDelay(Duration.ofMillis(10L))
 | 
	
		
			
				|  |  | +                                .setRetryDelayMultiplier(2.0d)
 | 
	
		
			
				|  |  | +                                .setMaxRetryDelay(Duration.ofSeconds(1L))
 | 
	
		
			
				|  |  | +                                .setTotalTimeout(Duration.ofSeconds(30L))
 | 
	
		
			
				|  |  | +                                .build())
 | 
	
		
			
				|  |  | +                            .build();
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                };
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +            return super.createStorageService();
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      private static byte[] createServiceAccount() throws Exception {
 | 
	
		
			
				|  |  |          final KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA");
 | 
	
		
			
				|  |  |          keyPairGenerator.initialize(1024);
 | 
	
	
		
			
				|  | @@ -376,4 +420,53 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos
 | 
	
		
			
				|  |  |              exchange.close();
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * HTTP handler that injects random  Google Cloud Storage service errors
 | 
	
		
			
				|  |  | +     *
 | 
	
		
			
				|  |  | +     * Note: it is not a good idea to allow this handler to simulate too many errors as it would
 | 
	
		
			
				|  |  | +     * slow down the test suite.
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    @SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint")
 | 
	
		
			
				|  |  | +    private static class ErroneousHttpHandler implements HttpHandler {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // first key is the remote address, second key is the HTTP request unique id provided by the AWS SDK client,
 | 
	
		
			
				|  |  | +        // value is the number of times the request has been seen
 | 
	
		
			
				|  |  | +        private final Map<String, AtomicInteger> requests;
 | 
	
		
			
				|  |  | +        private final HttpHandler delegate;
 | 
	
		
			
				|  |  | +        private final int maxErrorsPerRequest;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        private ErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) {
 | 
	
		
			
				|  |  | +            this.requests = new ConcurrentHashMap<>();
 | 
	
		
			
				|  |  | +            this.delegate = delegate;
 | 
	
		
			
				|  |  | +            this.maxErrorsPerRequest = maxErrorsPerRequest;
 | 
	
		
			
				|  |  | +            assert maxErrorsPerRequest > 1;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        @Override
 | 
	
		
			
				|  |  | +        public void handle(final HttpExchange exchange) throws IOException {
 | 
	
		
			
				|  |  | +            final String requestId = exchange.getRemoteAddress().toString()
 | 
	
		
			
				|  |  | +                + " " + exchange.getRequestMethod()
 | 
	
		
			
				|  |  | +                + " " + exchange.getRequestURI();
 | 
	
		
			
				|  |  | +            assert Strings.hasText(requestId);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // Batch requests are not retried so we don't want to fail them
 | 
	
		
			
				|  |  | +            // The batched request are supposed to be retried (not tested here)
 | 
	
		
			
				|  |  | +            final boolean noError = exchange.getRequestURI().toString().startsWith("/batch/") || randomBoolean();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet();
 | 
	
		
			
				|  |  | +            if (count >= maxErrorsPerRequest || noError) {
 | 
	
		
			
				|  |  | +                requests.remove(requestId);
 | 
	
		
			
				|  |  | +                delegate.handle(exchange);
 | 
	
		
			
				|  |  | +            } else {
 | 
	
		
			
				|  |  | +                handleAsError(exchange);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        private void handleAsError(final HttpExchange exchange) throws IOException {
 | 
	
		
			
				|  |  | +            Streams.readFully(exchange.getRequestBody());
 | 
	
		
			
				|  |  | +            exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
 | 
	
		
			
				|  |  | +            exchange.close();
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |  }
 |