Explorar el Código

Remove Transfer-Encoding from HTTP request with no content (#133775)

Mikhail Berezovskiy hace 1 mes
padre
commit
937f80c0fc

+ 5 - 0
docs/changelog/133775.yaml

@@ -0,0 +1,5 @@
+pr: 133775
+summary: Remove Transfer-Encoding from HTTP request with no content
+area: Network
+type: bug
+issues: []

+ 22 - 2
modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

@@ -85,6 +85,7 @@ import org.elasticsearch.transport.Transports;
 import org.elasticsearch.transport.netty4.Netty4Utils;
 import org.elasticsearch.xcontent.json.JsonXContent;
 
+import java.io.InputStream;
 import java.nio.channels.ClosedChannelException;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
@@ -392,6 +393,23 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
         }
     }
 
+    public void testEmptyChunkedEncoding() throws Exception {
+        try (var clientContext = newClientContext()) {
+            var opaqueId = clientContext.newOpaqueId();
+            final var emptyStream = new HttpChunkedInput(new ChunkedStream(InputStream.nullInputStream()));
+            final var request = httpRequest(opaqueId, 0);
+            HttpUtil.setTransferEncodingChunked(request, true);
+            clientContext.channel().pipeline().addLast(new ChunkedWriteHandler());
+            clientContext.channel().writeAndFlush(request);
+            clientContext.channel().writeAndFlush(emptyStream);
+
+            var handler = clientContext.awaitRestChannelAccepted(opaqueId);
+            var restRequest = handler.restRequest;
+            assertFalse(restRequest.hasContent());
+            assertNull(restRequest.header("Transfer-Encoding"));
+        }
+    }
+
     // ensures that we don't leak buffers in stream on 400-bad-request
     // some bad requests are dispatched from rest-controller before reaching rest handler
     // test relies on netty's buffer leak detection
@@ -733,6 +751,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
     static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer {
         final SubscribableListener<Void> channelAccepted = new SubscribableListener<>();
         final String opaqueId;
+        final RestRequest restRequest;
         private final AtomicReference<ActionListener<Chunk>> nextChunkListenerRef = new AtomicReference<>();
         final Netty4HttpRequestBodyStream stream;
         RestChannel channel;
@@ -740,8 +759,9 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
         final CountDownLatch closedLatch = new CountDownLatch(1);
         volatile boolean shouldThrowInsideHandleChunk = false;
 
-        ServerRequestHandler(String opaqueId, Netty4HttpRequestBodyStream stream) {
+        ServerRequestHandler(String opaqueId, RestRequest restRequest, Netty4HttpRequestBodyStream stream) {
             this.opaqueId = opaqueId;
+            this.restRequest = restRequest;
             this.stream = stream;
         }
 
@@ -934,7 +954,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
                 protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
                     var stream = (Netty4HttpRequestBodyStream) request.contentStream();
                     var opaqueId = request.getHeaders().get(Task.X_OPAQUE_ID_HTTP_HEADER).get(0);
-                    var handler = new ServerRequestHandler(opaqueId, stream);
+                    var handler = new ServerRequestHandler(opaqueId, request, stream);
                     handlersByOpaqueId.getHandlerFor(opaqueId).onResponse(handler);
                     return handler;
                 }

+ 50 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandler.java

@@ -0,0 +1,50 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.http.netty4;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.LastHttpContent;
+
+public class Netty4EmptyChunkHandler extends ChannelInboundHandlerAdapter {
+
+    private HttpRequest currentRequest;
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        switch (msg) {
+            case HttpRequest request -> {
+                if (request.decoderResult().isSuccess() && HttpUtil.isTransferEncodingChunked(request)) {
+                    currentRequest = request;
+                    ctx.read();
+                } else {
+                    currentRequest = null;
+                    ctx.fireChannelRead(request);
+                }
+            }
+            case HttpContent content -> {
+                if (currentRequest != null) {
+                    if (content instanceof LastHttpContent && content.content().readableBytes() == 0) {
+                        HttpUtil.setTransferEncodingChunked(currentRequest, false);
+                    }
+                    ctx.fireChannelRead(currentRequest);
+                    ctx.fireChannelRead(content);
+                    currentRequest = null;
+                } else {
+                    ctx.fireChannelRead(content);
+                }
+            }
+            default -> ctx.fireChannelRead(msg);
+        }
+    }
+}

+ 1 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

@@ -414,6 +414,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
             if (ResourceLeakDetector.isEnabled()) {
                 ch.pipeline().addLast(new Netty4LeakDetectionHandler());
             }
+            ch.pipeline().addLast(new Netty4EmptyChunkHandler());
             // See https://github.com/netty/netty/issues/15053: the combination of FlowControlHandler and HttpContentDecompressor above
             // can emit multiple chunks per read, but HttpBody.Stream requires chunks to arrive one-at-a-time so until that issue is
             // resolved we must add another flow controller here:

+ 96 - 0
modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4EmptyChunkHandlerTests.java

@@ -0,0 +1,96 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.http.netty4;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.DecoderResult;
+import io.netty.handler.codec.http.DefaultHttpRequest;
+import io.netty.handler.codec.http.DefaultLastHttpContent;
+import io.netty.handler.codec.http.HttpMessage;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
+
+import org.elasticsearch.test.ESTestCase;
+
+public class Netty4EmptyChunkHandlerTests extends ESTestCase {
+
+    private EmbeddedChannel channel;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        channel = new EmbeddedChannel(new Netty4EmptyChunkHandler());
+        channel.config().setAutoRead(false);
+    }
+
+    public void testNonChunkedPassthrough() {
+        var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
+        var content = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER);
+        channel.writeInbound(req, content);
+        assertEquals(req, channel.readInbound());
+        assertEquals(content, channel.readInbound());
+    }
+
+    public void testDecodingFailurePassthrough() {
+        var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
+        HttpUtil.setTransferEncodingChunked(req, true);
+        req.setDecoderResult(DecoderResult.failure(new Exception()));
+        channel.writeInbound(req);
+        var recvReq = (HttpRequest) channel.readInbound();
+        assertTrue(recvReq.decoderResult().isFailure());
+        assertTrue(HttpUtil.isTransferEncodingChunked(recvReq));
+    }
+
+    public void testHoldChunkedRequest() {
+        var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
+        HttpUtil.setTransferEncodingChunked(req, true);
+        var readSniffer = new ReadSniffer();
+        channel.pipeline().addFirst(readSniffer);
+        channel.writeInbound(req);
+        assertNull("should hold on HTTP request until first chunk arrives", channel.readInbound());
+        assertEquals("must read first chunk when holding request", 1, readSniffer.readCount);
+    }
+
+    public void testRemoveEncodingFromEmpty() {
+        var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
+        HttpUtil.setTransferEncodingChunked(req, true);
+        var content = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER);
+        channel.writeInbound(req, content);
+        var recvReq = channel.readInbound();
+        assertEquals(req, recvReq);
+        assertEquals(content, channel.readInbound());
+        assertFalse("should remove Transfer-Encoding from empty content", HttpUtil.isTransferEncodingChunked((HttpMessage) recvReq));
+    }
+
+    public void testKeepEncodingForNonEmpty() {
+        var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
+        HttpUtil.setTransferEncodingChunked(req, true);
+        var content = new DefaultLastHttpContent(Unpooled.wrappedBuffer(randomByteArrayOfLength(between(1, 1024))));
+        channel.writeInbound(req, content);
+        var recvReq = channel.readInbound();
+        assertEquals(req, recvReq);
+        assertEquals(content, channel.readInbound());
+        assertTrue("should keep Transfer-Encoding for non-empty content", HttpUtil.isTransferEncodingChunked((HttpMessage) recvReq));
+    }
+
+    public void testRandomizedChannelReuse() {
+        for (int i = 0; i < 1000; i++) {
+            switch (between(0, 3)) {
+                case 0 -> testNonChunkedPassthrough();
+                case 1 -> testKeepEncodingForNonEmpty();
+                case 2 -> testDecodingFailurePassthrough();
+                default -> testRemoveEncodingFromEmpty();
+            }
+        }
+    }
+}