Browse Source

Merge pull request #473 from jasonhx140/master

connect mysql server failed when reconnect mysql server frequently
agapple 7 years ago
parent
commit
d1f1c0716b

+ 22 - 17
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannelPool.java

@@ -54,31 +54,25 @@ public abstract class SocketChannelPool {
     }
 
     public static SocketChannel open(SocketAddress address) throws Exception {
-        final SocketChannel socket = new SocketChannel();
-        final BooleanMutex mutex = new BooleanMutex(false);
-        boot.connect(address).addListener(new ChannelFutureListener() {
-
-            @Override
-            public void operationComplete(ChannelFuture arg0) throws Exception {
-                if (arg0.isSuccess()) {
-                    socket.setChannel(arg0.channel());
-                }
+        SocketChannel socket = null;
+        ChannelFuture future = boot.connect(address).sync();
 
-                mutex.set(true);
-            }
-        });
-        // wait for complete
-        mutex.get();
-        if (null == socket.getChannel()) {
+        if (future.isSuccess()) {
+            future.channel().pipeline().get(BusinessHandler.class).latch.await();
+            socket = chManager.get(future.channel());
+        }
+
+        if (null == socket) {
             throw new IOException("can't create socket!");
         }
-        chManager.put(socket.getChannel(), socket);
+
         return socket;
     }
 
     public static class BusinessHandler extends ChannelInboundHandlerAdapter {
 
         private SocketChannel socket = null;
+        private final CountDownLatch latch = new CountDownLatch(1);
 
         @Override
         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
@@ -86,11 +80,22 @@ public abstract class SocketChannelPool {
             chManager.remove(ctx.channel());// 移除
         }
 
+        @Override
+		public void channelActive(ChannelHandlerContext ctx) throws Exception {
+			socket = new SocketChannel();
+			socket.setChannel(ctx.channel());
+			chManager.put(ctx.channel(), socket);
+			latch.countDown();
+			super.channelActive(ctx);
+		}
+
         @Override
         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-            if (null == socket) socket = chManager.get(ctx.channel());
             if (socket != null) {
                 socket.writeCache((ByteBuf) msg);
+            } else {
+                //TODO: need graceful error handler.
+                logger.error("no socket available.");
             }
             ReferenceCountUtil.release(msg);// 添加防止内存泄漏的
         }