Browse Source

Fix issue where pages aren't released (#27459)

This is related to #27422. Right now when we send a write to the netty
transport, we attach a listener to the future. When you submit a write
on the netty event loop and the event loop is shutdown, the onFailure
method is called. Unfortunately, netty then tries to notify the listener
which cannot be done without dispatching to the event loop. In this
case, the dispatch fails and netty logs and error and does not tell us.

This commit checks that netty is still not shutdown after sending a
message. If netty is shutdown, we complete the listener.
Tim Brooks 8 years ago
parent
commit
4e04f95ab4

+ 11 - 2
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java

@@ -22,13 +22,17 @@ package org.elasticsearch.transport.netty4;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPromise;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.util.Supplier;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.transport.TcpChannel;
+import org.elasticsearch.transport.TransportException;
 
 import java.net.InetSocketAddress;
+import java.nio.channels.ClosedSelectorException;
 import java.util.concurrent.CompletableFuture;
 
 public class NettyTcpChannel implements TcpChannel {
@@ -80,8 +84,8 @@ public class NettyTcpChannel implements TcpChannel {
 
     @Override
     public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
-        final ChannelFuture future = channel.writeAndFlush(Netty4Utils.toByteBuf(reference));
-        future.addListener(f -> {
+        ChannelPromise writePromise = channel.newPromise();
+        writePromise.addListener(f -> {
             if (f.isSuccess()) {
                 listener.onResponse(null);
             } else {
@@ -91,6 +95,11 @@ public class NettyTcpChannel implements TcpChannel {
                 listener.onFailure((Exception) cause);
             }
         });
+        channel.writeAndFlush(Netty4Utils.toByteBuf(reference), writePromise);
+        
+        if (channel.eventLoop().isShutdown()) {
+            listener.onFailure(new TransportException("Cannot send message, event loop is shutting down."));
+        }
     }
 
     public Channel getLowLevelChannel() {