|
@@ -4,6 +4,7 @@ import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
+import com.alibaba.otter.canal.common.CanalException;
|
|
|
import org.I0Itec.zkclient.IZkDataListener;
|
|
|
import org.I0Itec.zkclient.exception.ZkException;
|
|
|
import org.I0Itec.zkclient.exception.ZkInterruptedException;
|
|
@@ -91,18 +92,25 @@ public class ServerRunningMonitor extends AbstractCanalLifeCycle {
|
|
|
processStart();
|
|
|
}
|
|
|
|
|
|
- public void start() {
|
|
|
+ public synchronized void start() {
|
|
|
super.start();
|
|
|
- processStart();
|
|
|
- if (zkClient != null) {
|
|
|
- // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
|
|
|
- String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
|
|
|
- zkClient.subscribeDataChanges(path, dataListener);
|
|
|
+ try {
|
|
|
+ processStart();
|
|
|
+ if (zkClient != null) {
|
|
|
+ // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
|
|
|
+ String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
|
|
|
+ zkClient.subscribeDataChanges(path, dataListener);
|
|
|
|
|
|
- initRunning();
|
|
|
- } else {
|
|
|
- processActiveEnter();// 没有zk,直接启动
|
|
|
+ initRunning();
|
|
|
+ } else {
|
|
|
+ processActiveEnter();// 没有zk,直接启动
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error("start failed", e);
|
|
|
+ // 没有正常启动,重置一下状态,避免干扰下一次start
|
|
|
+ stop();
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
|
|
|
public void release() {
|
|
@@ -113,7 +121,7 @@ public class ServerRunningMonitor extends AbstractCanalLifeCycle {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void stop() {
|
|
|
+ public synchronized void stop() {
|
|
|
super.stop();
|
|
|
|
|
|
if (zkClient != null) {
|
|
@@ -234,11 +242,7 @@ public class ServerRunningMonitor extends AbstractCanalLifeCycle {
|
|
|
|
|
|
private void processActiveEnter() {
|
|
|
if (listener != null) {
|
|
|
- try {
|
|
|
- listener.processActiveEnter();
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error("processActiveEnter failed", e);
|
|
|
- }
|
|
|
+ listener.processActiveEnter();
|
|
|
}
|
|
|
}
|
|
|
|