Save Memory on Large Repository Metadata Blob Writes (#74313)

This PR adds a new API for streaming serialization writes to a repository, allowing repository metadata of arbitrary size to be written with bounded memory.
The existing write APIs require the eventual blob size to be known beforehand. This forced us to materialize the serialized blob fully in memory before writing, costing a lot of memory for e.g. a very large `RepositoryData` (and limiting us to a `2G` maximum blob size).
With this PR the requirement to fully materialize the serialized metadata goes away, and the memory overhead becomes bounded entirely by the outbound buffer size of the repository implementation.

As we move to larger repositories, this makes master node stability much more predictable, since writing out `RepositoryData` no longer takes as much memory (the same applies to shard-level metadata). It also enables aggregating multiple metadata blobs into a single larger blob without massive overhead, and removes the 2G size limit on `RepositoryData`.
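
To illustrate the new API from the caller's side, here is a minimal sketch. The blob name and payload are made up for illustration; only the `writeBlob(String, boolean, boolean, CheckedConsumer<OutputStream, IOException>)` signature comes from this change:

import org.elasticsearch.common.blobstore.BlobContainer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class StreamingMetadataWriteExample {

    // Streams a large, incrementally produced payload straight into the repository. The
    // consumer only ever sees an OutputStream, so nothing forces the whole blob into memory.
    static void writeLargeMetadata(BlobContainer container) throws IOException {
        container.writeBlob("example-metadata-blob", true /* failIfAlreadyExists */, true /* atomic */, out -> {
            for (int i = 0; i < 1_000_000; i++) {
                // hypothetical payload; a real caller would serialize RepositoryData or similar here
                out.write(("{\"entry\":" + i + "}\n").getBytes(StandardCharsets.UTF_8));
            }
            // do not close `out` here; the container owns the stream's lifecycle
        });
    }
}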
Armin Braun committed 4 years ago (commit 8947c1e980)
27 changed files with 765 additions and 182 deletions
  1. +10 -0   modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java
  2. +10 -0   plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java
  3. +59 -4   plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
  4. +1 -1    plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java
  5. +79 -1   plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java
  6. +10 -0   plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java
  7. +77 -18  plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java
  8. +6 -2    plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java
  9. +34 -0   plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
  10. +108 -30 plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
  11. +9 -1    plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java
  12. +1 -1    plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java
  13. +102 -1  plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java
  14. +14 -0   server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java
  15. +57 -0   server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java
  16. +8 -0    server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java
  17. +25 -33  server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
  18. +26 -39  server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java
  19. +6 -6    server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java
  20. +7 -18   server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java
  21. +0 -9    test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java
  22. +1 -2    test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
  23. +45 -16  test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
  24. +20 -0   x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java
  25. +12 -0   x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java
  26. +19 -0   x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java
  27. +19 -0   x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java

+ 10 - 0
modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.common.blobstore.url;
 
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetadata;
@@ -20,6 +21,7 @@ import java.io.BufferedInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URL;
 import java.nio.file.NoSuchFileException;
 import java.security.AccessController;
@@ -121,6 +123,14 @@ public class URLBlobContainer extends AbstractBlobContainer {
         throw new UnsupportedOperationException("URL repository doesn't support this operation");
     }
 
+    @Override
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        throw new UnsupportedOperationException("URL repository doesn't support this operation");
+    }
+
     @Override
     public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
         throw new UnsupportedOperationException("URL repository doesn't support this operation");

+ 10 - 0
plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

@@ -12,6 +12,7 @@ import com.azure.storage.blob.models.BlobStorageException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.core.util.Throwables;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetadata;
@@ -22,6 +23,7 @@ import org.elasticsearch.common.bytes.BytesReference;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.NoSuchFileException;
 import java.util.Iterator;
 import java.util.Map;
@@ -99,6 +101,14 @@ public class AzureBlobContainer extends AbstractBlobContainer {
         blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists);
     }
 
+    @Override
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer);
+    }
+
     @Override
     public DeleteResult delete() throws IOException {
         return blobStore.deleteBlobDirectory(keyPath);

+ 59 - 4
plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

@@ -33,6 +33,8 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -46,6 +48,7 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.repositories.azure.AzureRepository.Repository;
+import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -53,6 +56,7 @@ import reactor.core.scheduler.Schedulers;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -82,6 +86,8 @@ public class AzureBlobStore implements BlobStore {
 
     private final AzureStorageService service;
 
+    private final BigArrays bigArrays;
+
     private final String clientName;
     private final String container;
     private final LocationMode locationMode;
@@ -90,10 +96,11 @@ public class AzureBlobStore implements BlobStore {
     private final Stats stats = new Stats();
     private final BiConsumer<String, URL> statsConsumer;
 
-    public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service) {
+    public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service, BigArrays bigArrays) {
         this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
         this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
         this.service = service;
+        this.bigArrays = bigArrays;
         // locationMode is set per repository, not per client
         this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
         this.maxSinglePartUploadSize = Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.get(metadata.settings());
@@ -383,6 +390,49 @@ public class AzureBlobStore implements BlobStore {
         executeSingleUpload(blobName, byteBufferFlux, bytes.length(), failIfAlreadyExists);
     }
 
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        final BlockBlobAsyncClient blockBlobAsyncClient = asyncClient().getBlobContainerAsyncClient(container)
+                .getBlobAsyncClient(blobName).getBlockBlobAsyncClient();
+        try (ChunkedBlobOutputStream<String> out = new ChunkedBlobOutputStream<>(bigArrays, getUploadBlockSize()) {
+
+            @Override
+            protected void flushBuffer() {
+                if (buffer.size() == 0) {
+                    return;
+                }
+                final String blockId = makeMultipartBlockId();
+                SocketAccess.doPrivilegedVoidException(() -> blockBlobAsyncClient.stageBlock(
+                        blockId,
+                        Flux.fromArray(BytesReference.toByteBuffers(buffer.bytes())),
+                        buffer.size()
+                ).block());
+                finishPart(blockId);
+            }
+
+            @Override
+            protected void onCompletion() {
+                if (flushedBytes == 0L) {
+                    writeBlob(blobName, buffer.bytes(), failIfAlreadyExists);
+                } else {
+                    flushBuffer();
+                    SocketAccess.doPrivilegedVoidException(
+                            () -> blockBlobAsyncClient.commitBlockList(parts, failIfAlreadyExists == false).block());
+                }
+            }
+
+            @Override
+            protected void onFailure() {
+                // Nothing to do here, already uploaded blocks will be GCed by Azure after a week.
+                // see https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks
+            }
+        }) {
+            writer.accept(out);
+            out.markSuccess();
+        }
+    }
+
     public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
         assert inputStream.markSupported()
             : "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
@@ -439,13 +489,11 @@ public class AzureBlobStore implements BlobStore {
             assert blobSize == (((nbParts - 1) * partSize) + lastPartSize) : "blobSize does not match multipart sizes";
 
             final List<String> blockIds = new ArrayList<>(nbParts);
-            final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();
-            final Base64.Decoder base64UrlDecoder = Base64.getUrlDecoder();
             for (int i = 0; i < nbParts; i++) {
                 final long length = i < nbParts - 1 ? partSize : lastPartSize;
                 Flux<ByteBuffer> byteBufferFlux = convertStreamToByteBuffer(inputStream, length, DEFAULT_UPLOAD_BUFFERS_SIZE);
 
-                final String blockId = base64Encoder.encodeToString(base64UrlDecoder.decode(UUIDs.base64UUID()));
+                final String blockId = makeMultipartBlockId();
                 blockBlobAsyncClient.stageBlock(blockId, byteBufferFlux, length).block();
                 blockIds.add(blockId);
             }
@@ -454,6 +502,13 @@ public class AzureBlobStore implements BlobStore {
         });
     }
 
+    private static final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();
+    private static final Base64.Decoder base64UrlDecoder = Base64.getUrlDecoder();
+
+    private String makeMultipartBlockId() {
+        return base64Encoder.encodeToString(base64UrlDecoder.decode(UUIDs.base64UUID()));
+    }
+
     /**
      * Converts the provided input stream into a Flux of ByteBuffer. To avoid having large amounts of outstanding
      * memory this Flux reads the InputStream into ByteBuffers of {@code chunkSize} size.
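
The Azure implementation above and the S3 implementation further down follow the same pattern: they subclass `ChunkedBlobOutputStream`, upload a part whenever the buffer fills up, and either commit or abort the multipart upload on close. Below is a rough sketch of that pattern against a hypothetical multipart client. The `FakeMultipartClient` and its methods are invented for illustration, and the `ChunkedBlobOutputStream` members used here (`buffer`, `flushedBytes`, `parts`, `finishPart`, `markSuccess` and the three hooks) are inferred from how the Azure and S3 hunks use them, since that class itself is not part of this diff:

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

class FakeBlobStore {

    /** Hypothetical multipart-capable client, standing in for the Azure/S3 SDK clients. */
    interface FakeMultipartClient {
        String uploadPart(String blobName, BytesReference part) throws IOException;      // returns a part id
        void completeMultipartUpload(String blobName, List<String> partIds) throws IOException;
        void abortMultipartUpload(String blobName) throws IOException;
        void putObject(String blobName, BytesReference bytes) throws IOException;        // single-shot upload
    }

    private final FakeMultipartClient client;
    private final BigArrays bigArrays;
    private final long chunkSize;

    FakeBlobStore(FakeMultipartClient client, BigArrays bigArrays, long chunkSize) {
        this.client = client;
        this.bigArrays = bigArrays;
        this.chunkSize = chunkSize;
    }

    void writeBlob(String blobName, CheckedConsumer<OutputStream, IOException> writer) throws IOException {
        try (ChunkedBlobOutputStream<String> out = new ChunkedBlobOutputStream<>(bigArrays, chunkSize) {

            @Override
            protected void flushBuffer() throws IOException {
                if (buffer.size() == 0) {
                    return; // nothing buffered since the last part was uploaded
                }
                // upload the buffered chunk as one part and remember its id for the final commit
                finishPart(client.uploadPart(blobName, buffer.bytes()));
            }

            @Override
            protected void onCompletion() throws IOException {
                if (flushedBytes == 0L) {
                    // everything fit into the first buffer: fall back to a plain single-shot upload
                    client.putObject(blobName, buffer.bytes());
                } else {
                    flushBuffer(); // upload whatever is left as the last part
                    client.completeMultipartUpload(blobName, parts);
                }
            }

            @Override
            protected void onFailure() {
                try {
                    client.abortMultipartUpload(blobName); // clean up parts that were already uploaded
                } catch (IOException e) {
                    // best-effort cleanup only
                }
            }
        }) {
            writer.accept(out);
            out.markSuccess(); // lets close() run onCompletion() instead of onFailure()
        }
    }
}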

+ 1 - 1
plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java

@@ -124,7 +124,7 @@ public class AzureRepository extends MeteredBlobStoreRepository {
 
     @Override
     protected AzureBlobStore createBlobStore() {
-        final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService);
+        final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, bigArrays);
 
         logger.debug(() -> new ParameterizedMessage(
             "using container [{}], chunk_size [{}], compress [{}], base_path [{}]",

+ 79 - 1
plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java

@@ -15,6 +15,7 @@ import com.sun.net.httpserver.HttpServer;
 import fixture.azure.AzureHttpHandler;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -58,6 +59,7 @@ import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -172,7 +174,7 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
                 .put(MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.MB))
                 .build());
 
-        return new AzureBlobContainer(BlobPath.EMPTY, new AzureBlobStore(repositoryMetadata, service));
+        return new AzureBlobContainer(BlobPath.EMPTY, new AzureBlobStore(repositoryMetadata, service, BigArrays.NON_RECYCLING_INSTANCE));
     }
 
     public void testReadNonexistentBlobThrowsNoSuchFileException() {
@@ -391,6 +393,82 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
         assertThat(blocks.isEmpty(), is(true));
     }
 
+    public void testWriteLargeBlobStreaming() throws Exception {
+        final int maxRetries = randomIntBetween(2, 5);
+
+        final int blobSize = (int) ByteSizeUnit.MB.toBytes(10);
+        final byte[] data = randomBytes(blobSize);
+
+        final int nbErrors = 2; // we want all requests to fail at least once
+        final AtomicInteger counterUploads = new AtomicInteger(0);
+        final AtomicLong bytesReceived = new AtomicLong(0L);
+        final CountDown countDownComplete = new CountDown(nbErrors);
+
+        final Map<String, BytesReference> blocks = new ConcurrentHashMap<>();
+        httpServer.createContext("/account/container/write_large_blob_streaming", exchange -> {
+
+            if ("PUT".equals(exchange.getRequestMethod())) {
+                final Map<String, String> params = new HashMap<>();
+                RestUtils.decodeQueryString(exchange.getRequestURI().getRawQuery(), 0, params);
+
+                final String blockId = params.get("blockid");
+                assert Strings.hasText(blockId) == false || AzureFixtureHelper.assertValidBlockId(blockId);
+
+                if (Strings.hasText(blockId) && (counterUploads.incrementAndGet() % 2 == 0)) {
+                    final BytesReference blockData = Streams.readFully(exchange.getRequestBody());
+                    blocks.put(blockId, blockData);
+                    bytesReceived.addAndGet(blockData.length());
+                    exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
+                    exchange.close();
+                    return;
+                }
+
+                final String complete = params.get("comp");
+                if ("blocklist".equals(complete) && (countDownComplete.countDown())) {
+                    final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), UTF_8));
+                    final List<String> blockUids = Arrays.stream(blockList.split("<Latest>"))
+                            .filter(line -> line.contains("</Latest>"))
+                            .map(line -> line.substring(0, line.indexOf("</Latest>")))
+                            .collect(Collectors.toList());
+
+                    final ByteArrayOutputStream blob = new ByteArrayOutputStream();
+                    for (String blockUid : blockUids) {
+                        BytesReference block = blocks.remove(blockUid);
+                        assert block != null;
+                        block.writeTo(blob);
+                    }
+                    assertArrayEquals(data, blob.toByteArray());
+                    exchange.getResponseHeaders().add("x-ms-request-server-encrypted", "false");
+                    exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
+                    exchange.close();
+                    return;
+                }
+            }
+
+            if (randomBoolean()) {
+                Streams.readFully(exchange.getRequestBody());
+                AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
+            }
+            exchange.close();
+        });
+
+        final BlobContainer blobContainer = createBlobContainer(maxRetries);
+        blobContainer.writeBlob("write_large_blob_streaming", false, randomBoolean(), out -> {
+            int outstanding = data.length;
+            while (outstanding > 0) {
+                if (randomBoolean()) {
+                    int toWrite = Math.toIntExact(Math.min(randomIntBetween(64, data.length), outstanding));
+                    out.write(data, data.length - outstanding, toWrite);
+                    outstanding -= toWrite;
+                } else {
+                    out.write(data[data.length - outstanding]);
+                    outstanding--;
+                }
+            }
+        });
+        assertEquals(blobSize, bytesReceived.get());
+    }
+
     public void testRetryUntilFail() throws Exception {
         final int maxRetries = randomIntBetween(2, 5);
         final AtomicInteger requestsReceived = new AtomicInteger(0);

+ 10 - 0
plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java

@@ -15,9 +15,11 @@ import org.elasticsearch.common.blobstore.BlobStoreException;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.core.CheckedConsumer;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -76,6 +78,14 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
         blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists);
     }
 
+    @Override
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer);
+    }
+
     @Override
     public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
         writeBlob(blobName, bytes, failIfAlreadyExists);

+ 77 - 18
plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

@@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetadata;
@@ -37,8 +38,10 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 
 import java.io.ByteArrayInputStream;
+import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
@@ -265,6 +268,52 @@ class GoogleCloudStorageBlobStore implements BlobStore {
             {Storage.BlobWriteOption.doesNotExist(), Storage.BlobWriteOption.md5Match()};
     private static final Storage.BlobWriteOption[] OVERWRITE_CHECK_MD5 = {Storage.BlobWriteOption.md5Match()};
 
+    void writeBlob(String blobName, boolean failIfAlreadyExists, CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build();
+        final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ? NO_OVERWRITE_NO_MD5 : OVERWRITE_NO_MD5;
+
+        StorageException storageException = null;
+
+        for (int retry = 0; retry < 3; ++retry) {
+            try {
+                final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
+                try (OutputStream out = new FilterOutputStream(Channels.newOutputStream(new WritableBlobChannel(writeChannel))) {
+                    @Override
+                    public void write(byte[] b, int off, int len) throws IOException {
+                        int written = 0;
+                        while (written < len) {
+                            // at most write the default chunk size in one go to prevent allocating huge buffers in the SDK
+                            // see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE
+                            final int toWrite = Math.min(len - written, 60 * 256 * 1024);
+                            out.write(b, off + written, toWrite);
+                            written += toWrite;
+                        }
+                    }
+                }) {
+                    writer.accept(out);
+                    SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
+                }
+                stats.trackPutOperation();
+                return;
+            } catch (final StorageException se) {
+                final int errorCode = se.getCode();
+                if (errorCode == HTTP_GONE) {
+                    logger.warn(() -> new ParameterizedMessage("Retrying broken resumable upload session for blob {}", blobInfo), se);
+                    storageException = ExceptionsHelper.useOrSuppress(storageException, se);
+                    continue;
+                } else if (failIfAlreadyExists && errorCode == HTTP_PRECON_FAILED) {
+                    throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
+                }
+                if (storageException != null) {
+                    se.addSuppressed(storageException);
+                }
+                throw se;
+            }
+        }
+        assert storageException != null;
+        throw storageException;
+    }
+
     /**
      * Uploads a blob using the "resumable upload" method (multiple requests, which
      * can be independently retried in case of failure, see
@@ -297,24 +346,9 @@ class GoogleCloudStorageBlobStore implements BlobStore {
                  * It is not enough to wrap the call to Streams#copy, we have to wrap the privileged calls too; this is because Streams#copy
                  * is in the stacktrace and is not granted the permissions needed to close and write the channel.
                  */
-                org.elasticsearch.core.internal.io.Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
-
-                    @SuppressForbidden(reason = "channel is based on a socket")
-                    @Override
-                    public int write(final ByteBuffer src) throws IOException {
-                        return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
-                    }
-
-                    @Override
-                    public boolean isOpen() {
-                        return writeChannel.isOpen();
-                    }
-
-                    @Override
-                    public void close() throws IOException {
-                        SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
-                    }
-                }), buffer);
+                org.elasticsearch.core.internal.io.Streams.copy(
+                        inputStream, Channels.newOutputStream(new WritableBlobChannel(writeChannel)), buffer);
+                SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
                 // We don't track this operation on the http layer as
                 // we do with the GET/LIST operations since this operations
                 // can trigger multiple underlying http requests but only one
@@ -478,4 +512,29 @@ class GoogleCloudStorageBlobStore implements BlobStore {
     public Map<String, Long> stats() {
         return stats.toMap();
     }
+
+    private static final class WritableBlobChannel implements WritableByteChannel {
+
+        private final WriteChannel channel;
+
+        WritableBlobChannel(WriteChannel writeChannel) {
+            this.channel = writeChannel;
+        }
+
+        @SuppressForbidden(reason = "channel is based on a socket")
+        @Override
+        public int write(final ByteBuffer src) throws IOException {
+            return SocketAccess.doPrivilegedIOException(() -> channel.write(src));
+        }
+
+        @Override
+        public boolean isOpen() {
+            return channel.isOpen();
+        }
+
+        @Override
+        public void close() {
+            // we manually close the channel later to have control over whether or not we want to finalize a blob
+        }
+    }
 }

+ 6 - 2
plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java

@@ -343,8 +343,12 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
         final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null;
 
         final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null);
-        try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", data), data.length)) {
-            blobContainer.writeBlob("write_large_blob", stream, data.length, false);
+        if (randomBoolean()) {
+            try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", data), data.length)) {
+                blobContainer.writeBlob("write_large_blob", stream, data.length, false);
+            }
+        } else {
+            blobContainer.writeBlob("write_large_blob", false, randomBoolean(), out -> out.write(data));
         }
 
         assertThat(countInits.get(), equalTo(0));

+ 34 - 0
plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java

@@ -15,6 +15,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.CreateOpts;
 import org.apache.hadoop.fs.Path;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetadata;
@@ -31,6 +32,7 @@ import java.io.FileNotFoundException;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
 import java.util.Collections;
@@ -156,6 +158,38 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
         });
     }
 
+    @Override
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        Path blob = new Path(path, blobName);
+        if (atomic) {
+            final Path tempBlobPath = new Path(path, FsBlobContainer.tempBlobName(blobName));
+            store.execute((Operation<Void>) fileContext -> {
+                try (FSDataOutputStream stream = fileContext.create(tempBlobPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK))) {
+                    writer.accept(stream);
+                    fileContext.rename(tempBlobPath, blob, failIfAlreadyExists ? Options.Rename.NONE : Options.Rename.OVERWRITE);
+                } catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
+                    throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage());
+                }
+                return null;
+            });
+        } else {
+            // we pass CREATE, which means it fails if a blob already exists.
+            final EnumSet<CreateFlag> flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK)
+                    : EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK);
+            store.execute((Operation<Void>) fileContext -> {
+                try (FSDataOutputStream stream = fileContext.create(blob, flags)) {
+                    writer.accept(stream);
+                } catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
+                    throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage());
+                }
+                return null;
+            });
+        }
+    }
+
     @Override
     public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
         final String tempBlob = FsBlobContainer.tempBlobName(blobName);

+ 108 - 30
plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

@@ -27,6 +27,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -41,10 +42,12 @@ import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -129,6 +132,105 @@ class S3BlobContainer extends AbstractBlobContainer {
         });
     }
 
+    @Override
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        try (AmazonS3Reference clientReference = blobStore.clientReference();
+             ChunkedBlobOutputStream<PartETag> out = new ChunkedBlobOutputStream<>(blobStore.bigArrays(), blobStore.bufferSizeInBytes()) {
+
+                 private final SetOnce<String> uploadId = new SetOnce<>();
+
+                 @Override
+                 protected void flushBuffer() throws IOException {
+                     flushBuffer(false);
+                 }
+
+                 private void flushBuffer(boolean lastPart) throws IOException {
+                     if (buffer.size() == 0) {
+                         return;
+                     }
+                     if (flushedBytes == 0L) {
+                         assert lastPart == false : "use single part upload if there's only a single part";
+                         uploadId.set(SocketAccess.doPrivileged(() ->
+                                 clientReference.client().initiateMultipartUpload(initiateMultiPartUpload(blobName)).getUploadId()));
+                         if (Strings.isEmpty(uploadId.get())) {
+                             throw new IOException("Failed to initialize multipart upload " + blobName);
+                         }
+                     }
+                     assert lastPart == false || successful : "must only write last part if successful";
+                     final UploadPartRequest uploadRequest = createPartUploadRequest(
+                             buffer.bytes().streamInput(), uploadId.get(), parts.size() + 1, blobName, buffer.size(), lastPart);
+                     final UploadPartResult uploadResponse =
+                             SocketAccess.doPrivileged(() -> clientReference.client().uploadPart(uploadRequest));
+                     finishPart(uploadResponse.getPartETag());
+                 }
+
+                 @Override
+                 protected void onCompletion() throws IOException {
+                     if (flushedBytes == 0L) {
+                         writeBlob(blobName, buffer.bytes(), failIfAlreadyExists);
+                     } else {
+                         flushBuffer(true);
+                         final CompleteMultipartUploadRequest complRequest =
+                                 new CompleteMultipartUploadRequest(blobStore.bucket(), blobName, uploadId.get(), parts);
+                         complRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
+                         SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest));
+                     }
+                 }
+
+                 @Override
+                 protected void onFailure() {
+                     if (Strings.hasText(uploadId.get())) {
+                         abortMultiPartUpload(uploadId.get(), blobName);
+                     }
+                 }
+             }) {
+            writer.accept(out);
+            out.markSuccess();
+        }
+    }
+
+    private UploadPartRequest createPartUploadRequest(InputStream stream,
+                                                      String uploadId,
+                                                      int number,
+                                                      String blobName,
+                                                      long size,
+                                                      boolean lastPart) {
+        final UploadPartRequest uploadRequest = new UploadPartRequest();
+        uploadRequest.setBucketName(blobStore.bucket());
+        uploadRequest.setKey(blobName);
+        uploadRequest.setUploadId(uploadId);
+        uploadRequest.setPartNumber(number);
+        uploadRequest.setInputStream(stream);
+        uploadRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
+        uploadRequest.setPartSize(size);
+        uploadRequest.setLastPart(lastPart);
+        return uploadRequest;
+    }
+
+    private void abortMultiPartUpload(String uploadId, String blobName) {
+        final AbortMultipartUploadRequest abortRequest =
+                new AbortMultipartUploadRequest(blobStore.bucket(), blobName, uploadId);
+        try (AmazonS3Reference clientReference = blobStore.clientReference()) {
+            SocketAccess.doPrivilegedVoid(() -> clientReference.client().abortMultipartUpload(abortRequest));
+        }
+    }
+
+    private InitiateMultipartUploadRequest initiateMultiPartUpload(String blobName) {
+        final InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(blobStore.bucket(), blobName);
+        initRequest.setStorageClass(blobStore.getStorageClass());
+        initRequest.setCannedACL(blobStore.getCannedACL());
+        initRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
+        if (blobStore.serverSideEncryption()) {
+            final ObjectMetadata md = new ObjectMetadata();
+            md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+            initRequest.setObjectMetadata(md);
+        }
+        return initRequest;
+    }
+
     // package private for testing
     long getLargeBlobThresholdInBytes() {
         return blobStore.bufferSizeInBytes();
@@ -389,19 +491,10 @@ class S3BlobContainer extends AbstractBlobContainer {
         final SetOnce<String> uploadId = new SetOnce<>();
         final String bucketName = blobStore.bucket();
         boolean success = false;
-
-        final InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, blobName);
-        initRequest.setStorageClass(blobStore.getStorageClass());
-        initRequest.setCannedACL(blobStore.getCannedACL());
-        initRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
-        if (blobStore.serverSideEncryption()) {
-            final ObjectMetadata md = new ObjectMetadata();
-            md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
-            initRequest.setObjectMetadata(md);
-        }
         try (AmazonS3Reference clientReference = blobStore.clientReference()) {
 
-            uploadId.set(SocketAccess.doPrivileged(() -> clientReference.client().initiateMultipartUpload(initRequest).getUploadId()));
+            uploadId.set(SocketAccess.doPrivileged(() ->
+                clientReference.client().initiateMultipartUpload(initiateMultiPartUpload(blobName)).getUploadId()));
             if (Strings.isEmpty(uploadId.get())) {
                 throw new IOException("Failed to initialize multipart upload " + blobName);
             }
@@ -410,21 +503,9 @@ class S3BlobContainer extends AbstractBlobContainer {
 
             long bytesCount = 0;
             for (int i = 1; i <= nbParts; i++) {
-                final UploadPartRequest uploadRequest = new UploadPartRequest();
-                uploadRequest.setBucketName(bucketName);
-                uploadRequest.setKey(blobName);
-                uploadRequest.setUploadId(uploadId.get());
-                uploadRequest.setPartNumber(i);
-                uploadRequest.setInputStream(input);
-                uploadRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
-
-                if (i < nbParts) {
-                    uploadRequest.setPartSize(partSize);
-                    uploadRequest.setLastPart(false);
-                } else {
-                    uploadRequest.setPartSize(lastPartSize);
-                    uploadRequest.setLastPart(true);
-                }
+                final boolean lastPart = i == nbParts;
+                final UploadPartRequest uploadRequest =
+                        createPartUploadRequest(input, uploadId.get(), i, blobName, lastPart ? lastPartSize : partSize, lastPart);
                 bytesCount += uploadRequest.getPartSize();
 
                 final UploadPartResult uploadResponse = SocketAccess.doPrivileged(() -> clientReference.client().uploadPart(uploadRequest));
@@ -446,10 +527,7 @@ class S3BlobContainer extends AbstractBlobContainer {
             throw new IOException("Unable to upload object [" + blobName + "] using multipart upload", e);
         } finally {
             if ((success == false) && Strings.hasLength(uploadId.get())) {
-                final AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(bucketName, blobName, uploadId.get());
-                try (AmazonS3Reference clientReference = blobStore.clientReference()) {
-                    SocketAccess.doPrivilegedVoid(() -> clientReference.client().abortMultipartUpload(abortRequest));
-                }
+                abortMultiPartUpload(uploadId.get(), blobName);
             }
         }
     }

+ 9 - 1
plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java

@@ -22,6 +22,7 @@ import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.BlobStoreException;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.BigArrays;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -35,6 +36,8 @@ class S3BlobStore implements BlobStore {
 
     private final S3Service service;
 
+    private final BigArrays bigArrays;
+
     private final String bucket;
 
     private final ByteSizeValue bufferSize;
@@ -56,8 +59,9 @@ class S3BlobStore implements BlobStore {
 
     S3BlobStore(S3Service service, String bucket, boolean serverSideEncryption,
                 ByteSizeValue bufferSize, String cannedACL, String storageClass,
-                RepositoryMetadata repositoryMetadata) {
+                RepositoryMetadata repositoryMetadata, BigArrays bigArrays) {
         this.service = service;
+        this.bigArrays = bigArrays;
         this.bucket = bucket;
         this.serverSideEncryption = serverSideEncryption;
         this.bufferSize = bufferSize;
@@ -136,6 +140,10 @@ class S3BlobStore implements BlobStore {
         return bucket;
     }
 
+    public BigArrays bigArrays() {
+        return bigArrays;
+    }
+
     public boolean serverSideEncryption() {
         return serverSideEncryption;
     }

+ 1 - 1
plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java

@@ -308,7 +308,7 @@ class S3Repository extends MeteredBlobStoreRepository {
 
     @Override
     protected S3BlobStore createBlobStore() {
-        return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass, metadata);
+        return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass, metadata, bigArrays);
     }
 
     // only use for testing

+ 102 - 1
plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

@@ -12,6 +12,7 @@ import com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream;
 import com.amazonaws.util.Base16;
 import org.apache.http.HttpStatus;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -42,6 +43,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Locale;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.elasticsearch.repositories.s3.S3ClientSettings.DISABLE_CHUNKED_ENCODING;
 import static org.elasticsearch.repositories.s3.S3ClientSettings.ENDPOINT_SETTING;
@@ -126,7 +128,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
             bufferSize == null ? S3Repository.BUFFER_SIZE_SETTING.getDefault(Settings.EMPTY) : bufferSize,
             S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY),
             S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY),
-            repositoryMetadata)) {
+            repositoryMetadata, BigArrays.NON_RECYCLING_INSTANCE)) {
                 @Override
                 public InputStream readBlob(String blobName) throws IOException {
                     return new AssertingInputStream(super.readBlob(blobName), blobName);
@@ -296,6 +298,105 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
         assertThat(countDownComplete.isCountedDown(), is(true));
     }
 
+    public void testWriteLargeBlobStreaming() throws Exception {
+        final boolean useTimeout = rarely();
+        final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
+        final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB);
+        final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize);
+
+        final int parts = randomIntBetween(1, 5);
+        final long lastPartSize = randomLongBetween(10, 512);
+        final long blobSize = (parts * bufferSize.getBytes()) + lastPartSize;
+
+        final int nbErrors = 2; // we want all requests to fail at least once
+        final CountDown countDownInitiate = new CountDown(nbErrors);
+        final AtomicInteger counterUploads = new AtomicInteger(0);
+        final AtomicLong bytesReceived = new AtomicLong(0L);
+        final CountDown countDownComplete = new CountDown(nbErrors);
+
+        httpServer.createContext("/bucket/write_large_blob_streaming", exchange -> {
+            final long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length"));
+
+            if ("POST".equals(exchange.getRequestMethod())
+                    && exchange.getRequestURI().getQuery().equals("uploads")) {
+                // initiate multipart upload request
+                if (countDownInitiate.countDown()) {
+                    byte[] response = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
+                            "<InitiateMultipartUploadResult>\n" +
+                            "  <Bucket>bucket</Bucket>\n" +
+                            "  <Key>write_large_blob_streaming</Key>\n" +
+                            "  <UploadId>TEST</UploadId>\n" +
+                            "</InitiateMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
+                    exchange.getResponseHeaders().add("Content-Type", "application/xml");
+                    exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
+                    exchange.getResponseBody().write(response);
+                    exchange.close();
+                    return;
+                }
+            } else if ("PUT".equals(exchange.getRequestMethod())
+                    && exchange.getRequestURI().getQuery().contains("uploadId=TEST")
+                    && exchange.getRequestURI().getQuery().contains("partNumber=")) {
+                // upload part request
+                MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
+                BytesReference bytes = Streams.readFully(md5);
+
+                if (counterUploads.incrementAndGet() % 2 == 0) {
+                    bytesReceived.addAndGet(bytes.length());
+                    exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
+                    exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
+                    exchange.close();
+                    return;
+                }
+
+            } else if ("POST".equals(exchange.getRequestMethod())
+                    && exchange.getRequestURI().getQuery().equals("uploadId=TEST")) {
+                // complete multipart upload request
+                if (countDownComplete.countDown()) {
+                    Streams.readFully(exchange.getRequestBody());
+                    byte[] response = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
+                            "<CompleteMultipartUploadResult>\n" +
+                            "  <Bucket>bucket</Bucket>\n" +
+                            "  <Key>write_large_blob_streaming</Key>\n" +
+                            "</CompleteMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
+                    exchange.getResponseHeaders().add("Content-Type", "application/xml");
+                    exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
+                    exchange.getResponseBody().write(response);
+                    exchange.close();
+                    return;
+                }
+            }
+
+            // sends an error back or let the request time out
+            if (useTimeout == false) {
+                if (randomBoolean() && contentLength > 0) {
+                    Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, Math.toIntExact(contentLength - 1))]);
+                } else {
+                    Streams.readFully(exchange.getRequestBody());
+                    exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY,
+                            HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1);
+                }
+                exchange.close();
+            }
+        });
+
+        blobContainer.writeBlob("write_large_blob_streaming", false, randomBoolean(), out -> {
+            final byte[] buffer = new byte[16 * 1024];
+            long outstanding = blobSize;
+            while (outstanding > 0) {
+                if (randomBoolean()) {
+                    int toWrite = Math.toIntExact(Math.min(randomIntBetween(64, buffer.length), outstanding));
+                    out.write(buffer, 0, toWrite);
+                    outstanding -= toWrite;
+                } else {
+                    out.write(0);
+                    outstanding--;
+                }
+            }
+        });
+
+        assertEquals(blobSize, bytesReceived.get());
+    }
+
     /**
      * Asserts that an InputStream is fully consumed, or aborted, when it is closed
      */

+ 14 - 0
server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java

@@ -9,9 +9,11 @@
 package org.elasticsearch.common.blobstore;
 
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.core.CheckedConsumer;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
 import java.util.Iterator;
@@ -116,6 +118,18 @@ public interface BlobContainer {
         writeBlob(blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists);
     }
 
+    /**
+     * Write a blob by providing a consumer that will write its contents to an output stream. This method allows serializing a blob's
+     * contents directly to the blob store without having to materialize the serialized version in full before writing.
+     *
+     * @param blobName            the name of the blob to write
+     * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
+     * @param atomic              whether the write should be atomic in case the implementation supports it
+     * @param writer              consumer for an output stream that will write the blob contents to the stream
+     */
+    void writeBlob(String blobName, boolean failIfAlreadyExists, boolean atomic,
+                   CheckedConsumer<OutputStream, IOException> writer) throws IOException;
+
     /**
      * Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name,
      * using an atomic write operation if the implementation supports it.

+ 57 - 0
server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

@@ -18,9 +18,11 @@ import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.internal.io.IOUtils;
 
 import java.io.FileNotFoundException;
+import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -243,6 +245,49 @@ public class FsBlobContainer extends AbstractBlobContainer {
         IOUtils.fsync(path, true);
     }
 
+    @Override
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        if (atomic) {
+            final String tempBlob = tempBlobName(blobName);
+            try {
+                writeToPath(tempBlob, true, writer);
+                moveBlobAtomic(tempBlob, blobName, failIfAlreadyExists);
+            } catch (IOException ex) {
+                try {
+                    deleteBlobsIgnoringIfNotExists(Iterators.single(tempBlob));
+                } catch (IOException e) {
+                    ex.addSuppressed(e);
+                }
+                throw ex;
+            }
+        } else {
+            writeToPath(blobName, failIfAlreadyExists, writer);
+        }
+        IOUtils.fsync(path, true);
+    }
+
+    private void writeToPath(String blobName, boolean failIfAlreadyExists, CheckedConsumer<OutputStream, IOException> writer)
+            throws IOException {
+        final Path file = path.resolve(blobName);
+        try {
+            try (OutputStream out = new BlobOutputStream(file)) {
+                writer.accept(out);
+            }
+        } catch (FileAlreadyExistsException faee) {
+            if (failIfAlreadyExists) {
+                throw faee;
+            }
+            deleteBlobsIgnoringIfNotExists(Iterators.single(blobName));
+            try (OutputStream out = new BlobOutputStream(file)) {
+                writer.accept(out);
+            }
+        }
+        IOUtils.fsync(file, false);
+    }
+
     @Override
     public void writeBlobAtomic(final String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
         final String tempBlob = tempBlobName(blobName);
@@ -306,4 +351,16 @@ public class FsBlobContainer extends AbstractBlobContainer {
     public static boolean isTempBlobName(final String blobName) {
         return blobName.startsWith(TEMP_FILE_PREFIX);
     }
+
+    private static class BlobOutputStream extends FilterOutputStream {
+
+        BlobOutputStream(Path file) throws IOException {
+            super(Files.newOutputStream(file, StandardOpenOption.CREATE_NEW));
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            out.write(b, off, len);
+        }
+    }
 }

+ 8 - 0
server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java

@@ -13,9 +13,11 @@ import org.elasticsearch.common.blobstore.BlobMetadata;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.core.CheckedConsumer;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
@@ -61,6 +63,12 @@ public abstract class FilterBlobContainer implements BlobContainer {
         delegate.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
     }
 
+    @Override
+    public void writeBlob(String blobName, boolean failIfAlreadyExists, boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        delegate.writeBlob(blobName, failIfAlreadyExists, atomic, writer);
+    }
+
     @Override
     public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
         delegate.writeBlobAtomic(blobName, bytes, failIfAlreadyExists);

+ 25 - 33
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -53,11 +53,9 @@ import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
 import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.compress.NotXContentException;
 import org.elasticsearch.common.io.Streams;
-import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.metrics.CounterMetric;
@@ -74,6 +72,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Tuple;
@@ -118,6 +117,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.NoSuchFileException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -553,15 +553,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 sourceMeta.asClone(target.getName(), startTime, threadPool.absoluteTimeInMillis() - startTime),
                 shardContainer,
                 target.getUUID(),
-                compress,
-                bigArrays
+                compress
             );
             INDEX_SHARD_SNAPSHOTS_FORMAT.write(
                 existingSnapshots.withClone(source.getName(), target.getName()),
                 shardContainer,
                 newGen,
-                compress,
-                bigArrays
+                compress
             );
             return new ShardSnapshotResult(
                 newGen,
@@ -1394,7 +1392,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             executor.execute(
                 ActionRunnable.run(
                     allMetaListener,
-                    () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress, bigArrays)
+                    () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress)
                 )
             );
 
@@ -1408,7 +1406,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                         if (metaUUID == null) {
                             // We don't yet have this version of the metadata so we write it
                             metaUUID = UUIDs.base64UUID();
-                            INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress, bigArrays);
+                            INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress);
                             indexMetaIdentifiers.put(identifiers, metaUUID);
                         }
                         indexMetas.put(index, identifiers);
@@ -1417,8 +1415,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                             clusterMetadata.index(index.getName()),
                             indexContainer(index),
                             snapshotId.getUUID(),
-                            compress,
-                            bigArrays
+                            compress
                         );
                     }
                 }));
@@ -1426,7 +1423,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             executor.execute(
                 ActionRunnable.run(
                     allMetaListener,
-                    () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress, bigArrays)
+                    () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress)
                 )
             );
         }, onUpdateFailure);
@@ -2250,13 +2247,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             }
             final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
             logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
-            try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays)) {
+            writeAtomic(blobContainer(), indexBlob, out -> {
                 try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder(Streams.noCloseStream(out))) {
                     newRepositoryData.snapshotsToXContent(xContentBuilder, version);
                 }
-                final BytesReference serializedRepoData = out.bytes();
-                writeAtomic(blobContainer(), indexBlob, serializedRepoData, true);
-            }
+            }, true);
             maybeWriteIndexLatest(newGen);
 
             // Step 3: Update CS to reflect new repository generation.
@@ -2349,7 +2344,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         if (supportURLRepo) {
             logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen);
             try {
-                writeAtomic(blobContainer(), INDEX_LATEST_BLOB, new BytesArray(Numbers.longToBytes(newGen)), false);
+                writeAtomic(blobContainer(), INDEX_LATEST_BLOB, out -> out.write(Numbers.longToBytes(newGen)), false);
             } catch (Exception e) {
                 logger.warn(
                     () -> new ParameterizedMessage(
@@ -2530,10 +2525,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         return latest;
     }
 
-    private void writeAtomic(BlobContainer container, final String blobName, final BytesReference bytesRef, boolean failIfAlreadyExists)
-        throws IOException {
+    private void writeAtomic(
+        BlobContainer container,
+        final String blobName,
+        CheckedConsumer<OutputStream, IOException> writer,
+        boolean failIfAlreadyExists
+    ) throws IOException {
         logger.trace(() -> new ParameterizedMessage("[{}] Writing [{}] to {} atomically", metadata.name(), blobName, container.path()));
-        container.writeBlobAtomic(blobName, bytesRef, failIfAlreadyExists);
+        container.writeBlob(blobName, failIfAlreadyExists, true, writer);
     }
 
     @Override
@@ -2684,13 +2683,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 // reference a generation that has not had all its files fully uploaded.
                 indexGeneration = UUIDs.randomBase64UUID();
                 try {
-                    INDEX_SHARD_SNAPSHOTS_FORMAT.write(
-                        updatedBlobStoreIndexShardSnapshots,
-                        shardContainer,
-                        indexGeneration,
-                        compress,
-                        bigArrays
-                    );
+                    INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedBlobStoreIndexShardSnapshots, shardContainer, indexGeneration, compress);
                 } catch (IOException e) {
                     throw new IndexShardSnapshotFailedException(
                         shardId,
@@ -2767,7 +2760,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 );
                 try {
                     final String snapshotUUID = snapshotId.getUUID();
-                    INDEX_SHARD_SNAPSHOT_FORMAT.write(blobStoreIndexShardSnapshot, shardContainer, snapshotUUID, compress, bigArrays);
+                    INDEX_SHARD_SNAPSHOT_FORMAT.write(blobStoreIndexShardSnapshot, shardContainer, snapshotUUID, compress);
                 } catch (IOException e) {
                     throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
                 }
@@ -3140,7 +3133,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
                 if (indexGeneration < 0L) {
                     writtenGeneration = UUIDs.randomBase64UUID();
-                    INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compress, bigArrays);
+                    INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compress);
                 } else {
                     writtenGeneration = String.valueOf(indexGeneration);
                     writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots);
@@ -3180,12 +3173,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             () -> new ParameterizedMessage("[{}] Writing shard index [{}] to [{}]", metadata.name(), indexGeneration, shardContainer.path())
         );
         final String blobName = INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(String.valueOf(indexGeneration));
-        INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(
-            updatedSnapshots,
+        writeAtomic(
+            shardContainer,
             blobName,
-            compress,
-            bigArrays,
-            bytes -> writeAtomic(shardContainer, blobName, bytes, true)
+            out -> INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compress, out),
+            true
         );
     }
 

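With the changes above, the index-N blob (the serialized RepositoryData) and index.latest are produced inside the writer callback and streamed through the reworked writeAtomic helper, so neither their size nor their full content has to be held in memory before the upload starts. Since the final size is no longer known up front, a caller that still wants to record it (for logging or stats) can wrap the writer in a counting stream. A hedged sketch, not part of this change; the helper and variable names are illustrative:

    import org.elasticsearch.core.CheckedConsumer;

    import java.io.FilterOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.concurrent.atomic.AtomicLong;

    final class CountingWriterExample {
        // Wraps a writer so every byte handed to the repository stream is also counted.
        static CheckedConsumer<OutputStream, IOException> countingWriter(
            CheckedConsumer<OutputStream, IOException> writer,
            AtomicLong bytesWritten
        ) {
            return target -> writer.accept(new FilterOutputStream(target) {
                @Override
                public void write(int b) throws IOException {
                    out.write(b);
                    bytesWritten.incrementAndGet();
                }

                @Override
                public void write(byte[] b, int off, int len) throws IOException {
                    // forward bulk writes directly, mirroring BlobOutputStream in FsBlobContainer above
                    out.write(b, off, len);
                    bytesWritten.addAndGet(len);
                }
            });
        }
    }

Such a wrapper could be passed wherever a CheckedConsumer<OutputStream, IOException> is expected, for example into the writeAtomic helper shown above, without affecting the bounded-memory property of the write.
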
+ 26 - 39
server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java

@@ -19,12 +19,9 @@ import org.elasticsearch.common.CheckedBiFunction;
 import org.elasticsearch.common.Numbers;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.Streams;
-import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
 import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.ToXContent;
@@ -32,7 +29,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.gateway.CorruptStateException;
 
 import java.io.FilterInputStream;
@@ -272,47 +268,38 @@ public final class ChecksumBlobStoreFormat<T extends ToXContent> {
      * @param name                blob name
      * @param compress            whether to use compression
      */
-    public void write(T obj, BlobContainer blobContainer, String name, boolean compress, BigArrays bigArrays) throws IOException {
+    public void write(T obj, BlobContainer blobContainer, String name, boolean compress) throws IOException {
         final String blobName = blobName(name);
-        serialize(obj, blobName, compress, bigArrays, bytes -> blobContainer.writeBlob(blobName, bytes, false));
+        blobContainer.writeBlob(blobName, false, false, out -> serialize(obj, blobName, compress, out));
     }
 
-    public void serialize(
-        final T obj,
-        final String blobName,
-        final boolean compress,
-        BigArrays bigArrays,
-        CheckedConsumer<BytesReference, IOException> consumer
-    ) throws IOException {
-        try (ReleasableBytesStreamOutput outputStream = new ReleasableBytesStreamOutput(bigArrays)) {
-            try (
-                OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
-                    "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")",
-                    blobName,
-                    org.elasticsearch.common.io.Streams.noCloseStream(outputStream),
-                    BUFFER_SIZE
+    public void serialize(final T obj, final String blobName, final boolean compress, OutputStream outputStream) throws IOException {
+        try (
+            OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
+                "ChecksumBlobStoreFormat.serialize(blob=\"" + blobName + "\")",
+                blobName,
+                org.elasticsearch.common.io.Streams.noCloseStream(outputStream),
+                BUFFER_SIZE
+            )
+        ) {
+            CodecUtil.writeHeader(indexOutput, codec, VERSION);
+            try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
+                @Override
+                public void close() {
+                    // this is important since some of the XContentBuilders write bytes on close.
+                    // in order to write the footer we need to prevent closing the actual index output.
+                }
+            };
+                XContentBuilder builder = XContentFactory.contentBuilder(
+                    XContentType.SMILE,
+                    compress ? CompressorFactory.COMPRESSOR.threadLocalOutputStream(indexOutputOutputStream) : indexOutputOutputStream
                 )
             ) {
-                CodecUtil.writeHeader(indexOutput, codec, VERSION);
-                try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
-                    @Override
-                    public void close() {
-                        // this is important since some of the XContentBuilders write bytes on close.
-                        // in order to write the footer we need to prevent closing the actual index input.
-                    }
-                };
-                    XContentBuilder builder = XContentFactory.contentBuilder(
-                        XContentType.SMILE,
-                        compress ? CompressorFactory.COMPRESSOR.threadLocalOutputStream(indexOutputOutputStream) : indexOutputOutputStream
-                    )
-                ) {
-                    builder.startObject();
-                    obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS);
-                    builder.endObject();
-                }
-                CodecUtil.writeFooter(indexOutput);
+                builder.startObject();
+                obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS);
+                builder.endObject();
             }
-            consumer.accept(outputStream.bytes());
+            CodecUtil.writeFooter(indexOutput);
         }
     }
 }
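
The reshaped serialize keeps the blob layout exactly as before: a Lucene codec header, the SMILE-encoded body (run through the DEFLATE compressor when compress is true), and a codec footer carrying the checksum; only the destination changes from a BigArrays-backed buffer to whatever OutputStream the caller provides. A hedged sketch, not part of this change, of pointing it at a local file; the format instance, object and path are assumed to exist:

    import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat;
    import org.elasticsearch.snapshots.SnapshotInfo;

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;

    final class SerializeToFileExample {
        static void dump(ChecksumBlobStoreFormat<SnapshotInfo> format, SnapshotInfo info, Path file) throws IOException {
            try (OutputStream out = Files.newOutputStream(file)) {
                // "snap-example" only names the underlying IndexOutput; it is not written into the blob.
                // compress=true wraps the SMILE body in the DEFLATE compressor before it reaches the file.
                format.serialize(info, "snap-example", true, out);
            }
        }
    }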

+ 6 - 6
server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java

@@ -17,7 +17,6 @@ import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.fs.FsBlobStore;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.io.Streams;
-import org.elasticsearch.common.util.MockBigArrays;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.ToXContentFragment;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -87,9 +86,9 @@ public class BlobStoreFormatTests extends ESTestCase {
         // Write blobs in different formats
         final String randomText = randomAlphaOfLengthBetween(0, 1024 * 8 * 3);
         final String normalText = "checksum smile: " + randomText;
-        checksumSMILE.write(new BlobObj(normalText), blobContainer, "check-smile", false, MockBigArrays.NON_RECYCLING_INSTANCE);
+        checksumSMILE.write(new BlobObj(normalText), blobContainer, "check-smile", false);
         final String compressedText = "checksum smile compressed: " + randomText;
-        checksumSMILE.write(new BlobObj(compressedText), blobContainer, "check-smile-comp", true, MockBigArrays.NON_RECYCLING_INSTANCE);
+        checksumSMILE.write(new BlobObj(compressedText), blobContainer, "check-smile-comp", true);
 
         // Assert that all checksum blobs can be read
         assertEquals(normalText, checksumSMILE.read("repo", blobContainer, "check-smile", xContentRegistry()).getText());
@@ -109,8 +108,8 @@ public class BlobStoreFormatTests extends ESTestCase {
             (repo, parser) -> BlobObj.fromXContent(parser)
         );
         BlobObj blobObj = new BlobObj(veryRedundantText.toString());
-        checksumFormat.write(blobObj, blobContainer, "blob-comp", true, MockBigArrays.NON_RECYCLING_INSTANCE);
-        checksumFormat.write(blobObj, blobContainer, "blob-not-comp", false, MockBigArrays.NON_RECYCLING_INSTANCE);
+        checksumFormat.write(blobObj, blobContainer, "blob-comp", true);
+        checksumFormat.write(blobObj, blobContainer, "blob-not-comp", false);
         Map<String, BlobMetadata> blobs = blobContainer.listBlobsByPrefix("blob-");
         assertEquals(blobs.size(), 2);
         assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length()));
@@ -121,12 +120,13 @@ public class BlobStoreFormatTests extends ESTestCase {
         BlobContainer blobContainer = blobStore.blobContainer(BlobPath.EMPTY);
         String testString = randomAlphaOfLength(randomInt(10000));
         BlobObj blobObj = new BlobObj(testString);
+
         ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(
             BLOB_CODEC,
             "%s",
             (repo, parser) -> BlobObj.fromXContent(parser)
         );
-        checksumFormat.write(blobObj, blobContainer, "test-path", randomBoolean(), MockBigArrays.NON_RECYCLING_INSTANCE);
+        checksumFormat.write(blobObj, blobContainer, "test-path", randomBoolean());
         assertEquals(checksumFormat.read("repo", blobContainer, "test-path", xContentRegistry()).getText(), testString);
         randomCorruption(blobContainer, "test-path");
         try {

+ 7 - 18
server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java

@@ -9,9 +9,7 @@
 package org.elasticsearch.snapshots;
 
 import org.elasticsearch.Version;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.support.PlainActionFuture;
-import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.test.AbstractWireTestCase;
@@ -32,22 +30,13 @@ public class SnapshotInfoBlobSerializationTests extends AbstractWireTestCase<Sna
 
     @Override
     protected SnapshotInfo copyInstance(SnapshotInfo instance, Version version) throws IOException {
-        final PlainActionFuture<SnapshotInfo> future = new PlainActionFuture<>();
-        BlobStoreRepository.SNAPSHOT_FORMAT.serialize(
-            instance,
-            "test",
-            randomBoolean(),
-            BigArrays.NON_RECYCLING_INSTANCE,
-            bytes -> ActionListener.completeWith(
-                future,
-                () -> BlobStoreRepository.SNAPSHOT_FORMAT.deserialize(
-                    instance.repository(),
-                    NamedXContentRegistry.EMPTY,
-                    bytes.streamInput()
-                )
-            )
+        final BytesStreamOutput out = new BytesStreamOutput();
+        BlobStoreRepository.SNAPSHOT_FORMAT.serialize(instance, "test", randomBoolean(), out);
+        return BlobStoreRepository.SNAPSHOT_FORMAT.deserialize(
+            instance.repository(),
+            NamedXContentRegistry.EMPTY,
+            out.bytes().streamInput()
         );
-        return future.actionGet();
     }
 
 }

+ 0 - 9
test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java

@@ -20,11 +20,8 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.CompositeBytesReference;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.regex.Regex;
-import org.elasticsearch.common.xcontent.XContentHelper;
-import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.core.Tuple;
-import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.RestUtils;
 
@@ -33,7 +30,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URLDecoder;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -211,11 +207,6 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
                 blobs.put(blobName, BytesArray.EMPTY);
 
                 byte[] response = requestBody.utf8ToString().getBytes(UTF_8);
-                if (Paths.get(blobName).getFileName().toString().startsWith(BlobStoreRepository.UPLOADED_DATA_BLOB_PREFIX) == false) {
-                    final Map<String, Object> parsedBody = XContentHelper.convertToMap(requestBody, false, XContentType.JSON).v2();
-                    assert parsedBody.get("md5Hash") != null
-                        : "file [" + blobName + "] is not a data blob but did not come with a md5 checksum";
-                }
                 exchange.getResponseHeaders().add("Content-Type", "application/json");
                 exchange.getResponseHeaders()
                     .add(

+ 1 - 2
test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

@@ -37,7 +37,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.DeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -382,7 +381,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
         PlainActionFuture.get(f -> blobStoreRepository.threadPool().generic().execute(ActionRunnable.run(f, () ->
                 BlobStoreRepository.SNAPSHOT_FORMAT.write(downgradedSnapshotInfo,
                         blobStoreRepository.blobStore().blobContainer(blobStoreRepository.basePath()), snapshotInfo.snapshotId().getUUID(),
-                        randomBoolean(), internalCluster().getCurrentMasterNodeInstance(BigArrays.class)))));
+                        randomBoolean()))));
         return oldVersionSnapshot;
     }
 

+ 45 - 16
test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java

@@ -24,6 +24,7 @@ import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
 import org.elasticsearch.common.blobstore.support.FilterBlobContainer;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Iterators;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.PathUtils;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
@@ -40,6 +41,7 @@ import org.elasticsearch.repositories.fs.FsRepository;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.nio.file.Path;
 import java.security.MessageDigest;
@@ -488,11 +490,7 @@ public class MockRepository extends FsRepository {
             @Override
             public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
                 throws IOException {
-                maybeIOExceptionOrBlock(blobName);
-                if (blockOnWriteShardLevelMeta && blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)
-                        && path().equals(basePath()) == false) {
-                    blockExecutionAndMaybeWait(blobName);
-                }
+                beforeWrite(blobName);
                 super.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
                 if (RandomizedContext.current().getRandom().nextBoolean()) {
                     // for network based repositories, the blob may have been written but we may still
@@ -502,19 +500,35 @@ public class MockRepository extends FsRepository {
             }
 
             @Override
-            public void writeBlobAtomic(final String blobName, final BytesReference bytes,
-                                        final boolean failIfAlreadyExists) throws IOException {
-                final Random random = RandomizedContext.current().getRandom();
-                if (failOnIndexLatest && BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) {
-                    throw new IOException("Random IOException");
+            public void writeBlob(String blobName,
+                                  boolean failIfAlreadyExists,
+                                  boolean atomic,
+                                  CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+                if (atomic) {
+                    beforeAtomicWrite(blobName);
+                } else {
+                    beforeWrite(blobName);
                 }
-                if (blobName.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX)) {
-                    if (blockAndFailOnWriteIndexFile) {
-                        blockExecutionAndFail(blobName);
-                    } else if (blockOnWriteIndexFile) {
-                        blockExecutionAndMaybeWait(blobName);
-                    }
+                super.writeBlob(blobName, failIfAlreadyExists, atomic, writer);
+                if (RandomizedContext.current().getRandom().nextBoolean()) {
+                    // for network based repositories, the blob may have been written but we may still
+                    // get an error with the client connection, so an IOException here simulates this
+                    maybeIOExceptionOrBlock(blobName);
+                }
+            }
+
+            private void beforeWrite(String blobName) throws IOException {
+                maybeIOExceptionOrBlock(blobName);
+                if (blockOnWriteShardLevelMeta && blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)
+                        && path().equals(basePath()) == false) {
+                    blockExecutionAndMaybeWait(blobName);
                 }
+            }
+
+            @Override
+            public void writeBlobAtomic(final String blobName, final BytesReference bytes,
+                                        final boolean failIfAlreadyExists) throws IOException {
+                final Random random = beforeAtomicWrite(blobName);
                 if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) {
                     // Simulate a failure between the write and move operation in FsBlobContainer
                     final String tempBlobName = FsBlobContainer.tempBlobName(blobName);
@@ -529,6 +543,21 @@ public class MockRepository extends FsRepository {
                     super.writeBlobAtomic(blobName, bytes, failIfAlreadyExists);
                 }
             }
+
+            private Random beforeAtomicWrite(String blobName) throws IOException {
+                final Random random = RandomizedContext.current().getRandom();
+                if (failOnIndexLatest && BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) {
+                    throw new IOException("Random IOException");
+                }
+                if (blobName.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX)) {
+                    if (blockAndFailOnWriteIndexFile) {
+                        blockExecutionAndFail(blobName);
+                    } else if (blockOnWriteIndexFile) {
+                        blockExecutionAndMaybeWait(blobName);
+                    }
+                }
+                return random;
+            }
         }
     }
 }

+ 20 - 0
x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java

@@ -33,6 +33,7 @@ import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
 import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.indices.recovery.RecoverySettings;
@@ -48,6 +49,7 @@ import org.elasticsearch.snapshots.SnapshotInfo;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.NoSuchFileException;
 import java.security.GeneralSecurityException;
@@ -622,6 +624,24 @@ public class EncryptedRepository extends BlobStoreRepository {
             }
         }
 
+        @Override
+        public void writeBlob(
+            String blobName,
+            boolean failIfAlreadyExists,
+            boolean atomic,
+            CheckedConsumer<OutputStream, IOException> writer
+        ) throws IOException {
+            // TODO: this is just a stop-gap solution until we have an encrypted output stream wrapper
+            try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays)) {
+                writer.accept(out);
+                if (atomic) {
+                    writeBlobAtomic(blobName, out.bytes(), failIfAlreadyExists);
+                } else {
+                    writeBlob(blobName, out.bytes(), failIfAlreadyExists);
+                }
+            }
+        }
+
         private ChainingInputStream encryptedInput(InputStream inputStream, SingleUseKey singleUseNonceAndDEK, BytesReference dekIdBytes)
             throws IOException {
             return ChainingInputStream.chain(

+ 12 - 0
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java

@@ -22,6 +22,7 @@ import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.PathUtilsForTesting;
 import org.elasticsearch.xpack.searchablesnapshots.cache.blob.BlobStoreCacheService;
@@ -32,6 +33,7 @@ import java.io.ByteArrayInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.channels.FileChannel;
 import java.nio.file.FileSystem;
 import java.nio.file.OpenOption;
@@ -269,6 +271,16 @@ public final class TestUtils {
             throw unsupportedException();
         }
 
+        @Override
+        public void writeBlob(
+            String blobName,
+            boolean failIfAlreadyExists,
+            boolean atomic,
+            CheckedConsumer<OutputStream, IOException> writer
+        ) {
+            throw unsupportedException();
+        }
+
         @Override
         public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) {
             throw unsupportedException();

+ 19 - 0
x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java

@@ -17,12 +17,14 @@ import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.indices.recovery.RecoverySettings;
@@ -42,6 +44,7 @@ import java.io.ByteArrayInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.FileAlreadyExistsException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -453,6 +456,22 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
             writeBlob(blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists);
         }
 
+        @Override
+        public void writeBlob(
+            String blobName,
+            boolean failIfAlreadyExists,
+            boolean atomic,
+            CheckedConsumer<OutputStream, IOException> writer
+        ) throws IOException {
+            final BytesStreamOutput out = new BytesStreamOutput();
+            writer.accept(out);
+            if (atomic) {
+                writeBlobAtomic(blobName, out.bytes(), failIfAlreadyExists);
+            } else {
+                writeBlob(blobName, out.bytes(), failIfAlreadyExists);
+            }
+        }
+
         @Override
         public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
             final StreamInput inputStream;

+ 19 - 0
x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java

@@ -16,11 +16,13 @@ import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.env.Environment;
@@ -40,6 +42,7 @@ import java.io.ByteArrayInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -326,6 +329,22 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase {
             writeBlob(blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists);
         }
 
+        @Override
+        public void writeBlob(
+            String blobName,
+            boolean failIfAlreadyExists,
+            boolean atomic,
+            CheckedConsumer<OutputStream, IOException> writer
+        ) throws IOException {
+            final BytesStreamOutput out = new BytesStreamOutput();
+            writer.accept(out);
+            if (atomic) {
+                writeBlobAtomic(blobName, out.bytes(), failIfAlreadyExists);
+            } else {
+                writeBlob(blobName, out.bytes(), failIfAlreadyExists);
+            }
+        }
+
         @Override
         public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
             writeBlobAtomic(blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists);