Browse Source

解决destination无限连接等待的bug

zhengmingzhi 7 years ago
parent
commit
8b03f172ed

+ 5 - 3
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -43,6 +43,8 @@ public class MysqlConnector {
     // mysql connectinnId
     private long                connectionId      = -1;
     private AtomicBoolean       connected         = new AtomicBoolean(false);
+    
+    public static final int timeout = 3000; // 3s
 
     public MysqlConnector(){
     }
@@ -144,8 +146,8 @@ public class MysqlConnector {
     }
 
     private void negotiate(SocketChannel channel) throws IOException {
-        HeaderPacket header = PacketManager.readHeader(channel, 4);
-        byte[] body = PacketManager.readBytes(channel, header.getPacketBodyLength());
+        HeaderPacket header = PacketManager.readHeader(channel, 4, timeout);
+        byte[] body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
         if (body[0] < 0) {// check field_count
             if (body[0] == -1) {
                 ErrorPacket error = new ErrorPacket();
@@ -184,7 +186,7 @@ public class MysqlConnector {
         header = null;
         header = PacketManager.readHeader(channel, 4);
         body = null;
-        body = PacketManager.readBytes(channel, header.getPacketBodyLength());
+        body = PacketManager.readBytes(channel, header.getPacketBodyLength(), timeout);
         assert body != null;
         if (body[0] < 0) {
             if (body[0] == -1) {

+ 30 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java

@@ -64,6 +64,36 @@ public class SocketChannel {
             }
         } while (true);
     }
+    
+    public byte[] read(int readSize, int timeout) throws IOException {
+        int accumulatedWaitTime = 0;
+        do {
+            if (readSize > cache.readableBytes()) {
+                if (null == channel) {
+                    throw new IOException("socket has Interrupted !");
+                }
+
+                accumulatedWaitTime += 100;
+                if (accumulatedWaitTime > timeout) {
+                    throw new IOException("socket read timeout occured !");
+                }
+
+                synchronized (this) {
+                    try {
+                        wait(100);
+                    } catch (InterruptedException e) {
+                        throw new IOException("socket has Interrupted !");
+                    }
+                }
+            } else {
+                byte[] back = new byte[readSize];
+                synchronized (lock) {
+                    cache.readBytes(back);
+                }
+                return back;
+            }
+        } while (true);
+    }
 
     public boolean isConnected() {
         return channel != null ? true : false;

+ 10 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/PacketManager.java

@@ -13,9 +13,19 @@ public abstract class PacketManager {
         return header;
     }
 
+    public static HeaderPacket readHeader(SocketChannel ch, int len, int timeout) throws IOException {
+    	HeaderPacket header = new HeaderPacket();
+    	header.fromBytes(ch.read(len, timeout));
+    	return header;
+    }
+
     public static byte[] readBytes(SocketChannel ch, int len) throws IOException {
         return ch.read(len);
     }
+    
+    public static byte[] readBytes(SocketChannel ch, int len, int timeout) throws IOException {
+        return ch.read(len, timeout);
+    }
 
     public static void writePkg(SocketChannel ch, byte[]... srcs) throws IOException {
         ch.writeChannel(srcs);