Ver Fonte

整理adapter端远程配置, 统一扩展接口

mcy há 6 anos atrás
pai
commit
9a2e890f57

+ 19 - 22
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/BootstrapConfiguration.java

@@ -1,14 +1,14 @@
 package com.alibaba.otter.canal.adapter.launcher.config;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 
-import com.alibaba.otter.canal.adapter.launcher.monitor.AdapterRemoteConfigMonitor;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.env.Environment;
 
+import com.alibaba.otter.canal.adapter.launcher.monitor.remote.RemoteConfigLoader;
+import com.alibaba.otter.canal.adapter.launcher.monitor.remote.RemoteConfigLoaderFactory;
+
 /**
  * Bootstrap级别配置加载
  *
@@ -17,28 +17,25 @@ import org.springframework.core.env.Environment;
  */
 public class BootstrapConfiguration {
 
-    private static final Logger logger = LoggerFactory.getLogger(BootstrapConfiguration.class);
-
     @Autowired
-    private Environment         env;
+    private Environment        env;
+
+    private RemoteConfigLoader remoteConfigLoader = null;
 
     @PostConstruct
     public void loadRemoteConfig() {
-        try {
-            // 加载远程配置
-            String jdbcUrl = env.getProperty("canal.manager.jdbc.url");
-            if (StringUtils.isNotEmpty(jdbcUrl)) {
-                String jdbcUsername = env.getProperty("canal.manager.jdbc.username");
-                String jdbcPassword = env.getProperty("canal.manager.jdbc.password");
-                AdapterRemoteConfigMonitor configMonitor = new AdapterRemoteConfigMonitor(jdbcUrl,
-                    jdbcUsername,
-                    jdbcPassword);
-                configMonitor.loadRemoteConfig();
-                configMonitor.loadRemoteAdapterConfigs();
-                configMonitor.start(); // 启动监听
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
+        remoteConfigLoader = RemoteConfigLoaderFactory.getRemoteConfigLoader(env);
+        if (remoteConfigLoader != null) {
+            remoteConfigLoader.loadRemoteConfig();
+            remoteConfigLoader.loadRemoteAdapterConfigs();
+            remoteConfigLoader.startMonitor(); // 启动监听
+        }
+    }
+
+    @PreDestroy
+    public synchronized void destroy() {
+        if (remoteConfigLoader != null) {
+            remoteConfigLoader.destroy();
         }
     }
 }

+ 8 - 15
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java

@@ -4,8 +4,6 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.annotation.Resource;
 
-import com.alibaba.otter.canal.adapter.launcher.monitor.AdapterRemoteConfigMonitor;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.cloud.context.config.annotation.RefreshScope;
@@ -29,27 +27,25 @@ import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 @RefreshScope
 public class CanalAdapterService {
 
-    private static final Logger        logger        = LoggerFactory.getLogger(CanalAdapterService.class);
+    private static final Logger logger  = LoggerFactory.getLogger(CanalAdapterService.class);
 
-    private CanalAdapterLoader         adapterLoader;
+    private CanalAdapterLoader  adapterLoader;
 
     @Resource
-    private ContextRefresher           contextRefresher;
+    private ContextRefresher    contextRefresher;
 
     @Resource
-    private AdapterCanalConfig         adapterCanalConfig;
+    private AdapterCanalConfig  adapterCanalConfig;
     @Resource
-    private Environment                env;
+    private Environment         env;
 
     // 注入bean保证优先注册
     @Resource
-    private SpringContext              springContext;
+    private SpringContext       springContext;
     @Resource
-    private SyncSwitch                 syncSwitch;
+    private SyncSwitch          syncSwitch;
 
-    private volatile boolean           running       = false;
-
-    private AdapterRemoteConfigMonitor configMonitor = null;
+    private volatile boolean    running = false;
 
     @PostConstruct
     public synchronized void init() {
@@ -75,9 +71,6 @@ public class CanalAdapterService {
         try {
             running = false;
             logger.info("## stop the canal client adapters");
-            if (configMonitor != null) {
-                configMonitor.destroy();
-            }
 
             if (adapterLoader != null) {
                 adapterLoader.destroy();

+ 56 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/ConfigItem.java

@@ -0,0 +1,56 @@
+package com.alibaba.otter.canal.adapter.launcher.monitor.remote;
+
+/**
+ * 配置对应对象
+ *
+ * @author rewerma 2019-01-25 下午05:20:16
+ * @version 1.0.0
+ */
+public class ConfigItem {
+
+    private Long   id;
+    private String category;
+    private String name;
+    private String content;
+    private long   modifiedTime;
+
+    public Long getId() {
+        return id;
+    }
+
+    public void setId(Long id) {
+        this.id = id;
+    }
+
+    public String getCategory() {
+        return category;
+    }
+
+    public void setCategory(String category) {
+        this.category = category;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getContent() {
+        return content;
+    }
+
+    public void setContent(String content) {
+        this.content = content;
+    }
+
+    public long getModifiedTime() {
+        return modifiedTime;
+    }
+
+    public void setModifiedTime(long modifiedTime) {
+        this.modifiedTime = modifiedTime;
+    }
+}

+ 101 - 156
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/AdapterRemoteConfigMonitor.java → client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/DbRemoteConfigLoader.java

@@ -1,10 +1,11 @@
-package com.alibaba.otter.canal.adapter.launcher.monitor;
+package com.alibaba.otter.canal.adapter.launcher.monitor.remote;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileWriter;
-import java.net.URL;
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -13,54 +14,61 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
 
-import com.alibaba.otter.canal.client.adapter.support.Constant;
-import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
 import com.google.common.base.Joiner;
 import com.google.common.collect.MapMaker;
 
 /**
- * 远程配置装载、监控类
+ * 基于数据库的远程配置装载器
  *
- * @author rewerma @ 2019-01-05
+ * @author rewerma 2019-01-25 下午05:20:16
  * @version 1.0.0
  */
-public class AdapterRemoteConfigMonitor {
+public class DbRemoteConfigLoader implements RemoteConfigLoader {
 
-    private static final Logger      logger                 = LoggerFactory.getLogger(AdapterRemoteConfigMonitor.class);
+    private static final Logger      logger                 = LoggerFactory.getLogger(DbRemoteConfigLoader.class);
 
-    private Connection               conn;
-    private String                   jdbcUrl;
-    private String                   jdbcUsername;
-    private String                   jdbcPassword;
+    private DruidDataSource          dataSource;
 
-    private long                     currentConfigTimestamp = 0;
+    private static volatile long     currentConfigTimestamp = 0;
     private Map<String, ConfigItem>  remoteAdapterConfigs   = new MapMaker().makeMap();
 
     private ScheduledExecutorService executor               = Executors.newScheduledThreadPool(2,
         new NamedThreadFactory("remote-adapter-config-scan"));
 
-    public AdapterRemoteConfigMonitor(String jdbcUrl, String jdbcUsername, String jdbcPassword){
-        this.jdbcUrl = jdbcUrl;
-        this.jdbcUsername = jdbcUsername;
-        this.jdbcPassword = jdbcPassword;
-    }
+    private RemoteAdapterMonitor     remoteAdapterMonitor   = new RemoteAdapterMonitorImpl();
 
-    private Connection getConn() throws Exception {
-        if (conn == null || conn.isClosed()) {
-            Class.forName("com.mysql.jdbc.Driver");
-            conn = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword);
+    public DbRemoteConfigLoader(String driverName, String jdbcUrl, String jdbcUsername, String jdbcPassword){
+        dataSource = new DruidDataSource();
+        if (StringUtils.isEmpty(driverName)) {
+            driverName = "com.mysql.jdbc.Driver";
+        }
+        dataSource.setDriverClassName(driverName);
+        dataSource.setUrl(jdbcUrl);
+        dataSource.setUsername(jdbcUsername);
+        dataSource.setPassword(jdbcPassword);
+        dataSource.setInitialSize(1);
+        dataSource.setMinIdle(1);
+        dataSource.setMaxActive(1);
+        dataSource.setMaxWait(60000);
+        dataSource.setTimeBetweenEvictionRunsMillis(60000);
+        dataSource.setMinEvictableIdleTimeMillis(300000);
+        try {
+            dataSource.init();
+        } catch (SQLException e) {
+            throw new RuntimeException(e.getMessage(), e);
         }
-        return conn;
     }
 
     /**
-     * 加载远程application.yml配置到本地
+     * 加载远程application.yml配置
      */
+    @Override
     public void loadRemoteConfig() {
         try {
             // 加载远程adapter配置
@@ -84,7 +92,9 @@ public class AdapterRemoteConfigMonitor {
      */
     private ConfigItem getRemoteAdapterConfig() {
         String sql = "select name, content, modified_time from canal_config where id=2";
-        try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
+        try (Connection conn = dataSource.getConnection();
+                Statement stmt = conn.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
             if (rs.next()) {
                 ConfigItem configItem = new ConfigItem();
                 configItem.setId(2L);
@@ -114,54 +124,29 @@ public class AdapterRemoteConfigMonitor {
     }
 
     /**
-     * 启动监听数据库变化
-     */
-    public void start() {
-        // 监听application.yml变化
-        executor.scheduleWithFixedDelay(() -> {
-            try {
-                loadRemoteConfig();
-            } catch (Throwable e) {
-                logger.error("scan remote application.yml failed", e);
-            }
-        }, 10, 3, TimeUnit.SECONDS);
-
-        // 监听adapter变化
-        executor.scheduleWithFixedDelay(() -> {
-            try {
-                loadRemoteAdapterConfigs();
-            } catch (Throwable e) {
-                logger.error("scan remote adapter configs failed", e);
-            }
-        }, 10, 3, TimeUnit.SECONDS);
-    }
-
-    /**
-     * 加载adapter配置到本地
+     * 加载adapter配置
      */
+    @Override
     public void loadRemoteAdapterConfigs() {
         try {
             // 加载远程adapter配置
-            Map<String, ConfigItem>[] modifiedConfigs = getModifiedAdapterConfigs();
-            if (modifiedConfigs != null) {
-                overrideLocalAdapterConfigs(modifiedConfigs);
-            }
+            loadModifiedAdapterConfigs();
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
         }
     }
 
     /**
-     * 获取有变动的adapter配置
-     *
-     * @return Map[0]: 新增修改的配置, Map[1]: 删除的配置
+     * 加载有变动的adapter配置
      */
     @SuppressWarnings("unchecked")
-    private Map<String, ConfigItem>[] getModifiedAdapterConfigs() {
+    private void loadModifiedAdapterConfigs() {
         Map<String, ConfigItem>[] res = new Map[2];
         Map<String, ConfigItem> remoteConfigStatus = new HashMap<>();
         String sql = "select id, category, name, modified_time from canal_adapter_config";
-        try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
+        try (Connection conn = dataSource.getConnection();
+                Statement stmt = conn.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
             while (rs.next()) {
                 ConfigItem configItem = new ConfigItem();
                 configItem.setId(rs.getLong("id"));
@@ -172,7 +157,6 @@ public class AdapterRemoteConfigMonitor {
             }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
-            return null;
         }
 
         if (!remoteConfigStatus.isEmpty()) {
@@ -192,10 +176,11 @@ public class AdapterRemoteConfigMonitor {
                 }
             }
             if (!changedIds.isEmpty()) {
-                Map<String, ConfigItem> changedAdapterConfig = new HashMap<>();
                 String contentsSql = "select id, category, name, content, modified_time from canal_adapter_config  where id in ("
                                      + Joiner.on(",").join(changedIds) + ")";
-                try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(contentsSql)) {
+                try (Connection conn = dataSource.getConnection();
+                        Statement stmt = conn.createStatement();
+                        ResultSet rs = stmt.executeQuery(contentsSql)) {
                     while (rs.next()) {
                         ConfigItem configItemNew = new ConfigItem();
                         configItemNew.setId(rs.getLong("id"));
@@ -206,63 +191,20 @@ public class AdapterRemoteConfigMonitor {
 
                         remoteAdapterConfigs.put(configItemNew.getCategory() + "/" + configItemNew.getName(),
                             configItemNew);
-                        changedAdapterConfig.put(configItemNew.getCategory() + "/" + configItemNew.getName(),
-                            configItemNew);
+                        remoteAdapterMonitor.onModify(configItemNew);
                     }
 
-                    res[0] = changedAdapterConfig.isEmpty() ? null : changedAdapterConfig;
                 } catch (Exception e) {
                     logger.error(e.getMessage(), e);
                 }
             }
         }
 
-        Map<String, ConfigItem> removedAdapterConfig = new HashMap<>();
         for (ConfigItem configItem : remoteAdapterConfigs.values()) {
             if (!remoteConfigStatus.containsKey(configItem.getCategory() + "/" + configItem.getName())) {
                 // 删除
                 remoteAdapterConfigs.remove(configItem.getCategory() + "/" + configItem.getName());
-                removedAdapterConfig.put(configItem.getCategory() + "/" + configItem.getName(), null);
-            }
-        }
-        res[1] = removedAdapterConfig.isEmpty() ? null : removedAdapterConfig;
-
-        if (res[0] == null && res[1] == null) {
-            return null;
-        } else {
-            return res;
-        }
-    }
-
-    /**
-     * 覆盖adapter配置到本地
-     *
-     * @param modifiedAdapterConfigs 变动的配置集合
-     */
-    private void overrideLocalAdapterConfigs(Map<String, ConfigItem>[] modifiedAdapterConfigs) {
-        Map<String, ConfigItem> changedAdapterConfigs = modifiedAdapterConfigs[0];
-        if (changedAdapterConfigs != null) {
-            for (ConfigItem configItem : changedAdapterConfigs.values()) {
-                try (FileWriter writer = new FileWriter(
-                    getConfPath() + configItem.getCategory() + "/" + configItem.getName())) {
-                    writer.write(configItem.getContent());
-                    writer.flush();
-                    logger.info("## Loaded remote adapter config: {}/{}",
-                        configItem.getCategory(),
-                        configItem.getName());
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
-                }
-            }
-        }
-        Map<String, ConfigItem> removedAdapterConfigs = modifiedAdapterConfigs[1];
-        if (removedAdapterConfigs != null) {
-            for (String name : removedAdapterConfigs.keySet()) {
-                File file = new File(getConfPath() + name);
-                if (file.exists()) {
-                    deleteDir(file);
-                    logger.info("## Deleted and reloaded remote adapter config: {}", name);
-                }
+                remoteAdapterMonitor.onDelete(configItem.getCategory() + "/" + configItem.getName());
             }
         }
     }
@@ -302,66 +244,69 @@ public class AdapterRemoteConfigMonitor {
         }
     }
 
-    public void destroy() {
-        executor.shutdownNow();
-        if (conn != null) {
+    /**
+     * 启动监听数据库变化
+     */
+    @Override
+    public void startMonitor() {
+        // 监听application.yml变化
+        executor.scheduleWithFixedDelay(() -> {
             try {
-                conn.close();
-            } catch (SQLException e) {
-                logger.error(e.getMessage(), e);
+                loadRemoteConfig();
+            } catch (Throwable e) {
+                logger.error("scan remote application.yml failed", e);
             }
-        }
+        }, 10, 3, TimeUnit.SECONDS);
+
+        // 监听adapter变化
+        executor.scheduleWithFixedDelay(() -> {
+            try {
+                loadRemoteAdapterConfigs();
+            } catch (Throwable e) {
+                logger.error("scan remote adapter configs failed", e);
+            }
+        }, 10, 3, TimeUnit.SECONDS);
     }
 
     /**
-     * 配置对应对象
+     * 销毁
      */
-    public static class ConfigItem {
-
-        private Long   id;
-        private String category;
-        private String name;
-        private String content;
-        private long   modifiedTime;
-
-        public Long getId() {
-            return id;
-        }
-
-        public void setId(Long id) {
-            this.id = id;
-        }
-
-        public String getCategory() {
-            return category;
-        }
-
-        public void setCategory(String category) {
-            this.category = category;
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public void setName(String name) {
-            this.name = name;
+    @Override
+    public void destroy() {
+        executor.shutdownNow();
+        try {
+            dataSource.close();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
         }
+    }
 
-        public String getContent() {
-            return content;
-        }
+    private class RemoteAdapterMonitorImpl implements RemoteAdapterMonitor {
 
-        public void setContent(String content) {
-            this.content = content;
+        @Override
+        public void onAdd(ConfigItem configItem) {
+            this.onModify(configItem);
         }
 
-        public long getModifiedTime() {
-            return modifiedTime;
+        @Override
+        public void onModify(ConfigItem configItem) {
+            try (FileWriter writer = new FileWriter(
+                getConfPath() + configItem.getCategory() + "/" + configItem.getName())) {
+                writer.write(configItem.getContent());
+                writer.flush();
+                logger.info("## Loaded remote adapter config: {}/{}", configItem.getCategory(), configItem.getName());
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
         }
 
-        public void setModifiedTime(long modifiedTime) {
-            this.modifiedTime = modifiedTime;
+        @Override
+        public void onDelete(String name) {
+            File file = new File(getConfPath() + name);
+            if (file.exists()) {
+                deleteDir(file);
+                logger.info("## Deleted and reloaded remote adapter config: {}", name);
+            }
         }
     }
 }

+ 31 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/RemoteAdapterMonitor.java

@@ -0,0 +1,31 @@
+package com.alibaba.otter.canal.adapter.launcher.monitor.remote;
+
+/**
+ * 远程配置监听器接口
+ *
+ * @author rewerma 2019-01-25 下午05:20:16
+ * @version 1.0.0
+ */
+public interface RemoteAdapterMonitor {
+
+    /**
+     * 新增配置事件
+     *
+     * @param configItem 配置项
+     */
+    void onAdd(ConfigItem configItem);
+
+    /**
+     * 修改配置事件
+     *
+     * @param configItem 配置项
+     */
+    void onModify(ConfigItem configItem);
+
+    /**
+     * 删除配置事件
+     *
+     * @param name 配置名
+     */
+    void onDelete(String name);
+}

+ 30 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/RemoteConfigLoader.java

@@ -0,0 +1,30 @@
+package com.alibaba.otter.canal.adapter.launcher.monitor.remote;
+
+/**
+ * 远程配置装载器接口
+ *
+ * @author rewerma 2019-01-25 下午05:20:16
+ * @version 1.0.0
+ */
+public interface RemoteConfigLoader {
+
+    /**
+     * 加载远程application.yml配置到本地
+     */
+    void loadRemoteConfig();
+
+    /**
+     * 加载adapter配置
+     */
+    void loadRemoteAdapterConfigs();
+
+    /**
+     * 启动监听数据库变化
+     */
+    void startMonitor();
+
+    /**
+     * 销毁
+     */
+    void destroy();
+}

+ 34 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/remote/RemoteConfigLoaderFactory.java

@@ -0,0 +1,34 @@
+package com.alibaba.otter.canal.adapter.launcher.monitor.remote;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.env.Environment;
+
+/**
+ * 远程配置装载器工厂类
+ *
+ * @author rewerma 2019-01-25 下午05:20:16
+ * @version 1.0.0
+ */
+public class RemoteConfigLoaderFactory {
+
+    private static final Logger logger = LoggerFactory.getLogger(RemoteConfigLoaderFactory.class);
+
+    public static RemoteConfigLoader getRemoteConfigLoader(Environment env) {
+        try {
+            String jdbcUrl = env.getProperty("canal.manager.jdbc.url");
+            if (!StringUtils.isEmpty(jdbcUrl)) {
+                // load remote config
+                String driverName = env.getProperty("canal.manager.jdbc.driverName");
+                String jdbcUsername = env.getProperty("canal.manager.jdbc.username");
+                String jdbcPassword = env.getProperty("canal.manager.jdbc.password");
+                return new DbRemoteConfigLoader(driverName, jdbcUrl, jdbcUsername, jdbcPassword);
+            }
+            // 可扩展其它远程配置加载器
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+        return null;
+    }
+}