Bläddra i källkod

Use chunked REST serialization for large REST responses (#88311)

Adds chunked rest serialization infrastructure that tries to serialize
only what can be flushed to the channel right away instead of fully
materializing a response on heap first and then writing it to the channel.

Makes use of the new infrastructure for get-snapshots as an example use case.
Armin Braun 3 år sedan
förälder
incheckning
06d4ef7df1
26 ändrade filer med 990 tillägg och 107 borttagningar
  1. 5 0
      docs/changelog/88311.yaml
  2. 41 0
      modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java
  3. 122 28
      modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java
  4. 7 0
      modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java
  5. 2 12
      modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java
  6. 28 0
      modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4RestResponse.java
  7. 283 0
      modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java
  8. 27 0
      qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/snapshots/RestGetSnapshotsIT.java
  9. 1 1
      server/src/main/java/org/elasticsearch/action/ActionModule.java
  10. 42 32
      server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsResponse.java
  11. 39 0
      server/src/main/java/org/elasticsearch/common/xcontent/ChunkedToXContent.java
  12. 4 0
      server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java
  13. 26 18
      server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java
  14. 3 0
      server/src/main/java/org/elasticsearch/http/HttpRequest.java
  15. 21 4
      server/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java
  16. 117 0
      server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java
  17. 8 0
      server/src/main/java/org/elasticsearch/rest/RestChannel.java
  18. 11 0
      server/src/main/java/org/elasticsearch/rest/RestController.java
  19. 33 1
      server/src/main/java/org/elasticsearch/rest/RestResponse.java
  20. 15 11
      server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestGetSnapshotsAction.java
  21. 20 0
      server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java
  22. 53 0
      server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java
  23. 6 0
      server/src/test/java/org/elasticsearch/http/TestHttpRequest.java
  24. 65 0
      server/src/test/java/org/elasticsearch/rest/ChunkedRestResponseBodyTests.java
  25. 5 0
      server/src/test/java/org/elasticsearch/rest/RestControllerTests.java
  26. 6 0
      test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java

+ 5 - 0
docs/changelog/88311.yaml

@@ -0,0 +1,5 @@
+pr: 88311
+summary: Use chunked REST serialization for large REST responses
+area: Network
+type: enhancement
+issues: []

+ 41 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java

@@ -0,0 +1,41 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.http.netty4;
+
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+
+import org.elasticsearch.rest.ChunkedRestResponseBody;
+import org.elasticsearch.rest.RestStatus;
+
+/**
+ * A http response that will be transferred via chunked encoding when handled by {@link Netty4HttpPipeliningHandler}.
+ */
+public final class Netty4ChunkedHttpResponse extends DefaultHttpResponse implements Netty4RestResponse {
+
+    private final int sequence;
+
+    private final ChunkedRestResponseBody body;
+
+    Netty4ChunkedHttpResponse(int sequence, HttpVersion version, RestStatus status, ChunkedRestResponseBody body) {
+        super(version, HttpResponseStatus.valueOf(status.getStatus()));
+        this.sequence = sequence;
+        this.body = body;
+    }
+
+    public ChunkedRestResponseBody body() {
+        return body;
+    }
+
+    @Override
+    public int getSequence() {
+        return sequence;
+    }
+}

+ 122 - 28
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java

@@ -11,6 +11,7 @@ package org.elasticsearch.http.netty4;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.compression.JdkZlibEncoder;
@@ -26,11 +27,17 @@ import io.netty.util.concurrent.PromiseCombiner;
 
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.core.Booleans;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Tuple;
+import org.elasticsearch.rest.ChunkedRestResponseBody;
 import org.elasticsearch.transport.Transports;
+import org.elasticsearch.transport.netty4.Netty4Utils;
+import org.elasticsearch.transport.netty4.Netty4WriteThrottlingHandler;
 import org.elasticsearch.transport.netty4.NettyAllocator;
 
+import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -41,13 +48,22 @@ import java.util.Queue;
 
 /**
  * Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their corresponding requests.
+ * This handler also throttles write operations and will not pass any writes to the next handler so long as the channel is not writable.
  */
 public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
 
     private final Logger logger;
 
     private final int maxEventsHeld;
-    private final PriorityQueue<Tuple<Netty4HttpResponse, ChannelPromise>> outboundHoldingQueue;
+    private final PriorityQueue<Tuple<? extends Netty4RestResponse, ChannelPromise>> outboundHoldingQueue;
+
+    private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, Netty4ChunkedHttpResponse response) {}
+
+    /**
+     * The current {@link ChunkedWrite} if a chunked write is executed at the moment.
+     */
+    @Nullable
+    private ChunkedWrite currentChunkedWrite;
 
     /*
      * The current read and write sequence numbers. Read sequence numbers are attached to requests in the order they are read from the
@@ -119,14 +135,14 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
     }
 
     @Override
-    public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
-        assert msg instanceof Netty4HttpResponse : "Invalid message type: " + msg.getClass();
+    public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws IOException {
+        assert msg instanceof Netty4RestResponse : "Invalid message type: " + msg.getClass();
         boolean success = false;
         try {
-            final Netty4HttpResponse response = (Netty4HttpResponse) msg;
-            if (response.getSequence() != writeSequence) {
-                assert response.getSequence() > writeSequence
-                    : "response sequence [" + response.getSequence() + "] we below write sequence [" + writeSequence + "]";
+            final Netty4RestResponse restResponse = (Netty4RestResponse) msg;
+            if (restResponse.getSequence() != writeSequence) {
+                assert restResponse.getSequence() > writeSequence
+                    : "response sequence [" + restResponse.getSequence() + "] we below write sequence [" + writeSequence + "]";
                 if (outboundHoldingQueue.size() >= maxEventsHeld) {
                     int eventCount = outboundHoldingQueue.size() + 1;
                     throw new IllegalStateException(
@@ -134,21 +150,17 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
                     );
                 }
                 // response is not at the current sequence number so we add it to the outbound queue and return
-                outboundHoldingQueue.add(new Tuple<>(response, promise));
+                outboundHoldingQueue.add(new Tuple<>(restResponse, promise));
                 success = true;
                 return;
             }
 
             // response is at the current sequence number and does not need to wait for any other response to be written so we write
             // it out directly
-            doWrite(ctx, response, promise);
+            doWrite(ctx, restResponse, promise);
             success = true;
             // see if we have any queued up responses that became writeable due to the above write
-            while (outboundHoldingQueue.isEmpty() == false && outboundHoldingQueue.peek().v1().getSequence() == writeSequence) {
-                final Tuple<Netty4HttpResponse, ChannelPromise> top = outboundHoldingQueue.poll();
-                assert top != null : "we know the outbound holding queue to not be empty at this point";
-                doWrite(ctx, top.v1(), top.v2());
-            }
+            doWriteQueued(ctx);
         } catch (IllegalStateException e) {
             ctx.channel().close();
         } finally {
@@ -158,6 +170,14 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
         }
     }
 
+    private void doWriteQueued(ChannelHandlerContext ctx) throws IOException {
+        while (outboundHoldingQueue.isEmpty() == false && outboundHoldingQueue.peek().v1().getSequence() == writeSequence) {
+            final Tuple<? extends Netty4RestResponse, ChannelPromise> top = outboundHoldingQueue.poll();
+            assert top != null : "we know the outbound holding queue to not be empty at this point";
+            doWrite(ctx, top.v1(), top.v2());
+        }
+    }
+
     private static final String DO_NOT_SPLIT = "es.unsafe.do_not_split_http_responses";
 
     private static final boolean DO_NOT_SPLIT_HTTP_RESPONSES;
@@ -169,6 +189,15 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
         SPLIT_THRESHOLD = (int) (NettyAllocator.suggestedMaxAllocationSize() * 0.99);
     }
 
+    private void doWrite(ChannelHandlerContext ctx, Netty4RestResponse readyResponse, ChannelPromise promise) throws IOException {
+        assert currentChunkedWrite == null : "unexpected existing write [" + currentChunkedWrite + "]";
+        if (readyResponse instanceof Netty4HttpResponse) {
+            doWrite(ctx, (Netty4HttpResponse) readyResponse, promise);
+        } else {
+            doWrite(ctx, (Netty4ChunkedHttpResponse) readyResponse, promise);
+        }
+    }
+
     /**
      * Split up large responses to prevent batch compression {@link JdkZlibEncoder} down the pipeline.
      */
@@ -181,6 +210,31 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
         writeSequence++;
     }
 
+    private void doWrite(ChannelHandlerContext ctx, Netty4ChunkedHttpResponse readyResponse, ChannelPromise promise) throws IOException {
+        final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
+        final ChannelPromise first = ctx.newPromise();
+        combiner.add((Future<Void>) first);
+        currentChunkedWrite = new ChunkedWrite(combiner, promise, readyResponse);
+        if (enqueueWrite(ctx, readyResponse, first)) {
+            // we were able to write out the first chunk directly, try writing out subsequent chunks until the channel becomes unwritable
+            while (ctx.channel().isWritable()) {
+                if (writeChunk(ctx, combiner, readyResponse.body())) {
+                    finishChunkedWrite();
+                    return;
+                }
+            }
+        }
+    }
+
+    private void finishChunkedWrite() {
+        try {
+            currentChunkedWrite.combiner.finish(currentChunkedWrite.onDone);
+        } finally {
+            currentChunkedWrite = null;
+            writeSequence++;
+        }
+    }
+
     private void splitAndWrite(ChannelHandlerContext ctx, Netty4HttpResponse msg, ChannelPromise promise) {
         final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
         HttpResponse response = new DefaultHttpResponse(msg.protocolVersion(), msg.status(), msg.headers());
@@ -193,7 +247,7 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
         combiner.finish(promise);
     }
 
-    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws IOException {
         if (ctx.channel().isWritable()) {
             doFlush(ctx);
         }
@@ -201,7 +255,7 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
     }
 
     @Override
-    public void flush(ChannelHandlerContext ctx) {
+    public void flush(ChannelHandlerContext ctx) throws IOException {
         if (doFlush(ctx) == false) {
             ctx.flush();
         }
@@ -218,7 +272,7 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
      *
      * @return true if a call to this method resulted in a call to {@link ChannelHandlerContext#flush()} on the given {@code ctx}
      */
-    private boolean doFlush(ChannelHandlerContext ctx) {
+    private boolean doFlush(ChannelHandlerContext ctx) throws IOException {
         assert ctx.executor().inEventLoop();
         final Channel channel = ctx.channel();
         if (channel.isActive() == false) {
@@ -226,8 +280,22 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
             return false;
         }
         while (channel.isWritable()) {
-            final WriteOperation currentWrite = queuedWrites.poll();
+            WriteOperation currentWrite = queuedWrites.poll();
             if (currentWrite == null) {
+                doWriteQueued(ctx);
+                if (channel.isWritable() == false) {
+                    break;
+                }
+                currentWrite = queuedWrites.poll();
+            }
+            if (currentWrite == null) {
+                // no bytes were found queued, check if a chunked message might have become writable
+                if (currentChunkedWrite != null) {
+                    if (writeChunk(ctx, currentChunkedWrite.combiner, currentChunkedWrite.response.body())) {
+                        finishChunkedWrite();
+                    }
+                    continue;
+                }
                 break;
             }
             ctx.write(currentWrite.msg, currentWrite.promise);
@@ -239,6 +307,21 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
         return true;
     }
 
+    private boolean writeChunk(ChannelHandlerContext ctx, PromiseCombiner combiner, ChunkedRestResponseBody body) throws IOException {
+        assert body.isDone() == false : "should not continue to try and serialize once done";
+        final ReleasableBytesReference bytes = body.encodeChunk(
+            Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE,
+            serverTransport.recycler()
+        );
+        assert bytes.length() > 0 : "serialization should not produce empty buffers";
+        final ByteBuf content = Netty4Utils.toByteBuf(bytes);
+        final boolean done = body.isDone();
+        final ChannelFuture f = ctx.write(done ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content));
+        f.addListener(ignored -> bytes.close());
+        combiner.add(f);
+        return done;
+    }
+
     private void failQueuedWrites() {
         WriteOperation queuedWrite;
         while ((queuedWrite = queuedWrites.poll()) != null) {
@@ -248,32 +331,43 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
 
     @Override
     public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
-        List<Tuple<Netty4HttpResponse, ChannelPromise>> inflightResponses = removeAllInflightResponses();
+        if (currentChunkedWrite != null) {
+            safeFailPromise(currentChunkedWrite.onDone, new ClosedChannelException());
+            currentChunkedWrite = null;
+        }
+        List<Tuple<? extends Netty4RestResponse, ChannelPromise>> inflightResponses = removeAllInflightResponses();
 
         if (inflightResponses.isEmpty() == false) {
             ClosedChannelException closedChannelException = new ClosedChannelException();
-            for (Tuple<Netty4HttpResponse, ChannelPromise> inflightResponse : inflightResponses) {
-                try {
-                    inflightResponse.v2().setFailure(closedChannelException);
-                } catch (RuntimeException e) {
-                    logger.error("unexpected error while releasing pipelined http responses", e);
-                }
+            for (Tuple<? extends Netty4RestResponse, ChannelPromise> inflightResponse : inflightResponses) {
+                safeFailPromise(inflightResponse.v2(), closedChannelException);
             }
         }
         ctx.close(promise);
     }
 
+    private void safeFailPromise(ChannelPromise promise, Exception ex) {
+        try {
+            promise.setFailure(ex);
+        } catch (RuntimeException e) {
+            logger.error("unexpected error while releasing pipelined http responses", e);
+        }
+    }
+
     private Future<Void> enqueueWrite(ChannelHandlerContext ctx, HttpObject msg) {
         final ChannelPromise p = ctx.newPromise();
         enqueueWrite(ctx, msg, p);
         return p;
     }
 
-    private void enqueueWrite(ChannelHandlerContext ctx, HttpObject msg, ChannelPromise promise) {
+    // returns true if the write was actually executed and false if it was just queued up
+    private boolean enqueueWrite(ChannelHandlerContext ctx, HttpObject msg, ChannelPromise promise) {
         if (ctx.channel().isWritable() && queuedWrites.isEmpty()) {
             ctx.write(msg, promise);
+            return true;
         } else {
             queuedWrites.add(new WriteOperation(msg, promise));
+            return false;
         }
     }
 
@@ -290,8 +384,8 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
         }
     }
 
-    private List<Tuple<Netty4HttpResponse, ChannelPromise>> removeAllInflightResponses() {
-        ArrayList<Tuple<Netty4HttpResponse, ChannelPromise>> responses = new ArrayList<>(outboundHoldingQueue);
+    private List<Tuple<? extends Netty4RestResponse, ChannelPromise>> removeAllInflightResponses() {
+        ArrayList<Tuple<? extends Netty4RestResponse, ChannelPromise>> responses = new ArrayList<>(outboundHoldingQueue);
         outboundHoldingQueue.clear();
         return responses;
     }

+ 7 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java

@@ -22,6 +22,8 @@ import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
 
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.http.HttpRequest;
+import org.elasticsearch.http.HttpResponse;
+import org.elasticsearch.rest.ChunkedRestResponseBody;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.transport.netty4.Netty4Utils;
@@ -237,6 +239,11 @@ public class Netty4HttpRequest implements HttpRequest {
         return new Netty4HttpResponse(sequence, request.protocolVersion(), status, contentRef);
     }
 
+    @Override
+    public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody content) {
+        return new Netty4ChunkedHttpResponse(sequence, request.protocolVersion(), status, content);
+    }
+
     @Override
     public Exception getInboundException() {
         return inboundException;

+ 2 - 12
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java

@@ -13,11 +13,10 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
 
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.http.HttpResponse;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.transport.netty4.Netty4Utils;
 
-public class Netty4HttpResponse extends DefaultFullHttpResponse implements HttpResponse {
+public class Netty4HttpResponse extends DefaultFullHttpResponse implements Netty4RestResponse {
 
     private final int sequence;
 
@@ -26,17 +25,8 @@ public class Netty4HttpResponse extends DefaultFullHttpResponse implements HttpR
         this.sequence = sequence;
     }
 
+    @Override
     public int getSequence() {
         return sequence;
     }
-
-    @Override
-    public void addHeader(String name, String value) {
-        headers().add(name, value);
-    }
-
-    @Override
-    public boolean containsHeader(String name) {
-        return headers().contains(name);
-    }
 }

+ 28 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4RestResponse.java

@@ -0,0 +1,28 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.http.netty4;
+
+import io.netty.handler.codec.http.HttpMessage;
+
+import org.elasticsearch.http.HttpResponse;
+
+public interface Netty4RestResponse extends HttpResponse, HttpMessage {
+
+    int getSequence();
+
+    @Override
+    default void addHeader(String name, String value) {
+        headers().add(name, value);
+    }
+
+    @Override
+    default boolean containsHeader(String name) {
+        return headers().contains(name);
+    }
+}

+ 283 - 0
modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java

@@ -8,22 +8,31 @@
 
 package org.elasticsearch.http.netty4;
 
+import io.netty.buffer.ByteBufHolder;
 import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.DefaultHttpContent;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.LastHttpContent;
 import io.netty.handler.codec.http.QueryStringDecoder;
 
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.recycler.Recycler;
 import org.elasticsearch.http.HttpResponse;
+import org.elasticsearch.rest.ChunkedRestResponseBody;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.transport.netty4.Netty4Utils;
 import org.junit.After;
 
 import java.nio.channels.ClosedChannelException;
@@ -41,7 +50,11 @@ import java.util.stream.IntStream;
 
 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.core.Is.is;
+import static org.mockito.Mockito.mock;
 
 public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
 
@@ -197,6 +210,276 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
         }
     }
 
+    public void testResumesChunkedMessage() {
+        final List<Object> messagesSeen = new ArrayList<>();
+        final EmbeddedChannel embeddedChannel = new EmbeddedChannel(capturingHandler(messagesSeen), getTestHttpHandler());
+        embeddedChannel.writeInbound(createHttpRequest("/chunked"));
+        final Netty4HttpRequest request = embeddedChannel.readInbound();
+        final BytesReference chunk = new BytesArray(randomByteArrayOfLength(embeddedChannel.config().getWriteBufferHighWaterMark() + 1));
+        final int chunks = randomIntBetween(2, 10);
+        final HttpResponse response = request.createResponse(RestStatus.OK, getRepeatedChunkResponseBody(chunks, chunk));
+        final ChannelPromise promise = embeddedChannel.newPromise();
+        embeddedChannel.write(response, promise);
+        assertFalse("should not be fully flushed right away", promise.isDone());
+        assertThat(messagesSeen, hasSize(2));
+        embeddedChannel.flush();
+        assertTrue(promise.isDone());
+        assertThat(messagesSeen, hasSize(chunks + 1));
+        assertChunkedMessageAtIndex(messagesSeen, 0, chunks, chunk);
+    }
+
+    public void testResumesAfterChunkedMessage() {
+        final List<Object> messagesSeen = new ArrayList<>();
+        final EmbeddedChannel embeddedChannel = new EmbeddedChannel(capturingHandler(messagesSeen), getTestHttpHandler());
+        embeddedChannel.writeInbound(createHttpRequest("/chunked"));
+        embeddedChannel.writeInbound(createHttpRequest("/chunked2"));
+        final Netty4HttpRequest request1 = embeddedChannel.readInbound();
+        final Netty4HttpRequest request2 = embeddedChannel.readInbound();
+
+        final int chunks1 = randomIntBetween(2, 10);
+        final int chunks2 = randomIntBetween(2, 10);
+        final BytesReference chunk = new BytesArray(randomByteArrayOfLength(embeddedChannel.config().getWriteBufferHighWaterMark() + 1));
+        final HttpResponse response1 = request1.createResponse(RestStatus.OK, getRepeatedChunkResponseBody(chunks1, chunk));
+        final HttpResponse response2 = request2.createResponse(RestStatus.OK, getRepeatedChunkResponseBody(chunks2, chunk));
+        final ChannelPromise promise1 = embeddedChannel.newPromise();
+        final ChannelPromise promise2 = embeddedChannel.newPromise();
+        if (randomBoolean()) {
+            // randomly write messages out of order
+            embeddedChannel.write(response2, promise2);
+            embeddedChannel.write(response1, promise1);
+        } else {
+            embeddedChannel.write(response1, promise1);
+            embeddedChannel.write(response2, promise2);
+        }
+        assertFalse("should not be fully flushed right away", promise1.isDone());
+        assertThat(messagesSeen, hasSize(2));
+        embeddedChannel.flush();
+        assertTrue(promise1.isDone());
+        assertThat(messagesSeen, hasSize(chunks1 + chunks2 + 2));
+        assertChunkedMessageAtIndex(messagesSeen, 0, chunks1, chunk);
+        assertChunkedMessageAtIndex(messagesSeen, chunks1 + 1, chunks2, chunk);
+        assertTrue(promise2.isDone());
+    }
+
+    public void testResumesSingleAfterChunkedMessage() {
+        final List<Object> messagesSeen = new ArrayList<>();
+        final EmbeddedChannel embeddedChannel = new EmbeddedChannel(capturingHandler(messagesSeen), getTestHttpHandler());
+        embeddedChannel.writeInbound(createHttpRequest("/chunked"));
+        embeddedChannel.writeInbound(createHttpRequest("/single"));
+        final Netty4HttpRequest request1 = embeddedChannel.readInbound();
+        final Netty4HttpRequest request2 = embeddedChannel.readInbound();
+
+        final int chunks1 = randomIntBetween(2, 10);
+        final BytesReference chunk = new BytesArray(randomByteArrayOfLength(embeddedChannel.config().getWriteBufferHighWaterMark() + 1));
+        final HttpResponse response1 = request1.createResponse(RestStatus.OK, getRepeatedChunkResponseBody(chunks1, chunk));
+        final BytesReference single = new BytesArray(
+            randomByteArrayOfLength(randomIntBetween(1, embeddedChannel.config().getWriteBufferHighWaterMark()))
+        );
+        final HttpResponse response2 = request2.createResponse(RestStatus.OK, single);
+        final ChannelPromise promise1 = embeddedChannel.newPromise();
+        final ChannelPromise promise2 = embeddedChannel.newPromise();
+        if (randomBoolean()) {
+            // randomly write messages out of order
+            embeddedChannel.write(response2, promise2);
+            embeddedChannel.write(response1, promise1);
+        } else {
+            embeddedChannel.write(response1, promise1);
+            embeddedChannel.write(response2, promise2);
+        }
+        assertFalse("should not be fully flushed right away", promise1.isDone());
+        assertThat(messagesSeen, hasSize(2));
+        embeddedChannel.flush();
+        assertTrue(promise1.isDone());
+        assertThat(messagesSeen, hasSize(chunks1 + 1 + 1));
+        assertChunkedMessageAtIndex(messagesSeen, 0, chunks1, chunk);
+        assertThat(messagesSeen.get(chunks1 + 1), instanceOf(Netty4HttpResponse.class));
+        assertContentAtIndexEquals(messagesSeen, chunks1 + 1, single);
+        assertTrue(promise2.isDone());
+    }
+
+    public void testChunkedResumesAfterSingleMessage() {
+        final List<Object> messagesSeen = new ArrayList<>();
+        final EmbeddedChannel embeddedChannel = new EmbeddedChannel(capturingHandler(messagesSeen), getTestHttpHandler());
+        embeddedChannel.writeInbound(createHttpRequest("/chunked"));
+        final Netty4HttpRequest request1 = embeddedChannel.readInbound();
+        embeddedChannel.writeInbound(createHttpRequest("/chunked2"));
+        final Netty4HttpRequest request2 = embeddedChannel.readInbound();
+
+        final int chunks2 = randomIntBetween(2, 10);
+        final BytesReference chunk = new BytesArray(randomByteArrayOfLength(embeddedChannel.config().getWriteBufferHighWaterMark() + 1));
+        final HttpResponse response1 = request1.createResponse(RestStatus.OK, chunk);
+        final HttpResponse response2 = request2.createResponse(RestStatus.OK, getRepeatedChunkResponseBody(chunks2, chunk));
+        final ChannelPromise promise1 = embeddedChannel.newPromise();
+        final ChannelPromise promise2 = embeddedChannel.newPromise();
+        if (randomBoolean()) {
+            // randomly write messages out of order
+            embeddedChannel.write(response2, promise2);
+            assertTrue(embeddedChannel.isWritable());
+            embeddedChannel.write(response1, promise1);
+            assertFalse(embeddedChannel.isWritable());
+        } else {
+            embeddedChannel.write(response1, promise1);
+            assertFalse(embeddedChannel.isWritable());
+            embeddedChannel.write(response2, promise2);
+        }
+        assertFalse("should not be fully flushed right away", promise1.isDone());
+        assertThat("unexpected [" + messagesSeen + "]", messagesSeen, hasSize(1));
+        embeddedChannel.flush();
+        assertTrue(promise1.isDone());
+        assertThat(messagesSeen, hasSize(chunks2 + 2));
+        assertThat(messagesSeen.get(0), instanceOf(Netty4HttpResponse.class));
+        assertChunkedMessageAtIndex(messagesSeen, 1, chunks2, chunk);
+        assertTrue(promise2.isDone());
+    }
+
+    public void testChunkedWithSmallChunksResumesAfterSingleMessage() {
+        final List<Object> messagesSeen = new ArrayList<>();
+        final EmbeddedChannel embeddedChannel = new EmbeddedChannel(capturingHandler(messagesSeen), getTestHttpHandler());
+        embeddedChannel.writeInbound(createHttpRequest("/chunked"));
+        final Netty4HttpRequest request1 = embeddedChannel.readInbound();
+        embeddedChannel.writeInbound(createHttpRequest("/chunked2"));
+        final Netty4HttpRequest request2 = embeddedChannel.readInbound();
+
+        final int chunks2 = randomIntBetween(2, 10);
+        final HttpResponse response1 = request1.createResponse(
+            RestStatus.OK,
+            new BytesArray(randomByteArrayOfLength(embeddedChannel.config().getWriteBufferHighWaterMark() + 1))
+        );
+        final BytesReference chunk = new BytesArray(randomByteArrayOfLength(randomIntBetween(10, 512)));
+        final HttpResponse response2 = request2.createResponse(RestStatus.OK, getRepeatedChunkResponseBody(chunks2, chunk));
+        final ChannelPromise promise1 = embeddedChannel.newPromise();
+        final ChannelPromise promise2 = embeddedChannel.newPromise();
+        if (randomBoolean()) {
+            // randomly write messages out of order
+            embeddedChannel.write(response2, promise2);
+            assertTrue(embeddedChannel.isWritable());
+            embeddedChannel.write(response1, promise1);
+            assertFalse(embeddedChannel.isWritable());
+        } else {
+            embeddedChannel.write(response1, promise1);
+            assertFalse(embeddedChannel.isWritable());
+            embeddedChannel.write(response2, promise2);
+        }
+        assertFalse("should not be fully flushed right away", promise1.isDone());
+        assertThat("unexpected [" + messagesSeen + "]", messagesSeen, hasSize(1));
+        embeddedChannel.flush();
+        assertTrue(promise1.isDone());
+        assertThat(messagesSeen, hasSize(chunks2 + 2));
+        assertThat(messagesSeen.get(0), instanceOf(Netty4HttpResponse.class));
+        assertChunkedMessageAtIndex(messagesSeen, 1, chunks2, chunk);
+        assertTrue(promise2.isDone());
+    }
+
+    public void testPipeliningRequestsAreReleasedAfterFailureOnChunked() {
+        final List<Object> messagesSeen = new ArrayList<>();
+        final EmbeddedChannel embeddedChannel = new EmbeddedChannel(capturingHandler(messagesSeen), getTestHttpHandler());
+        embeddedChannel.writeInbound(createHttpRequest("/chunked"));
+        final Netty4HttpRequest chunkedResponseRequest = embeddedChannel.readInbound();
+
+        final BytesReference chunk = new BytesArray(randomByteArrayOfLength(embeddedChannel.config().getWriteBufferHighWaterMark() + 1));
+        final HttpResponse chunkedResponse = chunkedResponseRequest.createResponse(
+            RestStatus.OK,
+            getRepeatedChunkResponseBody(randomIntBetween(2, 10), chunk)
+        );
+        final ChannelPromise chunkedWritePromise = embeddedChannel.newPromise();
+        embeddedChannel.write(chunkedResponse, chunkedWritePromise);
+
+        for (int i = 0; i < randomIntBetween(5, 10); i++) {
+            embeddedChannel.writeInbound(createHttpRequest("/" + i));
+        }
+
+        Netty4HttpRequest inbound;
+        ArrayList<Netty4HttpRequest> requests = new ArrayList<>();
+        while ((inbound = embeddedChannel.readInbound()) != null) {
+            requests.add(inbound);
+        }
+
+        ArrayList<ChannelPromise> promises = new ArrayList<>();
+        for (Netty4HttpRequest request : requests) {
+            ChannelPromise promise = embeddedChannel.newPromise();
+            promises.add(promise);
+            Netty4HttpResponse resp = request.createResponse(RestStatus.OK, BytesArray.EMPTY);
+            embeddedChannel.write(resp, promise);
+        }
+        assertFalse(chunkedWritePromise.isDone());
+        for (ChannelPromise promise : promises) {
+            assertFalse(promise.isDone());
+        }
+        embeddedChannel.close().syncUninterruptibly();
+        assertDoneWithClosedChannel(chunkedWritePromise);
+        for (ChannelPromise promise : promises) {
+            assertDoneWithClosedChannel(promise);
+        }
+        // we wrote the first chunk and its headers only
+        assertThat(messagesSeen, hasSize(2));
+        assertThat(messagesSeen.get(0), instanceOf(Netty4ChunkedHttpResponse.class));
+        assertThat(messagesSeen.get(1), instanceOf(DefaultHttpContent.class));
+    }
+
+    // assert that a message of the given number of repeated chunks is found at the given index in the list and each chunk is equal to
+    // the given BytesReference
+    private static void assertChunkedMessageAtIndex(List<Object> messagesSeen, int index, int chunks, BytesReference chunkBytes) {
+        assertThat(messagesSeen.get(index), instanceOf(Netty4ChunkedHttpResponse.class));
+        for (int i = index + 1; i < chunks; i++) {
+            assertThat(messagesSeen.get(i), instanceOf(DefaultHttpContent.class));
+            assertContentAtIndexEquals(messagesSeen, i, chunkBytes);
+        }
+        assertThat(messagesSeen.get(index + chunks), instanceOf(LastHttpContent.class));
+    }
+
+    private static void assertContentAtIndexEquals(List<Object> messagesSeen, int index, BytesReference single) {
+        assertEquals(Netty4Utils.toBytesReference(((ByteBufHolder) messagesSeen.get(index)).content()), single);
+    }
+
+    private static void assertDoneWithClosedChannel(ChannelPromise chunkedWritePromise) {
+        assertTrue(chunkedWritePromise.isDone());
+        assertThat(chunkedWritePromise.cause(), instanceOf(ClosedChannelException.class));
+    }
+
+    private Netty4HttpPipeliningHandler getTestHttpHandler() {
+        return new Netty4HttpPipeliningHandler(logger, Integer.MAX_VALUE, mock(Netty4HttpServerTransport.class)) {
+            @Override
+            protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpRequest pipelinedRequest) {
+                ctx.fireChannelRead(pipelinedRequest);
+            }
+        };
+    }
+
+    private static ChunkedRestResponseBody getRepeatedChunkResponseBody(int chunkCount, BytesReference chunk) {
+        return new ChunkedRestResponseBody() {
+
+            private int remaining = chunkCount;
+
+            @Override
+            public boolean isDone() {
+                return remaining == 0;
+            }
+
+            @Override
+            public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) {
+                assertThat(remaining, greaterThan(0));
+                remaining--;
+                return ReleasableBytesReference.wrap(chunk);
+            }
+
+            @Override
+            public String getResponseContentTypeString() {
+                return "application/octet-stream";
+            }
+        };
+    }
+
+    private static ChannelDuplexHandler capturingHandler(List<Object> messagesSeen) {
+        return new ChannelDuplexHandler() {
+            @Override
+            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+                assertTrue(ctx.channel().isWritable());
+                messagesSeen.add(msg);
+                super.write(ctx, msg, promise);
+            }
+        };
+    }
+
     private void assertReadHttpMessageHasContent(EmbeddedChannel embeddedChannel, String expectedContent) {
         FullHttpResponse response = (FullHttpResponse) embeddedChannel.outboundMessages().poll();
         assertNotNull("Expected response to exist, maybe you did not wait long enough?", response);

+ 27 - 0
qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/snapshots/RestGetSnapshotsIT.java

@@ -321,6 +321,33 @@ public class RestGetSnapshotsIT extends AbstractSnapshotRestTestCase {
         assertThat(allBeforeStartTimeDescending(startTime1 - 1), empty());
     }
 
+    public void testLargeChunkedResponses() throws Exception {
+        final String repoName = "test-repo";
+        AbstractSnapshotIntegTestCase.createRepository(logger, repoName, "fs");
+        for (int i = 0; i < 100; i++) {
+            createIndexWithContent("test-index-a-" + i);
+        }
+        final List<String> snapshotNamesWithoutIndex = AbstractSnapshotIntegTestCase.createNSnapshots(
+            logger,
+            repoName,
+            randomIntBetween(200, 300)
+        );
+
+        for (int i = 0; i < 100; i++) {
+            createIndexWithContent("test-index-b-" + i);
+        }
+
+        final List<String> snapshotNamesWithIndex = AbstractSnapshotIntegTestCase.createNSnapshots(
+            logger,
+            repoName,
+            randomIntBetween(200, 300)
+        );
+        final Collection<String> allSnapshotNames = new HashSet<>(snapshotNamesWithIndex);
+        allSnapshotNames.addAll(snapshotNamesWithoutIndex);
+        doTestSortOrder(repoName, allSnapshotNames, SortOrder.ASC);
+        doTestSortOrder(repoName, allSnapshotNames, SortOrder.DESC);
+    }
+
     // create a snapshot that is guaranteed to have a unique start time
     private SnapshotInfo createFullSnapshotWithUniqueStartTime(String repoName, String snapshotName, Set<Long> forbiddenStartTimes) {
         while (true) {

+ 1 - 1
server/src/main/java/org/elasticsearch/action/ActionModule.java

@@ -745,7 +745,7 @@ public class ActionModule extends AbstractModule {
         registerHandler.accept(new RestDeleteRepositoryAction());
         registerHandler.accept(new RestVerifyRepositoryAction());
         registerHandler.accept(new RestCleanupRepositoryAction());
-        registerHandler.accept(new RestGetSnapshotsAction(threadPool));
+        registerHandler.accept(new RestGetSnapshotsAction());
         registerHandler.accept(new RestCreateSnapshotAction());
         registerHandler.accept(new RestCloneSnapshotAction());
         registerHandler.accept(new RestRestoreSnapshotAction());

+ 42 - 32
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsResponse.java

@@ -11,20 +11,21 @@ package org.elasticsearch.action.admin.cluster.snapshots.get;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ChunkedToXContent;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.xcontent.ConstructingObjectParser;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContent;
-import org.elasticsearch.xcontent.ToXContentObject;
-import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -32,7 +33,7 @@ import java.util.Objects;
 /**
  * Get snapshots response
  */
-public class GetSnapshotsResponse extends ActionResponse implements ToXContentObject {
+public class GetSnapshotsResponse extends ActionResponse implements ChunkedToXContent {
 
     private static final int UNKNOWN_COUNT = -1;
 
@@ -164,36 +165,45 @@ public class GetSnapshotsResponse extends ActionResponse implements ToXContentOb
     }
 
     @Override
-    public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
-        builder.startObject();
-        builder.startArray("snapshots");
-        for (SnapshotInfo snapshotInfo : snapshots) {
-            snapshotInfo.toXContentExternal(builder, params);
-        }
-        builder.endArray();
-        if (failures.isEmpty() == false) {
-            builder.startObject("failures");
-            for (Map.Entry<String, ElasticsearchException> error : failures.entrySet()) {
-                builder.field(error.getKey(), (b, pa) -> {
-                    b.startObject();
-                    error.getValue().toXContent(b, pa);
+    public Iterator<ToXContent> toXContentChunked() {
+        return Iterators.concat(Iterators.single((b, p) -> {
+            b.startObject();
+            b.startArray("snapshots");
+            return b;
+        }),
+            getSnapshots().stream().map(snapshotInfo -> (ToXContent) snapshotInfo::toXContentExternal).iterator(),
+            Iterators.single((b, p) -> {
+                b.endArray();
+                if (failures.isEmpty() == false) {
+                    b.startObject("failures");
+                    for (Map.Entry<String, ElasticsearchException> error : failures.entrySet()) {
+                        b.field(error.getKey(), (bb, pa) -> {
+                            bb.startObject();
+                            error.getValue().toXContent(bb, pa);
+                            bb.endObject();
+                            return bb;
+                        });
+                    }
                     b.endObject();
-                    return b;
-                });
-            }
-            builder.endObject();
-        }
-        if (next != null) {
-            builder.field("next", next);
-        }
-        if (total >= 0) {
-            builder.field("total", total);
-        }
-        if (remaining >= 0) {
-            builder.field("remaining", remaining);
-        }
-        builder.endObject();
-        return builder;
+                }
+                if (next != null) {
+                    b.field("next", next);
+                }
+                if (total >= 0) {
+                    b.field("total", total);
+                }
+                if (remaining >= 0) {
+                    b.field("remaining", remaining);
+                }
+                b.endObject();
+                return b;
+            })
+        );
+    }
+
+    @Override
+    public boolean isFragment() {
+        return false;
     }
 
     public static GetSnapshotsResponse fromXContent(XContentParser parser) throws IOException {

+ 39 - 0
server/src/main/java/org/elasticsearch/common/xcontent/ChunkedToXContent.java

@@ -0,0 +1,39 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.common.xcontent;
+
+import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * An extension of {@link ToXContent} that can be serialized in chunks by creating an {@link Iterator<ToXContent>}.
+ * This is used by the REST layer to implement flow control that does not rely on blocking the serializing thread when writing the
+ * serialized bytes to a non-blocking channel.
+ */
+public interface ChunkedToXContent extends ToXContent {
+
+    /**
+     * Create an iterator of {@link ToXContent} chunks, that must be serialized individually with the same {@link XContentBuilder} and
+     * {@link ToXContent.Params} for each call until it is fully drained.
+     * @return iterator over chunks of {@link ToXContent}
+     */
+    Iterator<ToXContent> toXContentChunked();
+
+    @Override
+    default XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        Iterator<ToXContent> serialization = toXContentChunked();
+        while (serialization.hasNext()) {
+            serialization.next().toXContent(builder, params);
+        }
+        return builder;
+    }
+}

+ 4 - 0
server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java

@@ -136,6 +136,10 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
         httpClientStatsTracker = new HttpClientStatsTracker(settings, clusterSettings, threadPool);
     }
 
+    public Recycler<BytesRef> recycler() {
+        return recycler;
+    }
+
     @Override
     public BoundTransportAddress boundAddress() {
         return this.boundAddress;

+ 26 - 18
server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java

@@ -101,25 +101,29 @@ public class DefaultRestChannel extends AbstractRestChannel implements RestChann
         String opaque = null;
         String contentLength = null;
 
+        boolean isHeadRequest = false;
         try {
-            final BytesReference content = restResponse.content();
-            if (content instanceof Releasable) {
-                toClose.add((Releasable) content);
+            if (request.method() == RestRequest.Method.HEAD) {
+                isHeadRequest = true;
             }
-            toClose.add(this::releaseOutputBuffer);
-
-            BytesReference finalContent = content;
-            try {
-                if (request.method() == RestRequest.Method.HEAD) {
-                    finalContent = BytesArray.EMPTY;
+        } catch (IllegalArgumentException ignored) {
+            assert restResponse.status() == RestStatus.METHOD_NOT_ALLOWED
+                : "request HTTP method is unsupported but HTTP status is not METHOD_NOT_ALLOWED(405)";
+        }
+        try {
+            final HttpResponse httpResponse;
+            if (isHeadRequest == false && restResponse.isChunked()) {
+                httpResponse = httpRequest.createResponse(restResponse.status(), restResponse.chunkedContent());
+            } else {
+                final BytesReference content = restResponse.content();
+                if (content instanceof Releasable) {
+                    toClose.add((Releasable) content);
                 }
-            } catch (IllegalArgumentException ignored) {
-                assert restResponse.status() == RestStatus.METHOD_NOT_ALLOWED
-                    : "request HTTP method is unsupported but HTTP status is not METHOD_NOT_ALLOWED(405)";
-            }
-
-            final HttpResponse httpResponse = httpRequest.createResponse(restResponse.status(), finalContent);
+                toClose.add(this::releaseOutputBuffer);
 
+                BytesReference finalContent = isHeadRequest ? BytesArray.EMPTY : content;
+                httpResponse = httpRequest.createResponse(restResponse.status(), finalContent);
+            }
             corsHandler.setCorsResponseHeaders(httpRequest, httpResponse);
 
             opaque = request.header(X_OPAQUE_ID_HTTP_HEADER);
@@ -133,9 +137,13 @@ public class DefaultRestChannel extends AbstractRestChannel implements RestChann
 
             // If our response doesn't specify a content-type header, set one
             setHeaderField(httpResponse, CONTENT_TYPE, restResponse.contentType(), false);
-            // If our response has no content-length, calculate and set one
-            contentLength = String.valueOf(restResponse.content().length());
-            setHeaderField(httpResponse, CONTENT_LENGTH, contentLength, false);
+            if (restResponse.isChunked() == false) {
+                // If our response has no content-length, calculate and set one
+                contentLength = String.valueOf(restResponse.content().length());
+                setHeaderField(httpResponse, CONTENT_LENGTH, contentLength, false);
+            } else {
+                setHeaderField(httpResponse, "Transfer-Encoding", "chunked");
+            }
 
             addCookies(httpResponse);
 

+ 3 - 0
server/src/main/java/org/elasticsearch/http/HttpRequest.java

@@ -10,6 +10,7 @@ package org.elasticsearch.http;
 
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.rest.ChunkedRestResponseBody;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestStatus;
 
@@ -66,6 +67,8 @@ public interface HttpRequest {
      */
     HttpResponse createResponse(RestStatus status, BytesReference content);
 
+    HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody content);
+
     @Nullable
     Exception getInboundException();
 

+ 21 - 4
server/src/main/java/org/elasticsearch/rest/AbstractRestChannel.java

@@ -10,7 +10,6 @@ package org.elasticsearch.rest;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.io.stream.BytesStream;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.core.Nullable;
@@ -99,6 +98,26 @@ public abstract class AbstractRestChannel implements RestChannel {
         @Nullable XContentType responseContentType,
         boolean useFiltering
     ) throws IOException {
+        return newBuilder(
+            requestContentType,
+            responseContentType,
+            useFiltering,
+            org.elasticsearch.common.io.Streams.flushOnCloseStream(bytesOutput())
+        );
+    }
+
+    /**
+     * Creates a new {@link XContentBuilder} for a response to be sent using this channel. The builder's type can be sent as a parameter,
+     * through {@code responseContentType} or it can fallback to {@link #newBuilder(XContentType, boolean)} logic if the sent type value
+     * is {@code null}.
+     */
+    @Override
+    public XContentBuilder newBuilder(
+        @Nullable XContentType requestContentType,
+        @Nullable XContentType responseContentType,
+        boolean useFiltering,
+        OutputStream outputStream
+    ) throws IOException {
 
         if (responseContentType == null) {
             if (Strings.hasText(format)) {
@@ -129,8 +148,6 @@ public abstract class AbstractRestChannel implements RestChannel {
             excludes = filters.stream().filter(EXCLUDE_FILTER).map(f -> f.substring(1)).collect(toSet());
         }
 
-        OutputStream unclosableOutputStream = Streams.flushOnCloseStream(bytesOutput());
-
         Map<String, String> parameters = request.getParsedAccept() != null
             ? request.getParsedAccept().getParameters()
             : Collections.emptyMap();
@@ -138,7 +155,7 @@ public abstract class AbstractRestChannel implements RestChannel {
 
         XContentBuilder builder = new XContentBuilder(
             XContentFactory.xContent(responseContentType),
-            unclosableOutputStream,
+            outputStream,
             includes,
             excludes,
             responseMediaType,

+ 117 - 0
server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java

@@ -0,0 +1,117 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.rest;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.io.stream.BytesStream;
+import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
+import org.elasticsearch.common.recycler.Recycler;
+import org.elasticsearch.common.xcontent.ChunkedToXContent;
+import org.elasticsearch.core.IOUtils;
+import org.elasticsearch.core.Streams;
+import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Iterator;
+
+/**
+ * The body of a rest response that uses chunked HTTP encoding. Implementations are used to avoid materializing full responses on heap and
+ * instead serialize only as much of the response as can be flushed to the network right away.
+ */
+public interface ChunkedRestResponseBody {
+
+    /**
+     * @return true once this response has been written fully.
+     */
+    boolean isDone();
+
+    /**
+     * Serializes approximately as many bytes of the response as request by {@code sizeHint} to a {@link ReleasableBytesReference} that
+     * is created from buffers backed by the given {@code recycler}.
+     *
+     * @param sizeHint how many bytes to approximately serialize for the given chunk
+     * @param recycler recycler used to acquire buffers
+     * @return serialized chunk
+     * @throws IOException on serialization failure
+     */
+    ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) throws IOException;
+
+    /**
+     * @return the response Content-Type header value for this response body
+     */
+    String getResponseContentTypeString();
+
+    /**
+     * Create a chunked response body to be written to a specific {@link RestChannel} from a {@link ChunkedToXContent}.
+     *
+     * @param chunkedToXContent chunked x-content instance to serialize
+     * @param params parameters to use for serialization
+     * @param channel channel the response will be written to
+     * @return chunked rest response body
+     */
+    static ChunkedRestResponseBody fromXContent(ChunkedToXContent chunkedToXContent, ToXContent.Params params, RestChannel channel)
+        throws IOException {
+
+        return new ChunkedRestResponseBody() {
+
+            private final OutputStream out = new OutputStream() {
+                @Override
+                public void write(int b) throws IOException {
+                    target.write(b);
+                }
+
+                @Override
+                public void write(byte[] b, int off, int len) throws IOException {
+                    target.write(b, off, len);
+                }
+            };
+
+            private final XContentBuilder builder = channel.newBuilder(
+                channel.request().getXContentType(),
+                null,
+                true,
+                Streams.noCloseStream(out)
+            );
+
+            private final Iterator<ToXContent> serialization = chunkedToXContent.toXContentChunked();
+
+            private BytesStream target;
+
+            @Override
+            public boolean isDone() {
+                return serialization.hasNext() == false;
+            }
+
+            @Override
+            public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) throws IOException {
+                final RecyclerBytesStreamOutput chunkStream = new RecyclerBytesStreamOutput(recycler);
+                assert this.target == null;
+                this.target = chunkStream;
+                while (serialization.hasNext()) {
+                    serialization.next().toXContent(builder, params);
+                    if (chunkStream.size() >= sizeHint) {
+                        break;
+                    }
+                }
+                if (serialization.hasNext() == false) {
+                    builder.close();
+                }
+                this.target = null;
+                return new ReleasableBytesReference(chunkStream.bytes(), () -> IOUtils.closeWhileHandlingException(chunkStream));
+            }
+
+            @Override
+            public String getResponseContentTypeString() {
+                return builder.getResponseContentTypeString();
+            }
+        };
+    }
+}

+ 8 - 0
server/src/main/java/org/elasticsearch/rest/RestChannel.java

@@ -14,6 +14,7 @@ import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentType;
 
 import java.io.IOException;
+import java.io.OutputStream;
 
 /**
  * A channel used to construct bytes / builder based outputs, and send responses.
@@ -29,6 +30,13 @@ public interface RestChannel {
     XContentBuilder newBuilder(@Nullable XContentType xContentType, @Nullable XContentType responseContentType, boolean useFiltering)
         throws IOException;
 
+    XContentBuilder newBuilder(
+        @Nullable XContentType xContentType,
+        @Nullable XContentType responseContentType,
+        boolean useFiltering,
+        OutputStream out
+    ) throws IOException;
+
     BytesStream bytesOutput();
 
     RestRequest request();

+ 11 - 0
server/src/main/java/org/elasticsearch/rest/RestController.java

@@ -37,6 +37,7 @@ import org.elasticsearch.xcontent.XContentType;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -709,6 +710,16 @@ public class RestController implements HttpServerTransport.Dispatcher {
             return delegate.newBuilder(xContentType, responseContentType, useFiltering);
         }
 
+        @Override
+        public XContentBuilder newBuilder(
+            XContentType xContentType,
+            XContentType responseContentType,
+            boolean useFiltering,
+            OutputStream out
+        ) throws IOException {
+            return delegate.newBuilder(xContentType, responseContentType, useFiltering, out);
+        }
+
         @Override
         public BytesStream bytesOutput() {
             return delegate.bytesOutput();

+ 33 - 1
server/src/main/java/org/elasticsearch/rest/RestResponse.java

@@ -17,6 +17,7 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
@@ -43,7 +44,12 @@ public class RestResponse {
     private static final Logger SUPPRESSED_ERROR_LOGGER = LogManager.getLogger("rest.suppressed");
 
     private final RestStatus status;
+
+    @Nullable
     private final BytesReference content;
+
+    @Nullable
+    private final ChunkedRestResponseBody chunkedResponseBody;
     private final String responseMediaType;
     private Map<String, List<String>> customHeaders;
 
@@ -68,13 +74,28 @@ public class RestResponse {
         this(status, responseMediaType, new BytesArray(content));
     }
 
+    public RestResponse(RestStatus status, String responseMediaType, BytesReference content) {
+        this(status, responseMediaType, content, null);
+    }
+
+    public RestResponse(RestStatus status, ChunkedRestResponseBody content) {
+        this(status, content.getResponseContentTypeString(), null, content);
+    }
+
     /**
      * Creates a binary response.
      */
-    public RestResponse(RestStatus status, String responseMediaType, BytesReference content) {
+    private RestResponse(
+        RestStatus status,
+        String responseMediaType,
+        @Nullable BytesReference content,
+        @Nullable ChunkedRestResponseBody chunkedResponseBody
+    ) {
         this.status = status;
         this.content = content;
         this.responseMediaType = responseMediaType;
+        this.chunkedResponseBody = chunkedResponseBody;
+        assert (content == null) != (chunkedResponseBody == null);
     }
 
     public RestResponse(RestChannel channel, Exception e) throws IOException {
@@ -106,16 +127,27 @@ public class RestResponse {
         if (e instanceof ElasticsearchException) {
             copyHeaders(((ElasticsearchException) e));
         }
+        this.chunkedResponseBody = null;
     }
 
     public String contentType() {
         return this.responseMediaType;
     }
 
+    @Nullable
     public BytesReference content() {
         return this.content;
     }
 
+    @Nullable
+    public ChunkedRestResponseBody chunkedContent() {
+        return chunkedResponseBody;
+    }
+
+    public boolean isChunked() {
+        return chunkedResponseBody != null;
+    }
+
     public RestStatus status() {
         return this.status;
     }

+ 15 - 11
server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestGetSnapshotsAction.java

@@ -9,14 +9,17 @@
 package org.elasticsearch.rest.action.admin.cluster;
 
 import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.ChunkedRestResponseBody;
 import org.elasticsearch.rest.RestRequest;
-import org.elasticsearch.rest.action.DispatchingRestToXContentListener;
+import org.elasticsearch.rest.RestResponse;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.rest.action.RestActionListener;
 import org.elasticsearch.rest.action.RestCancellableNodeClient;
 import org.elasticsearch.search.sort.SortOrder;
-import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
 import java.util.List;
@@ -33,11 +36,7 @@ import static org.elasticsearch.snapshots.SnapshotInfo.INDEX_NAMES_XCONTENT_PARA
  */
 public class RestGetSnapshotsAction extends BaseRestHandler {
 
-    private final ThreadPool threadPool;
-
-    public RestGetSnapshotsAction(ThreadPool threadPool) {
-        this.threadPool = threadPool;
-    }
+    public RestGetSnapshotsAction() {}
 
     @Override
     public List<Route> routes() {
@@ -85,9 +84,14 @@ public class RestGetSnapshotsAction extends BaseRestHandler {
         getSnapshotsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getSnapshotsRequest.masterNodeTimeout()));
         return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin()
             .cluster()
-            .getSnapshots(
-                getSnapshotsRequest,
-                new DispatchingRestToXContentListener<>(threadPool.executor(ThreadPool.Names.MANAGEMENT), channel, request)
-            );
+            .getSnapshots(getSnapshotsRequest, new RestActionListener<>(channel) {
+                @Override
+                protected void processResponse(GetSnapshotsResponse getSnapshotsResponse) throws IOException {
+                    ensureOpen();
+                    channel.sendResponse(
+                        new RestResponse(RestStatus.OK, ChunkedRestResponseBody.fromXContent(getSnapshotsResponse, request, channel))
+                    );
+                }
+            });
     }
 }

+ 20 - 0
server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.snapshots.Snapshot;
@@ -23,7 +24,9 @@ import org.elasticsearch.snapshots.SnapshotInfoTestUtils;
 import org.elasticsearch.snapshots.SnapshotShardFailure;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.json.JsonXContent;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -33,6 +36,7 @@ import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -41,6 +45,7 @@ import java.util.regex.Pattern;
 
 import static org.elasticsearch.snapshots.SnapshotInfo.INDEX_DETAILS_XCONTENT_PARAM;
 import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
+import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
 import static org.hamcrest.CoreMatchers.containsString;
 
 public class GetSnapshotsResponseTests extends ESTestCase {
@@ -173,4 +178,19 @@ public class GetSnapshotsResponseTests extends ESTestCase {
             .test();
     }
 
+    public void testToChunkedXContent() throws Exception {
+        final GetSnapshotsResponse response = createTestInstance();
+        final XContentBuilder builder = JsonXContent.contentBuilder();
+        final Iterator<ToXContent> serialization = response.toXContentChunked();
+        serialization.next().toXContent(builder, EMPTY_PARAMS);
+        for (int i = 0; i < response.getSnapshots().size(); i++) {
+            serialization.next().toXContent(builder, EMPTY_PARAMS);
+            assertTrue(serialization.hasNext());
+        }
+        serialization.next().toXContent(builder, EMPTY_PARAMS);
+        assertFalse(serialization.hasNext());
+        final BytesReference bytesReferenceFromChunked = BytesReference.bytes(builder);
+        assertEquals(bytesReferenceFromChunked, BytesReference.bytes(response.toXContent(JsonXContent.contentBuilder(), EMPTY_PARAMS)));
+    }
+
 }

+ 53 - 0
server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java

@@ -10,6 +10,7 @@ package org.elasticsearch.http;
 
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.io.stream.BytesStream;
@@ -22,6 +23,7 @@ import org.elasticsearch.common.util.MockBigArrays;
 import org.elasticsearch.common.util.MockPageCacheRecycler;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
+import org.elasticsearch.rest.ChunkedRestResponseBody;
 import org.elasticsearch.rest.RestChannel;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
@@ -414,6 +416,57 @@ public class DefaultRestChannelTests extends ESTestCase {
         verify(tracer).stopTrace(argThat(id -> id.startsWith("rest-")));
     }
 
+    public void testHandleHeadRequest() {
+        HttpRequest httpRequest = new TestHttpRequest(HttpRequest.HttpVersion.HTTP_1_1, RestRequest.Method.HEAD, "/");
+        final RestRequest request = RestRequest.request(parserConfig(), httpRequest, httpChannel);
+        DefaultRestChannel channel = new DefaultRestChannel(
+            httpChannel,
+            request.getHttpRequest(),
+            request,
+            bigArrays,
+            HttpHandlingSettings.fromSettings(Settings.EMPTY),
+            threadPool.getThreadContext(),
+            CorsHandler.fromSettings(Settings.EMPTY),
+            httpTracer,
+            tracer
+        );
+        ArgumentCaptor<HttpResponse> requestCaptor = ArgumentCaptor.forClass(HttpResponse.class);
+        {
+            // non-chunked response
+            channel.sendResponse(
+                new RestResponse(RestStatus.OK, RestResponse.TEXT_CONTENT_TYPE, new BytesArray(randomByteArrayOfLength(5)))
+            );
+            verify(httpChannel).sendResponse(requestCaptor.capture(), any());
+            HttpResponse response = requestCaptor.getValue();
+            assertThat(response, instanceOf(TestHttpResponse.class));
+            assertThat(((TestHttpResponse) response).content().length(), equalTo(0));
+        }
+        {
+            // chunked response
+            channel.sendResponse(new RestResponse(RestStatus.OK, new ChunkedRestResponseBody() {
+
+                @Override
+                public boolean isDone() {
+                    throw new AssertionError("should not try to serialize response body for HEAD request");
+                }
+
+                @Override
+                public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) {
+                    throw new AssertionError("should not try to serialize response body for HEAD request");
+                }
+
+                @Override
+                public String getResponseContentTypeString() {
+                    return RestResponse.TEXT_CONTENT_TYPE;
+                }
+            }));
+            verify(httpChannel, times(2)).sendResponse(requestCaptor.capture(), any());
+            HttpResponse response = requestCaptor.getValue();
+            assertThat(response, instanceOf(TestHttpResponse.class));
+            assertThat(((TestHttpResponse) response).content().length(), equalTo(0));
+        }
+    }
+
     private TestHttpResponse executeRequest(final Settings settings, final String host) {
         return executeRequest(settings, null, host);
     }

+ 6 - 0
server/src/test/java/org/elasticsearch/http/TestHttpRequest.java

@@ -10,6 +10,7 @@ package org.elasticsearch.http;
 
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.rest.ChunkedRestResponseBody;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestStatus;
 
@@ -76,6 +77,11 @@ class TestHttpRequest implements HttpRequest {
         return new TestHttpResponse(status, content);
     }
 
+    @Override
+    public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody content) {
+        throw new UnsupportedOperationException("chunked responses not supported");
+    }
+
     @Override
     public void release() {}
 

+ 65 - 0
server/src/test/java/org/elasticsearch/rest/ChunkedRestResponseBodyTests.java

@@ -0,0 +1,65 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.rest;
+
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.CompositeBytesReference;
+import org.elasticsearch.common.collect.Iterators;
+import org.elasticsearch.common.xcontent.ChunkedToXContent;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.rest.FakeRestChannel;
+import org.elasticsearch.test.rest.FakeRestRequest;
+import org.elasticsearch.transport.BytesRefRecycler;
+import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.XContent;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ChunkedRestResponseBodyTests extends ESTestCase {
+
+    public void testEncodesChunkedXContentCorrectly() throws IOException {
+        final ChunkedToXContent chunkedToXContent = () -> Iterators.forArray(new ToXContent[] { (b, p) -> b.startObject(), (b, p) -> {
+            if (randomBoolean()) {
+                b.flush();
+            }
+            b.mapContents(Map.of("foo", "bar", "some_other_key", "some_other_value"));
+            return b;
+        }, (b, p) -> b.stringListField("list_field", List.of("string", "otherString")).endObject() });
+        final XContent randomXContent = randomFrom(XContentType.values()).xContent();
+        final XContentBuilder builderDirect = XContentBuilder.builder(randomXContent);
+        var iter = chunkedToXContent.toXContentChunked();
+        while (iter.hasNext()) {
+            iter.next().toXContent(builderDirect, ToXContent.EMPTY_PARAMS);
+        }
+        final var bytesDirect = BytesReference.bytes(builderDirect);
+
+        final var chunkedResponse = ChunkedRestResponseBody.fromXContent(
+            chunkedToXContent,
+            ToXContent.EMPTY_PARAMS,
+            new FakeRestChannel(
+                new FakeRestRequest.Builder(xContentRegistry()).withContent(BytesArray.EMPTY, randomXContent.type()).build(),
+                randomBoolean(),
+                1
+            )
+        );
+
+        final List<BytesReference> refsGenerated = new ArrayList<>();
+        while (chunkedResponse.isDone() == false) {
+            refsGenerated.add(chunkedResponse.encodeChunk(randomIntBetween(2, 10), BytesRefRecycler.NON_RECYCLING_INSTANCE));
+        }
+
+        assertEquals(bytesDirect, CompositeBytesReference.of(refsGenerated.toArray(new BytesReference[0])));
+    }
+}

+ 5 - 0
server/src/test/java/org/elasticsearch/rest/RestControllerTests.java

@@ -697,6 +697,11 @@ public class RestControllerTests extends ESTestCase {
                 return null;
             }
 
+            @Override
+            public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody content) {
+                throw new AssertionError("should not be called");
+            }
+
             @Override
             public void release() {}
 

+ 6 - 0
test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java

@@ -16,6 +16,7 @@ import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.http.HttpChannel;
 import org.elasticsearch.http.HttpRequest;
 import org.elasticsearch.http.HttpResponse;
+import org.elasticsearch.rest.ChunkedRestResponseBody;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -127,6 +128,11 @@ public class FakeRestRequest extends RestRequest {
             };
         }
 
+        @Override
+        public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody content) {
+            return createResponse(status, BytesArray.EMPTY);
+        }
+
         @Override
         public void release() {}