Przeglądaj źródła

Stop Copying Bulk HTTP Requests in NIO Networking (#49819)

Same as #44564 but for NIO.
Armin Braun 5 lat temu
rodzic
commit
44d5ad9ac6

+ 3 - 1
plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java

@@ -24,6 +24,7 @@ import io.netty.buffer.Unpooled;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefIterator;
 import org.elasticsearch.common.bytes.AbstractBytesReference;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.StreamInput;
 
@@ -70,7 +71,8 @@ class ByteBufUtils {
     }
 
     static BytesReference toBytesReference(final ByteBuf buffer) {
-        return new ByteBufBytesReference(buffer, buffer.readableBytes());
+        final int readableBytes = buffer.readableBytes();
+        return readableBytes == 0 ? BytesArray.EMPTY : new ByteBufBytesReference(buffer, readableBytes);
     }
 
     private static class ByteBufBytesReference extends AbstractBytesReference {

+ 16 - 25
plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java

@@ -19,10 +19,8 @@
 
 package org.elasticsearch.http.nio;
 
-import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler;
 import io.netty.handler.codec.ByteToMessageDecoder;
-import io.netty.handler.codec.http.DefaultFullHttpRequest;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.HttpContentCompressor;
 import io.netty.handler.codec.http.HttpContentDecompressor;
@@ -158,32 +156,25 @@ public class HttpReadWriteHandler implements NioChannelHandler {
     private void handleRequest(Object msg) {
         final HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = (HttpPipelinedRequest<FullHttpRequest>) msg;
         FullHttpRequest request = pipelinedRequest.getRequest();
-
-        final FullHttpRequest copiedRequest;
+        boolean success = false;
+        NioHttpRequest httpRequest = new NioHttpRequest(request, pipelinedRequest.getSequence());
         try {
-            copiedRequest = new DefaultFullHttpRequest(
-                request.protocolVersion(),
-                request.method(),
-                request.uri(),
-                Unpooled.copiedBuffer(request.content()),
-                request.headers(),
-                request.trailingHeaders());
-        } finally {
-            // As we have copied the buffer, we can release the request
-            request.release();
-        }
-        NioHttpRequest httpRequest = new NioHttpRequest(copiedRequest, pipelinedRequest.getSequence());
-
-        if (request.decoderResult().isFailure()) {
-            Throwable cause = request.decoderResult().cause();
-            if (cause instanceof Error) {
-                ExceptionsHelper.maybeDieOnAnotherThread(cause);
-                transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause));
+            if (request.decoderResult().isFailure()) {
+                Throwable cause = request.decoderResult().cause();
+                if (cause instanceof Error) {
+                    ExceptionsHelper.maybeDieOnAnotherThread(cause);
+                    transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause));
+                } else {
+                    transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause);
+                }
             } else {
-                transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause);
+                transport.incomingRequest(httpRequest, nioHttpChannel);
+            }
+            success = true;
+        } finally {
+            if (success == false) {
+                request.release();
             }
-        } else {
-            transport.incomingRequest(httpRequest, nioHttpChannel);
         }
     }
 

+ 34 - 10
plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java

@@ -19,6 +19,8 @@
 
 package org.elasticsearch.http.nio;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.handler.codec.http.DefaultFullHttpRequest;
 import io.netty.handler.codec.http.DefaultHttpHeaders;
 import io.netty.handler.codec.http.FullHttpRequest;
@@ -28,7 +30,6 @@ import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.cookie.Cookie;
 import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
 import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
-import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.http.HttpRequest;
 import org.elasticsearch.rest.RestRequest;
@@ -40,6 +41,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 public class NioHttpRequest implements HttpRequest {
@@ -48,16 +50,22 @@ public class NioHttpRequest implements HttpRequest {
     private final BytesReference content;
     private final HttpHeadersMap headers;
     private final int sequence;
+    private final AtomicBoolean released;
+    private final boolean pooled;
 
     NioHttpRequest(FullHttpRequest request, int sequence) {
+        this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true,
+            ByteBufUtils.toBytesReference(request.content()));
+    }
+
+    private NioHttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled,
+        BytesReference content) {
         this.request = request;
-        headers = new HttpHeadersMap(request.headers());
         this.sequence = sequence;
-        if (request.content().isReadable()) {
-            this.content = ByteBufUtils.toBytesReference(request.content());
-        } else {
-            this.content = BytesArray.EMPTY;
-        }
+        this.headers = headers;
+        this.content = content;
+        this.pooled = pooled;
+        this.released = released;
     }
 
     @Override
@@ -105,17 +113,32 @@ public class NioHttpRequest implements HttpRequest {
 
     @Override
     public BytesReference content() {
+        assert released.get() == false;
         return content;
     }
 
     @Override
     public void release() {
-        // NioHttpRequest works from copied unpooled bytes no need to release anything
+        if (pooled && released.compareAndSet(false, true)) {
+            request.release();
+        }
     }
 
     @Override
     public HttpRequest releaseAndCopy() {
-        return this;
+        assert released.get() == false;
+        if (pooled == false) {
+            return this;
+        }
+        try {
+            final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content());
+            return new NioHttpRequest(
+                new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(),
+                    request.trailingHeaders()),
+                headers, sequence, new AtomicBoolean(false), false, ByteBufUtils.toBytesReference(copiedContent));
+        } finally {
+            release();
+        }
     }
 
     @Override
@@ -156,7 +179,8 @@ public class NioHttpRequest implements HttpRequest {
         trailingHeaders.remove(header);
         FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(),
             request.content(), headersWithoutContentTypeHeader, trailingHeaders);
-        return new NioHttpRequest(requestWithoutHeader, sequence);
+        return new NioHttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released,
+            pooled, content);
     }
 
     @Override