瀏覽代碼

Transfer network bytes to smaller buffer (#62673)

Currently we read in 64KB blocks from the network. When TLS is not
enabled, these bytes are normally passed all the way to the application
layer (some exceptions: compression). For the HTTP layer this means that
these bytes can live throughout the entire lifecycle of an indexing
request.

The problem is that if the reads from the socket are small, this means
that 64KB buffers can be consumed by 1KB or smaller reads. If the socket
buffer or TCP buffer sizes are small, the leads to massive memory
waste. It has been identified as a major source of OOMs on coordinating
nodes as Elasticsearch easily exhausts the heap for these network bytes.

This commit resolves the problem by placing a handler after the TLS
handler to copy these bytes to a more appropriate buffer size as
necessary. This comes after TLS, because TLS is a framing layer which
often resolves this problem for us (the 64KB buffer will be decoded
into a more appropriate buffer size). However, this extra handler will
solve it for the non-TLS pipelines.
Tim Brooks 5 年之前
父節點
當前提交
1547bd672d

+ 4 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

@@ -61,6 +61,7 @@ import org.elasticsearch.http.HttpReadTimeoutException;
 import org.elasticsearch.http.HttpServerChannel;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.NettyAllocator;
+import org.elasticsearch.transport.NettyByteBufSizer;
 import org.elasticsearch.transport.SharedGroupFactory;
 import org.elasticsearch.transport.netty4.Netty4Utils;
 
@@ -283,6 +284,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
     protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
 
         private final Netty4HttpServerTransport transport;
+        private final NettyByteBufSizer byteBufSizer;
         private final Netty4HttpRequestCreator requestCreator;
         private final Netty4HttpRequestHandler requestHandler;
         private final Netty4HttpResponseCreator responseCreator;
@@ -291,6 +293,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
         protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) {
             this.transport = transport;
             this.handlingSettings = handlingSettings;
+            this.byteBufSizer =  new NettyByteBufSizer();
             this.requestCreator =  new Netty4HttpRequestCreator();
             this.requestHandler = new Netty4HttpRequestHandler(transport);
             this.responseCreator = new Netty4HttpResponseCreator();
@@ -300,6 +303,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
         protected void initChannel(Channel ch) throws Exception {
             Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
             ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
+            ch.pipeline().addLast("byte_buf_sizer", byteBufSizer);
             ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
             final HttpRequestDecoder decoder = new HttpRequestDecoder(
                 handlingSettings.getMaxInitialLineLength(),

+ 43 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyByteBufSizer.java

@@ -0,0 +1,43 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.transport;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+
+import java.util.List;
+
+@ChannelHandler.Sharable
+public class NettyByteBufSizer extends MessageToMessageDecoder<ByteBuf> {
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
+        int readableBytes = buf.readableBytes();
+        if (buf.capacity() >= 1024) {
+            ByteBuf resized = buf.discardReadBytes().capacity(readableBytes);
+            assert resized.readableBytes() == readableBytes;
+            out.add(resized.retain());
+        } else {
+            out.add(buf.retain());
+        }
+    }
+}

+ 3 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

@@ -54,6 +54,7 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.Netty4NioSocketChannel;
 import org.elasticsearch.transport.NettyAllocator;
+import org.elasticsearch.transport.NettyByteBufSizer;
 import org.elasticsearch.transport.SharedGroupFactory;
 import org.elasticsearch.transport.TcpTransport;
 import org.elasticsearch.transport.TransportSettings;
@@ -326,6 +327,7 @@ public class Netty4Transport extends TcpTransport {
     protected class ServerChannelInitializer extends ChannelInitializer<Channel> {
 
         protected final String name;
+        private final NettyByteBufSizer sizer = new NettyByteBufSizer();
 
         protected ServerChannelInitializer(String name) {
             this.name = name;
@@ -338,6 +340,7 @@ public class Netty4Transport extends TcpTransport {
             NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
             Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, ch.newSucceededFuture());
             ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
+            ch.pipeline().addLast("byte_buf_sizer", sizer);
             ch.pipeline().addLast("logging", new ESLoggingHandler());
             ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(pageCacheRecycler, Netty4Transport.this));
             serverAcceptedChannel(nettyTcpChannel);