|
@@ -1,6 +1,7 @@
|
|
|
package com.alibaba.otter.canal.client.impl.running;
|
|
|
|
|
|
import java.net.InetSocketAddress;
|
|
|
+import java.text.MessageFormat;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -96,7 +97,11 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
|
|
|
releaseRunning(); // 尝试一下release
|
|
|
}
|
|
|
|
|
|
- public void initRunning() {
|
|
|
+ // 改动记录:
|
|
|
+ // 1,在方法上加synchronized关键字,保证同步顺序执行;
|
|
|
+ // 2,判断Zk上已经存在的activeData是否是本机,是的话把mutex重置为true,否则会导致死锁
|
|
|
+ // 3,增加异常处理,保证出现异常时,running节点能被删除,否则会导致死锁
|
|
|
+ public synchronized void initRunning() {
|
|
|
if (!isStart()) {
|
|
|
return;
|
|
|
}
|
|
@@ -116,12 +121,19 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
|
|
|
initRunning();
|
|
|
} else {
|
|
|
activeData = JsonUtils.unmarshalFromByte(bytes, ClientRunningData.class);
|
|
|
+ if(activeData.getAddress().contains(":") && isMine(activeData.getAddress())){
|
|
|
+ mutex.set(true);
|
|
|
+ }
|
|
|
}
|
|
|
} catch (ZkNoNodeException e) {
|
|
|
zkClient.createPersistent(ZookeeperPathUtils.getClientIdNodePath(this.destination, clientData.getClientId()),
|
|
|
true); // 尝试创建父节点
|
|
|
initRunning();
|
|
|
- }
|
|
|
+ } catch (Throwable t) {
|
|
|
+ logger.error(MessageFormat.format("There is an error when execute initRunning method, with destination [{0}].",destination),t);
|
|
|
+ zkClient.delete(path);
|
|
|
+ throw new RuntimeException("something goes wrong in initRunning method. ",t);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|