|
@@ -11,22 +11,30 @@ package org.elasticsearch.transport.netty4;
|
|
|
import io.netty.buffer.ByteBuf;
|
|
|
import io.netty.buffer.CompositeByteBuf;
|
|
|
import io.netty.buffer.Unpooled;
|
|
|
+import io.netty.channel.Channel;
|
|
|
+import io.netty.channel.DefaultChannelPromise;
|
|
|
import io.netty.util.NettyRuntime;
|
|
|
+import io.netty.util.concurrent.Future;
|
|
|
+import io.netty.util.concurrent.ImmediateEventExecutor;
|
|
|
|
|
|
import org.apache.lucene.util.BytesRef;
|
|
|
import org.apache.lucene.util.BytesRefIterator;
|
|
|
+import org.elasticsearch.ExceptionsHelper;
|
|
|
+import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.common.bytes.BytesArray;
|
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
|
import org.elasticsearch.common.recycler.Recycler;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
|
|
import org.elasticsearch.core.Booleans;
|
|
|
+import org.elasticsearch.transport.TransportException;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.Locale;
|
|
|
+import java.util.concurrent.RejectedExecutionException;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
public class Netty4Utils {
|
|
@@ -121,4 +129,57 @@ public class Netty4Utils {
|
|
|
setAvailableProcessors(EsExecutors.allocatedProcessors(settings));
|
|
|
return NettyAllocator.getRecycler();
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Calls {@link Channel#writeAndFlush} to write the given message to the given channel, but ensures that the listener is completed even
|
|
|
+ * if the event loop is concurrently shutting down since Netty does not offer this guarantee.
|
|
|
+ */
|
|
|
+ public static void safeWriteAndFlush(Channel channel, Object message, ActionListener<Void> listener) {
|
|
|
+ // Use ImmediateEventExecutor.INSTANCE since we want to be able to complete this promise, and any waiting listeners, even if the
|
|
|
+ // channel's event loop has shut down. Normally this completion will happen on the channel's event loop anyway because the write op
|
|
|
+ // can only be completed by some network event from this point on. However...
|
|
|
+ final var promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
|
|
|
+ addListener(promise, listener);
|
|
|
+ assert assertCorrectPromiseListenerThreading(channel, promise);
|
|
|
+ channel.writeAndFlush(message, promise);
|
|
|
+ if (channel.eventLoop().isShuttingDown()) {
|
|
|
+ // ... if we get here then the event loop may already have terminated, and https://github.com/netty/netty/issues/8007 means that
|
|
|
+ // we cannot know if the preceding writeAndFlush made it onto its queue before shutdown or whether it will just vanish without a
|
|
|
+ // trace, so to avoid a leak we must double-check that the final listener is completed.
|
|
|
+ channel.eventLoop().terminationFuture().addListener(ignored ->
|
|
|
+ // NB the promise executor is ImmediateEventExecutor.INSTANCE which means this call to tryFailure() will ensure its completion,
|
|
|
+ // and the completion of any waiting listeners, without forking away from the current thread. The current thread might be the
|
|
|
+ // thread that was running the event loop since that's where the terminationFuture is completed, or it might be a thread which
|
|
|
+ // called (and is still calling) safeWriteAndFlush.
|
|
|
+ promise.tryFailure(new TransportException("Cannot send network message, event loop is shutting down.")));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static boolean assertCorrectPromiseListenerThreading(Channel channel, Future<?> promise) {
|
|
|
+ final var eventLoop = channel.eventLoop();
|
|
|
+ promise.addListener(future -> {
|
|
|
+ assert eventLoop.inEventLoop() || future.cause() instanceof RejectedExecutionException || channel.eventLoop().isTerminated()
|
|
|
+ : future.cause();
|
|
|
+ });
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Subscribes the given {@link ActionListener} to the given {@link Future}.
|
|
|
+ */
|
|
|
+ public static void addListener(Future<Void> future, ActionListener<Void> listener) {
|
|
|
+ future.addListener(f -> {
|
|
|
+ if (f.isSuccess()) {
|
|
|
+ listener.onResponse(null);
|
|
|
+ } else {
|
|
|
+ final Throwable cause = f.cause();
|
|
|
+ ExceptionsHelper.maybeDieOnAnotherThread(cause);
|
|
|
+ if (cause instanceof Exception exception) {
|
|
|
+ listener.onFailure(exception);
|
|
|
+ } else {
|
|
|
+ listener.onFailure(new Exception(cause));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
}
|