agapple 7 роки тому
батько
коміт
989a69e7be

+ 6 - 6
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlUpdateExecutor.java

@@ -1,7 +1,6 @@
 package com.alibaba.otter.canal.parse.driver.mysql;
 
 import java.io.IOException;
-import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -21,7 +20,7 @@ public class MysqlUpdateExecutor {
 
     private static final Logger logger = LoggerFactory.getLogger(MysqlUpdateExecutor.class);
 
-    private MysqlConnector       connector;
+    private MysqlConnector      connector;
 
     public MysqlUpdateExecutor(MysqlConnector connector) throws IOException{
         if (!connector.isConnected()) {
@@ -31,9 +30,9 @@ public class MysqlUpdateExecutor {
         this.connector = connector;
     }
 
-   /* public MysqlUpdateExecutor(SocketChannel ch){
-        this.channel = ch;
-    }*/
+    /*
+     * public MysqlUpdateExecutor(SocketChannel ch){ this.channel = ch; }
+     */
 
     public OKPacket update(String updateString) throws IOException {
         QueryCommandPacket cmd = new QueryCommandPacket();
@@ -42,7 +41,8 @@ public class MysqlUpdateExecutor {
         PacketManager.writeBody(connector.getChannel(), bodyBytes);
 
         logger.debug("read update result...");
-        byte[] body = PacketManager.readBytes(connector.getChannel(), PacketManager.readHeader(connector.getChannel(), 4).getPacketBodyLength());
+        byte[] body = PacketManager.readBytes(connector.getChannel(),
+            PacketManager.readHeader(connector.getChannel(), 4).getPacketBodyLength());
         if (body[0] < 0) {
             ErrorPacket packet = new ErrorPacket();
             packet.fromBytes(body);

+ 69 - 63
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannelPool.java

@@ -27,68 +27,74 @@ import java.util.concurrent.ConcurrentHashMap;
 @SuppressWarnings({ "rawtypes", "deprecation" })
 public abstract class SocketChannelPool {
 
-	private static EventLoopGroup group = new NioEventLoopGroup();//非阻塞IO线程组
-	private static Bootstrap boot = new Bootstrap();//主
-	private static Map<Channel,SocketChannel> chManager = new ConcurrentHashMap<Channel,SocketChannel>();
-	
-	static{
-		boot.group(group).channel(NioSocketChannel.class)
-			.option(ChannelOption.SO_RCVBUF, 32*1024)
-			.option(ChannelOption.SO_SNDBUF, 32*1024)
-			.option(ChannelOption.TCP_NODELAY, true)//如果是延时敏感型应用,建议关闭Nagle算法
-			.option(ChannelOption.SO_KEEPALIVE, true)
-			.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
-			.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //
-			.handler(new ChannelInitializer(){
-				@Override
-				protected void initChannel(Channel arg0) throws Exception {
-					arg0.pipeline().addLast(new BusinessHandler());//命令过滤和handler添加管理
-				}
-			});
-	}
-	
-	public static SocketChannel open(SocketAddress address) throws Exception {
-		final SocketChannel socket = new SocketChannel();
-		boot.connect(address).addListener(new ChannelFutureListener(){
-			@Override
-			public void operationComplete(ChannelFuture arg0) throws Exception {
-				if(arg0.isSuccess())
-					socket.setChannel(arg0.channel());
-				synchronized (socket) {
-					socket.notify();
-				}
-		}});
-		synchronized (socket) {
-			socket.wait();
-		}
-		if(null == socket.getChannel()){
-			throw new IOException("can't create socket!");
-		}
-		chManager.put(socket.getChannel(), socket);
-		return socket;
-	}
+    private static EventLoopGroup              group     = new NioEventLoopGroup();                         // 非阻塞IO线程组
+    private static Bootstrap                   boot      = new Bootstrap();                                 // 主
+    private static Map<Channel, SocketChannel> chManager = new ConcurrentHashMap<Channel, SocketChannel>();
 
-	public static class BusinessHandler extends ChannelInboundHandlerAdapter {
-		private SocketChannel socket=null;
-		@Override
-		public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-			socket.setChannel(null);
-			chManager.remove(ctx.channel());//移除
-		}
-		@Override
-		public void channelRead(ChannelHandlerContext ctx, Object msg)
-				throws Exception {
-			if(null == socket)
-				socket = chManager.get(ctx.channel());
-			if(socket != null){
-				socket.writeCache((ByteBuf) msg);
-			}
-			ReferenceCountUtil.release(msg);//添加防止内存泄漏的
-		}
-		@Override
-		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-				throws Exception {
-			ctx.close();
-		}
-	}
+    static {
+        boot.group(group)
+            .channel(NioSocketChannel.class)
+            .option(ChannelOption.SO_RCVBUF, 32 * 1024)
+            .option(ChannelOption.SO_SNDBUF, 32 * 1024)
+            .option(ChannelOption.TCP_NODELAY, true)
+            // 如果是延时敏感型应用,建议关闭Nagle算法
+            .option(ChannelOption.SO_KEEPALIVE, true)
+            .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
+            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+            //
+            .handler(new ChannelInitializer() {
+
+                @Override
+                protected void initChannel(Channel arg0) throws Exception {
+                    arg0.pipeline().addLast(new BusinessHandler());// 命令过滤和handler添加管理
+                }
+            });
+    }
+
+    public static SocketChannel open(SocketAddress address) throws Exception {
+        final SocketChannel socket = new SocketChannel();
+        boot.connect(address).addListener(new ChannelFutureListener() {
+
+            @Override
+            public void operationComplete(ChannelFuture arg0) throws Exception {
+                if (arg0.isSuccess()) socket.setChannel(arg0.channel());
+                synchronized (socket) {
+                    socket.notify();
+                }
+            }
+        });
+        synchronized (socket) {
+            socket.wait();
+        }
+        if (null == socket.getChannel()) {
+            throw new IOException("can't create socket!");
+        }
+        chManager.put(socket.getChannel(), socket);
+        return socket;
+    }
+
+    public static class BusinessHandler extends ChannelInboundHandlerAdapter {
+
+        private SocketChannel socket = null;
+
+        @Override
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+            socket.setChannel(null);
+            chManager.remove(ctx.channel());// 移除
+        }
+
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+            if (null == socket) socket = chManager.get(ctx.channel());
+            if (socket != null) {
+                socket.writeCache((ByteBuf) msg);
+            }
+            ReferenceCountUtil.release(msg);// 添加防止内存泄漏的
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+            ctx.close();
+        }
+    }
 }

+ 1 - 1
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.java

@@ -2,7 +2,6 @@ package com.alibaba.otter.canal.instance.core;
 
 import java.util.List;
 
-import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -17,6 +16,7 @@ import com.alibaba.otter.canal.parse.ha.HeartBeatHAController;
 import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
 import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
+import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.sink.CanalEventSink;

+ 6 - 1
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -6,7 +6,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import com.alibaba.otter.canal.parse.index.*;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +37,12 @@ import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
 import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
+import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
+import com.alibaba.otter.canal.parse.index.FailbackLogPositionManager;
+import com.alibaba.otter.canal.parse.index.MemoryLogPositionManager;
+import com.alibaba.otter.canal.parse.index.MetaLogPositionManager;
+import com.alibaba.otter.canal.parse.index.PeriodMixedLogPositionManager;
+import com.alibaba.otter.canal.parse.index.ZooKeeperLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.sink.entry.EntryEventSink;

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java

@@ -4,11 +4,11 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.SocketTimeoutException;
 import java.nio.channels.ClosedByInterruptException;
-import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 import com.taobao.tddl.dbsync.binlog.LogFetcher;
 
 /**

+ 6 - 2
parse/src/test/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManagerTest.java

@@ -30,14 +30,18 @@ public class PeriodMixedLogPositionManagerTest extends AbstractLogPositionManage
         MemoryLogPositionManager memoryLogPositionManager = new MemoryLogPositionManager();
         ZooKeeperLogPositionManager zookeeperLogPositionManager = new ZooKeeperLogPositionManager(zkclientx);
 
-        PeriodMixedLogPositionManager logPositionManager = new PeriodMixedLogPositionManager(memoryLogPositionManager, zookeeperLogPositionManager, 1000L);
+        PeriodMixedLogPositionManager logPositionManager = new PeriodMixedLogPositionManager(memoryLogPositionManager,
+            zookeeperLogPositionManager,
+            1000L);
 
         logPositionManager.start();
 
         LogPosition position2 = doTest(logPositionManager);
         sleep(1500);
 
-        PeriodMixedLogPositionManager logPositionManager2 = new PeriodMixedLogPositionManager(memoryLogPositionManager, zookeeperLogPositionManager, 1000L);
+        PeriodMixedLogPositionManager logPositionManager2 = new PeriodMixedLogPositionManager(memoryLogPositionManager,
+            zookeeperLogPositionManager,
+            1000L);
         logPositionManager2.start();
 
         LogPosition getPosition2 = logPositionManager2.getLatestIndexBy(destination);

+ 14 - 11
server/src/main/java/com/alibaba/otter/canal/server/netty/CanalServerWithNetty.java

@@ -34,7 +34,10 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana
     private int                     port;
     private Channel                 serverChannel = null;
     private ServerBootstrap         bootstrap     = null;
-    private ChannelGroup            childGroups   = null;  //socket channel container, used to close sockets explicitly.
+    private ChannelGroup            childGroups   = null; // socket channel
+                                                          // container, used to
+                                                          // close sockets
+                                                          // explicitly.
 
     private static class SingletonHolder {
 
@@ -60,12 +63,10 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana
         this.bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
             Executors.newCachedThreadPool()));
         /*
-         * enable keep-alive mechanism, handle abnormal network connection scenarios on OS level.
-         * the threshold parameters are depended on OS. 
-         * e.g. On Linux:
-         * net.ipv4.tcp_keepalive_time = 300
-         * net.ipv4.tcp_keepalive_probes = 2 
-         * net.ipv4.tcp_keepalive_intvl = 30 
+         * enable keep-alive mechanism, handle abnormal network connection
+         * scenarios on OS level. the threshold parameters are depended on OS.
+         * e.g. On Linux: net.ipv4.tcp_keepalive_time = 300
+         * net.ipv4.tcp_keepalive_probes = 2 net.ipv4.tcp_keepalive_intvl = 30
          */
         bootstrap.setOption("child.keepAlive", true);
         /*
@@ -79,8 +80,9 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana
             public ChannelPipeline getPipeline() throws Exception {
                 ChannelPipeline pipelines = Channels.pipeline();
                 pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
-                //support to maintain child socket channel.
-                pipelines.addLast(HandshakeInitializationHandler.class.getName(), new HandshakeInitializationHandler(childGroups));
+                // support to maintain child socket channel.
+                pipelines.addLast(HandshakeInitializationHandler.class.getName(),
+                    new HandshakeInitializationHandler(childGroups));
                 pipelines.addLast(ClientAuthenticationHandler.class.getName(),
                     new ClientAuthenticationHandler(embeddedServer));
 
@@ -105,9 +107,10 @@ public class CanalServerWithNetty extends AbstractCanalLifeCycle implements Cana
             this.serverChannel.close().awaitUninterruptibly(1000);
         }
 
-        //close sockets explicitly to reduce socket channel hung in complicated network environment.
+        // close sockets explicitly to reduce socket channel hung in complicated
+        // network environment.
         if (this.childGroups != null) {
-        	this.childGroups.close().awaitUninterruptibly(5000);
+            this.childGroups.close().awaitUninterruptibly(5000);
         }
 
         if (this.bootstrap != null) {

+ 4 - 2
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/ClientAuthenticationHandler.java

@@ -89,11 +89,13 @@ public class ClientAuthenticationHandler extends SimpleChannelHandler {
                         if (clientAuth.getNetWriteTimeout() > 0) {
                             writeTimeout = clientAuth.getNetWriteTimeout();
                         }
-                        //fix bug: soTimeout parameter's unit from connector is millseconds.
+                        // fix bug: soTimeout parameter's unit from connector is
+                        // millseconds.
                         IdleStateHandler idleStateHandler = new IdleStateHandler(NettyUtils.hashedWheelTimer,
                             readTimeout,
                             writeTimeout,
-                            0, TimeUnit.MILLISECONDS);
+                            0,
+                            TimeUnit.MILLISECONDS);
                         ctx.getPipeline().addBefore(SessionHandler.class.getName(),
                             IdleStateHandler.class.getName(),
                             idleStateHandler);

+ 10 - 9
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/HandshakeInitializationHandler.java

@@ -19,20 +19,21 @@ import com.alibaba.otter.canal.server.netty.NettyUtils;
  * @version 1.0.0
  */
 public class HandshakeInitializationHandler extends SimpleChannelHandler {
-	//support to maintain socket channel.
-	private ChannelGroup childGroups;
 
-	public HandshakeInitializationHandler(ChannelGroup childGroups) {
-		this.childGroups = childGroups;
-	}
+    // support to maintain socket channel.
+    private ChannelGroup childGroups;
+
+    public HandshakeInitializationHandler(ChannelGroup childGroups){
+        this.childGroups = childGroups;
+    }
 
     private static final Logger logger = LoggerFactory.getLogger(HandshakeInitializationHandler.class);
 
     public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-    	//add new socket channel in channel container, used to manage sockets.
-    	if(childGroups != null) {
-    		childGroups.add(ctx.getChannel());
-    	}
+        // add new socket channel in channel container, used to manage sockets.
+        if (childGroups != null) {
+            childGroups.add(ctx.getChannel());
+        }
 
         byte[] body = Packet.newBuilder()
             .setType(CanalPacket.PacketType.HANDSHAKE)