Browse Source

Optimize Away Small Resumable Uploads in GCS Repository (#74813)

Similar to ChunkedOutputStream we can optimize away small resumable uploads
to speed up small meta writes (and improve stability with JDK-8 in 7.x).
Armin Braun 4 years ago
parent
commit
85f0a029c2

+ 1 - 1
plugins/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java

@@ -226,7 +226,7 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
                     @Override
                     protected GoogleCloudStorageBlobStore createBlobStore() {
                         return new GoogleCloudStorageBlobStore(
-                                metadata.settings().get("bucket"), "test", metadata.name(), storageService,
+                                metadata.settings().get("bucket"), "test", metadata.name(), storageService, bigArrays,
                             randomIntBetween(1, 8) * 1024) {
                             @Override
                             long getLargeBlobThresholdInBytes() {

+ 79 - 20
plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

@@ -22,6 +22,8 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -94,16 +96,19 @@ class GoogleCloudStorageBlobStore implements BlobStore {
     private final GoogleCloudStorageService storageService;
     private final GoogleCloudStorageOperationsStats stats;
     private final int bufferSize;
+    private final BigArrays bigArrays;
 
     GoogleCloudStorageBlobStore(String bucketName,
                                 String clientName,
                                 String repositoryName,
                                 GoogleCloudStorageService storageService,
+                                BigArrays bigArrays,
                                 int bufferSize) {
         this.bucketName = bucketName;
         this.clientName = clientName;
         this.repositoryName = repositoryName;
         this.storageService = storageService;
+        this.bigArrays = bigArrays;
         this.stats = new GoogleCloudStorageOperationsStats(bucketName);
         this.bufferSize = bufferSize;
     }
@@ -234,7 +239,12 @@ class GoogleCloudStorageBlobStore implements BlobStore {
             writeBlobResumable(BlobInfo.newBuilder(bucketName, blobName).setMd5(md5).build(), bytes.streamInput(), bytes.length(),
                 failIfAlreadyExists);
         } else {
-            writeBlob(bytes.streamInput(), bytes.length(), failIfAlreadyExists, BlobInfo.newBuilder(bucketName, blobName).build());
+            final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build();
+            if (bytes.hasArray()) {
+                writeBlobMultipart(blobInfo, bytes.array(), bytes.arrayOffset(), bytes.length(), failIfAlreadyExists);
+            } else {
+                writeBlob(bytes.streamInput(), bytes.length(), failIfAlreadyExists, blobInfo);
+            }
         }
     }
 
@@ -252,7 +262,9 @@ class GoogleCloudStorageBlobStore implements BlobStore {
         if (blobSize > getLargeBlobThresholdInBytes()) {
             writeBlobResumable(blobInfo, inputStream, blobSize, failIfAlreadyExists);
         } else {
-            writeBlobMultipart(blobInfo, inputStream, blobSize, failIfAlreadyExists);
+            final byte[] buffer = new byte[Math.toIntExact(blobSize)];
+            Streams.readFully(inputStream, buffer);
+            writeBlobMultipart(blobInfo, buffer, 0, Math.toIntExact(blobSize), failIfAlreadyExists);
         }
     }
 
@@ -275,23 +287,71 @@ class GoogleCloudStorageBlobStore implements BlobStore {
         StorageException storageException = null;
 
         for (int retry = 0; retry < 3; ++retry) {
-            try {
-                final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
-                writer.accept(new FilterOutputStream(Channels.newOutputStream(new WritableBlobChannel(writeChannel))) {
+            // we start out by buffering the write to a buffer, if it exceeds the large blob threshold we start a resumable upload, flush
+            // the buffer to it and keep writing to the resumable upload. If we never exceed the large blob threshold we just write the
+            // buffer via a standard blob write
+            try (ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutput(bigArrays)) {
+                final AtomicReference<WriteChannel> channelRef = new AtomicReference<>();
+                writer.accept(new OutputStream() {
+
+                    private OutputStream resumableStream;
+
+                    @Override
+                    public void write(int b) throws IOException {
+                        if (resumableStream != null) {
+                            resumableStream.write(b);
+                        } else {
+                            if (buffer.size() + 1 > getLargeBlobThresholdInBytes()) {
+                                initResumableStream();
+                                resumableStream.write(b);
+                            } else {
+                                buffer.write(b);
+                            }
+                        }
+                    }
+
                     @Override
                     public void write(byte[] b, int off, int len) throws IOException {
-                        int written = 0;
-                        while (written < len) {
-                            // at most write the default chunk size in one go to prevent allocating huge buffers in the SDK
-                            // see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE
-                            final int toWrite = Math.min(len - written, 60 * 256 * 1024);
-                            out.write(b, off + written, toWrite);
-                            written += toWrite;
+                        if (resumableStream != null) {
+                            resumableStream.write(b, off, len);
+                        } else {
+                            if (buffer.size() + len > getLargeBlobThresholdInBytes()) {
+                                initResumableStream();
+                                resumableStream.write(b, off, len);
+                            } else {
+                                buffer.write(b, off, len);
+                            }
                         }
                     }
+
+                    private void initResumableStream() throws IOException {
+                        final WriteChannel writeChannel =
+                                SocketAccess.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
+                        channelRef.set(writeChannel);
+                        resumableStream = new FilterOutputStream(Channels.newOutputStream(new WritableBlobChannel(writeChannel))) {
+                            @Override
+                            public void write(byte[] b, int off, int len) throws IOException {
+                                int written = 0;
+                                while (written < len) {
+                                    // at most write the default chunk size in one go to prevent allocating huge buffers in the SDK
+                                    // see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE
+                                    final int toWrite = Math.min(len - written, 60 * 256 * 1024);
+                                    out.write(b, off + written, toWrite);
+                                    written += toWrite;
+                                }
+                            }
+                        };
+                        buffer.bytes().writeTo(resumableStream);
+                        buffer.close();
+                    }
                 });
-                SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
-                stats.trackPutOperation();
+                final WritableByteChannel writeChannel = channelRef.get();
+                if (writeChannel != null) {
+                    SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
+                    stats.trackPutOperation();
+                } else {
+                    writeBlob(blobName, buffer.bytes(), failIfAlreadyExists);
+                }
                 return;
             } catch (final StorageException se) {
                 final int errorCode = se.getCode();
@@ -378,22 +438,21 @@ class GoogleCloudStorageBlobStore implements BlobStore {
      * 'multipart/related' request containing both data and metadata. The request is
      * gziped), see:
      * https://cloud.google.com/storage/docs/json_api/v1/how-tos/multipart-upload
-     *  @param blobInfo the info for the blob to be uploaded
-     * @param inputStream the stream containing the blob data
+     * @param blobInfo the info for the blob to be uploaded
+     * @param buffer the byte array containing the data
+     * @param offset offset at which the blob contents start in the buffer
      * @param blobSize the size
      * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
      */
-    private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
+    private void writeBlobMultipart(BlobInfo blobInfo, byte[] buffer, int offset, int blobSize, boolean failIfAlreadyExists)
         throws IOException {
         assert blobSize <= getLargeBlobThresholdInBytes() : "large blob uploads should use the resumable upload method";
-        final byte[] buffer = new byte[Math.toIntExact(blobSize)];
-        Streams.readFully(inputStream, buffer);
         try {
             final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ?
                 new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } :
                 new Storage.BlobTargetOption[0];
             SocketAccess.doPrivilegedVoidIOException(
-                    () -> client().create(blobInfo, buffer, targetOptions));
+                    () -> client().create(blobInfo, buffer, offset, blobSize, targetOptions));
             // We don't track this operation on the http layer as
             // we do with the GET/LIST operations since this operations
             // can trigger multiple underlying http requests but only one

+ 1 - 1
plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java

@@ -95,7 +95,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
 
     @Override
     protected GoogleCloudStorageBlobStore createBlobStore() {
-        return new GoogleCloudStorageBlobStore(bucket, clientName, metadata.name(), storageService, bufferSize);
+        return new GoogleCloudStorageBlobStore(bucket, clientName, metadata.name(), storageService, bigArrays, bufferSize);
     }
 
     @Override

+ 2 - 1
plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java

@@ -14,6 +14,7 @@ import com.google.cloud.storage.StorageOptions;
 import com.sun.net.httpserver.HttpHandler;
 import fixture.gcs.FakeOAuth2HttpHandler;
 import org.apache.http.HttpStatus;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.core.SuppressForbidden;
@@ -140,7 +141,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
 
         httpServer.createContext("/token", new FakeOAuth2HttpHandler());
         final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore("bucket", client, "repo", service,
-            randomIntBetween(1, 8) * 1024);
+            BigArrays.NON_RECYCLING_INSTANCE, randomIntBetween(1, 8) * 1024);
 
         return new GoogleCloudStorageBlobContainer(BlobPath.EMPTY, blobStore);
     }

+ 2 - 1
plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java

@@ -18,6 +18,7 @@ import com.google.cloud.storage.StorageException;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;
@@ -77,7 +78,7 @@ public class GoogleCloudStorageBlobStoreContainerTests extends ESTestCase {
         when(storageService.client(any(String.class), any(String.class), any(GoogleCloudStorageOperationsStats.class))).thenReturn(storage);
 
         try (BlobStore store = new GoogleCloudStorageBlobStore("bucket", "test", "repo", storageService,
-            randomIntBetween(1, 8) * 1024)) {
+            BigArrays.NON_RECYCLING_INSTANCE, randomIntBetween(1, 8) * 1024)) {
             final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
 
             IOException e = expectThrows(IOException.class, () -> container.deleteBlobsIgnoringIfNotExists(blobs.iterator()));