1
0
Эх сурвалжийг харах

Implement Atomic Blob Writes for HDFS Repository (#37066)

* Implement atomic writes the same way we do for the FsBlobContainer via rename which is atomic
* Relates #37011
Armin Braun 6 жил өмнө
parent
commit
10d9819f99

+ 25 - 0
plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java

@@ -21,11 +21,13 @@ package org.elasticsearch.repositories.hdfs;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.CreateOpts;
 import org.apache.hadoop.fs.Path;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
 import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation;
@@ -116,6 +118,29 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
         });
     }
 
+    @Override
+    public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
+        final String tempBlob = FsBlobContainer.tempBlobName(blobName);
+        store.execute((Operation<Void>) fileContext -> {
+            final Path tempBlobPath = new Path(path, tempBlob);
+            try (FSDataOutputStream stream = fileContext.create(
+                tempBlobPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK),  CreateOpts.bufferSize(bufferSize))) {
+                int bytesRead;
+                byte[] buffer = new byte[bufferSize];
+                while ((bytesRead = inputStream.read(buffer)) != -1) {
+                    stream.write(buffer, 0, bytesRead);
+                }
+            }
+            final Path blob = new Path(path, blobName);
+            try {
+                fileContext.rename(tempBlobPath, blob, failIfAlreadyExists ? Options.Rename.NONE : Options.Rename.OVERWRITE);
+            } catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
+                throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage());
+            }
+            return null;
+        });
+    }
+
     @Override
     public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable final String prefix) throws IOException {
         FileStatus[] files = store.execute(fileContext -> (fileContext.util().listStatus(path,