1
0
Эх сурвалжийг харах

fixed issue #360 , socket pool bug

agapple 7 жил өмнө
parent
commit
788c1d9b84

+ 11 - 8
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannelPool.java

@@ -21,14 +21,16 @@ import java.net.SocketAddress;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.alibaba.otter.canal.common.utils.BooleanMutex;
+
 /**
  * @author luoyaogui 实现channel的管理(监听连接、读数据、回收) 2016-12-28
  */
 @SuppressWarnings({ "rawtypes", "deprecation" })
 public abstract class SocketChannelPool {
 
-    private static EventLoopGroup              group     = new NioEventLoopGroup();                         // 非阻塞IO线程组
-    private static Bootstrap                   boot      = new Bootstrap();                                 // 主
+    private static EventLoopGroup              group     = new NioEventLoopGroup();                        // 非阻塞IO线程组
+    private static Bootstrap                   boot      = new Bootstrap();                                // 主
     private static Map<Channel, SocketChannel> chManager = new ConcurrentHashMap<Channel, SocketChannel>();
 
     static {
@@ -53,19 +55,20 @@ public abstract class SocketChannelPool {
 
     public static SocketChannel open(SocketAddress address) throws Exception {
         final SocketChannel socket = new SocketChannel();
+        final BooleanMutex mutex = new BooleanMutex(false);
         boot.connect(address).addListener(new ChannelFutureListener() {
 
             @Override
             public void operationComplete(ChannelFuture arg0) throws Exception {
-                if (arg0.isSuccess()) socket.setChannel(arg0.channel());
-                synchronized (socket) {
-                    socket.notify();
+                if (arg0.isSuccess()) {
+                    socket.setChannel(arg0.channel());
                 }
+
+                mutex.set(true);
             }
         });
-        synchronized (socket) {
-            socket.wait();
-        }
+        // wait for complete
+        mutex.get();
         if (null == socket.getChannel()) {
             throw new IOException("can't create socket!");
         }