Explorar o código

fixed issue #1220, 解决一下no alive canal server重试过于频繁

agapple %!s(int64=6) %!d(string=hai) anos
pai
achega
1b11aa1d21

+ 3 - 2
client/src/main/java/com/alibaba/otter/canal/client/impl/ClusterNodeAccessStrategy.java

@@ -15,7 +15,6 @@ import com.alibaba.otter.canal.common.utils.JsonUtils;
 import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
 import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
 import com.alibaba.otter.canal.common.zookeeper.running.ServerRunningData;
-import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 
 /**
  * 集群模式的调度策略
@@ -25,6 +24,7 @@ import com.alibaba.otter.canal.protocol.exception.CanalClientException;
  */
 public class ClusterNodeAccessStrategy implements CanalNodeAccessStrategy {
 
+    private String                           destination;
     private IZkChildListener                 childListener;                                      // 监听所有的服务器列表
     private IZkDataListener                  dataListener;                                       // 监听当前的工作节点
     private ZkClientx                        zkClient;
@@ -32,6 +32,7 @@ public class ClusterNodeAccessStrategy implements CanalNodeAccessStrategy {
     private volatile InetSocketAddress       runningAddress = null;
 
     public ClusterNodeAccessStrategy(String destination, ZkClientx zkClient){
+        this.destination = destination;
         this.zkClient = zkClient;
         childListener = new IZkChildListener() {
 
@@ -73,7 +74,7 @@ public class ClusterNodeAccessStrategy implements CanalNodeAccessStrategy {
         } else if (!currentAddress.isEmpty()) { // 如果不存在已经启动的服务,可能服务是一种lazy启动,随机选择一台触发服务器进行启动
             return currentAddress.get(0);// 默认返回第一个节点,之前已经做过shuffle
         } else {
-            throw new CanalClientException("no alive canal server");
+            throw new ServerNotFoundException("no alive canal server for " + destination);
         }
     }
 

+ 29 - 0
client/src/main/java/com/alibaba/otter/canal/client/impl/ServerNotFoundException.java

@@ -0,0 +1,29 @@
+package com.alibaba.otter.canal.client.impl;
+
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+
+public class ServerNotFoundException extends CanalClientException {
+
+    private static final long serialVersionUID = -3471518241911601774L;
+
+    public ServerNotFoundException(String errorCode, String errorDesc, Throwable cause){
+        super(errorCode, errorDesc, cause);
+    }
+
+    public ServerNotFoundException(String errorCode, String errorDesc){
+        super(errorCode, errorDesc);
+    }
+
+    public ServerNotFoundException(String errorCode, Throwable cause){
+        super(errorCode, cause);
+    }
+
+    public ServerNotFoundException(String errorCode){
+        super(errorCode);
+    }
+
+    public ServerNotFoundException(Throwable cause){
+        super(cause);
+    }
+
+}

+ 10 - 0
client/src/main/java/com/alibaba/otter/canal/client/impl/running/ClientRunningMonitor.java

@@ -16,6 +16,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
+import com.alibaba.otter.canal.client.impl.ServerNotFoundException;
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.common.utils.BooleanMutex;
 import com.alibaba.otter.canal.common.utils.JsonUtils;
@@ -141,6 +142,15 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
             logger.error(MessageFormat.format("There is an error when execute initRunning method, with destination [{0}].",
                 destination),
                 t);
+
+            // fixed issue 1220, 针对server节点不工作避免死循环
+            if (t instanceof ServerNotFoundException) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                }
+            }
+
             // 出现任何异常尝试release
             releaseRunning();
             throw new CanalClientException("something goes wrong in initRunning method. ", t);