瀏覽代碼

User proper write-once semantics for GCS repository (#30438)

There's no need for an extra blobExists() call when writing a blob to the GCS service. GCS provides
an option (with stronger consistency guarantees) on the insert method that guarantees that the
blob that's uploaded does not already exist.

Relates to #19749
Yannick Welsch 7 年之前
父節點
當前提交
b57d21bab1

+ 0 - 3
plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java

@@ -66,9 +66,6 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
 
     @Override
     public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
-        if (blobExists(blobName)) {
-            throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite");
-        }
         blobStore.writeBlob(buildKey(blobName), inputStream, blobSize);
     }
 

+ 40 - 18
plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

@@ -28,6 +28,7 @@ import com.google.cloud.storage.Bucket;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.Storage.BlobListOption;
 import com.google.cloud.storage.Storage.CopyRequest;
+import com.google.cloud.storage.StorageException;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
@@ -47,12 +48,15 @@ import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+
 class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore {
 
     // The recommended maximum size of a blob that should be uploaded in a single
@@ -204,24 +208,32 @@ class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore
      * @param inputStream the stream containing the blob data
      */
     private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream) throws IOException {
-        final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> storage.writer(blobInfo));
-        Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
-            @Override
-            public boolean isOpen() {
-                return writeChannel.isOpen();
-            }
+        try {
+            final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(
+                () -> storage.writer(blobInfo, Storage.BlobWriteOption.doesNotExist()));
+            Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
+                @Override
+                public boolean isOpen() {
+                    return writeChannel.isOpen();
+                }
 
-            @Override
-            public void close() throws IOException {
-                SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
-            }
+                @Override
+                public void close() throws IOException {
+                    SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
+                }
 
-            @SuppressForbidden(reason = "Channel is based of a socket not a file")
-            @Override
-            public int write(ByteBuffer src) throws IOException {
-                return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
+                @SuppressForbidden(reason = "Channel is based of a socket not a file")
+                @Override
+                public int write(ByteBuffer src) throws IOException {
+                    return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
+                }
+            }));
+        } catch (StorageException se) {
+            if (se.getCode() == HTTP_PRECON_FAILED) {
+                throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
             }
-        }));
+            throw se;
+        }
     }
 
     /**
@@ -238,7 +250,17 @@ class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore
         assert blobSize <= LARGE_BLOB_THRESHOLD_BYTE_SIZE : "large blob uploads should use the resumable upload method";
         final ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.toIntExact(blobSize));
         Streams.copy(inputStream, baos);
-        SocketAccess.doPrivilegedVoidIOException(() -> storage.create(blobInfo, baos.toByteArray()));
+        SocketAccess.doPrivilegedVoidIOException(
+            () -> {
+                try {
+                    storage.create(blobInfo, baos.toByteArray(), Storage.BlobTargetOption.doesNotExist());
+                } catch (StorageException se) {
+                    if (se.getCode() == HTTP_PRECON_FAILED) {
+                        throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
+                    }
+                    throw se;
+                }
+            });
     }
 
     /**
@@ -295,8 +317,8 @@ class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore
     /**
      * Moves a blob within the same bucket
      *
-     * @param sourceBlob name of the blob to move
-     * @param targetBlob new name of the blob in the same bucket
+     * @param sourceBlobName name of the blob to move
+     * @param targetBlobName new name of the blob in the same bucket
      */
     void moveBlob(String sourceBlobName, String targetBlobName) throws IOException {
         final BlobId sourceBlobId = BlobId.of(bucket, sourceBlobName);

+ 18 - 3
plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java

@@ -56,6 +56,7 @@ import java.util.Objects;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * {@link MockStorage} mocks a {@link Storage} client by storing all the blobs
@@ -113,7 +114,14 @@ class MockStorage implements Storage {
         if (bucketName.equals(blobInfo.getBucket()) == false) {
             throw new StorageException(404, "Bucket not found");
         }
-        blobs.put(blobInfo.getName(), content);
+        if (Stream.of(options).anyMatch(option -> option.equals(BlobTargetOption.doesNotExist()))) {
+            byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), content);
+            if (existingBytes != null) {
+                throw new StorageException(412, "Blob already exists");
+            }
+        } else {
+            blobs.put(blobInfo.getName(), content);
+        }
         return get(BlobId.of(blobInfo.getBucket(), blobInfo.getName()));
     }
 
@@ -243,9 +251,16 @@ class MockStorage implements Storage {
                 }
 
                 @Override
-                public void close() throws IOException {
+                public void close() {
                     IOUtils.closeWhileHandlingException(writableByteChannel);
-                    blobs.put(blobInfo.getName(), output.toByteArray());
+                    if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) {
+                        byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray());
+                        if (existingBytes != null) {
+                            throw new StorageException(412, "Blob already exists");
+                        }
+                    } else {
+                        blobs.put(blobInfo.getName(), output.toByteArray());
+                    }
                 }
             };
         }