Browse Source

Split up large HTTP responses in outbound pipeline (#62666)

Currently Netty will batch compression an entire HTTP response
regardless of its content size. It allocates a byte array at least of
the same size as the uncompressed content. This causes issues with our
attempts to remove humungous G1GC allocations. This commit resolves the
issue by split responses into 128KB chunks.

This has the side-effect of making large outbound HTTP responses that
are compressed be send as chunked transfer-encoding.
Tim Brooks 5 years ago
parent
commit
19c19f28cb

+ 67 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponseCreator.java

@@ -0,0 +1,67 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.http.netty4;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import io.netty.handler.codec.compression.JdkZlibEncoder;
+import io.netty.handler.codec.http.DefaultHttpContent;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.DefaultLastHttpContent;
+import io.netty.handler.codec.http.HttpResponse;
+import org.elasticsearch.common.Booleans;
+import org.elasticsearch.transport.NettyAllocator;
+
+import java.util.List;
+
+/**
+ * Split up large responses to prevent batch compression {@link JdkZlibEncoder} down the pipeline.
+ */
+@ChannelHandler.Sharable
+class Netty4HttpResponseCreator extends MessageToMessageEncoder<Netty4HttpResponse> {
+
+    private static final String DO_NOT_SPLIT = "es.unsafe.do_not_split_http_responses";
+
+    private static final boolean DO_NOT_SPLIT_HTTP_RESPONSES;
+    private static final int SPLIT_THRESHOLD;
+
+    static {
+        DO_NOT_SPLIT_HTTP_RESPONSES = Booleans.parseBoolean(System.getProperty(DO_NOT_SPLIT), false);
+        // Netty will add some header bytes if it compresses this message. So we downsize slightly.
+        SPLIT_THRESHOLD = (int) (NettyAllocator.suggestedMaxAllocationSize() * 0.99);
+    }
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, Netty4HttpResponse msg, List<Object> out) {
+        if (DO_NOT_SPLIT_HTTP_RESPONSES || msg.content().readableBytes() <= SPLIT_THRESHOLD) {
+            out.add(msg.retain());
+        } else {
+            HttpResponse response = new DefaultHttpResponse(msg.protocolVersion(), msg.status(), msg.headers());
+            out.add(response);
+            ByteBuf content = msg.content();
+            while (content.readableBytes() > SPLIT_THRESHOLD) {
+                out.add(new DefaultHttpContent(content.readRetainedSlice(SPLIT_THRESHOLD)));
+            }
+            out.add(new DefaultLastHttpContent(content.readRetainedSlice(content.readableBytes())));
+        }
+    }
+}

+ 3 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

@@ -285,6 +285,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
         private final Netty4HttpServerTransport transport;
         private final Netty4HttpServerTransport transport;
         private final Netty4HttpRequestCreator requestCreator;
         private final Netty4HttpRequestCreator requestCreator;
         private final Netty4HttpRequestHandler requestHandler;
         private final Netty4HttpRequestHandler requestHandler;
+        private final Netty4HttpResponseCreator responseCreator;
         private final HttpHandlingSettings handlingSettings;
         private final HttpHandlingSettings handlingSettings;
 
 
         protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) {
         protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) {
@@ -292,6 +293,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
             this.handlingSettings = handlingSettings;
             this.handlingSettings = handlingSettings;
             this.requestCreator =  new Netty4HttpRequestCreator();
             this.requestCreator =  new Netty4HttpRequestCreator();
             this.requestHandler = new Netty4HttpRequestHandler(transport);
             this.requestHandler = new Netty4HttpRequestHandler(transport);
+            this.responseCreator = new Netty4HttpResponseCreator();
         }
         }
 
 
         @Override
         @Override
@@ -314,6 +316,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
                 ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
                 ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
             }
             }
             ch.pipeline().addLast("request_creator", requestCreator);
             ch.pipeline().addLast("request_creator", requestCreator);
+            ch.pipeline().addLast("response_creator", responseCreator);
             ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
             ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
             ch.pipeline().addLast("handler", requestHandler);
             ch.pipeline().addLast("handler", requestHandler);
             transport.serverAcceptedChannel(nettyHttpChannel);
             transport.serverAcceptedChannel(nettyHttpChannel);

+ 10 - 2
modules/transport-netty4/src/main/java/org/elasticsearch/transport/CopyBytesSocketChannel.java

@@ -162,9 +162,17 @@ public class CopyBytesSocketChannel extends Netty4NioSocketChannel {
     private static void copyBytes(ByteBuffer[] source, int nioBufferCnt, ByteBuffer destination) {
     private static void copyBytes(ByteBuffer[] source, int nioBufferCnt, ByteBuffer destination) {
         for (int i = 0; i < nioBufferCnt && destination.hasRemaining(); i++) {
         for (int i = 0; i < nioBufferCnt && destination.hasRemaining(); i++) {
             ByteBuffer buffer = source[i];
             ByteBuffer buffer = source[i];
-            assert buffer.hasArray() : "Buffer must have heap array";
             int nBytesToCopy = Math.min(destination.remaining(), buffer.remaining());
             int nBytesToCopy = Math.min(destination.remaining(), buffer.remaining());
-            destination.put(buffer.array(), buffer.arrayOffset() + buffer.position(), nBytesToCopy);
+            if (buffer.hasArray()) {
+                destination.put(buffer.array(), buffer.arrayOffset() + buffer.position(), nBytesToCopy);
+            } else {
+                int initialLimit = buffer.limit();
+                int initialPosition = buffer.position();
+                buffer.limit(buffer.position() + nBytesToCopy);
+                destination.put(buffer);
+                buffer.position(initialPosition);
+                buffer.limit(initialLimit);
+            }
         }
         }
     }
     }
 
 

+ 36 - 4
modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java

@@ -40,6 +40,7 @@ public class NettyAllocator {
     private static final Logger logger = LogManager.getLogger(NettyAllocator.class);
     private static final Logger logger = LogManager.getLogger(NettyAllocator.class);
     private static final AtomicBoolean descriptionLogged = new AtomicBoolean(false);
     private static final AtomicBoolean descriptionLogged = new AtomicBoolean(false);
 
 
+    private static final long SUGGESTED_MAX_ALLOCATION_SIZE;
     private static final ByteBufAllocator ALLOCATOR;
     private static final ByteBufAllocator ALLOCATOR;
     private static final String DESCRIPTION;
     private static final String DESCRIPTION;
 
 
@@ -50,7 +51,9 @@ public class NettyAllocator {
     static {
     static {
         if (Booleans.parseBoolean(System.getProperty(USE_NETTY_DEFAULT), false)) {
         if (Booleans.parseBoolean(System.getProperty(USE_NETTY_DEFAULT), false)) {
             ALLOCATOR = ByteBufAllocator.DEFAULT;
             ALLOCATOR = ByteBufAllocator.DEFAULT;
-            DESCRIPTION = "[name=netty_default, factors={es.unsafe.use_netty_default_allocator=true}]";
+            SUGGESTED_MAX_ALLOCATION_SIZE = 1024 * 1024;
+            DESCRIPTION = "[name=netty_default, suggested_max_allocation_size=" + new ByteSizeValue(SUGGESTED_MAX_ALLOCATION_SIZE)
+                + ", factors={es.unsafe.use_netty_default_allocator=true}]";
         } else {
         } else {
             final long heapSizeInBytes = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();
             final long heapSizeInBytes = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();
             final boolean g1gcEnabled = Boolean.parseBoolean(JvmInfo.jvmInfo().useG1GC());
             final boolean g1gcEnabled = Boolean.parseBoolean(JvmInfo.jvmInfo().useG1GC());
@@ -62,7 +65,15 @@ public class NettyAllocator {
             ByteBufAllocator delegate;
             ByteBufAllocator delegate;
             if (useUnpooled(heapSizeInBytes, g1gcEnabled, g1gcRegionSizeIsKnown, g1gcRegionSizeInBytes)) {
             if (useUnpooled(heapSizeInBytes, g1gcEnabled, g1gcRegionSizeIsKnown, g1gcRegionSizeInBytes)) {
                 delegate = UnpooledByteBufAllocator.DEFAULT;
                 delegate = UnpooledByteBufAllocator.DEFAULT;
-                DESCRIPTION = "[name=unpooled, factors={es.unsafe.use_unpooled_allocator=" + userForcedUnpooled()
+                if (g1gcEnabled && g1gcRegionSizeIsKnown) {
+                    // Suggested max allocation size 1/4 of region size. Guard against unknown edge cases
+                    // where this value would be less than 256KB.
+                    SUGGESTED_MAX_ALLOCATION_SIZE = Math.max(g1gcRegionSizeInBytes >> 2, 256 * 1024);
+                } else {
+                    SUGGESTED_MAX_ALLOCATION_SIZE = 1024 * 1024;
+                }
+                DESCRIPTION = "[name=unpooled, suggested_max_allocation_size=" + new ByteSizeValue(SUGGESTED_MAX_ALLOCATION_SIZE)
+                    + ", factors={es.unsafe.use_unpooled_allocator=" + System.getProperty(USE_UNPOOLED)
                     + ", g1gc_enabled=" + g1gcEnabled
                     + ", g1gc_enabled=" + g1gcEnabled
                     + ", g1gc_region_size=" + g1gcRegionSize
                     + ", g1gc_region_size=" + g1gcRegionSize
                     + ", heap_size=" + heapSize + "}]";
                     + ", heap_size=" + heapSize + "}]";
@@ -92,8 +103,11 @@ public class NettyAllocator {
                 boolean useCacheForAllThreads = PooledByteBufAllocator.defaultUseCacheForAllThreads();
                 boolean useCacheForAllThreads = PooledByteBufAllocator.defaultUseCacheForAllThreads();
                 delegate = new PooledByteBufAllocator(false, nHeapArena, 0, pageSize, maxOrder, tinyCacheSize,
                 delegate = new PooledByteBufAllocator(false, nHeapArena, 0, pageSize, maxOrder, tinyCacheSize,
                     smallCacheSize, normalCacheSize, useCacheForAllThreads);
                     smallCacheSize, normalCacheSize, useCacheForAllThreads);
-                ByteSizeValue chunkSize = new ByteSizeValue(pageSize << maxOrder);
+                int chunkSizeInBytes = pageSize << maxOrder;
+                ByteSizeValue chunkSize = new ByteSizeValue(chunkSizeInBytes);
+                SUGGESTED_MAX_ALLOCATION_SIZE = chunkSizeInBytes;
                 DESCRIPTION = "[name=elasticsearch_configured, chunk_size=" + chunkSize
                 DESCRIPTION = "[name=elasticsearch_configured, chunk_size=" + chunkSize
+                    + ", suggested_max_allocation_size=" + new ByteSizeValue(SUGGESTED_MAX_ALLOCATION_SIZE)
                     + ", factors={es.unsafe.use_netty_default_chunk_and_page_size=" + useDefaultChunkAndPageSize()
                     + ", factors={es.unsafe.use_netty_default_chunk_and_page_size=" + useDefaultChunkAndPageSize()
                     + ", g1gc_enabled=" + g1gcEnabled
                     + ", g1gc_enabled=" + g1gcEnabled
                     + ", g1gc_region_size=" + g1gcRegionSize + "}]";
                     + ", g1gc_region_size=" + g1gcRegionSize + "}]";
@@ -112,6 +126,10 @@ public class NettyAllocator {
         return ALLOCATOR;
         return ALLOCATOR;
     }
     }
 
 
+    public static long suggestedMaxAllocationSize() {
+        return SUGGESTED_MAX_ALLOCATION_SIZE;
+    }
+
     public static String getAllocatorDescription() {
     public static String getAllocatorDescription() {
         return DESCRIPTION;
         return DESCRIPTION;
     }
     }
@@ -135,6 +153,8 @@ public class NettyAllocator {
     private static boolean useUnpooled(long heapSizeInBytes, boolean g1gcEnabled, boolean g1gcRegionSizeIsKnown, long g1RegionSize) {
     private static boolean useUnpooled(long heapSizeInBytes, boolean g1gcEnabled, boolean g1gcRegionSizeIsKnown, long g1RegionSize) {
         if (userForcedUnpooled()) {
         if (userForcedUnpooled()) {
             return true;
             return true;
+        } else if (userForcedPooled()) {
+            return true;
         } else if (heapSizeInBytes <= 1 << 30) {
         } else if (heapSizeInBytes <= 1 << 30) {
             // If the heap is 1GB or less we use unpooled
             // If the heap is 1GB or less we use unpooled
             return true;
             return true;
@@ -155,6 +175,14 @@ public class NettyAllocator {
         }
         }
     }
     }
 
 
+    private static boolean userForcedPooled() {
+        if (System.getProperty(USE_UNPOOLED) != null) {
+            return Booleans.parseBoolean(System.getProperty(USE_UNPOOLED)) == false;
+        } else {
+            return false;
+        }
+    }
+
     private static boolean useDefaultChunkAndPageSize() {
     private static boolean useDefaultChunkAndPageSize() {
         if (System.getProperty(USE_NETTY_DEFAULT_CHUNK) != null) {
         if (System.getProperty(USE_NETTY_DEFAULT_CHUNK) != null) {
             return Booleans.parseBoolean(System.getProperty(USE_NETTY_DEFAULT_CHUNK));
             return Booleans.parseBoolean(System.getProperty(USE_NETTY_DEFAULT_CHUNK));
@@ -163,7 +191,7 @@ public class NettyAllocator {
         }
         }
     }
     }
 
 
-    private static class NoDirectBuffers implements ByteBufAllocator {
+    public static class NoDirectBuffers implements ByteBufAllocator {
 
 
         private final ByteBufAllocator delegate;
         private final ByteBufAllocator delegate;
 
 
@@ -271,5 +299,9 @@ public class NettyAllocator {
         public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
         public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
             return delegate.calculateNewCapacity(minNewCapacity, maxCapacity);
             return delegate.calculateNewCapacity(minNewCapacity, maxCapacity);
         }
         }
+
+        public ByteBufAllocator getDelegate() {
+            return delegate;
+        }
     }
     }
 }
 }

+ 17 - 12
modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java

@@ -32,6 +32,7 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.http.DefaultFullHttpRequest;
 import io.netty.handler.codec.http.DefaultFullHttpRequest;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpContentDecompressor;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpObject;
 import io.netty.handler.codec.http.HttpObject;
@@ -91,8 +92,8 @@ class Netty4HttpClient implements Closeable {
             .group(new NioEventLoopGroup(1));
             .group(new NioEventLoopGroup(1));
     }
     }
 
 
-    public Collection<FullHttpResponse> get(SocketAddress remoteAddress, String... uris) throws InterruptedException {
-        Collection<HttpRequest> requests = new ArrayList<>(uris.length);
+    public List<FullHttpResponse> get(SocketAddress remoteAddress, String... uris) throws InterruptedException {
+        List<HttpRequest> requests = new ArrayList<>(uris.length);
         for (int i = 0; i < uris.length; i++) {
         for (int i = 0; i < uris.length; i++) {
             final HttpRequest httpRequest = new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]);
             final HttpRequest httpRequest = new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]);
             httpRequest.headers().add(HOST, "localhost");
             httpRequest.headers().add(HOST, "localhost");
@@ -107,10 +108,10 @@ class Netty4HttpClient implements Closeable {
         return processRequestsWithBody(HttpMethod.POST, remoteAddress, urisAndBodies);
         return processRequestsWithBody(HttpMethod.POST, remoteAddress, urisAndBodies);
     }
     }
 
 
-    public final FullHttpResponse post(SocketAddress remoteAddress, FullHttpRequest httpRequest) throws InterruptedException {
-        Collection<FullHttpResponse> responses = sendRequests(remoteAddress, Collections.singleton(httpRequest));
+    public final FullHttpResponse send(SocketAddress remoteAddress, FullHttpRequest httpRequest) throws InterruptedException {
+        List<FullHttpResponse> responses = sendRequests(remoteAddress, Collections.singleton(httpRequest));
         assert responses.size() == 1 : "expected 1 and only 1 http response";
         assert responses.size() == 1 : "expected 1 and only 1 http response";
-        return responses.iterator().next();
+        return responses.get(0);
     }
     }
 
 
     public final Collection<FullHttpResponse> put(SocketAddress remoteAddress, List<Tuple<String, CharSequence>> urisAndBodies)
     public final Collection<FullHttpResponse> put(SocketAddress remoteAddress, List<Tuple<String, CharSequence>> urisAndBodies)
@@ -118,9 +119,9 @@ class Netty4HttpClient implements Closeable {
         return processRequestsWithBody(HttpMethod.PUT, remoteAddress, urisAndBodies);
         return processRequestsWithBody(HttpMethod.PUT, remoteAddress, urisAndBodies);
     }
     }
 
 
-    private Collection<FullHttpResponse> processRequestsWithBody(HttpMethod method, SocketAddress remoteAddress, List<Tuple<String,
+    private List<FullHttpResponse> processRequestsWithBody(HttpMethod method, SocketAddress remoteAddress, List<Tuple<String,
         CharSequence>> urisAndBodies) throws InterruptedException {
         CharSequence>> urisAndBodies) throws InterruptedException {
-        Collection<HttpRequest> requests = new ArrayList<>(urisAndBodies.size());
+        List<HttpRequest> requests = new ArrayList<>(urisAndBodies.size());
         for (Tuple<String, CharSequence> uriAndBody : urisAndBodies) {
         for (Tuple<String, CharSequence> uriAndBody : urisAndBodies) {
             ByteBuf content = Unpooled.copiedBuffer(uriAndBody.v2(), StandardCharsets.UTF_8);
             ByteBuf content = Unpooled.copiedBuffer(uriAndBody.v2(), StandardCharsets.UTF_8);
             HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uriAndBody.v1(), content);
             HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uriAndBody.v1(), content);
@@ -132,11 +133,11 @@ class Netty4HttpClient implements Closeable {
         return sendRequests(remoteAddress, requests);
         return sendRequests(remoteAddress, requests);
     }
     }
 
 
-    private synchronized Collection<FullHttpResponse> sendRequests(
+    private synchronized List<FullHttpResponse> sendRequests(
         final SocketAddress remoteAddress,
         final SocketAddress remoteAddress,
         final Collection<HttpRequest> requests) throws InterruptedException {
         final Collection<HttpRequest> requests) throws InterruptedException {
         final CountDownLatch latch = new CountDownLatch(requests.size());
         final CountDownLatch latch = new CountDownLatch(requests.size());
-        final Collection<FullHttpResponse> content = Collections.synchronizedList(new ArrayList<>(requests.size()));
+        final List<FullHttpResponse> content = Collections.synchronizedList(new ArrayList<>(requests.size()));
 
 
         clientBootstrap.handler(new CountDownLatchHandler(latch, content));
         clientBootstrap.handler(new CountDownLatchHandler(latch, content));
 
 
@@ -180,16 +181,20 @@ class Netty4HttpClient implements Closeable {
         }
         }
 
 
         @Override
         @Override
-        protected void initChannel(SocketChannel ch) throws Exception {
+        protected void initChannel(SocketChannel ch) {
             final int maxContentLength = new ByteSizeValue(100, ByteSizeUnit.MB).bytesAsInt();
             final int maxContentLength = new ByteSizeValue(100, ByteSizeUnit.MB).bytesAsInt();
             ch.pipeline().addLast(new HttpResponseDecoder());
             ch.pipeline().addLast(new HttpResponseDecoder());
             ch.pipeline().addLast(new HttpRequestEncoder());
             ch.pipeline().addLast(new HttpRequestEncoder());
+            ch.pipeline().addLast(new HttpContentDecompressor());
             ch.pipeline().addLast(new HttpObjectAggregator(maxContentLength));
             ch.pipeline().addLast(new HttpObjectAggregator(maxContentLength));
             ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpObject>() {
             ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpObject>() {
                 @Override
                 @Override
-                protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
+                protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
                     final FullHttpResponse response = (FullHttpResponse) msg;
                     final FullHttpResponse response = (FullHttpResponse) msg;
-                    content.add(response.copy());
+                    // We copy the buffer manually to avoid a huge allocation on a pooled allocator. We have
+                    // a test that tracks huge allocations, so we want to avoid them in this test code.
+                    ByteBuf newContent = Unpooled.copiedBuffer(((FullHttpResponse) msg).content());
+                    content.add(response.replace(newContent));
                     latch.countDown();
                     latch.countDown();
                 }
                 }
 
 

+ 69 - 5
modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java

@@ -20,7 +20,11 @@
 package org.elasticsearch.http.netty4;
 package org.elasticsearch.http.netty4;
 
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.PoolArenaMetric;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocatorMetric;
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerAdapter;
 import io.netty.channel.ChannelHandlerAdapter;
@@ -178,13 +182,13 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
                 request.headers().set(HttpHeaderNames.EXPECT, expectation);
                 request.headers().set(HttpHeaderNames.EXPECT, expectation);
                 HttpUtil.setContentLength(request, contentLength);
                 HttpUtil.setContentLength(request, contentLength);
 
 
-                final FullHttpResponse response = client.post(remoteAddress.address(), request);
+                final FullHttpResponse response = client.send(remoteAddress.address(), request);
                 try {
                 try {
                     assertThat(response.status(), equalTo(expectedStatus));
                     assertThat(response.status(), equalTo(expectedStatus));
                     if (expectedStatus.equals(HttpResponseStatus.CONTINUE)) {
                     if (expectedStatus.equals(HttpResponseStatus.CONTINUE)) {
                         final FullHttpRequest continuationRequest =
                         final FullHttpRequest continuationRequest =
                             new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/", Unpooled.EMPTY_BUFFER);
                             new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/", Unpooled.EMPTY_BUFFER);
-                        final FullHttpResponse continuationResponse = client.post(remoteAddress.address(), continuationRequest);
+                        final FullHttpResponse continuationResponse = client.send(remoteAddress.address(), continuationRequest);
                         try {
                         try {
                             assertThat(continuationResponse.status(), is(HttpResponseStatus.OK));
                             assertThat(continuationResponse.status(), is(HttpResponseStatus.OK));
                             assertThat(
                             assertThat(
@@ -266,7 +270,7 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
                 final String url = "/" + new String(new byte[maxInitialLineLength], Charset.forName("UTF-8"));
                 final String url = "/" + new String(new byte[maxInitialLineLength], Charset.forName("UTF-8"));
                 final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
                 final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
 
 
-                final FullHttpResponse response = client.post(remoteAddress.address(), request);
+                final FullHttpResponse response = client.send(remoteAddress.address(), request);
                 try {
                 try {
                     assertThat(response.status(), equalTo(HttpResponseStatus.BAD_REQUEST));
                     assertThat(response.status(), equalTo(HttpResponseStatus.BAD_REQUEST));
                     assertThat(
                     assertThat(
@@ -282,6 +286,66 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
         assertThat(causeReference.get(), instanceOf(TooLongFrameException.class));
         assertThat(causeReference.get(), instanceOf(TooLongFrameException.class));
     }
     }
 
 
+    public void testLargeCompressedResponse() throws InterruptedException {
+        final String responseString = randomAlphaOfLength(4 * 1024 * 1024);
+        final String url = "/thing";
+        final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
+
+            @Override
+            public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
+                if (url.equals(request.uri())) {
+                    channel.sendResponse(new BytesRestResponse(OK, responseString));
+                } else {
+                    logger.error("--> Unexpected successful uri [{}]", request.uri());
+                    throw new AssertionError();
+                }
+            }
+
+            @Override
+            public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
+                logger.error(new ParameterizedMessage("--> Unexpected bad request [{}]",
+                    FakeRestRequest.requestToString(channel.request())), cause);
+                throw new AssertionError();
+            }
+
+        };
+
+        try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(
+            Settings.EMPTY, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher, clusterSettings,
+            new SharedGroupFactory(Settings.EMPTY))) {
+            transport.start();
+            final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
+
+            try (Netty4HttpClient client = new Netty4HttpClient()) {
+                DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
+                request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, randomFrom("deflate", "gzip"));
+                long numOfHugeAllocations = getHugeAllocationCount();
+                final FullHttpResponse response = client.send(remoteAddress.address(), request);
+                try {
+                    assertThat(getHugeAllocationCount(), equalTo(numOfHugeAllocations));
+                    assertThat(response.status(), equalTo(HttpResponseStatus.OK));
+                    byte[] bytes = new byte[response.content().readableBytes()];
+                    response.content().readBytes(bytes);
+                    assertThat(new String(bytes, StandardCharsets.UTF_8), equalTo(responseString));
+                } finally {
+                    response.release();
+                }
+            }
+        }
+    }
+
+    private long getHugeAllocationCount() {
+        long numOfHugAllocations = 0;
+        ByteBufAllocator allocator = NettyAllocator.getAllocator();
+        assert allocator instanceof NettyAllocator.NoDirectBuffers;
+        ByteBufAllocator delegate = ((NettyAllocator.NoDirectBuffers) allocator).getDelegate();
+        if (delegate instanceof PooledByteBufAllocator) {
+            PooledByteBufAllocatorMetric metric = ((PooledByteBufAllocator) delegate).metric();
+            numOfHugAllocations = metric.heapArenas().stream().mapToLong(PoolArenaMetric::numHugeAllocations).sum();
+        }
+        return numOfHugAllocations;
+    }
+
     public void testCorsRequest() throws InterruptedException {
     public void testCorsRequest() throws InterruptedException {
         final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
         final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
 
 
@@ -318,7 +382,7 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
                 request.headers().add(CorsHandler.ORIGIN, "elastic.co");
                 request.headers().add(CorsHandler.ORIGIN, "elastic.co");
                 request.headers().add(CorsHandler.ACCESS_CONTROL_REQUEST_METHOD, "POST");
                 request.headers().add(CorsHandler.ACCESS_CONTROL_REQUEST_METHOD, "POST");
 
 
-                final FullHttpResponse response = client.post(remoteAddress.address(), request);
+                final FullHttpResponse response = client.send(remoteAddress.address(), request);
                 try {
                 try {
                     assertThat(response.status(), equalTo(HttpResponseStatus.OK));
                     assertThat(response.status(), equalTo(HttpResponseStatus.OK));
                     assertThat(response.headers().get(CorsHandler.ACCESS_CONTROL_ALLOW_ORIGIN), equalTo("elastic.co"));
                     assertThat(response.headers().get(CorsHandler.ACCESS_CONTROL_ALLOW_ORIGIN), equalTo("elastic.co"));
@@ -334,7 +398,7 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
                 final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
                 final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
                 request.headers().add(CorsHandler.ORIGIN, "elastic2.co");
                 request.headers().add(CorsHandler.ORIGIN, "elastic2.co");
 
 
-                final FullHttpResponse response = client.post(remoteAddress.address(), request);
+                final FullHttpResponse response = client.send(remoteAddress.address(), request);
                 try {
                 try {
                     assertThat(response.status(), equalTo(HttpResponseStatus.FORBIDDEN));
                     assertThat(response.status(), equalTo(HttpResponseStatus.FORBIDDEN));
                 } finally {
                 } finally {

+ 1 - 0
plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java

@@ -77,6 +77,7 @@ public class HttpReadWriteHandler implements NioChannelHandler {
             handlers.add(new HttpContentCompressor(settings.getCompressionLevel()));
             handlers.add(new HttpContentCompressor(settings.getCompressionLevel()));
         }
         }
         handlers.add(new NioHttpRequestCreator());
         handlers.add(new NioHttpRequestCreator());
+        handlers.add(new NioHttpResponseCreator());
         handlers.add(new NioHttpPipeliningHandler(transport.getLogger(), settings.getPipeliningMaxEvents()));
         handlers.add(new NioHttpPipeliningHandler(transport.getLogger(), settings.getPipeliningMaxEvents()));
 
 
         adaptor = new NettyAdaptor(handlers.toArray(new ChannelHandler[0]));
         adaptor = new NettyAdaptor(handlers.toArray(new ChannelHandler[0]));

+ 2 - 0
plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequestCreator.java

@@ -19,6 +19,7 @@
 
 
 package org.elasticsearch.http.nio;
 package org.elasticsearch.http.nio;
 
 
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpRequest;
@@ -26,6 +27,7 @@ import org.elasticsearch.ExceptionsHelper;
 
 
 import java.util.List;
 import java.util.List;
 
 
+@ChannelHandler.Sharable
 class NioHttpRequestCreator extends MessageToMessageDecoder<FullHttpRequest> {
 class NioHttpRequestCreator extends MessageToMessageDecoder<FullHttpRequest> {
 
 
     @Override
     @Override

+ 70 - 0
plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpResponseCreator.java

@@ -0,0 +1,70 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.http.nio;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import io.netty.handler.codec.http.DefaultHttpContent;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.DefaultLastHttpContent;
+import io.netty.handler.codec.http.HttpResponse;
+import org.elasticsearch.common.Booleans;
+import org.elasticsearch.monitor.jvm.JvmInfo;
+
+import java.util.List;
+
+/**
+ * Split up large responses to prevent batch compression or other CPU intensive operations down the pipeline.
+ */
+@ChannelHandler.Sharable
+public class NioHttpResponseCreator extends MessageToMessageEncoder<NioHttpResponse> {
+
+    private static final String DO_NOT_SPLIT = "es.unsafe.do_not_split_http_responses";
+
+    private static final boolean DO_NOT_SPLIT_HTTP_RESPONSES;
+    private static final int SPLIT_THRESHOLD;
+
+    static {
+        DO_NOT_SPLIT_HTTP_RESPONSES = Booleans.parseBoolean(System.getProperty(DO_NOT_SPLIT), false);
+        // Netty will add some header bytes if it compresses this message. So we downsize slightly.
+        SPLIT_THRESHOLD = (int) (suggestedMaxAllocationSize() * 0.99);
+    }
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, NioHttpResponse msg, List<Object> out) {
+        if (DO_NOT_SPLIT_HTTP_RESPONSES || msg.content().readableBytes() <= SPLIT_THRESHOLD) {
+            out.add(msg.retain());
+        } else {
+            HttpResponse response = new DefaultHttpResponse(msg.protocolVersion(), msg.status(), msg.headers());
+            out.add(response);
+            ByteBuf content = msg.content();
+            while (content.readableBytes() > SPLIT_THRESHOLD) {
+                out.add(new DefaultHttpContent(content.readRetainedSlice(SPLIT_THRESHOLD)));
+            }
+            out.add(new DefaultLastHttpContent(content.readRetainedSlice(content.readableBytes())));
+        }
+    }
+
+    private static long suggestedMaxAllocationSize() {
+        return Math.max(Math.max(JvmInfo.jvmInfo().getG1RegionSize(), 0) >> 2, 256 * 1024);
+    }
+}

+ 2 - 0
plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpClient.java

@@ -25,6 +25,7 @@ import io.netty.handler.codec.http.DefaultFullHttpRequest;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpContentDecompressor;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpRequest;
@@ -226,6 +227,7 @@ class NioHttpClient implements Closeable {
             List<ChannelHandler> handlers = new ArrayList<>(5);
             List<ChannelHandler> handlers = new ArrayList<>(5);
             handlers.add(new HttpResponseDecoder());
             handlers.add(new HttpResponseDecoder());
             handlers.add(new HttpRequestEncoder());
             handlers.add(new HttpRequestEncoder());
+            handlers.add(new HttpContentDecompressor());
             handlers.add(new HttpObjectAggregator(maxContentLength));
             handlers.add(new HttpObjectAggregator(maxContentLength));
 
 
             adaptor = new NettyAdaptor(handlers.toArray(new ChannelHandler[0]));
             adaptor = new NettyAdaptor(handlers.toArray(new ChannelHandler[0]));

+ 46 - 0
plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpServerTransportTests.java

@@ -279,6 +279,52 @@ public class NioHttpServerTransportTests extends ESTestCase {
         }
         }
     }
     }
 
 
+    public void testLargeCompressedResponse() throws InterruptedException {
+        final String responseString = randomAlphaOfLength(4 * 1024 * 1024);
+        final String url = "/thing";
+        final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
+
+            @Override
+            public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
+                if (url.equals(request.uri())) {
+                    channel.sendResponse(new BytesRestResponse(OK, responseString));
+                } else {
+                    logger.error("--> Unexpected successful uri [{}]", request.uri());
+                    throw new AssertionError();
+                }
+            }
+
+            @Override
+            public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
+                logger.error(new ParameterizedMessage("--> Unexpected bad request [{}]",
+                    FakeRestRequest.requestToString(channel.request())), cause);
+                throw new AssertionError();
+            }
+
+        };
+
+        try (NioHttpServerTransport transport = new NioHttpServerTransport(
+            Settings.EMPTY, networkService, bigArrays, pageRecycler, threadPool, xContentRegistry(), dispatcher,
+            new NioGroupFactory(Settings.EMPTY, logger), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
+            transport.start();
+            final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
+
+            try (NioHttpClient client = new NioHttpClient()) {
+                DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
+                request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, randomFrom("deflate", "gzip"));
+                final FullHttpResponse response = client.send(remoteAddress.address(), request);
+                try {
+                    assertThat(response.status(), equalTo(HttpResponseStatus.OK));
+                    byte[] bytes = new byte[response.content().readableBytes()];
+                    response.content().readBytes(bytes);
+                    assertThat(new String(bytes, StandardCharsets.UTF_8), equalTo(responseString));
+                } finally {
+                    response.release();
+                }
+            }
+        }
+    }
+
     public void testBadRequest() throws InterruptedException {
     public void testBadRequest() throws InterruptedException {
         final AtomicReference<Throwable> causeReference = new AtomicReference<>();
         final AtomicReference<Throwable> causeReference = new AtomicReference<>();
         final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
         final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {