浏览代码

Retry GCS Resumable Upload on Error 410 (#45963)

A resumable upload session can fail with a 410 error and should
be retried in that case. I added retrying twice, using resetting of
the given `InputStream` as the retry mechanism, since the same
approach is already used by the AWS S3 SDK and is relied upon
by the S3 repository implementation.

Related GCS documentation:
https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone
Armin Braun 6 年之前
父节点
当前提交
3f9e86b709

+ 49 - 24
plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

@@ -31,6 +31,10 @@ import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.Storage.BlobListOption;
 import com.google.cloud.storage.StorageBatch;
 import com.google.cloud.storage.StorageException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
@@ -60,16 +64,19 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static java.net.HttpURLConnection.HTTP_GONE;
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
 
 class GoogleCloudStorageBlobStore implements BlobStore {
 
+    private static final Logger logger = LogManager.getLogger(GoogleCloudStorageBlobStore.class);
+
     // The recommended maximum size of a blob that should be uploaded in a single
     // request. Larger files should be uploaded over multiple requests (this is
     // called "resumable upload")
     // https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
-    private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
+    public static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
 
     private final String bucketName;
     private final String clientName;
@@ -224,35 +231,53 @@ class GoogleCloudStorageBlobStore implements BlobStore {
      * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
      */
     private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, boolean failIfAlreadyExists) throws IOException {
-        try {
-            final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
-                new Storage.BlobWriteOption[] { Storage.BlobWriteOption.doesNotExist() } :
-                new Storage.BlobWriteOption[0];
-            final WriteChannel writeChannel = SocketAccess
+        // We retry 410 GONE errors to cover the unlikely but possible scenario where a resumable upload session becomes broken and
+        // needs to be restarted from scratch. Given how unlikely a 410 error should be according to SLAs we retry only twice.
+        assert inputStream.markSupported();
+        inputStream.mark(Integer.MAX_VALUE);
+        StorageException storageException = null;
+        final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
+            new Storage.BlobWriteOption[]{Storage.BlobWriteOption.doesNotExist()} : new Storage.BlobWriteOption[0];
+        for (int retry = 0; retry < 3; ++retry) {
+            try {
+                final WriteChannel writeChannel = SocketAccess
                     .doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
-            Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
-                @Override
-                public boolean isOpen() {
-                    return writeChannel.isOpen();
-                }
+                Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
+                    @Override
+                    public boolean isOpen() {
+                        return writeChannel.isOpen();
+                    }
 
-                @Override
-                public void close() throws IOException {
-                    SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
-                }
+                    @Override
+                    public void close() throws IOException {
+                        SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
+                    }
 
-                @SuppressForbidden(reason = "Channel is based of a socket not a file")
-                @Override
-                public int write(ByteBuffer src) throws IOException {
-                    return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
+                    @SuppressForbidden(reason = "Channel is based of a socket not a file")
+                    @Override
+                    public int write(ByteBuffer src) throws IOException {
+                        return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
+                    }
+                }));
+                return;
+            } catch (final StorageException se) {
+                final int errorCode = se.getCode();
+                if (errorCode == HTTP_GONE) {
+                    logger.warn(() -> new ParameterizedMessage("Retrying broken resumable upload session for blob {}", blobInfo), se);
+                    storageException = ExceptionsHelper.useOrSuppress(storageException, se);
+                    inputStream.reset();
+                    continue;
+                } else if (failIfAlreadyExists && errorCode == HTTP_PRECON_FAILED) {
+                    throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
                 }
-            }));
-        } catch (final StorageException se) {
-            if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
-                throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
+                if (storageException != null) {
+                    se.addSuppressed(storageException);
+                }
+                throw se;
             }
-            throw se;
         }
+        assert storageException != null;
+        throw storageException;
     }
 
     /**

+ 35 - 1
plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java

@@ -19,12 +19,21 @@
 
 package org.elasticsearch.repositories.gcs;
 
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefBuilder;
+import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
 import java.util.Locale;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -37,10 +46,35 @@ public class GoogleCloudStorageBlobStoreContainerTests extends ESBlobStoreContai
         final String clientName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
         final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
         try {
-            when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>()));
+            when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>(), random()));
         } catch (final Exception e) {
             throw new RuntimeException(e);
         }
         return new GoogleCloudStorageBlobStore(bucketName, clientName, storageService);
     }
+
+    public void testWriteReadLarge() throws IOException {
+        try(BlobStore store = newBlobStore()) {
+            final BlobContainer container = store.blobContainer(new BlobPath());
+            byte[] data = randomBytes(GoogleCloudStorageBlobStore.LARGE_BLOB_THRESHOLD_BYTE_SIZE + 1);
+            writeBlob(container, "foobar", new BytesArray(data), randomBoolean());
+            if (randomBoolean()) {
+                // override file, to check if we get latest contents
+                random().nextBytes(data);
+                writeBlob(container, "foobar", new BytesArray(data), false);
+            }
+            try (InputStream stream = container.readBlob("foobar")) {
+                BytesRefBuilder target = new BytesRefBuilder();
+                while (target.length() < data.length) {
+                    byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())];
+                    int offset = scaledRandomIntBetween(0, buffer.length - 1);
+                    int read = stream.read(buffer, offset, buffer.length - offset);
+                    target.append(new BytesRef(buffer, offset, read));
+                }
+                assertEquals(data.length, target.length());
+                assertArrayEquals(data, Arrays.copyOfRange(target.bytes(), 0, target.length()));
+            }
+        }
+    }
+
 }

+ 0 - 1
plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java

@@ -124,7 +124,6 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
         final MockSecureSettings secureSettings = new MockSecureSettings();
         secureSettings.setFile(CREDENTIALS_FILE_SETTING.getConcreteSettingForNamespace("test").getKey(), serviceAccount);
         settings.setSecureSettings(secureSettings);
-
         return settings.build();
     }
 

+ 1 - 1
plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreTests.java

@@ -37,7 +37,7 @@ public class GoogleCloudStorageBlobStoreTests extends ESBlobStoreTestCase {
         final String clientName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
         final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
         try {
-            when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>()));
+            when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>(), random()));
         } catch (final Exception e) {
             throw new RuntimeException(e);
         }

+ 24 - 7
plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java

@@ -40,6 +40,7 @@ import com.google.cloud.storage.StorageException;
 import com.google.cloud.storage.StorageOptions;
 import com.google.cloud.storage.StorageRpcOptionUtils;
 import com.google.cloud.storage.StorageTestUtils;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.mockito.stubbing.Answer;
 
@@ -47,6 +48,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
@@ -55,6 +57,8 @@ import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -71,10 +75,12 @@ import static org.mockito.Mockito.mock;
  */
 class MockStorage implements Storage {
 
+    private final Random random;
     private final String bucketName;
     private final ConcurrentMap<String, byte[]> blobs;
 
-    MockStorage(final String bucket, final ConcurrentMap<String, byte[]> blobs) {
+    MockStorage(final String bucket, final ConcurrentMap<String, byte[]> blobs, final Random random) {
+        this.random = random;
         this.bucketName = Objects.requireNonNull(bucket);
         this.blobs = Objects.requireNonNull(blobs);
     }
@@ -236,12 +242,16 @@ class MockStorage implements Storage {
         return null;
     }
 
+    private final Set<BlobInfo> simulated410s = ConcurrentCollections.newConcurrentSet();
+
     @Override
     public WriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
         if (bucketName.equals(blobInfo.getBucket())) {
             final ByteArrayOutputStream output = new ByteArrayOutputStream();
             return new WriteChannel() {
 
+                private volatile boolean failed;
+
                 final WritableByteChannel writableByteChannel = Channels.newChannel(output);
 
                 @Override
@@ -256,6 +266,11 @@ class MockStorage implements Storage {
 
                 @Override
                 public int write(ByteBuffer src) throws IOException {
+                    // Only fail a blob once on a 410 error since the error is so unlikely in practice
+                    if (simulated410s.add(blobInfo) && random.nextBoolean()) {
+                        failed = true;
+                        throw new StorageException(HttpURLConnection.HTTP_GONE, "Simulated lost resumeable upload session");
+                    }
                     return writableByteChannel.write(src);
                 }
 
@@ -267,13 +282,15 @@ class MockStorage implements Storage {
                 @Override
                 public void close() {
                     IOUtils.closeWhileHandlingException(writableByteChannel);
-                    if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) {
-                        byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray());
-                        if (existingBytes != null) {
-                            throw new StorageException(412, "Blob already exists");
+                    if (failed == false) {
+                        if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) {
+                            byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray());
+                            if (existingBytes != null) {
+                                throw new StorageException(412, "Blob already exists");
+                            }
+                        } else {
+                            blobs.put(blobInfo.getName(), output.toByteArray());
                         }
-                    } else {
-                        blobs.put(blobInfo.getName(), output.toByteArray());
                     }
                 }
             };