@@ -97,19 +97,13 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
         store.execute((Operation<Void>) fileContext -> {
             Path blob = new Path(path, blobName);
             // we pass CREATE, which means it fails if a blob already exists.
-            EnumSet<CreateFlag> flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK) :
-                EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK);
-            CreateOpts[] opts = {CreateOpts.bufferSize(bufferSize)};
-            try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) {
+            EnumSet<CreateFlag> flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK)
+                : EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK);
+            try (FSDataOutputStream stream = fileContext.create(blob, flags, CreateOpts.bufferSize(bufferSize))) {
                 int bytesRead;
                 byte[] buffer = new byte[bufferSize];
                 while ((bytesRead = inputStream.read(buffer)) != -1) {
                     stream.write(buffer, 0, bytesRead);
-                    // For safety we also hsync each write as well, because of its docs:
-                    // SYNC_BLOCK - to force closed blocks to the disk device
-                    // "In addition Syncable.hsync() should be called after each write,
-                    // if true synchronous behavior is required"
-                    stream.hsync();
                 }
             } catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
                 throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage());
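
For reference, a minimal standalone sketch of the write path as it looks after this patch, not the actual Elasticsearch code: the class and method names here are hypothetical, and the FileContext, Path, InputStream, and bufferSize parameters stand in for the fields of HdfsBlobContainer. It relies on the real Hadoop API used above, FileContext.create(Path, EnumSet<CreateFlag>, CreateOpts...), whose varargs options parameter is why the explicit CreateOpts[] array could be dropped.

import java.io.IOException;
import java.io.InputStream;
import java.util.EnumSet;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Path;

// Hypothetical harness; only the method body mirrors the patched code.
final class HdfsWriteSketch {

    static void writeBlobSketch(FileContext fileContext, Path blob, InputStream inputStream,
                                int bufferSize, boolean failIfAlreadyExists) throws IOException {
        // CREATE alone fails if the file already exists; adding OVERWRITE allows
        // replacement. SYNC_BLOCK forces completed blocks to the disk device.
        EnumSet<CreateFlag> flags = failIfAlreadyExists
            ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK)
            : EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK);
        // CreateOpts is a varargs parameter, so the option is passed directly
        // rather than wrapped in a CreateOpts[] array as before the patch.
        try (FSDataOutputStream stream = fileContext.create(blob, flags, CreateOpts.bufferSize(bufferSize))) {
            byte[] buffer = new byte[bufferSize];
            int bytesRead;
            while ((bytesRead = inputStream.read(buffer)) != -1) {
                stream.write(buffer, 0, bytesRead);
                // The per-write hsync() is gone; flushing is left to SYNC_BLOCK
                // for full blocks and to close() at the end of the try block.
            }
        }
    }
}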