Browse Source

Merge pull request #334 from luoyaogui/master

针对使用netty取代JDK的socketchannel实现的优化
agapple 7 năm trước cách đây
mục cha
commit
867d6607d4

+ 7 - 20
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -2,8 +2,9 @@ package com.alibaba.otter.canal.parse.driver.mysql;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
+import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
+import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannelPool;
+
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
@@ -103,8 +104,7 @@ public class MysqlConnector {
                     MysqlUpdateExecutor executor = new MysqlUpdateExecutor(connector);
                     executor.update("KILL CONNECTION " + connectionId);
                 } catch (Exception e) {
-                    // 忽略具体异常
-                    logger.info("KILL DUMP " + connectionId + " failure:" + ExceptionUtils.getStackTrace(e));
+                    throw new IOException("KILL DUMP " + connectionId + " failure:" + ExceptionUtils.getStackTrace(e));
                 } finally {
                     if (connector != null) {
                         connector.disconnect();
@@ -142,19 +142,7 @@ public class MysqlConnector {
         HeaderPacket quitHeader = new HeaderPacket();
         quitHeader.setPacketBodyLength(cmdBody.length);
         quitHeader.setPacketSequenceNumber((byte) 0x00);
-        PacketManager.write(channel,
-            new ByteBuffer[] { ByteBuffer.wrap(quitHeader.toBytes()), ByteBuffer.wrap(cmdBody) });
-    }
-
-    // ====================== help method ====================
-
-    private void configChannel(SocketChannel channel) throws IOException {
-        channel.socket().setKeepAlive(true);
-        channel.socket().setReuseAddress(true);
-        channel.socket().setSoTimeout(soTimeout);
-        channel.socket().setTcpNoDelay(true);
-        channel.socket().setReceiveBufferSize(receiveBufferSize);
-        channel.socket().setSendBufferSize(sendBufferSize);
+        PacketManager.writePkg(channel, quitHeader.toBytes(),cmdBody);
     }
 
     private void negotiate(SocketChannel channel) throws IOException {
@@ -191,8 +179,7 @@ public class MysqlConnector {
         h.setPacketBodyLength(clientAuthPkgBody.length);
         h.setPacketSequenceNumber((byte) (header.getPacketSequenceNumber() + 1));
 
-        PacketManager.write(channel,
-            new ByteBuffer[] { ByteBuffer.wrap(h.toBytes()), ByteBuffer.wrap(clientAuthPkgBody) });
+        PacketManager.writePkg(channel, h.toBytes(), clientAuthPkgBody);
         logger.info("client authentication packet is sent out.");
 
         // check auth result
@@ -228,7 +215,7 @@ public class MysqlConnector {
         h323.setPacketBodyLength(b323Body.length);
         h323.setPacketSequenceNumber((byte) (packetSequenceNumber + 1));
 
-        PacketManager.write(channel, new ByteBuffer[] { ByteBuffer.wrap(h323.toBytes()), ByteBuffer.wrap(b323Body) });
+        PacketManager.writePkg(channel, h323.toBytes(), b323Body);
         logger.info("client 323 authentication packet is sent out.");
         // check auth result
         HeaderPacket header = PacketManager.readHeader(channel, 4);

+ 2 - 2
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlQueryExecutor.java

@@ -1,7 +1,7 @@
 package com.alibaba.otter.canal.parse.driver.mysql;
 
 import java.io.IOException;
-import java.nio.channels.SocketChannel;
+import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -51,7 +51,7 @@ public class MysqlQueryExecutor {
         QueryCommandPacket cmd = new QueryCommandPacket();
         cmd.setQueryString(queryString);
         byte[] bodyBytes = cmd.toBytes();
-        PacketManager.write(channel, bodyBytes);
+        PacketManager.writeBody(channel, bodyBytes);
         byte[] body = readNextPacket();
 
         if (body[0] < 0) {

+ 7 - 7
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlUpdateExecutor.java

@@ -1,7 +1,7 @@
 package com.alibaba.otter.canal.parse.driver.mysql;
 
 import java.io.IOException;
-import java.nio.channels.SocketChannel;
+import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -21,28 +21,28 @@ public class MysqlUpdateExecutor {
 
     private static final Logger logger = LoggerFactory.getLogger(MysqlUpdateExecutor.class);
 
-    private SocketChannel       channel;
+    private MysqlConnector       connector;
 
     public MysqlUpdateExecutor(MysqlConnector connector) throws IOException{
         if (!connector.isConnected()) {
             throw new IOException("should execute connector.connect() first");
         }
 
-        this.channel = connector.getChannel();
+        this.connector = connector;
     }
 
-    public MysqlUpdateExecutor(SocketChannel ch){
+   /* public MysqlUpdateExecutor(SocketChannel ch){
         this.channel = ch;
-    }
+    }*/
 
     public OKPacket update(String updateString) throws IOException {
         QueryCommandPacket cmd = new QueryCommandPacket();
         cmd.setQueryString(updateString);
         byte[] bodyBytes = cmd.toBytes();
-        PacketManager.write(channel, bodyBytes);
+        PacketManager.writeBody(connector.getChannel(), bodyBytes);
 
         logger.debug("read update result...");
-        byte[] body = PacketManager.readBytes(channel, PacketManager.readHeader(channel, 4).getPacketBodyLength());
+        byte[] body = PacketManager.readBytes(connector.getChannel(), PacketManager.readHeader(connector.getChannel(), 4).getPacketBodyLength());
         if (body[0] < 0) {
             ErrorPacket packet = new ErrorPacket();
             packet.fromBytes(body);

+ 53 - 72
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java

@@ -7,7 +7,6 @@ import io.netty.channel.Channel;
 
 import java.io.IOException;
 import java.net.SocketAddress;
-import java.nio.ByteBuffer;
 
 /**
  * 封装netty的通信channel和数据接收缓存,实现读、写、连接校验的功能。 2016-12-28
@@ -15,77 +14,59 @@ import java.nio.ByteBuffer;
  * @author luoyaogui
  */
 public class SocketChannel {
+	private Channel channel = null;
+	private Object lock = new Object();
+	private ByteBuf cache = PooledByteBufAllocator.DEFAULT.directBuffer(1024*1024);//缓存大小
 
-    private Channel channel = null;
-    private Object  lock    = new Object();
-    private ByteBuf cache   = PooledByteBufAllocator.DEFAULT.directBuffer(1024 * 1024 * 5); // 缓存大小
+	public Channel getChannel() {
+		return channel;
+	}
+	public void setChannel(Channel channel) {
+		this.channel = channel;
+	}
 
-    public Channel getChannel() {
-        return channel;
-    }
-
-    public void setChannel(Channel channel, boolean notify) {
-        this.channel = channel;
-        if (notify) {// 是否需要通知,主要是channel不可用时
-            synchronized (this) {
-                notifyAll();
-            }
-        }
-    }
-
-    public void writeCache(ByteBuf buf) {
-        synchronized (lock) {
-            cache.discardReadBytes();// 回收内存
-            cache.writeBytes(buf);
-        }
-        synchronized (this) {
-            notifyAll();
-        }
-    }
-
-    public void writeChannel(byte[]... buf) throws IOException {
-        if (channel != null && channel.isWritable()) {
-            channel.writeAndFlush(Unpooled.copiedBuffer(buf));
-        } else {
-            throw new IOException("write  failed  !  please checking !");
-        }
-    }
-
-    public int read(ByteBuffer buffer) throws IOException {
-        if (null == channel) {
-            throw new IOException("socket has Interrupted !");
-        }
-        if (cache.readableBytes() < buffer.remaining()) {
-            synchronized (this) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                    throw new IOException("socket has Interrupted !");
-                }
-            }
-        } else {
-            synchronized (lock) {
-                cache.readBytes(buffer);
-            }
-        }
-        return 0;
-    }
-
-    public boolean isConnected() {
-        return channel != null ? true : false;
-    }
-
-    public SocketAddress getRemoteSocketAddress() {
-        return channel != null ? channel.remoteAddress() : null;
-    }
-
-    public void close() {
-        if (channel != null) {
-            channel.close();
-        }
-        channel = null;
-        cache.discardReadBytes();// 回收已占用的内存
-        cache.release();// 释放整个内存
-        cache = null;
-    }
+	public void writeCache(ByteBuf buf){
+		synchronized (lock) {
+			cache.discardReadBytes();//回收内存
+			cache.writeBytes(buf);
+		}
+	}
+	public void writeChannel(byte[]... buf) throws IOException {
+		if(channel != null && channel.isWritable())
+			channel.writeAndFlush(Unpooled.copiedBuffer(buf));
+		else
+			throw new IOException("write  failed  !  please checking !");
+	}
+	public byte[] read(int readSize) throws IOException {
+		do{
+			if(readSize > cache.readableBytes()){
+				if(null == channel)
+					throw new IOException("socket has Interrupted !");
+				synchronized (this) {
+					try { wait(100); } catch (InterruptedException e) {}
+				}
+			} else {
+				byte[] back = new byte[readSize];
+				synchronized (lock) {
+					cache.readBytes(back);
+				}
+				return back;
+			}
+		}while(true);
+	}
+	public boolean isConnected() {
+		return channel!=null?true:false;
+	}
+	public SocketAddress getRemoteSocketAddress(){
+		return channel!=null?channel.remoteAddress():null;
+	}
+	public void close(){
+		if(channel != null){
+			channel.close();
+		}
+		channel = null;
+		cache.discardReadBytes();//回收已占用的内存
+		cache.release();//释放整个内存
+		cache = null;
+	}
 }

+ 63 - 70
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannelPool.java

@@ -27,75 +27,68 @@ import java.util.concurrent.ConcurrentHashMap;
 @SuppressWarnings({ "rawtypes", "deprecation" })
 public abstract class SocketChannelPool {
 
-    private static EventLoopGroup              group     = new NioEventLoopGroup();                        // 非阻塞IO线程组
-    private static Bootstrap                   boot      = new Bootstrap();                                // 主
-    private static Map<Channel, SocketChannel> chManager = new ConcurrentHashMap<Channel, SocketChannel>();
+	private static EventLoopGroup group = new NioEventLoopGroup();//非阻塞IO线程组
+	private static Bootstrap boot = new Bootstrap();//主
+	private static Map<Channel,SocketChannel> chManager = new ConcurrentHashMap<Channel,SocketChannel>();
+	
+	static{
+		boot.group(group).channel(NioSocketChannel.class)
+			.option(ChannelOption.SO_RCVBUF, 32*1024)
+			.option(ChannelOption.SO_SNDBUF, 32*1024)
+			.option(ChannelOption.TCP_NODELAY, true)//如果是延时敏感型应用,建议关闭Nagle算法
+			.option(ChannelOption.SO_KEEPALIVE, true)
+			.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
+			.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //
+			.handler(new ChannelInitializer(){
+				@Override
+				protected void initChannel(Channel arg0) throws Exception {
+					arg0.pipeline().addLast(new BusinessHandler());//命令过滤和handler添加管理
+				}
+			});
+	}
+	
+	public static SocketChannel open(SocketAddress address) throws Exception {
+		final SocketChannel socket = new SocketChannel();
+		boot.connect(address).addListener(new ChannelFutureListener(){
+			@Override
+			public void operationComplete(ChannelFuture arg0) throws Exception {
+				if(arg0.isSuccess())
+					socket.setChannel(arg0.channel());
+				synchronized (socket) {
+					socket.notify();
+				}
+		}});
+		synchronized (socket) {
+			socket.wait();
+		}
+		if(null == socket.getChannel()){
+			throw new IOException("can't create socket!");
+		}
+		chManager.put(socket.getChannel(), socket);
+		return socket;
+	}
 
-    static {
-        boot.group(group)
-            .channel(NioSocketChannel.class)
-            .option(ChannelOption.SO_RCVBUF, 32 * 1024)
-            .option(ChannelOption.SO_SNDBUF, 32 * 1024)
-            .option(ChannelOption.TCP_NODELAY, true)
-            // 如果是延时敏感型应用,建议关闭Nagle算法
-            .option(ChannelOption.SO_KEEPALIVE, true)
-            .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
-            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-            //
-            .handler(new ChannelInitializer() {
-
-                @Override
-                protected void initChannel(Channel arg0) throws Exception {
-                    arg0.pipeline().addLast(new BusinessHandler());// 命令过滤和handler添加管理
-                }
-            });
-    }
-
-    public static SocketChannel open(SocketAddress address) throws Exception {
-        final SocketChannel socket = new SocketChannel();
-        boot.connect(address).addListener(new ChannelFutureListener() {
-
-            @Override
-            public void operationComplete(ChannelFuture arg0) throws Exception {
-                if (arg0.isSuccess()) {
-                    socket.setChannel(arg0.channel(), false);
-                }
-                synchronized (socket) {
-                    socket.notify();
-                }
-            }
-        });
-        synchronized (socket) {
-            socket.wait();
-        }
-        if (null == socket.getChannel()) {
-            throw new IOException("can't create socket!");
-        }
-        chManager.put(socket.getChannel(), socket);
-        return socket;
-    }
-
-    public static class BusinessHandler extends ChannelInboundHandlerAdapter {
-
-        private SocketChannel socket = null;
-
-        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-            socket.setChannel(null, true);
-            chManager.remove(ctx.channel());// 移除
-        }
-
-        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-            if (null == socket) {
-                socket = chManager.get(ctx.channel());
-            }
-            if (socket != null) {
-                socket.writeCache((ByteBuf) msg);
-            }
-            ReferenceCountUtil.release(msg);// 添加防止内存泄漏的
-        }
-
-        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-            ctx.close();
-        }
-    }
+	public static class BusinessHandler extends ChannelInboundHandlerAdapter {
+		private SocketChannel socket=null;
+		@Override
+		public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+			socket.setChannel(null);
+			chManager.remove(ctx.channel());//移除
+		}
+		@Override
+		public void channelRead(ChannelHandlerContext ctx, Object msg)
+				throws Exception {
+			if(null == socket)
+				socket = chManager.get(ctx.channel());
+			if(socket != null){
+				socket.writeCache((ByteBuf) msg);
+			}
+			ReferenceCountUtil.release(msg);//添加防止内存泄漏的
+		}
+		@Override
+		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+				throws Exception {
+			ctx.close();
+		}
+	}
 }

+ 9 - 46
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/PacketManager.java

@@ -1,69 +1,32 @@
 package com.alibaba.otter.canal.parse.driver.mysql.utils;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
 
 public abstract class PacketManager {
 
     public static HeaderPacket readHeader(SocketChannel ch, int len) throws IOException {
         HeaderPacket header = new HeaderPacket();
-        header.fromBytes(readBytesAsBuffer(ch, len).array());
+        header.fromBytes(ch.read(len));
         return header;
     }
 
-    public static ByteBuffer readBytesAsBuffer(SocketChannel ch, int len) throws IOException {
-        ByteBuffer buffer = ByteBuffer.allocate(len);
-        while (buffer.hasRemaining()) {
-            int readNum = ch.read(buffer);
-            if (readNum == -1) {
-                throw new IOException("Unexpected End Stream");
-            }
-        }
-        return buffer;
-    }
-
     public static byte[] readBytes(SocketChannel ch, int len) throws IOException {
-        return readBytesAsBuffer(ch, len).array();
+        return ch.read(len);
     }
 
-    /**
-     * Since We r using blocking IO, so we will just write once and assert the
-     * length to simplify the read operation.<br>
-     * If the block write doesn't work as we expected, we will change this
-     * implementation as per the result.
-     * 
-     * @param ch
-     * @param len
-     * @return
-     * @throws IOException
-     */
-    public static void write(SocketChannel ch, ByteBuffer[] srcs) throws IOException {
-        @SuppressWarnings("unused")
-        long total = 0;
-        for (ByteBuffer buffer : srcs) {
-            total += buffer.remaining();
-        }
-
-        ch.write(srcs);
-        // https://github.com/alibaba/canal/issues/24
-        // 部分windows用户会出现size != total的情况,jdk为java7/openjdk,估计和java版本有关,暂时不做检查
-        // long size = ch.write(srcs);
-        // if (size != total) {
-        // throw new IOException("unexpected blocking io behavior");
-        // }
+    public static void writePkg(SocketChannel ch, byte[]... srcs) throws IOException {
+        ch.writeChannel(srcs);
     }
-
-    public static void write(SocketChannel ch, byte[] body) throws IOException {
-        write(ch, body, (byte) 0);
+    
+    public static void writeBody(SocketChannel ch, byte[] body) throws IOException {
+    	writeBody0(ch, body, (byte) 0);
     }
 
-    public static void write(SocketChannel ch, byte[] body, byte packetSeqNumber) throws IOException {
+    public static void writeBody0(SocketChannel ch, byte[] body, byte packetSeqNumber) throws IOException {
         HeaderPacket header = new HeaderPacket();
         header.setPacketBodyLength(body.length);
         header.setPacketSequenceNumber(packetSeqNumber);
-        write(ch, new ByteBuffer[] { ByteBuffer.wrap(header.toBytes()), ByteBuffer.wrap(body) });
+        ch.writeChannel(header.toBytes(),body);
     }
 }

+ 4 - 41
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -2,11 +2,9 @@ package com.alibaba.otter.canal.parse.inbound.mysql;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.List;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,8 +23,6 @@ import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogDecoder;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
-import com.taobao.tddl.dbsync.binlog.LogPosition;
-import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
 
 public class MysqlConnection implements ErosaConnection {
 
@@ -37,7 +33,6 @@ public class MysqlConnection implements ErosaConnection {
     private Charset             charset = Charset.forName("UTF-8");
     private BinlogFormat        binlogFormat;
     private BinlogImage         binlogImage;
-    private int                 binlogChecksum;
 
     public MysqlConnection(){
     }
@@ -83,7 +78,7 @@ public class MysqlConnection implements ErosaConnection {
      */
     public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
         updateSettings();
-        loadBinlogChecksum();
+
         sendBinlogDump(binlogfilename, binlogPosition);
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
         fetcher.start(connector.getChannel());
@@ -93,8 +88,6 @@ public class MysqlConnection implements ErosaConnection {
         decoder.handle(LogEvent.QUERY_EVENT);
         decoder.handle(LogEvent.XID_EVENT);
         LogContext context = new LogContext();
-        context.setLogPosition(new LogPosition(binlogfilename));
-        context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
         while (fetcher.fetch()) {
             LogEvent event = null;
             event = decoder.decode(fetcher, context);
@@ -111,14 +104,11 @@ public class MysqlConnection implements ErosaConnection {
 
     public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
         updateSettings();
-        loadBinlogChecksum();
         sendBinlogDump(binlogfilename, binlogPosition);
         DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
         fetcher.start(connector.getChannel());
         LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
         LogContext context = new LogContext();
-        context.setLogPosition(new LogPosition(binlogfilename));
-        context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
         while (fetcher.fetch()) {
             LogEvent event = null;
             event = decoder.decode(fetcher, context);
@@ -148,9 +138,7 @@ public class MysqlConnection implements ErosaConnection {
         HeaderPacket binlogDumpHeader = new HeaderPacket();
         binlogDumpHeader.setPacketBodyLength(cmdBody.length);
         binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);
-        PacketManager.write(connector.getChannel(), new ByteBuffer[] { ByteBuffer.wrap(binlogDumpHeader.toBytes()),
-                ByteBuffer.wrap(cmdBody) });
-
+        PacketManager.writePkg(connector.getChannel(), binlogDumpHeader.toBytes(),cmdBody);
         connector.setDumping(true);
     }
 
@@ -205,12 +193,9 @@ public class MysqlConnection implements ErosaConnection {
             // 如果不设置会出现错误: Slave can not handle replication events with the
             // checksum that master is configured to log
             // 但也不能乱设置,需要和mysql server的checksum配置一致,不然RotateLogEvent会出现乱码
-            // '@@global.binlog_checksum'需要去掉单引号,在mysql 5.6.29下导致master退出
-            update("set @master_binlog_checksum= @@global.binlog_checksum");
+            update("set @master_binlog_checksum= '@@global.binlog_checksum'");
         } catch (Exception e) {
-            if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {
-                logger.warn(ExceptionUtils.getFullStackTrace(e));
-            }
+            logger.warn(ExceptionUtils.getFullStackTrace(e));
         }
 
         try {
@@ -278,28 +263,6 @@ public class MysqlConnection implements ErosaConnection {
         }
     }
 
-    /**
-     * 获取主库checksum信息
-     * https://dev.mysql.com/doc/refman/5.6/en/replication-options
-     * -binary-log.html#option_mysqld_binlog-checksum
-     */
-    private void loadBinlogChecksum() {
-        ResultSetPacket rs = null;
-        try {
-            rs = query("select @master_binlog_checksum");
-        } catch (IOException e) {
-            throw new CanalParseException(e);
-        }
-
-        List<String> columnValues = rs.getFieldValues();
-        if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0) != null
-            && columnValues.get(0).toUpperCase().equals("CRC32")) {
-            binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
-        } else {
-            binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
-        }
-    }
-
     public static enum BinlogFormat {
 
         STATEMENT("STATEMENT"), ROW("ROW"), MIXED("MIXED");

+ 3 - 16
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java

@@ -3,9 +3,8 @@ package com.alibaba.otter.canal.parse.inbound.mysql.dbsync;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
 import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.SocketChannel;
+import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -149,20 +148,8 @@ public class DirectLogFetcher extends LogFetcher {
     private final boolean fetch0(final int off, final int len) throws IOException {
         ensureCapacity(off + len);
 
-        ByteBuffer buffer = ByteBuffer.wrap(this.buffer, off, len);
-        while (buffer.hasRemaining()) {
-            int readNum = channel.read(buffer);
-            if (readNum == -1) {
-                throw new IOException("Unexpected End Stream");
-            }
-        }
-
-        // for (int count, n = 0; n < len; n += count) {
-        // if (0 > (count = input.read(buffer, off + n, len - n))) {
-        // // Reached end of input stream
-        // return false;
-        // }
-        // }
+        byte[] read = channel.read(len);
+        System.arraycopy(read, 0, this.buffer, off, len);
 
         if (limit < off + len) limit = off + len;
         return true;

+ 1 - 2
parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java

@@ -94,7 +94,6 @@ public class DirectLogFetcherTest {
         HeaderPacket binlogDumpHeader = new HeaderPacket();
         binlogDumpHeader.setPacketBodyLength(cmdBody.length);
         binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);
-        PacketManager.write(connector.getChannel(), new ByteBuffer[] { ByteBuffer.wrap(binlogDumpHeader.toBytes()),
-                ByteBuffer.wrap(cmdBody) });
+        PacketManager.writePkg(connector.getChannel(), binlogDumpHeader.toBytes(), cmdBody);
     }
 }