Przeglądaj źródła

Merge pull request #173 from lulu2panpan/master

Canal客户端高可用bug修复
agapple 9 lat temu
rodzic
commit
6a03ffaf7b

+ 1 - 1
client/src/main/java/com/alibaba/otter/canal/client/CanalConnectors.java

@@ -28,7 +28,7 @@ public class CanalConnectors {
      */
     public static CanalConnector newSingleConnector(SocketAddress address, String destination, String username,
                                                     String password) {
-        SimpleCanalConnector canalConnector = new SimpleCanalConnector(address, username, password, destination);
+        SimpleCanalConnector canalConnector = new SimpleCanalConnector(address, username, password, destination, null);
         canalConnector.setSoTimeout(30 * 1000);
         return canalConnector;
     }

+ 2 - 2
client/src/main/java/com/alibaba/otter/canal/client/impl/ClusterCanalConnector.java

@@ -45,7 +45,7 @@ public class ClusterCanalConnector implements CanalConnector {
             int times = 0;
             while (true) {
                 try {
-                    currentConnector = new SimpleCanalConnector(nextAddress, username, password, destination);
+                    currentConnector = new SimpleCanalConnector(nextAddress, username, password, destination, accessStrategy);
                     currentConnector.setSoTimeout(soTimeout);
                     if (filter != null) {
                         currentConnector.setFilter(filter);
@@ -57,7 +57,7 @@ public class ClusterCanalConnector implements CanalConnector {
                     currentConnector.connect();
                     break;
                 } catch (Exception e) {
-                    logger.warn("failed to connect to:{} after retry {} times", nextAddress, times);
+                    logger.warn("failed to connect to:{} after retry {} times", currentConnector.getAddress(), times);
                     currentConnector.disconnect();
                     currentConnector = null;
                     // retry for #retryTimes for each node when trying to

+ 10 - 3
client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

@@ -14,6 +14,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.client.CanalNodeAccessStrategy;
 import com.alibaba.otter.canal.client.impl.running.ClientRunningData;
 import com.alibaba.otter.canal.client.impl.running.ClientRunningListener;
 import com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor;
@@ -69,18 +70,21 @@ public class SimpleCanalConnector implements CanalConnector {
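     // Reads and writes are guarded by separate locks to reduce lock granularity; reads also need an exclusive lock, because concurrent reads can easily scramble the data packets and make deserialization fail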
     private Object               readDataLock          = new Object();
     private Object               writeDataLock         = new Object();
+    private CanalNodeAccessStrategy accessStrategy;
 
-    public SimpleCanalConnector(SocketAddress address, String username, String password, String destination){
-        this(address, username, password, destination, 60000);
+    public SimpleCanalConnector(SocketAddress address, String username, String password, String destination,
+                                CanalNodeAccessStrategy accessStrategy){
+        this(address, username, password, destination, 60000, accessStrategy);
     }
 
     public SimpleCanalConnector(SocketAddress address, String username, String password, String destination,
-                                int soTimeout){
+                                int soTimeout, CanalNodeAccessStrategy accessStrategy){
         this.address = address;
         this.username = username;
         this.password = password;
         this.soTimeout = soTimeout;
         this.clientIdentity = new ClientIdentity(destination, (short) 1001);
+        this.accessStrategy = accessStrategy;
     }
 
     public void connect() throws CanalClientException {
@@ -123,6 +127,9 @@ public class SimpleCanalConnector implements CanalConnector {
 
     private InetSocketAddress doConnect() throws CanalClientException {
         try {
+            if (accessStrategy != null) {
+				address = accessStrategy.nextNode();//实时获取一下最新的running节点,lubiao
+			}
             channel = SocketChannel.open();
             channel.socket().setSoTimeout(soTimeout);
             channel.connect(address);

+ 14 - 2
client/src/main/java/com/alibaba/otter/canal/client/impl/running/ClientRunningMonitor.java

@@ -1,6 +1,7 @@
 package com.alibaba.otter.canal.client.impl.running;
 
 import java.net.InetSocketAddress;
+import java.text.MessageFormat;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -96,7 +97,11 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
         releaseRunning(); // 尝试一下release
     }
 
-    public void initRunning() {
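+    // Change log:
+    // 1. Added the synchronized keyword to the method so that invocations execute one after another;
+    // 2. Check whether the activeData already present in ZK belongs to this host; if so, reset mutex to true, otherwise a deadlock results
+    // 3. Added exception handling so that the running node is deleted when an exception occurs, otherwise a deadlock results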
+    public synchronized void initRunning() {
         if (!isStart()) {
             return;
         }
@@ -116,12 +121,19 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
                 initRunning();
             } else {
                 activeData = JsonUtils.unmarshalFromByte(bytes, ClientRunningData.class);
+                if(activeData.getAddress().contains(":") && isMine(activeData.getAddress())){
+                	mutex.set(true);
+                }
             }
         } catch (ZkNoNodeException e) {
             zkClient.createPersistent(ZookeeperPathUtils.getClientIdNodePath(this.destination, clientData.getClientId()),
                 true); // 尝试创建父节点
             initRunning();
-        }
+        } catch (Throwable t) {
+			logger.error(MessageFormat.format("There is an error when execute initRunning method, with destination [{0}].",destination),t);
+			zkClient.delete(path);
+			throw new RuntimeException("something goes wrong in initRunning method. ",t);
+		}
     }
 
     /**