Sfoglia il codice sorgente

adapter 远程配置
修改rdb热加载配置的问题

machey 6 anni fa
parent
commit
779ebad890

+ 4 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/monitor/ESConfigMonitor.java

@@ -84,6 +84,10 @@ public class ESConfigMonitor {
                     // 加载配置文件
                     // 加载配置文件
                     String configContent = MappingConfigsLoader
                     String configContent = MappingConfigsLoader
                         .loadConfig(adapterName + File.separator + file.getName());
                         .loadConfig(adapterName + File.separator + file.getName());
+                    if (configContent == null) {
+                        onFileDelete(file);
+                        return;
+                    }
                     ESSyncConfig config = new Yaml().loadAs(configContent, ESSyncConfig.class);
                     ESSyncConfig config = new Yaml().loadAs(configContent, ESSyncConfig.class);
                     config.validate();
                     config.validate();
                     if (esAdapter.getEsSyncConfig().containsKey(file.getName())) {
                     if (esAdapter.getEsSyncConfig().containsKey(file.getName())) {

+ 4 - 0
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/monitor/HbaseConfigMonitor.java

@@ -60,6 +60,10 @@ public class HbaseConfigMonitor {
             try {
             try {
                 // 加载新增的配置文件
                 // 加载新增的配置文件
                 String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
                 String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
+                if (configContent == null) {
+                    onFileDelete(file);
+                    return;
+                }
                 MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
                 MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
                 config.validate();
                 config.validate();
                 addConfigToCache(file, config);
                 addConfigToCache(file, config);

+ 16 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/CanalAdapterApplication.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.adapter.launcher;
 package com.alibaba.otter.canal.adapter.launcher;
 
 
+import com.alibaba.otter.canal.adapter.launcher.monitor.AdapterRemoteConfigMonitor;
 import org.springframework.boot.Banner;
 import org.springframework.boot.Banner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -14,6 +15,21 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
 public class CanalAdapterApplication {
 public class CanalAdapterApplication {
 
 
     public static void main(String[] args) {
     public static void main(String[] args) {
+        // 加载远程配置
+//            String jdbcUrl = env.getProperty("canal.manager.jdbc.url");
+//            if (StringUtils.isNotEmpty(jdbcUrl)) {
+//                String jdbcUsername = env.getProperty("canal.manager.jdbc.username");
+//                String jdbcPassword = env.getProperty("canal.manager.jdbc.password");
+//                configMonitor = new AdapterRemoteConfigMonitor(jdbcUrl, jdbcUsername, jdbcPassword);
+//                configMonitor.loadRemoteConfig();
+//                configMonitor.loadRemoteAdapterConfigs();
+//                contextRefresher.refresh();
+//                configMonitor.start();
+//            }
+
+//        AdapterRemoteConfigMonitor aa  = new AdapterRemoteConfigMonitor();
+
+
         SpringApplication application = new SpringApplication(CanalAdapterApplication.class);
         SpringApplication application = new SpringApplication(CanalAdapterApplication.class);
         application.setBannerMode(Banner.Mode.OFF);
         application.setBannerMode(Banner.Mode.OFF);
         application.run(args);
         application.run(args);

+ 38 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/BootstrapConfiguration.java

@@ -0,0 +1,38 @@
+package com.alibaba.otter.canal.adapter.launcher.config;
+
+import javax.annotation.PostConstruct;
+
+import com.alibaba.otter.canal.adapter.launcher.monitor.AdapterRemoteConfigMonitor;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+
+public class BootstrapConfiguration {
+
+    private static final Logger logger = LoggerFactory.getLogger(BootstrapConfiguration.class);
+
+    @Autowired
+    private Environment         env;
+
+    @PostConstruct
+    public void init() {
+        try {
+            // 加载远程配置
+            String jdbcUrl = env.getProperty("canal.manager.jdbc.url");
+            if (StringUtils.isNotEmpty(jdbcUrl)) {
+                String jdbcUsername = env.getProperty("canal.manager.jdbc.username");
+                String jdbcPassword = env.getProperty("canal.manager.jdbc.password");
+                AdapterRemoteConfigMonitor configMonitor = new AdapterRemoteConfigMonitor(jdbcUrl,
+                    jdbcUsername,
+                    jdbcPassword);
+                configMonitor.loadRemoteConfig();
+                configMonitor.loadRemoteAdapterConfigs();
+                configMonitor.start(); // 启动监听
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+}

+ 0 - 12
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java

@@ -57,18 +57,6 @@ public class CanalAdapterService {
             return;
             return;
         }
         }
         try {
         try {
-            // 加载远程配置
-            String jdbcUrl = env.getProperty("canal.manager.jdbc.url");
-            if (StringUtils.isNotEmpty(jdbcUrl)) {
-                String jdbcUsername = env.getProperty("canal.manager.jdbc.username");
-                String jdbcPassword = env.getProperty("canal.manager.jdbc.password");
-                configMonitor = new AdapterRemoteConfigMonitor(jdbcUrl, jdbcUsername, jdbcPassword);
-                configMonitor.loadRemoteConfig();
-                configMonitor.loadRemoteAdapterConfigs();
-                contextRefresher.refresh();
-                configMonitor.start();
-            }
-
             logger.info("## start the canal client adapters.");
             logger.info("## start the canal client adapters.");
             adapterLoader = new CanalAdapterLoader(adapterCanalConfig);
             adapterLoader = new CanalAdapterLoader(adapterCanalConfig);
             adapterLoader.init();
             adapterLoader.init();

+ 34 - 9
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/AdapterRemoteConfigMonitor.java

@@ -1,7 +1,9 @@
 package com.alibaba.otter.canal.adapter.launcher.monitor;
 package com.alibaba.otter.canal.adapter.launcher.monitor;
 
 
 import java.io.File;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileWriter;
 import java.io.FileWriter;
+import java.net.URL;
 import java.sql.*;
 import java.sql.*;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashMap;
@@ -11,12 +13,15 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.MapMaker;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
 
 
+import com.alibaba.otter.canal.client.adapter.support.Constant;
+import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
 import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
 import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
+import com.google.common.base.Joiner;
+import com.google.common.collect.MapMaker;
 
 
 public class AdapterRemoteConfigMonitor {
 public class AdapterRemoteConfigMonitor {
 
 
@@ -39,6 +44,26 @@ public class AdapterRemoteConfigMonitor {
         this.jdbcPassword = jdbcPassword;
         this.jdbcPassword = jdbcPassword;
     }
     }
 
 
+    public AdapterRemoteConfigMonitor(){
+        try {
+            File configFile = new File(".." + File.separator + Constant.CONF_DIR + File.separator + "bootstrap.yml");
+            if (!configFile.exists()) {
+                URL url = MappingConfigsLoader.class.getClassLoader().getResource("");
+                if (url != null) {
+                    configFile = new File(url.getPath() + "bootstrap.yml");
+                }
+            }
+            if (configFile.exists()) {
+                try (FileInputStream fis = new FileInputStream(configFile)) {
+                    Map a = new Yaml().load(fis);
+                    a = a;
+                }
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
     private Connection getConn() throws Exception {
     private Connection getConn() throws Exception {
         if (conn == null || conn.isClosed()) {
         if (conn == null || conn.isClosed()) {
             Class.forName("com.mysql.jdbc.Driver");
             Class.forName("com.mysql.jdbc.Driver");
@@ -124,7 +149,7 @@ public class AdapterRemoteConfigMonitor {
     private Map<String, ConfigItem>[] getModifiedAdapterConfigs() {
     private Map<String, ConfigItem>[] getModifiedAdapterConfigs() {
         Map<String, ConfigItem>[] res = new Map[2];
         Map<String, ConfigItem>[] res = new Map[2];
         Map<String, ConfigItem> remoteConfigStatus = new HashMap<>();
         Map<String, ConfigItem> remoteConfigStatus = new HashMap<>();
-        String sql = "select id, category, name, modified_time from canal_instance_config";
+        String sql = "select id, category, name, modified_time from canal_adapter_config";
         try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
         try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
             while (rs.next()) {
             while (rs.next()) {
                 ConfigItem configItem = new ConfigItem();
                 ConfigItem configItem = new ConfigItem();
@@ -132,7 +157,7 @@ public class AdapterRemoteConfigMonitor {
                 configItem.setCategory(rs.getString("category"));
                 configItem.setCategory(rs.getString("category"));
                 configItem.setName(rs.getString("name"));
                 configItem.setName(rs.getString("name"));
                 configItem.setModifiedTime(rs.getTimestamp("modified_time").getTime());
                 configItem.setModifiedTime(rs.getTimestamp("modified_time").getTime());
-                remoteConfigStatus.put(configItem.getName(), configItem);
+                remoteConfigStatus.put(configItem.getCategory() + "/" + configItem.getName(), configItem);
             }
             }
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             logger.error(e.getMessage(), e);
@@ -157,7 +182,7 @@ public class AdapterRemoteConfigMonitor {
             }
             }
             if (!changedIds.isEmpty()) {
             if (!changedIds.isEmpty()) {
                 Map<String, ConfigItem> changedInstanceConfig = new HashMap<>();
                 Map<String, ConfigItem> changedInstanceConfig = new HashMap<>();
-                String contentsSql = "select id, name, content, modified_time from canal_instance_config  where id in ("
+                String contentsSql = "select id, category, name, content, modified_time from canal_adapter_config  where id in ("
                                      + Joiner.on(",").join(changedIds) + ")";
                                      + Joiner.on(",").join(changedIds) + ")";
                 try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(contentsSql)) {
                 try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(contentsSql)) {
                     while (rs.next()) {
                     while (rs.next()) {
@@ -182,11 +207,11 @@ public class AdapterRemoteConfigMonitor {
         }
         }
 
 
         Map<String, ConfigItem> removedInstanceConfig = new HashMap<>();
         Map<String, ConfigItem> removedInstanceConfig = new HashMap<>();
-        for (String name : remoteAdapterConfigs.keySet()) {
-            if (!remoteConfigStatus.containsKey(name)) {
+        for (ConfigItem configItem : remoteAdapterConfigs.values()) {
+            if (!remoteConfigStatus.containsKey(configItem.getCategory() + "/" + configItem.getName())) {
                 // 删除
                 // 删除
-                remoteAdapterConfigs.remove(name);
-                removedInstanceConfig.put(name, null);
+                remoteAdapterConfigs.remove(configItem.getCategory() + "/" + configItem.getName());
+                removedInstanceConfig.put(configItem.getCategory() + "/" + configItem.getName(), null);
             }
             }
         }
         }
         res[1] = removedInstanceConfig.isEmpty() ? null : removedInstanceConfig;
         res[1] = removedInstanceConfig.isEmpty() ? null : removedInstanceConfig;

+ 3 - 0
client-adapter/launcher/src/main/resources/META-INF/spring.factories

@@ -0,0 +1,3 @@
+# Bootstrap Configuration
+org.springframework.cloud.bootstrap.BootstrapConfiguration=\
+  com.alibaba.otter.canal.adapter.launcher.config.BootstrapConfiguration

+ 1 - 6
client-adapter/launcher/src/main/resources/application.yml

@@ -12,11 +12,6 @@ spring:
     time-zone: GMT+8
     time-zone: GMT+8
     default-property-inclusion: non_null
     default-property-inclusion: non_null
 
 
-#canal.manager.jdbc:
-#  url: jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
-#  username: root
-#  password: 121212
-
 canal.conf:
 canal.conf:
   canalServerHost: 127.0.0.1:11111
   canalServerHost: 127.0.0.1:11111
 #  zookeeperHosts: slave1:2181
 #  zookeeperHosts: slave1:2181
@@ -44,7 +39,7 @@ canal.conf:
 #        key: mysql1
 #        key: mysql1
 #        properties:
 #        properties:
 #          jdbc.driverClassName: com.mysql.jdbc.Driver
 #          jdbc.driverClassName: com.mysql.jdbc.Driver
-#          jdbc.url: jdbc:mysql://192.168.0.36/mytest?useUnicode=true
+#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
 #          jdbc.username: root
 #          jdbc.username: root
 #          jdbc.password: 121212
 #          jdbc.password: 121212
 #      - name: rdb
 #      - name: rdb

+ 6 - 0
client-adapter/launcher/src/main/resources/bootstrap.yml

@@ -0,0 +1,6 @@
+canal:
+  manager:
+    jdbc:
+      url: jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
+      username: root
+      password: 121212

+ 36 - 22
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/monitor/RdbConfigMonitor.java

@@ -4,7 +4,6 @@ import java.io.File;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map;
 
 
-import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
 import org.apache.commons.io.monitor.FileAlterationMonitor;
 import org.apache.commons.io.monitor.FileAlterationMonitor;
@@ -16,6 +15,7 @@ import org.yaml.snakeyaml.Yaml;
 
 
 import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
 import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
 import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
 import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
 import com.alibaba.otter.canal.client.adapter.support.Util;
 import com.alibaba.otter.canal.client.adapter.support.Util;
 
 
@@ -86,6 +86,10 @@ public class RdbConfigMonitor {
                     // 加载配置文件
                     // 加载配置文件
                     String configContent = MappingConfigsLoader
                     String configContent = MappingConfigsLoader
                         .loadConfig(adapterName + File.separator + file.getName());
                         .loadConfig(adapterName + File.separator + file.getName());
+                    if (configContent == null) {
+                        onFileDelete(file);
+                        return;
+                    }
                     MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
                     MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
                     config.validate();
                     config.validate();
                     if ((key == null && config.getOuterAdapterKey() == null)
                     if ((key == null && config.getOuterAdapterKey() == null)
@@ -121,34 +125,44 @@ public class RdbConfigMonitor {
         }
         }
 
 
         private void addConfigToCache(File file, MappingConfig mappingConfig) {
         private void addConfigToCache(File file, MappingConfig mappingConfig) {
+            if (mappingConfig == null || mappingConfig.getDbMapping() == null) {
+                return;
+            }
             rdbAdapter.getRdbMapping().put(file.getName(), mappingConfig);
             rdbAdapter.getRdbMapping().put(file.getName(), mappingConfig);
-            Map<String, MappingConfig> configMap = rdbAdapter.getMappingConfigCache()
-                .computeIfAbsent(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
-                                 + mappingConfig.getDbMapping().getDatabase() + "."
-                                 + mappingConfig.getDbMapping().getTable(),
-                    k1 -> new HashMap<>());
-            configMap.put(file.getName(), mappingConfig);
-
-            Map<String, MirrorDbConfig> mirrorDbConfigCache = rdbAdapter.getMirrorDbConfigCache();
-            mirrorDbConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
-                                    + mappingConfig.getDbMapping().getDatabase(),
-                MirrorDbConfig.create(file.getName(), mappingConfig));
+            if (!mappingConfig.getDbMapping().getMirrorDb()) {
+                Map<String, MappingConfig> configMap = rdbAdapter.getMappingConfigCache()
+                    .computeIfAbsent(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                     + mappingConfig.getDbMapping().getDatabase() + "."
+                                     + mappingConfig.getDbMapping().getTable(),
+                        k1 -> new HashMap<>());
+                configMap.put(file.getName(), mappingConfig);
+            } else {
+                Map<String, MirrorDbConfig> mirrorDbConfigCache = rdbAdapter.getMirrorDbConfigCache();
+                mirrorDbConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                        + mappingConfig.getDbMapping().getDatabase(),
+                    MirrorDbConfig.create(file.getName(), mappingConfig));
+            }
         }
         }
 
 
         private void deleteConfigFromCache(File file) {
         private void deleteConfigFromCache(File file) {
+            MappingConfig mappingConfig = rdbAdapter.getRdbMapping().remove(file.getName());
 
 
-            rdbAdapter.getRdbMapping().remove(file.getName());
-            for (Map<String, MappingConfig> configMap : rdbAdapter.getMappingConfigCache().values()) {
-                if (configMap != null) {
-                    configMap.remove(file.getName());
-                }
+            if (mappingConfig == null || mappingConfig.getDbMapping() == null) {
+                return;
             }
             }
-
-            rdbAdapter.getMirrorDbConfigCache().forEach((key, mirrorDbConfig) -> {
-                if (mirrorDbConfig.getFileName().equals(file.getName())) {
-                    rdbAdapter.getMirrorDbConfigCache().remove(key);
+            if (!mappingConfig.getDbMapping().getMirrorDb()) {
+                for (Map<String, MappingConfig> configMap : rdbAdapter.getMappingConfigCache().values()) {
+                    if (configMap != null) {
+                        configMap.remove(file.getName());
+                    }
                 }
                 }
-            });
+            } else {
+                rdbAdapter.getMirrorDbConfigCache().forEach((key, mirrorDbConfig) -> {
+                    if (mirrorDbConfig.getFileName().equals(file.getName())) {
+                        rdbAdapter.getMirrorDbConfigCache().remove(key);
+                    }
+                });
+            }
 
 
         }
         }
     }
     }

File diff suppressed because it is too large
+ 0 - 0
deployer/manager_ddl.sql


Some files were not shown because too many files changed in this diff