1
0
Эх сурвалжийг харах

fixed https://github.com/alibaba/canal/pull/334

agapple 7 жил өмнө
parent
commit
8e88dc87f7

+ 14 - 12
common/src/main/java/com/alibaba/otter/canal/common/zookeeper/ZooKeeperx.java

@@ -1,5 +1,13 @@
 package com.alibaba.otter.canal.common.zookeeper;
 
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkException;
 import org.apache.commons.lang.StringUtils;
@@ -13,14 +21,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.ReflectionUtils;
 
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 /**
  * 封装了ZooKeeper,使其支持节点的优先顺序,比如美国机房的节点会优先加载美国对应的zk集群列表,都失败后才会选择加载杭州的zk集群列表 *
  * 
@@ -33,8 +33,10 @@ public class ZooKeeperx extends ZkConnection {
     private static final Logger logger                  = LoggerFactory.getLogger(ZooKeeperx.class);
     private static final Field  clientCnxnField         = ReflectionUtils.findField(ZooKeeper.class, "cnxn");
     private static final Field  hostProviderField       = ReflectionUtils.findField(ClientCnxn.class, "hostProvider");
-    private static final Field  serverAddressesField    = ReflectionUtils.findField(StaticHostProvider.class, "serverAddresses");
-    private static final Field  zookeeperLockField      = ReflectionUtils.findField(ZkConnection.class, "_zookeeperLock");
+    private static final Field  serverAddressesField    = ReflectionUtils.findField(StaticHostProvider.class,
+                                                            "serverAddresses");
+    private static final Field  zookeeperLockField      = ReflectionUtils.findField(ZkConnection.class,
+                                                            "_zookeeperLock");
     private static final Field  zookeeperFiled          = ReflectionUtils.findField(ZkConnection.class, "_zk");
     private static final int    DEFAULT_SESSION_TIMEOUT = 90000;
 
@@ -98,7 +100,8 @@ public class ZooKeeperx extends ZkConnection {
                     // 强制获取zk中的地址信息
                     ClientCnxn cnxn = (ClientCnxn) ReflectionUtils.getField(clientCnxnField, zk);
                     HostProvider hostProvider = (HostProvider) ReflectionUtils.getField(hostProviderField, cnxn);
-                    List<InetSocketAddress> serverAddrs = (List<InetSocketAddress>) ReflectionUtils.getField(serverAddressesField, hostProvider);
+                    List<InetSocketAddress> serverAddrs = (List<InetSocketAddress>) ReflectionUtils.getField(serverAddressesField,
+                        hostProvider);
                     // 添加第二组集群列表
                     serverAddrs.addAll(new ConnectStringParser(cluster).getServerAddresses());
                 }
@@ -116,4 +119,3 @@ public class ZooKeeperx extends ZkConnection {
 
     }
 }
-

+ 1 - 1
deployer/src/main/resources/logback.xml

@@ -74,7 +74,7 @@
         <appender-ref ref="CANAL-META" />
     </logger>
     
-	<root level="INFO">
+	<root level="WARN">
 		<!--<appender-ref ref="STDOUT"/>-->
 		<appender-ref ref="CANAL-ROOT" />
 	</root>

+ 0 - 4
driver/pom.xml

@@ -33,10 +33,6 @@
 			<groupId>org.slf4j</groupId>
 			<artifactId>slf4j-api</artifactId>
 		</dependency>
-		<dependency>
-			<groupId>org.jboss.netty</groupId>
-			<artifactId>netty</artifactId>
-		</dependency>
 		<!-- test dependency -->
 		<dependency>
 			<groupId>junit</groupId>

+ 4 - 7
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -2,9 +2,6 @@ package com.alibaba.otter.canal.parse.driver.mysql;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
-import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannelPool;
-
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
@@ -17,6 +14,8 @@ import com.alibaba.otter.canal.parse.driver.mysql.packets.client.QuitCommandPack
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.HandshakeInitializationPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.Reply323Packet;
+import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
+import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannelPool;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.MySQLPasswordEncrypter;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
 
@@ -65,10 +64,8 @@ public class MysqlConnector {
     public void connect() throws IOException {
         if (connected.compareAndSet(false, true)) {
             try {
-                channel = SocketChannel.open();
-                configChannel(channel);
+                channel = SocketChannelPool.open(address);
                 logger.info("connect MysqlConnection to {}...", address);
-                channel.connect(address);
                 negotiate(channel);
             } catch (Exception e) {
                 disconnect();
@@ -142,7 +139,7 @@ public class MysqlConnector {
         HeaderPacket quitHeader = new HeaderPacket();
         quitHeader.setPacketBodyLength(cmdBody.length);
         quitHeader.setPacketSequenceNumber((byte) 0x00);
-        PacketManager.writePkg(channel, quitHeader.toBytes(),cmdBody);
+        PacketManager.writePkg(channel, quitHeader.toBytes(), cmdBody);
     }
 
     private void negotiate(SocketChannel channel) throws IOException {

+ 2 - 2
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlQueryExecutor.java

@@ -1,7 +1,6 @@
 package com.alibaba.otter.canal.parse.driver.mysql;
 
 import java.io.IOException;
-import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -12,6 +11,7 @@ import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetHeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.RowDataPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
 
 /**
@@ -88,7 +88,7 @@ public class MysqlQueryExecutor {
         for (RowDataPacket r : rowData) {
             resultSet.getFieldValues().addAll(r.getColumns());
         }
-        resultSet.setSourceAddress(channel.socket().getRemoteSocketAddress());
+        resultSet.setSourceAddress(channel.getRemoteSocketAddress());
 
         return resultSet;
     }

+ 63 - 53
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java

@@ -14,59 +14,69 @@ import java.net.SocketAddress;
  * @author luoyaogui
  */
 public class SocketChannel {
-	private Channel channel = null;
-	private Object lock = new Object();
-	private ByteBuf cache = PooledByteBufAllocator.DEFAULT.directBuffer(1024*1024);//缓存大小
 
-	public Channel getChannel() {
-		return channel;
-	}
-	public void setChannel(Channel channel) {
-		this.channel = channel;
-	}
+    private Channel channel = null;
+    private Object  lock    = new Object();
+    private ByteBuf cache   = PooledByteBufAllocator.DEFAULT.directBuffer(1024 * 1024); // 缓存大小
 
-	public void writeCache(ByteBuf buf){
-		synchronized (lock) {
-			cache.discardReadBytes();//回收内存
-			cache.writeBytes(buf);
-		}
-	}
-	public void writeChannel(byte[]... buf) throws IOException {
-		if(channel != null && channel.isWritable())
-			channel.writeAndFlush(Unpooled.copiedBuffer(buf));
-		else
-			throw new IOException("write  failed  !  please checking !");
-	}
-	public byte[] read(int readSize) throws IOException {
-		do{
-			if(readSize > cache.readableBytes()){
-				if(null == channel)
-					throw new IOException("socket has Interrupted !");
-				synchronized (this) {
-					try { wait(100); } catch (InterruptedException e) {}
-				}
-			} else {
-				byte[] back = new byte[readSize];
-				synchronized (lock) {
-					cache.readBytes(back);
-				}
-				return back;
-			}
-		}while(true);
-	}
-	public boolean isConnected() {
-		return channel!=null?true:false;
-	}
-	public SocketAddress getRemoteSocketAddress(){
-		return channel!=null?channel.remoteAddress():null;
-	}
-	public void close(){
-		if(channel != null){
-			channel.close();
-		}
-		channel = null;
-		cache.discardReadBytes();//回收已占用的内存
-		cache.release();//释放整个内存
-		cache = null;
-	}
+    public Channel getChannel() {
+        return channel;
+    }
+
+    public void setChannel(Channel channel) {
+        this.channel = channel;
+    }
+
+    public void writeCache(ByteBuf buf) {
+        synchronized (lock) {
+            cache.discardReadBytes();// 回收内存
+            cache.writeBytes(buf);
+        }
+    }
+
+    public void writeChannel(byte[]... buf) throws IOException {
+        if (channel != null && channel.isWritable()) channel.writeAndFlush(Unpooled.copiedBuffer(buf));
+        else throw new IOException("write  failed  !  please checking !");
+    }
+
+    public byte[] read(int readSize) throws IOException {
+        do {
+            if (readSize > cache.readableBytes()) {
+                if (null == channel) {
+                    throw new IOException("socket has Interrupted !");
+                }
+                synchronized (this) {
+                    try {
+                        wait(100);
+                    } catch (InterruptedException e) {
+                        throw new IOException("socket has Interrupted !");
+                    }
+                }
+            } else {
+                byte[] back = new byte[readSize];
+                synchronized (lock) {
+                    cache.readBytes(back);
+                }
+                return back;
+            }
+        } while (true);
+    }
+
+    public boolean isConnected() {
+        return channel != null ? true : false;
+    }
+
+    public SocketAddress getRemoteSocketAddress() {
+        return channel != null ? channel.remoteAddress() : null;
+    }
+
+    public void close() {
+        if (channel != null) {
+            channel.close();
+        }
+        channel = null;
+        cache.discardReadBytes();// 回收已占用的内存
+        cache.release();// 释放整个内存
+        cache = null;
+    }
 }

+ 5 - 3
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/PacketManager.java

@@ -1,7 +1,9 @@
 package com.alibaba.otter.canal.parse.driver.mysql.utils;
 
 import java.io.IOException;
+
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
 
 public abstract class PacketManager {
 
@@ -18,15 +20,15 @@ public abstract class PacketManager {
     public static void writePkg(SocketChannel ch, byte[]... srcs) throws IOException {
         ch.writeChannel(srcs);
     }
-    
+
     public static void writeBody(SocketChannel ch, byte[] body) throws IOException {
-    	writeBody0(ch, body, (byte) 0);
+        writeBody0(ch, body, (byte) 0);
     }
 
     public static void writeBody0(SocketChannel ch, byte[] body, byte packetSeqNumber) throws IOException {
         HeaderPacket header = new HeaderPacket();
         header.setPacketBodyLength(body.length);
         header.setPacketSequenceNumber(packetSeqNumber);
-        ch.writeChannel(header.toBytes(),body);
+        ch.writeChannel(header.toBytes(), body);
     }
 }

+ 1 - 1
example/src/main/java/com/alibaba/otter/canal/example/ClusterCanalClientTest.java

@@ -28,7 +28,7 @@ public class ClusterCanalClientTest extends AbstractCanalClientTest {
         // "stability_test", "", "");
 
         // 基于zookeeper动态获取canal server的地址,建立链接,其中一台server发生crash,可以支持failover
-        CanalConnector connector = CanalConnectors.newClusterConnector("127.0.0.1:2181", destination, "", "");
+        CanalConnector connector = CanalConnectors.newClusterConnector("10.218.140.189:2181", destination, "", "");
 
         final ClusterCanalClientTest clientTest = new ClusterCanalClientTest(destination);
         clientTest.setConnector(connector);

+ 5 - 2
example/src/main/java/com/alibaba/otter/canal/example/SimpleCanalClientTest.java

@@ -23,8 +23,11 @@ public class SimpleCanalClientTest extends AbstractCanalClientTest {
     public static void main(String args[]) {
         // 根据ip,直接创建链接,无HA的功能
         String destination = "example";
-        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
-            11111), destination, "", "");
+        String ip = AddressUtils.getHostIp();
+        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 11111),
+            destination,
+            "",
+            "");
 
         final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination);
         clientTest.setConnector(connector);

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -5,6 +5,7 @@ import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
 import java.util.List;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +39,6 @@ public class MysqlConnection implements ErosaConnection {
     }
 
     public MysqlConnection(InetSocketAddress address, String username, String password){
-
         connector = new MysqlConnector(address, username, password);
     }
 
@@ -138,7 +138,7 @@ public class MysqlConnection implements ErosaConnection {
         HeaderPacket binlogDumpHeader = new HeaderPacket();
         binlogDumpHeader.setPacketBodyLength(cmdBody.length);
         binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);
-        PacketManager.writePkg(connector.getChannel(), binlogDumpHeader.toBytes(),cmdBody);
+        PacketManager.writePkg(connector.getChannel(), binlogDumpHeader.toBytes(), cmdBody);
         connector.setDumping(true);
     }
 

+ 0 - 1
parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java

@@ -2,7 +2,6 @@ package com.alibaba.otter.canal.parse;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 
 import org.junit.Assert;
 import org.junit.Test;

+ 0 - 5
pom.xml

@@ -194,11 +194,6 @@
                 <artifactId>oro</artifactId>
                 <version>2.0.8</version>
             </dependency>
-            <dependency>
-                <groupId>org.jboss.netty</groupId>
-                <artifactId>netty</artifactId>
-                <version>3.2.5.Final</version>
-            </dependency>
             <dependency>
                 <groupId>io.netty</groupId>
                 <artifactId>netty-all</artifactId>