浏览代码

fixed socket wait period 10ms

agapple 7 年之前
父节点
当前提交
2d49b709ca

+ 2 - 2
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -43,8 +43,8 @@ public class MysqlConnector {
     // mysql connectinnId
     private long                connectionId      = -1;
     private AtomicBoolean       connected         = new AtomicBoolean(false);
-    
-    public static final int timeout = 3000; // 3s
+
+    public static final int     timeout           = 5 * 1000;                                     // 5s
 
     public MysqlConnector(){
     }

+ 8 - 7
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java

@@ -15,6 +15,7 @@ import java.net.SocketAddress;
  */
 public class SocketChannel {
 
+    private static final int period  = 10;
     private Channel channel = null;
     private Object  lock    = new Object();
     private ByteBuf cache   = PooledByteBufAllocator.DEFAULT.directBuffer(1024 * 1024); // 缓存大小
@@ -31,7 +32,7 @@ public class SocketChannel {
         synchronized (lock) {
             while (true) {
                 cache.discardReadBytes();// 回收内存
-                //source buffer is empty.
+                // source buffer is empty.
                 if (!buf.isReadable()) {
                     break;
                 }
@@ -39,8 +40,8 @@ public class SocketChannel {
                 if (cache.isWritable()) {
                     cache.writeBytes(buf, Math.min(cache.writableBytes(), buf.readableBytes()));
                 } else {
-                    //dest buffer is full.
-                    lock.wait(100);
+                    // dest buffer is full.
+                    lock.wait(period);
                 }
             }
         }
@@ -62,7 +63,7 @@ public class SocketChannel {
                 }
                 synchronized (this) {
                     try {
-                        wait(100);
+                        wait(period);
                     } catch (InterruptedException e) {
                         throw new java.nio.channels.ClosedByInterruptException();
                     }
@@ -76,7 +77,7 @@ public class SocketChannel {
             }
         } while (true);
     }
-    
+
     public byte[] read(int readSize, int timeout) throws IOException {
         int accumulatedWaitTime = 0;
         do {
@@ -85,14 +86,14 @@ public class SocketChannel {
                     throw new IOException("socket has Interrupted !");
                 }
 
-                accumulatedWaitTime += 100;
+                accumulatedWaitTime += period;
                 if (accumulatedWaitTime > timeout) {
                     throw new IOException("socket read timeout occured !");
                 }
 
                 synchronized (this) {
                     try {
-                        wait(100);
+                        wait(period);
                     } catch (InterruptedException e) {
                         throw new IOException("socket has Interrupted !");
                     }

+ 2 - 3
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannelPool.java

@@ -7,13 +7,12 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.ReferenceCountUtil;
 
 import java.io.IOException;
 import java.net.SocketAddress;
@@ -103,7 +102,7 @@ public abstract class SocketChannelPool {
 
         @Override
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-            //need output error for troubeshooting.
+            // need output error for troubeshooting.
             logger.error("business error.", cause);
             ctx.close();
         }