Pārlūkot izejas kodu

Merge pull request #333 from jasonhx140/master

fix bug:异常网络连接导致canal server 无法关闭
agapple 7 gadi atpakaļ
vecāks
revīzija
780b4f715b

+ 24 - 1
server/src/main/java/com/alibaba/otter/canal/server/netty/CanalServerWithNetty.java

@@ -9,6 +9,8 @@ import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
@@ -32,6 +34,7 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana
     private int                     port;
     private Channel                 serverChannel = null;
     private ServerBootstrap         bootstrap     = null;
+    private ChannelGroup            childGroups   = null;  //socket channel container, used to close sockets explicitly.
 
     private static class SingletonHolder {
 
@@ -40,6 +43,7 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana
 
     private CanalServerWithNetty(){
         this.embeddedServer = CanalServerWithEmbedded.instance();
+        this.childGroups = new DefaultChannelGroup();
     }
 
     public static CanalServerWithNetty instance() {
@@ -55,6 +59,19 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana
 
         this.bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
             Executors.newCachedThreadPool()));
+        /*
+         * enable keep-alive mechanism, handle abnormal network connection scenarios on OS level.
+         * the threshold parameters are depended on OS. 
+         * e.g. On Linux:
+         * net.ipv4.tcp_keepalive_time = 300
+         * net.ipv4.tcp_keepalive_probes = 2 
+         * net.ipv4.tcp_keepalive_intvl = 30 
+         */
+        bootstrap.setOption("child.keepAlive", true);
+        /*
+         * optional parameter.
+         */
+        bootstrap.setOption("child.tcpNoDelay", true);
 
         // 构造对应的pipeline
         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@@ -62,7 +79,8 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana
             public ChannelPipeline getPipeline() throws Exception {
                 ChannelPipeline pipelines = Channels.pipeline();
                 pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
-                pipelines.addLast(HandshakeInitializationHandler.class.getName(), new HandshakeInitializationHandler());
+                //support to maintain child socket channel.
+                pipelines.addLast(HandshakeInitializationHandler.class.getName(), new HandshakeInitializationHandler(childGroups));
                 pipelines.addLast(ClientAuthenticationHandler.class.getName(),
                     new ClientAuthenticationHandler(embeddedServer));
 
@@ -87,6 +105,11 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana
             this.serverChannel.close().awaitUninterruptibly(1000);
         }
 
+        //close sockets explicitly to reduce socket channel hung in complicated network environment.
+        if (this.childGroups != null) {
+        	this.childGroups.close().awaitUninterruptibly(5000);
+        }
+
         if (this.bootstrap != null) {
             this.bootstrap.releaseExternalResources();
         }

+ 4 - 1
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/ClientAuthenticationHandler.java

@@ -1,5 +1,7 @@
 package com.alibaba.otter.canal.server.netty.handler;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang.StringUtils;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.ChannelFuture;
@@ -87,10 +89,11 @@ public class ClientAuthenticationHandler extends SimpleChannelHandler {
                         if (clientAuth.getNetWriteTimeout() > 0) {
                             writeTimeout = clientAuth.getNetWriteTimeout();
                         }
+                        //fix bug: soTimeout parameter's unit from connector is millseconds.
                         IdleStateHandler idleStateHandler = new IdleStateHandler(NettyUtils.hashedWheelTimer,
                             readTimeout,
                             writeTimeout,
-                            0);
+                            0, TimeUnit.MILLISECONDS);
                         ctx.getPipeline().addBefore(SessionHandler.class.getName(),
                             IdleStateHandler.class.getName(),
                             idleStateHandler);

+ 12 - 0
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/HandshakeInitializationHandler.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.server.netty.handler;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -18,10 +19,21 @@ import com.alibaba.otter.canal.server.netty.NettyUtils;
  * @version 1.0.0
  */
 public class HandshakeInitializationHandler extends SimpleChannelHandler {
+	//support to maintain socket channel.
+	private ChannelGroup childGroups;
+
+	public HandshakeInitializationHandler(ChannelGroup childGroups) {
+		this.childGroups = childGroups;
+	}
 
     private static final Logger logger = LoggerFactory.getLogger(HandshakeInitializationHandler.class);
 
     public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+    	//add new socket channel in channel container, used to manage sockets.
+    	if(childGroups != null) {
+    		childGroups.add(ctx.getChannel());
+    	}
+
         byte[] body = Packet.newBuilder()
             .setType(CanalPacket.PacketType.HANDSHAKE)
             .setBody(Handshake.newBuilder().build().toByteString())