Parcourir la source

Merge pull request #1154 from rewerma/master

adapter 配置修改动态加载
agapple il y a 6 ans
Parent
commit
178bea07a8
17 fichiers modifiés avec 601 ajouts et 117 suppressions
  1. 25 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MappingConfigsLoader.java
  2. 25 12
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java
  3. 151 0
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/monitor/ESConfigMonitor.java
  4. 2 2
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java
  5. 6 6
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSub2Test.java
  6. 6 6
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSubTest.java
  7. 4 4
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOne2Test.java
  8. 9 9
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOneTest.java
  9. 5 5
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncJoinOneTest.java
  10. 6 6
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncSingleTest.java
  11. 33 13
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java
  12. 129 0
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/monitor/HbaseConfigMonitor.java
  13. 0 22
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/RefresherConfig.java
  14. 2 2
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/ApplicationConfigMonitor.java
  15. 56 30
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java
  16. 141 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/monitor/RdbConfigMonitor.java
  17. 1 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

+ 25 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MappingConfigsLoader.java

@@ -43,4 +43,29 @@ public class MappingConfigsLoader {
 
         return configContentMap;
     }
+
+    public static String loadConfig(String name) {
+        // 先取本地文件,再取类路径
+        File filePath = new File(".." + File.separator + Constant.CONF_DIR + File.separator + name);
+        if (!filePath.exists()) {
+            URL url = MappingConfigsLoader.class.getClassLoader().getResource("");
+            if (url != null) {
+                filePath = new File(url.getPath() + name);
+            }
+        }
+        if (filePath.exists()) {
+            String fileName = filePath.getName();
+            if (!fileName.endsWith(".yml")) {
+                return null;
+            }
+            try (InputStream in = new FileInputStream(filePath)) {
+                byte[] bytes = new byte[in.available()];
+                in.read(bytes);
+                return new String(bytes, StandardCharsets.UTF_8);
+            } catch (IOException e) {
+                throw new RuntimeException("Read mapping config: " + filePath.getAbsolutePath() + " error. ", e);
+            }
+        }
+        return null;
+    }
 }

+ 25 - 12
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -1,10 +1,11 @@
 package com.alibaba.otter.canal.client.adapter.es;
 
 import java.net.InetAddress;
-import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -23,6 +24,7 @@ import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SqlParser;
+import com.alibaba.otter.canal.client.adapter.es.monitor.ESConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.es.service.ESEtlService;
 import com.alibaba.otter.canal.client.adapter.es.service.ESSyncService;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
@@ -37,12 +39,14 @@ import com.alibaba.otter.canal.client.adapter.support.*;
 @SPI("es")
 public class ESAdapter implements OuterAdapter {
 
-    private Map<String, ESSyncConfig>       esSyncConfig        = new LinkedHashMap<>(); // 文件名对应配置
-    private Map<String, List<ESSyncConfig>> dbTableEsSyncConfig = new LinkedHashMap<>(); // schema-table对应配置
+    private Map<String, ESSyncConfig>              esSyncConfig        = new ConcurrentHashMap<>(); // 文件名对应配置
+    private Map<String, Map<String, ESSyncConfig>> dbTableEsSyncConfig = new ConcurrentHashMap<>(); // schema-table对应配置
 
-    private TransportClient                 transportClient;
+    private TransportClient                        transportClient;
 
-    private ESSyncService                   esSyncService;
+    private ESSyncService                          esSyncService;
+
+    private ESConfigMonitor                        esConfigMonitor;
 
     public TransportClient getTransportClient() {
         return transportClient;
@@ -56,7 +60,7 @@ public class ESAdapter implements OuterAdapter {
         return esSyncConfig;
     }
 
-    public Map<String, List<ESSyncConfig>> getDbTableEsSyncConfig() {
+    public Map<String, Map<String, ESSyncConfig>> getDbTableEsSyncConfig() {
         return dbTableEsSyncConfig;
     }
 
@@ -73,7 +77,9 @@ public class ESAdapter implements OuterAdapter {
                 }
             });
 
-            for (ESSyncConfig config : esSyncConfig.values()) {
+            for (Map.Entry<String, ESSyncConfig> entry : esSyncConfig.entrySet()) {
+                String configName = entry.getKey();
+                ESSyncConfig config = entry.getValue();
                 SchemaItem schemaItem = SqlParser.parse(config.getEsMapping().getSql());
                 config.getEsMapping().setSchemaItem(schemaItem);
 
@@ -89,9 +95,9 @@ public class ESAdapter implements OuterAdapter {
                 String schema = matcher.group(2);
 
                 schemaItem.getAliasTableItems().values().forEach(tableItem -> {
-                    List<ESSyncConfig> esSyncConfigs = dbTableEsSyncConfig
-                        .computeIfAbsent(schema + "-" + tableItem.getTableName(), k -> new ArrayList<>());
-                    esSyncConfigs.add(config);
+                    Map<String, ESSyncConfig> esSyncConfigMap = dbTableEsSyncConfig
+                        .computeIfAbsent(schema + "-" + tableItem.getTableName(), k -> new HashMap<>());
+                    esSyncConfigMap.put(configName, config);
                 });
             }
 
@@ -108,6 +114,9 @@ public class ESAdapter implements OuterAdapter {
             }
             ESTemplate esTemplate = new ESTemplate(transportClient);
             esSyncService = new ESSyncService(esTemplate);
+
+            esConfigMonitor = new ESConfigMonitor();
+            esConfigMonitor.init(this);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -117,8 +126,8 @@ public class ESAdapter implements OuterAdapter {
     public void sync(Dml dml) {
         String database = dml.getDatabase();
         String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = dbTableEsSyncConfig.get(database + "-" + table);
-        esSyncService.sync(esSyncConfigs, dml);
+        Map<String, ESSyncConfig> configMap = dbTableEsSyncConfig.get(database + "-" + table);
+        esSyncService.sync(configMap.values(), dml);
     }
 
     @Override
@@ -192,6 +201,10 @@ public class ESAdapter implements OuterAdapter {
 
     @Override
     public String getDestination(String task) {
+        if (esConfigMonitor != null) {
+            esConfigMonitor.destroy();
+        }
+
         ESSyncConfig config = esSyncConfig.get(task);
         if (config != null) {
             return config.getDestination();

+ 151 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/monitor/ESConfigMonitor.java

@@ -0,0 +1,151 @@
+package com.alibaba.otter.canal.client.adapter.es.monitor;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.io.filefilter.FileFilterUtils;
+import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
+import org.apache.commons.io.monitor.FileAlterationMonitor;
+import org.apache.commons.io.monitor.FileAlterationObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SqlParser;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+
+public class ESConfigMonitor {
+
+    private static final Logger   logger      = LoggerFactory.getLogger(ESConfigMonitor.class);
+
+    private static final String   adapterName = "es";
+
+    private ESAdapter             esAdapter;
+
+    private FileAlterationMonitor fileMonitor;
+
+    public void init(ESAdapter esAdapter) {
+        this.esAdapter = esAdapter;
+        File confDir = Util.getConfDirPath(adapterName);
+        try {
+            FileAlterationObserver observer = new FileAlterationObserver(confDir,
+                FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
+            FileListener listener = new FileListener();
+            observer.addListener(listener);
+            fileMonitor = new FileAlterationMonitor(3000, observer);
+            fileMonitor.start();
+
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    public void destroy() {
+        try {
+            fileMonitor.stop();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private class FileListener extends FileAlterationListenerAdaptor {
+
+        @Override
+        public void onFileCreate(File file) {
+            super.onFileCreate(file);
+            try {
+                // 加载新增的配置文件
+                String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
+                ESSyncConfig config = new Yaml().loadAs(configContent, ESSyncConfig.class);
+                config.validate();
+                addConfigToCache(file, config);
+
+                logger.info("Add a new es mapping config: {} to canal adapter", file.getName());
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        @Override
+        public void onFileChange(File file) {
+            super.onFileChange(file);
+
+            try {
+                if (esAdapter.getEsSyncConfig().containsKey(file.getName())) {
+                    // 加载配置文件
+                    String configContent = MappingConfigsLoader
+                        .loadConfig(adapterName + File.separator + file.getName());
+                    ESSyncConfig config = new Yaml().loadAs(configContent, ESSyncConfig.class);
+                    config.validate();
+                    if (esAdapter.getEsSyncConfig().containsKey(file.getName())) {
+                        deleteConfigFromCache(file);
+                    }
+                    addConfigToCache(file, config);
+
+                    logger.info("Change a es mapping config: {} of canal adapter", file.getName());
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        @Override
+        public void onFileDelete(File file) {
+            super.onFileDelete(file);
+
+            try {
+                if (esAdapter.getEsSyncConfig().containsKey(file.getName())) {
+                    deleteConfigFromCache(file);
+
+                    logger.info("Delete a es mapping config: {} of canal adapter", file.getName());
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        private void addConfigToCache(File file, ESSyncConfig config) {
+            esAdapter.getEsSyncConfig().put(file.getName(), config);
+
+            SchemaItem schemaItem = SqlParser.parse(config.getEsMapping().getSql());
+            config.getEsMapping().setSchemaItem(schemaItem);
+
+            DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+            if (dataSource == null || dataSource.getUrl() == null) {
+                throw new RuntimeException("No data source found: " + config.getDataSourceKey());
+            }
+            Pattern pattern = Pattern.compile(".*:(.*)://.*/(.*)\\?.*$");
+            Matcher matcher = pattern.matcher(dataSource.getUrl());
+            if (!matcher.find()) {
+                throw new RuntimeException("Not found the schema of jdbc-url: " + config.getDataSourceKey());
+            }
+            String schema = matcher.group(2);
+
+            schemaItem.getAliasTableItems().values().forEach(tableItem -> {
+                Map<String, ESSyncConfig> esSyncConfigMap = esAdapter.getDbTableEsSyncConfig()
+                    .computeIfAbsent(schema + "-" + tableItem.getTableName(), k -> new HashMap<>());
+                esSyncConfigMap.put(file.getName(), config);
+            });
+
+        }
+
+        private void deleteConfigFromCache(File file) {
+            esAdapter.getEsSyncConfig().remove(file.getName());
+            for (Map<String, ESSyncConfig> configMap : esAdapter.getDbTableEsSyncConfig().values()) {
+                if (configMap != null) {
+                    configMap.remove(file.getName());
+                }
+            }
+
+        }
+    }
+}

+ 2 - 2
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.client.adapter.es.service;
 
+import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -13,7 +14,6 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
-import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
@@ -39,7 +39,7 @@ public class ESSyncService {
         this.esTemplate = esTemplate;
     }
 
-    public void sync(List<ESSyncConfig> esSyncConfigs, Dml dml) {
+    public void sync(Collection<ESSyncConfig> esSyncConfigs, Dml dml) {
         long begin = System.currentTimeMillis();
         if (esSyncConfigs != null) {
             if (logger.isTraceEnabled()) {

+ 6 - 6
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSub2Test.java

@@ -50,9 +50,9 @@ public class LabelSyncJoinSub2Test {
 
         String database = dml.getDatabase();
         String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
 
-        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
+        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("b;a_", response.getSource().get("_labels"));
@@ -88,9 +88,9 @@ public class LabelSyncJoinSub2Test {
 
         String database = dml.getDatabase();
         String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
 
-        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
+        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("b;aa_", response.getSource().get("_labels"));
@@ -120,9 +120,9 @@ public class LabelSyncJoinSub2Test {
 
         String database = dml.getDatabase();
         String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
 
-        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
+        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("b_", response.getSource().get("_labels"));

+ 6 - 6
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSubTest.java

@@ -50,9 +50,9 @@ public class LabelSyncJoinSubTest {
 
         String database = dml.getDatabase();
         String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
 
-        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
+        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("b;a", response.getSource().get("_labels"));
@@ -88,9 +88,9 @@ public class LabelSyncJoinSubTest {
 
         String database = dml.getDatabase();
         String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
 
-        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
+        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("b;aa", response.getSource().get("_labels"));
@@ -120,9 +120,9 @@ public class LabelSyncJoinSubTest {
 
         String database = dml.getDatabase();
         String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
 
-        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
+        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("b", response.getSource().get("_labels"));

+ 4 - 4
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOne2Test.java

@@ -48,9 +48,9 @@ public class RoleSyncJoinOne2Test {
 
         String database = dml.getDatabase();
         String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
 
-        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
+        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("admin_", response.getSource().get("_role_name"));
@@ -85,9 +85,9 @@ public class RoleSyncJoinOne2Test {
 
         String database = dml.getDatabase();
         String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
 
-        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
+        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("admin3_", response.getSource().get("_role_name"));

+ 9 - 9
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOneTest.java

@@ -49,9 +49,9 @@ public class RoleSyncJoinOneTest {
 
         String database = dml.getDatabase();
         String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
 
-        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
+        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("admin", response.getSource().get("_role_name"));
@@ -86,9 +86,9 @@ public class RoleSyncJoinOneTest {
 
         String database = dml.getDatabase();
         String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
 
-        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
+        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("admin2", response.getSource().get("_role_name"));
@@ -124,9 +124,9 @@ public class RoleSyncJoinOneTest {
 
         String database = dml.getDatabase();
         String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
 
-        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
+        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("operator", response.getSource().get("_role_name"));
@@ -151,7 +151,7 @@ public class RoleSyncJoinOneTest {
         old2.put("role_id", 2L);
         dml2.setOld(oldList2);
 
-        esAdapter.getEsSyncService().sync(esSyncConfigs, dml2);
+        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml2);
 
         GetResponse response2 = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("admin2", response2.getSource().get("_role_name"));
@@ -181,9 +181,9 @@ public class RoleSyncJoinOneTest {
 
         String database = dml.getDatabase();
         String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
 
-        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
+        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertNull(response.getSource().get("_role_name"));

+ 5 - 5
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncJoinOneTest.java

@@ -20,7 +20,7 @@ public class UserSyncJoinOneTest {
 
     @Before
     public void init() {
-//        AdapterConfigs.put("es", "mytest_user_join_one.yml");
+        // AdapterConfigs.put("es", "mytest_user_join_one.yml");
         esAdapter = Common.init();
     }
 
@@ -50,9 +50,9 @@ public class UserSyncJoinOneTest {
 
         String database = dml.getDatabase();
         String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
 
-        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
+        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("Eric_", response.getSource().get("_name"));
@@ -86,9 +86,9 @@ public class UserSyncJoinOneTest {
 
         String database = dml.getDatabase();
         String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
 
-        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
+        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("Eric2_", response.getSource().get("_name"));

+ 6 - 6
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncSingleTest.java

@@ -43,9 +43,9 @@ public class UserSyncSingleTest {
 
         String database = dml.getDatabase();
         String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
 
-        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
+        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("Eric", response.getSource().get("_name"));
@@ -76,9 +76,9 @@ public class UserSyncSingleTest {
 
         String database = dml.getDatabase();
         String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
 
-        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
+        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("Eric2", response.getSource().get("_name"));
@@ -106,9 +106,9 @@ public class UserSyncSingleTest {
 
         String database = dml.getDatabase();
         String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+        Map<String, ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
 
-        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
+        esAdapter.getEsSyncService().sync(esSyncConfigs.values(), dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertNull(response.getSource());

+ 33 - 13
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfigLoader;
+import com.alibaba.otter.canal.client.adapter.hbase.monitor.HbaseConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseEtlService;
 import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseSyncService;
 import com.alibaba.otter.canal.client.adapter.hbase.support.HbaseTemplate;
@@ -36,14 +37,24 @@ import com.alibaba.otter.canal.client.adapter.support.*;
 @SPI("hbase")
 public class HbaseAdapter implements OuterAdapter {
 
-    private static Logger              logger             = LoggerFactory.getLogger(HbaseAdapter.class);
+    private static Logger                           logger             = LoggerFactory.getLogger(HbaseAdapter.class);
 
-    private Map<String, MappingConfig> hbaseMapping       = new ConcurrentHashMap<>();                            // 文件名对应配置
-    private Map<String, MappingConfig> mappingConfigCache = new ConcurrentHashMap<>();                            // 库名-表名对应配置
+    private Map<String, MappingConfig>              hbaseMapping       = new ConcurrentHashMap<>();                  // 文件名对应配置
+    private Map<String, Map<String, MappingConfig>> mappingConfigCache = new ConcurrentHashMap<>();                  // 库名-表名对应配置
 
-    private Connection                 conn;
-    private HbaseSyncService           hbaseSyncService;
-    private HbaseTemplate              hbaseTemplate;
+    private Connection                              conn;
+    private HbaseSyncService                        hbaseSyncService;
+    private HbaseTemplate                           hbaseTemplate;
+
+    private HbaseConfigMonitor                      configMonitor;
+
+    public Map<String, MappingConfig> getHbaseMapping() {
+        return hbaseMapping;
+    }
+
+    public Map<String, Map<String, MappingConfig>> getMappingConfigCache() {
+        return mappingConfigCache;
+    }
 
     @Override
     public void init(OuterAdapterConfig configuration) {
@@ -57,11 +68,14 @@ public class HbaseAdapter implements OuterAdapter {
                     hbaseMapping.put(key, mappingConfig);
                 }
             });
-            for (MappingConfig mappingConfig : hbaseMapping.values()) {
-                mappingConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
-                                       + mappingConfig.getHbaseMapping().getDatabase() + "."
-                                       + mappingConfig.getHbaseMapping().getTable(),
-                    mappingConfig);
+            for (Map.Entry<String, MappingConfig> entry : hbaseMapping.entrySet()) {
+                String configName = entry.getKey();
+                MappingConfig mappingConfig = entry.getValue();
+                String k = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                           + mappingConfig.getHbaseMapping().getDatabase() + "."
+                           + mappingConfig.getHbaseMapping().getTable();
+                Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(k, k1 -> new HashMap<>());
+                configMap.put(configName, mappingConfig);
             }
 
             Map<String, String> properties = configuration.getProperties();
@@ -71,6 +85,9 @@ public class HbaseAdapter implements OuterAdapter {
             conn = ConnectionFactory.createConnection(hbaseConfig);
             hbaseTemplate = new HbaseTemplate(conn);
             hbaseSyncService = new HbaseSyncService(hbaseTemplate);
+
+            configMonitor = new HbaseConfigMonitor();
+            configMonitor.init(this);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -84,8 +101,8 @@ public class HbaseAdapter implements OuterAdapter {
         String destination = StringUtils.trimToEmpty(dml.getDestination());
         String database = dml.getDatabase();
         String table = dml.getTable();
-        MappingConfig config = mappingConfigCache.get(destination + "." + database + "." + table);
-        hbaseSyncService.sync(config, dml);
+        Map<String, MappingConfig> configMap = mappingConfigCache.get(destination + "." + database + "." + table);
+        configMap.values().forEach(config -> hbaseSyncService.sync(config, dml));
     }
 
     @Override
@@ -160,6 +177,9 @@ public class HbaseAdapter implements OuterAdapter {
 
     @Override
     public void destroy() {
+        if (configMonitor != null) {
+            configMonitor.destroy();
+        }
         if (conn != null) {
             try {
                 conn.close();

+ 129 - 0
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/monitor/HbaseConfigMonitor.java

@@ -0,0 +1,129 @@
+package com.alibaba.otter.canal.client.adapter.hbase.monitor;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.filefilter.FileFilterUtils;
+import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
+import org.apache.commons.io.monitor.FileAlterationMonitor;
+import org.apache.commons.io.monitor.FileAlterationObserver;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import com.alibaba.otter.canal.client.adapter.hbase.HbaseAdapter;
+import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+
+public class HbaseConfigMonitor {
+
+    private static final Logger   logger      = LoggerFactory.getLogger(HbaseConfigMonitor.class);
+
+    private static final String   adapterName = "hbase";
+
+    private HbaseAdapter          hbaseAdapter;
+
+    private FileAlterationMonitor fileMonitor;
+
+    public void init(HbaseAdapter hbaseAdapter) {
+        this.hbaseAdapter = hbaseAdapter;
+        File confDir = Util.getConfDirPath(adapterName);
+        try {
+            FileAlterationObserver observer = new FileAlterationObserver(confDir,
+                FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
+            FileListener listener = new FileListener();
+            observer.addListener(listener);
+            fileMonitor = new FileAlterationMonitor(3000, observer);
+            fileMonitor.start();
+
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    public void destroy() {
+        try {
+            fileMonitor.stop();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private class FileListener extends FileAlterationListenerAdaptor {
+
+        @Override
+        public void onFileCreate(File file) {
+            super.onFileCreate(file);
+            try {
+                // 加载新增的配置文件
+                String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
+                MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
+                config.validate();
+                addConfigToCache(file, config);
+
+                logger.info("Add a new hbase mapping config: {} to canal adapter", file.getName());
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        @Override
+        public void onFileChange(File file) {
+            super.onFileChange(file);
+
+            try {
+                if (hbaseAdapter.getHbaseMapping().containsKey(file.getName())) {
+                    // 加载配置文件
+                    String configContent = MappingConfigsLoader
+                        .loadConfig(adapterName + File.separator + file.getName());
+                    MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
+                    config.validate();
+                    if (hbaseAdapter.getHbaseMapping().containsKey(file.getName())) {
+                        deleteConfigFromCache(file);
+                    }
+                    addConfigToCache(file, config);
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        @Override
+        public void onFileDelete(File file) {
+            super.onFileDelete(file);
+
+            try {
+                if (hbaseAdapter.getHbaseMapping().containsKey(file.getName())) {
+                    deleteConfigFromCache(file);
+
+                    logger.info("Delete a hbase mapping config: {} of canal adapter", file.getName());
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        private void addConfigToCache(File file, MappingConfig config) {
+            hbaseAdapter.getHbaseMapping().put(file.getName(), config);
+            Map<String, MappingConfig> configMap = hbaseAdapter.getMappingConfigCache()
+                .computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "."
+                                 + config.getHbaseMapping().getDatabase() + "." + config.getHbaseMapping().getTable(),
+                    k1 -> new HashMap<>());
+            configMap.put(file.getName(), config);
+        }
+
+        private void deleteConfigFromCache(File file) {
+
+            hbaseAdapter.getHbaseMapping().remove(file.getName());
+            for (Map<String, MappingConfig> configMap : hbaseAdapter.getMappingConfigCache().values()) {
+                if (configMap != null) {
+                    configMap.remove(file.getName());
+                }
+            }
+
+        }
+    }
+}

+ 0 - 22
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/RefresherConfig.java

@@ -1,22 +0,0 @@
-package com.alibaba.otter.canal.adapter.launcher.config;
-
-import org.springframework.cloud.context.refresh.ContextRefresher;
-import org.springframework.cloud.context.scope.refresh.RefreshScope;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class RefresherConfig {
-
-    @Bean
-    public RefreshScope refreshScope() {
-        return new RefreshScope();
-    }
-
-    @Bean
-    public ContextRefresher contextRefresher(ConfigurableApplicationContext configurableApplicationContext,
-                                             RefreshScope refreshScope) {
-        return new ContextRefresher(configurableApplicationContext, refreshScope);
-    }
-}

+ 2 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/ApplicationRunningMonitor.java → client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/ApplicationConfigMonitor.java

@@ -22,9 +22,9 @@ import com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService;
 import com.alibaba.otter.canal.client.adapter.support.Util;
 
 @Component
-public class ApplicationRunningMonitor {
+public class ApplicationConfigMonitor {
 
-    private static final Logger   logger = LoggerFactory.getLogger(ApplicationRunningMonitor.class);
+    private static final Logger   logger = LoggerFactory.getLogger(ApplicationConfigMonitor.class);
 
     @Resource
     private ContextRefresher      contextRefresher;

+ 56 - 30
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -5,6 +5,7 @@ import java.sql.SQLException;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -20,6 +21,7 @@ import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.rdb.config.ConfigLoader;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.monitor.RdbConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
 import com.alibaba.otter.canal.client.adapter.rdb.support.SimpleDml;
@@ -28,22 +30,33 @@ import com.alibaba.otter.canal.client.adapter.support.*;
 @SPI("rdb")
 public class RdbAdapter implements OuterAdapter {
 
-    private static Logger              logger             = LoggerFactory.getLogger(RdbAdapter.class);
+    private static Logger                           logger             = LoggerFactory.getLogger(RdbAdapter.class);
 
-    private Map<String, MappingConfig> rdbMapping         = new HashMap<>();                                // 文件名对应配置
-    private Map<String, MappingConfig> mappingConfigCache = new HashMap<>();                                // 库名-表名对应配置
+    private Map<String, MappingConfig>              rdbMapping         = new HashMap<>();                          // 文件名对应配置
+    private Map<String, Map<String, MappingConfig>> mappingConfigCache = new HashMap<>();                          // 库名-表名对应配置
 
-    private DruidDataSource            dataSource;
+    private DruidDataSource                         dataSource;
 
-    private RdbSyncService             rdbSyncService;
+    private RdbSyncService                          rdbSyncService;
 
-    private int                        commitSize         = 3000;
+    private int                                     commitSize         = 3000;
 
-    private volatile boolean           running            = false;
+    private volatile boolean                        running            = false;
 
-    private List<SimpleDml>            dmlList            = Collections.synchronizedList(new ArrayList<>());
-    private Lock                       syncLock           = new ReentrantLock();
-    private ExecutorService            executor           = Executors.newFixedThreadPool(1);
+    private List<SimpleDml>                         dmlList            = Collections
+        .synchronizedList(new ArrayList<>());
+    private Lock                                    syncLock           = new ReentrantLock();
+    private ExecutorService                         executor           = Executors.newFixedThreadPool(1);
+
+    private RdbConfigMonitor                        rdbConfigMonitor;
+
+    public Map<String, MappingConfig> getRdbMapping() {
+        return rdbMapping;
+    }
+
+    public Map<String, Map<String, MappingConfig>> getMappingConfigCache() {
+        return mappingConfigCache;
+    }
 
     @Override
     public void init(OuterAdapterConfig configuration) {
@@ -56,11 +69,15 @@ public class RdbAdapter implements OuterAdapter {
                 rdbMapping.put(key, mappingConfig);
             }
         });
-        for (MappingConfig mappingConfig : rdbMapping.values()) {
-            mappingConfigCache
-                .put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
-                     + mappingConfig.getDbMapping().getDatabase() + "." + mappingConfig.getDbMapping().getTable(),
-                    mappingConfig);
+        for (Map.Entry<String, MappingConfig> entry : rdbMapping.entrySet()) {
+            String configName = entry.getKey();
+            MappingConfig mappingConfig = entry.getValue();
+            Map<String, MappingConfig> configMap = mappingConfigCache
+                .computeIfAbsent(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                 + mappingConfig.getDbMapping().getDatabase() + "."
+                                 + mappingConfig.getDbMapping().getTable(),
+                    k1 -> new HashMap<>());
+            configMap.put(configName, mappingConfig);
         }
 
         Map<String, String> properties = configuration.getProperties();
@@ -93,19 +110,22 @@ public class RdbAdapter implements OuterAdapter {
 
         executor.submit(() -> {
             while (running) {
+                int beginSize = dmlList.size();
                 try {
-                    int size1 = dmlList.size();
                     Thread.sleep(3000);
-                    int size2 = dmlList.size();
-                    if (size1 == size2) {
-                        // 超时提交
-                        sync();
-                    }
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+                int endSize = dmlList.size();
+
+                if (endSize - beginSize < 300) {
+                    sync();
                 }
             }
         });
+
+        rdbConfigMonitor = new RdbConfigMonitor();
+        rdbConfigMonitor.init(configuration.getKey(), this);
     }
 
     @Override
@@ -113,16 +133,18 @@ public class RdbAdapter implements OuterAdapter {
         String destination = StringUtils.trimToEmpty(dml.getDestination());
         String database = dml.getDatabase();
         String table = dml.getTable();
-        MappingConfig config = mappingConfigCache.get(destination + "." + database + "." + table);
+        Map<String, MappingConfig> configMap = mappingConfigCache.get(destination + "." + database + "." + table);
 
-        List<SimpleDml> simpleDmlList = SimpleDml.dml2SimpleDml(dml, config);
+        if (configMap != null) {
+            configMap.values().forEach(config -> {
+                List<SimpleDml> simpleDmlList = SimpleDml.dml2SimpleDml(dml, config);
+                dmlList.addAll(simpleDmlList);
 
-        dmlList.addAll(simpleDmlList);
-
-        if (dmlList.size() > commitSize) {
-            sync();
+                if (dmlList.size() >= commitSize) {
+                    sync();
+                }
+            });
         }
-
         if (logger.isDebugEnabled()) {
             logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
         }
@@ -235,6 +257,10 @@ public class RdbAdapter implements OuterAdapter {
     @Override
     public void destroy() {
         running = false;
+        if (rdbConfigMonitor != null) {
+            rdbConfigMonitor.destroy();
+        }
+
         executor.shutdown();
 
         if (rdbSyncService != null) {

+ 141 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/monitor/RdbConfigMonitor.java

@@ -0,0 +1,141 @@
+package com.alibaba.otter.canal.client.adapter.rdb.monitor;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.filefilter.FileFilterUtils;
+import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
+import org.apache.commons.io.monitor.FileAlterationMonitor;
+import org.apache.commons.io.monitor.FileAlterationObserver;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+
+public class RdbConfigMonitor {
+
+    private static final Logger   logger = LoggerFactory.getLogger(RdbConfigMonitor.class);
+
+    private static final String   adapterName = "rdb";
+
+    private String                key;
+
+    private RdbAdapter            rdbAdapter;
+
+    private FileAlterationMonitor fileMonitor;
+
+    public void init(String key, RdbAdapter rdbAdapter) {
+        this.key = key;
+        this.rdbAdapter = rdbAdapter;
+        File confDir = Util.getConfDirPath(adapterName);
+        try {
+            FileAlterationObserver observer = new FileAlterationObserver(confDir,
+                FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
+            FileListener listener = new FileListener();
+            observer.addListener(listener);
+            fileMonitor = new FileAlterationMonitor(3000, observer);
+            fileMonitor.start();
+
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    public void destroy() {
+        try {
+            fileMonitor.stop();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private class FileListener extends FileAlterationListenerAdaptor {
+
+        @Override
+        public void onFileCreate(File file) {
+            super.onFileCreate(file);
+            try {
+                // 加载新增的配置文件
+                String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
+                MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
+                config.validate();
+                if ((key == null && config.getOuterAdapterKey() == null)
+                    || (key != null && key.equals(config.getOuterAdapterKey()))) {
+                    addConfigToCache(file, config);
+
+                    logger.info("Add a new rdb mapping config: {} to canal adapter", file.getName());
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        @Override
+        public void onFileChange(File file) {
+            super.onFileChange(file);
+
+            try {
+                if (rdbAdapter.getRdbMapping().containsKey(file.getName())) {
+                    // 加载配置文件
+                    String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
+                    MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
+                    config.validate();
+                    if ((key == null && config.getOuterAdapterKey() == null)
+                        || (key != null && key.equals(config.getOuterAdapterKey()))) {
+                        if (rdbAdapter.getRdbMapping().containsKey(file.getName())) {
+                            deleteConfigFromCache(file);
+                        }
+                        addConfigToCache(file, config);
+                    } else {
+                        // 不能修改outerAdapterKey
+                        throw new RuntimeException("Outer adapter key not allowed modify");
+                    }
+                    logger.info("Change a rdb mapping config: {} of canal adapter", file.getName());
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        @Override
+        public void onFileDelete(File file) {
+            super.onFileDelete(file);
+
+            try {
+                if (rdbAdapter.getRdbMapping().containsKey(file.getName())) {
+                    deleteConfigFromCache(file);
+
+                    logger.info("Delete a rdb mapping config: {} of canal adapter", file.getName());
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        private void addConfigToCache(File file, MappingConfig config) {
+            rdbAdapter.getRdbMapping().put(file.getName(), config);
+            Map<String, MappingConfig> configMap = rdbAdapter.getMappingConfigCache()
+                .computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "."
+                                 + config.getDbMapping().getDatabase() + "." + config.getDbMapping().getTable(),
+                    k1 -> new HashMap<>());
+            configMap.put(file.getName(), config);
+        }
+
+        private void deleteConfigFromCache(File file) {
+
+            rdbAdapter.getRdbMapping().remove(file.getName());
+            for (Map<String, MappingConfig> configMap : rdbAdapter.getMappingConfigCache().values()) {
+                if (configMap != null) {
+                    configMap.remove(file.getName());
+                }
+            }
+
+        }
+    }
+}

+ 1 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -10,6 +10,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.sql.DataSource;