1
0
Эх сурвалжийг харах

Merge pull request #536 from Wu-Jianqiang/master

主要修复 SocketChannel 默认缓存大小(1MB)不够用时需自动扩充,否则将因缓存空间不足而造成I/O超时假象
agapple 7 жил өмнө
parent
commit
fddaff1bd9

+ 43 - 28
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java

@@ -15,10 +15,12 @@ import java.net.SocketAddress;
  */
 public class SocketChannel {
 
-    private static final int period  = 10;
+    private static final int WAIT_PERIOD  = 10; // milliseconds
+    private static final int DEFAULT_INIT_BUFFER_SIZE = 1024 * 1024; // 1MB,默认初始缓存大小
+    private static final int DEFAULT_MAX_BUFFER_SIZE = 4 * DEFAULT_INIT_BUFFER_SIZE; // 4MB,默认最大缓存大小
     private Channel channel = null;
     private Object  lock    = new Object();
-    private ByteBuf cache   = PooledByteBufAllocator.DEFAULT.directBuffer(1024 * 1024); // 缓存大小
+    private ByteBuf cache   = PooledByteBufAllocator.DEFAULT.directBuffer(DEFAULT_INIT_BUFFER_SIZE); // 缓存大小
 
     public Channel getChannel() {
         return channel;
@@ -28,9 +30,13 @@ public class SocketChannel {
         this.channel = channel;
     }
 
-    public void writeCache(ByteBuf buf) throws InterruptedException {
+    public void writeCache(ByteBuf buf) throws InterruptedException, IOException {
         synchronized (lock) {
             while (true) {
+                if (null == cache) {
+                    throw new IOException("socket is closed !");
+                }
+
                 cache.discardReadBytes();// 回收内存
                 // source buffer is empty.
                 if (!buf.isReadable()) {
@@ -41,7 +47,7 @@ public class SocketChannel {
                     cache.writeBytes(buf, Math.min(cache.writableBytes(), buf.readableBytes()));
                 } else {
                     // dest buffer is full.
-                    lock.wait(period);
+                    lock.wait(WAIT_PERIOD);
                 }
             }
         }
@@ -56,44 +62,46 @@ public class SocketChannel {
     }
 
     public byte[] read(int readSize) throws IOException {
-        do {
-            if (readSize > cache.readableBytes()) {
-                if (null == channel) {
-                    throw new java.nio.channels.ClosedByInterruptException();
-                }
-                synchronized (this) {
-                    try {
-                        wait(period);
-                    } catch (InterruptedException e) {
-                        throw new java.nio.channels.ClosedByInterruptException();
-                    }
-                }
-            } else {
-                byte[] back = new byte[readSize];
-                synchronized (lock) {
-                    cache.readBytes(back);
-                }
-                return back;
-            }
-        } while (true);
+        return read(readSize, 0);
     }
 
     public byte[] read(int readSize, int timeout) throws IOException {
         int accumulatedWaitTime = 0;
+        
+        // 若读取内容较长,则自动扩充超时时间,以初始缓存大小为基准计算倍数
+        if (timeout > 0 && readSize > DEFAULT_INIT_BUFFER_SIZE ) {
+            timeout *= (readSize / DEFAULT_INIT_BUFFER_SIZE + 1);
+        }
         do {
             if (readSize > cache.readableBytes()) {
                 if (null == channel) {
                     throw new IOException("socket has Interrupted !");
                 }
 
-                accumulatedWaitTime += period;
-                if (accumulatedWaitTime > timeout) {
-                    throw new IOException("socket read timeout occured !");
+                // 默认缓存大小不够用时需自动扩充,否则将因缓存空间不足而造成I/O超时假象
+                if (!cache.isWritable(readSize - cache.readableBytes())) {
+                    synchronized (lock) {
+                        int deltaSize = readSize - cache.readableBytes(); // 同步锁后重新读取
+                        deltaSize = deltaSize - cache.writableBytes();
+                        if (deltaSize > 0) {
+                            deltaSize = (deltaSize / 32 + 1) * 32;
+                            cache.capacity(cache.capacity() + deltaSize);
+                        }
+                    }
+                } else if (timeout > 0) {
+                    accumulatedWaitTime += WAIT_PERIOD;
+                    if (accumulatedWaitTime > timeout) {
+                        StringBuilder sb = new StringBuilder("socket read timeout occured !");
+                        sb.append(" readSize = ").append(readSize);
+                        sb.append(", readableBytes = ").append(cache.readableBytes());
+                        sb.append(", timeout = ").append(timeout);
+                        throw new IOException(sb.toString());
+                    }
                 }
 
                 synchronized (this) {
                     try {
-                        wait(period);
+                        wait(WAIT_PERIOD);
                     } catch (InterruptedException e) {
                         throw new IOException("socket has Interrupted !");
                     }
@@ -102,6 +110,13 @@ public class SocketChannel {
                 byte[] back = new byte[readSize];
                 synchronized (lock) {
                     cache.readBytes(back);
+                    // 恢复自动扩充的过大缓存到默认初始大小,释放空间
+                    if (cache.capacity() > DEFAULT_MAX_BUFFER_SIZE) {
+                        cache.discardReadBytes(); // 回收已读空间,重置读写指针
+                        if (cache.readableBytes() < DEFAULT_INIT_BUFFER_SIZE) {
+                            cache.capacity(DEFAULT_INIT_BUFFER_SIZE);
+                        }
+                    }
                 }
                 return back;
             }

+ 3 - 2
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannelPool.java

@@ -48,8 +48,8 @@ public abstract class SocketChannelPool {
             .handler(new ChannelInitializer() {
 
                 @Override
-                protected void initChannel(Channel arg0) throws Exception {
-                    arg0.pipeline().addLast(new BusinessHandler());// 命令过滤和handler添加管理
+                protected void initChannel(Channel ch) throws Exception {
+                    ch.pipeline().addLast(new BusinessHandler());// 命令过滤和handler添加管理
                 }
             });
     }
@@ -79,6 +79,7 @@ public abstract class SocketChannelPool {
         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
             socket.setChannel(null);
             chManager.remove(ctx.channel());// 移除
+            super.channelInactive(ctx);
         }
 
         @Override

+ 8 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -297,9 +297,15 @@ public class MysqlConnection implements ErosaConnection {
             logger.warn("update mariadb_slave_capability failed", e);
         }
 
-        long periodNano = TimeUnit.SECONDS.toNanos(MASTER_HEARTBEAT_PERIOD_SECONDS);
+        /**
+         * MASTER_HEARTBEAT_PERIOD sets the interval in seconds between replication heartbeats. 
+         * Whenever the master's binary log is updated with an event, the waiting period for the next heartbeat is reset.
+         * interval is a decimal value having the range 0 to 4294967 seconds and a resolution in milliseconds; 
+         * the smallest nonzero value is 0.001. Heartbeats are sent by the master only if there are no unsent events 
+         * in the binary log file for a period longer than interval.
+         */
         try {
-            update("SET @master_heartbeat_period=" + periodNano);
+            update("SET @master_heartbeat_period=" + MASTER_HEARTBEAT_PERIOD_SECONDS);
         } catch (Exception e) {
             logger.warn("update master_heartbeat_period failed", e);
         }