Browse Source

Add Ability to Write a BytesReference to BlobContainer (#66501)

Except when writing actual segment files to the blob store
we always write `BytesReference` instead of a stream.
Only having the stream API available forces needless copies
on us. I fixed the straight-forward needless copying for
HDFS and FS repos in this PR, we could do similar fixes for
GCS and Azure as well and thus significantly reduce the peak
memory use of these writes on master nodes in particular.
Armin Braun 4 years ago
parent
commit
3819fcb582
18 changed files with 117 additions and 75 deletions
  1. 2 1
      modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java
  2. 3 6
      plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java
  3. 3 2
      plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java
  4. 3 2
      plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java
  5. 26 2
      plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
  6. 2 6
      plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java
  7. 3 2
      plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
  8. 3 3
      server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java
  9. 22 10
      server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java
  10. 25 3
      server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java
  11. 3 2
      server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java
  12. 5 12
      server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
  13. 1 2
      server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java
  14. 1 5
      server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java
  15. 4 3
      server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java
  16. 6 10
      test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java
  17. 4 3
      test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
  18. 1 1
      x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java

+ 2 - 1
modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java

@@ -25,6 +25,7 @@ import org.elasticsearch.common.blobstore.BlobMetadata;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
+import org.elasticsearch.common.bytes.BytesReference;
 
 import java.io.BufferedInputStream;
 import java.io.FileNotFoundException;
@@ -132,7 +133,7 @@ public class URLBlobContainer extends AbstractBlobContainer {
     }
 
     @Override
-    public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
+    public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
         throw new UnsupportedOperationException("URL repository doesn't support this operation");
     }
 

+ 3 - 6
plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java

@@ -28,6 +28,7 @@ import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.MockSecureSettings;
 import org.elasticsearch.common.settings.Settings;
@@ -36,9 +37,7 @@ import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
 import org.elasticsearch.rest.RestStatus;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.Collection;
@@ -221,10 +220,8 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
             final BlobContainer container = store.blobContainer(new BlobPath());
             for (int i = 0; i < numberOfBlobs; i++) {
                 byte[] bytes = randomBytes(randomInt(100));
-                try (InputStream inputStream = new ByteArrayInputStream(bytes)) {
-                    String blobName = randomAlphaOfLength(10);
-                    container.writeBlob(blobName, inputStream, bytes.length, false);
-                }
+                String blobName = randomAlphaOfLength(10);
+                container.writeBlob(blobName, new BytesArray(bytes), false);
             }
 
             container.delete();

+ 3 - 2
plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

@@ -29,6 +29,7 @@ import org.elasticsearch.common.blobstore.BlobMetadata;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
+import org.elasticsearch.common.bytes.BytesReference;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -101,8 +102,8 @@ public class AzureBlobContainer extends AbstractBlobContainer {
     }
 
     @Override
-    public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
-        writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
+    public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
+        writeBlob(blobName, bytes, failIfAlreadyExists);
     }
 
     @Override

+ 3 - 2
plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java

@@ -25,6 +25,7 @@ import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStoreException;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
+import org.elasticsearch.common.bytes.BytesReference;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -83,8 +84,8 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
     }
 
     @Override
-    public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
-        writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
+    public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
+        writeBlob(blobName, bytes, failIfAlreadyExists);
     }
 
     @Override

+ 26 - 2
plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java

@@ -34,6 +34,7 @@ import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation;
 
@@ -150,12 +151,28 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
     }
 
     @Override
-    public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
+    public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
+        Path blob = new Path(path, blobName);
+        // we pass CREATE, which means it fails if a blob already exists.
+        final EnumSet<CreateFlag> flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK)
+                : EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK);
+        store.execute((Operation<Void>) fileContext -> {
+            try {
+                writeToPath(bytes, blob, fileContext, flags);
+            } catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
+                throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage());
+            }
+            return null;
+        });
+    }
+
+    @Override
+    public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
         final String tempBlob = FsBlobContainer.tempBlobName(blobName);
         final Path tempBlobPath = new Path(path, tempBlob);
         final Path blob = new Path(path, blobName);
         store.execute((Operation<Void>) fileContext -> {
-            writeToPath(inputStream, blobSize, fileContext, tempBlobPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK));
+            writeToPath(bytes, tempBlobPath, fileContext, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK));
             try {
                 fileContext.rename(tempBlobPath, blob, failIfAlreadyExists ? Options.Rename.NONE : Options.Rename.OVERWRITE);
             } catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
@@ -165,6 +182,13 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
         });
     }
 
+    private void writeToPath(BytesReference bytes, Path blobPath, FileContext fileContext,
+                             EnumSet<CreateFlag> createFlags) throws IOException {
+        try (FSDataOutputStream stream = fileContext.create(blobPath, createFlags)) {
+            bytes.writeTo(stream);
+        }
+    }
+
     private void writeToPath(InputStream inputStream, long blobSize, FileContext fileContext, Path blobPath,
                              EnumSet<CreateFlag> createFlags) throws IOException {
         final byte[] buffer = new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize];

+ 2 - 6
plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

@@ -54,7 +54,6 @@ import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -158,12 +157,9 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
             SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion()));
         final BytesReference serialized = BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(),
             SnapshotsService.OLD_SNAPSHOT_FORMAT));
-        PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> {
-            try (InputStream stream = serialized.streamInput()) {
+        PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () ->
                 repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic(
-                    BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), stream, serialized.length(), true);
-            }
-        })));
+                        BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), serialized, true))));
 
         final String newSnapshotName = "snapshot-new";
         final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos();

+ 3 - 2
plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

@@ -46,6 +46,7 @@ import org.elasticsearch.common.blobstore.BlobStoreException;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -143,8 +144,8 @@ class S3BlobContainer extends AbstractBlobContainer {
     }
 
     @Override
-    public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
-        writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
+    public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
+        writeBlob(blobName, bytes, failIfAlreadyExists);
     }
 
     @Override

+ 3 - 3
server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java

@@ -22,12 +22,12 @@ import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.RepositoryCleanupInProgress;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
 import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.test.ESIntegTestCase;
 
-import java.io.ByteArrayInputStream;
 import java.util.concurrent.ExecutionException;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows;
@@ -85,7 +85,7 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
         logger.info("--> creating a garbage data blob");
         final PlainActionFuture<Void> garbageFuture = PlainActionFuture.newFuture();
         repository.threadPool().generic().execute(ActionRunnable.run(garbageFuture, () -> repository.blobStore()
-            .blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new ByteArrayInputStream(new byte[1]), 1, true)));
+            .blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new BytesArray(new byte[1]), true)));
         garbageFuture.get();
 
         blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
@@ -120,7 +120,7 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
             final int generation = i;
             repository.threadPool().generic().execute(ActionRunnable.run(createOldIndexNFuture, () -> repository.blobStore()
                 .blobContainer(repository.basePath()).writeBlob(BlobStoreRepository.INDEX_FILE_PREFIX + generation,
-                    new ByteArrayInputStream(new byte[1]), 1, true)));
+                        new BytesArray(new byte[1]), true)));
             createOldIndexNFuture.get();
         }
 

+ 22 - 10
server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java

@@ -19,6 +19,8 @@
 
 package org.elasticsearch.common.blobstore;
 
+import org.elasticsearch.common.bytes.BytesReference;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.FileAlreadyExistsException;
@@ -110,25 +112,35 @@ public interface BlobContainer {
     void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;
 
     /**
-     * Reads blob content from the input stream and writes it to the container in a new blob with the given name,
-     * using an atomic write operation if the implementation supports it.
+     * Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name.
      *
-     * This method assumes the container does not already contain a blob of the same blobName.  If a blob by the
-     * same name already exists, the operation will fail and an {@link IOException} will be thrown.
+     * @param   blobName
+     *          The name of the blob to write the contents of the input stream to.
+     * @param   bytes
+     *          The bytes to write
+     * @param   failIfAlreadyExists
+     *          whether to throw a FileAlreadyExistsException if the given blob already exists
+     * @throws  FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
+     * @throws  IOException if the input stream could not be read, or the target blob could not be written to.
+     */
+    default void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
+        writeBlob(blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists);
+    }
+
+    /**
+     * Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name,
+     * using an atomic write operation if the implementation supports it.
      *
      * @param   blobName
      *          The name of the blob to write the contents of the input stream to.
-     * @param   inputStream
-     *          The input stream from which to retrieve the bytes to write to the blob.
-     * @param   blobSize
-     *          The size of the blob to be written, in bytes.  It is implementation dependent whether
-     *          this value is used in writing the blob to the repository.
+     * @param   bytes
+     *          The bytes to write
      * @param   failIfAlreadyExists
      *          whether to throw a FileAlreadyExistsException if the given blob already exists
      * @throws  FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
      * @throws  IOException if the input stream could not be read, or the target blob could not be written to.
      */
-    void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;
+    void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException;
 
     /**
      * Deletes this container and all its contents from the repository.

+ 25 - 3
server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

@@ -26,6 +26,7 @@ import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.core.internal.io.IOUtils;
 
@@ -190,12 +191,26 @@ public class FsBlobContainer extends AbstractBlobContainer {
     }
 
     @Override
-    public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, boolean failIfAlreadyExists)
-        throws IOException {
+    public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
+        final Path file = path.resolve(blobName);
+        try {
+            writeToPath(bytes, file);
+        } catch (FileAlreadyExistsException faee) {
+            if (failIfAlreadyExists) {
+                throw faee;
+            }
+            deleteBlobsIgnoringIfNotExists(Collections.singletonList(blobName));
+            writeToPath(bytes, file);
+        }
+        IOUtils.fsync(path, true);
+    }
+
+    @Override
+    public void writeBlobAtomic(final String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
         final String tempBlob = tempBlobName(blobName);
         final Path tempBlobPath = path.resolve(tempBlob);
         try {
-            writeToPath(inputStream, tempBlobPath, blobSize);
+            writeToPath(bytes, tempBlobPath);
             moveBlobAtomic(tempBlob, blobName, failIfAlreadyExists);
         } catch (IOException ex) {
             try {
@@ -209,6 +224,13 @@ public class FsBlobContainer extends AbstractBlobContainer {
         }
     }
 
+    private void writeToPath(BytesReference bytes, Path tempBlobPath) throws IOException {
+        try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
+            bytes.writeTo(outputStream);
+        }
+        IOUtils.fsync(tempBlobPath, false);
+    }
+
     private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException {
         try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
             final int bufferSize = blobStore.bufferSizeInBytes();

+ 3 - 2
server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java

@@ -23,6 +23,7 @@ import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetadata;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.DeleteResult;
+import org.elasticsearch.common.bytes.BytesReference;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -72,8 +73,8 @@ public abstract class FilterBlobContainer implements BlobContainer {
     }
 
     @Override
-    public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
-        delegate.writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists);
+    public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
+        delegate.writeBlobAtomic(blobName, bytes, failIfAlreadyExists);
     }
 
     @Override

+ 5 - 12
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -1273,9 +1273,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 byte[] testBytes = Strings.toUTF8Bytes(seed);
                 BlobContainer testContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed)));
                 BytesArray bytes = new BytesArray(testBytes);
-                try (InputStream stream = bytes.streamInput()) {
-                    testContainer.writeBlobAtomic("master.dat", stream, bytes.length(), true);
-                }
+                testContainer.writeBlobAtomic("master.dat", new BytesArray(testBytes), true);
                 return seed;
             }
         } catch (Exception exp) {
@@ -1880,11 +1878,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     private void writeAtomic(BlobContainer container, final String blobName, final BytesReference bytesRef,
                              boolean failIfAlreadyExists) throws IOException {
-        try (InputStream stream = bytesRef.streamInput()) {
-            logger.trace(() ->
-                    new ParameterizedMessage("[{}] Writing [{}] to {} atomically", metadata.name(), blobName, container.path()));
-            container.writeBlobAtomic(blobName, stream, bytesRef.length(), failIfAlreadyExists);
-        }
+        logger.trace(() ->
+                new ParameterizedMessage("[{}] Writing [{}] to {} atomically", metadata.name(), blobName, container.path()));
+        container.writeBlobAtomic(blobName, bytesRef, failIfAlreadyExists);
     }
 
     @Override
@@ -2291,10 +2287,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         } else {
             BlobContainer testBlobContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed)));
             try {
-                BytesArray bytes = new BytesArray(seed);
-                try (InputStream stream = bytes.streamInput()) {
-                    testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", stream, bytes.length(), true);
-                }
+                testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", new BytesArray(seed), true);
             } catch (Exception exp) {
                 throw new RepositoryVerificationException(metadata.name(), "store location [" + blobStore() +
                     "] is not accessible on the node [" + localNode + "]", exp);

+ 1 - 2
server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java

@@ -150,8 +150,7 @@ public final class ChecksumBlobStoreFormat<T extends ToXContent> {
      */
     public void write(T obj, BlobContainer blobContainer, String name, boolean compress, BigArrays bigArrays) throws IOException {
         final String blobName = blobName(name);
-        serialize(obj, blobName, compress, bigArrays, bytes -> blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(),
-                false));
+        serialize(obj, blobName, compress, bigArrays, bytes -> blobContainer.writeBlob(blobName, bytes, false));
     }
 
     public void serialize(final T obj, final String blobName, final boolean compress, BigArrays bigArrays,

+ 1 - 5
server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java

@@ -29,7 +29,6 @@ import org.elasticsearch.common.blobstore.fs.FsBlobStore;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.util.MockBigArrays;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.ToXContentFragment;
@@ -166,10 +165,7 @@ public class BlobStoreFormatTests extends ESTestCase {
             int location = randomIntBetween(0, buffer.length - 1);
             buffer[location] = (byte) (buffer[location] ^ 42);
         } while (originalChecksum == checksum(buffer));
-        BytesArray bytesArray = new BytesArray(buffer);
-        try (StreamInput stream = bytesArray.streamInput()) {
-            blobContainer.writeBlob(blobName, stream, bytesArray.length(), false);
-        }
+        blobContainer.writeBlob(blobName, new BytesArray(buffer), false);
     }
 
     private long checksum(byte[] buffer) throws IOException {

+ 4 - 3
server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java

@@ -29,6 +29,7 @@ import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.common.util.MockBigArrays;
@@ -378,9 +379,9 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository {
             }
 
             @Override
-            public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize,
-                final boolean failIfAlreadyExists) throws IOException {
-                writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
+            public void writeBlobAtomic(final String blobName, final BytesReference bytes,
+                                        final boolean failIfAlreadyExists) throws IOException {
+                writeBlob(blobName, bytes, failIfAlreadyExists);
             }
         }
     }

+ 6 - 10
test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java

@@ -204,13 +204,11 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
     }
 
     public static void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray,
-        boolean failIfAlreadyExists) throws IOException {
-        try (InputStream stream = bytesArray.streamInput()) {
-            if (randomBoolean()) {
-                container.writeBlob(blobName, stream, bytesArray.length(), failIfAlreadyExists);
-            } else {
-                container.writeBlobAtomic(blobName, stream, bytesArray.length(), failIfAlreadyExists);
-            }
+                                 boolean failIfAlreadyExists) throws IOException {
+        if (randomBoolean()) {
+            container.writeBlob(blobName, bytesArray, failIfAlreadyExists);
+        } else {
+            container.writeBlobAtomic(blobName, bytesArray, failIfAlreadyExists);
         }
     }
 
@@ -257,9 +255,7 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
     }
 
     protected static void writeBlob(BlobContainer container, String blobName, BytesArray bytesArray) throws IOException {
-        try (InputStream stream = bytesArray.streamInput()) {
-            container.writeBlob(blobName, stream, bytesArray.length(), true);
-        }
+        container.writeBlob(blobName, bytesArray, true);
     }
 
     protected BlobStore newBlobStore() {

+ 4 - 3
test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java

@@ -33,6 +33,7 @@ import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
 import org.elasticsearch.common.blobstore.support.FilterBlobContainer;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.PathUtils;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
@@ -477,7 +478,7 @@ public class MockRepository extends FsRepository {
             }
 
             @Override
-            public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize,
+            public void writeBlobAtomic(final String blobName, final BytesReference bytes,
                                         final boolean failIfAlreadyExists) throws IOException {
                 final Random random = RandomizedContext.current().getRandom();
                 if (failOnIndexLatest && BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) {
@@ -493,7 +494,7 @@ public class MockRepository extends FsRepository {
                 if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) {
                     // Simulate a failure between the write and move operation in FsBlobContainer
                     final String tempBlobName = FsBlobContainer.tempBlobName(blobName);
-                    super.writeBlob(tempBlobName, inputStream, blobSize, failIfAlreadyExists);
+                    super.writeBlob(tempBlobName, bytes, failIfAlreadyExists);
                     maybeIOExceptionOrBlock(blobName);
                     final FsBlobContainer fsBlobContainer = (FsBlobContainer) delegate();
                     fsBlobContainer.moveBlobAtomic(tempBlobName, blobName, failIfAlreadyExists);
@@ -501,7 +502,7 @@ public class MockRepository extends FsRepository {
                     // Atomic write since it is potentially supported
                     // by the delegating blob container
                     maybeIOExceptionOrBlock(blobName);
-                    super.writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists);
+                    super.writeBlobAtomic(blobName, bytes, failIfAlreadyExists);
                 }
             }
         }

+ 1 - 1
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java

@@ -267,7 +267,7 @@ public final class TestUtils {
         }
 
         @Override
-        public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) {
+        public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) {
             throw unsupportedException();
         }