Browse Source

fixed issue #171 , client HA bug , modify SimpleCanalConnector

agapple 9 years ago
parent
commit
4f1e3bafe7

+ 2 - 0
client/src/main/java/com/alibaba/otter/canal/client/CanalNodeAccessStrategy.java

@@ -10,5 +10,7 @@ import java.net.SocketAddress;
  */
 public interface CanalNodeAccessStrategy {
 
+    SocketAddress currentNode();
+
     SocketAddress nextNode();
 }

+ 9 - 3
client/src/main/java/com/alibaba/otter/canal/client/impl/ClusterCanalConnector.java

@@ -41,11 +41,17 @@ public class ClusterCanalConnector implements CanalConnector {
 
     public void connect() throws CanalClientException {
         while (currentConnector == null) {
-            SocketAddress nextAddress = this.accessStrategy.nextNode();
             int times = 0;
             while (true) {
                 try {
-                    currentConnector = new SimpleCanalConnector(nextAddress, username, password, destination, accessStrategy);
+                    currentConnector = new SimpleCanalConnector(null, username, password, destination) {
+
+                        @Override
+                        public SocketAddress getAddress() {
+                            return accessStrategy.nextNode();
+                        }
+
+                    };
                     currentConnector.setSoTimeout(soTimeout);
                     if (filter != null) {
                         currentConnector.setFilter(filter);
@@ -57,7 +63,7 @@ public class ClusterCanalConnector implements CanalConnector {
                     currentConnector.connect();
                     break;
                 } catch (Exception e) {
-                    logger.warn("failed to connect to:{} after retry {} times", currentConnector.getAddress(), times);
+                    logger.warn("failed to connect to:{} after retry {} times", accessStrategy.currentNode(), times);
                     currentConnector.disconnect();
                     currentConnector = null;
                     // retry for #retryTimes for each node when trying to

+ 6 - 0
client/src/main/java/com/alibaba/otter/canal/client/impl/ClusterNodeAccessStrategy.java

@@ -62,6 +62,11 @@ public class ClusterNodeAccessStrategy implements CanalNodeAccessStrategy {
         initRunning(this.zkClient.readData(runningPath, true));
     }
 
+    @Override
+    public SocketAddress currentNode() {
+        return nextNode();
+    }
+
     public SocketAddress nextNode() {
         if (runningAddress != null) {// 如果服务已经启动,直接选择当前正在工作的节点
             return runningAddress;
@@ -109,4 +114,5 @@ public class ClusterNodeAccessStrategy implements CanalNodeAccessStrategy {
         return zkClient;
     }
 
+
 }

+ 11 - 10
client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

@@ -14,7 +14,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.client.CanalConnector;
-import com.alibaba.otter.canal.client.CanalNodeAccessStrategy;
 import com.alibaba.otter.canal.client.impl.running.ClientRunningData;
 import com.alibaba.otter.canal.client.impl.running.ClientRunningListener;
 import com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor;
@@ -70,21 +69,18 @@ public class SimpleCanalConnector implements CanalConnector {
     // 读写数据分别使用不同的锁进行控制,减小锁粒度,读也需要排他锁,并发度容易造成数据包混乱,反序列化失败
     private Object               readDataLock          = new Object();
     private Object               writeDataLock         = new Object();
-    private CanalNodeAccessStrategy accessStrategy;
 
-    public SimpleCanalConnector(SocketAddress address, String username, String password, String destination,
-                                CanalNodeAccessStrategy accessStrategy){
-        this(address, username, password, destination, 60000, accessStrategy);
+    public SimpleCanalConnector(SocketAddress address, String username, String password, String destination){
+        this(address, username, password, destination, 60000);
     }
 
     public SimpleCanalConnector(SocketAddress address, String username, String password, String destination,
-                                int soTimeout, CanalNodeAccessStrategy accessStrategy){
+                                int soTimeout){
         this.address = address;
         this.username = username;
         this.password = password;
         this.soTimeout = soTimeout;
         this.clientIdentity = new ClientIdentity(destination, (short) 1001);
-        this.accessStrategy = accessStrategy;
     }
 
     public void connect() throws CanalClientException {
@@ -127,11 +123,12 @@ public class SimpleCanalConnector implements CanalConnector {
 
     private InetSocketAddress doConnect() throws CanalClientException {
         try {
-            if (accessStrategy != null) {
-				address = accessStrategy.nextNode();//实时获取一下最新的running节点,lubiao
-			}
             channel = SocketChannel.open();
             channel.socket().setSoTimeout(soTimeout);
+            SocketAddress address = getAddress();
+            if (address == null) {
+                address = getNextAddress();
+            }
             channel.connect(address);
             Packet p = Packet.parseFrom(readNextPacket(channel));
             if (p.getVersion() != 1) {
@@ -441,6 +438,10 @@ public class SimpleCanalConnector implements CanalConnector {
         }
     }
 
+    public SocketAddress getNextAddress() {
+        return null;
+    }
+
     public SocketAddress getAddress() {
         return address;
     }

+ 5 - 0
client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleNodeAccessStrategy.java

@@ -32,4 +32,9 @@ public class SimpleNodeAccessStrategy implements CanalNodeAccessStrategy {
         }
     }
 
+    @Override
+    public SocketAddress currentNode() {
+        return nodes.get(index);
+    }
+
 }

+ 11 - 6
client/src/main/java/com/alibaba/otter/canal/client/impl/running/ClientRunningMonitor.java

@@ -21,6 +21,7 @@ import com.alibaba.otter.canal.common.utils.BooleanMutex;
 import com.alibaba.otter.canal.common.utils.JsonUtils;
 import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
 import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 
 /**
  * clinet running控制
@@ -121,8 +122,9 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
                 initRunning();
             } else {
                 activeData = JsonUtils.unmarshalFromByte(bytes, ClientRunningData.class);
-                if(activeData.getAddress().contains(":") && isMine(activeData.getAddress())){
-                	mutex.set(true);
+                // 如果发现已经存在,判断一下是否自己,避免活锁
+                if (activeData.getAddress().contains(":") && isMine(activeData.getAddress())) {
+                    mutex.set(true);
                 }
             }
         } catch (ZkNoNodeException e) {
@@ -130,10 +132,13 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
                 true); // 尝试创建父节点
             initRunning();
         } catch (Throwable t) {
-			logger.error(MessageFormat.format("There is an error when execute initRunning method, with destination [{0}].",destination),t);
-			zkClient.delete(path);
-			throw new RuntimeException("something goes wrong in initRunning method. ",t);
-		}
+            logger.error(MessageFormat.format("There is an error when execute initRunning method, with destination [{0}].",
+                destination),
+                t);
+            // 出现任何异常尝试release
+            releaseRunning();
+            throw new CanalClientException("something goes wrong in initRunning method. ", t);
+        }
     }
 
     /**