
Deserialize BlobStore Metadata Files in a Streaming Manner (#73149)

We were reading the full file contents up-front because verifying the footer any
other way would have been too complex. This commit switches reading of metadata
blobs (which can become quite sizable in some cases) to a streaming approach,
doing the footer verification manually, since Lucene's utility methods don't allow
for verification on top of a stream.
Armin Braun · commit 6dd2a2a5d6 · 4 years ago
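
Background: Lucene's CodecUtil.checksumEntireFile and CodecUtil.checkFooter operate on a seekable IndexInput, which is why the old code buffered whole blobs before verifying them. The footer is 16 bytes: a magic int, an algorithm id int (always 0 for CRC32), and a long CRC32 checksum covering every byte of the file before those last 8. The following is a minimal, self-contained sketch of that check over a fully buffered blob; FooterSketch, verify, readIntBE and readLongBE are illustrative names, not part of this commit, and big-endian byte order is assumed to match how Lucene 8 writes the footer (the magic value is Lucene's CODEC_MAGIC inverted):

    import java.util.zip.CRC32;

    final class FooterSketch {

        private static final int FOOTER_MAGIC = ~0x3FD76C17; // assumed: ~CodecUtil.CODEC_MAGIC

        // Verifies a Lucene-style footer at the end of a fully buffered blob.
        static void verify(byte[] blob) {
            if (blob.length < 16) {
                throw new IllegalStateException("blob too short to contain a footer");
            }
            final int footerStart = blob.length - 16;
            final CRC32 crc = new CRC32();
            crc.update(blob, 0, footerStart + 8); // body plus magic and algorithm id
            if (readIntBE(blob, footerStart) != FOOTER_MAGIC) {
                throw new IllegalStateException("unexpected footer magic");
            }
            if (readIntBE(blob, footerStart + 4) != 0) {
                throw new IllegalStateException("unexpected checksum algorithm id");
            }
            final long checksumInFooter = readLongBE(blob, footerStart + 8);
            if (crc.getValue() != checksumInFooter) {
                throw new IllegalStateException("checksum mismatch");
            }
        }

        private static int readIntBE(byte[] b, int o) {
            return ((b[o] & 0xFF) << 24) | ((b[o + 1] & 0xFF) << 16) | ((b[o + 2] & 0xFF) << 8) | (b[o + 3] & 0xFF);
        }

        private static long readLongBE(byte[] b, int o) {
            return ((long) readIntBE(b, o) << 32) | (readIntBE(b, o + 4) & 0xFFFFFFFFL);
        }
    }

The commit's DeserializeMetaBlobInputStream (in the ChecksumBlobStoreFormat diff below) performs the same arithmetic, but accumulates the CRC32 incrementally as the body streams past and inspects only the final 16 buffered bytes in verifyFooter.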

+ 1 - 2
server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java

@@ -16,7 +16,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.MockBigArrays;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.ShardSnapshotResult;
@@ -677,7 +676,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
                                                                     String generation) {
         return PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.supply(f,
                 () -> BlobStoreRepository.INDEX_SHARD_SNAPSHOTS_FORMAT.read(repository.shardContainer(repositoryShardId.index(),
-                        repositoryShardId.shardId()), generation, NamedXContentRegistry.EMPTY, MockBigArrays.NON_RECYCLING_INSTANCE))));
+                        repositoryShardId.shardId()), generation, NamedXContentRegistry.EMPTY))));
     }
 
     private static BlobStoreIndexShardSnapshot readShardSnapshot(BlobStoreRepository repository, RepositoryShardId repositoryShardId,

+ 8 - 8
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -862,8 +862,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             for (String indexMetaGeneration : indexMetaGenerations) {
                 executor.execute(ActionRunnable.supply(allShardCountsListener, () -> {
                     try {
-                        return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry,
-                                bigArrays).getNumberOfShards();
+                        return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry
+                        ).getNumberOfShards();
                     } catch (Exception ex) {
                         logger.warn(() -> new ParameterizedMessage(
                                 "[{}] [{}] failed to read metadata for index", indexMetaGeneration, indexId.getName()), ex);
@@ -1220,7 +1220,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     @Override
     public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
         try {
-            return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays);
+            return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
         } catch (NoSuchFileException ex) {
             throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
         } catch (IOException | NotXContentException ex) {
@@ -1231,7 +1231,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     @Override
     public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) {
         try {
-            return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays);
+            return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
         } catch (NoSuchFileException ex) {
             throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
         } catch (IOException ex) {
@@ -1243,7 +1243,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException {
         try {
             return INDEX_METADATA_FORMAT.read(indexContainer(index),
-                repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), namedXContentRegistry, bigArrays);
+                repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), namedXContentRegistry);
         } catch (NoSuchFileException e) {
             throw new SnapshotMissingException(metadata.name(), snapshotId, e);
         }
@@ -2722,7 +2722,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      */
     public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
         try {
-            return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry, bigArrays);
+            return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry);
         } catch (NoSuchFileException ex) {
             throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
         } catch (IOException ex) {
@@ -2748,7 +2748,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             if (generation.equals(ShardGenerations.NEW_SHARD_GEN)) {
                 return new Tuple<>(BlobStoreIndexShardSnapshots.EMPTY, ShardGenerations.NEW_SHARD_GEN);
             }
-            return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry, bigArrays), generation);
+            return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry), generation);
         }
         final Tuple<BlobStoreIndexShardSnapshots, Long> legacyIndex = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
         return new Tuple<>(legacyIndex.v1(), String.valueOf(legacyIndex.v2()));
@@ -2765,7 +2765,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         long latest = latestGeneration(blobs);
         if (latest >= 0) {
             final BlobStoreIndexShardSnapshots shardSnapshots =
-                    INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, Long.toString(latest), namedXContentRegistry, bigArrays);
+                    INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, Long.toString(latest), namedXContentRegistry);
             return new Tuple<>(shardSnapshots, latest);
         } else if (blobs.stream().anyMatch(b -> b.startsWith(SNAPSHOT_PREFIX) || b.startsWith(INDEX_FILE_PREFIX)
                                                                               || b.startsWith(UPLOADED_DATA_BLOB_PREFIX))) {

+ 162 - 26
server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java

@@ -11,19 +11,19 @@ import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.IndexFormatTooNewException;
 import org.apache.lucene.index.IndexFormatTooOldException;
-import org.apache.lucene.store.ByteBuffersDataInput;
-import org.apache.lucene.store.ByteBuffersIndexInput;
-import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.InputStreamDataInput;
 import org.apache.lucene.store.OutputStreamIndexOutput;
-import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.CheckedFunction;
+import org.elasticsearch.common.Numbers;
 import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.compress.CompressorFactory;
+import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
-import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
 import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
@@ -31,20 +31,19 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.core.internal.io.Streams;
 import org.elasticsearch.gateway.CorruptStateException;
 import org.elasticsearch.snapshots.SnapshotInfo;
 
+import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.zip.CRC32;
 
 /**
  * Snapshot metadata file format used in v2.0 and above
@@ -93,13 +92,10 @@ public final class ChecksumBlobStoreFormat<T extends ToXContent> {
      * @param name          name to be translated into
      * @return parsed blob object
      */
-    public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry,
-                  BigArrays bigArrays) throws IOException {
+    public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry) throws IOException {
         String blobName = blobName(name);
-        try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
-             InputStream in = blobContainer.readBlob(blobName)) {
-            Streams.copy(in, out, false);
-            return deserialize(blobName, namedXContentRegistry, out.bytes());
+        try (InputStream in = blobContainer.readBlob(blobName)) {
+            return deserialize(namedXContentRegistry, in);
         }
     }
 
@@ -107,23 +103,163 @@ public final class ChecksumBlobStoreFormat<T extends ToXContent> {
         return String.format(Locale.ROOT, blobNameFormat, name);
     }
 
-    public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistry, BytesReference bytes) throws IOException {
-        final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
+    public T deserialize(NamedXContentRegistry namedXContentRegistry, InputStream input) throws IOException {
+        final DeserializeMetaBlobInputStream deserializeMetaBlobInputStream = new DeserializeMetaBlobInputStream(input);
         try {
-            final IndexInput indexInput = bytes.length() > 0 ? new ByteBuffersIndexInput(
-                    new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc)
-                    : new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES);
-            CodecUtil.checksumEntireFile(indexInput);
-            CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
-            long filePointer = indexInput.getFilePointer();
-            long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
-            try (XContentParser parser = XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE,
-                bytes.slice((int) filePointer, (int) contentSize), XContentType.SMILE)) {
-                return reader.apply(parser);
+            CodecUtil.checkHeader(new InputStreamDataInput(deserializeMetaBlobInputStream), codec, VERSION, VERSION);
+            final InputStream wrappedStream;
+            if (deserializeMetaBlobInputStream.nextBytesCompressed()) {
+                wrappedStream = CompressorFactory.COMPRESSOR.threadLocalInputStream(deserializeMetaBlobInputStream);
+            } else {
+                wrappedStream = deserializeMetaBlobInputStream;
             }
+            final T result;
+            try (XContentParser parser = XContentType.SMILE.xContent().createParser(
+                    namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, wrappedStream)) {
+                result = reader.apply(parser);
+            }
+            deserializeMetaBlobInputStream.verifyFooter();
+            return result;
         } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
             // we trick this into a dedicated exception with the original stacktrace
             throw new CorruptStateException(ex);
+        } catch (Exception e) {
+            try {
+                // drain stream fully and check whether the footer is corrupted
+                Streams.consumeFully(deserializeMetaBlobInputStream);
+                deserializeMetaBlobInputStream.verifyFooter();
+            } catch (CorruptStateException cse) {
+                cse.addSuppressed(e);
+                throw cse;
+            } catch (Exception ex) {
+                e.addSuppressed(ex);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Wrapper input stream for deserializing blobs that come with a Lucene header and footer in a streaming manner. It manually manages
+     * a read buffer to enable not reading into the last 16 bytes (the footer length) of the buffer via the standard read methods so that
+     * a parser backed by this stream will only see the blob's body.
+     */
+    private static final class DeserializeMetaBlobInputStream extends FilterInputStream {
+
+        // checksum updated with all but the last 8 bytes read from the wrapped stream
+        private final CRC32 crc32 = new CRC32();
+
+        // Only the first buffer.length - 16 bytes are exposed by the read() methods; once the read position reaches 16 bytes from the end
+        // of the buffer the remaining 16 bytes are moved to the start of the buffer and the rest of the buffer is filled from the stream.
+        private final byte[] buffer = new byte[1024 * 8];
+
+        // the number of bytes in the buffer, in [0, buffer.length], equal to buffer.length unless the last fill hit EOF
+        private int bufferCount;
+
+        // the current read position within the buffer, in [0, bufferCount - 16]
+        private int bufferPos;
+
+        DeserializeMetaBlobInputStream(InputStream in) {
+            super(in);
+        }
+
+        @Override
+        public int read() throws IOException {
+            if (getAvailable() <= 0) {
+                return -1;
+            }
+            return buffer[bufferPos++] & 0xFF; // mask to 0..255: InputStream#read must not sign-extend valid bytes
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            int remaining = len;
+            int read = 0;
+            while (remaining > 0) {
+                final int r = doRead(b, off + read, remaining);
+                if (r <= 0) {
+                    break;
+                }
+                read += r;
+                remaining -= r;
+            }
+            if (len > 0 && remaining == len) {
+                // nothing to read, EOF
+                return -1;
+            }
+            return read;
+        }
+
+        @Override
+        public void close() throws IOException {
+            // not closing the wrapped stream
+        }
+
+        private int doRead(byte[] b, int off, int len) throws IOException {
+            final int available = getAvailable();
+            if (available < 0) {
+                return -1;
+            }
+            final int read = Math.min(available, len);
+            System.arraycopy(buffer, bufferPos, b, off, read);
+            bufferPos += read;
+            return read;
+        }
+
+        /**
+         * Verify footer of the bytes read by this stream the same way {@link CodecUtil#checkFooter(ChecksumIndexInput)} would.
+         *
+         * @throws CorruptStateException if footer is found to be corrupted
+         */
+        void verifyFooter() throws CorruptStateException {
+            if (bufferCount - bufferPos != CodecUtil.footerLength()) {
+                throw new CorruptStateException(
+                        "should have consumed all but 16 bytes from the buffer but saw buffer pos [" + bufferPos + "] and count ["
+                                + bufferCount + "]");
+            }
+            crc32.update(buffer, 0, bufferPos + 8);
+            final int magicFound = Numbers.bytesToInt(buffer, bufferPos);
+            if (magicFound != CodecUtil.FOOTER_MAGIC) {
+                throw new CorruptStateException("unexpected footer magic [" + magicFound + "]");
+            }
+            final int algorithmFound = Numbers.bytesToInt(buffer, bufferPos + 4);
+            if (algorithmFound != 0) {
+                throw new CorruptStateException("unexpected algorithm [" + algorithmFound + "]");
+            }
+            final long checksum = crc32.getValue();
+            final long checksumInFooter = Numbers.bytesToLong(buffer, bufferPos + 8);
+            if (checksum != checksumInFooter) {
+                throw new CorruptStateException("checksums do not match read [" + checksum + "] but expected [" + checksumInFooter + "]");
+            }
+        }
+
+        /**
+         * @return true if the next bytes in this stream are compressed
+         */
+        boolean nextBytesCompressed() {
+            // we already have bytes buffered here because we verify the blob's header (far less than the 8k buffer size) before calling
+            // this method
+            assert bufferPos > 0 : "buffer position must be greater than 0 but was [" + bufferPos + "]";
+            return CompressorFactory.COMPRESSOR.isCompressed(new BytesArray(buffer, bufferPos, bufferCount - bufferPos));
+        }
+
+        /**
+         * @return the number of bytes available in the buffer, possibly refilling the buffer if needed
+         */
+        private int getAvailable() throws IOException {
+            final int footerLen = CodecUtil.footerLength();
+            if (bufferCount == 0) {
+                // first read, fill the buffer
+                bufferCount = Streams.readFully(in, buffer, 0, buffer.length);
+            } else if (bufferPos == bufferCount - footerLen) {
+                // crc and discard all but the last 16 bytes in the buffer that might be the footer bytes
+                assert bufferCount >= footerLen;
+                crc32.update(buffer, 0, bufferPos);
+                System.arraycopy(buffer, bufferPos, buffer, 0, footerLen);
+                bufferCount = footerLen + Streams.readFully(in, buffer, footerLen, buffer.length - footerLen);
+                bufferPos = 0;
+            }
+            // bytes in the buffer minus 16 bytes that could be the footer
+            return bufferCount - bufferPos - footerLen;
         }
     }
 

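The buffer management described in the DeserializeMetaBlobInputStream javadoc above can be reduced to a generic pattern: never hand out the last 16 buffered bytes, because until EOF they might be the footer. Below is a simplified, self-contained sketch of just that pattern under hypothetical names (TailHoldingInputStream, ensureAvailable, fill, tail); the CRC accounting is omitted for brevity, whereas the real class also feeds every byte it discards on refill into its CRC32:

    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    // Withholds the last tailLen bytes of the wrapped stream from callers; the commit
    // uses the same trick with tailLen == CodecUtil.footerLength() == 16.
    final class TailHoldingInputStream extends FilterInputStream {

        private final int tailLen;
        private final byte[] buffer;
        private int count; // number of valid bytes in the buffer
        private int pos;   // next byte to hand out

        TailHoldingInputStream(InputStream in, int tailLen, int bufferSize) {
            super(in);
            assert bufferSize > tailLen;
            this.tailLen = tailLen;
            this.buffer = new byte[bufferSize];
        }

        @Override
        public int read() throws IOException {
            return ensureAvailable() <= 0 ? -1 : buffer[pos++] & 0xFF;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            if (len == 0) {
                return 0;
            }
            final int available = ensureAvailable();
            if (available <= 0) {
                return -1;
            }
            final int n = Math.min(available, len);
            System.arraycopy(buffer, pos, b, off, n);
            pos += n;
            return n;
        }

        // After the stream is drained, the withheld tail sits at buffer[pos .. pos + tailLen).
        byte[] tail() {
            final byte[] t = new byte[tailLen];
            System.arraycopy(buffer, pos, t, 0, tailLen);
            return t;
        }

        // Refills the buffer while always keeping tailLen candidate-footer bytes back;
        // returns how many bytes may currently be handed out.
        private int ensureAvailable() throws IOException {
            if (count == 0) {
                count = fill(0); // first fill
            } else if (pos == count - tailLen) {
                System.arraycopy(buffer, pos, buffer, 0, tailLen); // keep the candidate tail
                count = tailLen + fill(tailLen);
                pos = 0;
            }
            return count - pos - tailLen;
        }

        private int fill(int off) throws IOException {
            int read = 0;
            int r;
            while (off + read < buffer.length && (r = in.read(buffer, off + read, buffer.length - off - read)) > 0) {
                read += r;
            }
            return read;
        }
    }

A caller would wrap the blob stream as new TailHoldingInputStream(blobStream, 16, 8192), let the SMILE parser read to EOF, and then verify the 16 bytes returned by tail() — the same shape as deserialize above, which hands the wrapper to the parser and calls verifyFooter afterwards.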
+ 16 - 27
server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java

@@ -23,6 +23,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.ToXContentFragment;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentParserUtils;
 import org.elasticsearch.index.translog.BufferedChecksumStreamOutput;
 import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat;
 import org.elasticsearch.test.ESTestCase;
@@ -31,7 +32,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
-import static org.hamcrest.Matchers.containsString;
+
 import static org.hamcrest.Matchers.greaterThan;
 
 public class BlobStoreFormatTests extends ESTestCase {
@@ -58,20 +59,9 @@ public class BlobStoreFormatTests extends ESTestCase {
             }
             if (token == XContentParser.Token.START_OBJECT) {
                 while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
-                    if (token != XContentParser.Token.FIELD_NAME) {
-                        throw new ElasticsearchParseException("unexpected token [{}]", token);
-                    }
-                    String currentFieldName = parser.currentName();
-                    token = parser.nextToken();
-                    if (token.isValue()) {
-                        if ("text" .equals(currentFieldName)) {
-                            text = parser.text();
-                        } else {
-                            throw new ElasticsearchParseException("unexpected field [{}]", currentFieldName);
-                        }
-                    } else {
-                        throw new ElasticsearchParseException("unexpected token [{}]", token);
-                    }
+                    XContentParserUtils.ensureFieldName(parser, token, "text");
+                    XContentParserUtils.ensureExpectedToken(XContentParser.Token.VALUE_STRING, parser.nextToken(), parser);
+                    text = parser.text();
                 }
             }
             if (text == null) {
@@ -93,15 +83,16 @@ public class BlobStoreFormatTests extends ESTestCase {
         ChecksumBlobStoreFormat<BlobObj> checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent);
 
         // Write blobs in different formats
-        checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", false, MockBigArrays.NON_RECYCLING_INSTANCE);
-        checksumSMILE.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp", true,
+        final String randomText = randomAlphaOfLengthBetween(0, 1024 * 8 * 3);
+        final String normalText = "checksum smile: " + randomText;
+        checksumSMILE.write(new BlobObj(normalText), blobContainer, "check-smile", false, MockBigArrays.NON_RECYCLING_INSTANCE);
+        final String compressedText = "checksum smile compressed: " + randomText;
+        checksumSMILE.write(new BlobObj(compressedText), blobContainer, "check-smile-comp", true,
                 MockBigArrays.NON_RECYCLING_INSTANCE);
 
         // Assert that all checksum blobs can be read
-        assertEquals(checksumSMILE.read(blobContainer, "check-smile", xContentRegistry(), MockBigArrays.NON_RECYCLING_INSTANCE).getText(),
-                "checksum smile");
-        assertEquals(checksumSMILE.read(blobContainer, "check-smile-comp", xContentRegistry(),
-                MockBigArrays.NON_RECYCLING_INSTANCE).getText(), "checksum smile compressed");
+        assertEquals(normalText, checksumSMILE.read(blobContainer, "check-smile", xContentRegistry()).getText());
+        assertEquals(compressedText, checksumSMILE.read(blobContainer, "check-smile-comp", xContentRegistry()).getText());
     }
 
     public void testCompressionIsApplied() throws IOException {
@@ -127,16 +118,14 @@ public class BlobStoreFormatTests extends ESTestCase {
         BlobObj blobObj = new BlobObj(testString);
         ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent);
         checksumFormat.write(blobObj, blobContainer, "test-path", randomBoolean(), MockBigArrays.NON_RECYCLING_INSTANCE);
-        assertEquals(checksumFormat.read(blobContainer, "test-path", xContentRegistry(), MockBigArrays.NON_RECYCLING_INSTANCE).getText(),
+        assertEquals(checksumFormat.read(blobContainer, "test-path", xContentRegistry()).getText(),
                 testString);
         randomCorruption(blobContainer, "test-path");
         try {
-            checksumFormat.read(blobContainer, "test-path", xContentRegistry(), MockBigArrays.NON_RECYCLING_INSTANCE);
+            checksumFormat.read(blobContainer, "test-path", xContentRegistry());
             fail("Should have failed due to corruption");
-        } catch (ElasticsearchCorruptionException ex) {
-            assertThat(ex.getMessage(), containsString("test-path"));
-        } catch (EOFException ex) {
-            // This can happen if corrupt the byte length
+        } catch (ElasticsearchCorruptionException | EOFException ex) {
+            // expected exceptions from random byte corruption
         }
     }
 

+ 1 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java

@@ -35,7 +35,7 @@ public class SnapshotInfoBlobSerializationTests extends AbstractWireTestCase<Sna
         final PlainActionFuture<SnapshotInfo> future = new PlainActionFuture<>();
         BlobStoreRepository.SNAPSHOT_FORMAT.serialize(instance, "test", randomBoolean(), BigArrays.NON_RECYCLING_INSTANCE,
                 bytes -> ActionListener.completeWith(future,
-                        () -> BlobStoreRepository.SNAPSHOT_FORMAT.deserialize("test", NamedXContentRegistry.EMPTY, bytes)));
+                        () -> BlobStoreRepository.SNAPSHOT_FORMAT.deserialize(NamedXContentRegistry.EMPTY, bytes.streamInput())));
         return future.actionGet();
     }
 

+ 3 - 4
server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java

@@ -17,7 +17,6 @@ import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
-import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.util.Maps;
@@ -329,11 +328,11 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository {
                                 if (basePath().buildAsString().equals(path().buildAsString())) {
                                     try {
                                         final SnapshotInfo updatedInfo = BlobStoreRepository.SNAPSHOT_FORMAT.deserialize(
-                                                blobName, namedXContentRegistry, new BytesArray(data));
+                                                namedXContentRegistry, new ByteArrayInputStream(data));
                                         // If the existing snapshotInfo differs only in the timestamps it stores, then the overwrite is not
                                         // a problem and could be the result of a correctly handled master failover.
-                                        final SnapshotInfo existingInfo = SNAPSHOT_FORMAT.deserialize(
-                                                blobName, namedXContentRegistry, Streams.readFully(readBlob(blobName)));
+                                        final SnapshotInfo existingInfo =
+                                            SNAPSHOT_FORMAT.deserialize(namedXContentRegistry, readBlob(blobName));
                                         assertThat(existingInfo.snapshotId(), equalTo(updatedInfo.snapshotId()));
                                         assertThat(existingInfo.reason(), equalTo(updatedInfo.reason()));
                                         assertThat(existingInfo.state(), equalTo(updatedInfo.state()));