Browse Source

Merge pull request #487 from jasonhx140/master

dump connection is disconnected
agapple 7 years ago
parent
commit
2d31e6bf91

+ 15 - 3
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java

@@ -27,10 +27,22 @@ public class SocketChannel {
         this.channel = channel;
     }
 
-    public void writeCache(ByteBuf buf) {
+    public void writeCache(ByteBuf buf) throws InterruptedException {
         synchronized (lock) {
-            cache.discardReadBytes();// 回收内存
-            cache.writeBytes(buf);
+            while (true) {
+                cache.discardReadBytes();// 回收内存
+                //source buffer is empty.
+                if (!buf.isReadable()) {
+                    break;
+                }
+
+                if (cache.isWritable()) {
+                    cache.writeBytes(buf, Math.min(cache.writableBytes(), buf.readableBytes()));
+                } else {
+                    //dest buffer is full.
+                    lock.wait(100);
+                }
+            }
         }
     }
 

+ 6 - 5
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannelPool.java

@@ -7,7 +7,7 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
@@ -71,7 +71,7 @@ public abstract class SocketChannelPool {
         return socket;
     }
 
-    public static class BusinessHandler extends ChannelInboundHandlerAdapter {
+    public static class BusinessHandler extends SimpleChannelInboundHandler<ByteBuf> {
 
         private SocketChannel        socket = null;
         private final CountDownLatch latch  = new CountDownLatch(1);
@@ -92,18 +92,19 @@ public abstract class SocketChannelPool {
         }
 
         @Override
-        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
             if (socket != null) {
-                socket.writeCache((ByteBuf) msg);
+                socket.writeCache(msg);
             } else {
                 // TODO: need graceful error handler.
                 logger.error("no socket available.");
             }
-            ReferenceCountUtil.release(msg);// 添加防止内存泄漏的
         }
 
         @Override
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+            //need output error for troubeshooting.
+            logger.error("business error.", cause);
             ctx.close();
         }
     }