Browse Source

Preserve thread context when calling `getNextPart()` (#109519)

The listener supplied to `getNextPart()` may be completed in a
non-default thread context, and this trips assertions about thread
context pollution in the transport layer, so this commit adds the
missing protection against that.

Also the listener supplied to `getNextPart()` may be completed
immediately on the transport thread, which could lead to a stack
overflow, so this commit adds another `execute()` call to
unconditionally fork a fresh task.

Relates #104851
David Turner 1 year ago
parent
commit
0a008ed71d

+ 17 - 7
modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java

@@ -310,7 +310,7 @@ public class Netty4ChunkedContinuationsIT extends ESNetty4IntegTestCase {
                 safeSleep(scaledRandomIntBetween(10, 500)); // make it more likely the request started executing
             }
             cancellable.cancel();
-        } // closing the request tracker ensures that everything is released, including all response chunks and the overall response
+        } // closing the resource tracker ensures that everything is released, including all response chunks and the overall response
     }
 
     private static Releasable withResourceTracker() {
@@ -525,6 +525,7 @@ public class Netty4ChunkedContinuationsIT extends ESNetty4IntegTestCase {
         public static class Response extends ActionResponse {
             private final Executor executor;
             volatile boolean computingContinuation;
+            boolean recursive = false;
 
             public Response(Executor executor) {
                 this.executor = executor;
@@ -551,11 +552,17 @@ public class Netty4ChunkedContinuationsIT extends ESNetty4IntegTestCase {
 
                     @Override
                     public void getNextPart(ActionListener<ChunkedRestResponseBodyPart> listener) {
-                        computingContinuation = true;
-                        executor.execute(ActionRunnable.supply(listener, () -> {
-                            computingContinuation = false;
-                            return getResponseBodyPart();
-                        }));
+                        assertFalse(recursive);
+                        recursive = true;
+                        try {
+                            computingContinuation = true;
+                            executor.execute(ActionRunnable.supply(listener, () -> {
+                                computingContinuation = false;
+                                return getResponseBodyPart();
+                            }));
+                        } finally {
+                            recursive = false;
+                        }
                     }
 
                     @Override
@@ -585,7 +592,10 @@ public class Netty4ChunkedContinuationsIT extends ESNetty4IntegTestCase {
             @Override
             protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
                 executor.execute(
-                    ActionRunnable.supply(ActionTestUtils.assertNoFailureListener(listener::onResponse), () -> new Response(executor))
+                    ActionRunnable.supply(
+                        ActionTestUtils.assertNoFailureListener(listener::onResponse),
+                        () -> new Response(randomFrom(executor, EsExecutors.DIRECT_EXECUTOR_SERVICE))
+                    )
                 );
             }
         }

+ 53 - 38
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java

@@ -29,6 +29,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ContextPreservingActionListener;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.network.ThreadWatchdog;
@@ -271,45 +272,59 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
             writeSequence++;
             finishingWrite.combiner().finish(finishingWrite.onDone());
         } else {
+            final var threadContext = serverTransport.getThreadPool().getThreadContext();
+            assert Transports.assertDefaultThreadContext(threadContext);
             final var channel = finishingWrite.onDone().channel();
-            ActionListener.run(ActionListener.assertOnce(new ActionListener<>() {
-                @Override
-                public void onResponse(ChunkedRestResponseBodyPart continuation) {
-                    channel.writeAndFlush(
-                        new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()),
-                        finishingWrite.onDone() // pass the terminal listener/promise along the line
-                    );
-                    checkShutdown();
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                    logger.error(
-                        Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel),
-                        e
-                    );
-                    channel.close().addListener(ignored -> {
-                        finishingWrite.combiner().add(channel.newFailedFuture(e));
-                        finishingWrite.combiner().finish(finishingWrite.onDone());
-                    });
-                    checkShutdown();
-                }
-
-                private void checkShutdown() {
-                    if (channel.eventLoop().isShuttingDown()) {
-                        // The event loop is shutting down, and https://github.com/netty/netty/issues/8007 means that we cannot know if the
-                        // preceding activity made it onto its queue before shutdown or whether it will just vanish without a trace, so
-                        // to avoid a leak we must double-check that the final listener is completed once the event loop is terminated.
-                        // Note that the final listener came from Netty4Utils#safeWriteAndFlush so its executor is an ImmediateEventExecutor
-                        // which means this completion is not subject to the same issue, it still works even if the event loop has already
-                        // terminated.
-                        channel.eventLoop()
-                            .terminationFuture()
-                            .addListener(ignored -> finishingWrite.onDone().tryFailure(new ClosedChannelException()));
-                    }
-                }
-
-            }), finishingWriteBodyPart::getNextPart);
+            ActionListener.run(
+                new ContextPreservingActionListener<>(
+                    threadContext.newRestorableContext(false),
+                    ActionListener.assertOnce(new ActionListener<>() {
+                        @Override
+                        public void onResponse(ChunkedRestResponseBodyPart continuation) {
+                            // always fork a fresh task to avoid stack overflow
+                            assert Transports.assertDefaultThreadContext(threadContext);
+                            channel.eventLoop()
+                                .execute(
+                                    () -> channel.writeAndFlush(
+                                        new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()),
+                                        finishingWrite.onDone() // pass the terminal listener/promise along the line
+                                    )
+                                );
+                            checkShutdown();
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            assert Transports.assertDefaultThreadContext(threadContext);
+                            logger.error(
+                                Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel),
+                                e
+                            );
+                            channel.close().addListener(ignored -> {
+                                finishingWrite.combiner().add(channel.newFailedFuture(e));
+                                finishingWrite.combiner().finish(finishingWrite.onDone());
+                            });
+                            checkShutdown();
+                        }
+
+                        private void checkShutdown() {
+                            if (channel.eventLoop().isShuttingDown()) {
+                                // The event loop is shutting down, and https://github.com/netty/netty/issues/8007 means that we cannot know
+                                // if the preceding activity made it onto its queue before shutdown or whether it will just vanish without a
+                                // trace, so to avoid a leak we must double-check that the final listener is completed once the event loop
+                                // is terminated. Note that the final listener came from Netty4Utils#safeWriteAndFlush so its executor is an
+                                // ImmediateEventExecutor which means this completion is not subject to the same issue, it still works even
+                                // if the event loop has already terminated.
+                                channel.eventLoop()
+                                    .terminationFuture()
+                                    .addListener(ignored -> finishingWrite.onDone().tryFailure(new ClosedChannelException()));
+                            }
+                        }
+
+                    })
+                ),
+                finishingWriteBodyPart::getNextPart
+            );
         }
     }