Browse Source

Add docker-composed based test fixture for GCS (#48762)

Similarly to what has be done for Azure in #48636, this commit 
adds a new :test:fixtures:gcs-fixture project which provides two 
docker-compose based fixtures that emulate a Google Cloud 
Storage service.

Some code has been extracted from existing tests and placed 
into this new project so that it can be easily reused in other 
projects.
Tanguy Leroux 6 years ago
parent
commit
625c00ddd6

+ 2 - 0
plugins/repository-gcs/build.gradle

@@ -52,6 +52,8 @@ dependencies {
   compile 'io.opencensus:opencensus-api:0.18.0'
   compile 'io.opencensus:opencensus-contrib-http-util:0.18.0'
   compile 'com.google.apis:google-api-services-storage:v1-rev20190426-1.28.0'
+
+  testCompile project(':test:fixtures:gcs-fixture')
 }
 
 dependencyLicenses {

+ 24 - 37
plugins/repository-gcs/qa/google-cloud-storage/build.gradle

@@ -20,7 +20,6 @@
 
 import org.elasticsearch.gradle.MavenFilteringHack
 import org.elasticsearch.gradle.info.BuildParams
-import org.elasticsearch.gradle.test.AntFixture
 
 import java.nio.file.Files
 import java.security.KeyPair
@@ -30,12 +29,14 @@ import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
 
 apply plugin: 'elasticsearch.standalone-rest-test'
 apply plugin: 'elasticsearch.rest-test'
+apply plugin: 'elasticsearch.test.fixtures'
 
 // TODO think about flattening qa:google-cloud-storage project into parent
 dependencies {
     testCompile project(path: ':plugins:repository-gcs')
 }
 
+testFixtures.useFixture(':test:fixtures:gcs-fixture')
 boolean useFixture = false
 
 String gcsServiceAccount = System.getenv("google_storage_service_account")
@@ -45,7 +46,7 @@ String gcsBasePath = System.getenv("google_storage_base_path")
 File serviceAccountFile = null
 if (!gcsServiceAccount && !gcsBucket && !gcsBasePath) {
     serviceAccountFile = new File(project.buildDir, 'generated-resources/service_account_test.json')
-    gcsBucket = 'bucket_test'
+    gcsBucket = 'bucket'
     gcsBasePath = 'integration_test'
     useFixture = true
 } else if (!gcsServiceAccount || !gcsBucket || !gcsBasePath) {
@@ -58,12 +59,11 @@ def encodedCredentials = {
     Base64.encoder.encodeToString(Files.readAllBytes(serviceAccountFile.toPath()))
 }
 
-/** A task to start the GoogleCloudStorageFixture which emulates a Google Cloud Storage service **/
-task googleCloudStorageFixture(type: AntFixture) {
-    dependsOn testClasses
-    env 'CLASSPATH', "${ -> project.sourceSets.test.runtimeClasspath.asPath }"
-    executable = "${BuildParams.runtimeJavaHome}/bin/java"
-    args 'org.elasticsearch.repositories.gcs.GoogleCloudStorageFixture', baseDir, 'bucket_test'
+def fixtureAddress = { fixture ->
+    assert useFixture : 'closure should not be used without a fixture'
+    int ephemeralPort = project(':test:fixtures:gcs-fixture').postProcessFixture.ext."test.fixtures.${fixture}.tcp.80"
+    assert ephemeralPort > 0
+    'http://127.0.0.1:' + ephemeralPort
 }
 
 /** A service account file that points to the Google Cloud Storage service emulated by the fixture **/
@@ -87,6 +87,19 @@ task createServiceAccountFile() {
 }
 
 task thirdPartyTest (type: Test) {
+    if (useFixture) {
+        thirdPartyTest.dependsOn createServiceAccountFile
+        nonInputProperties.systemProperty 'test.google.endpoint', "${ -> fixtureAddress('gcs-fixture-third-party') }"
+        nonInputProperties.systemProperty 'test.google.tokenURI', "${ -> fixtureAddress('gcs-fixture-third-party') }/o/oauth2/token"
+
+        gradle.taskGraph.whenReady {
+            if (it.hasTask(gcsThirdPartyTests)) {
+                throw new IllegalStateException("Tried to run third party tests but not all of the necessary environment variables " +
+                        "'google_storage_service_account', 'google_storage_bucket', 'google_storage_base_path' are set.")
+            }
+        }
+    }
+
     include '**/GoogleCloudStorageThirdPartyTests.class'
     systemProperty 'tests.security.manager', false
     systemProperty 'test.google.bucket', gcsBucket
@@ -98,32 +111,6 @@ task gcsThirdPartyTests {
     dependsOn check
 }
 
-if (useFixture) {
-    // TODO think about running the fixture in the same JVM as tests
-    thirdPartyTest.dependsOn createServiceAccountFile, googleCloudStorageFixture
-    thirdPartyTest.finalizedBy googleCloudStorageFixture.getStopTask()
-
-    def fixtureEndpoint = {
-        "http://${googleCloudStorageFixture.addressAndPort}"
-    }
-
-    def tokenURI = {
-        "http://${googleCloudStorageFixture.addressAndPort}/o/oauth2/token"
-    }
-
-    thirdPartyTest {
-        nonInputProperties.systemProperty 'test.google.endpoint', "${ -> fixtureEndpoint.call() }"
-        nonInputProperties.systemProperty 'test.google.tokenURI', "${ -> tokenURI.call() }"
-    }
-
-    gradle.taskGraph.whenReady {
-        if (it.hasTask(gcsThirdPartyTests)) {
-            throw new IllegalStateException("Tried to run third party tests but not all of the necessary environment variables 'google_storage_service_account', " +
-                    "'google_storage_bucket', 'google_storage_base_path' are set.")
-        }
-    }
-}
-
 integTest.mustRunAfter(thirdPartyTest)
 check.dependsOn thirdPartyTest
 
@@ -147,10 +134,10 @@ testClusters.integTest {
     keystore 'gcs.client.integration_test.credentials_file', serviceAccountFile, IGNORE_VALUE
 
     if (useFixture) {
-        tasks.integTest.dependsOn createServiceAccountFile, googleCloudStorageFixture
+        tasks.integTest.dependsOn createServiceAccountFile
         /* Use a closure on the string to delay evaluation until tests are executed */
-        setting 'gcs.client.integration_test.endpoint', { "http://${googleCloudStorageFixture.addressAndPort}" }, IGNORE_VALUE
-        setting 'gcs.client.integration_test.token_uri', { "http://${googleCloudStorageFixture.addressAndPort}/o/oauth2/token" }, IGNORE_VALUE
+        setting 'gcs.client.integration_test.endpoint', { "${ -> fixtureAddress('gcs-fixture') }" }, IGNORE_VALUE
+        setting 'gcs.client.integration_test.token_uri', { "${ -> fixtureAddress('gcs-fixture') }/o/oauth2/token" }, IGNORE_VALUE
     } else {
         println "Using an external service to test the repository-gcs plugin"
     }

+ 0 - 656
plugins/repository-gcs/qa/google-cloud-storage/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageFixture.java

@@ -1,656 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.repositories.gcs;
-
-import org.elasticsearch.test.fixture.AbstractHttpFixture;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.path.PathTrie;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.rest.RestUtils;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.zip.GZIPInputStream;
-
-import static java.util.Collections.emptyMap;
-import static java.util.Collections.singletonList;
-import static java.util.Collections.singletonMap;
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-
-/**
- * {@link GoogleCloudStorageFixture} emulates a Google Cloud Storage service.
- *
- * The implementation is based on official documentation available at https://cloud.google.com/storage/docs/json_api/v1/.
- */
-public class GoogleCloudStorageFixture extends AbstractHttpFixture {
-
-    /** List of the buckets stored on this test server **/
-    private final Map<String, Bucket> buckets = ConcurrentCollections.newConcurrentMap();
-
-    /** Request handlers for the requests made by the Google Cloud Storage client **/
-    private final PathTrie<RequestHandler> handlers;
-
-    /**
-     * Creates a {@link GoogleCloudStorageFixture}
-     */
-    private GoogleCloudStorageFixture(final String workingDir, final String bucket) {
-        super(workingDir);
-        this.buckets.put(bucket, new Bucket(bucket));
-        this.handlers = defaultHandlers(buckets);
-    }
-
-    @Override
-    protected Response handle(final Request request) throws IOException {
-        final RequestHandler handler = handlers.retrieve(request.getMethod() + " " + request.getPath(), request.getParameters());
-        if (handler != null) {
-            return handler.handle(request);
-        }
-        return null;
-    }
-
-    public static void main(final String[] args) throws Exception {
-        if (args == null || args.length != 2) {
-            throw new IllegalArgumentException("GoogleCloudStorageFixture <working directory> <bucket>");
-        }
-
-        final GoogleCloudStorageFixture fixture = new GoogleCloudStorageFixture(args[0], args[1]);
-        fixture.listen();
-    }
-
-    /** Builds the default request handlers **/
-    private static PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> buckets) {
-        final PathTrie<RequestHandler> handlers = new PathTrie<>(RestUtils.REST_DECODER);
-
-        // GET Bucket
-        //
-        // https://cloud.google.com/storage/docs/json_api/v1/buckets/get
-        handlers.insert("GET /storage/v1/b/{bucket}", (request) -> {
-            final String name = request.getParam("bucket");
-            if (Strings.hasText(name) == false) {
-                return newError(RestStatus.INTERNAL_SERVER_ERROR, "bucket name is missing");
-            }
-
-            if (buckets.containsKey(name)) {
-                return newResponse(RestStatus.OK, emptyMap(), buildBucketResource(name));
-            } else {
-                return newError(RestStatus.NOT_FOUND, "bucket not found");
-            }
-        });
-
-        // GET Object
-        //
-        // https://cloud.google.com/storage/docs/json_api/v1/objects/get
-        handlers.insert("GET /storage/v1/b/{bucket}/o/{object}", (request) -> {
-            final String objectName = request.getParam("object");
-            if (Strings.hasText(objectName) == false) {
-                return newError(RestStatus.INTERNAL_SERVER_ERROR, "object name is missing");
-            }
-
-            final Bucket bucket = buckets.get(request.getParam("bucket"));
-            if (bucket == null) {
-                return newError(RestStatus.NOT_FOUND, "bucket not found");
-            }
-
-            for (final Map.Entry<String, Item> object : bucket.objects.entrySet()) {
-                if (object.getKey().equals(objectName)) {
-                    return newResponse(RestStatus.OK, emptyMap(), buildObjectResource(bucket.name, object.getValue()));
-                }
-            }
-            return newError(RestStatus.NOT_FOUND, "object not found");
-        });
-
-        // Delete Object
-        //
-        // https://cloud.google.com/storage/docs/json_api/v1/objects/delete
-        handlers.insert("DELETE /storage/v1/b/{bucket}/o/{object}", (request) -> {
-            final String objectName = request.getParam("object");
-            if (Strings.hasText(objectName) == false) {
-                return newError(RestStatus.INTERNAL_SERVER_ERROR, "object name is missing");
-            }
-
-            final Bucket bucket = buckets.get(request.getParam("bucket"));
-            if (bucket == null) {
-                return newError(RestStatus.NOT_FOUND, "bucket not found");
-            }
-
-            final Item item = bucket.objects.remove(objectName);
-            if (item != null) {
-                return new Response(RestStatus.NO_CONTENT.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
-            }
-            return newError(RestStatus.NOT_FOUND, "object not found");
-        });
-
-        // Insert Object (initialization)
-        //
-        // https://cloud.google.com/storage/docs/json_api/v1/objects/insert
-        handlers.insert("POST /upload/storage/v1/b/{bucket}/o", (request) -> {
-            final String ifGenerationMatch = request.getParam("ifGenerationMatch");
-            final String uploadType = request.getParam("uploadType");
-            if ("resumable".equals(uploadType)) {
-                final String objectName = request.getParam("name");
-                if (Strings.hasText(objectName) == false) {
-                    return newError(RestStatus.INTERNAL_SERVER_ERROR, "object name is missing");
-                }
-                final Bucket bucket = buckets.get(request.getParam("bucket"));
-                if (bucket == null) {
-                    return newError(RestStatus.NOT_FOUND, "bucket not found");
-                }
-                if ("0".equals(ifGenerationMatch)) {
-                    if (bucket.objects.putIfAbsent(objectName, Item.empty(objectName)) == null) {
-                        final String location = /*endpoint +*/ "/upload/storage/v1/b/" + bucket.name + "/o?uploadType=resumable&upload_id="
-                            + objectName;
-                        return newResponse(RestStatus.CREATED, singletonMap("Location", location), jsonBuilder());
-                    } else {
-                        return newError(RestStatus.PRECONDITION_FAILED, "object already exist");
-                    }
-                } else {
-                    bucket.objects.put(objectName, Item.empty(objectName));
-                    final String location = /*endpoint +*/ "/upload/storage/v1/b/" + bucket.name + "/o?uploadType=resumable&upload_id="
-                        + objectName;
-                    return newResponse(RestStatus.CREATED, singletonMap("Location", location), jsonBuilder());
-                }
-            } else if ("multipart".equals(uploadType)) {
-                /*
-                 *  A multipart/related request body looks like this (note the binary dump inside a text blob! nice!):
-                 * --__END_OF_PART__
-                 * Content-Length: 135
-                 * Content-Type: application/json; charset=UTF-8
-                 * content-transfer-encoding: binary
-                 *
-                 * {"bucket":"bucket_test","crc32c":"7XacHQ==","md5Hash":"fVztGkklMlUamsSmJK7W+w==",
-                 * "name":"tests-KEwE3bU4TuyetBgQIghmUw/master.dat-temp"}
-                 * --__END_OF_PART__
-                 * content-transfer-encoding: binary
-                 *
-                 * KEwE3bU4TuyetBgQIghmUw
-                 * --__END_OF_PART__--
-                 */
-                String boundary = "__END_OF_PART__";
-                // Determine the multipart boundary
-                final String contentType = request.getContentType();
-                if ((contentType != null) && contentType.contains("multipart/related; boundary=")) {
-                    boundary = contentType.replace("multipart/related; boundary=", "");
-                }
-
-                InputStream inputStreamBody = new ByteArrayInputStream(request.getBody());
-                final String contentEncoding = request.getHeader("Content-Encoding");
-                if (contentEncoding != null) {
-                    if ("gzip".equalsIgnoreCase(contentEncoding)) {
-                        inputStreamBody = new GZIPInputStream(inputStreamBody);
-                    }
-                }
-                // Read line by line ?both? parts of the multipart. Decoding headers as
-                // IS_8859_1 is safe.
-                try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStreamBody, StandardCharsets.ISO_8859_1))) {
-                    String line;
-                    // read first part delimiter
-                    line = reader.readLine();
-                    if ((line == null) || (line.equals("--" + boundary) == false)) {
-                        return newError(RestStatus.INTERNAL_SERVER_ERROR,
-                                "Error parsing multipart request. Does not start with the part delimiter.");
-                    }
-                    final Map<String, List<String>> firstPartHeaders = new HashMap<>();
-                    // Reads the first part's headers, if any
-                    while ((line = reader.readLine()) != null) {
-                        if (line.equals("\r\n") || (line.length() == 0)) {
-                            // end of headers
-                            break;
-                        } else {
-                            final String[] header = line.split(":", 2);
-                            firstPartHeaders.put(header[0], singletonList(header[1]));
-                        }
-                    }
-                    final List<String> firstPartContentTypes = firstPartHeaders.getOrDefault("Content-Type",
-                            firstPartHeaders.get("Content-type"));
-                    if ((firstPartContentTypes == null)
-                            || (firstPartContentTypes.stream().noneMatch(x -> x.contains("application/json")))) {
-                        return newError(RestStatus.INTERNAL_SERVER_ERROR,
-                                "Error parsing multipart request. Metadata part expected to have the \"application/json\" content type.");
-                    }
-                    // read metadata part, a single line
-                    line = reader.readLine();
-                    final byte[] metadata = line.getBytes(StandardCharsets.ISO_8859_1);
-                    if ((firstPartContentTypes != null) && (firstPartContentTypes.stream().anyMatch((x -> x.contains("charset=utf-8"))))) {
-                        // decode as utf-8
-                        line = new String(metadata, StandardCharsets.UTF_8);
-                    }
-                    final Matcher objectNameMatcher = Pattern.compile("\"name\":\"([^\"]*)\"").matcher(line);
-                    objectNameMatcher.find();
-                    final String objectName = objectNameMatcher.group(1);
-                    final Matcher bucketNameMatcher = Pattern.compile("\"bucket\":\"([^\"]*)\"").matcher(line);
-                    bucketNameMatcher.find();
-                    final String bucketName = bucketNameMatcher.group(1);
-                    // read second part delimiter
-                    line = reader.readLine();
-                    if ((line == null) || (line.equals("--" + boundary) == false)) {
-                        return newError(RestStatus.INTERNAL_SERVER_ERROR,
-                                "Error parsing multipart request. Second part does not start with delimiter. "
-                                        + "Is the metadata multi-line?");
-                    }
-                    final Map<String, List<String>> secondPartHeaders = new HashMap<>();
-                    // Reads the second part's headers, if any
-                    while ((line = reader.readLine()) != null) {
-                        if (line.equals("\r\n") || (line.length() == 0)) {
-                            // end of headers
-                            break;
-                        } else {
-                            final String[] header = line.split(":", 2);
-                            secondPartHeaders.put(header[0], singletonList(header[1]));
-                        }
-                    }
-                    final List<String> secondPartTransferEncoding = secondPartHeaders.getOrDefault("Content-Transfer-Encoding",
-                            secondPartHeaders.get("content-transfer-encoding"));
-                    if ((secondPartTransferEncoding == null)
-                            || (secondPartTransferEncoding.stream().noneMatch(x -> x.contains("binary")))) {
-                        return newError(RestStatus.INTERNAL_SERVER_ERROR,
-                                "Error parsing multipart request. Data part expected to have the \"binary\" content transfer encoding.");
-                    }
-                    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                    int c;
-                    while ((c = reader.read()) != -1) {
-                        // one char to one byte, because of the ISO_8859_1 encoding
-                        baos.write(c);
-                    }
-                    final byte[] temp = baos.toByteArray();
-                    final byte[] trailingEnding = ("\r\n--" + boundary + "--\r\n").getBytes(StandardCharsets.ISO_8859_1);
-                    // check trailing
-                    for (int i = trailingEnding.length - 1; i >= 0; i--) {
-                        if (trailingEnding[i] != temp[(temp.length - trailingEnding.length) + i]) {
-                            return newError(RestStatus.INTERNAL_SERVER_ERROR, "Error parsing multipart request.");
-                        }
-                    }
-                    final Bucket bucket = buckets.get(bucketName);
-                    if (bucket == null) {
-                        return newError(RestStatus.NOT_FOUND, "bucket not found");
-                    }
-                    final byte[] objectData = Arrays.copyOf(temp, temp.length - trailingEnding.length);
-                    if ((objectName != null) && (bucketName != null) && (objectData != null)) {
-                        bucket.objects.put(objectName, new Item(objectName, objectData));
-                        return new Response(RestStatus.OK.getStatus(), JSON_CONTENT_TYPE, metadata);
-                    } else {
-                        return newError(RestStatus.INTERNAL_SERVER_ERROR, "error parsing multipart request");
-                    }
-                }
-            } else {
-                return newError(RestStatus.INTERNAL_SERVER_ERROR, "upload type must be resumable or multipart");
-            }
-        });
-
-        // Insert Object (upload)
-        //
-        // https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
-        handlers.insert("PUT /upload/storage/v1/b/{bucket}/o", (request) -> {
-            final String objectId = request.getParam("upload_id");
-            if (Strings.hasText(objectId) == false) {
-                return newError(RestStatus.INTERNAL_SERVER_ERROR, "upload id is missing");
-            }
-
-            final Bucket bucket = buckets.get(request.getParam("bucket"));
-            if (bucket == null) {
-                return newError(RestStatus.NOT_FOUND, "bucket not found");
-            }
-
-            if (bucket.objects.containsKey(objectId) == false) {
-                return newError(RestStatus.NOT_FOUND, "object name not found");
-            }
-
-            final Item item = new Item(objectId, request.getBody());
-
-            bucket.objects.put(objectId, item);
-            return newResponse(RestStatus.OK, emptyMap(), buildObjectResource(bucket.name, item));
-        });
-
-        // List Objects
-        //
-        // https://cloud.google.com/storage/docs/json_api/v1/objects/list
-        handlers.insert("GET /storage/v1/b/{bucket}/o", (request) -> {
-            final Bucket bucket = buckets.get(request.getParam("bucket"));
-            if (bucket == null) {
-                return newError(RestStatus.NOT_FOUND, "bucket not found");
-            }
-
-            final XContentBuilder builder = jsonBuilder();
-            builder.startObject();
-            builder.field("kind", "storage#objects");
-            final Set<String> prefixes = new HashSet<>();
-            {
-                builder.startArray("items");
-
-                final String prefixParam = request.getParam("prefix");
-                final String delimiter = request.getParam("delimiter");
-
-                for (final Map.Entry<String, Item> object : bucket.objects.entrySet()) {
-                    String objectKey = object.getKey();
-                    if ((prefixParam != null) && (objectKey.startsWith(prefixParam) == false)) {
-                        continue;
-                    }
-
-                    if (Strings.isNullOrEmpty(delimiter)) {
-                        buildObjectResource(builder, bucket.name, object.getValue());
-                    } else {
-                        int prefixLength = prefixParam.length();
-                        String rest = objectKey.substring(prefixLength);
-                        int delimiterPos;
-                        if ((delimiterPos = rest.indexOf(delimiter)) != -1) {
-                            String key = objectKey.substring(0, prefixLength + delimiterPos + 1);
-                            prefixes.add(key);
-                        } else {
-                            buildObjectResource(builder, bucket.name, object.getValue());
-                        }
-                    }
-                }
-                builder.endArray();
-            }
-            {
-                if (prefixes.isEmpty() == false) {
-                    builder.array("prefixes", prefixes.toArray());
-                }
-            }
-            builder.endObject();
-            return newResponse(RestStatus.OK, emptyMap(), builder);
-        });
-
-        // Download Object
-        //
-        // https://cloud.google.com/storage/docs/request-body
-        handlers.insert("GET /download/storage/v1/b/{bucket}/o/{object}", (request) -> {
-            final String object = request.getParam("object");
-            if (Strings.hasText(object) == false) {
-                return newError(RestStatus.INTERNAL_SERVER_ERROR, "object id is missing");
-            }
-
-            final Bucket bucket = buckets.get(request.getParam("bucket"));
-            if (bucket == null) {
-                return newError(RestStatus.NOT_FOUND, "bucket not found");
-            }
-
-            if (bucket.objects.containsKey(object) == false) {
-                return newError(RestStatus.NOT_FOUND, "object name not found");
-            }
-
-            return new Response(RestStatus.OK.getStatus(), contentType("application/octet-stream"), bucket.objects.get(object).bytes);
-        });
-
-        // Batch
-        //
-        // https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch
-        handlers.insert("POST /batch/storage/v1", (request) -> {
-            final List<Response> batchedResponses = new ArrayList<>();
-
-            // A batch request body looks like this:
-            //
-            //            --__END_OF_PART__
-            //            Content-Length: 71
-            //            Content-Type: application/http
-            //            content-id: 1
-            //            content-transfer-encoding: binary
-            //
-            //            DELETE https://www.googleapis.com/storage/v1/b/ohifkgu/o/foo%2Ftest HTTP/1.1
-            //
-            //
-            //            --__END_OF_PART__
-            //            Content-Length: 71
-            //            Content-Type: application/http
-            //            content-id: 2
-            //            content-transfer-encoding: binary
-            //
-            //            DELETE https://www.googleapis.com/storage/v1/b/ohifkgu/o/bar%2Ftest HTTP/1.1
-            //
-            //
-            //            --__END_OF_PART__--
-
-            // Default multipart boundary
-            String boundary = "__END_OF_PART__";
-
-            // Determine the multipart boundary
-            final String contentType = request.getContentType();
-            if ((contentType != null) && contentType.contains("multipart/mixed; boundary=")) {
-                boundary = contentType.replace("multipart/mixed; boundary=", "");
-            }
-
-            long batchedRequests = 0L;
-
-            // Read line by line the batched requests
-            try (BufferedReader reader = new BufferedReader(
-                                              new InputStreamReader(
-                                                  new ByteArrayInputStream(request.getBody()), StandardCharsets.UTF_8))) {
-                String line;
-                while ((line = reader.readLine()) != null) {
-                    // Start of a batched request
-                    if (line.equals("--" + boundary)) {
-                        final Map<String, String> batchedHeaders = new HashMap<>();
-
-                        // Reads the headers, if any
-                        while ((line = reader.readLine()) != null) {
-                            if (line.equals("\r\n") || (line.length() == 0)) {
-                                // end of headers
-                                break;
-                            } else {
-                                final String[] header = line.split(":", 2);
-                                batchedHeaders.put(header[0], header[1]);
-                            }
-                        }
-
-                        // Reads the method and URL
-                        line = reader.readLine();
-                        final String batchedMethod = line.substring(0, line.indexOf(' '));
-                        final URI batchedUri = URI.create(line.substring(batchedMethod.length() + 1, line.lastIndexOf(' ')));
-
-                        // Reads the body
-                        line = reader.readLine();
-                        byte[] batchedBody = new byte[0];
-                        if ((line != null) || (line.startsWith("--" + boundary) == false)) {
-                            batchedBody = line.getBytes(StandardCharsets.UTF_8);
-                        }
-
-                        final Request batchedRequest = new Request(batchedRequests, batchedMethod, batchedUri, batchedHeaders, batchedBody);
-                        batchedRequests = batchedRequests + 1;
-
-                        // Executes the batched request
-                        final RequestHandler handler =
-                            handlers.retrieve(batchedRequest.getMethod() + " " + batchedRequest.getPath(), batchedRequest.getParameters());
-                        if (handler != null) {
-                            try {
-                                batchedResponses.add(handler.handle(batchedRequest));
-                            } catch (final IOException e) {
-                                batchedResponses.add(newError(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
-                            }
-                        }
-                    }
-                }
-            }
-
-            // Now we can build the response
-            final String sep = "--";
-            final String line = "\r\n";
-
-            final StringBuilder builder = new StringBuilder();
-            for (final Response response : batchedResponses) {
-                builder.append(sep).append(boundary).append(line);
-                builder.append("Content-Type: application/http").append(line);
-                builder.append(line);
-                builder.append("HTTP/1.1 ")
-                    .append(response.getStatus())
-                    .append(' ')
-                    .append(RestStatus.fromCode(response.getStatus()).toString())
-                    .append(line);
-                builder.append("Content-Length: ").append(response.getBody().length).append(line);
-                builder.append("Content-Type: ").append(response.getContentType()).append(line);
-                response.getHeaders().forEach((k, v) -> builder.append(k).append(": ").append(v).append(line));
-                builder.append(line);
-                builder.append(new String(response.getBody(), StandardCharsets.UTF_8)).append(line);
-                builder.append(line);
-            }
-            builder.append(line);
-            builder.append(sep).append(boundary).append(sep);
-
-            final byte[] content = builder.toString().getBytes(StandardCharsets.UTF_8);
-            return new Response(RestStatus.OK.getStatus(), contentType("multipart/mixed; boundary=" + boundary), content);
-        });
-
-        // Fake refresh of an OAuth2 token
-        //
-        handlers.insert("POST /o/oauth2/token", (request) ->
-            newResponse(RestStatus.OK, emptyMap(), jsonBuilder()
-                .startObject()
-                    .field("access_token", "unknown")
-                    .field("token_type", "Bearer")
-                    .field("expires_in", 3600)
-                .endObject())
-        );
-
-        return handlers;
-    }
-
-    /**
-     * Represents a Storage bucket as if it was created on Google Cloud Storage.
-     */
-    static class Bucket {
-
-        /** Bucket name **/
-        final String name;
-
-        /** Blobs contained in the bucket **/
-        final Map<String, Item> objects;
-
-        Bucket(final String name) {
-            this.name = Objects.requireNonNull(name);
-            this.objects = ConcurrentCollections.newConcurrentMap();
-        }
-    }
-
-    static class Item {
-        final String name;
-        final LocalDateTime created;
-        final byte[] bytes;
-
-        Item(String name, byte[] bytes) {
-            this.name = name;
-            this.bytes = bytes;
-            this.created = LocalDateTime.now(ZoneOffset.UTC);
-        }
-
-        public static Item empty(String name) {
-            return new Item(name, EMPTY_BYTE);
-        }
-    }
-
-    /**
-     * Builds a JSON response
-     */
-    private static Response newResponse(final RestStatus status, final Map<String, String> headers, final XContentBuilder xContentBuilder) {
-        final Map<String, String> responseHeaders = new HashMap<>(JSON_CONTENT_TYPE);
-        responseHeaders.putAll(headers);
-
-        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-            BytesReference.bytes(xContentBuilder).writeTo(out);
-
-            return new Response(status.getStatus(), responseHeaders, out.toByteArray());
-        } catch (final IOException e) {
-            return newError(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage());
-        }
-    }
-
-    /**
-     * Storage Error JSON representation
-     */
-    private static Response newError(final RestStatus status, final String message) {
-        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-            try (XContentBuilder builder = jsonBuilder()) {
-                builder.startObject()
-                            .startObject("error")
-                                .field("code", status.getStatus())
-                                .field("message", message)
-                                .startArray("errors")
-                                    .startObject()
-                                        .field("domain", "global")
-                                        .field("reason", status.toString())
-                                        .field("message", message)
-                                    .endObject()
-                                .endArray()
-                            .endObject()
-                        .endObject();
-                BytesReference.bytes(builder).writeTo(out);
-            }
-            return new Response(status.getStatus(), JSON_CONTENT_TYPE, out.toByteArray());
-        } catch (final IOException e) {
-            final byte[] bytes = (message != null ? message : "something went wrong").getBytes(StandardCharsets.UTF_8);
-            return new Response(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), TEXT_PLAIN_CONTENT_TYPE, bytes);
-        }
-    }
-
-    /**
-     * Storage Bucket JSON representation as defined in
-     * https://cloud.google.com/storage/docs/json_api/v1/bucket#resource
-     */
-    private static XContentBuilder buildBucketResource(final String name) throws IOException {
-        return jsonBuilder().startObject()
-                                .field("kind", "storage#bucket")
-                                .field("name", name)
-                                .field("id", name)
-                            .endObject();
-    }
-
-    /**
-     * Storage Object JSON representation as defined in
-     * https://cloud.google.com/storage/docs/json_api/v1/objects#resource
-     */
-    private static XContentBuilder buildObjectResource(final String bucket, final Item item) throws IOException {
-        return buildObjectResource(jsonBuilder(), bucket, item);
-    }
-
-    /**
-     * Storage Object JSON representation as defined in
-     * https://cloud.google.com/storage/docs/json_api/v1/objects#resource
-     */
-    private static XContentBuilder buildObjectResource(final XContentBuilder builder,
-                                                       final String bucket,
-                                                       final Item item) throws IOException {
-        return builder.startObject()
-                            .field("kind", "storage#object")
-                            .field("id", String.join("/", bucket, item.name))
-                            .field("name", item.name)
-                            .field("timeCreated", DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(item.created))
-                            .field("bucket", bucket)
-                            .field("size", String.valueOf(item.bytes.length))
-                        .endObject();
-    }
-}

+ 10 - 11
plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java

@@ -24,6 +24,7 @@ import com.google.cloud.storage.StorageException;
 import com.google.cloud.storage.StorageOptions;
 import com.sun.net.httpserver.HttpContext;
 import com.sun.net.httpserver.HttpServer;
+import fixture.gcs.FakeOAuth2HttpHandler;
 import org.apache.http.HttpStatus;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
@@ -68,6 +69,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static fixture.gcs.GoogleCloudStorageHttpHandler.getContentRangeEnd;
+import static fixture.gcs.GoogleCloudStorageHttpHandler.getContentRangeLimit;
+import static fixture.gcs.GoogleCloudStorageHttpHandler.getContentRangeStart;
+import static fixture.gcs.GoogleCloudStorageHttpHandler.parseMultipartRequestBody;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
 import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
@@ -144,13 +149,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
 
         final List<HttpContext> httpContexts = Arrays.asList(
             // Auth
-            httpServer.createContext("/token", exchange -> {
-                byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8);
-                exchange.getResponseHeaders().add("Content-Type", "application/json");
-                exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
-                exchange.getResponseBody().write(response);
-                exchange.close();
-            }),
+            httpServer.createContext("/token", new FakeOAuth2HttpHandler()),
             // Does bucket exists?
             httpServer.createContext("/storage/v1/b/bucket", exchange -> {
                 byte[] response = ("{\"kind\":\"storage#bucket\",\"name\":\"bucket\",\"id\":\"0\"}").getBytes(UTF_8);
@@ -244,7 +243,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
         httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
             assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
             if (countDown.countDown()) {
-                Optional<Tuple<String, BytesArray>> content = TestUtils.parseMultipartRequestBody(exchange.getRequestBody());
+                Optional<Tuple<String, BytesArray>> content = parseMultipartRequestBody(exchange.getRequestBody());
                 assertThat(content.isPresent(), is(true));
                 assertThat(content.get().v1(), equalTo("write_blob_max_retries"));
                 if (Objects.deepEquals(bytes, content.get().v2().array())) {
@@ -387,12 +386,12 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
                     final long bytesRead = Streams.copy(exchange.getRequestBody(), requestBody);
                     assertThat(Math.toIntExact(bytesRead), anyOf(equalTo(defaultChunkSize), equalTo(lastChunkSize)));
 
-                    final int rangeStart = TestUtils.getContentRangeStart(range);
-                    final int rangeEnd = TestUtils.getContentRangeEnd(range);
+                    final int rangeStart = getContentRangeStart(range);
+                    final int rangeEnd = getContentRangeEnd(range);
                     assertThat(rangeEnd + 1 - rangeStart, equalTo(Math.toIntExact(bytesRead)));
                     assertArrayEquals(Arrays.copyOfRange(data, rangeStart, rangeEnd + 1), requestBody.toByteArray());
 
-                    final Integer limit = TestUtils.getContentRangeLimit(range);
+                    final Integer limit = getContentRangeLimit(range);
                     if (limit != null) {
                         exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
                         exchange.close();

+ 3 - 200
plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java

@@ -24,16 +24,11 @@ import com.google.cloud.http.HttpTransportOptions;
 import com.google.cloud.storage.StorageOptions;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
-import org.apache.http.HttpStatus;
-import org.apache.lucene.util.ArrayUtil;
+import fixture.gcs.FakeOAuth2HttpHandler;
+import fixture.gcs.GoogleCloudStorageHttpHandler;
 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
-import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.SuppressForbidden;
-import org.elasticsearch.common.UUIDs;
-import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.Streams;
-import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.MockSecureSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -43,36 +38,19 @@ import org.elasticsearch.env.Environment;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
-import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.threeten.bp.Duration;
 
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.net.URLDecoder;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
 import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
 import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING;
 import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING;
 import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BUCKET;
 import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CLIENT_NAME;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
 @SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint")
 public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
@@ -99,7 +77,7 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
     @Override
     protected Map<String, HttpHandler> createHttpHandlers() {
         return Map.of(
-            "/", new InternalHttpHandler(),
+            "/", new GoogleCloudStorageHttpHandler("bucket"),
             "/token", new FakeOAuth2HttpHandler()
         );
     }
@@ -208,181 +186,6 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
         }
     }
 
-    /**
-     * Minimal HTTP handler that acts as a Google Cloud Storage compliant server
-     */
-    @SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint")
-    private static class InternalHttpHandler implements HttpHandler {
-
-        private final ConcurrentMap<String, BytesArray> blobs = new ConcurrentHashMap<>();
-
-        @Override
-        public void handle(final HttpExchange exchange) throws IOException {
-            final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
-            try {
-                if (Regex.simpleMatch("GET /storage/v1/b/bucket/o*", request)) {
-                    final Map<String, String> params = new HashMap<>();
-                    RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
-                    final String prefix = params.get("prefix");
-
-                    final List<Map.Entry<String, BytesArray>> listOfBlobs = blobs.entrySet().stream()
-                        .filter(blob -> prefix == null || blob.getKey().startsWith(prefix)).collect(Collectors.toList());
-
-                    final StringBuilder list = new StringBuilder();
-                    list.append("{\"kind\":\"storage#objects\",\"items\":[");
-                    for (Iterator<Map.Entry<String, BytesArray>> it = listOfBlobs.iterator(); it.hasNext(); ) {
-                        Map.Entry<String, BytesArray> blob = it.next();
-                        list.append("{\"kind\":\"storage#object\",");
-                        list.append("\"bucket\":\"bucket\",");
-                        list.append("\"name\":\"").append(blob.getKey()).append("\",");
-                        list.append("\"id\":\"").append(blob.getKey()).append("\",");
-                        list.append("\"size\":\"").append(blob.getValue().length()).append("\"");
-                        list.append('}');
-
-                        if (it.hasNext()) {
-                            list.append(',');
-                        }
-                    }
-                    list.append("]}");
-
-                    byte[] response = list.toString().getBytes(UTF_8);
-                    exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
-                    exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
-                    exchange.getResponseBody().write(response);
-
-                } else if (Regex.simpleMatch("GET /storage/v1/b/bucket*", request)) {
-                    byte[] response = ("{\"kind\":\"storage#bucket\",\"name\":\"bucket\",\"id\":\"0\"}").getBytes(UTF_8);
-                    exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
-                    exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
-                    exchange.getResponseBody().write(response);
-
-                } else if (Regex.simpleMatch("GET /download/storage/v1/b/bucket/o/*", request)) {
-                    BytesArray blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/bucket/o/", ""));
-                    if (blob != null) {
-                        final String range = exchange.getRequestHeaders().getFirst("Range");
-                        Matcher matcher = Pattern.compile("bytes=([0-9]*)-([0-9]*)").matcher(range);
-                        assert matcher.find();
-
-                        byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? blob.array() : new byte[0];
-                        exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
-                        exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
-                        exchange.getResponseBody().write(response);
-                    } else {
-                        exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
-                    }
-
-                } else if (Regex.simpleMatch("DELETE /storage/v1/b/bucket/o/*", request)) {
-                    int deletions = 0;
-                    for (Iterator<Map.Entry<String, BytesArray>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
-                        Map.Entry<String, BytesArray> blob = iterator.next();
-                        if (blob.getKey().equals(exchange.getRequestURI().toString())) {
-                            iterator.remove();
-                            deletions++;
-                        }
-                    }
-                    exchange.sendResponseHeaders((deletions > 0 ? RestStatus.OK : RestStatus.NO_CONTENT).getStatus(), -1);
-
-                } else if (Regex.simpleMatch("POST /batch/storage/v1", request)) {
-                    final String uri = "/storage/v1/b/bucket/o/";
-                    final StringBuilder batch = new StringBuilder();
-                    for (String line : Streams.readAllLines(new BufferedInputStream(exchange.getRequestBody()))) {
-                        if (line.length() == 0 || line.startsWith("--") || line.toLowerCase(Locale.ROOT).startsWith("content")) {
-                            batch.append(line).append('\n');
-                        } else if (line.startsWith("DELETE")) {
-                            final String name = line.substring(line.indexOf(uri) + uri.length(), line.lastIndexOf(" HTTP"));
-                            if (Strings.hasText(name)) {
-                                if (blobs.entrySet().removeIf(blob -> blob.getKey().equals(URLDecoder.decode(name, UTF_8)))) {
-                                    batch.append("HTTP/1.1 204 NO_CONTENT").append('\n');
-                                    batch.append('\n');
-                                }
-                            }
-                        }
-                    }
-                    byte[] response = batch.toString().getBytes(UTF_8);
-                    exchange.getResponseHeaders().add("Content-Type", exchange.getRequestHeaders().getFirst("Content-Type"));
-                    exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
-                    exchange.getResponseBody().write(response);
-
-                } else if (Regex.simpleMatch("POST /upload/storage/v1/b/bucket/*uploadType=multipart*", request)) {
-                    Optional<Tuple<String, BytesArray>> content = TestUtils.parseMultipartRequestBody(exchange.getRequestBody());
-                    if (content.isPresent()) {
-                        blobs.put(content.get().v1(), content.get().v2());
-
-                        byte[] response = ("{\"bucket\":\"bucket\",\"name\":\"" + content.get().v1() + "\"}").getBytes(UTF_8);
-                        exchange.getResponseHeaders().add("Content-Type", "application/json");
-                        exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
-                        exchange.getResponseBody().write(response);
-                    } else {
-                        exchange.sendResponseHeaders(RestStatus.BAD_REQUEST.getStatus(), -1);
-                    }
-
-                } else if (Regex.simpleMatch("POST /upload/storage/v1/b/bucket/*uploadType=resumable*", request)) {
-                    final Map<String, String> params = new HashMap<>();
-                    RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
-                    final String blobName = params.get("name");
-                    blobs.put(blobName, BytesArray.EMPTY);
-
-                    byte[] response = Streams.readFully(exchange.getRequestBody()).utf8ToString().getBytes(UTF_8);
-                    exchange.getResponseHeaders().add("Content-Type", "application/json");
-                    exchange.getResponseHeaders().add("Location", httpServerUrl() + "/upload/storage/v1/b/bucket/o?"
-                        + "uploadType=resumable"
-                        + "&upload_id=" + UUIDs.randomBase64UUID()
-                        + "&test_blob_name=" + blobName); // not a Google Storage parameter, but it allows to pass the blob name
-                    exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
-                    exchange.getResponseBody().write(response);
-
-                } else if (Regex.simpleMatch("PUT /upload/storage/v1/b/bucket/o?*uploadType=resumable*", request)) {
-                    final Map<String, String> params = new HashMap<>();
-                    RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
-
-                    final String blobName = params.get("test_blob_name");
-                    byte[] blob = blobs.get(blobName).array();
-                    assertNotNull(blob);
-
-                    final String range = exchange.getRequestHeaders().getFirst("Content-Range");
-                    final Integer limit = TestUtils.getContentRangeLimit(range);
-                    final int start = TestUtils.getContentRangeStart(range);
-                    final int end = TestUtils.getContentRangeEnd(range);
-
-                    final ByteArrayOutputStream out = new ByteArrayOutputStream();
-                    long bytesRead = Streams.copy(exchange.getRequestBody(), out);
-                    int length = Math.max(end + 1, limit != null ? limit : 0);
-                    assertThat((int) bytesRead, lessThanOrEqualTo(length));
-                    if (length > blob.length) {
-                        blob = ArrayUtil.growExact(blob, length);
-                    }
-                    System.arraycopy(out.toByteArray(), 0, blob, start, Math.toIntExact(bytesRead));
-                    blobs.put(blobName, new BytesArray(blob));
-
-                    if (limit == null) {
-                        exchange.getResponseHeaders().add("Range", String.format(Locale.ROOT, "bytes=%d/%d", start, end));
-                        exchange.getResponseHeaders().add("Content-Length", "0");
-                        exchange.sendResponseHeaders(308 /* Resume Incomplete */, -1);
-                    } else {
-                        assertThat(limit, lessThanOrEqualTo(blob.length));
-                        exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
-                    }
-                } else {
-                    exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1);
-                }
-            } finally {
-                exchange.close();
-            }
-        }
-    }
-
-    @SuppressForbidden(reason = "this test uses a HttpServer to emulate a fake OAuth2 authentication service")
-    private static class FakeOAuth2HttpHandler implements HttpHandler {
-        @Override
-        public void handle(final HttpExchange exchange) throws IOException {
-            byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8);
-            exchange.getResponseHeaders().add("Content-Type", "application/json");
-            exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
-            exchange.getResponseBody().write(response);
-            exchange.close();
-        }
-    }
-
     /**
      * HTTP handler that injects random  Google Cloud Storage service errors
      *

+ 0 - 97
plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/TestUtils.java

@@ -18,28 +18,14 @@
  */
 package org.elasticsearch.repositories.gcs;
 
-import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 
-import java.io.BufferedInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
 import java.security.KeyPairGenerator;
-import java.util.Arrays;
 import java.util.Base64;
-import java.util.Locale;
-import java.util.Optional;
 import java.util.Random;
 import java.util.UUID;
-import java.util.function.BiFunction;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.zip.GZIPInputStream;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
 
 final class TestUtils {
 
@@ -72,87 +58,4 @@ final class TestUtils {
             throw new AssertionError("Unable to create service account file", e);
         }
     }
-
-    static Optional<Tuple<String, BytesArray>> parseMultipartRequestBody(final InputStream requestBody) throws IOException {
-        Tuple<String, BytesArray> content = null;
-        try (BufferedInputStream in = new BufferedInputStream(new GZIPInputStream(requestBody))) {
-            String name = null;
-            int read;
-            while ((read = in.read()) != -1) {
-                boolean markAndContinue = false;
-                try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-                    do { // search next consecutive {carriage return, new line} chars and stop
-                        if ((char) read == '\r') {
-                            int next = in.read();
-                            if (next != -1) {
-                                if (next == '\n') {
-                                    break;
-                                }
-                                out.write(read);
-                                out.write(next);
-                                continue;
-                            }
-                        }
-                        out.write(read);
-                    } while ((read = in.read()) != -1);
-
-                    final String line = new String(out.toByteArray(), UTF_8);
-                    if (line.length() == 0 || line.equals("\r\n") || line.startsWith("--")
-                        || line.toLowerCase(Locale.ROOT).startsWith("content")) {
-                        markAndContinue = true;
-                    } else if (line.startsWith("{\"bucket\":\"bucket\"")) {
-                        markAndContinue = true;
-                        Matcher matcher = Pattern.compile("\"name\":\"([^\"]*)\"").matcher(line);
-                        if (matcher.find()) {
-                            name = matcher.group(1);
-                        }
-                    }
-                    if (markAndContinue) {
-                        in.mark(Integer.MAX_VALUE);
-                        continue;
-                    }
-                }
-                if (name != null) {
-                    in.reset();
-                    try (ByteArrayOutputStream binary = new ByteArrayOutputStream()) {
-                        while ((read = in.read()) != -1) {
-                            binary.write(read);
-                        }
-                        binary.flush();
-                        byte[] tmp = binary.toByteArray();
-                        // removes the trailing end "\r\n--__END_OF_PART__--\r\n" which is 23 bytes long
-                        content = Tuple.tuple(name, new BytesArray(Arrays.copyOf(tmp, tmp.length - 23)));
-                    }
-                }
-            }
-        }
-        return Optional.ofNullable(content);
-    }
-
-    private static final Pattern PATTERN_CONTENT_RANGE = Pattern.compile("bytes ([^/]*)/([0-9\\*]*)");
-    private static final Pattern PATTERN_CONTENT_RANGE_BYTES = Pattern.compile("([0-9]*)-([0-9]*)");
-
-    private static Integer parse(final Pattern pattern, final String contentRange, final BiFunction<String, String, Integer> fn) {
-        final Matcher matcher = pattern.matcher(contentRange);
-        if (matcher.matches() == false || matcher.groupCount() != 2) {
-            throw new IllegalArgumentException("Unable to parse content range header");
-        }
-        return fn.apply(matcher.group(1), matcher.group(2));
-    }
-
-    static Integer getContentRangeLimit(final String contentRange) {
-        return parse(PATTERN_CONTENT_RANGE, contentRange, (bytes, limit) -> "*".equals(limit) ? null : Integer.parseInt(limit));
-    }
-
-    static int getContentRangeStart(final String contentRange) {
-        return parse(PATTERN_CONTENT_RANGE, contentRange,
-            (bytes, limit) -> parse(PATTERN_CONTENT_RANGE_BYTES, bytes,
-                (start, end) -> Integer.parseInt(start)));
-    }
-
-    static int getContentRangeEnd(final String contentRange) {
-        return parse(PATTERN_CONTENT_RANGE, contentRange,
-            (bytes, limit) -> parse(PATTERN_CONTENT_RANGE_BYTES, bytes,
-                (start, end) -> Integer.parseInt(end)));
-    }
 }

+ 2 - 1
settings.gradle

@@ -55,10 +55,11 @@ List projects = [
   'server',
   'server:cli',
   'test:framework',
+  'test:fixtures:azure-fixture',
+  'test:fixtures:gcs-fixture',
   'test:fixtures:hdfs-fixture',
   'test:fixtures:krb5kdc-fixture',
   'test:fixtures:old-elasticsearch',
-  'test:fixtures:azure-fixture',
   'test:logger-usage'
 ]
 

+ 17 - 0
test/fixtures/gcs-fixture/Dockerfile

@@ -0,0 +1,17 @@
+FROM ubuntu:19.04
+
+RUN apt-get update -qqy
+RUN apt-get install -qqy openjdk-12-jre-headless
+
+ARG port
+ARG bucket
+ARG token
+
+ENV GCS_FIXTURE_PORT=${port}
+ENV GCS_FIXTURE_BUCKET=${bucket}
+ENV GCS_FIXTURE_TOKEN=${token}
+
+ENTRYPOINT exec java -classpath "/fixture/shared/*" \
+    fixture.gcs.GoogleCloudStorageHttpFixture 0.0.0.0 "$GCS_FIXTURE_PORT" "$GCS_FIXTURE_BUCKET" "$GCS_FIXTURE_TOKEN"
+
+EXPOSE $port

+ 39 - 0
test/fixtures/gcs-fixture/build.gradle

@@ -0,0 +1,39 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+apply plugin: 'elasticsearch.build'
+apply plugin: 'elasticsearch.test.fixtures'
+
+description = 'Fixture for Google Cloud Storage service'
+test.enabled = false
+
+dependencies {
+    compile project(':server')
+}
+
+preProcessFixture {
+    dependsOn jar
+    doLast {
+        file("${testFixturesDir}/shared").mkdirs()
+        project.copy {
+            from jar
+            from configurations.runtimeClasspath
+            into "${testFixturesDir}/shared"
+        }
+    }
+}

+ 26 - 0
test/fixtures/gcs-fixture/docker-compose.yml

@@ -0,0 +1,26 @@
+version: '3'
+services:
+  gcs-fixture:
+    build:
+      context: .
+      args:
+        port: 80
+        bucket: "bucket"
+        token: "o/oauth2/token"
+      dockerfile: Dockerfile
+    volumes:
+      - ./testfixtures_shared/shared:/fixture/shared
+    ports:
+      - "80"
+  gcs-fixture-third-party:
+    build:
+      context: .
+      args:
+        port: 80
+        bucket: "bucket"
+        token: "o/oauth2/token"
+      dockerfile: Dockerfile
+    volumes:
+      - ./testfixtures_shared/shared:/fixture/shared
+    ports:
+      - "80"

+ 41 - 0
test/fixtures/gcs-fixture/src/main/java/fixture/gcs/FakeOAuth2HttpHandler.java

@@ -0,0 +1,41 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package fixture.gcs;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.rest.RestStatus;
+
+import java.io.IOException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+@SuppressForbidden(reason = "Uses a HttpServer to emulate a fake OAuth2 authentication service")
+public class FakeOAuth2HttpHandler implements HttpHandler {
+
+    @Override
+    public void handle(final HttpExchange exchange) throws IOException {
+        byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8);
+        exchange.getResponseHeaders().add("Content-Type", "application/json");
+        exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
+        exchange.getResponseBody().write(response);
+        exchange.close();
+    }
+}

+ 55 - 0
test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpFixture.java

@@ -0,0 +1,55 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package fixture.gcs;
+
+import com.sun.net.httpserver.HttpServer;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+public class GoogleCloudStorageHttpFixture {
+
+    private final HttpServer server;
+
+    private GoogleCloudStorageHttpFixture(final String address, final int port,
+                                          final String bucket, final String token) throws IOException {
+        this.server = HttpServer.create(new InetSocketAddress(InetAddress.getByName(address), port), 0);
+        server.createContext("/" + token, new FakeOAuth2HttpHandler());
+        server.createContext("/", new GoogleCloudStorageHttpHandler(bucket));
+    }
+
+    private void start() throws Exception {
+        try {
+            server.start();
+            // wait to be killed
+            Thread.sleep(Long.MAX_VALUE);
+        } finally {
+            server.stop(0);
+        }
+    }
+
+    public static void main(final String[] args) throws Exception {
+        if (args == null || args.length != 4) {
+            throw new IllegalArgumentException("GoogleCloudStorageHttpFixture expects 4 arguments [address, port, bucket, token]");
+        }
+        GoogleCloudStorageHttpFixture fixture = new GoogleCloudStorageHttpFixture(args[0], Integer.parseInt(args[1]), args[2], args[3]);
+        fixture.start();
+    }
+}

+ 339 - 0
test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java

@@ -0,0 +1,339 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package fixture.gcs;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.lucene.util.ArrayUtil;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.network.InetAddresses;
+import org.elasticsearch.common.regex.Regex;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.rest.RestUtils;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.BiFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.GZIPInputStream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Minimal HTTP handler that acts as a Google Cloud Storage compliant server
+ */
+@SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint")
+public class GoogleCloudStorageHttpHandler implements HttpHandler {
+
+    private final ConcurrentMap<String, BytesArray> blobs;
+    private final String bucket;
+
+    public GoogleCloudStorageHttpHandler(final String bucket) {
+        this.bucket = Objects.requireNonNull(bucket);
+        this.blobs = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public void handle(final HttpExchange exchange) throws IOException {
+        final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
+        try {
+            if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) {
+                // List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list
+                final Map<String, String> params = new HashMap<>();
+                RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
+                final String prefix = params.getOrDefault("prefix", "");
+                final String delimiter = params.get("delimiter");
+
+                final Set<String> prefixes = new HashSet<>();
+                final List<String> listOfBlobs = new ArrayList<>();
+
+                for (final Map.Entry<String, BytesArray> blob : blobs.entrySet()) {
+                    final String blobName = blob.getKey();
+                    if (prefix.isEmpty() || blobName.startsWith(prefix)) {
+                        int delimiterPos = (delimiter != null) ? blobName.substring(prefix.length()).indexOf(delimiter) : -1;
+                        if (delimiterPos > -1) {
+                            prefixes.add("\"" + blobName.substring(0, prefix.length() + delimiterPos + 1) + "\"");
+                        } else {
+                            listOfBlobs.add("{\"kind\":\"storage#object\","
+                                + "\"bucket\":\"" + bucket + "\","
+                                + "\"name\":\"" + blobName + "\","
+                                + "\"id\":\"" + blobName + "\","
+                                + "\"size\":\"" + blob.getValue().length() + "\""
+                                + "}");
+                        }
+                    }
+                }
+
+                byte[] response = ("{\"kind\":\"storage#objects\",\"items\":[" +
+                    String.join(",", listOfBlobs) +
+                    "],\"prefixes\":[" +
+                    String.join(",", prefixes) +
+                    "]}").getBytes(UTF_8);
+
+                exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
+                exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
+                exchange.getResponseBody().write(response);
+
+            } else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "*", request)) {
+                // GET Bucket https://cloud.google.com/storage/docs/json_api/v1/buckets/get
+                byte[] response = ("{\"kind\":\"storage#bucket\",\"name\":\""+ bucket + "\",\"id\":\"0\"}").getBytes(UTF_8);
+                exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
+                exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
+                exchange.getResponseBody().write(response);
+
+            } else if (Regex.simpleMatch("GET /download/storage/v1/b/" + bucket + "/o/*", request)) {
+                // Download Object https://cloud.google.com/storage/docs/request-body
+                BytesArray blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", ""));
+                if (blob != null) {
+                    final String range = exchange.getRequestHeaders().getFirst("Range");
+                    Matcher matcher = Pattern.compile("bytes=([0-9]*)-([0-9]*)").matcher(range);
+                    if (matcher.find() == false) {
+                        throw new AssertionError("Range bytes header does not match expected format: " + range);
+                    }
+
+                    byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? blob.array() : new byte[0];
+                    exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
+                    exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
+                    exchange.getResponseBody().write(response);
+                } else {
+                    exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
+                }
+
+            } else if (Regex.simpleMatch("DELETE /storage/v1/b/" + bucket + "/o/*", request)) {
+                // Delete Object https://cloud.google.com/storage/docs/json_api/v1/objects/delete
+                int deletions = 0;
+                for (Iterator<Map.Entry<String, BytesArray>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
+                    Map.Entry<String, BytesArray> blob = iterator.next();
+                    if (blob.getKey().equals(exchange.getRequestURI().toString())) {
+                        iterator.remove();
+                        deletions++;
+                    }
+                }
+                exchange.sendResponseHeaders((deletions > 0 ? RestStatus.OK : RestStatus.NO_CONTENT).getStatus(), -1);
+
+            } else if (Regex.simpleMatch("POST /batch/storage/v1", request)) {
+                // Batch https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch
+                final String uri = "/storage/v1/b/" + bucket + "/o/";
+                final StringBuilder batch = new StringBuilder();
+                for (String line : Streams.readAllLines(new BufferedInputStream(exchange.getRequestBody()))) {
+                    if (line.length() == 0 || line.startsWith("--") || line.toLowerCase(Locale.ROOT).startsWith("content")) {
+                        batch.append(line).append('\n');
+                    } else if (line.startsWith("DELETE")) {
+                        final String name = line.substring(line.indexOf(uri) + uri.length(), line.lastIndexOf(" HTTP"));
+                        if (Strings.hasText(name)) {
+                            if (blobs.entrySet().removeIf(blob -> blob.getKey().equals(URLDecoder.decode(name, UTF_8)))) {
+                                batch.append("HTTP/1.1 204 NO_CONTENT").append('\n');
+                                batch.append('\n');
+                            }
+                        }
+                    }
+                }
+                byte[] response = batch.toString().getBytes(UTF_8);
+                exchange.getResponseHeaders().add("Content-Type", exchange.getRequestHeaders().getFirst("Content-Type"));
+                exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
+                exchange.getResponseBody().write(response);
+
+            } else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=multipart*", request)) {
+                // Multipart upload
+                Optional<Tuple<String, BytesArray>> content = parseMultipartRequestBody(exchange.getRequestBody());
+                if (content.isPresent()) {
+                    blobs.put(content.get().v1(), content.get().v2());
+
+                    byte[] response = ("{\"bucket\":\"" + bucket + "\",\"name\":\"" + content.get().v1() + "\"}").getBytes(UTF_8);
+                    exchange.getResponseHeaders().add("Content-Type", "application/json");
+                    exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
+                    exchange.getResponseBody().write(response);
+                } else {
+                    exchange.sendResponseHeaders(RestStatus.BAD_REQUEST.getStatus(), -1);
+                }
+
+            } else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=resumable*", request)) {
+                // Resumable upload initialization https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
+                final Map<String, String> params = new HashMap<>();
+                RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
+                final String blobName = params.get("name");
+                blobs.put(blobName, BytesArray.EMPTY);
+
+                byte[] response = Streams.readFully(exchange.getRequestBody()).utf8ToString().getBytes(UTF_8);
+                exchange.getResponseHeaders().add("Content-Type", "application/json");
+                exchange.getResponseHeaders().add("Location", httpServerUrl(exchange) + "/upload/storage/v1/b/" + bucket + "/o?"
+                    + "uploadType=resumable"
+                    + "&upload_id=" + UUIDs.randomBase64UUID()
+                    + "&test_blob_name=" + blobName); // not a Google Storage parameter, but it allows to pass the blob name
+                exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
+                exchange.getResponseBody().write(response);
+
+            } else if (Regex.simpleMatch("PUT /upload/storage/v1/b/" + bucket + "/o?*uploadType=resumable*", request)) {
+                // Resumable upload https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
+                final Map<String, String> params = new HashMap<>();
+                RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
+
+                final String blobName = params.get("test_blob_name");
+                byte[] blob = blobs.get(blobName).array();
+                if (blob == null) {
+                    exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
+                    return;
+                }
+
+                final String range = exchange.getRequestHeaders().getFirst("Content-Range");
+                final Integer limit = getContentRangeLimit(range);
+                final int start = getContentRangeStart(range);
+                final int end = getContentRangeEnd(range);
+
+                final ByteArrayOutputStream out = new ByteArrayOutputStream();
+                long bytesRead = Streams.copy(exchange.getRequestBody(), out);
+                int length = Math.max(end + 1, limit != null ? limit : 0);
+                if ((int) bytesRead > length) {
+                    throw new AssertionError("Requesting more bytes than available for blob");
+                }
+                if (length > blob.length) {
+                    blob = ArrayUtil.growExact(blob, length);
+                }
+                System.arraycopy(out.toByteArray(), 0, blob, start, Math.toIntExact(bytesRead));
+                blobs.put(blobName, new BytesArray(blob));
+
+                if (limit == null) {
+                    exchange.getResponseHeaders().add("Range", String.format(Locale.ROOT, "bytes=%d/%d", start, end));
+                    exchange.getResponseHeaders().add("Content-Length", "0");
+                    exchange.sendResponseHeaders(308 /* Resume Incomplete */, -1);
+                } else {
+                    if (limit > blob.length) {
+                        throw new AssertionError("Requesting more bytes than available for blob");
+                    }
+                    exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
+                }
+            } else {
+                exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1);
+            }
+        } finally {
+            exchange.close();
+        }
+    }
+
+    private String httpServerUrl(final HttpExchange exchange) {
+        final InetSocketAddress address = exchange.getLocalAddress();
+        return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
+    }
+
+    public static Optional<Tuple<String, BytesArray>> parseMultipartRequestBody(final InputStream requestBody) throws IOException {
+        Tuple<String, BytesArray> content = null;
+        try (BufferedInputStream in = new BufferedInputStream(new GZIPInputStream(requestBody))) {
+            String name = null;
+            int read;
+            while ((read = in.read()) != -1) {
+                boolean markAndContinue = false;
+                try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                    do { // search next consecutive {carriage return, new line} chars and stop
+                        if ((char) read == '\r') {
+                            int next = in.read();
+                            if (next != -1) {
+                                if (next == '\n') {
+                                    break;
+                                }
+                                out.write(read);
+                                out.write(next);
+                                continue;
+                            }
+                        }
+                        out.write(read);
+                    } while ((read = in.read()) != -1);
+
+                    final String line = new String(out.toByteArray(), UTF_8);
+                    if (line.length() == 0 || line.equals("\r\n") || line.startsWith("--")
+                        || line.toLowerCase(Locale.ROOT).startsWith("content")) {
+                        markAndContinue = true;
+                    } else if (line.startsWith("{\"bucket\":")) {
+                        markAndContinue = true;
+                        Matcher matcher = Pattern.compile("\"name\":\"([^\"]*)\"").matcher(line);
+                        if (matcher.find()) {
+                            name = matcher.group(1);
+                        }
+                    }
+                    if (markAndContinue) {
+                        in.mark(Integer.MAX_VALUE);
+                        continue;
+                    }
+                }
+                if (name != null) {
+                    in.reset();
+                    try (ByteArrayOutputStream binary = new ByteArrayOutputStream()) {
+                        while ((read = in.read()) != -1) {
+                            binary.write(read);
+                        }
+                        binary.flush();
+                        byte[] tmp = binary.toByteArray();
+                        // removes the trailing end "\r\n--__END_OF_PART__--\r\n" which is 23 bytes long
+                        content = Tuple.tuple(name, new BytesArray(Arrays.copyOf(tmp, tmp.length - 23)));
+                    }
+                }
+            }
+        }
+        return Optional.ofNullable(content);
+    }
+
+    private static final Pattern PATTERN_CONTENT_RANGE = Pattern.compile("bytes ([^/]*)/([0-9\\*]*)");
+    private static final Pattern PATTERN_CONTENT_RANGE_BYTES = Pattern.compile("([0-9]*)-([0-9]*)");
+
+    private static Integer parse(final Pattern pattern, final String contentRange, final BiFunction<String, String, Integer> fn) {
+        final Matcher matcher = pattern.matcher(contentRange);
+        if (matcher.matches() == false || matcher.groupCount() != 2) {
+            throw new IllegalArgumentException("Unable to parse content range header");
+        }
+        return fn.apply(matcher.group(1), matcher.group(2));
+    }
+
+    public static Integer getContentRangeLimit(final String contentRange) {
+        return parse(PATTERN_CONTENT_RANGE, contentRange, (bytes, limit) -> "*".equals(limit) ? null : Integer.parseInt(limit));
+    }
+
+    public static int getContentRangeStart(final String contentRange) {
+        return parse(PATTERN_CONTENT_RANGE, contentRange,
+            (bytes, limit) -> parse(PATTERN_CONTENT_RANGE_BYTES, bytes,
+                (start, end) -> Integer.parseInt(start)));
+    }
+
+    public static int getContentRangeEnd(final String contentRange) {
+        return parse(PATTERN_CONTENT_RANGE, contentRange,
+            (bytes, limit) -> parse(PATTERN_CONTENT_RANGE_BYTES, bytes,
+                (start, end) -> Integer.parseInt(end)));
+    }
+}