Pārlūkot izejas kodu

使用netty重构socketChannel(jdk内置),修改了PacketManager实现,以及其他相关引用的类。

luoyaogui 8 gadi atpakaļ
vecāks
revīzija
5c11c71783

+ 12 - 4
common/pom.xml

@@ -1,4 +1,5 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<parent>
 		<groupId>com.alibaba.otter</groupId>
@@ -11,6 +12,13 @@
 	<name>canal common module for otter ${project.version}</name>
 	<url>http://b2b-doc.alibaba-inc.com/display/opentech/Otter</url>
 	<dependencies>
+		<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
+		<dependency>
+			<groupId>io.netty</groupId>
+			<artifactId>netty-all</artifactId>
+			<version>4.1.6.Final</version>
+		</dependency>
+
 		<!-- zk -->
 		<dependency>
 			<groupId>org.apache.zookeeper</groupId>
@@ -61,8 +69,8 @@
 		</dependency>
 		<!-- junit -->
 		<dependency>
-		  <groupId>junit</groupId>
-		  <artifactId>junit</artifactId>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
 		</dependency>
-  </dependencies>
+	</dependencies>
 </project>

+ 20 - 36
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -2,8 +2,6 @@ package com.alibaba.otter.canal.parse.driver.mysql;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
@@ -16,6 +14,8 @@ import com.alibaba.otter.canal.parse.driver.mysql.packets.client.QuitCommandPack
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.HandshakeInitializationPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.Reply323Packet;
+import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
+import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannelPool;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.MySQLPasswordEncrypter;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
 
@@ -27,22 +27,22 @@ import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
  */
 public class MysqlConnector {
 
-    private static final Logger logger = LoggerFactory.getLogger(MysqlConnector.class);
-    private InetSocketAddress address;
-    private String username;
-    private String password;
+    private static final Logger logger            = LoggerFactory.getLogger(MysqlConnector.class);
+    private InetSocketAddress   address;
+    private String              username;
+    private String              password;
 
-    private byte charsetNumber = 33;
-    private String defaultSchema = "retl";
-    private int soTimeout = 30 * 1000;
-    private int receiveBufferSize = 16 * 1024;
-    private int sendBufferSize = 16 * 1024;
+    private byte                charsetNumber     = 33;
+    private String              defaultSchema     = "retl";
+    private int                 soTimeout         = 30 * 1000;
+    private int                 receiveBufferSize = 16 * 1024;
+    private int                 sendBufferSize    = 16 * 1024;
 
-    private SocketChannel channel;
-    private volatile boolean dumping = false;
+    private SocketChannel       channel;
+    private volatile boolean    dumping           = false;
     // mysql connectinnId
-    private long connectionId = -1;
-    private AtomicBoolean connected = new AtomicBoolean(false);
+    private long                connectionId      = -1;
+    private AtomicBoolean       connected         = new AtomicBoolean(false);
 
     public MysqlConnector(){
     }
@@ -64,10 +64,8 @@ public class MysqlConnector {
     public void connect() throws IOException {
         if (connected.compareAndSet(false, true)) {
             try {
-                channel = SocketChannel.open();
-                configChannel(channel);
+                channel = SocketChannelPool.open(address);
                 logger.info("connect MysqlConnection to {}...", address);
-                channel.connect(address);
                 negotiate(channel);
             } catch (Exception e) {
                 disconnect();
@@ -103,8 +101,7 @@ public class MysqlConnector {
                     MysqlUpdateExecutor executor = new MysqlUpdateExecutor(connector);
                     executor.update("KILL CONNECTION " + connectionId);
                 } catch (Exception e) {
-                    // 忽略具体异常
-                    logger.warn("KILL DUMP " + connectionId + " failure:" + ExceptionUtils.getStackTrace(e));
+                    throw new IOException("KILL DUMP " + connectionId + " failure:" + ExceptionUtils.getStackTrace(e));
                 } finally {
                     if (connector != null) {
                         connector.disconnect();
@@ -142,19 +139,7 @@ public class MysqlConnector {
         HeaderPacket quitHeader = new HeaderPacket();
         quitHeader.setPacketBodyLength(cmdBody.length);
         quitHeader.setPacketSequenceNumber((byte) 0x00);
-        PacketManager.write(channel,
-            new ByteBuffer[] { ByteBuffer.wrap(quitHeader.toBytes()), ByteBuffer.wrap(cmdBody) });
-    }
-
-    // ====================== help method ====================
-
-    private void configChannel(SocketChannel channel) throws IOException {
-        channel.socket().setKeepAlive(true);
-        channel.socket().setReuseAddress(true);
-        channel.socket().setSoTimeout(soTimeout);
-        channel.socket().setTcpNoDelay(true);
-        channel.socket().setReceiveBufferSize(receiveBufferSize);
-        channel.socket().setSendBufferSize(sendBufferSize);
+        PacketManager.write(channel,quitHeader.toBytes(),cmdBody);
     }
 
     private void negotiate(SocketChannel channel) throws IOException {
@@ -191,8 +176,7 @@ public class MysqlConnector {
         h.setPacketBodyLength(clientAuthPkgBody.length);
         h.setPacketSequenceNumber((byte) (header.getPacketSequenceNumber() + 1));
 
-        PacketManager.write(channel,
-            new ByteBuffer[] { ByteBuffer.wrap(h.toBytes()), ByteBuffer.wrap(clientAuthPkgBody) });
+        PacketManager.write(channel,h.toBytes(), clientAuthPkgBody);
         logger.info("client authentication packet is sent out.");
 
         // check auth result
@@ -228,7 +212,7 @@ public class MysqlConnector {
         h323.setPacketBodyLength(b323Body.length);
         h323.setPacketSequenceNumber((byte) (packetSequenceNumber + 1));
 
-        PacketManager.write(channel, new ByteBuffer[] { ByteBuffer.wrap(h323.toBytes()), ByteBuffer.wrap(b323Body) });
+        PacketManager.write(channel, h323.toBytes(), b323Body);
         logger.info("client 323 authentication packet is sent out.");
         // check auth result
         HeaderPacket header = PacketManager.readHeader(channel, 4);

+ 5 - 3
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlQueryExecutor.java

@@ -1,7 +1,6 @@
 package com.alibaba.otter.canal.parse.driver.mysql;
 
 import java.io.IOException;
-import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -12,6 +11,7 @@ import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetHeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.RowDataPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
 
 /**
@@ -82,13 +82,15 @@ public class MysqlQueryExecutor {
             rowDataPacket.fromBytes(body);
             rowData.add(rowDataPacket);
         }
-
+        //未知,不知道是否需要锁定
+        //channel.lock();//锁定读
+        
         ResultSetPacket resultSet = new ResultSetPacket();
         resultSet.getFieldDescriptors().addAll(fields);
         for (RowDataPacket r : rowData) {
             resultSet.getFieldValues().addAll(r.getColumns());
         }
-        resultSet.setSourceAddress(channel.socket().getRemoteSocketAddress());
+        resultSet.setSourceAddress(channel.getRemoteSocketAddress());
 
         return resultSet;
     }

+ 2 - 1
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlUpdateExecutor.java

@@ -1,7 +1,6 @@
 package com.alibaba.otter.canal.parse.driver.mysql;
 
 import java.io.IOException;
-import java.nio.channels.SocketChannel;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -9,6 +8,7 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.QueryCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.OKPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
 
 /**
@@ -48,6 +48,7 @@ public class MysqlUpdateExecutor {
             packet.fromBytes(body);
             throw new IOException(packet + "\n with command: " + updateString);
         }
+        //channel.lock();//锁定读
 
         OKPacket packet = new OKPacket();
         packet.fromBytes(body);

+ 78 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java

@@ -0,0 +1,78 @@
+package com.alibaba.otter.canal.parse.driver.mysql.socket;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+/**
+ * @author luoyaogui
+ * 封装netty的通信channel和数据接收缓存,实现读、写、连接校验的功能。
+ * 2016-12-28
+ */
+public class SocketChannel {
+	private Channel channel = null;
+	private Object lock = new Object();
+	private ByteBuf cache = PooledByteBufAllocator.DEFAULT.directBuffer(1024*1024*5);//缓存大小
+
+	public Channel getChannel() {
+		return channel;
+	}
+	public void setChannel(Channel channel,boolean notify) {
+		this.channel = channel;
+		if(notify){//是否需要通知,主要时channel不可用时
+			synchronized(this){
+				notifyAll();
+			}
+		}
+	}
+
+	public void writeCache(ByteBuf buf){
+		synchronized (lock) {
+			cache.discardReadBytes();//回收内存
+			cache.writeBytes(buf);
+		}
+		synchronized(this){
+			notifyAll();
+		}
+	}
+	public void writeChannel(byte[]... buf) throws IOException {
+		if(channel != null && channel.isWritable())
+			channel.writeAndFlush(Unpooled.copiedBuffer(buf));
+		else
+			throw new IOException("write  failed  !  please checking !");
+	}
+	public int read(ByteBuffer buffer) throws IOException {
+		if(null == channel)
+			throw new IOException("socket has Interrupted !");
+		if(cache.readableBytes() <  buffer.remaining()){
+			synchronized(this){
+				try {wait();} catch (InterruptedException e) {}
+			}
+		}else{
+			synchronized (lock){
+				cache.readBytes(buffer);
+			}
+		}
+		return 0;
+	}
+	public boolean isConnected() {
+		return channel!=null?true:false;
+	}
+	public SocketAddress getRemoteSocketAddress(){
+		return channel!=null?channel.remoteAddress():null;
+	}
+	public void close(){
+		if(channel != null){
+			channel.close();
+		}
+		channel = null;
+		cache.discardReadBytes();//回收已占用的内存
+		cache.release();//释放整个内存
+		cache = null;
+	}
+}

+ 95 - 0
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannelPool.java

@@ -0,0 +1,95 @@
+package com.alibaba.otter.canal.parse.driver.mysql.socket;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.AdaptiveRecvByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.ReferenceCountUtil;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+/**
+ * @author luoyaogui
+ * 实现channel的管理(监听连接、读数据、回收)
+ * 2016-12-28
+ */
+@SuppressWarnings("rawtypes")
+public abstract class SocketChannelPool {
+
+	private static EventLoopGroup group = new NioEventLoopGroup();//非阻塞IO线程组
+	private static Bootstrap boot = new Bootstrap();//主
+	private static Map<Channel,SocketChannel> chManager = new ConcurrentHashMap<Channel,SocketChannel>();
+	
+	static{
+		boot.group(group).channel(NioSocketChannel.class)
+			.option(ChannelOption.SO_RCVBUF, 32*1024)
+			.option(ChannelOption.SO_SNDBUF, 32*1024)
+			.option(ChannelOption.TCP_NODELAY, true)//如果是延时敏感型应用,建议关闭Nagle算法
+			.option(ChannelOption.SO_KEEPALIVE, true)
+			.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
+			.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //
+			.handler(new ChannelInitializer(){
+				@Override
+				protected void initChannel(Channel arg0) throws Exception {
+					arg0.pipeline().addLast(new BusinessHandler());//命令过滤和handler添加管理
+				}
+			});
+	}
+	
+	public static SocketChannel open(SocketAddress address) throws Exception {
+		final SocketChannel socket = new SocketChannel();
+		boot.connect(address).addListener(new ChannelFutureListener(){
+			@Override
+			public void operationComplete(ChannelFuture arg0) throws Exception {
+				if(arg0.isSuccess())
+					socket.setChannel(arg0.channel(),false);
+				synchronized (socket) {
+					socket.notify();
+				}
+		}});
+		synchronized (socket) {
+			socket.wait();
+		}
+		if(null == socket.getChannel()){
+			throw new IOException("can't create socket!");
+		}
+		chManager.put(socket.getChannel(), socket);
+		return socket;
+	}
+
+	public static class BusinessHandler extends ChannelInboundHandlerAdapter {
+		private SocketChannel socket=null;
+		@Override
+		public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+			socket.setChannel(null,true);
+			chManager.remove(ctx.channel());//移除
+		}
+		@Override
+		public void channelRead(ChannelHandlerContext ctx, Object msg)
+				throws Exception {
+			if(null == socket)
+				socket = chManager.get(ctx.channel());
+			if(socket != null){
+				socket.writeCache((ByteBuf) msg);
+			}
+			ReferenceCountUtil.release(msg);//添加防止内存泄漏的
+		}
+		@Override
+		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+				throws Exception {
+			ctx.close();
+		}
+	}
+}

+ 4 - 16
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/PacketManager.java

@@ -2,9 +2,9 @@ package com.alibaba.otter.canal.parse.driver.mysql.utils;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 
 public abstract class PacketManager {
 
@@ -40,20 +40,8 @@ public abstract class PacketManager {
      * @return
      * @throws IOException
      */
-    public static void write(SocketChannel ch, ByteBuffer[] srcs) throws IOException {
-        @SuppressWarnings("unused")
-        long total = 0;
-        for (ByteBuffer buffer : srcs) {
-            total += buffer.remaining();
-        }
-
-        ch.write(srcs);
-        // https://github.com/alibaba/canal/issues/24
-        // 部分windows用户会出现size != total的情况,jdk为java7/openjdk,估计和java版本有关,暂时不做检查
-        // long size = ch.write(srcs);
-        // if (size != total) {
-        // throw new IOException("unexpected blocking io behavior");
-        // }
+    public static void write(SocketChannel ch, byte[]... srcs) throws IOException {
+        ch.writeChannel(srcs);
     }
 
     public static void write(SocketChannel ch, byte[] body) throws IOException {
@@ -64,6 +52,6 @@ public abstract class PacketManager {
         HeaderPacket header = new HeaderPacket();
         header.setPacketBodyLength(body.length);
         header.setPacketSequenceNumber(packetSeqNumber);
-        write(ch, new ByteBuffer[] { ByteBuffer.wrap(header.toBytes()), ByteBuffer.wrap(body) });
+        write(ch, header.toBytes(),body);
     }
 }

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -139,8 +139,8 @@ public class MysqlConnection implements ErosaConnection {
         HeaderPacket binlogDumpHeader = new HeaderPacket();
         binlogDumpHeader.setPacketBodyLength(cmdBody.length);
         binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);
-        PacketManager.write(connector.getChannel(), new ByteBuffer[] { ByteBuffer.wrap(binlogDumpHeader.toBytes()),
-                ByteBuffer.wrap(cmdBody) });
+        PacketManager.write(connector.getChannel(), binlogDumpHeader.toBytes(),
+                cmdBody);
 
         connector.setDumping(true);
     }

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java

@@ -5,11 +5,11 @@ import java.io.InterruptedIOException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.SocketChannel;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 import com.taobao.tddl.dbsync.binlog.LogFetcher;
 
 /**
@@ -156,7 +156,7 @@ public class DirectLogFetcher extends LogFetcher {
                 throw new IOException("Unexpected End Stream");
             }
         }
-
+        
         // for (int count, n = 0; n < len; n += count) {
         // if (0 > (count = input.read(buffer, off + n, len - n))) {
         // // Reached end of input stream

+ 1 - 2
parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java

@@ -94,7 +94,6 @@ public class DirectLogFetcherTest {
         HeaderPacket binlogDumpHeader = new HeaderPacket();
         binlogDumpHeader.setPacketBodyLength(cmdBody.length);
         binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);
-        PacketManager.write(connector.getChannel(), new ByteBuffer[] { ByteBuffer.wrap(binlogDumpHeader.toBytes()),
-                ByteBuffer.wrap(cmdBody) });
+        PacketManager.write(connector.getChannel(), binlogDumpHeader.toBytes(),cmdBody);
     }
 }