mcy 6 سال پیش
والد
کامیت
f26adff7c7

+ 45 - 20
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/AdapterRemoteConfigMonitor.java

@@ -69,6 +69,7 @@ public class AdapterRemoteConfigMonitor {
                 if (configItem.getModifiedTime() != currentConfigTimestamp) {
                     currentConfigTimestamp = configItem.getModifiedTime();
                     overrideLocalCanalConfig(configItem.getContent());
+                    logger.info("## Loaded remote adapter config: application.yml");
                 }
             }
         } catch (Exception e) {
@@ -78,7 +79,7 @@ public class AdapterRemoteConfigMonitor {
 
     /**
      * 获取远程application.yml配置
-     * 
+     *
      * @return 配置对象
      */
     private ConfigItem getRemoteAdapterConfig() {
@@ -100,7 +101,7 @@ public class AdapterRemoteConfigMonitor {
 
     /**
      * 覆盖本地application.yml文件
-     * 
+     *
      * @param content 文件内容
      */
     private void overrideLocalCanalConfig(String content) {
@@ -140,7 +141,7 @@ public class AdapterRemoteConfigMonitor {
      */
     public void loadRemoteAdapterConfigs() {
         try {
-            // 加载远程instance配置
+            // 加载远程adapter配置
             Map<String, ConfigItem>[] modifiedConfigs = getModifiedAdapterConfigs();
             if (modifiedConfigs != null) {
                 overrideLocalAdapterConfigs(modifiedConfigs);
@@ -152,7 +153,7 @@ public class AdapterRemoteConfigMonitor {
 
     /**
      * 获取有变动的adapter配置
-     * 
+     *
      * @return Map[0]: 新增修改的配置, Map[1]: 删除的配置
      */
     @SuppressWarnings("unchecked")
@@ -191,7 +192,7 @@ public class AdapterRemoteConfigMonitor {
                 }
             }
             if (!changedIds.isEmpty()) {
-                Map<String, ConfigItem> changedInstanceConfig = new HashMap<>();
+                Map<String, ConfigItem> changedAdapterConfig = new HashMap<>();
                 String contentsSql = "select id, category, name, content, modified_time from canal_adapter_config  where id in ("
                                      + Joiner.on(",").join(changedIds) + ")";
                 try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(contentsSql)) {
@@ -205,26 +206,26 @@ public class AdapterRemoteConfigMonitor {
 
                         remoteAdapterConfigs.put(configItemNew.getCategory() + "/" + configItemNew.getName(),
                             configItemNew);
-                        changedInstanceConfig.put(configItemNew.getCategory() + "/" + configItemNew.getName(),
+                        changedAdapterConfig.put(configItemNew.getCategory() + "/" + configItemNew.getName(),
                             configItemNew);
                     }
 
-                    res[0] = changedInstanceConfig.isEmpty() ? null : changedInstanceConfig;
+                    res[0] = changedAdapterConfig.isEmpty() ? null : changedAdapterConfig;
                 } catch (Exception e) {
                     logger.error(e.getMessage(), e);
                 }
             }
         }
 
-        Map<String, ConfigItem> removedInstanceConfig = new HashMap<>();
+        Map<String, ConfigItem> removedAdapterConfig = new HashMap<>();
         for (ConfigItem configItem : remoteAdapterConfigs.values()) {
             if (!remoteConfigStatus.containsKey(configItem.getCategory() + "/" + configItem.getName())) {
                 // 删除
                 remoteAdapterConfigs.remove(configItem.getCategory() + "/" + configItem.getName());
-                removedInstanceConfig.put(configItem.getCategory() + "/" + configItem.getName(), null);
+                removedAdapterConfig.put(configItem.getCategory() + "/" + configItem.getName(), null);
             }
         }
-        res[1] = removedInstanceConfig.isEmpty() ? null : removedInstanceConfig;
+        res[1] = removedAdapterConfig.isEmpty() ? null : removedAdapterConfig;
 
         if (res[0] == null && res[1] == null) {
             return null;
@@ -235,33 +236,57 @@ public class AdapterRemoteConfigMonitor {
 
     /**
      * 覆盖adapter配置到本地
-     * 
-     * @param modifiedInstanceConfigs 变动的配置集合
+     *
+     * @param modifiedAdapterConfigs 变动的配置集合
      */
-    private void overrideLocalAdapterConfigs(Map<String, ConfigItem>[] modifiedInstanceConfigs) {
-        Map<String, ConfigItem> changedInstanceConfigs = modifiedInstanceConfigs[0];
-        if (changedInstanceConfigs != null) {
-            for (ConfigItem configItem : changedInstanceConfigs.values()) {
+    private void overrideLocalAdapterConfigs(Map<String, ConfigItem>[] modifiedAdapterConfigs) {
+        Map<String, ConfigItem> changedAdapterConfigs = modifiedAdapterConfigs[0];
+        if (changedAdapterConfigs != null) {
+            for (ConfigItem configItem : changedAdapterConfigs.values()) {
                 try (FileWriter writer = new FileWriter(
                     getConfPath() + configItem.getCategory() + "/" + configItem.getName())) {
                     writer.write(configItem.getContent());
                     writer.flush();
+                    logger.info("## Loaded remote adapter config: {}/{}",
+                        configItem.getCategory(),
+                        configItem.getName());
                 } catch (Exception e) {
                     logger.error(e.getMessage(), e);
                 }
             }
         }
-        Map<String, ConfigItem> removedInstanceConfigs = modifiedInstanceConfigs[1];
-        if (removedInstanceConfigs != null) {
-            for (String name : removedInstanceConfigs.keySet()) {
+        Map<String, ConfigItem> removedAdapterConfigs = modifiedAdapterConfigs[1];
+        if (removedAdapterConfigs != null) {
+            for (String name : removedAdapterConfigs.keySet()) {
                 File file = new File(getConfPath() + name);
                 if (file.exists()) {
-                    file.delete();
+                    deleteDir(file);
+                    logger.info("## Deleted and reloaded remote adapter config: {}", name);
                 }
             }
         }
     }
 
+    private static boolean deleteDir(File dirFile) {
+        if (!dirFile.exists()) {
+            return false;
+        }
+
+        if (dirFile.isFile()) {
+            return dirFile.delete();
+        } else {
+            File[] files = dirFile.listFiles();
+            if (files == null || files.length == 0) {
+                return dirFile.delete();
+            }
+            for (File file : files) {
+                deleteDir(file);
+            }
+        }
+
+        return dirFile.delete();
+    }
+
     /**
      * 获取conf文件夹所在路径
      *

+ 42 - 22
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/ManagerRemoteConfigMonitor.java

@@ -4,16 +4,8 @@ import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.nio.charset.StandardCharsets;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.sql.*;
+import java.util.*;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -28,7 +20,7 @@ import com.google.common.collect.MapMaker;
 
 /**
  * 远程配置装载监控
- * 
+ *
  * @author rewerma 2018-12-30 下午05:12:16
  * @version 1.0.1
  */
@@ -47,7 +39,7 @@ public class ManagerRemoteConfigMonitor {
 
     private long                     scanIntervalInSecond   = 5;
     private ScheduledExecutorService executor               = Executors.newScheduledThreadPool(2,
-                                                                new NamedThreadFactory("remote-canal-config-scan"));
+        new NamedThreadFactory("remote-canal-config-scan"));
 
     public ManagerRemoteConfigMonitor(String jdbcUrl, String jdbcUsername, String jdbcPassword){
         this.jdbcUrl = jdbcUrl;
@@ -69,7 +61,7 @@ public class ManagerRemoteConfigMonitor {
 
     /**
      * 加载远程 canal.properties文件 覆盖本地
-     * 
+     *
      * @return 远程配置的properties
      */
     public Properties loadRemoteConfig() {
@@ -83,8 +75,9 @@ public class ManagerRemoteConfigMonitor {
                     overrideLocalCanalConfig(configItem.getContent());
                     properties = new Properties();
                     properties.load(new ByteArrayInputStream(configItem.getContent().getBytes(StandardCharsets.UTF_8)));
-                    scanIntervalInSecond = Integer.valueOf(properties.getProperty(CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
-                        "5"));
+                    scanIntervalInSecond = Integer
+                        .valueOf(properties.getProperty(CanalConstants.CANAL_AUTO_SCAN_INTERVAL, "5"));
+                    logger.info("## Loaded remote canal config: canal.properties ");
                 }
             }
         } catch (Exception e) {
@@ -110,7 +103,7 @@ public class ManagerRemoteConfigMonitor {
 
     /**
      * 获取远程canal.properties配置内容
-     * 
+     *
      * @return 内容对象
      */
     private ConfigItem getRemoteCanalConfig() {
@@ -132,7 +125,7 @@ public class ManagerRemoteConfigMonitor {
 
     /**
      * 覆盖本地 canal.properties
-     * 
+     *
      * @param content 远程配置内容文本
      */
     private void overrideLocalCanalConfig(String content) {
@@ -224,16 +217,22 @@ public class ManagerRemoteConfigMonitor {
 
     /**
      * 覆盖本地instance配置
-     * 
+     *
      * @param modifiedInstanceConfigs 有变更的配置项
      */
     private void overrideLocalInstanceConfigs(Map<String, ConfigItem>[] modifiedInstanceConfigs) {
         Map<String, ConfigItem> changedInstanceConfigs = modifiedInstanceConfigs[0];
         if (changedInstanceConfigs != null) {
             for (ConfigItem configItem : changedInstanceConfigs.values()) {
-                try (FileWriter writer = new FileWriter(getConfPath() + configItem.getName() + "/instance.properties")) {
+                File instanceDir = new File(getConfPath() + configItem.getName());
+                if (!instanceDir.exists()) {
+                    instanceDir.mkdirs();
+                }
+                try (FileWriter writer = new FileWriter(
+                    getConfPath() + configItem.getName() + "/instance.properties")) {
                     writer.write(configItem.getContent());
                     writer.flush();
+                    logger.info("## Loaded remote instance config: {}/instance.properties ", configItem.getName());
                 } catch (Exception e) {
                     logger.error(e.getMessage(), e);
                 }
@@ -244,15 +243,36 @@ public class ManagerRemoteConfigMonitor {
             for (String name : removedInstanceConfigs.keySet()) {
                 File file = new File(getConfPath() + name + "/");
                 if (file.exists()) {
-                    file.delete();
+                    deleteDir(file);
+                    logger.info("## Deleted and loaded remote instance config: {} ", name);
                 }
             }
         }
     }
 
+    private static boolean deleteDir(File dirFile) {
+        if (!dirFile.exists()) {
+            return false;
+        }
+
+        if (dirFile.isFile()) {
+            return dirFile.delete();
+        } else {
+            File[] files = dirFile.listFiles();
+            if (files == null || files.length == 0) {
+                return dirFile.delete();
+            }
+            for (File file : files) {
+                deleteDir(file);
+            }
+        }
+
+        return dirFile.delete();
+    }
+
     /**
      * 监听 canal 主配置和 instance 配置变化
-     * 
+     *
      * @param listener 监听回调方法
      */
     public void start(final Listener<Properties> listener) {
@@ -302,7 +322,7 @@ public class ManagerRemoteConfigMonitor {
 
     /**
      * 获取conf文件夹所在路径
-     * 
+     *
      * @return 路径地址
      */
     private String getConfPath() {