Browse Source

[STORE] Write Snapshots directly to the blobstore stream

Today we serialize the snashot metadata to a byte array and then copy
the byte array to a stream. Instead this commit moves the serialization
directly to the target stream without the intermediate representation.

Closes #7637
Simon Willnauer 11 years ago
parent
commit
2619911e17

+ 20 - 32
src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -37,6 +37,7 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -221,40 +222,33 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
                     .indices(indices)
                     .startTime(System.currentTimeMillis())
                     .build();
-            BytesStreamOutput bStream = writeSnapshot(blobStoreSnapshot);
             String snapshotBlobName = snapshotBlobName(snapshotId);
             if (snapshotsBlobContainer.blobExists(snapshotBlobName)) {
                 // TODO: Can we make it atomic?
                 throw new InvalidSnapshotNameException(snapshotId, "snapshot with such name already exists");
             }
-            BytesReference bRef = bStream.bytes();
             try (OutputStream output = snapshotsBlobContainer.createOutput(snapshotBlobName)) {
-                bRef.writeTo(output);
+                writeSnapshot(blobStoreSnapshot, output);
             }
             // Write Global MetaData
             // TODO: Check if metadata needs to be written
-            bStream = writeGlobalMetaData(metaData);
-            bRef = bStream.bytes();
             try (OutputStream output = snapshotsBlobContainer.createOutput(metaDataBlobName(snapshotId))) {
-                bRef.writeTo(output);
+                writeGlobalMetaData(metaData, output);
             }
             for (String index : indices) {
-                IndexMetaData indexMetaData = metaData.index(index);
-                BlobPath indexPath = basePath().add("indices").add(index);
-                BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
-                bStream = new BytesStreamOutput();
-                StreamOutput stream = bStream;
-                if (isCompress()) {
-                    stream = CompressorFactory.defaultCompressor().streamOutput(stream);
-                }
-                XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
-                builder.startObject();
-                IndexMetaData.Builder.toXContent(indexMetaData, builder, ToXContent.EMPTY_PARAMS);
-                builder.endObject();
-                builder.close();
-                bRef = bStream.bytes();
+                final IndexMetaData indexMetaData = metaData.index(index);
+                final BlobPath indexPath = basePath().add("indices").add(index);
+                final BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
                 try (OutputStream output = indexMetaDataBlobContainer.createOutput(snapshotBlobName(snapshotId))) {
-                    bRef.writeTo(output);
+                    StreamOutput stream = new OutputStreamStreamOutput(output);
+                    if (isCompress()) {
+                        stream = CompressorFactory.defaultCompressor().streamOutput(stream);
+                    }
+                    XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
+                    builder.startObject();
+                    IndexMetaData.Builder.toXContent(indexMetaData, builder, ToXContent.EMPTY_PARAMS);
+                    builder.endObject();
+                    builder.close();
                 }
             }
         } catch (IOException ex) {
@@ -334,10 +328,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
             }
             updatedSnapshot.endTime(System.currentTimeMillis());
             snapshot = updatedSnapshot.build();
-            BytesStreamOutput bStream = writeSnapshot(snapshot);
-            BytesReference bRef = bStream.bytes();
             try (OutputStream output = snapshotsBlobContainer.createOutput(blobName)) {
-                bRef.writeTo(output);
+                writeSnapshot(snapshot, output);
             }
             ImmutableList<SnapshotId> snapshotIds = snapshots();
             if (!snapshotIds.contains(snapshotId)) {
@@ -553,9 +545,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
      * @return BytesStreamOutput representing JSON serialized BlobStoreSnapshot
      * @throws IOException
      */
-    private BytesStreamOutput writeSnapshot(BlobStoreSnapshot snapshot) throws IOException {
-        BytesStreamOutput bStream = new BytesStreamOutput();
-        StreamOutput stream = bStream;
+    private void writeSnapshot(BlobStoreSnapshot snapshot, OutputStream outputStream) throws IOException {
+        StreamOutput stream = new OutputStreamStreamOutput(outputStream);
         if (isCompress()) {
             stream = CompressorFactory.defaultCompressor().streamOutput(stream);
         }
@@ -564,7 +555,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
         BlobStoreSnapshot.Builder.toXContent(snapshot, builder, globalOnlyFormatParams);
         builder.endObject();
         builder.close();
-        return bStream;
     }
 
     /**
@@ -574,9 +564,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
      * @return BytesStreamOutput representing JSON serialized global MetaData
      * @throws IOException
      */
-    private BytesStreamOutput writeGlobalMetaData(MetaData metaData) throws IOException {
-        BytesStreamOutput bStream = new BytesStreamOutput();
-        StreamOutput stream = bStream;
+    private void writeGlobalMetaData(MetaData metaData, OutputStream outputStream) throws IOException {
+        StreamOutput stream = new OutputStreamStreamOutput(outputStream) ;
         if (isCompress()) {
             stream = CompressorFactory.defaultCompressor().streamOutput(stream);
         }
@@ -585,7 +574,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
         MetaData.Builder.toXContent(metaData, builder, globalOnlyFormatParams);
         builder.endObject();
         builder.close();
-        return bStream;
     }
 
     /**