Browse Source

Merge pull request #1383 from rewerma/master

增加adapter远程配置
agapple 6 years ago
parent
commit
670b7830a5

+ 4 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/monitor/ESConfigMonitor.java

@@ -84,6 +84,10 @@ public class ESConfigMonitor {
                     // 加载配置文件
                     String configContent = MappingConfigsLoader
                         .loadConfig(adapterName + File.separator + file.getName());
+                    if (configContent == null) {
+                        onFileDelete(file);
+                        return;
+                    }
                     ESSyncConfig config = new Yaml().loadAs(configContent, ESSyncConfig.class);
                     config.validate();
                     if (esAdapter.getEsSyncConfig().containsKey(file.getName())) {

+ 4 - 0
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/monitor/HbaseConfigMonitor.java

@@ -79,6 +79,10 @@ public class HbaseConfigMonitor {
                     // 加载配置文件
                     String configContent = MappingConfigsLoader
                         .loadConfig(adapterName + File.separator + file.getName());
+                    if (configContent == null) {
+                        onFileDelete(file);
+                        return;
+                    }
                     MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
                     config.validate();
                     if (hbaseAdapter.getHbaseMapping().containsKey(file.getName())) {

+ 44 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/BootstrapConfiguration.java

@@ -0,0 +1,44 @@
+package com.alibaba.otter.canal.adapter.launcher.config;
+
+import javax.annotation.PostConstruct;
+
+import com.alibaba.otter.canal.adapter.launcher.monitor.AdapterRemoteConfigMonitor;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+
+/**
+ * Bootstrap级别配置加载
+ *
+ * @author rewerma @ 2019-01-05
+ * @version 1.0.0
+ */
+public class BootstrapConfiguration {
+
+    private static final Logger logger = LoggerFactory.getLogger(BootstrapConfiguration.class);
+
+    @Autowired
+    private Environment         env;
+
+    @PostConstruct
+    public void loadRemoteConfig() {
+        try {
+            // 加载远程配置
+            String jdbcUrl = env.getProperty("canal.manager.jdbc.url");
+            if (StringUtils.isNotEmpty(jdbcUrl)) {
+                String jdbcUsername = env.getProperty("canal.manager.jdbc.username");
+                String jdbcPassword = env.getProperty("canal.manager.jdbc.password");
+                AdapterRemoteConfigMonitor configMonitor = new AdapterRemoteConfigMonitor(jdbcUrl,
+                    jdbcUsername,
+                    jdbcPassword);
+                configMonitor.loadRemoteConfig();
+                configMonitor.loadRemoteAdapterConfigs();
+                configMonitor.start(); // 启动监听
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+}

+ 21 - 6
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java

@@ -4,9 +4,13 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.annotation.Resource;
 
+import com.alibaba.otter.canal.adapter.launcher.monitor.AdapterRemoteConfigMonitor;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.cloud.context.config.annotation.RefreshScope;
+import org.springframework.cloud.context.refresh.ContextRefresher;
+import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Component;
 
 import com.alibaba.druid.pool.DruidDataSource;
@@ -25,20 +29,27 @@ import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 @RefreshScope
 public class CanalAdapterService {
 
-    private static final Logger logger  = LoggerFactory.getLogger(CanalAdapterService.class);
+    private static final Logger        logger        = LoggerFactory.getLogger(CanalAdapterService.class);
 
-    private CanalAdapterLoader  adapterLoader;
+    private CanalAdapterLoader         adapterLoader;
 
     @Resource
-    private AdapterCanalConfig  adapterCanalConfig;
+    private ContextRefresher           contextRefresher;
+
+    @Resource
+    private AdapterCanalConfig         adapterCanalConfig;
+    @Resource
+    private Environment                env;
 
     // 注入bean保证优先注册
     @Resource
-    private SpringContext       springContext;
+    private SpringContext              springContext;
     @Resource
-    private SyncSwitch          syncSwitch;
+    private SyncSwitch                 syncSwitch;
 
-    private volatile boolean    running = false;
+    private volatile boolean           running       = false;
+
+    private AdapterRemoteConfigMonitor configMonitor = null;
 
     @PostConstruct
     public synchronized void init() {
@@ -64,6 +75,10 @@ public class CanalAdapterService {
         try {
             running = false;
             logger.info("## stop the canal client adapters");
+            if (configMonitor != null) {
+                configMonitor.destroy();
+            }
+
             if (adapterLoader != null) {
                 adapterLoader.destroy();
                 adapterLoader = null;

+ 342 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/AdapterRemoteConfigMonitor.java

@@ -0,0 +1,342 @@
+package com.alibaba.otter.canal.adapter.launcher.monitor;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.net.URL;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import com.alibaba.otter.canal.client.adapter.support.Constant;
+import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
+import com.google.common.base.Joiner;
+import com.google.common.collect.MapMaker;
+
+/**
+ * 远程配置装载、监控类
+ *
+ * @author rewerma @ 2019-01-05
+ * @version 1.0.0
+ */
+public class AdapterRemoteConfigMonitor {
+
+    private static final Logger      logger                 = LoggerFactory.getLogger(AdapterRemoteConfigMonitor.class);
+
+    private Connection               conn;
+    private String                   jdbcUrl;
+    private String                   jdbcUsername;
+    private String                   jdbcPassword;
+
+    private long                     currentConfigTimestamp = 0;
+    private Map<String, ConfigItem>  remoteAdapterConfigs   = new MapMaker().makeMap();
+
+    private ScheduledExecutorService executor               = Executors.newScheduledThreadPool(2,
+        new NamedThreadFactory("remote-adapter-config-scan"));
+
+    public AdapterRemoteConfigMonitor(String jdbcUrl, String jdbcUsername, String jdbcPassword){
+        this.jdbcUrl = jdbcUrl;
+        this.jdbcUsername = jdbcUsername;
+        this.jdbcPassword = jdbcPassword;
+    }
+
+    private Connection getConn() throws Exception {
+        if (conn == null || conn.isClosed()) {
+            Class.forName("com.mysql.jdbc.Driver");
+            conn = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword);
+        }
+        return conn;
+    }
+
+    /**
+     * 加载远程application.yml配置到本地
+     */
+    public void loadRemoteConfig() {
+        try {
+            // 加载远程adapter配置
+            ConfigItem configItem = getRemoteAdapterConfig();
+            if (configItem != null) {
+                if (configItem.getModifiedTime() != currentConfigTimestamp) {
+                    currentConfigTimestamp = configItem.getModifiedTime();
+                    overrideLocalCanalConfig(configItem.getContent());
+                }
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 获取远程application.yml配置
+     * 
+     * @return 配置对象
+     */
+    private ConfigItem getRemoteAdapterConfig() {
+        String sql = "select name, content, modified_time from canal_config where id=2";
+        try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
+            if (rs.next()) {
+                ConfigItem configItem = new ConfigItem();
+                configItem.setId(2L);
+                configItem.setName(rs.getString("name"));
+                configItem.setContent(rs.getString("content"));
+                configItem.setModifiedTime(rs.getTimestamp("modified_time").getTime());
+                return configItem;
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+        return null;
+    }
+
+    /**
+     * 覆盖本地application.yml文件
+     * 
+     * @param content 文件内容
+     */
+    private void overrideLocalCanalConfig(String content) {
+        try (FileWriter writer = new FileWriter(getConfPath() + "application.yml")) {
+            writer.write(content);
+            writer.flush();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 启动监听数据库变化
+     */
+    public void start() {
+        // 监听application.yml变化
+        executor.scheduleWithFixedDelay(() -> {
+            try {
+                loadRemoteConfig();
+            } catch (Throwable e) {
+                logger.error("scan remote application.yml failed", e);
+            }
+        }, 10, 3, TimeUnit.SECONDS);
+
+        // 监听adapter变化
+        executor.scheduleWithFixedDelay(() -> {
+            try {
+                loadRemoteAdapterConfigs();
+            } catch (Throwable e) {
+                logger.error("scan remote adapter configs failed", e);
+            }
+        }, 10, 3, TimeUnit.SECONDS);
+    }
+
+    /**
+     * 加载adapter配置到本地
+     */
+    public void loadRemoteAdapterConfigs() {
+        try {
+            // 加载远程instance配置
+            Map<String, ConfigItem>[] modifiedConfigs = getModifiedAdapterConfigs();
+            if (modifiedConfigs != null) {
+                overrideLocalAdapterConfigs(modifiedConfigs);
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 获取有变动的adapter配置
+     * 
+     * @return Map[0]: 新增修改的配置, Map[1]: 删除的配置
+     */
+    @SuppressWarnings("unchecked")
+    private Map<String, ConfigItem>[] getModifiedAdapterConfigs() {
+        Map<String, ConfigItem>[] res = new Map[2];
+        Map<String, ConfigItem> remoteConfigStatus = new HashMap<>();
+        String sql = "select id, category, name, modified_time from canal_adapter_config";
+        try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
+            while (rs.next()) {
+                ConfigItem configItem = new ConfigItem();
+                configItem.setId(rs.getLong("id"));
+                configItem.setCategory(rs.getString("category"));
+                configItem.setName(rs.getString("name"));
+                configItem.setModifiedTime(rs.getTimestamp("modified_time").getTime());
+                remoteConfigStatus.put(configItem.getCategory() + "/" + configItem.getName(), configItem);
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return null;
+        }
+
+        if (!remoteConfigStatus.isEmpty()) {
+            List<Long> changedIds = new ArrayList<>();
+
+            for (ConfigItem remoteConfigStat : remoteConfigStatus.values()) {
+                ConfigItem currentConfig = remoteAdapterConfigs
+                    .get(remoteConfigStat.getCategory() + "/" + remoteConfigStat.getName());
+                if (currentConfig == null) {
+                    // 新增
+                    changedIds.add(remoteConfigStat.getId());
+                } else {
+                    // 修改
+                    if (currentConfig.getModifiedTime() != remoteConfigStat.getModifiedTime()) {
+                        changedIds.add(remoteConfigStat.getId());
+                    }
+                }
+            }
+            if (!changedIds.isEmpty()) {
+                Map<String, ConfigItem> changedInstanceConfig = new HashMap<>();
+                String contentsSql = "select id, category, name, content, modified_time from canal_adapter_config  where id in ("
+                                     + Joiner.on(",").join(changedIds) + ")";
+                try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(contentsSql)) {
+                    while (rs.next()) {
+                        ConfigItem configItemNew = new ConfigItem();
+                        configItemNew.setId(rs.getLong("id"));
+                        configItemNew.setCategory(rs.getString("category"));
+                        configItemNew.setName(rs.getString("name"));
+                        configItemNew.setContent(rs.getString("content"));
+                        configItemNew.setModifiedTime(rs.getTimestamp("modified_time").getTime());
+
+                        remoteAdapterConfigs.put(configItemNew.getCategory() + "/" + configItemNew.getName(),
+                            configItemNew);
+                        changedInstanceConfig.put(configItemNew.getCategory() + "/" + configItemNew.getName(),
+                            configItemNew);
+                    }
+
+                    res[0] = changedInstanceConfig.isEmpty() ? null : changedInstanceConfig;
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+
+        Map<String, ConfigItem> removedInstanceConfig = new HashMap<>();
+        for (ConfigItem configItem : remoteAdapterConfigs.values()) {
+            if (!remoteConfigStatus.containsKey(configItem.getCategory() + "/" + configItem.getName())) {
+                // 删除
+                remoteAdapterConfigs.remove(configItem.getCategory() + "/" + configItem.getName());
+                removedInstanceConfig.put(configItem.getCategory() + "/" + configItem.getName(), null);
+            }
+        }
+        res[1] = removedInstanceConfig.isEmpty() ? null : removedInstanceConfig;
+
+        if (res[0] == null && res[1] == null) {
+            return null;
+        } else {
+            return res;
+        }
+    }
+
+    /**
+     * 覆盖adapter配置到本地
+     * 
+     * @param modifiedInstanceConfigs 变动的配置集合
+     */
+    private void overrideLocalAdapterConfigs(Map<String, ConfigItem>[] modifiedInstanceConfigs) {
+        Map<String, ConfigItem> changedInstanceConfigs = modifiedInstanceConfigs[0];
+        if (changedInstanceConfigs != null) {
+            for (ConfigItem configItem : changedInstanceConfigs.values()) {
+                try (FileWriter writer = new FileWriter(
+                    getConfPath() + configItem.getCategory() + "/" + configItem.getName())) {
+                    writer.write(configItem.getContent());
+                    writer.flush();
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+        Map<String, ConfigItem> removedInstanceConfigs = modifiedInstanceConfigs[1];
+        if (removedInstanceConfigs != null) {
+            for (String name : removedInstanceConfigs.keySet()) {
+                File file = new File(getConfPath() + name);
+                if (file.exists()) {
+                    file.delete();
+                }
+            }
+        }
+    }
+
+    /**
+     * 获取conf文件夹所在路径
+     *
+     * @return 路径地址
+     */
+    private String getConfPath() {
+        String classpath = this.getClass().getResource("/").getPath();
+        String confPath = classpath + "../conf/";
+        if (new File(confPath).exists()) {
+            return confPath;
+        } else {
+            return classpath;
+        }
+    }
+
+    public void destroy() {
+        executor.shutdownNow();
+        if (conn != null) {
+            try {
+                conn.close();
+            } catch (SQLException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * 配置对应对象
+     */
+    public static class ConfigItem {
+
+        private Long   id;
+        private String category;
+        private String name;
+        private String content;
+        private long   modifiedTime;
+
+        public Long getId() {
+            return id;
+        }
+
+        public void setId(Long id) {
+            this.id = id;
+        }
+
+        public String getCategory() {
+            return category;
+        }
+
+        public void setCategory(String category) {
+            this.category = category;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public String getContent() {
+            return content;
+        }
+
+        public void setContent(String content) {
+            this.content = content;
+        }
+
+        public long getModifiedTime() {
+            return modifiedTime;
+        }
+
+        public void setModifiedTime(long modifiedTime) {
+            this.modifiedTime = modifiedTime;
+        }
+    }
+}

+ 3 - 0
client-adapter/launcher/src/main/resources/META-INF/spring.factories

@@ -0,0 +1,3 @@
+# Bootstrap Configuration
+org.springframework.cloud.bootstrap.BootstrapConfiguration=\
+  com.alibaba.otter.canal.adapter.launcher.config.BootstrapConfiguration

+ 1 - 1
client-adapter/launcher/src/main/resources/application.yml

@@ -39,7 +39,7 @@ canal.conf:
 #        key: mysql1
 #        properties:
 #          jdbc.driverClassName: com.mysql.jdbc.Driver
-#          jdbc.url: jdbc:mysql://192.168.0.36/mytest?useUnicode=true
+#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
 #          jdbc.username: root
 #          jdbc.password: 121212
 #      - name: rdb

+ 6 - 0
client-adapter/launcher/src/main/resources/bootstrap.yml

@@ -0,0 +1,6 @@
+#canal:
+#  manager:
+#    jdbc:
+#      url: jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
+#      username: root
+#      password: 121212

+ 36 - 26
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/monitor/RdbConfigMonitor.java

@@ -4,6 +4,7 @@ import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
 import org.apache.commons.io.monitor.FileAlterationMonitor;
@@ -86,6 +87,10 @@ public class RdbConfigMonitor {
                     // 加载配置文件
                     String configContent = MappingConfigsLoader
                         .loadConfig(adapterName + File.separator + file.getName());
+                    if (configContent == null) {
+                        onFileDelete(file);
+                        return;
+                    }
                     MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
                     config.validate();
                     if ((key == null && config.getOuterAdapterKey() == null)
@@ -121,39 +126,44 @@ public class RdbConfigMonitor {
         }
 
         private void addConfigToCache(File file, MappingConfig mappingConfig) {
-            MappingConfig.DbMapping dbMapping = mappingConfig.getDbMapping();
-            if (dbMapping != null) {
-                if (!dbMapping.getMirrorDb()) {
-                    rdbAdapter.getRdbMapping().put(file.getName(), mappingConfig);
-                    Map<String, MappingConfig> configMap = rdbAdapter.getMappingConfigCache()
-                        .computeIfAbsent(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
-                                         + mappingConfig.getDbMapping().getDatabase() + "."
-                                         + mappingConfig.getDbMapping().getTable(),
-                            k1 -> new HashMap<>());
-                    configMap.put(file.getName(), mappingConfig);
-                } else {
-                    Map<String, MirrorDbConfig> mirrorDbConfigCache = rdbAdapter.getMirrorDbConfigCache();
-                    mirrorDbConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
-                                            + mappingConfig.getDbMapping().getDatabase(),
-                        MirrorDbConfig.create(file.getName(), mappingConfig));
-                }
+            if (mappingConfig == null || mappingConfig.getDbMapping() == null) {
+                return;
+            }
+            rdbAdapter.getRdbMapping().put(file.getName(), mappingConfig);
+            if (!mappingConfig.getDbMapping().getMirrorDb()) {
+                Map<String, MappingConfig> configMap = rdbAdapter.getMappingConfigCache()
+                    .computeIfAbsent(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                     + mappingConfig.getDbMapping().getDatabase() + "."
+                                     + mappingConfig.getDbMapping().getTable(),
+                        k1 -> new HashMap<>());
+                configMap.put(file.getName(), mappingConfig);
+            } else {
+                Map<String, MirrorDbConfig> mirrorDbConfigCache = rdbAdapter.getMirrorDbConfigCache();
+                mirrorDbConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                        + mappingConfig.getDbMapping().getDatabase(),
+                    MirrorDbConfig.create(file.getName(), mappingConfig));
             }
         }
 
         private void deleteConfigFromCache(File file) {
+            MappingConfig mappingConfig = rdbAdapter.getRdbMapping().remove(file.getName());
 
-            rdbAdapter.getRdbMapping().remove(file.getName());
-            for (Map<String, MappingConfig> configMap : rdbAdapter.getMappingConfigCache().values()) {
-                if (configMap != null) {
-                    configMap.remove(file.getName());
-                }
+            if (mappingConfig == null || mappingConfig.getDbMapping() == null) {
+                return;
             }
-
-            rdbAdapter.getMirrorDbConfigCache().forEach((key, mirrorDbConfig) -> {
-                if (mirrorDbConfig.getFileName().equals(file.getName())) {
-                    rdbAdapter.getMirrorDbConfigCache().remove(key);
+            if (!mappingConfig.getDbMapping().getMirrorDb()) {
+                for (Map<String, MappingConfig> configMap : rdbAdapter.getMappingConfigCache().values()) {
+                    if (configMap != null) {
+                        configMap.remove(file.getName());
+                    }
                 }
-            });
+            } else {
+                rdbAdapter.getMirrorDbConfigCache().forEach((key, mirrorDbConfig) -> {
+                    if (mirrorDbConfig.getFileName().equals(file.getName())) {
+                        rdbAdapter.getMirrorDbConfigCache().remove(key);
+                    }
+                });
+            }
 
         }
     }

File diff suppressed because it is too large
+ 11 - 2
deployer/manager_ddl.sql


Some files were not shown because too many files changed in this diff