Bläddra i källkod

canal server增加数据库远程配置

machey 6 år sedan
förälder
incheckning
71b7c0b868

Filskillnaden har hållts tillbaka eftersom den är för stor
+ 11 - 1
deployer/manager_ddl.sql


+ 23 - 13
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -34,22 +34,28 @@ public class CanalLauncher {
             if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
                 conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
                 properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
+            } else {
+                properties.load(new FileInputStream(conf));
+            }
 
-                String jdbcUrl = properties.getProperty("canal.manager.jdbc.url");
-                if (!StringUtils.isEmpty(jdbcUrl)) {
-                    // load remote config
-                    String jdbcUsername = properties.getProperty("canal.manager.jdbc.username");
-                    String jdbcPassword = properties.getProperty("canal.manager.jdbc.password");
-                    managerDbConfigMonitor = new ManagerDbConfigMonitor(jdbcUrl, jdbcUsername, jdbcPassword);
-                    Properties remoteConfig = managerDbConfigMonitor.loadRemoteConfig();
-                    if (remoteConfig != null) {
-                        properties = remoteConfig;
-                    } else {
-                        managerDbConfigMonitor = null;
-                    }
+            String jdbcUrl = properties.getProperty("canal.manager.jdbc.url");
+            if (!StringUtils.isEmpty(jdbcUrl)) {
+                logger.info("## load remote canal configurations");
+                // load remote config
+                String jdbcUsername = properties.getProperty("canal.manager.jdbc.username");
+                String jdbcPassword = properties.getProperty("canal.manager.jdbc.password");
+                managerDbConfigMonitor = new ManagerDbConfigMonitor(jdbcUrl, jdbcUsername, jdbcPassword);
+                // 加载远程canal.properties
+                Properties remoteConfig = managerDbConfigMonitor.loadRemoteConfig();
+                // 加载remote instance配置
+                managerDbConfigMonitor.loadRemoteInstanceConfigs();
+                if (remoteConfig != null) {
+                    properties = remoteConfig;
+                } else {
+                    managerDbConfigMonitor = null;
                 }
             } else {
-                properties.load(new FileInputStream(conf));
+                logger.info("## load canal configurations");
             }
 
             final CanalStater canalStater = new CanalStater();
@@ -73,6 +79,10 @@ public class CanalLauncher {
 
             while (running)
                 ;
+
+            if (managerDbConfigMonitor != null) {
+                managerDbConfigMonitor.destroy();
+            }
         } catch (Throwable e) {
             logger.error("## Something goes wrong when starting up the canal Server:", e);
         }

+ 119 - 27
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/ManagerDbConfigMonitor.java

@@ -1,6 +1,7 @@
 package com.alibaba.otter.canal.deployer.monitor;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.FileWriter;
 import java.nio.charset.StandardCharsets;
 import java.sql.*;
@@ -9,14 +10,13 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.MapMaker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
 import com.alibaba.otter.canal.deployer.CanalConstants;
-import com.google.common.base.Function;
-import com.google.common.collect.MigrateMap;
+import com.google.common.base.Joiner;
+import com.google.common.collect.MapMaker;
 
 public class ManagerDbConfigMonitor {
 
@@ -32,7 +32,7 @@ public class ManagerDbConfigMonitor {
     private long                     currentConfigTimestamp = 0;
 
     private long                     scanIntervalInSecond   = 5;
-    private ScheduledExecutorService executor               = Executors.newScheduledThreadPool(1,
+    private ScheduledExecutorService executor               = Executors.newScheduledThreadPool(2,
         new NamedThreadFactory("remote-canal-config-scan"));
 
     public ManagerDbConfigMonitor(String jdbcUrl, String jdbcUsername, String jdbcPassword){
@@ -56,6 +56,7 @@ public class ManagerDbConfigMonitor {
     public Properties loadRemoteConfig() {
         Properties properties = null;
         try {
+            // 加载远程canal配置
             ConfigItem configItem = getRemoteCanalConfig();
             if (configItem != null) {
                 if (configItem.getModifiedTime() != currentConfigTimestamp) {
@@ -73,6 +74,18 @@ public class ManagerDbConfigMonitor {
         return properties;
     }
 
+    public void loadRemoteInstanceConfigs() {
+        try {
+            // 加载远程instance配置
+            Map<String, ConfigItem>[] modifiedConfigs = getModifiedInstanceConfigs();
+            if (modifiedConfigs != null) {
+                overrideLocalInstanceConfigs(modifiedConfigs);
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
     private ConfigItem getRemoteCanalConfig() {
         String sql = "select name, content, modified_time from canal_config where id=1";
         try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
@@ -91,7 +104,7 @@ public class ManagerDbConfigMonitor {
     }
 
     private void overrideLocalCanalConfig(String content) {
-        try (FileWriter writer = new FileWriter("../conf/canal.properties")) {
+        try (FileWriter writer = new FileWriter(getConfPath() + "canal.properties")) {
             writer.write(content);
             writer.flush();
         } catch (Exception e) {
@@ -100,6 +113,7 @@ public class ManagerDbConfigMonitor {
     }
 
     public void start(final Listener<Properties> listener) {
+        // 监听canal.properties变化
         executor.scheduleWithFixedDelay(new Runnable() {
 
             public void run() {
@@ -115,45 +129,99 @@ public class ManagerDbConfigMonitor {
             }
 
         }, 10, scanIntervalInSecond, TimeUnit.SECONDS);
-    }
 
-    public interface Listener<Properties> {
+        // 监听instance变化
+        executor.scheduleWithFixedDelay(new Runnable() {
 
-        void onChange(Properties properties);
+            public void run() {
+                try {
+                    loadRemoteInstanceConfigs();
+                } catch (Throwable e) {
+                    logger.error("scan failed", e);
+                }
+            }
+
+        }, 10, 3, TimeUnit.SECONDS);
     }
 
-    private Map<String, ConfigItem> getChangedInstanceConfigs() {
-        String sql = "select name, content, modified_time from instance_config";
+    @SuppressWarnings("unchecked")
+    private Map<String, ConfigItem>[] getModifiedInstanceConfigs() {
+        Map<String, ConfigItem>[] res = new Map[2];
+        Map<String, ConfigItem> remoteConfigStatus = new HashMap<>();
+        String sql = "select id, name, modified_time from canal_instance_config";
         try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
-            Map<String, ConfigItem> changedInstanceConfig = new HashMap<>();
             while (rs.next()) {
-                ConfigItem configItemNew = new ConfigItem();
-                configItemNew.setName(rs.getString("name"));
-                configItemNew.setContent(rs.getString("content"));
-                configItemNew.setModifiedTime(rs.getTimestamp("modified_time").getTime());
-
-                ConfigItem configItem = remoteInstanceConfigs.get(configItemNew.getName());
-                if (configItem == null) {
-                    remoteInstanceConfigs.put(configItemNew.getName(), configItemNew);
-                    changedInstanceConfig.put(configItemNew.getName(), configItemNew);
+                ConfigItem configItem = new ConfigItem();
+                configItem.setId(rs.getLong("id"));
+                configItem.setName(rs.getString("name"));
+                configItem.setModifiedTime(rs.getTimestamp("modified_time").getTime());
+                remoteConfigStatus.put(configItem.getName(), configItem);
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return null;
+        }
+
+        if (!remoteConfigStatus.isEmpty()) {
+            List<Long> changedIds = new ArrayList<>();
+
+            for (ConfigItem remoteConfigStat : remoteConfigStatus.values()) {
+                ConfigItem currentConfig = remoteInstanceConfigs.get(remoteConfigStat.getName());
+                if (currentConfig == null) {
+                    // 新增
+                    changedIds.add(remoteConfigStat.getId());
                 } else {
-                    if (configItem.getModifiedTime() != configItemNew.getModifiedTime()) {
+                    // 修改
+                    if (currentConfig.getModifiedTime() != remoteConfigStat.getModifiedTime()) {
+                        changedIds.add(remoteConfigStat.getId());
+                    }
+                }
+            }
+            if (!changedIds.isEmpty()) {
+                Map<String, ConfigItem> changedInstanceConfig = new HashMap<>();
+                String contentsSql = "select id, name, content, modified_time from canal_instance_config  where id in ("
+                                     + Joiner.on(",").join(changedIds) + ")";
+                try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(contentsSql)) {
+                    while (rs.next()) {
+                        ConfigItem configItemNew = new ConfigItem();
+                        configItemNew.setId(rs.getLong("id"));
+                        configItemNew.setName(rs.getString("name"));
+                        configItemNew.setContent(rs.getString("content"));
+                        configItemNew.setModifiedTime(rs.getTimestamp("modified_time").getTime());
+
                         remoteInstanceConfigs.put(configItemNew.getName(), configItemNew);
                         changedInstanceConfig.put(configItemNew.getName(), configItemNew);
                     }
+
+                    res[0] = changedInstanceConfig.isEmpty() ? null : changedInstanceConfig;
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
                 }
             }
-            return changedInstanceConfig.isEmpty() ? null : changedInstanceConfig;
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
         }
-        return null;
+
+        Map<String, ConfigItem> removedInstanceConfig = new HashMap<>();
+        for (String name : remoteInstanceConfigs.keySet()) {
+            if (!remoteConfigStatus.containsKey(name)) {
+                // 删除
+                removedInstanceConfig.put(name, null);
+            }
+        }
+        res[1] = removedInstanceConfig.isEmpty() ? null : removedInstanceConfig;
+
+        if (res[0] == null && res[1] == null) {
+            return null;
+        } else {
+            return res;
+        }
     }
 
-    private void overrideLocalInstanceConfigs(Map<String, ConfigItem> changedInstanceConfigs) {
+    private void overrideLocalInstanceConfigs(Map<String, ConfigItem>[] modifiedInstanceConfigs) {
+        Map<String, ConfigItem> changedInstanceConfigs = modifiedInstanceConfigs[0];
         if (changedInstanceConfigs != null) {
             for (ConfigItem configItem : changedInstanceConfigs.values()) {
-                try (FileWriter writer = new FileWriter("../conf/" + configItem.getName() + "/instance.properties")) {
+                try (FileWriter writer = new FileWriter(
+                    getConfPath() + configItem.getName() + "/instance.properties")) {
                     writer.write(configItem.getContent());
                     writer.flush();
                 } catch (Exception e) {
@@ -161,9 +229,19 @@ public class ManagerDbConfigMonitor {
                 }
             }
         }
+        Map<String, ConfigItem> removedInstanceConfigs = modifiedInstanceConfigs[1];
+        if (removedInstanceConfigs != null) {
+            for (String name : removedInstanceConfigs.keySet()) {
+                File file = new File(getConfPath() + name + "/");
+                if (file.exists()) {
+                    file.delete();
+                }
+            }
+        }
     }
 
     public void destroy() {
+        executor.shutdownNow();
         if (conn != null) {
             try {
                 conn.close();
@@ -173,6 +251,16 @@ public class ManagerDbConfigMonitor {
         }
     }
 
+    private String getConfPath() {
+        String classpath = this.getClass().getResource("/").getPath();
+        String confPath = classpath + ".." + File.separator + "conf" + File.separator;
+        if (new File(confPath).exists()) {
+            return confPath;
+        } else {
+            return classpath;
+        }
+    }
+
     public static class ConfigItem {
 
         private Long   id;
@@ -213,4 +301,8 @@ public class ManagerDbConfigMonitor {
         }
     }
 
+    public interface Listener<Properties> {
+
+        void onChange(Properties properties);
+    }
 }

+ 3 - 0
deployer/src/main/resources/canal.properties

@@ -1,6 +1,9 @@
 #################################################
 ######### 		common argument		############# 
 #################################################
+#canal.manager.jdbc.url=jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
+#canal.manager.jdbc.username=root
+#canal.manager.jdbc.password=121212
 canal.id = 1
 canal.ip =
 canal.port = 11111

Vissa filer visades inte eftersom för många filer har ändrats