Przeglądaj źródła

Add blob container retries tests for Google Cloud Storage (#46968)

Similarly to what has been done for S3 in #45383, this commit 
adds unit tests that verify the behavior of the SDK client and 
blob container implementation for Google Storage when the 
remote service returns errors.

The main purpose was to add an extra test to the specific retry 
logic for 410-Gone errors added in #45963.

Relates #45963
Tanguy Leroux 6 lat temu
rodzic
commit
60619126db

+ 433 - 0
plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java

@@ -0,0 +1,433 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.repositories.gcs;
+
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.cloud.http.HttpTransportOptions;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import com.sun.net.httpserver.HttpContext;
+import com.sun.net.httpserver.HttpServer;
+import org.apache.http.HttpStatus;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
+import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
+import org.elasticsearch.common.network.InetAddresses;
+import org.elasticsearch.common.settings.MockSecureSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.CountDown;
+import org.elasticsearch.mocksocket.MockHttpServer;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.rest.RestUtils;
+import org.elasticsearch.test.ESTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.threeten.bp.Duration;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
+import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
+import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING;
+import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING;
+import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING;
+import static org.elasticsearch.repositories.gcs.TestUtils.createServiceAccount;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+@SuppressForbidden(reason = "use a http server")
+public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
+
+    private HttpServer httpServer;
+
+    private String httpServerUrl() {
+        assertThat(httpServer, notNullValue());
+        InetSocketAddress address = httpServer.getAddress();
+        return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
+        httpServer.start();
+        super.setUp();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        httpServer.stop(0);
+        super.tearDown();
+    }
+
+    private BlobContainer createBlobContainer(final int maxRetries, final @Nullable TimeValue readTimeout) {
+        final Settings.Builder clientSettings = Settings.builder();
+        final String client = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
+        clientSettings.put(ENDPOINT_SETTING.getConcreteSettingForNamespace(client).getKey(), httpServerUrl());
+        clientSettings.put(TOKEN_URI_SETTING.getConcreteSettingForNamespace(client).getKey(), httpServerUrl() + "/token");
+        if (readTimeout != null) {
+            clientSettings.put(READ_TIMEOUT_SETTING.getConcreteSettingForNamespace(client).getKey(), readTimeout);
+        }
+
+        final MockSecureSettings secureSettings = new MockSecureSettings();
+        secureSettings.setFile(CREDENTIALS_FILE_SETTING.getConcreteSettingForNamespace(client).getKey(), createServiceAccount(random()));
+        clientSettings.setSecureSettings(secureSettings);
+
+        final GoogleCloudStorageService service = new GoogleCloudStorageService() {
+            @Override
+            StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clientSettings,
+                                                final HttpTransportOptions httpTransportOptions) {
+                StorageOptions options = super.createStorageOptions(clientSettings, httpTransportOptions);
+                return options.toBuilder()
+                    .setRetrySettings(RetrySettings.newBuilder()
+                        .setTotalTimeout(options.getRetrySettings().getTotalTimeout())
+                        .setInitialRetryDelay(Duration.ofMillis(10L))
+                        .setRetryDelayMultiplier(options.getRetrySettings().getRetryDelayMultiplier())
+                        .setMaxRetryDelay(Duration.ofSeconds(1L))
+                        .setMaxAttempts(maxRetries)
+                        .setJittered(false)
+                        .setInitialRpcTimeout(options.getRetrySettings().getInitialRpcTimeout())
+                        .setRpcTimeoutMultiplier(options.getRetrySettings().getRpcTimeoutMultiplier())
+                        .setMaxRpcTimeout(options.getRetrySettings().getMaxRpcTimeout())
+                        .build())
+                    .build();
+            }
+        };
+        service.refreshAndClearCache(GoogleCloudStorageClientSettings.load(clientSettings.build()));
+
+        final List<HttpContext> httpContexts = Arrays.asList(
+            // Auth
+            httpServer.createContext("/token", exchange -> {
+                byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8);
+                exchange.getResponseHeaders().add("Content-Type", "application/json");
+                exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
+                exchange.getResponseBody().write(response);
+                exchange.close();
+            }),
+            // Does bucket exists?
+            httpServer.createContext("/storage/v1/b/bucket", exchange -> {
+                byte[] response = ("{\"kind\":\"storage#bucket\",\"name\":\"bucket\",\"id\":\"0\"}").getBytes(UTF_8);
+                exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
+                exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
+                exchange.getResponseBody().write(response);
+                exchange.close();
+            })
+        );
+
+        final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore("bucket", client, service);
+        httpContexts.forEach(httpContext -> httpServer.removeContext(httpContext));
+
+        return new GoogleCloudStorageBlobContainer(BlobPath.cleanPath(), blobStore);
+    }
+
+    public void testReadNonexistentBlobThrowsNoSuchFileException() {
+        final BlobContainer blobContainer = createBlobContainer(between(1, 5), null);
+        final Exception exception = expectThrows(NoSuchFileException.class,
+            () -> Streams.readFully(blobContainer.readBlob("read_nonexistent_blob")));
+        assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("blob [read_nonexistent_blob] does not exist"));
+    }
+
+    public void testReadBlobWithRetries() throws Exception {
+        final int maxRetries = randomIntBetween(2, 10);
+        final CountDown countDown = new CountDown(maxRetries);
+
+        final byte[] bytes = randomBlobContent();
+        httpServer.createContext("/download/storage/v1/b/bucket/o/read_blob_max_retries", exchange -> {
+            Streams.readFully(exchange.getRequestBody());
+            if (countDown.countDown()) {
+                exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
+                exchange.sendResponseHeaders(RestStatus.OK.getStatus(), bytes.length);
+                exchange.getResponseBody().write(bytes);
+                exchange.close();
+                return;
+            }
+            exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
+            if (randomBoolean()) {
+                exchange.close();
+            }
+        });
+
+        final BlobContainer blobContainer = createBlobContainer(maxRetries, TimeValue.timeValueMillis(between(100, 500)));
+        try (InputStream inputStream = blobContainer.readBlob("read_blob_max_retries")) {
+            assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
+            assertThat(countDown.isCountedDown(), is(true));
+        }
+    }
+
+    public void testReadBlobWithReadTimeouts() {
+        final int maxRetries = randomIntBetween(1, 3);
+        final BlobContainer blobContainer = createBlobContainer(maxRetries, TimeValue.timeValueMillis(between(100, 200)));
+
+        // HTTP server does not send a response
+        httpServer.createContext("/download/storage/v1/b/bucket/o/read_blob_unresponsive", exchange -> {});
+
+        StorageException storageException = expectThrows(StorageException.class,
+            () -> Streams.readFully(blobContainer.readBlob("read_blob_unresponsive")));
+        assertThat(storageException.getMessage().toLowerCase(Locale.ROOT), containsString("read timed out"));
+        assertThat(storageException.getCause(), instanceOf(SocketTimeoutException.class));
+
+        // HTTP server sends a partial response
+        final byte[] bytes = randomBlobContent();
+        httpServer.createContext("/download/storage/v1/b/bucket/o/read_blob_incomplete", exchange -> {
+            exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");
+            exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length);
+            final int bytesToSend = randomIntBetween(0, bytes.length - 1);
+            if (bytesToSend > 0) {
+                exchange.getResponseBody().write(bytes, 0, bytesToSend);
+            }
+            if (randomBoolean()) {
+                exchange.getResponseBody().flush();
+            }
+        });
+
+        storageException = expectThrows(StorageException.class, () -> {
+            try (InputStream stream = blobContainer.readBlob("read_blob_incomplete")) {
+                Streams.readFully(stream);
+            }
+        });
+        assertThat(storageException.getMessage().toLowerCase(Locale.ROOT), containsString("read timed out"));
+        assertThat(storageException.getCause(), instanceOf(SocketTimeoutException.class));
+    }
+
+    public void testWriteBlobWithRetries() throws Exception {
+        final int maxRetries = randomIntBetween(2, 10);
+        final CountDown countDown = new CountDown(maxRetries);
+
+        final byte[] bytes = randomBlobContent();
+        httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
+            assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
+            if (countDown.countDown()) {
+                Optional<Tuple<String, BytesArray>> content = TestUtils.parseMultipartRequestBody(exchange.getRequestBody());
+                assertThat(content.isPresent(), is(true));
+                assertThat(content.get().v1(), equalTo("write_blob_max_retries"));
+                if (Objects.deepEquals(bytes, content.get().v2().array())) {
+                    byte[] response = ("{\"bucket\":\"bucket\",\"name\":\"" + content.get().v1() + "\"}").getBytes(UTF_8);
+                    exchange.getResponseHeaders().add("Content-Type", "application/json");
+                    exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
+                    exchange.getResponseBody().write(response);
+                } else {
+                    exchange.sendResponseHeaders(HttpStatus.SC_BAD_REQUEST, -1);
+                }
+                exchange.close();
+                return;
+            }
+            if (randomBoolean()) {
+                if (randomBoolean()) {
+                    Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, Math.max(1, bytes.length - 1))]);
+                } else {
+                    Streams.readFully(exchange.getRequestBody());
+                    exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
+                }
+            }
+            exchange.close();
+        });
+
+        final BlobContainer blobContainer = createBlobContainer(maxRetries, null);
+        try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
+            blobContainer.writeBlob("write_blob_max_retries", stream, bytes.length, false);
+        }
+        assertThat(countDown.isCountedDown(), is(true));
+    }
+
+    public void testWriteBlobWithReadTimeouts() {
+        final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
+        final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
+        final BlobContainer blobContainer = createBlobContainer(1, readTimeout);
+
+        // HTTP server does not send a response
+        httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
+            if (randomBoolean()) {
+                if (randomBoolean()) {
+                    Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, bytes.length - 1)]);
+                } else {
+                    Streams.readFully(exchange.getRequestBody());
+                }
+            }
+        });
+
+        Exception exception = expectThrows(StorageException.class, () -> {
+            try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
+                blobContainer.writeBlob("write_blob_timeout", stream, bytes.length, false);
+            }
+        });
+        assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("read timed out"));
+
+        assertThat(exception.getCause(), instanceOf(SocketTimeoutException.class));
+        assertThat(exception.getCause().getMessage().toLowerCase(Locale.ROOT), containsString("read timed out"));
+    }
+
+    public void testWriteLargeBlob() throws IOException {
+        // See {@link BaseWriteChannel#DEFAULT_CHUNK_SIZE}
+        final int defaultChunkSize = 8 * 256 * 1024;
+        final int nbChunks = randomIntBetween(3, 5);
+        final int lastChunkSize = randomIntBetween(1, defaultChunkSize - 1);
+        final int totalChunks = nbChunks + 1;
+        final byte[] data = randomBytes(defaultChunkSize * nbChunks + lastChunkSize);
+        assertThat(data.length, greaterThan(GoogleCloudStorageBlobStore.LARGE_BLOB_THRESHOLD_BYTE_SIZE));
+
+        logger.debug("resumable upload is composed of [{}] total chunks ([{}] chunks of length [{}] and last chunk of length [{}]",
+            totalChunks, nbChunks, defaultChunkSize, lastChunkSize);
+
+        final int nbErrors = 2; // we want all requests to fail at least once
+        final AtomicInteger countInits = new AtomicInteger(nbErrors);
+        final AtomicInteger countUploads = new AtomicInteger(nbErrors * totalChunks);
+        final AtomicBoolean allow410Gone = new AtomicBoolean(randomBoolean());
+        final AtomicBoolean allowReadTimeout = new AtomicBoolean(rarely());
+        final int wrongChunk = randomIntBetween(1, totalChunks);
+
+        final AtomicReference<String> sessionUploadId = new AtomicReference<>(UUIDs.randomBase64UUID());
+        logger.debug("starting with resumable upload id [{}]", sessionUploadId.get());
+
+        httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
+            final Map<String, String> params = new HashMap<>();
+            RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
+            assertThat(params.get("uploadType"), equalTo("resumable"));
+
+            if ("POST".equals(exchange.getRequestMethod())) {
+                assertThat(params.get("name"), equalTo("write_large_blob"));
+                if (countInits.decrementAndGet() <= 0) {
+                    byte[] response = Streams.readFully(exchange.getRequestBody()).utf8ToString().getBytes(UTF_8);
+                    exchange.getResponseHeaders().add("Content-Type", "application/json");
+                    exchange.getResponseHeaders().add("Location", httpServerUrl() +
+                        "/upload/storage/v1/b/bucket/o?uploadType=resumable&upload_id=" + sessionUploadId.get());
+                    exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
+                    exchange.getResponseBody().write(response);
+                    exchange.close();
+                    return;
+                }
+                if (allowReadTimeout.get()) {
+                    assertThat(wrongChunk, greaterThan(0));
+                    return;
+                }
+
+            } else if ("PUT".equals(exchange.getRequestMethod())) {
+                final String uploadId = params.get("upload_id");
+                if (uploadId.equals(sessionUploadId.get()) == false) {
+                    logger.debug("session id [{}] is gone", uploadId);
+                    assertThat(wrongChunk, greaterThan(0));
+                    Streams.readFully(exchange.getRequestBody());
+                    exchange.sendResponseHeaders(HttpStatus.SC_GONE, -1);
+                    exchange.close();
+                    return;
+                }
+
+                if (countUploads.get() == (wrongChunk * nbErrors)) {
+                    if (allowReadTimeout.compareAndSet(true, false)) {
+                        assertThat(wrongChunk, greaterThan(0));
+                        return;
+                    }
+                    if (allow410Gone.compareAndSet(true, false)) {
+                        final String newUploadId = UUIDs.randomBase64UUID(random());
+                        logger.debug("chunk [{}] gone, updating session ids [{} -> {}]", wrongChunk, sessionUploadId.get(), newUploadId);
+                        sessionUploadId.set(newUploadId);
+
+                        // we must reset the counters because the whole object upload will be retried
+                        countInits.set(nbErrors);
+                        countUploads.set(nbErrors * totalChunks);
+
+                        Streams.readFully(exchange.getRequestBody());
+                        exchange.sendResponseHeaders(HttpStatus.SC_GONE, -1);
+                        exchange.close();
+                        return;
+                    }
+                }
+
+                final String range = exchange.getRequestHeaders().getFirst("Content-Range");
+                assertTrue(Strings.hasLength(range));
+
+                if (countUploads.decrementAndGet() % 2 == 0) {
+                    final ByteArrayOutputStream requestBody = new ByteArrayOutputStream();
+                    final long bytesRead = Streams.copy(exchange.getRequestBody(), requestBody);
+                    assertThat(Math.toIntExact(bytesRead), anyOf(equalTo(defaultChunkSize), equalTo(lastChunkSize)));
+
+                    final int rangeStart = TestUtils.getContentRangeStart(range);
+                    final int rangeEnd = TestUtils.getContentRangeEnd(range);
+                    assertThat(rangeEnd + 1 - rangeStart, equalTo(Math.toIntExact(bytesRead)));
+                    assertArrayEquals(Arrays.copyOfRange(data, rangeStart, rangeEnd + 1), requestBody.toByteArray());
+
+                    final Integer limit = TestUtils.getContentRangeLimit(range);
+                    if (limit != null) {
+                        exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
+                        exchange.close();
+                        return;
+                    } else {
+                        exchange.getResponseHeaders().add("Range", String.format(Locale.ROOT, "bytes=%d/%d", rangeStart, rangeEnd));
+                        exchange.getResponseHeaders().add("Content-Length", "0");
+                        exchange.sendResponseHeaders(308 /* Resume Incomplete */, -1);
+                        exchange.close();
+                        return;
+                    }
+                }
+            }
+
+            // read all the request body, otherwise the SDK client throws a non-retryable StorageException
+            Streams.readFully(exchange.getRequestBody());
+            if (randomBoolean()) {
+                exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
+            }
+            exchange.close();
+        });
+
+        final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null;
+
+        final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout);
+        try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", data), data.length)) {
+            blobContainer.writeBlob("write_large_blob", stream, data.length, false);
+        }
+
+        assertThat(countInits.get(), equalTo(0));
+        assertThat(countUploads.get(), equalTo(0));
+        assertThat(allow410Gone.get(), is(false));
+    }
+
+    private static byte[] randomBlobContent() {
+        return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb
+    }
+}

+ 37 - 125
plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java

@@ -31,6 +31,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.MockSecureSettings;
@@ -38,8 +39,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.repositories.Repository;
@@ -53,9 +52,6 @@ import java.io.BufferedInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.URLDecoder;
-import java.security.KeyPairGenerator;
-import java.util.Arrays;
-import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -63,13 +59,12 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.UUID;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-import java.util.zip.GZIPInputStream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
@@ -77,6 +72,7 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSetting
 import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING;
 import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BUCKET;
 import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CLIENT_NAME;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
 @SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint")
 public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
@@ -118,7 +114,7 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
     @Override
     protected Settings nodeSettings(int nodeOrdinal) {
         if (serviceAccount == null) {
-            serviceAccount = createServiceAccount();
+            serviceAccount = TestUtils.createServiceAccount(random());
         }
 
         final Settings.Builder settings = Settings.builder();
@@ -217,31 +213,6 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
         }
     }
 
-    private static byte[] createServiceAccount() {
-        try {
-            final KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA");
-            keyPairGenerator.initialize(1024);
-            final String privateKey = Base64.getEncoder().encodeToString(keyPairGenerator.generateKeyPair().getPrivate().getEncoded());
-
-            final ByteArrayOutputStream out = new ByteArrayOutputStream();
-            try (XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), out)) {
-                builder.startObject();
-                {
-                    builder.field("type", "service_account");
-                    builder.field("project_id", getTestClass().getName().toLowerCase(Locale.ROOT));
-                    builder.field("private_key_id", UUID.randomUUID().toString());
-                    builder.field("private_key", "-----BEGIN PRIVATE KEY-----\n" + privateKey + "\n-----END PRIVATE KEY-----\n");
-                    builder.field("client_email", "elastic@appspot.gserviceaccount.com");
-                    builder.field("client_id", String.valueOf(randomNonNegativeLong()));
-                }
-                builder.endObject();
-            }
-            return out.toByteArray();
-        } catch (Exception e) {
-            throw new AssertionError("Unable to create service account file", e);
-        }
-    }
-
     /**
      * Minimal HTTP handler that acts as a Google Cloud Storage compliant server
      */
@@ -338,65 +309,16 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
                     exchange.getResponseBody().write(response);
 
                 } else if (Regex.simpleMatch("POST /upload/storage/v1/b/bucket/*uploadType=multipart*", request)) {
-                    try (BufferedInputStream in = new BufferedInputStream(new GZIPInputStream(exchange.getRequestBody()))) {
-                        byte[] response = new byte[0];
-                        String blob = null;
-                        int read;
-                        while ((read = in.read()) != -1) {
-                            boolean markAndContinue = false;
-                            try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-                                do { // search next consecutive {carriage return, new line} chars and stop
-                                    if ((char) read == '\r') {
-                                        int next = in.read();
-                                        if (next != -1) {
-                                            if (next == '\n') {
-                                                break;
-                                            }
-                                            out.write(read);
-                                            out.write(next);
-                                            continue;
-                                        }
-                                    }
-                                    out.write(read);
-                                } while ((read = in.read()) != -1);
-
-                                final String line = new String(out.toByteArray(), UTF_8);
-                                if (line.length() == 0 || line.equals("\r\n") || line.startsWith("--")
-                                    || line.toLowerCase(Locale.ROOT).startsWith("content")) {
-                                    markAndContinue = true;
-                                } else if (line.startsWith("{\"bucket\":\"bucket\"")) {
-                                    markAndContinue = true;
-                                    Matcher matcher = Pattern.compile("\"name\":\"([^\"]*)\"").matcher(line);
-                                    if (matcher.find()) {
-                                        blob = matcher.group(1);
-                                        response = line.getBytes(UTF_8);
-                                    }
-                                }
-                                if (markAndContinue) {
-                                    in.mark(Integer.MAX_VALUE);
-                                    continue;
-                                }
-                            }
-                            if (blob != null) {
-                                in.reset();
-                                try (ByteArrayOutputStream binary = new ByteArrayOutputStream()) {
-                                    while ((read = in.read()) != -1) {
-                                        binary.write(read);
-                                    }
-                                    binary.flush();
-                                    byte[] tmp = binary.toByteArray();
-                                    // removes the trailing end "\r\n--__END_OF_PART__--\r\n" which is 23 bytes long
-                                    blobs.put(blob, new BytesArray(Arrays.copyOf(tmp, tmp.length - 23)));
-
-                                    exchange.getResponseHeaders().add("Content-Type", "application/json");
-                                    exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
-                                    exchange.getResponseBody().write(response);
-
-                                } finally {
-                                    blob = null;
-                                }
-                            }
-                        }
+                    Optional<Tuple<String, BytesArray>> content = TestUtils.parseMultipartRequestBody(exchange.getRequestBody());
+                    if (content.isPresent()) {
+                        blobs.put(content.get().v1(), content.get().v2());
+
+                        byte[] response = ("{\"bucket\":\"bucket\",\"name\":\"" + content.get().v1() + "\"}").getBytes(UTF_8);
+                        exchange.getResponseHeaders().add("Content-Type", "application/json");
+                        exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
+                        exchange.getResponseBody().write(response);
+                    } else {
+                        exchange.sendResponseHeaders(RestStatus.BAD_REQUEST.getStatus(), -1);
                     }
 
                 } else if (Regex.simpleMatch("POST /upload/storage/v1/b/bucket/*uploadType=resumable*", request)) {
@@ -419,41 +341,31 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
                     RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
 
                     final String blobName = params.get("test_blob_name");
-                    final String range = exchange.getRequestHeaders().getFirst("Content-Range");
-                    assert Strings.hasLength(range);
-
-                    Matcher matcher = Pattern.compile("bytes ([^/]*)/([0-9\\*]*)").matcher(range);
-                    if (matcher.find()) {
-                        String bytes = matcher.group(1);
-                        String limit = matcher.group(2);
-                        byte[] blob = blobs.get(blobName).array();
-                        assert blob != null;
-                        // client is uploading a chunk
-                        matcher = Pattern.compile("([0-9]*)-([0-9]*)").matcher(bytes);
-                        assert matcher.find();
+                    byte[] blob = blobs.get(blobName).array();
+                    assertNotNull(blob);
 
-                        int end = Integer.parseInt(matcher.group(2));
-                        int start = Integer.parseInt(matcher.group(1));
+                    final String range = exchange.getRequestHeaders().getFirst("Content-Range");
+                    final Integer limit = TestUtils.getContentRangeLimit(range);
+                    final int start = TestUtils.getContentRangeStart(range);
+                    final int end = TestUtils.getContentRangeEnd(range);
+
+                    final ByteArrayOutputStream out = new ByteArrayOutputStream();
+                    long bytesRead = Streams.copy(exchange.getRequestBody(), out);
+                    int length = Math.max(end + 1, limit != null ? limit : 0);
+                    assertThat((int) bytesRead, lessThanOrEqualTo(length));
+                    if (length > blob.length) {
+                        blob = ArrayUtil.growExact(blob, length);
+                    }
+                    System.arraycopy(out.toByteArray(), 0, blob, start, Math.toIntExact(bytesRead));
+                    blobs.put(blobName, new BytesArray(blob));
 
-                        final ByteArrayOutputStream out = new ByteArrayOutputStream();
-                        long count = Streams.copy(exchange.getRequestBody(), out);
-                        int length = Math.max(end + 1, "*".equals(limit) ? 0 : Integer.parseInt(limit));
-                        assert count <= length;
-                        if (length > blob.length) {
-                            blob = ArrayUtil.growExact(blob, length);
-                        }
-                        assert blob.length >= end;
-                        System.arraycopy(out.toByteArray(), 0, blob, start, Math.toIntExact(count));
-                        blobs.put(blobName, new BytesArray(blob));
-
-                        if ("*".equals(limit)) {
-                            exchange.getResponseHeaders().add("Range", String.format(Locale.ROOT, "bytes=%d/%d", start, end));
-                            exchange.getResponseHeaders().add("Content-Length", "0");
-                            exchange.sendResponseHeaders(308 /* Resume Incomplete */, -1);
-                        } else {
-                            assert blob.length == Integer.parseInt(limit);
-                            exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
-                        }
+                    if (limit == null) {
+                        exchange.getResponseHeaders().add("Range", String.format(Locale.ROOT, "bytes=%d/%d", start, end));
+                        exchange.getResponseHeaders().add("Content-Length", "0");
+                        exchange.sendResponseHeaders(308 /* Resume Incomplete */, -1);
+                    } else {
+                        assertThat(limit, lessThanOrEqualTo(blob.length));
+                        exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
                     }
                 } else {
                     exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1);

+ 158 - 0
plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/TestUtils.java

@@ -0,0 +1,158 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.repositories.gcs;
+
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.KeyPairGenerator;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.Random;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.GZIPInputStream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+final class TestUtils {
+
+    private TestUtils() {}
+
+    /**
+     * Creates a random Service Account file for testing purpose
+     */
+    static byte[] createServiceAccount(final Random random) {
+        try {
+            final KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA");
+            keyPairGenerator.initialize(1024);
+            final String privateKey = Base64.getEncoder().encodeToString(keyPairGenerator.generateKeyPair().getPrivate().getEncoded());
+
+            final ByteArrayOutputStream out = new ByteArrayOutputStream();
+            try (XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), out)) {
+                builder.startObject();
+                {
+                    builder.field("type", "service_account");
+                    builder.field("project_id", "test");
+                    builder.field("private_key_id", UUID.randomUUID().toString());
+                    builder.field("private_key", "-----BEGIN PRIVATE KEY-----\n" + privateKey + "\n-----END PRIVATE KEY-----\n");
+                    builder.field("client_email", "elastic@appspot.gserviceaccount.com");
+                    builder.field("client_id", String.valueOf(Math.abs(random.nextLong())));
+                }
+                builder.endObject();
+            }
+            return out.toByteArray();
+        } catch (Exception e) {
+            throw new AssertionError("Unable to create service account file", e);
+        }
+    }
+
+    static Optional<Tuple<String, BytesArray>> parseMultipartRequestBody(final InputStream requestBody) throws IOException {
+        Tuple<String, BytesArray> content = null;
+        try (BufferedInputStream in = new BufferedInputStream(new GZIPInputStream(requestBody))) {
+            String name = null;
+            int read;
+            while ((read = in.read()) != -1) {
+                boolean markAndContinue = false;
+                try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                    do { // search next consecutive {carriage return, new line} chars and stop
+                        if ((char) read == '\r') {
+                            int next = in.read();
+                            if (next != -1) {
+                                if (next == '\n') {
+                                    break;
+                                }
+                                out.write(read);
+                                out.write(next);
+                                continue;
+                            }
+                        }
+                        out.write(read);
+                    } while ((read = in.read()) != -1);
+
+                    final String line = new String(out.toByteArray(), UTF_8);
+                    if (line.length() == 0 || line.equals("\r\n") || line.startsWith("--")
+                        || line.toLowerCase(Locale.ROOT).startsWith("content")) {
+                        markAndContinue = true;
+                    } else if (line.startsWith("{\"bucket\":\"bucket\"")) {
+                        markAndContinue = true;
+                        Matcher matcher = Pattern.compile("\"name\":\"([^\"]*)\"").matcher(line);
+                        if (matcher.find()) {
+                            name = matcher.group(1);
+                        }
+                    }
+                    if (markAndContinue) {
+                        in.mark(Integer.MAX_VALUE);
+                        continue;
+                    }
+                }
+                if (name != null) {
+                    in.reset();
+                    try (ByteArrayOutputStream binary = new ByteArrayOutputStream()) {
+                        while ((read = in.read()) != -1) {
+                            binary.write(read);
+                        }
+                        binary.flush();
+                        byte[] tmp = binary.toByteArray();
+                        // removes the trailing end "\r\n--__END_OF_PART__--\r\n" which is 23 bytes long
+                        content = Tuple.tuple(name, new BytesArray(Arrays.copyOf(tmp, tmp.length - 23)));
+                    }
+                }
+            }
+        }
+        return Optional.ofNullable(content);
+    }
+
+    private static final Pattern PATTERN_CONTENT_RANGE = Pattern.compile("bytes ([^/]*)/([0-9\\*]*)");
+    private static final Pattern PATTERN_CONTENT_RANGE_BYTES = Pattern.compile("([0-9]*)-([0-9]*)");
+
+    private static Integer parse(final Pattern pattern, final String contentRange, final BiFunction<String, String, Integer> fn) {
+        final Matcher matcher = pattern.matcher(contentRange);
+        if (matcher.matches() == false || matcher.groupCount() != 2) {
+            throw new IllegalArgumentException("Unable to parse content range header");
+        }
+        return fn.apply(matcher.group(1), matcher.group(2));
+    }
+
+    static Integer getContentRangeLimit(final String contentRange) {
+        return parse(PATTERN_CONTENT_RANGE, contentRange, (bytes, limit) -> "*".equals(limit) ? null : Integer.parseInt(limit));
+    }
+
+    static int getContentRangeStart(final String contentRange) {
+        return parse(PATTERN_CONTENT_RANGE, contentRange,
+            (bytes, limit) -> parse(PATTERN_CONTENT_RANGE_BYTES, bytes,
+                (start, end) -> Integer.parseInt(start)));
+    }
+
+    static int getContentRangeEnd(final String contentRange) {
+        return parse(PATTERN_CONTENT_RANGE, contentRange,
+            (bytes, limit) -> parse(PATTERN_CONTENT_RANGE_BYTES, bytes,
+                (start, end) -> Integer.parseInt(end)));
+    }
+}