|
|
@@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
|
|
|
import org.elasticsearch.common.recycler.Recycler;
|
|
|
import org.elasticsearch.common.xcontent.ChunkedToXContent;
|
|
|
import org.elasticsearch.core.IOUtils;
|
|
|
+import org.elasticsearch.core.Releasables;
|
|
|
import org.elasticsearch.core.Streams;
|
|
|
import org.elasticsearch.xcontent.ToXContent;
|
|
|
import org.elasticsearch.xcontent.XContentBuilder;
|
|
|
@@ -92,20 +93,32 @@ public interface ChunkedRestResponseBody {
|
|
|
|
|
|
@Override
|
|
|
public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) throws IOException {
|
|
|
- final RecyclerBytesStreamOutput chunkStream = new RecyclerBytesStreamOutput(recycler);
|
|
|
- assert this.target == null;
|
|
|
- this.target = chunkStream;
|
|
|
- while (serialization.hasNext()) {
|
|
|
- serialization.next().toXContent(builder, params);
|
|
|
- if (chunkStream.size() >= sizeHint) {
|
|
|
- break;
|
|
|
+ try {
|
|
|
+ final RecyclerBytesStreamOutput chunkStream = new RecyclerBytesStreamOutput(recycler);
|
|
|
+ assert target == null;
|
|
|
+ target = chunkStream;
|
|
|
+ while (serialization.hasNext()) {
|
|
|
+ serialization.next().toXContent(builder, params);
|
|
|
+ if (chunkStream.size() >= sizeHint) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (serialization.hasNext() == false) {
|
|
|
+ builder.close();
|
|
|
+ }
|
|
|
+ final var result = new ReleasableBytesReference(
|
|
|
+ chunkStream.bytes(),
|
|
|
+ () -> Releasables.closeExpectNoException(chunkStream)
|
|
|
+ );
|
|
|
+ target = null;
|
|
|
+ return result;
|
|
|
+ } finally {
|
|
|
+ if (target != null) {
|
|
|
+ assert false : "failure encoding chunk";
|
|
|
+ IOUtils.closeWhileHandlingException(target);
|
|
|
+ target = null;
|
|
|
}
|
|
|
}
|
|
|
- if (serialization.hasNext() == false) {
|
|
|
- builder.close();
|
|
|
- }
|
|
|
- this.target = null;
|
|
|
- return new ReleasableBytesReference(chunkStream.bytes(), () -> IOUtils.closeWhileHandlingException(chunkStream));
|
|
|
}
|
|
|
|
|
|
@Override
|