Selaa lähdekoodia

Add flow-control and remove auto-read in netty4 HTTP pipeline (#126441)

Mikhail Berezovskiy 6 kuukautta sitten
vanhempi
commit
c8805b85d2
15 muutettua tiedostoa jossa 485 lisäystä ja 1155 poistoa
  1. 5 0
      docs/changelog/126441.yaml
  2. 72 81
      modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java
  3. 78 0
      modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java
  4. 4 0
      modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java
  5. 4 0
      modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java
  6. 67 203
      modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java
  7. 7 5
      modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java
  8. 26 97
      modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java
  9. 17 0
      modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java
  10. 26 2
      modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandlerTests.java
  11. 6 1
      modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderThreadContextTests.java
  12. 93 702
      modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java
  13. 37 63
      modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java
  14. 42 0
      modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java
  15. 1 1
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java

+ 5 - 0
docs/changelog/126441.yaml

@@ -0,0 +1,5 @@
+pr: 126441
+summary: Add flow-control and remove auto-read in netty4 http pipeline
+area: Network
+type: enhancement
+issues: []

+ 72 - 81
modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

@@ -94,6 +94,64 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
 
     private static final int MAX_CONTENT_LENGTH = ByteSizeUnit.MB.toIntBytes(50);
 
+    private static long transportStatsRequestBytesSize(Ctx ctx) {
+        var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName);
+        var stats = httpTransport.stats().clientStats();
+        var bytes = 0L;
+        for (var s : stats) {
+            bytes += s.requestSizeBytes();
+        }
+        return bytes;
+    }
+
+    static int MBytes(int m) {
+        return m * 1024 * 1024;
+    }
+
+    static <T> T safePoll(BlockingDeque<T> queue) {
+        try {
+            var t = queue.poll(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS);
+            assertNotNull("queue is empty", t);
+            return t;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new AssertionError(e);
+        }
+    }
+
+    private static FullHttpRequest fullHttpRequest(String opaqueId, ByteBuf content) {
+        var req = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content));
+        req.headers().add(CONTENT_LENGTH, content.readableBytes());
+        req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
+        req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
+        return req;
+    }
+
+    private static HttpRequest httpRequest(String opaqueId, int contentLength) {
+        return httpRequest(ControlServerRequestPlugin.ROUTE, opaqueId, contentLength);
+    }
+
+    private static HttpRequest httpRequest(String uri, String opaqueId, int contentLength) {
+        var req = new DefaultHttpRequest(HTTP_1_1, POST, uri);
+        req.headers().add(CONTENT_LENGTH, contentLength);
+        req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
+        req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
+        return req;
+    }
+
+    private static HttpContent randomContent(int size, boolean isLast) {
+        var buf = Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
+        if (isLast) {
+            return new DefaultLastHttpContent(buf);
+        } else {
+            return new DefaultHttpContent(buf);
+        }
+    }
+
+    private static ByteBuf randomByteBuf(int size) {
+        return Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
+    }
+
     @Override
     protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
         Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
@@ -178,8 +236,6 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
 
             // await stream handler is ready and request full content
             var handler = ctx.awaitRestChannelAccepted(opaqueId);
-            assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
-
             assertFalse(handler.streamClosed);
 
             // terminate client connection
@@ -190,10 +246,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
             handler.stream.next();
 
             // wait for resources to be released
-            assertBusy(() -> {
-                assertEquals(0, handler.stream.bufSize());
-                assertTrue(handler.streamClosed);
-            });
+            assertBusy(() -> assertTrue(handler.streamClosed));
         }
     }
 
@@ -208,15 +261,11 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
 
             // await stream handler is ready and request full content
             var handler = ctx.awaitRestChannelAccepted(opaqueId);
-            assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
             assertFalse(handler.streamClosed);
 
             // terminate connection on server and wait resources are released
             handler.channel.request().getHttpChannel().close();
-            assertBusy(() -> {
-                assertEquals(0, handler.stream.bufSize());
-                assertTrue(handler.streamClosed);
-            });
+            assertBusy(() -> assertTrue(handler.streamClosed));
         }
     }
 
@@ -230,16 +279,12 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
 
             // await stream handler is ready and request full content
             var handler = ctx.awaitRestChannelAccepted(opaqueId);
-            assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
             assertFalse(handler.streamClosed);
 
             handler.shouldThrowInsideHandleChunk = true;
             handler.stream.next();
 
-            assertBusy(() -> {
-                assertEquals(0, handler.stream.bufSize());
-                assertTrue(handler.streamClosed);
-            });
+            assertBusy(() -> assertTrue(handler.streamClosed));
         }
     }
 
@@ -280,7 +325,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
                 });
                 handler.readBytes(partSize);
             }
-            assertTrue(handler.stream.hasLast());
+            assertTrue(handler.recvLast);
         }
     }
 
@@ -385,16 +430,6 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
         }
     }
 
-    private static long transportStatsRequestBytesSize(Ctx ctx) {
-        var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName);
-        var stats = httpTransport.stats().clientStats();
-        var bytes = 0L;
-        for (var s : stats) {
-            bytes += s.requestSizeBytes();
-        }
-        return bytes;
-    }
-
     /**
      * ensures that {@link org.elasticsearch.http.HttpClientStatsTracker} counts streamed content bytes
      */
@@ -489,55 +524,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
         return getTestName() + "-" + reqNo;
     }
 
-    static int MBytes(int m) {
-        return m * 1024 * 1024;
-    }
-
-    static <T> T safePoll(BlockingDeque<T> queue) {
-        try {
-            var t = queue.poll(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS);
-            assertNotNull("queue is empty", t);
-            return t;
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new AssertionError(e);
-        }
-    }
-
-    static FullHttpRequest fullHttpRequest(String opaqueId, ByteBuf content) {
-        var req = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content));
-        req.headers().add(CONTENT_LENGTH, content.readableBytes());
-        req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
-        req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
-        return req;
-    }
-
-    static HttpRequest httpRequest(String opaqueId, int contentLength) {
-        return httpRequest(ControlServerRequestPlugin.ROUTE, opaqueId, contentLength);
-    }
-
-    static HttpRequest httpRequest(String uri, String opaqueId, int contentLength) {
-        var req = new DefaultHttpRequest(HTTP_1_1, POST, uri);
-        req.headers().add(CONTENT_LENGTH, contentLength);
-        req.headers().add(CONTENT_TYPE, APPLICATION_JSON);
-        req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId);
-        return req;
-    }
-
-    static HttpContent randomContent(int size, boolean isLast) {
-        var buf = Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
-        if (isLast) {
-            return new DefaultLastHttpContent(buf);
-        } else {
-            return new DefaultHttpContent(buf);
-        }
-    }
-
-    static ByteBuf randomByteBuf(int size) {
-        return Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
-    }
-
-    Ctx setupClientCtx() throws Exception {
+    private Ctx setupClientCtx() throws Exception {
         var nodeName = internalCluster().getRandomNodeName();
         var clientRespQueue = new LinkedBlockingDeque<>(16);
         var bootstrap = bootstrapClient(nodeName, clientRespQueue);
@@ -545,7 +532,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
         return new Ctx(getTestName(), nodeName, bootstrap, channel, clientRespQueue);
     }
 
-    Bootstrap bootstrapClient(String node, BlockingQueue<Object> queue) {
+    private Bootstrap bootstrapClient(String node, BlockingQueue<Object> queue) {
         var httpServer = internalCluster().getInstance(HttpServerTransport.class, node);
         var remoteAddr = randomFrom(httpServer.boundAddress().boundAddresses());
         return new Bootstrap().group(new NioEventLoopGroup(1))
@@ -583,9 +570,13 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
         return false; // enable http
     }
 
-    record Ctx(String testName, String nodeName, Bootstrap clientBootstrap, Channel clientChannel, BlockingDeque<Object> clientRespQueue)
-        implements
-            AutoCloseable {
+    private record Ctx(
+        String testName,
+        String nodeName,
+        Bootstrap clientBootstrap,
+        Channel clientChannel,
+        BlockingDeque<Object> clientRespQueue
+    ) implements AutoCloseable {
 
         @Override
         public void close() throws Exception {
@@ -610,7 +601,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
         }
     }
 
-    static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer {
+    private static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer {
         final SubscribableListener<Void> channelAccepted = new SubscribableListener<>();
         final String opaqueId;
         final BlockingDeque<Chunk> recvChunks = new LinkedBlockingDeque<>();

+ 78 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java

@@ -0,0 +1,78 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.http.netty4;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.ScheduledFuture;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.common.time.TimeProvider;
+import org.elasticsearch.common.util.concurrent.FutureUtils;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * When channel auto-read is disabled handlers are responsible to read from channel.
+ * But it's hard to detect when read is missing. This helper class print warnings
+ * when no reads where detected in given time interval. Normally, in tests, 10 seconds is enough
+ * to avoid test hang for too long, but can be increased if needed.
+ */
+class MissingReadDetector extends ChannelDuplexHandler {
+
+    private static final Logger logger = LogManager.getLogger(MissingReadDetector.class);
+
+    private final long interval;
+    private final TimeProvider timer;
+    private boolean pendingRead;
+    private long lastRead;
+    private ScheduledFuture<?> checker;
+
+    MissingReadDetector(TimeProvider timer, long missingReadIntervalMillis) {
+        this.interval = missingReadIntervalMillis;
+        this.timer = timer;
+    }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+        checker = ctx.channel().eventLoop().scheduleAtFixedRate(() -> {
+            if (pendingRead == false) {
+                long now = timer.absoluteTimeInMillis();
+                if (now >= lastRead + interval) {
+                    logger.warn("chan-id={} haven't read from channel for [{}ms]", ctx.channel().id(), (now - lastRead));
+                }
+            }
+        }, interval, interval, TimeUnit.MILLISECONDS);
+        super.handlerAdded(ctx);
+    }
+
+    @Override
+    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+        if (checker != null) {
+            FutureUtils.cancel(checker);
+        }
+        super.handlerRemoved(ctx);
+    }
+
+    @Override
+    public void read(ChannelHandlerContext ctx) throws Exception {
+        pendingRead = true;
+        ctx.read();
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        assert ctx.channel().config().isAutoRead() == false : "auto-read must be always disabled";
+        pendingRead = false;
+        lastRead = timer.absoluteTimeInMillis();
+        ctx.fireChannelRead(msg);
+    }
+}

+ 4 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java

@@ -15,6 +15,7 @@ import io.netty.handler.codec.http.HttpObject;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpRequestDecoder;
+import io.netty.handler.codec.http.LastHttpContent;
 
 import org.elasticsearch.http.HttpPreRequest;
 import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
@@ -48,6 +49,9 @@ public class Netty4HttpAggregator extends HttpObjectAggregator {
         }
         if (aggregating || msg instanceof FullHttpRequest) {
             super.channelRead(ctx, msg);
+            if (msg instanceof LastHttpContent == false) {
+                ctx.read(); // HttpObjectAggregator is tricky with auto-read off, it might not call read again, calling on its behalf
+            }
         } else {
             streamContentSizeHandler.channelRead(ctx, msg);
         }

+ 4 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandler.java

@@ -123,6 +123,7 @@ public class Netty4HttpContentSizeHandler extends ChannelInboundHandlerAdapter {
                 isContinueExpected = true;
             } else {
                 ctx.writeAndFlush(EXPECTATION_FAILED_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
+                ctx.read();
                 return;
             }
         }
@@ -136,6 +137,7 @@ public class Netty4HttpContentSizeHandler extends ChannelInboundHandlerAdapter {
                 decoder.reset();
             }
             ctx.writeAndFlush(TOO_LARGE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+            ctx.read();
         } else {
             ignoreContent = false;
             currentContentLength = 0;
@@ -150,11 +152,13 @@ public class Netty4HttpContentSizeHandler extends ChannelInboundHandlerAdapter {
     private void handleContent(ChannelHandlerContext ctx, HttpContent msg) {
         if (ignoreContent) {
             msg.release();
+            ctx.read();
         } else {
             currentContentLength += msg.content().readableBytes();
             if (currentContentLength > maxContentLength) {
                 msg.release();
                 ctx.writeAndFlush(TOO_LARGE_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
+                ctx.read();
             } else {
                 ctx.fireChannelRead(msg);
             }

+ 67 - 203
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java

@@ -9,249 +9,113 @@
 
 package org.elasticsearch.http.netty4;
 
-import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.codec.DecoderResult;
 import io.netty.handler.codec.http.HttpContent;
 import io.netty.handler.codec.http.HttpObject;
 import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.LastHttpContent;
-import io.netty.util.ReferenceCountUtil;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ContextPreservingActionListener;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.http.netty4.internal.HttpValidator;
 import org.elasticsearch.transport.Transports;
 
-import java.util.ArrayDeque;
-
-import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_PERMANENTLY;
-import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_UNTIL_NEXT_REQUEST;
-import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.FORWARDING_DATA_UNTIL_NEXT_REQUEST;
-import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.QUEUEING_DATA;
-import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.WAITING_TO_START;
-
-public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {
+public class Netty4HttpHeaderValidator extends ChannelDuplexHandler {
 
     private final HttpValidator validator;
     private final ThreadContext threadContext;
-    private ArrayDeque<HttpObject> pending = new ArrayDeque<>(4);
-    private State state = WAITING_TO_START;
+    private State state;
 
     public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadContext) {
         this.validator = validator;
         this.threadContext = threadContext;
     }
 
-    State getState() {
-        return state;
-    }
-
-    @SuppressWarnings("fallthrough")
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         assert msg instanceof HttpObject;
-        final HttpObject httpObject = (HttpObject) msg;
-
-        switch (state) {
-            case WAITING_TO_START:
-                assert pending.isEmpty();
-                pending.add(ReferenceCountUtil.retain(httpObject));
-                requestStart(ctx);
-                assert state == QUEUEING_DATA;
-                assert ctx.channel().config().isAutoRead() == false;
-                break;
-            case QUEUEING_DATA:
-                pending.add(ReferenceCountUtil.retain(httpObject));
-                break;
-            case FORWARDING_DATA_UNTIL_NEXT_REQUEST:
-                assert pending.isEmpty();
-                if (httpObject instanceof LastHttpContent) {
-                    state = WAITING_TO_START;
-                }
-                ctx.fireChannelRead(httpObject);
-                break;
-            case DROPPING_DATA_UNTIL_NEXT_REQUEST:
-                assert pending.isEmpty();
-                if (httpObject instanceof LastHttpContent) {
-                    state = WAITING_TO_START;
-                }
-                ReferenceCountUtil.release(httpObject);
-                break;
-            case DROPPING_DATA_PERMANENTLY:
-                assert pending.isEmpty();
-                ReferenceCountUtil.release(httpObject); // consume without enqueuing
-                ctx.channel().config().setAutoRead(false);
-                break;
-        }
-    }
-
-    private void requestStart(ChannelHandlerContext ctx) {
-        assert state == WAITING_TO_START;
-
-        if (pending.isEmpty()) {
-            return;
-        }
-
-        final HttpObject httpObject = pending.getFirst();
-        final HttpRequest httpRequest;
-        if (httpObject instanceof HttpRequest && httpObject.decoderResult().isSuccess()) {
-            // a properly decoded HTTP start message is expected to begin validation
-            // anything else is probably an error that the downstream HTTP message aggregator will have to handle
-            httpRequest = (HttpRequest) httpObject;
-        } else {
-            httpRequest = null;
-        }
-
-        state = QUEUEING_DATA;
-        ctx.channel().config().setAutoRead(false);
-
-        if (httpRequest == null) {
-            // this looks like a malformed request and will forward without validation
-            ctx.channel().eventLoop().execute(() -> forwardFullRequest(ctx));
+        var httpObject = (HttpObject) msg;
+        if (httpObject.decoderResult().isFailure()) {
+            ctx.fireChannelRead(httpObject); // pass-through for decoding failures
         } else {
-            assert Transports.assertDefaultThreadContext(threadContext);
-            ActionListener.run(
-                // this prevents thread-context changes to propagate to the validation listener
-                // atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context,
-                // so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor
-                ActionListener.assertOnce(
-                    new ContextPreservingActionListener<Void>(
-                        threadContext.wrapRestorable(threadContext.newStoredContext()),
-                        // Always explicitly dispatch back to the event loop to prevent reentrancy concerns if we are still on event loop
-                        new ActionListener<>() {
-                            @Override
-                            public void onResponse(Void unused) {
-                                assert Transports.assertDefaultThreadContext(threadContext);
-                                ctx.channel().eventLoop().execute(() -> forwardFullRequest(ctx));
-                            }
-
-                            @Override
-                            public void onFailure(Exception e) {
-                                assert Transports.assertDefaultThreadContext(threadContext);
-                                ctx.channel().eventLoop().execute(() -> forwardRequestWithDecoderExceptionAndNoContent(ctx, e));
-                            }
-                        }
-                    )
-                ),
-                listener -> {
-                    // this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused
-                    try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) {
-                        validator.validate(httpRequest, ctx.channel(), listener);
-                    }
+            if (msg instanceof HttpRequest request) {
+                validate(ctx, request);
+            } else {
+                assert msg instanceof HttpContent;
+                var content = (HttpContent) msg;
+                if (state == State.DROPPING) {
+                    content.release();
+                    ctx.read();
+                } else {
+                    assert state == State.PASSING : "unexpected content before validation completed";
+                    ctx.fireChannelRead(content);
                 }
-            );
-        }
-    }
-
-    private void forwardFullRequest(ChannelHandlerContext ctx) {
-        Transports.assertDefaultThreadContext(threadContext);
-        assert ctx.channel().eventLoop().inEventLoop();
-        assert ctx.channel().config().isAutoRead() == false;
-        assert state == QUEUEING_DATA;
-
-        ctx.channel().config().setAutoRead(true);
-        boolean fullRequestForwarded = forwardData(ctx, pending);
-
-        assert fullRequestForwarded || pending.isEmpty();
-        if (fullRequestForwarded) {
-            state = WAITING_TO_START;
-            requestStart(ctx);
-        } else {
-            state = FORWARDING_DATA_UNTIL_NEXT_REQUEST;
+            }
         }
-
-        assert state == WAITING_TO_START || state == QUEUEING_DATA || state == FORWARDING_DATA_UNTIL_NEXT_REQUEST;
-    }
-
-    private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
-        Transports.assertDefaultThreadContext(threadContext);
-        assert ctx.channel().eventLoop().inEventLoop();
-        assert ctx.channel().config().isAutoRead() == false;
-        assert state == QUEUEING_DATA;
-
-        HttpObject messageToForward = pending.getFirst();
-        boolean fullRequestDropped = dropData(pending);
-        if (messageToForward instanceof HttpContent toReplace) {
-            // if the request to forward contained data (which got dropped), replace with empty data
-            messageToForward = toReplace.replace(Unpooled.EMPTY_BUFFER);
-        }
-        messageToForward.setDecoderResult(DecoderResult.failure(e));
-
-        ctx.channel().config().setAutoRead(true);
-        ctx.fireChannelRead(messageToForward);
-
-        assert fullRequestDropped || pending.isEmpty();
-        if (fullRequestDropped) {
-            state = WAITING_TO_START;
-            requestStart(ctx);
-        } else {
-            state = DROPPING_DATA_UNTIL_NEXT_REQUEST;
-        }
-
-        assert state == WAITING_TO_START || state == QUEUEING_DATA || state == DROPPING_DATA_UNTIL_NEXT_REQUEST;
     }
 
     @Override
-    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        state = DROPPING_DATA_PERMANENTLY;
-        while (true) {
-            if (dropData(pending) == false) {
-                break;
-            }
+    public void read(ChannelHandlerContext ctx) throws Exception {
+        // until validation is completed we can ignore read calls,
+        // once validation is finished HttpRequest will be fired and downstream can read from there
+        if (state != State.VALIDATING) {
+            ctx.read();
         }
-        super.channelInactive(ctx);
     }
 
-    private static boolean forwardData(ChannelHandlerContext ctx, ArrayDeque<HttpObject> pending) {
-        final int pendingMessages = pending.size();
-        try {
-            HttpObject toForward;
-            while ((toForward = pending.poll()) != null) {
-                ctx.fireChannelRead(toForward);
-                ReferenceCountUtil.release(toForward); // reference cnt incremented when enqueued
-                if (toForward instanceof LastHttpContent) {
-                    return true;
+    void validate(ChannelHandlerContext ctx, HttpRequest request) {
+        assert Transports.assertDefaultThreadContext(threadContext);
+        state = State.VALIDATING;
+        ActionListener.run(
+            // this prevents thread-context changes to propagate to the validation listener
+            // atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context,
+            // so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor
+            ActionListener.assertOnce(
+                new ContextPreservingActionListener<Void>(
+                    threadContext.wrapRestorable(threadContext.newStoredContext()),
+                    new ActionListener<>() {
+                        @Override
+                        public void onResponse(Void unused) {
+                            handleValidationResult(ctx, request, null);
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            handleValidationResult(ctx, request, e);
+                        }
+                    }
+                )
+            ),
+            listener -> {
+                // this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused
+                try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) {
+                    validator.validate(request, ctx.channel(), listener);
                 }
             }
-            return false;
-        } finally {
-            maybeResizePendingDown(pendingMessages, pending);
-        }
+        );
     }
 
-    private static boolean dropData(ArrayDeque<HttpObject> pending) {
-        final int pendingMessages = pending.size();
-        try {
-            HttpObject toDrop;
-            while ((toDrop = pending.poll()) != null) {
-                ReferenceCountUtil.release(toDrop, 2); // 1 for enqueuing, 1 for consuming
-                if (toDrop instanceof LastHttpContent) {
-                    return true;
-                }
+    void handleValidationResult(ChannelHandlerContext ctx, HttpRequest request, @Nullable Exception validationError) {
+        assert Transports.assertDefaultThreadContext(threadContext);
+        // Always explicitly dispatch back to the event loop to prevent reentrancy concerns if we are still on event loop
+        ctx.channel().eventLoop().execute(() -> {
+            if (validationError != null) {
+                request.setDecoderResult(DecoderResult.failure(validationError));
+                state = State.DROPPING;
+            } else {
+                state = State.PASSING;
             }
-            return false;
-        } finally {
-            maybeResizePendingDown(pendingMessages, pending);
-        }
+            ctx.fireChannelRead(request);
+        });
     }
 
-    private static void maybeResizePendingDown(int largeSize, ArrayDeque<HttpObject> pending) {
-        if (pending.size() <= 4 && largeSize > 32) {
-            // Prevent the ArrayDeque from becoming forever large due to a single large message.
-            ArrayDeque<HttpObject> old = pending;
-            pending = new ArrayDeque<>(4);
-            pending.addAll(old);
-        }
+    private enum State {
+        PASSING,
+        VALIDATING,
+        DROPPING
     }
 
-    enum State {
-        WAITING_TO_START,
-        QUEUEING_DATA,
-        FORWARDING_DATA_UNTIL_NEXT_REQUEST,
-        DROPPING_DATA_UNTIL_NEXT_REQUEST,
-        DROPPING_DATA_PERMANENTLY
-    }
 }

+ 7 - 5
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java

@@ -118,6 +118,7 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
         activityTracker.startActivity();
+        boolean shouldRead = true;
         try {
             if (msg instanceof HttpRequest request) {
                 final Netty4HttpRequest netty4HttpRequest;
@@ -137,25 +138,26 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
                         netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
                         currentRequestStream = null;
                     } else {
-                        var contentStream = new Netty4HttpRequestBodyStream(
-                            ctx.channel(),
-                            serverTransport.getThreadPool().getThreadContext(),
-                            activityTracker
-                        );
+                        var contentStream = new Netty4HttpRequestBodyStream(ctx, serverTransport.getThreadPool().getThreadContext());
                         currentRequestStream = contentStream;
                         netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
+                        shouldRead = false;
                     }
                 }
                 handlePipelinedRequest(ctx, netty4HttpRequest);
             } else {
                 assert msg instanceof HttpContent : "expect HttpContent got " + msg;
                 assert currentRequestStream != null : "current stream must exists before handling http content";
+                shouldRead = false;
                 currentRequestStream.handleNettyContent((HttpContent) msg);
                 if (msg instanceof LastHttpContent) {
                     currentRequestStream = null;
                 }
             }
         } finally {
+            if (shouldRead) {
+                ctx.channel().eventLoop().execute(ctx::read);
+            }
             activityTracker.stopActivity();
         }
     }

+ 26 - 97
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java

@@ -9,14 +9,11 @@
 
 package org.elasticsearch.http.netty4;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.CompositeByteBuf;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.http.HttpContent;
 import io.netty.handler.codec.http.LastHttpContent;
 
-import org.elasticsearch.common.network.ThreadWatchdog;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.http.HttpBody;
@@ -27,34 +24,22 @@ import java.util.List;
 
 /**
  * Netty based implementation of {@link HttpBody.Stream}.
- * This implementation utilize {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)}
- * to prevent entire payload buffering. But sometimes upstream can send few chunks of data despite
- * autoRead=off. In this case chunks will be buffered until downstream calls {@link Stream#next()}
  */
 public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
 
-    private final Channel channel;
-    private final ChannelFutureListener closeListener = future -> doClose();
     private final List<ChunkHandler> tracingHandlers = new ArrayList<>(4);
     private final ThreadContext threadContext;
-    private final ThreadWatchdog.ActivityTracker activityTracker;
-    private ByteBuf buf;
-    private boolean requested = false;
+    private final ChannelHandlerContext ctx;
     private boolean closing = false;
     private HttpBody.ChunkHandler handler;
     private ThreadContext.StoredContext requestContext;
+    private final ChannelFutureListener closeListener = future -> doClose();
 
-    // used in tests
-    private volatile int bufSize = 0;
-    private volatile boolean hasLast = false;
-
-    public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext, ThreadWatchdog.ActivityTracker activityTracker) {
-        this.channel = channel;
+    public Netty4HttpRequestBodyStream(ChannelHandlerContext ctx, ThreadContext threadContext) {
+        this.ctx = ctx;
         this.threadContext = threadContext;
         this.requestContext = threadContext.newStoredContext();
-        this.activityTracker = activityTracker;
-        Netty4Utils.addListener(channel.closeFuture(), closeListener);
-        channel.config().setAutoRead(false);
+        Netty4Utils.addListener(ctx.channel().closeFuture(), closeListener);
     }
 
     @Override
@@ -73,94 +58,43 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
         tracingHandlers.add(chunkHandler);
     }
 
+    private void read() {
+        ctx.channel().eventLoop().execute(ctx::read);
+    }
+
     @Override
     public void next() {
         assert handler != null : "handler must be set before requesting next chunk";
         requestContext = threadContext.newStoredContext();
-        channel.eventLoop().submit(() -> {
-            activityTracker.startActivity();
-            requested = true;
-            try {
-                if (closing) {
-                    return;
-                }
-                if (buf == null) {
-                    channel.read();
-                } else {
-                    send();
-                }
-            } catch (Throwable e) {
-                channel.pipeline().fireExceptionCaught(e);
-            } finally {
-                activityTracker.stopActivity();
-            }
-        });
+        read();
     }
 
     public void handleNettyContent(HttpContent httpContent) {
-        assert hasLast == false : "receive http content on completed stream";
-        hasLast = httpContent instanceof LastHttpContent;
         if (closing) {
             httpContent.release();
+            read();
         } else {
-            addChunk(httpContent.content());
-            if (requested) {
-                send();
-            }
-        }
-    }
-
-    // adds chunk to current buffer, will allocate composite buffer when need to hold more than 1 chunk
-    private void addChunk(ByteBuf chunk) {
-        assert chunk != null;
-        if (buf == null) {
-            buf = chunk;
-        } else if (buf instanceof CompositeByteBuf comp) {
-            comp.addComponent(true, chunk);
-        } else {
-            var comp = channel.alloc().compositeBuffer();
-            comp.addComponent(true, buf);
-            comp.addComponent(true, chunk);
-            buf = comp;
-        }
-        bufSize = buf.readableBytes();
-    }
-
-    // visible for test
-    int bufSize() {
-        return bufSize;
-    }
-
-    // visible for test
-    boolean hasLast() {
-        return hasLast;
-    }
-
-    private void send() {
-        assert requested;
-        assert handler != null : "must set handler before receiving next chunk";
-        var bytesRef = Netty4Utils.toReleasableBytesReference(buf);
-        requested = false;
-        buf = null;
-        bufSize = 0;
-        try (var ignored = threadContext.restoreExistingContext(requestContext)) {
-            for (var tracer : tracingHandlers) {
-                tracer.onNext(bytesRef, hasLast);
+            try (var ignored = threadContext.restoreExistingContext(requestContext)) {
+                var isLast = httpContent instanceof LastHttpContent;
+                var buf = Netty4Utils.toReleasableBytesReference(httpContent.content());
+                for (var tracer : tracingHandlers) {
+                    tracer.onNext(buf, isLast);
+                }
+                handler.onNext(buf, isLast);
+                if (isLast) {
+                    read();
+                    ctx.channel().closeFuture().removeListener(closeListener);
+                }
             }
-            handler.onNext(bytesRef, hasLast);
-        }
-        if (hasLast) {
-            channel.config().setAutoRead(true);
-            channel.closeFuture().removeListener(closeListener);
         }
     }
 
     @Override
     public void close() {
-        if (channel.eventLoop().inEventLoop()) {
+        if (ctx.channel().eventLoop().inEventLoop()) {
             doClose();
         } else {
-            channel.eventLoop().submit(this::doClose);
+            ctx.channel().eventLoop().submit(this::doClose);
         }
     }
 
@@ -174,11 +108,6 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
                 handler.close();
             }
         }
-        if (buf != null) {
-            buf.release();
-            buf = null;
-            bufSize = 0;
-        }
-        channel.config().setAutoRead(true);
+        read();
     }
 }

+ 17 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

@@ -29,6 +29,7 @@ import io.netty.handler.codec.http.HttpRequestDecoder;
 import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpResponseEncoder;
 import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.flow.FlowControlHandler;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.timeout.ReadTimeoutException;
 import io.netty.handler.timeout.ReadTimeoutHandler;
@@ -46,6 +47,7 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.http.AbstractHttpServerTransport;
@@ -317,6 +319,9 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
 
         @Override
         protected void initChannel(Channel ch) throws Exception {
+            // auto-read must be disabled all the time
+            ch.config().setAutoRead(false);
+
             Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
             ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
             if (acceptChannelPredicate != null) {
@@ -364,6 +369,15 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
             }
             decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
             ch.pipeline().addLast("decoder", decoder); // parses the HTTP bytes request into HTTP message pieces
+
+            // from this point in pipeline every handler must call ctx or channel #read() when ready to process next HTTP part
+            ch.pipeline().addLast(new FlowControlHandler());
+            if (Assertions.ENABLED) {
+                // missing reads are hard to catch, but we can detect absence of reads within interval
+                long missingReadIntervalMs = 10_000;
+                ch.pipeline().addLast(new MissingReadDetector(transport.threadPool, missingReadIntervalMs));
+            }
+
             if (httpValidator != null) {
                 // runs a validation function on the first HTTP message piece which contains all the headers
                 // if validation passes, the pieces of that particular request are forwarded, otherwise they are discarded
@@ -421,6 +435,9 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
                     new Netty4HttpPipeliningHandler(transport.pipeliningMaxEvents, transport, threadWatchdogActivityTracker)
                 );
             transport.serverAcceptedChannel(nettyHttpChannel);
+
+            // make very first read call, since auto-read is disabled; following reads must come from the handlers
+            ch.read();
         }
 
         @Override

+ 26 - 2
modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpContentSizeHandlerTests.java

@@ -12,6 +12,7 @@ package org.elasticsearch.http.netty4;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.DecoderResult;
 import io.netty.handler.codec.http.DefaultHttpContent;
 import io.netty.handler.codec.http.DefaultHttpRequest;
 import io.netty.handler.codec.http.DefaultLastHttpContent;
@@ -40,6 +41,7 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
     private static final int REPS = 1000;
     private EmbeddedChannel channel;
     private EmbeddedChannel encoder; // channel to encode HTTP objects into bytes
+    private ReadSniffer readSniffer;
 
     private static HttpContent httpContent(int size) {
         return new DefaultHttpContent(Unpooled.wrappedBuffer(randomByteArrayOfLength(size)));
@@ -68,7 +70,20 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
         super.setUp();
         var decoder = new HttpRequestDecoder();
         encoder = new EmbeddedChannel(new HttpRequestEncoder());
-        channel = new EmbeddedChannel(decoder, new Netty4HttpContentSizeHandler(decoder, MAX_CONTENT_LENGTH));
+        readSniffer = new ReadSniffer();
+        channel = new EmbeddedChannel();
+        channel.config().setAutoRead(false);
+        channel.pipeline().addLast(decoder, readSniffer, new Netty4HttpContentSizeHandler(decoder, MAX_CONTENT_LENGTH));
+    }
+
+    public void testDecodingFailurePassThrough() {
+        for (var i = 0; i < REPS; i++) {
+            var sendReq = httpRequest();
+            sendReq.setDecoderResult(DecoderResult.failure(new Exception("bad")));
+            channel.writeInbound(sendReq);
+            assertEquals(sendReq, channel.readInbound());
+        }
+        assertEquals("should not read from channel, failures are handled downstream", 0, readSniffer.readCount);
     }
 
     /**
@@ -85,6 +100,7 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
             assertFalse(HttpUtil.is100ContinueExpected(recvRequest));
             channel.writeInbound(encode(LastHttpContent.EMPTY_LAST_CONTENT));
             assertEquals(LastHttpContent.EMPTY_LAST_CONTENT, channel.readInbound());
+            assertEquals("must not read from channel", 0, readSniffer.readCount);
         }
     }
 
@@ -99,6 +115,7 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
             assertNotNull("request should pass", channel.readInbound());
             channel.writeInbound(encode(LastHttpContent.EMPTY_LAST_CONTENT));
             assertEquals(LastHttpContent.EMPTY_LAST_CONTENT, channel.readInbound());
+            assertEquals("must not read from channel", 0, readSniffer.readCount);
         }
     }
 
@@ -121,6 +138,7 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
             assertNotNull(recvContent);
             assertEquals(MAX_CONTENT_LENGTH, recvContent.content().readableBytes());
             recvContent.release();
+            assertEquals("must not read from channel", 0, readSniffer.readCount);
         }
     }
 
@@ -134,6 +152,7 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
         channel.writeInbound(encode(sendRequest));
         var resp = (FullHttpResponse) channel.readOutbound();
         assertEquals(HttpResponseStatus.EXPECTATION_FAILED, resp.status());
+        assertEquals("expect 2 reads, one from size handler and HTTP decoder will emit LastHttpContent", 2, readSniffer.readCount);
         assertFalse(channel.isOpen());
         resp.release();
     }
@@ -152,6 +171,7 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
             assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
             assertNull("request should not pass", channel.readInbound());
             assertTrue("should not close channel", channel.isOpen());
+            assertEquals("must read from channel", i + 1, readSniffer.readCount);
             resp.release();
         }
     }
@@ -160,11 +180,13 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
      * Mixed load of oversized and normal requests with Exepct:100-Continue.
      */
     public void testMixedContent() {
+        var expectReadCnt = 0;
         for (int i = 0; i < REPS; i++) {
             var isOversized = randomBoolean();
             var sendRequest = httpRequest();
             HttpUtil.set100ContinueExpected(sendRequest, true);
             if (isOversized) {
+                expectReadCnt++;
                 HttpUtil.setContentLength(sendRequest, OVERSIZED_LENGTH);
                 channel.writeInbound(encode(sendRequest));
                 var resp = (FullHttpResponse) channel.readOutbound();
@@ -188,6 +210,7 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
                 assertEquals("actual content size should match", normalSize, recvContent.content().readableBytes());
                 recvContent.release();
             }
+            assertEquals(expectReadCnt, readSniffer.readCount);
         }
     }
 
@@ -205,6 +228,7 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
             resp.release();
             assertNull("request and content should not pass", channel.readInbound());
             assertTrue("should not close channel", channel.isOpen());
+            assertEquals("expect two reads per loop, one for request and one for content", (i + 1) * 2, readSniffer.readCount);
         }
     }
 
@@ -234,7 +258,7 @@ public class Netty4HttpContentSizeHandlerTests extends ESTestCase {
         var resp = (FullHttpResponse) channel.readOutbound();
         assertEquals("should respond with 413", HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
         assertFalse("should close channel", channel.isOpen());
+        assertEquals("expect read after response", 1, readSniffer.readCount);
         resp.release();
     }
-
 }

+ 6 - 1
modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderThreadContextTests.java

@@ -19,6 +19,7 @@ import io.netty.handler.codec.http.DefaultHttpRequest;
 import io.netty.handler.codec.http.DefaultLastHttpContent;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.flow.FlowControlHandler;
 
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -52,7 +53,8 @@ public class Netty4HttpHeaderThreadContextTests extends ESTestCase {
     @Override
     public void setUp() throws Exception {
         super.setUp();
-        channel = new EmbeddedChannel();
+        channel = new EmbeddedChannel(new FlowControlHandler());
+        channel.config().setAutoRead(false);
         threadPool = new TestThreadPool(TEST_MOCK_TRANSPORT_THREAD_PREFIX);
     }
 
@@ -181,6 +183,7 @@ public class Netty4HttpHeaderThreadContextTests extends ESTestCase {
         threadPool.generic().submit(() -> {
             DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
             channel.writeInbound(request1);
+            channel.read();
             DefaultHttpContent content1 = randomBoolean() ? new DefaultHttpContent(Unpooled.buffer(4)) : null;
             if (content1 != null) {
                 channel.writeInbound(content1);
@@ -196,9 +199,11 @@ public class Netty4HttpHeaderThreadContextTests extends ESTestCase {
             }
             channel.runPendingTasks();
             assertThat(channel.readInbound(), sameInstance(request1));
+            channel.read();
             if (content1 != null && success) {
                 assertThat(channel.readInbound(), sameInstance(content1));
             }
+            channel.read();
             if (success) {
                 assertThat(channel.readInbound(), sameInstance(lastContent1));
             }

+ 93 - 702
modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java

@@ -9,766 +9,157 @@
 
 package org.elasticsearch.http.netty4;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.DecoderResult;
-import io.netty.handler.codec.http.DefaultFullHttpRequest;
 import io.netty.handler.codec.http.DefaultHttpContent;
 import io.netty.handler.codec.http.DefaultHttpRequest;
 import io.netty.handler.codec.http.DefaultLastHttpContent;
-import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpContent;
 import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpRequestDecoder;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.codec.http.LastHttpContent;
-import io.netty.util.AsciiString;
+import io.netty.handler.flow.FlowControlHandler;
 
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.ValidationException;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
-import org.elasticsearch.http.netty4.internal.HttpValidator;
 import org.elasticsearch.test.ESTestCase;
 
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_UNTIL_NEXT_REQUEST;
-import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.FORWARDING_DATA_UNTIL_NEXT_REQUEST;
-import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.QUEUEING_DATA;
-import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.WAITING_TO_START;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
-import static org.hamcrest.Matchers.sameInstance;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 public class Netty4HttpHeaderValidatorTests extends ESTestCase {
-
-    private final AtomicReference<Object> header = new AtomicReference<>();
-    private final AtomicReference<ActionListener<Void>> listener = new AtomicReference<>();
     private EmbeddedChannel channel;
-    private Netty4HttpHeaderValidator netty4HttpHeaderValidator;
-    private final AtomicReference<RuntimeException> validationException = new AtomicReference<>();
+    private BlockingQueue<ValidationRequest> validatorRequestQueue;
 
     @Override
     public void setUp() throws Exception {
         super.setUp();
-        reset();
+        validatorRequestQueue = new LinkedBlockingQueue<>();
+        channel = new EmbeddedChannel(
+            new Netty4HttpHeaderValidator(
+                (httpRequest, channel, listener) -> validatorRequestQueue.add(new ValidationRequest(httpRequest, channel, listener)),
+                new ThreadContext(Settings.EMPTY)
+            )
+        );
+        channel.config().setAutoRead(false);
     }
 
-    private void reset() {
-        channel = new EmbeddedChannel();
-        header.set(null);
-        listener.set(null);
-        validationException.set(null);
-        HttpValidator validator = (httpRequest, channel, validationCompleteListener) -> {
-            header.set(httpRequest);
-            final var exception = validationException.get();
-            if (exception != null) {
-                throw exception;
-            }
-            listener.set(validationCompleteListener);
-        };
-        netty4HttpHeaderValidator = new Netty4HttpHeaderValidator(validator, new ThreadContext(Settings.EMPTY));
-        channel.pipeline().addLast(netty4HttpHeaderValidator);
+    HttpRequest newHttpRequest() {
+        return new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "");
     }
 
-    public void testValidationPausesAndResumesData() {
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-
-        final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
-        DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(request);
-        channel.writeInbound(content);
-
-        assertThat(header.get(), sameInstance(request));
-        // channel is paused
-        assertThat(channel.readInbound(), nullValue());
-        assertFalse(channel.config().isAutoRead());
-
-        // channel is resumed
-        listener.get().onResponse(null);
-        channel.runPendingTasks();
-
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
-        assertThat(channel.readInbound(), sameInstance(request));
-        assertThat(channel.readInbound(), sameInstance(content));
-        assertThat(channel.readInbound(), nullValue());
-        assertThat(content.refCnt(), equalTo(1));
-
-        // channel continues in resumed state after request finishes
-        DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(lastContent);
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-        assertThat(channel.readInbound(), sameInstance(lastContent));
-        assertThat(lastContent.refCnt(), equalTo(1));
-
-        // channel is again paused while validating next request
-        channel.writeInbound(request);
-        assertFalse(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
+    HttpContent newHttpContent() {
+        return new DefaultHttpContent(Unpooled.buffer());
     }
 
-    public void testValidatorDoesNotTweakAutoReadAfterValidationComplete() {
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-
-        final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
-        DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(request);
-        channel.writeInbound(content);
-
-        assertThat(header.get(), sameInstance(request));
-        // channel is paused
-        assertThat(channel.readInbound(), nullValue());
-        assertFalse(channel.config().isAutoRead());
-
-        // channel is resumed
-        listener.get().onResponse(null);
-        channel.runPendingTasks();
-
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
-        assertThat(channel.readInbound(), sameInstance(request));
-        assertThat(channel.readInbound(), sameInstance(content));
-        assertThat(channel.readInbound(), nullValue());
-        assertThat(content.refCnt(), equalTo(1));
-        channel.config().setAutoRead(false);
-
-        channel.writeOutbound(new DefaultHttpContent(Unpooled.buffer(4)));
-        assertFalse(channel.config().isAutoRead());
+    LastHttpContent newLastHttpContent() {
+        return new DefaultLastHttpContent();
     }
 
-    public void testContentForwardedAfterValidation() {
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-
-        final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
-        channel.writeInbound(request);
-
-        DefaultHttpContent content1 = null;
-        if (randomBoolean()) {
-            content1 = new DefaultHttpContent(Unpooled.buffer(4));
-            channel.writeInbound(content1);
-        }
-
-        assertThat(header.get(), sameInstance(request));
-        // channel is paused
-        assertThat(channel.readInbound(), nullValue());
-        assertFalse(channel.config().isAutoRead());
-
-        // channel is resumed
-        listener.get().onResponse(null);
-        channel.runPendingTasks();
-
-        // resumed channel after successful validation forwards data
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
-        // write more content to the channel after validation passed
-        DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(content2);
-        assertThat(channel.readInbound(), sameInstance(request));
-        DefaultHttpContent content3 = new DefaultHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(content3);
-        if (content1 != null) {
-            assertThat(channel.readInbound(), sameInstance(content1));
-            assertThat(content1.refCnt(), equalTo(1));
-        }
-        assertThat(channel.readInbound(), sameInstance(content2));
-        assertThat(content2.refCnt(), equalTo(1));
-        DefaultHttpContent content4 = null;
-        if (randomBoolean()) {
-            content4 = new DefaultHttpContent(Unpooled.buffer(4));
-            channel.writeInbound(content4);
-        }
-        assertThat(channel.readInbound(), sameInstance(content3));
-        assertThat(content3.refCnt(), equalTo(1));
-        if (content4 != null) {
-            assertThat(channel.readInbound(), sameInstance(content4));
-            assertThat(content4.refCnt(), equalTo(1));
-        }
-
-        // channel continues in resumed state after request finishes
-        DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(lastContent);
-
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-        assertThat(channel.readInbound(), sameInstance(lastContent));
-        assertThat(lastContent.refCnt(), equalTo(1));
-
-        if (randomBoolean()) {
-            channel.writeInbound(request);
-            assertFalse(channel.config().isAutoRead());
-            assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
-        }
+    public void testValidatorReceiveHttpRequest() {
+        channel.writeInbound(newHttpRequest());
+        assertEquals(1, validatorRequestQueue.size());
+        assertNull(channel.readInbound());
     }
 
-    public void testContentDroppedAfterValidationFailure() {
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-
-        final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
-        channel.writeInbound(request);
-
-        DefaultHttpContent content1 = null;
-        if (randomBoolean()) {
-            content1 = new DefaultHttpContent(Unpooled.buffer(4));
-            channel.writeInbound(content1);
-        }
-
-        assertThat(header.get(), sameInstance(request));
-        // channel is paused
-        assertThat(channel.readInbound(), nullValue());
-        assertFalse(channel.config().isAutoRead());
-
-        // channel is resumed
-        listener.get().onFailure(new ElasticsearchException("Boom"));
-        channel.runPendingTasks();
-
-        // resumed channel after failed validation drops data
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST));
-        // write more content to the channel after validation passed
-        DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(content2);
-        assertThat(channel.readInbound(), sameInstance(request));
-        DefaultHttpContent content3 = new DefaultHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(content3);
-        if (content1 != null) {
-            assertThat(channel.readInbound(), nullValue());
-            assertThat(content1.refCnt(), equalTo(0));
-        }
-        assertThat(channel.readInbound(), nullValue()); // content2
-        assertThat(content2.refCnt(), equalTo(0));
-        DefaultHttpContent content4 = null;
-        if (randomBoolean()) {
-            content4 = new DefaultHttpContent(Unpooled.buffer(4));
-            channel.writeInbound(content4);
-        }
-        assertThat(channel.readInbound(), nullValue()); // content3
-        assertThat(content3.refCnt(), equalTo(0));
-        if (content4 != null) {
-            assertThat(channel.readInbound(), nullValue());
-            assertThat(content4.refCnt(), equalTo(0));
-        }
-
-        assertThat(channel.readInbound(), nullValue()); // extra read still returns "null"
-
-        // channel continues in resumed state after request finishes
-        DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(lastContent);
-
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-        assertThat(channel.readInbound(), nullValue()); // lastContent
-        assertThat(lastContent.refCnt(), equalTo(0));
-
-        if (randomBoolean()) {
-            channel.writeInbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"));
-            assertFalse(channel.config().isAutoRead());
-            assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
+    public void testDecoderFailurePassThrough() {
+        for (var i = 0; i < 1000; i++) {
+            var httpRequest = newHttpRequest();
+            httpRequest.setDecoderResult(DecoderResult.failure(new Exception("bad")));
+            channel.writeInbound(httpRequest);
+            assertEquals(httpRequest, channel.readInbound());
         }
     }
 
-    public void testValidationErrorForwardsAsDecoderErrorMessage() {
-        for (Exception exception : List.of(
-            new Exception("Failure"),
-            new ElasticsearchException("Failure"),
-            new ElasticsearchSecurityException("Failure")
-        )) {
-            assertTrue(channel.config().isAutoRead());
-            assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-
-            final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
-            final DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
+    /**
+     * Sends back-to-back http requests and randomly fail validation.
+     * Ensures that invalid requests drop content and valid pass through.
+     */
+    public void testMixedValidationResults() {
+        for (var i = 0; i < 1000; i++) {
+            var shouldPassValidation = randomBoolean();
+            var request = newHttpRequest();
+            var content = newHttpContent();
+            var last = newLastHttpContent();
 
             channel.writeInbound(request);
-            channel.writeInbound(content);
-
-            assertThat(header.get(), sameInstance(request));
-            assertThat(channel.readInbound(), nullValue());
-            assertFalse(channel.config().isAutoRead());
-
-            listener.get().onFailure(exception);
-            channel.runPendingTasks();
-            assertTrue(channel.config().isAutoRead());
-            DefaultHttpRequest failed = channel.readInbound();
-            assertThat(failed, sameInstance(request));
-            assertThat(failed.headers().get(HttpHeaderNames.CONNECTION), nullValue());
-            assertTrue(failed.decoderResult().isFailure());
-            Exception cause = (Exception) failed.decoderResult().cause();
-            assertThat(cause, equalTo(exception));
-            assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST));
-
-            assertThat(channel.readInbound(), nullValue());
-            assertThat(content.refCnt(), equalTo(0));
-
-            DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4));
-            channel.writeInbound(lastContent);
-            assertTrue(channel.config().isAutoRead());
-            assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-            assertThat(channel.readInbound(), nullValue());
-            assertThat(lastContent.refCnt(), equalTo(0));
-
-            reset();
-        }
-    }
-
-    public void testValidationExceptionForwardsAsDecoderErrorMessage() {
-        final var exception = new ElasticsearchException("Failure");
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-
-        final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
-
-        validationException.set(exception);
-        channel.writeInbound(request);
-
-        assertThat(header.get(), sameInstance(request));
-        assertThat(listener.get(), nullValue());
-
-        channel.runPendingTasks();
-        assertTrue(channel.config().isAutoRead());
-        DefaultHttpRequest failed = channel.readInbound();
-        assertThat(failed, sameInstance(request));
-        assertThat(failed.headers().get(HttpHeaderNames.CONNECTION), nullValue());
-        assertTrue(failed.decoderResult().isFailure());
-        Exception cause = (Exception) failed.decoderResult().cause();
-        assertThat(cause, equalTo(exception));
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST));
-
-        final DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(content);
-
-        assertThat(channel.readInbound(), nullValue());
-        assertThat(content.refCnt(), equalTo(0));
-
-        DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(lastContent);
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-        assertThat(channel.readInbound(), nullValue());
-        assertThat(lastContent.refCnt(), equalTo(0));
-    }
-
-    public void testValidationHandlesMultipleQueuedUpMessages() {
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-
-        final DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
-        DefaultHttpContent content1 = new DefaultHttpContent(Unpooled.buffer(4));
-        DefaultLastHttpContent lastContent1 = new DefaultLastHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(request1);
-        channel.writeInbound(content1);
-        channel.writeInbound(lastContent1);
-        final DefaultHttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
-        DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4));
-        DefaultLastHttpContent lastContent2 = new DefaultLastHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(request2);
-        channel.writeInbound(content2);
-        channel.writeInbound(lastContent2);
-
-        assertThat(header.get(), sameInstance(request1));
-        assertThat(channel.readInbound(), nullValue());
-        assertFalse(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
-
-        listener.get().onResponse(null);
-        channel.runPendingTasks();
-        assertThat(channel.readInbound(), sameInstance(request1));
-        assertThat(channel.readInbound(), sameInstance(content1));
-        assertThat(channel.readInbound(), sameInstance(lastContent1));
-        assertThat(content1.refCnt(), equalTo(1));
-        assertThat(lastContent1.refCnt(), equalTo(1));
-
-        assertThat(header.get(), sameInstance(request2));
-
-        assertFalse(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
-        assertThat(channel.readInbound(), nullValue());
-
-        listener.get().onResponse(null);
-        channel.runPendingTasks();
-        assertThat(channel.readInbound(), sameInstance(request2));
-        assertThat(channel.readInbound(), sameInstance(content2));
-        assertThat(channel.readInbound(), sameInstance(lastContent2));
-        assertThat(content2.refCnt(), equalTo(1));
-        assertThat(lastContent2.refCnt(), equalTo(1));
-
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-        assertThat(channel.readInbound(), nullValue());
-    }
-
-    public void testValidationFailureRecoversForEnqueued() {
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-
-        // write 2 requests before validation for the first one fails
-        final DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
-        DefaultHttpContent content1 = new DefaultHttpContent(Unpooled.buffer(4));
-        DefaultLastHttpContent lastContent1 = new DefaultLastHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(request1);
-        channel.writeInbound(content1);
-        channel.writeInbound(lastContent1);
-        final DefaultHttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
-        DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4));
-        DefaultLastHttpContent lastContent2 = new DefaultLastHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(request2);
-        channel.writeInbound(content2);
-
-        boolean finishSecondRequest = randomBoolean();
-        if (finishSecondRequest) {
-            channel.writeInbound(lastContent2);
-        }
-
-        // channel is paused and both requests are queued
-        assertThat(header.get(), sameInstance(request1));
-        assertThat(channel.readInbound(), nullValue());
-        assertFalse(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
-        assertThat(content1.refCnt(), equalTo(2));
-        assertThat(lastContent1.refCnt(), equalTo(2));
-        assertThat(content2.refCnt(), equalTo(2));
-        if (finishSecondRequest) {
-            assertThat(lastContent2.refCnt(), equalTo(2));
-        }
-
-        // validation for the 1st request FAILS
-        Exception exception = new ElasticsearchException("Boom");
-        listener.get().onFailure(exception);
-        channel.runPendingTasks();
-
-        // request1 becomes a decoder exception and its content is dropped
-        assertThat(channel.readInbound(), sameInstance(request1));
-        assertThat(request1.headers().get(HttpHeaderNames.CONNECTION), nullValue());
-        assertTrue(request1.decoderResult().isFailure());
-        Exception cause = (Exception) request1.decoderResult().cause();
-        assertThat(cause, equalTo(exception));
-        assertThat(content1.refCnt(), equalTo(0)); // content is dropped
-        assertThat(lastContent1.refCnt(), equalTo(0)); // content is dropped
-        assertThat(channel.readInbound(), nullValue());
-
-        // channel pauses for the validation of the 2nd request
-        assertThat(header.get(), sameInstance(request2));
-        assertFalse(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
-        assertThat(channel.readInbound(), nullValue());
-
-        // validation for the 2nd request SUCCEEDS
-        listener.get().onResponse(null);
-        channel.runPendingTasks();
-
-        // 2nd request is forwarded correctly
-        assertThat(channel.readInbound(), sameInstance(request2));
-        assertThat(channel.readInbound(), sameInstance(content2));
-        assertThat(content2.refCnt(), equalTo(1));
-
-        if (finishSecondRequest == false) {
-            assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
-            assertTrue(channel.config().isAutoRead());
-            assertThat(channel.readInbound(), nullValue());
-            // while in forwarding state the request can continue
-            if (randomBoolean()) {
-                DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
-                channel.writeInbound(content);
-                assertThat(channel.readInbound(), sameInstance(content));
-                assertThat(content.refCnt(), equalTo(1));
+            var validationRequest = validatorRequestQueue.poll();
+            assertNotNull(validationRequest);
+            if (shouldPassValidation) {
+                validationRequest.listener.onResponse(null);
+            } else {
+                validationRequest.listener.onFailure(new ValidationException());
             }
-            channel.writeInbound(lastContent2);
-        }
-
-        assertThat(channel.readInbound(), sameInstance(lastContent2));
-        assertThat(lastContent2.refCnt(), equalTo(1));
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-        assertTrue(channel.config().isAutoRead());
-    }
-
-    public void testValidationFailureRecoversForInbound() {
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-
-        // write a single request, but don't finish it yet, for which the validation fails
-        final DefaultHttpRequest request1 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
-        DefaultHttpContent content1 = new DefaultHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(request1);
-        channel.writeInbound(content1);
-
-        // channel is paused and the request is queued
-        assertThat(header.get(), sameInstance(request1));
-        assertThat(channel.readInbound(), nullValue());
-        assertFalse(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
-        assertThat(content1.refCnt(), equalTo(2));
-
-        // validation for the 1st request FAILS
-        Exception exception = new ElasticsearchException("Boom");
-        listener.get().onFailure(exception);
-        channel.runPendingTasks();
-
-        // request1 becomes a decoder exception and its content is dropped
-        assertThat(channel.readInbound(), sameInstance(request1));
-        assertThat(request1.headers().get(HttpHeaderNames.CONNECTION), nullValue());
-        assertTrue(request1.decoderResult().isFailure());
-        Exception cause = (Exception) request1.decoderResult().cause();
-        assertThat(cause, equalTo(exception));
-        assertThat(content1.refCnt(), equalTo(0)); // content is dropped
-        assertThat(channel.readInbound(), nullValue());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST));
-
-        if (randomBoolean()) {
-            channel.writeInbound(new DefaultHttpContent(Unpooled.buffer(4)));
-        }
-        DefaultLastHttpContent lastContent1 = new DefaultLastHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(lastContent1);
-        if (randomBoolean()) {
-            assertThat(channel.readInbound(), nullValue());
-        }
-        assertThat(lastContent1.refCnt(), equalTo(0)); // content is dropped
-
-        // write 2nd request after the 1st one failed validation
-        final DefaultHttpRequest request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
-        DefaultHttpContent content2 = new DefaultHttpContent(Unpooled.buffer(4));
-        DefaultLastHttpContent lastContent2 = new DefaultLastHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(request2);
-        channel.writeInbound(content2);
-        boolean finishSecondRequest = randomBoolean();
-        if (finishSecondRequest) {
-            channel.writeInbound(lastContent2);
-        }
-
-        // channel pauses for the validation of the 2nd request
-        assertThat(header.get(), sameInstance(request2));
-        assertFalse(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
-        assertThat(channel.readInbound(), nullValue());
-
-        // validation for the 2nd request SUCCEEDS
-        listener.get().onResponse(null);
-        channel.runPendingTasks();
+            channel.runPendingTasks();
 
-        // 2nd request is forwarded correctly
-        assertThat(channel.readInbound(), sameInstance(request2));
-        assertThat(channel.readInbound(), sameInstance(content2));
-        assertThat(content2.refCnt(), equalTo(1));
+            var gotRequest = channel.readInbound();
+            assertEquals(
+                "should set decoder result failure for invalid request",
+                shouldPassValidation,
+                ((HttpRequest) gotRequest).decoderResult().isSuccess()
+            );
+            assertEquals(request, gotRequest);
 
-        if (finishSecondRequest == false) {
-            assertThat(netty4HttpHeaderValidator.getState(), equalTo(FORWARDING_DATA_UNTIL_NEXT_REQUEST));
-            assertTrue(channel.config().isAutoRead());
-            assertThat(channel.readInbound(), nullValue());
-            // while in forwarding state the request can continue
-            if (randomBoolean()) {
-                DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
-                channel.writeInbound(content);
-                assertThat(channel.readInbound(), sameInstance(content));
-                assertThat(content.refCnt(), equalTo(1));
+            channel.writeInbound(content);
+            channel.writeInbound(last);
+            if (shouldPassValidation) {
+                assertEquals("should pass content for valid request", content, channel.readInbound());
+                assertEquals(last, channel.readInbound());
+            } else {
+                assertNull("should drop content for invalid request", channel.readInbound());
             }
-            channel.writeInbound(lastContent2);
         }
-
-        assertThat(channel.readInbound(), sameInstance(lastContent2));
-        assertThat(lastContent2.refCnt(), equalTo(1));
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-        assertTrue(channel.config().isAutoRead());
     }
 
-    public void testValidationSuccessForLargeMessage() {
-        assertTrue(channel.config().isAutoRead());
+    public void testIgnoreReadWhenValidating() {
+        channel.pipeline().addFirst(new FlowControlHandler()); // catch all inbound messages
 
-        final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
-        channel.writeInbound(request);
+        channel.writeInbound(newHttpRequest());
+        channel.writeInbound(newLastHttpContent()); // should hold by flow-control-handler
+        assertNull("nothing should pass yet", channel.readInbound());
 
-        int messageLength = randomIntBetween(32, 128);
-        for (int i = 0; i < messageLength; ++i) {
-            channel.writeInbound(new DefaultHttpContent(Unpooled.buffer(4)));
-        }
-        channel.writeInbound(new DefaultLastHttpContent(Unpooled.buffer(4)));
-        boolean followupRequest = randomBoolean();
-        if (followupRequest) {
-            channel.writeInbound(request);
-        }
+        channel.read();
+        var validationRequest = validatorRequestQueue.poll();
+        assertNotNull(validationRequest);
 
-        assertThat(header.get(), sameInstance(request));
-        assertThat(channel.readInbound(), nullValue());
-        assertFalse(channel.config().isAutoRead());
+        channel.read();
+        assertNull("should ignore read while validating", channel.readInbound());
 
-        listener.get().onResponse(null);
+        validationRequest.listener.onResponse(null);
         channel.runPendingTasks();
-        if (followupRequest) {
-            assertFalse(channel.config().isAutoRead());
-            assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
-        } else {
-            assertTrue(channel.config().isAutoRead());
-            assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-        }
-        assertThat(channel.readInbound(), sameInstance(request));
-        for (int i = 0; i < messageLength; ++i) {
-            Object content = channel.readInbound();
-            assertThat(content, instanceOf(DefaultHttpContent.class));
-            assertThat(((DefaultHttpContent) content).refCnt(), equalTo(1));
-        }
-        assertThat(channel.readInbound(), instanceOf(LastHttpContent.class));
-        assertThat(channel.readInbound(), nullValue());
-    }
-
-    public void testValidationFailureForLargeMessage() {
-        assertTrue(channel.config().isAutoRead());
-
-        final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
-        channel.writeInbound(request);
+        assertTrue("http request should pass", channel.readInbound() instanceof HttpRequest);
+        assertNull("content should not pass yet, need explicit read", channel.readInbound());
 
-        int messageLength = randomIntBetween(32, 128);
-        DefaultHttpContent[] messageContents = new DefaultHttpContent[messageLength];
-        for (int i = 0; i < messageLength; ++i) {
-            messageContents[i] = new DefaultHttpContent(Unpooled.buffer(4));
-            channel.writeInbound(messageContents[i]);
-        }
-        DefaultLastHttpContent lastHttpContent = new DefaultLastHttpContent(Unpooled.buffer(4));
-        channel.writeInbound(lastHttpContent);
-        boolean followupRequest = randomBoolean();
-        if (followupRequest) {
-            channel.writeInbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"));
-        }
-
-        assertThat(header.get(), sameInstance(request));
-        assertThat(channel.readInbound(), nullValue());
-        assertFalse(channel.config().isAutoRead());
-
-        Exception exception = new ElasticsearchException("Boom");
-        listener.get().onFailure(exception);
-        channel.runPendingTasks();
-        if (followupRequest) {
-            assertFalse(channel.config().isAutoRead());
-            assertThat(netty4HttpHeaderValidator.getState(), equalTo(QUEUEING_DATA));
-        } else {
-            assertTrue(channel.config().isAutoRead());
-            assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-        }
-        assertThat(channel.readInbound(), sameInstance(request));
-        assertThat(request.headers().get(HttpHeaderNames.CONNECTION), nullValue());
-        assertTrue(request.decoderResult().isFailure());
-        Exception cause = (Exception) request.decoderResult().cause();
-        assertThat(cause, equalTo(exception));
-        for (int i = 0; i < messageLength; ++i) {
-            assertThat(channel.readInbound(), nullValue());
-            assertThat(messageContents[i].refCnt(), equalTo(0));
-        }
-        assertThat(channel.readInbound(), nullValue());
-        assertThat(lastHttpContent.refCnt(), equalTo(0));
-        assertThat(channel.readInbound(), nullValue());
+        channel.read();
+        assertTrue(channel.readInbound() instanceof LastHttpContent);
     }
 
-    public void testFullRequestValidationFailure() {
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
+    public void testWithFlowControlAndAggregator() {
+        channel.pipeline().addFirst(new FlowControlHandler());
+        channel.pipeline().addLast(new Netty4HttpAggregator(8192, (req) -> true, new HttpRequestDecoder()));
 
-        ByteBuf buf = channel.alloc().buffer();
-        ByteBufUtil.copy(AsciiString.of("test full http request"), buf);
-        final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri", buf);
-        channel.writeInbound(request);
+        channel.writeInbound(newHttpRequest());
+        channel.writeInbound(newHttpContent());
+        channel.writeInbound(newLastHttpContent());
 
-        // request got through to validation
-        assertThat(header.get(), sameInstance(request));
-        // channel is paused
-        assertThat(channel.readInbound(), nullValue());
-        assertFalse(channel.config().isAutoRead());
+        channel.read();
+        assertNull("should ignore read while validating", channel.readInbound());
 
-        // validation fails
-        Exception exception = new ElasticsearchException("Boom");
-        listener.get().onFailure(exception);
+        var validationRequest = validatorRequestQueue.poll();
+        assertNotNull(validationRequest);
+        validationRequest.listener.onResponse(null);
         channel.runPendingTasks();
 
-        // channel is resumed and waiting for next request
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-
-        DefaultFullHttpRequest throughRequest = channel.readInbound();
-        // "through request" contains a decoder exception
-        assertThat(throughRequest, not(sameInstance(request)));
-        assertTrue(throughRequest.decoderResult().isFailure());
-        // the content is cleared when validation fails
-        assertThat(new String(ByteBufUtil.getBytes(throughRequest.content()), StandardCharsets.UTF_8), is(""));
-        assertThat(buf.refCnt(), is(0));
-        Exception cause = (Exception) throughRequest.decoderResult().cause();
-        assertThat(cause, equalTo(exception));
+        assertTrue(channel.readInbound() instanceof FullHttpRequest);
     }
 
-    public void testFullRequestValidationSuccess() {
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-
-        ByteBuf buf = channel.alloc().buffer();
-        try {
-            ByteBufUtil.copy(AsciiString.of("test full http request"), buf);
-            final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri", buf);
-            channel.writeInbound(request);
-
-            // request got through to validation
-            assertThat(header.get(), sameInstance(request));
-            // channel is paused
-            assertThat(channel.readInbound(), nullValue());
-            assertFalse(channel.config().isAutoRead());
+    record ValidationRequest(HttpRequest request, Channel channel, ActionListener<Void> listener) {}
 
-            // validation succeeds
-            listener.get().onResponse(null);
-            channel.runPendingTasks();
-
-            // channel is resumed and waiting for next request
-            assertTrue(channel.config().isAutoRead());
-            assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-
-            DefaultFullHttpRequest throughRequest = channel.readInbound();
-            // request goes through unaltered
-            assertThat(throughRequest, sameInstance(request));
-            assertFalse(throughRequest.decoderResult().isFailure());
-            // the content is unaltered
-            assertThat(new String(ByteBufUtil.getBytes(throughRequest.content()), StandardCharsets.UTF_8), is("test full http request"));
-            assertThat(buf.refCnt(), is(1));
-            assertThat(throughRequest.decoderResult().cause(), nullValue());
-        } finally {
-            buf.release();
-        }
-    }
-
-    public void testFullRequestWithDecoderException() {
-        assertTrue(channel.config().isAutoRead());
-        assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-
-        ByteBuf buf = channel.alloc().buffer();
-        try {
-            ByteBufUtil.copy(AsciiString.of("test full http request"), buf);
-            // a request with a decoder error prior to validation
-            final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri", buf);
-            Exception cause = new ElasticsearchException("Boom");
-            request.setDecoderResult(DecoderResult.failure(cause));
-            channel.writeInbound(request);
-
-            // request goes through without invoking the validator
-            assertThat(header.get(), nullValue());
-            assertThat(listener.get(), nullValue());
-            // channel is NOT paused
-            assertTrue(channel.config().isAutoRead());
-            assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
-
-            DefaultFullHttpRequest throughRequest = channel.readInbound();
-            // request goes through unaltered
-            assertThat(throughRequest, sameInstance(request));
-            assertTrue(throughRequest.decoderResult().isFailure());
-            assertThat(throughRequest.decoderResult().cause(), equalTo(cause));
-            // the content is unaltered
-            assertThat(new String(ByteBufUtil.getBytes(throughRequest.content()), StandardCharsets.UTF_8), is("test full http request"));
-            assertThat(buf.refCnt(), is(1));
-        } finally {
-            buf.release();
-        }
-    }
 }

+ 37 - 63
modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java

@@ -21,7 +21,6 @@ import io.netty.handler.codec.http.HttpContent;
 import io.netty.handler.flow.FlowControlHandler;
 
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
-import org.elasticsearch.common.network.ThreadWatchdog;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.http.HttpBody;
@@ -43,17 +42,24 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
     static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close();
     private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
     private EmbeddedChannel channel;
+    private ReadSniffer readSniffer;
     private Netty4HttpRequestBodyStream stream;
-    private ThreadWatchdog.ActivityTracker activityTracker;
 
     @Override
     public void setUp() throws Exception {
         super.setUp();
         channel = new EmbeddedChannel();
-        activityTracker = new ThreadWatchdog.ActivityTracker();
-        stream = new Netty4HttpRequestBodyStream(channel, threadContext, activityTracker);
-        stream.setHandler(discardHandler); // set default handler, each test might override one
+        readSniffer = new ReadSniffer();
+        channel.pipeline().addLast(new FlowControlHandler(), readSniffer);
+        channel.config().setAutoRead(false);
         channel.pipeline().addLast(new SimpleChannelInboundHandler<HttpContent>(false) {
+            @Override
+            public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+                stream = new Netty4HttpRequestBodyStream(ctx, threadContext);
+                stream.setHandler(discardHandler); // set default handler, each test might override one
+                super.handlerAdded(ctx);
+            }
+
             @Override
             protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) {
                 stream.handleNettyContent(msg);
@@ -67,17 +73,8 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
         stream.close();
     }
 
-    // ensures that no chunks are sent downstream without request
-    public void testEnqueueChunksBeforeRequest() {
-        var totalChunks = randomIntBetween(1, 100);
-        for (int i = 0; i < totalChunks; i++) {
-            channel.writeInbound(randomContent(1024));
-        }
-        assertEquals(totalChunks * 1024, stream.bufSize());
-    }
-
-    // ensures all received chunks can be flushed downstream
-    public void testFlushAllReceivedChunks() {
+    // ensures all chunks are passed to downstream
+    public void testPassAllChunks() {
         var chunks = new ArrayList<ReleasableBytesReference>();
         var totalBytes = new AtomicInteger();
         stream.setHandler((chunk, isLast) -> {
@@ -85,52 +82,35 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
             totalBytes.addAndGet(chunk.length());
             chunk.close();
         });
-
         var chunkSize = 1024;
         var totalChunks = randomIntBetween(1, 100);
         for (int i = 0; i < totalChunks; i++) {
             channel.writeInbound(randomContent(chunkSize));
+            stream.next();
+            channel.runPendingTasks();
+
         }
-        stream.next();
-        channel.runPendingTasks();
-        assertEquals("should receive all chunks as single composite", 1, chunks.size());
+        assertEquals(totalChunks, chunks.size());
         assertEquals(chunkSize * totalChunks, totalBytes.get());
     }
 
-    // ensures that channel.setAutoRead(true) only when we flush last chunk
-    public void testSetAutoReadOnLastFlush() {
+    // ensures that we read from channel after last chunk
+    public void testChannelReadAfterLastContent() {
         channel.writeInbound(randomLastContent(10));
-        assertFalse("should not auto-read on last content reception", channel.config().isAutoRead());
         stream.next();
         channel.runPendingTasks();
-        assertTrue("should set auto-read once last content is flushed", channel.config().isAutoRead());
+        assertEquals("should have at least 2 reads, one for last content, and one after last", 2, readSniffer.readCount);
     }
 
-    // ensures that we read from channel when no current chunks available
-    // and pass next chunk downstream without holding
-    public void testReadFromChannel() {
-        var gotChunks = new ArrayList<ReleasableBytesReference>();
-        var gotLast = new AtomicBoolean(false);
-        stream.setHandler((chunk, isLast) -> {
-            gotChunks.add(chunk);
-            gotLast.set(isLast);
-            chunk.close();
-        });
-        channel.pipeline().addFirst(new FlowControlHandler()); // block all incoming messages, need explicit channel.read()
-        var chunkSize = 1024;
-        var totalChunks = randomIntBetween(1, 32);
-        for (int i = 0; i < totalChunks - 1; i++) {
-            channel.writeInbound(randomContent(chunkSize));
-        }
-        channel.writeInbound(randomLastContent(chunkSize));
-
-        for (int i = 0; i < totalChunks; i++) {
-            assertEquals("should not enqueue chunks", 0, stream.bufSize());
-            stream.next();
-            channel.runPendingTasks();
-            assertEquals("each next() should produce single chunk", i + 1, gotChunks.size());
-        }
-        assertTrue("should receive last content", gotLast.get());
+    // ensures when stream is closing we read and discard chunks
+    public void testReadAndReleaseOnClosing() {
+        var unexpectedChunk = new AtomicBoolean();
+        stream.setHandler((chunk, isLast) -> unexpectedChunk.set(true));
+        stream.close();
+        channel.writeInbound(randomContent(1024));
+        channel.writeInbound(randomLastContent(0));
+        assertFalse("chunk should be discarded", unexpectedChunk.get());
+        assertEquals("expect 3 reads, a first from stream.close, and other two after chunks", 3, readSniffer.readCount);
     }
 
     public void testReadFromHasCorrectThreadContext() throws InterruptedException {
@@ -142,9 +122,15 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
         try {
             // activity tracker requires stream execution in the same thread, setting up stream inside event-loop
             eventLoop.submit(() -> {
-                channel = new EmbeddedChannel();
-                stream = new Netty4HttpRequestBodyStream(channel, threadContext, new ThreadWatchdog.ActivityTracker());
+                channel = new EmbeddedChannel(new FlowControlHandler());
+                channel.config().setAutoRead(false);
                 channel.pipeline().addLast(new SimpleChannelInboundHandler<HttpContent>(false) {
+                    @Override
+                    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+                        stream = new Netty4HttpRequestBodyStream(ctx, threadContext);
+                        super.handlerAdded(ctx);
+                    }
+
                     @Override
                     protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) {
                         stream.handleNettyContent(msg);
@@ -198,18 +184,6 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
         }
     }
 
-    public void testStreamNextActivityTracker() {
-        var t0 = activityTracker.get();
-        var N = between(1, 10);
-        for (int i = 0; i < N; i++) {
-            channel.writeInbound(randomContent(1024));
-            stream.next();
-            channel.runPendingTasks();
-        }
-        var t1 = activityTracker.get();
-        assertEquals("stream#next() must trigger activity tracker: N*step=" + N + "*2=" + N * 2L + " times", t1, t0 + N * 2L);
-    }
-
     // ensure that we catch all exceptions and throw them into channel pipeline
     public void testCatchExceptions() {
         var gotExceptions = new CountDownLatch(3); // number of tests below

+ 42 - 0
modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/ReadSniffer.java

@@ -0,0 +1,42 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.http.netty4;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+
+/**
+ * Sniffs channel reads, helps detect missing or unexpected ones.
+ * <pre>
+ *     {@code
+ *     chan = new EmbeddedChannel();
+ *     chan.config().setAutoRead(false);
+ *     readSniffer = new ReadSniffer();
+ *     chan.pipeline().addLast(readSniffer, ...otherHandlers);
+ *     ...
+ *     // run test
+ *     ...
+ *     assertEquals("unexpected read", 0, readSniffer.readCnt)
+ *     // or
+ *     assertEquals("exact number of reads", 2, readSniffer.readCnt)
+ *     }
+ * </pre>
+ *
+ */
+public class ReadSniffer extends ChannelOutboundHandlerAdapter {
+
+    int readCount;
+
+    @Override
+    public void read(ChannelHandlerContext ctx) throws Exception {
+        readCount++;
+        super.read(ctx);
+    }
+}

+ 1 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java

@@ -251,7 +251,7 @@ public class SecurityNetty4HttpServerTransportCloseNotifyTests extends AbstractH
             server.dispatcher.reqQueue.forEach(r -> r.request.getHttpRequest().release());
             server.netty.stop();
             server.threadPool.shutdownNow();
-            safeAwait(client.netty.config().group().shutdownGracefully());
+            safeAwait(client.netty.config().group().shutdownGracefully(0, 0, TimeUnit.SECONDS));
         }
     }