瀏覽代碼

1. CanalController stop 需要同时将 embededCanalServer.stop (#4477)

2. ServerRunningMonitor 线程池未正常回收,线程池管理与 start/stop保持一致
华仔 2 年之前
父節點
當前提交
fa0c08c170

+ 7 - 2
common/src/main/java/com/alibaba/otter/canal/common/zookeeper/running/ServerRunningMonitor.java

@@ -38,7 +38,7 @@ public class ServerRunningMonitor extends AbstractCanalLifeCycle {
     private ServerRunningData          serverData;
     // 当前实际运行的节点状态信息
     private volatile ServerRunningData activeData;
-    private ScheduledExecutorService   delayExector = Executors.newScheduledThreadPool(1);
+    private ScheduledExecutorService   delayExecutor;
     private int                        delayTime    = 5;
     private ServerRunningListener      listener;
 
@@ -73,7 +73,7 @@ public class ServerRunningMonitor extends AbstractCanalLifeCycle {
                     initRunning();
                 } else {
                     // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作
-                    delayExector.schedule(() -> initRunning(), delayTime, TimeUnit.SECONDS);
+                    delayExecutor.schedule(() -> initRunning(), delayTime, TimeUnit.SECONDS);
                 }
             }
 
@@ -90,6 +90,7 @@ public class ServerRunningMonitor extends AbstractCanalLifeCycle {
         try {
             processStart();
             if (zkClient != null) {
+                delayExecutor = Executors.newScheduledThreadPool(1);
                 // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
                 String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
                 zkClient.subscribeDataChanges(path, dataListener);
@@ -122,6 +123,10 @@ public class ServerRunningMonitor extends AbstractCanalLifeCycle {
         if (zkClient != null) {
             String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
             zkClient.unsubscribeDataChanges(path, dataListener);
+            if (delayExecutor != null) {
+                delayExecutor.shutdown();
+                delayExecutor = null;
+            }
 
             releaseRunning(); // 尝试一下release
         } else {

+ 5 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -558,6 +558,11 @@ public class CanalController {
         }
 
         ZkClientx.clearClients();
+
+        // 需要释放 CanalServerWithEmbedded 否则主线程退出后,进程无法自动完整退出...
+        if (embededCanalServer != null) {
+            embededCanalServer.stop();
+        }
     }
 
     private void initCid(String path) {