Browse Source

动态刷新同步锁状态 (#2181)

* 解决修改数据库远程配置,canal-adapter 一直循环destroy、init。

* 解决动态添加instance和对应的adapter无锁状态问题。
Jony 5 years ago
parent
commit
c093fbb8f6

+ 21 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/SyncSwitch.java

@@ -64,6 +64,27 @@ public class SyncSwitch {
         }
     }
 
+    public synchronized void refresh() {
+        for (String destination : adapterCanalConfig.DESTINATIONS) {
+            BooleanMutex booleanMutex;
+            if (mode == Mode.DISTRIBUTED) {
+                CuratorFramework curator = curatorClient.getCurator();
+                booleanMutex = DISTRIBUTED_LOCK.get(destination);
+                if (booleanMutex == null) {
+                    BooleanMutex mutex = new BooleanMutex(true);
+                    initMutex(curator, destination, mutex);
+                    DISTRIBUTED_LOCK.put(destination, mutex);
+                    startListen(destination, mutex);
+                }
+            } else {
+                booleanMutex = LOCAL_LOCK.get(destination);
+                if (booleanMutex == null) {
+                    LOCAL_LOCK.put(destination, new BooleanMutex(true));
+                }
+            }
+        }
+    }
+
     @SuppressWarnings("resource")
     private synchronized void startListen(String destination, BooleanMutex mutex) {
         try {

+ 2 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java

@@ -53,6 +53,8 @@ public class CanalAdapterService {
             return;
         }
         try {
+            syncSwitch.refresh();
+            logger.info("## syncSwitch refreshed.");
             logger.info("## start the canal client adapters.");
             adapterLoader = new CanalAdapterLoader(adapterCanalConfig);
             adapterLoader.init();