Browse Source

repair issue #3401 (#3923)

* 解决OutAdapter单例导致的问题,如 1.多个es7 outAdapter的多线程问题(共享同一个es bulk request),2.多个es7 outAdapter共用一份es配置问题

* 1. 优化adapter和监听器之间的代码结构
2. 监听器只处理匹配的配置文件
3. bugfix-es adapter配置文件回调时漏写esVersion
理论上一个adapter实例对应一个config配置文件

Co-authored-by: agapple <jianghang.loujh@alibaba-inc.com>
zhuchao941 3 years ago
parent
commit
4315f203a2
16 changed files with 586 additions and 502 deletions
  1. 28 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/FileName2KeyMapping.java
  2. 2 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Util.java
  3. 79 45
      client-adapter/escore/src/main/java/com/alibaba/otter/canal/client/adapter/es/core/ESAdapter.java
  4. 17 34
      client-adapter/escore/src/main/java/com/alibaba/otter/canal/client/adapter/es/core/monitor/ESConfigMonitor.java
  5. 81 38
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java
  6. 10 37
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/monitor/HbaseConfigMonitor.java
  7. 73 36
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/KuduAdapter.java
  8. 13 49
      client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/monitor/KuduConfigMonitor.java
  9. 24 15
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java
  10. 14 10
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java
  11. 6 6
      client-adapter/launcher/src/main/resources/bootstrap.yml
  12. 60 25
      client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/PhoenixAdapter.java
  13. 9 51
      client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/monitor/PhoenixConfigMonitor.java
  14. 70 32
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java
  15. 16 74
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/monitor/RdbConfigMonitor.java
  16. 84 50
      client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/TablestoreAdapter.java

+ 28 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/FileName2KeyMapping.java

@@ -0,0 +1,28 @@
+package com.alibaba.otter.canal.client.adapter.support;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Created by @author zhuchao on @date 2021/11/11.
+ */
+public class FileName2KeyMapping {
+
+    private static Map<String, String> MAP = new ConcurrentHashMap<>();
+
+    public static void register(String type, String fileName, String key) {
+        MAP.putIfAbsent(join(type, fileName), key);
+    }
+
+    public static void unregister(String type, String fileName) {
+        MAP.remove(join(type, fileName));
+    }
+
+    public static String getKey(String type, String fileName) {
+        return MAP.get(join(type, fileName));
+    }
+
+    private static String join(String type, String fileName) {
+        return type + "|" + fileName;
+    }
+}

+ 2 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Util.java

@@ -30,6 +30,8 @@ public class Util {
 
     private static final Logger logger = LoggerFactory.getLogger(Util.class);
 
+    public static final String AUTO_GENERATED_PREFIX = "AUTO_GENERATED_";
+
     /**
      * 通过DS执行sql
      */

+ 79 - 45
client-adapter/escore/src/main/java/com/alibaba/otter/canal/client/adapter/es/core/ESAdapter.java

@@ -1,14 +1,5 @@
 package com.alibaba.otter.canal.client.adapter.es.core;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.lang.StringUtils;
-
 import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
@@ -21,7 +12,17 @@ import com.alibaba.otter.canal.client.adapter.es.core.support.ESTemplate;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.FileName2KeyMapping;
 import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.commons.lang.StringUtils;
 
 /**
  * ES外部适配器
@@ -42,6 +43,8 @@ public abstract class ESAdapter implements OuterAdapter {
 
     protected Properties                             envProperties;
 
+    protected OuterAdapterConfig                     configuration;
+
     public ESSyncService getEsSyncService() {
         return esSyncService;
     }
@@ -58,23 +61,13 @@ public abstract class ESAdapter implements OuterAdapter {
     public void init(OuterAdapterConfig configuration, Properties envProperties) {
         try {
             this.envProperties = envProperties;
+            this.configuration = configuration;
             Map<String, ESSyncConfig> esSyncConfigTmp = ESSyncConfigLoader.load(envProperties);
             // 过滤不匹配的key的配置
             esSyncConfigTmp.forEach((key, config) -> {
-                if ((config.getOuterAdapterKey() == null && configuration.getKey() == null)
-                    || (config.getOuterAdapterKey() != null && config.getOuterAdapterKey()
-                        .equalsIgnoreCase(configuration.getKey()))) {
-                    esSyncConfig.put(key, config);
-                }
+                addConfig(key, config);
             });
 
-            for (Map.Entry<String, ESSyncConfig> entry : esSyncConfig.entrySet()) {
-                String configName = entry.getKey();
-                ESSyncConfig config = entry.getValue();
-
-                addSyncConfigToCache(configName, config);
-            }
-
             esSyncService = new ESSyncService(esTemplate);
 
             esConfigMonitor = new ESConfigMonitor();
@@ -138,7 +131,7 @@ public abstract class ESAdapter implements OuterAdapter {
         return null;
     }
 
-    public void addSyncConfigToCache(String configName, ESSyncConfig config) {
+    private void addSyncConfigToCache(String configName, ESSyncConfig config) {
         Properties envProperties = this.envProperties;
         SchemaItem schemaItem = SqlParser.parse(config.getEsMapping().getSql());
         config.getEsMapping().setSchemaItem(schemaItem);
@@ -155,28 +148,69 @@ public abstract class ESAdapter implements OuterAdapter {
         String schema = matcher.group(2);
 
         schemaItem.getAliasTableItems()
-            .values()
-            .forEach(tableItem -> {
-                Map<String, ESSyncConfig> esSyncConfigMap;
-                if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
-                    esSyncConfigMap = dbTableEsSyncConfig.computeIfAbsent(StringUtils.trimToEmpty(config.getDestination())
-                                                                          + "-"
-                                                                          + StringUtils.trimToEmpty(config.getGroupId())
-                                                                          + "_"
-                                                                          + tableItem.getSchema() == null ? schema : tableItem.getSchema()
-                                                                          + "-"
-                                                                          + tableItem.getTableName(),
-                        k -> new ConcurrentHashMap<>());
-                } else {
-                    esSyncConfigMap = dbTableEsSyncConfig.computeIfAbsent(StringUtils.trimToEmpty(config.getDestination())
-                                                                          + "_"
-                                                                          + tableItem.getSchema() == null ? schema : tableItem.getSchema()
-                                                                          + "-"
-                                                                          + tableItem.getTableName(),
-                        k -> new ConcurrentHashMap<>());
-                }
-
-                esSyncConfigMap.put(configName, config);
-            });
+                .values()
+                .forEach(tableItem -> {
+                    Map<String, ESSyncConfig> esSyncConfigMap;
+                    if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+                        esSyncConfigMap = dbTableEsSyncConfig.computeIfAbsent(StringUtils.trimToEmpty(config.getDestination())
+                                        + "-"
+                                        + StringUtils.trimToEmpty(config.getGroupId())
+                                        + "_"
+                                        + tableItem.getSchema() == null ? schema : tableItem.getSchema()
+                                        + "-"
+                                        + tableItem.getTableName(),
+                                k -> new ConcurrentHashMap<>());
+                    } else {
+                        esSyncConfigMap = dbTableEsSyncConfig.computeIfAbsent(StringUtils.trimToEmpty(config.getDestination())
+                                        + "_"
+                                        + tableItem.getSchema() == null ? schema : tableItem.getSchema()
+                                        + "-"
+                                        + tableItem.getTableName(),
+                                k -> new ConcurrentHashMap<>());
+                    }
+
+                    esSyncConfigMap.put(configName, config);
+                });
+    }
+
+    public boolean addConfig(String fileName, ESSyncConfig config) {
+        if (match(config)) {
+            esSyncConfig.put(fileName, config);
+            addSyncConfigToCache(fileName, config);
+            FileName2KeyMapping.register(getClass().getAnnotation(SPI.class).value(), fileName,
+                    configuration.getKey());
+            return true;
+        }
+        return false;
+    }
+
+    public void updateConfig(String fileName, ESSyncConfig config) {
+        if (config.getOuterAdapterKey() != null && !config.getOuterAdapterKey()
+                .equals(configuration.getKey())) {
+            // 理论上不允许改这个 因为本身就是通过这个关联起Adapter和Config的
+            throw new RuntimeException("not allow to change outAdapterKey");
+        }
+        esSyncConfig.put(fileName, config);
+        addSyncConfigToCache(fileName, config);
+    }
+
+    public void deleteConfig(String fileName) {
+        esSyncConfig.remove(fileName);
+        for (Map<String, ESSyncConfig> configMap : dbTableEsSyncConfig.values()) {
+            if (configMap != null) {
+                configMap.remove(fileName);
+            }
+        }
+        FileName2KeyMapping.unregister(getClass().getAnnotation(SPI.class).value(), fileName);
+    }
+
+    private boolean match(ESSyncConfig config) {
+        boolean sameMatch = config.getOuterAdapterKey() != null && config.getOuterAdapterKey()
+                .equalsIgnoreCase(configuration.getKey());
+        boolean prefixMatch = config.getOuterAdapterKey() == null && configuration.getKey()
+                .startsWith(StringUtils
+                        .join(new String[]{Util.AUTO_GENERATED_PREFIX, config.getDestination(),
+                                config.getGroupId()}, '-'));
+        return sameMatch || prefixMatch;
     }
 }

+ 17 - 34
client-adapter/escore/src/main/java/com/alibaba/otter/canal/client/adapter/es/core/monitor/ESConfigMonitor.java

@@ -1,9 +1,12 @@
 package com.alibaba.otter.canal.client.adapter.es.core.monitor;
 
+import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
+import com.alibaba.otter.canal.client.adapter.es.core.ESAdapter;
+import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
+import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 import java.io.File;
-import java.util.Map;
 import java.util.Properties;
-
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
 import org.apache.commons.io.monitor.FileAlterationMonitor;
@@ -11,12 +14,6 @@ import org.apache.commons.io.monitor.FileAlterationObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
-import com.alibaba.otter.canal.client.adapter.es.core.ESAdapter;
-import com.alibaba.otter.canal.client.adapter.es.core.config.ESSyncConfig;
-import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
-import com.alibaba.otter.canal.client.adapter.support.Util;
-
 public class ESConfigMonitor {
 
     private static final Logger   logger = LoggerFactory.getLogger(ESConfigMonitor.class);
@@ -36,7 +33,7 @@ public class ESConfigMonitor {
         File confDir = Util.getConfDirPath(adapterName);
         try {
             FileAlterationObserver observer = new FileAlterationObserver(confDir,
-                FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
+                    FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
             FileListener listener = new FileListener();
             observer.addListener(listener);
             fileMonitor = new FileAlterationMonitor(3000, observer);
@@ -69,9 +66,14 @@ public class ESConfigMonitor {
                     null,
                     envProperties);
                 if (config != null) {
+                    // 这里要记得设置esVersion bugfix
+                    config.setEsVersion(adapterName);
                     config.validate();
-                    addConfigToCache(file, config);
-                    logger.info("Add a new es mapping config: {} to canal adapter", file.getName());
+                    boolean result = esAdapter.addConfig(file.getName(), config);
+                    if (result) {
+                        logger.info("Add a new es mapping config: {} to canal adapter",
+                                file.getName());
+                    }
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
@@ -99,12 +101,10 @@ public class ESConfigMonitor {
                     if (config == null) {
                         return;
                     }
+                    // 这里要记得设置esVersion bugfix
+                    config.setEsVersion(adapterName);
                     config.validate();
-                    if (esAdapter.getEsSyncConfig().containsKey(file.getName())) {
-                        deleteConfigFromCache(file);
-                    }
-                    addConfigToCache(file, config);
-
+                    esAdapter.updateConfig(file.getName(), config);
                     logger.info("Change a es mapping config: {} of canal adapter", file.getName());
                 }
             } catch (Exception e) {
@@ -118,29 +118,12 @@ public class ESConfigMonitor {
 
             try {
                 if (esAdapter.getEsSyncConfig().containsKey(file.getName())) {
-                    deleteConfigFromCache(file);
-
+                    esAdapter.deleteConfig(file.getName());
                     logger.info("Delete a es mapping config: {} of canal adapter", file.getName());
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
             }
         }
-
-        private void addConfigToCache(File file, ESSyncConfig config) {
-            esAdapter.getEsSyncConfig().put(file.getName(), config);
-
-            esAdapter.addSyncConfigToCache(file.getName(), config);
-        }
-
-        private void deleteConfigFromCache(File file) {
-            esAdapter.getEsSyncConfig().remove(file.getName());
-            for (Map<String, ESSyncConfig> configMap : esAdapter.getDbTableEsSyncConfig().values()) {
-                if (configMap != null) {
-                    configMap.remove(file.getName());
-                }
-            }
-
-        }
     }
 }

+ 81 - 38
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -1,9 +1,25 @@
 package com.alibaba.otter.canal.client.adapter.hbase;
 
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfigLoader;
+import com.alibaba.otter.canal.client.adapter.hbase.monitor.HbaseConfigMonitor;
+import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseEtlService;
+import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseSyncService;
+import com.alibaba.otter.canal.client.adapter.hbase.support.HbaseTemplate;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.FileName2KeyMapping;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -16,18 +32,6 @@ import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.client.adapter.OuterAdapter;
-import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
-import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfigLoader;
-import com.alibaba.otter.canal.client.adapter.hbase.monitor.HbaseConfigMonitor;
-import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseEtlService;
-import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseSyncService;
-import com.alibaba.otter.canal.client.adapter.hbase.support.HbaseTemplate;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-import com.alibaba.otter.canal.client.adapter.support.EtlResult;
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
-import com.alibaba.otter.canal.client.adapter.support.SPI;
-
 /**
  * HBase外部适配器
  *
@@ -49,6 +53,8 @@ public class HbaseAdapter implements OuterAdapter {
 
     private Properties                              envProperties;
 
+    private OuterAdapterConfig                      configuration;
+
     public Map<String, MappingConfig> getHbaseMapping() {
         return hbaseMapping;
     }
@@ -61,33 +67,12 @@ public class HbaseAdapter implements OuterAdapter {
     public void init(OuterAdapterConfig configuration, Properties envProperties) {
         try {
             this.envProperties = envProperties;
+            this.configuration = configuration;
             Map<String, MappingConfig> hbaseMappingTmp = MappingConfigLoader.load(envProperties);
             // 过滤不匹配的key的配置
-            hbaseMappingTmp.forEach((key, mappingConfig) -> {
-                if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
-                    || (mappingConfig.getOuterAdapterKey() != null
-                        && mappingConfig.getOuterAdapterKey().equalsIgnoreCase(configuration.getKey()))) {
-                    hbaseMapping.put(key, mappingConfig);
-                }
+            hbaseMappingTmp.forEach((key, config) -> {
+                addConfig(key, config);
             });
-            for (Map.Entry<String, MappingConfig> entry : hbaseMapping.entrySet()) {
-                String configName = entry.getKey();
-                MappingConfig mappingConfig = entry.getValue();
-                String k;
-                if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
-                    k = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
-                        + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
-                        + mappingConfig.getHbaseMapping().getDatabase() + "-"
-                        + mappingConfig.getHbaseMapping().getTable();
-                } else {
-                    k = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
-                        + mappingConfig.getHbaseMapping().getDatabase() + "-"
-                        + mappingConfig.getHbaseMapping().getTable();
-                }
-                Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(k,
-                    k1 -> new ConcurrentHashMap<>());
-                configMap.put(configName, mappingConfig);
-            }
 
             Map<String, String> properties = configuration.getProperties();
 
@@ -223,4 +208,62 @@ public class HbaseAdapter implements OuterAdapter {
         }
         return null;
     }
+
+    private void addSyncConfigToCache(String configName, MappingConfig mappingConfig) {
+        String k;
+        if (envProperties != null && !"tcp"
+                .equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+            k = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-" + StringUtils
+                    .trimToEmpty(mappingConfig.getGroupId()) + "_" + mappingConfig.getHbaseMapping()
+                    .getDatabase() + "-" + mappingConfig.getHbaseMapping().getTable();
+        } else {
+            k = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_" + mappingConfig
+                    .getHbaseMapping().getDatabase() + "-" + mappingConfig.getHbaseMapping()
+                    .getTable();
+        }
+        Map<String, MappingConfig> configMap = mappingConfigCache
+                .computeIfAbsent(k, k1 -> new ConcurrentHashMap<>());
+        configMap.put(configName, mappingConfig);
+    }
+
+    public boolean addConfig(String fileName, MappingConfig config) {
+        if (match(config)) {
+            hbaseMapping.put(fileName, config);
+            addSyncConfigToCache(fileName, config);
+            FileName2KeyMapping.register(getClass().getAnnotation(SPI.class).value(), fileName,
+                    configuration.getKey());
+            return true;
+        }
+        return false;
+    }
+
+    public void updateConfig(String fileName, MappingConfig config) {
+        if (config.getOuterAdapterKey() != null && !config.getOuterAdapterKey()
+                .equals(configuration.getKey())) {
+            // 理论上不允许改这个 因为本身就是通过这个关联起Adapter和Config的
+            throw new RuntimeException("not allow to change outAdapterKey");
+        }
+        hbaseMapping.put(fileName, config);
+        addSyncConfigToCache(fileName, config);
+    }
+
+    public void deleteConfig(String fileName) {
+        hbaseMapping.remove(fileName);
+        for (Map<String, MappingConfig> configMap : mappingConfigCache.values()) {
+            if (configMap != null) {
+                configMap.remove(fileName);
+            }
+        }
+        FileName2KeyMapping.unregister(getClass().getAnnotation(SPI.class).value(), fileName);
+    }
+
+    private boolean match(MappingConfig config) {
+        boolean sameMatch = config.getOuterAdapterKey() != null && config.getOuterAdapterKey()
+                .equalsIgnoreCase(configuration.getKey());
+        boolean prefixMatch = config.getOuterAdapterKey() == null && configuration.getKey()
+                .startsWith(StringUtils
+                        .join(new String[]{Util.AUTO_GENERATED_PREFIX, config.getDestination(),
+                                config.getGroupId()}, '-'));
+        return sameMatch || prefixMatch;
+    }
 }

+ 10 - 37
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/monitor/HbaseConfigMonitor.java

@@ -5,20 +5,15 @@ import com.alibaba.otter.canal.client.adapter.hbase.HbaseAdapter;
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
 import com.alibaba.otter.canal.client.adapter.support.Util;
-
+import java.io.File;
+import java.util.Properties;
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
 import org.apache.commons.io.monitor.FileAlterationMonitor;
 import org.apache.commons.io.monitor.FileAlterationObserver;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
 public class HbaseConfigMonitor {
 
     private static final Logger   logger      = LoggerFactory.getLogger(HbaseConfigMonitor.class);
@@ -37,7 +32,7 @@ public class HbaseConfigMonitor {
         File confDir = Util.getConfDirPath(adapterName);
         try {
             FileAlterationObserver observer = new FileAlterationObserver(confDir,
-                FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
+                    FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
             FileListener listener = new FileListener();
             observer.addListener(listener);
             fileMonitor = new FileAlterationMonitor(3000, observer);
@@ -70,9 +65,11 @@ public class HbaseConfigMonitor {
                     return;
                 }
                 config.validate();
-                addConfigToCache(file, config);
-
-                logger.info("Add a new hbase mapping config: {} to canal adapter", file.getName());
+                boolean result = hbaseAdapter.addConfig(file.getName(), config);
+                if (result) {
+                    logger.info("Add a new hbase mapping config: {} to canal adapter",
+                            file.getName());
+                }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
             }
@@ -97,10 +94,7 @@ public class HbaseConfigMonitor {
                         return;
                     }
                     config.validate();
-                    if (hbaseAdapter.getHbaseMapping().containsKey(file.getName())) {
-                        deleteConfigFromCache(file);
-                    }
-                    addConfigToCache(file, config);
+                    hbaseAdapter.updateConfig(file.getName(), config);
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
@@ -113,33 +107,12 @@ public class HbaseConfigMonitor {
 
             try {
                 if (hbaseAdapter.getHbaseMapping().containsKey(file.getName())) {
-                    deleteConfigFromCache(file);
-
+                    hbaseAdapter.deleteConfig(file.getName());
                     logger.info("Delete a hbase mapping config: {} of canal adapter", file.getName());
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
             }
         }
-
-        private void addConfigToCache(File file, MappingConfig config) {
-            hbaseAdapter.getHbaseMapping().put(file.getName(), config);
-            Map<String, MappingConfig> configMap = hbaseAdapter.getMappingConfigCache()
-                .computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "_"
-                                 + config.getHbaseMapping().getDatabase() + "-" + config.getHbaseMapping().getTable(),
-                    k1 -> new HashMap<>());
-            configMap.put(file.getName(), config);
-        }
-
-        private void deleteConfigFromCache(File file) {
-
-            hbaseAdapter.getHbaseMapping().remove(file.getName());
-            for (Map<String, MappingConfig> configMap : hbaseAdapter.getMappingConfigCache().values()) {
-                if (configMap != null) {
-                    configMap.remove(file.getName());
-                }
-            }
-
-        }
     }
 }

+ 73 - 36
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/KuduAdapter.java

@@ -1,16 +1,5 @@
 package com.alibaba.otter.canal.client.adapter.kudu;
 
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfig;
 import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfigLoader;
@@ -20,8 +9,19 @@ import com.alibaba.otter.canal.client.adapter.kudu.service.KuduSyncService;
 import com.alibaba.otter.canal.client.adapter.kudu.support.KuduTemplate;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.FileName2KeyMapping;
 import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.SPI;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author liuyadong
@@ -35,8 +35,6 @@ public class KuduAdapter implements OuterAdapter {
     private Map<String, KuduMappingConfig>              kuduMapping        = new ConcurrentHashMap<>();                 // 文件名对应配置
     private Map<String, Map<String, KuduMappingConfig>> mappingConfigCache = new ConcurrentHashMap<>();                 // 库名-表名对应配置
 
-    private String                                      dataSourceKey;
-
     private KuduTemplate                                kuduTemplate;
 
     private KuduSyncService                             kuduSyncService;
@@ -45,6 +43,8 @@ public class KuduAdapter implements OuterAdapter {
 
     private Properties                                  envProperties;
 
+    private OuterAdapterConfig                          configuration;
+
     public Map<String, KuduMappingConfig> getKuduMapping() {
         return kuduMapping;
     }
@@ -56,36 +56,17 @@ public class KuduAdapter implements OuterAdapter {
     @Override
     public void init(OuterAdapterConfig configuration, Properties envProperties) {
         this.envProperties = envProperties;
+        this.configuration = configuration;
         Map<String, KuduMappingConfig> kuduMappingTmp = KuduMappingConfigLoader.load(envProperties);
         // 过滤不匹配的key的配置,获取连接key,key为配置文件名称
-        kuduMappingTmp.forEach((key, mappingConfig) -> {
-            if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
-                || (mappingConfig.getOuterAdapterKey() != null && mappingConfig.getOuterAdapterKey()
-                    .equalsIgnoreCase(configuration.getKey()))) {
-                kuduMapping.put(key, mappingConfig);
-                dataSourceKey = mappingConfig.getDataSourceKey();
-            }
+        kuduMappingTmp.forEach((key, config) -> {
+            addConfig(key, config);
         });
         // 判断目标字段是否为空
         if (kuduMapping.isEmpty()) {
             throw new RuntimeException("No kudu adapter found for config key: " + configuration.getKey());
         }
-        for (Map.Entry<String, KuduMappingConfig> entry : kuduMapping.entrySet()) {
-            String configName = entry.getKey();
-            KuduMappingConfig mappingConfig = entry.getValue();
-            String k;
-            if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
-                k = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
-                    + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
-                    + mappingConfig.getKuduMapping().getDatabase() + "-" + mappingConfig.getKuduMapping().getTable();
-            } else {
-                k = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
-                    + mappingConfig.getKuduMapping().getDatabase() + "-" + mappingConfig.getKuduMapping().getTable();
-            }
-            Map<String, KuduMappingConfig> configMap = mappingConfigCache.computeIfAbsent(k,
-                k1 -> new ConcurrentHashMap<>());
-            configMap.put(configName, mappingConfig);
-        }
+
         Map<String, String> properties = configuration.getProperties();
 
         String kudu_master = properties.get("kudu.master.address");
@@ -203,4 +184,60 @@ public class KuduAdapter implements OuterAdapter {
         }
         return null;
     }
+
+    private void addSyncConfigToCache(String configName, KuduMappingConfig mappingConfig) {
+        String k;
+        if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+            k = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
+                    + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
+                    + mappingConfig.getKuduMapping().getDatabase() + "-" + mappingConfig.getKuduMapping().getTable();
+        } else {
+            k = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
+                    + mappingConfig.getKuduMapping().getDatabase() + "-" + mappingConfig.getKuduMapping().getTable();
+        }
+        Map<String, KuduMappingConfig> configMap = mappingConfigCache.computeIfAbsent(k,
+                k1 -> new ConcurrentHashMap<>());
+        configMap.put(configName, mappingConfig);
+    }
+
+    public boolean addConfig(String fileName, KuduMappingConfig config) {
+        if (match(config)) {
+            kuduMapping.put(fileName, config);
+            addSyncConfigToCache(fileName, config);
+            FileName2KeyMapping.register(getClass().getAnnotation(SPI.class).value(), fileName,
+                    configuration.getKey());
+            return true;
+        }
+        return false;
+    }
+
+    public void updateConfig(String fileName, KuduMappingConfig config) {
+        if (config.getOuterAdapterKey() != null && !config.getOuterAdapterKey()
+                .equals(configuration.getKey())) {
+            // 理论上不允许改这个 因为本身就是通过这个关联起Adapter和Config的
+            throw new RuntimeException("not allow to change outAdapterKey");
+        }
+        kuduMapping.put(fileName, config);
+        addSyncConfigToCache(fileName, config);
+    }
+
+    public void deleteConfig(String fileName) {
+        kuduMapping.remove(fileName);
+        for (Map<String, KuduMappingConfig> configMap : mappingConfigCache.values()) {
+            if (configMap != null) {
+                configMap.remove(fileName);
+            }
+        }
+        FileName2KeyMapping.unregister(getClass().getAnnotation(SPI.class).value(), fileName);
+    }
+
+    private boolean match(KuduMappingConfig config) {
+        boolean sameMatch = config.getOuterAdapterKey() != null && config.getOuterAdapterKey()
+                .equalsIgnoreCase(configuration.getKey());
+        boolean prefixMatch = config.getOuterAdapterKey() == null && configuration.getKey()
+                .startsWith(StringUtils
+                        .join(new String[]{Util.AUTO_GENERATED_PREFIX, config.getDestination(),
+                                config.getGroupId()}, '-'));
+        return sameMatch || prefixMatch;
+    }
 }

+ 13 - 49
client-adapter/kudu/src/main/java/com/alibaba/otter/canal/client/adapter/kudu/monitor/KuduConfigMonitor.java

@@ -1,24 +1,19 @@
 package com.alibaba.otter.canal.client.adapter.kudu.monitor;
 
+import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
+import com.alibaba.otter.canal.client.adapter.kudu.KuduAdapter;
+import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfig;
+import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;
-
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
 import org.apache.commons.io.monitor.FileAlterationMonitor;
 import org.apache.commons.io.monitor.FileAlterationObserver;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
-import com.alibaba.otter.canal.client.adapter.kudu.KuduAdapter;
-import com.alibaba.otter.canal.client.adapter.kudu.config.KuduMappingConfig;
-import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
-import com.alibaba.otter.canal.client.adapter.support.Util;
-
 /**
  * @author liuyadong
  * @description 配置文件监听
@@ -41,7 +36,7 @@ public class KuduConfigMonitor {
         File confDir = Util.getConfDirPath(adapterName);
         try {
             FileAlterationObserver observer = new FileAlterationObserver(confDir,
-                FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
+                    FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
             FileListener listener = new FileListener();
             observer.addListener(listener);
             fileMonitor = new FileAlterationMonitor(3000, observer);
@@ -83,9 +78,11 @@ public class KuduConfigMonitor {
                     return;
                 }
                 config.validate();
-                addConfigToCache(file, config);
-
-                logger.info("Add a new kudu mapping config: {} to canal adapter", file.getName());
+                boolean result = kuduAdapter.addConfig(file.getName(), config);
+                if (result) {
+                    logger.info("Add a new kudu mapping config: {} to canal adapter",
+                            file.getName());
+                }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
             }
@@ -113,10 +110,7 @@ public class KuduConfigMonitor {
                         return;
                     }
                     config.validate();
-                    if (kuduAdapter.getKuduMapping().containsKey(file.getName())) {
-                        deleteConfigFromCache(file);
-                    }
-                    addConfigToCache(file, config);
+                    kuduAdapter.updateConfig(file.getName(), config);
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
@@ -129,42 +123,12 @@ public class KuduConfigMonitor {
 
             try {
                 if (kuduAdapter.getKuduMapping().containsKey(file.getName())) {
-                    deleteConfigFromCache(file);
+                    kuduAdapter.deleteConfig(file.getName());
                     logger.info("Delete a hbase mapping config: {} of canal adapter", file.getName());
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
             }
         }
-
-        /**
-         * 添加配置文件信息到缓存
-         *
-         * @param file
-         * @param config
-         */
-        private void addConfigToCache(File file, KuduMappingConfig config) {
-            kuduAdapter.getKuduMapping().put(file.getName(), config);
-            Map<String, KuduMappingConfig> configMap = kuduAdapter.getMappingConfigCache()
-                .computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "."
-                                 + config.getKuduMapping().getDatabase() + "." + config.getKuduMapping().getTable(),
-                    k1 -> new HashMap<>());
-            configMap.put(file.getName(), config);
-        }
-
-        /**
-         * 从缓存中删除配置
-         *
-         * @param file 文件
-         */
-        private void deleteConfigFromCache(File file) {
-            kuduAdapter.getKuduMapping().remove(file.getName());
-            for (Map<String, KuduMappingConfig> configMap : kuduAdapter.getMappingConfigCache().values()) {
-                if (configMap != null) {
-                    configMap.remove(file.getName());
-                }
-            }
-        }
-
     }
 }

+ 24 - 15
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -1,5 +1,11 @@
 package com.alibaba.otter.canal.adapter.launcher.loader;
 
+import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
+import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -8,7 +14,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -17,12 +22,6 @@ import org.springframework.core.env.Environment;
 import org.springframework.core.env.PropertySource;
 import org.springframework.core.env.StandardEnvironment;
 
-import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
-import com.alibaba.otter.canal.client.adapter.OuterAdapter;
-import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
-import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
-
 /**
  * 外部适配器的加载器
  *
@@ -50,24 +49,34 @@ public class CanalAdapterLoader {
 
         for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
             for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
+                int i = 0;
                 List<List<OuterAdapter>> canalOuterAdapterGroups = new CopyOnWriteArrayList<>();
                 List<OuterAdapter> canalOuterAdapters = new CopyOnWriteArrayList<>();
+
                 for (OuterAdapterConfig config : group.getOuterAdapters()) {
+                    // 保证一定有key
+                    if (config.getKey() == null) {
+                        String key = StringUtils.join(new String[]{Util.AUTO_GENERATED_PREFIX,
+                                        canalAdapter.getInstance(), group.getGroupId(), String.valueOf(i)},
+                                '-');
+                        config.setKey(key);
+                    }
+                    i++;
                     loadAdapter(config, canalOuterAdapters);
                 }
                 canalOuterAdapterGroups.add(canalOuterAdapters);
 
                 AdapterProcessor adapterProcessor = canalAdapterProcessors.computeIfAbsent(canalAdapter.getInstance()
-                                                                                           + "|"
-                                                                                           + StringUtils.trimToEmpty(group.getGroupId()),
-                    f -> new AdapterProcessor(canalClientConfig,
-                        canalAdapter.getInstance(),
-                        group.getGroupId(),
-                        canalOuterAdapterGroups));
+                                + "|"
+                                + StringUtils.trimToEmpty(group.getGroupId()),
+                        f -> new AdapterProcessor(canalClientConfig,
+                                canalAdapter.getInstance(),
+                                group.getGroupId(),
+                                canalOuterAdapterGroups));
                 adapterProcessor.start();
 
                 logger.info("Start adapter for canal-client mq topic: {} succeed", canalAdapter.getInstance() + "-"
-                                                                                   + group.getGroupId());
+                        + group.getGroupId());
             }
         }
 
@@ -201,7 +210,7 @@ public class CanalAdapterLoader {
     private void loadAdapter(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) {
         try {
             OuterAdapter adapter;
-            adapter = loader.getExtension(config.getName(), StringUtils.trimToEmpty(config.getKey()));
+            adapter = loader.getExtension(config.getName(), config.getKey());
 
             ClassLoader cl = Thread.currentThread().getContextClassLoader();
             // 替换ClassLoader

+ 14 - 10
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java

@@ -1,15 +1,21 @@
 package com.alibaba.otter.canal.adapter.launcher.rest;
 
+import com.alibaba.otter.canal.adapter.launcher.common.EtlLock;
+import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
+import com.alibaba.otter.canal.adapter.launcher.config.AdapterCanalConfig;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
+import com.alibaba.otter.canal.client.adapter.support.FileName2KeyMapping;
+import com.alibaba.otter.canal.client.adapter.support.Result;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.web.bind.annotation.GetMapping;
@@ -19,14 +25,6 @@ import org.springframework.web.bind.annotation.PutMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
-import com.alibaba.otter.canal.adapter.launcher.common.EtlLock;
-import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
-import com.alibaba.otter.canal.adapter.launcher.config.AdapterCanalConfig;
-import com.alibaba.otter.canal.client.adapter.OuterAdapter;
-import com.alibaba.otter.canal.client.adapter.support.EtlResult;
-import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
-import com.alibaba.otter.canal.client.adapter.support.Result;
-
 /**
  * 适配器操作Rest
  *
@@ -66,6 +64,9 @@ public class CommonRest {
     @PostMapping("/etl/{type}/{key}/{task}")
     public EtlResult etl(@PathVariable String type, @PathVariable String key, @PathVariable String task,
                          @RequestParam(name = "params", required = false) String params) {
+        if (key == null) {
+            key = FileName2KeyMapping.getKey(type, task);
+        }
         OuterAdapter adapter = loader.getExtension(type, key);
         String destination = adapter.getDestination(task);
         String lockKey = destination == null ? task : destination;
@@ -133,6 +134,9 @@ public class CommonRest {
      */
     @GetMapping("/count/{type}/{key}/{task}")
     public Map<String, Object> count(@PathVariable String type, @PathVariable String key, @PathVariable String task) {
+        if (key == null) {
+            key = FileName2KeyMapping.getKey(type, task);
+        }
         OuterAdapter adapter = loader.getExtension(type, key);
         return adapter.count(task);
     }

+ 6 - 6
client-adapter/launcher/src/main/resources/bootstrap.yml

@@ -1,6 +1,6 @@
-#canal:
-#  manager:
-#    jdbc:
-#      url: jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
-#      username: root
-#      password: 121212
+canal:
+  manager:
+    jdbc:
+      url: jdbc:mysql://rm-bp1yjb2rh8249f1ft90130.mysql.rds.aliyuncs.com:3306/canal_manager?useUnicode=true
+      username: dev
+      password: dev123456

+ 60 - 25
client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/PhoenixAdapter.java

@@ -43,7 +43,7 @@ public class PhoenixAdapter implements OuterAdapter {
 
     private Properties envProperties;
 
-
+    private OuterAdapterConfig configuration;
 
     public Map<String, MappingConfig> getPhoenixMapping() {
         return phoenixMapping;
@@ -64,14 +64,11 @@ public class PhoenixAdapter implements OuterAdapter {
     @Override
     public void init(OuterAdapterConfig configuration, Properties envProperties) {
         this.envProperties = envProperties;
+        this.configuration = configuration;
         Map<String, MappingConfig> phoenixMappingTmp = ConfigLoader.load(envProperties);
         // 过滤不匹配的key的配置
-        phoenixMappingTmp.forEach((key, mappingConfig) -> {
-            if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
-                    || (mappingConfig.getOuterAdapterKey() != null
-                    && mappingConfig.getOuterAdapterKey().equalsIgnoreCase(configuration.getKey()))) {
-                phoenixMapping.put(key, mappingConfig);
-            }
+        phoenixMappingTmp.forEach((key, config) -> {
+            addConfig(key, config);
         });
 
         if (phoenixMapping.isEmpty()) {
@@ -80,24 +77,6 @@ public class PhoenixAdapter implements OuterAdapter {
             logger.info("[{}]phoenix config mapping: {}", this, phoenixMapping.keySet());
         }
 
-        for (Map.Entry<String, MappingConfig> entry : phoenixMapping.entrySet()) {
-            String configName = entry.getKey();
-            MappingConfig mappingConfig = entry.getValue();
-            String key;
-            if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
-                key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
-                        + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
-                        + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable().toLowerCase();
-            } else {
-                key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
-                        + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable().toLowerCase();
-            }
-            Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(key,
-                    k1 -> new ConcurrentHashMap<>());
-            configMap.put(configName, mappingConfig);
-        }
-
-
         Map<String, String> properties = configuration.getProperties();
 
         DriverClass= properties.get("jdbc.driverClassName");
@@ -288,4 +267,60 @@ public class PhoenixAdapter implements OuterAdapter {
             phoenixSyncService.close();
         }
     }
+
+    private void addSyncConfigToCache(String configName, MappingConfig mappingConfig) {
+        String key;
+        if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+            key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
+                    + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
+                    + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable().toLowerCase();
+        } else {
+            key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
+                    + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable().toLowerCase();
+        }
+        Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(key,
+                k1 -> new ConcurrentHashMap<>());
+        configMap.put(configName, mappingConfig);
+    }
+
+    public boolean addConfig(String fileName, MappingConfig config) {
+        if (match(config)) {
+            phoenixMapping.put(fileName, config);
+            addSyncConfigToCache(fileName, config);
+            FileName2KeyMapping.register(getClass().getAnnotation(SPI.class).value(), fileName,
+                    configuration.getKey());
+            return true;
+        }
+        return false;
+    }
+
+    public void updateConfig(String fileName, MappingConfig config) {
+        if (config.getOuterAdapterKey() != null && !config.getOuterAdapterKey()
+                .equals(configuration.getKey())) {
+            // 理论上不允许改这个 因为本身就是通过这个关联起Adapter和Config的
+            throw new RuntimeException("not allow to change outAdapterKey");
+        }
+        phoenixMapping.put(fileName, config);
+        addSyncConfigToCache(fileName, config);
+    }
+
+    public void deleteConfig(String fileName) {
+        phoenixMapping.remove(fileName);
+        for (Map<String, MappingConfig> configMap : mappingConfigCache.values()) {
+            if (configMap != null) {
+                configMap.remove(fileName);
+            }
+        }
+        FileName2KeyMapping.unregister(getClass().getAnnotation(SPI.class).value(), fileName);
+    }
+
+    private boolean match(MappingConfig config) {
+        boolean sameMatch = config.getOuterAdapterKey() != null && config.getOuterAdapterKey()
+                .equalsIgnoreCase(configuration.getKey());
+        boolean prefixMatch = config.getOuterAdapterKey() == null && configuration.getKey()
+                .startsWith(StringUtils
+                        .join(new String[]{Util.AUTO_GENERATED_PREFIX, config.getDestination(),
+                                config.getGroupId()}, '-'));
+        return sameMatch || prefixMatch;
+    }
 }

+ 9 - 51
client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/monitor/PhoenixConfigMonitor.java

@@ -1,23 +1,19 @@
 package com.alibaba.otter.canal.client.adapter.phoenix.monitor;
 
+import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
 import com.alibaba.otter.canal.client.adapter.phoenix.PhoenixAdapter;
 import com.alibaba.otter.canal.client.adapter.phoenix.config.MappingConfig;
-import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
 import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
 import com.alibaba.otter.canal.client.adapter.support.Util;
+import java.io.File;
+import java.util.Properties;
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
 import org.apache.commons.io.monitor.FileAlterationMonitor;
 import org.apache.commons.io.monitor.FileAlterationObserver;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
 /**
  * phoenix config monitor
  */
@@ -75,11 +71,10 @@ public class PhoenixConfigMonitor {
                     return;
                 }
                 config.validate();
-                if ((key == null && config.getOuterAdapterKey() == null)
-                        || (key != null && key.equals(config.getOuterAdapterKey()))) {
-                    addConfigToCache(file, config);
-
-                    logger.info("Add a new phoenix mapping config: {} to canal adapter", file.getName());
+                boolean result = phoenixAdapter.addConfig(file.getName(), config);
+                if (result) {
+                    logger.info("Add a new phoenix mapping config: {} to canal adapter",
+                            file.getName());
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
@@ -105,16 +100,7 @@ public class PhoenixConfigMonitor {
                         return;
                     }
                     config.validate();
-                    if ((key == null && config.getOuterAdapterKey() == null)
-                            || (key != null && key.equals(config.getOuterAdapterKey()))) {
-                        if (phoenixAdapter.getPhoenixMapping().containsKey(file.getName())) {
-                            deleteConfigFromCache(file);
-                        }
-                        addConfigToCache(file, config);
-                    } else {
-                        // 不能修改outerAdapterKey
-                        throw new RuntimeException("Outer adapter key not allowed modify");
-                    }
+                    phoenixAdapter.updateConfig(file.getName(), config);
                     logger.info("Change a phoenix mapping config: {} of canal adapter", file.getName());
                 }
             } catch (Exception e) {
@@ -128,40 +114,12 @@ public class PhoenixConfigMonitor {
 
             try {
                 if (phoenixAdapter.getPhoenixMapping().containsKey(file.getName())) {
-                    deleteConfigFromCache(file);
-
+                    phoenixAdapter.deleteConfig(file.getName());
                     logger.info("Delete a phoenix mapping config: {} of canal adapter", file.getName());
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
             }
         }
-
-        private void addConfigToCache(File file, MappingConfig mappingConfig) {
-            if (mappingConfig == null || mappingConfig.getDbMapping() == null) {
-                return;
-            }
-            phoenixAdapter.getPhoenixMapping().put(file.getName(), mappingConfig);
-            Map<String, MappingConfig> configMap = phoenixAdapter.getMappingConfigCache()
-                    .computeIfAbsent(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
-                                    + mappingConfig.getDbMapping().getDatabase() + "-"
-                                    + mappingConfig.getDbMapping().getTable().toLowerCase(),
-                            k1 -> new HashMap<>());
-            configMap.put(file.getName(), mappingConfig);
-        }
-
-        private void deleteConfigFromCache(File file) {
-            logger.info("deleteConfigFromCache: {}", file.getName());
-            MappingConfig mappingConfig = phoenixAdapter.getPhoenixMapping().remove(file.getName());
-
-            if (mappingConfig == null || mappingConfig.getDbMapping() == null) {
-                return;
-            }
-            for (Map<String, MappingConfig> configMap : phoenixAdapter.getMappingConfigCache().values()) {
-                if (configMap != null) {
-                    configMap.remove(file.getName());
-                }
-            }
-        }
     }
 }

+ 70 - 32
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.client.adapter.rdb;
 
+import com.alibaba.otter.canal.client.adapter.support.FileName2KeyMapping;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Collections;
@@ -56,6 +57,8 @@ public class RdbAdapter implements OuterAdapter {
 
     private Properties                              envProperties;
 
+    private OuterAdapterConfig                      configuration;
+
     public Map<String, MappingConfig> getRdbMapping() {
         return rdbMapping;
     }
@@ -76,49 +79,21 @@ public class RdbAdapter implements OuterAdapter {
     @Override
     public void init(OuterAdapterConfig configuration, Properties envProperties) {
         this.envProperties = envProperties;
-
+        this.configuration = configuration;
+      
         // 从jdbc url获取db类型
         Map<String, String> properties = configuration.getProperties();
         String dbType = JdbcUtils.getDbType(properties.get("jdbc.url"), null);
-
         Map<String, MappingConfig> rdbMappingTmp = ConfigLoader.load(envProperties);
         // 过滤不匹配的key的配置
-        rdbMappingTmp.forEach((key, mappingConfig) -> {
-            if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
-                || (mappingConfig.getOuterAdapterKey() != null && mappingConfig.getOuterAdapterKey()
-                    .equalsIgnoreCase(configuration.getKey()))) {
-                rdbMapping.put(key, mappingConfig);
-            }
+        rdbMappingTmp.forEach((key, config) -> {
+            addConfig(key, config);
         });
 
         if (rdbMapping.isEmpty()) {
             throw new RuntimeException("No rdb adapter found for config key: " + configuration.getKey());
         }
 
-        for (Map.Entry<String, MappingConfig> entry : rdbMapping.entrySet()) {
-            String configName = entry.getKey();
-            MappingConfig mappingConfig = entry.getValue();
-            if (!mappingConfig.getDbMapping().getMirrorDb()) {
-                String key;
-                if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
-                    key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
-                          + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
-                          + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable();
-                } else {
-                    key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
-                          + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable();
-                }
-                Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(key,
-                    k1 -> new ConcurrentHashMap<>());
-                configMap.put(configName, mappingConfig);
-            } else {
-                // mirrorDB
-                String key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
-                             + mappingConfig.getDbMapping().getDatabase();
-                mirrorDbConfigCache.put(key, MirrorDbConfig.create(configName, mappingConfig));
-            }
-        }
-
         // 初始化连接池
         dataSource = new DruidDataSource();
         dataSource.setDriverClassName(properties.get("jdbc.driverClassName"));
@@ -308,4 +283,67 @@ public class RdbAdapter implements OuterAdapter {
             dataSource.close();
         }
     }
+
+    private void addSyncConfigToCache(String configName, MappingConfig mappingConfig) {
+        if (!mappingConfig.getDbMapping().getMirrorDb()) {
+            String key;
+            if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+                key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
+                        + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
+                        + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable();
+            } else {
+                key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
+                        + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable();
+            }
+            Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(key,
+                    k1 -> new ConcurrentHashMap<>());
+            configMap.put(configName, mappingConfig);
+        } else {
+            // mirrorDB
+            String key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                    + mappingConfig.getDbMapping().getDatabase();
+            mirrorDbConfigCache.put(key, MirrorDbConfig.create(configName, mappingConfig));
+        }
+    }
+
+    public boolean addConfig(String fileName, MappingConfig config) {
+        if (match(config)) {
+            rdbMapping.put(fileName, config);
+            addSyncConfigToCache(fileName, config);
+            FileName2KeyMapping.register(getClass().getAnnotation(SPI.class).value(), fileName,
+                    configuration.getKey());
+            return true;
+        }
+        return false;
+    }
+
+    public void updateConfig(String fileName, MappingConfig config) {
+        if (config.getOuterAdapterKey() != null && !config.getOuterAdapterKey()
+                .equals(configuration.getKey())) {
+            // 理论上不允许改这个 因为本身就是通过这个关联起Adapter和Config的
+            throw new RuntimeException("not allow to change outAdapterKey");
+        }
+        rdbMapping.put(fileName, config);
+        addSyncConfigToCache(fileName, config);
+    }
+
+    public void deleteConfig(String fileName) {
+        rdbMapping.remove(fileName);
+        for (Map<String, MappingConfig> configMap : mappingConfigCache.values()) {
+            if (configMap != null) {
+                configMap.remove(fileName);
+            }
+        }
+        FileName2KeyMapping.unregister(getClass().getAnnotation(SPI.class).value(), fileName);
+    }
+
+    private boolean match(MappingConfig config) {
+        boolean sameMatch = config.getOuterAdapterKey() != null && config.getOuterAdapterKey()
+                .equalsIgnoreCase(configuration.getKey());
+        boolean prefixMatch = config.getOuterAdapterKey() == null && configuration.getKey()
+                .startsWith(StringUtils
+                        .join(new String[]{Util.AUTO_GENERATED_PREFIX, config.getDestination(),
+                                config.getGroupId()}, '-'));
+        return sameMatch || prefixMatch;
+    }
 }

+ 16 - 74
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/monitor/RdbConfigMonitor.java

@@ -1,25 +1,19 @@
 package com.alibaba.otter.canal.client.adapter.rdb.monitor;
 
+import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
+import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;
-
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
 import org.apache.commons.io.monitor.FileAlterationMonitor;
 import org.apache.commons.io.monitor.FileAlterationObserver;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
-import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
-import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
-import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
-import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
-import com.alibaba.otter.canal.client.adapter.support.Util;
-
 public class RdbConfigMonitor {
 
     private static final Logger   logger      = LoggerFactory.getLogger(RdbConfigMonitor.class);
@@ -41,7 +35,7 @@ public class RdbConfigMonitor {
         File confDir = Util.getConfDirPath(adapterName);
         try {
             FileAlterationObserver observer = new FileAlterationObserver(confDir,
-                FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
+                    FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter("yml")));
             FileListener listener = new FileListener();
             observer.addListener(listener);
             fileMonitor = new FileAlterationMonitor(3000, observer);
@@ -69,16 +63,15 @@ public class RdbConfigMonitor {
                 // 加载新增的配置文件
                 String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
                 MappingConfig config = YmlConfigBinder
-                    .bindYmlToObj(null, configContent, MappingConfig.class, null, envProperties);
+                        .bindYmlToObj(null, configContent, MappingConfig.class, null, envProperties);
                 if (config == null) {
                     return;
                 }
                 config.validate();
-                if ((key == null && config.getOuterAdapterKey() == null)
-                    || (key != null && key.equals(config.getOuterAdapterKey()))) {
-                    addConfigToCache(file, config);
-
-                    logger.info("Add a new rdb mapping config: {} to canal adapter", file.getName());
+                boolean result = rdbAdapter.addConfig(file.getName(), config);
+                if (result) {
+                    logger.info("Add a new rdb mapping config: {} to canal adapter",
+                            file.getName());
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
@@ -92,28 +85,19 @@ public class RdbConfigMonitor {
             try {
                 if (rdbAdapter.getRdbMapping().containsKey(file.getName())) {
                     // 加载配置文件
-                    String configContent = MappingConfigsLoader
-                        .loadConfig(adapterName + File.separator + file.getName());
+                    String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
                     if (configContent == null) {
                         onFileDelete(file);
                         return;
                     }
                     MappingConfig config = YmlConfigBinder
-                        .bindYmlToObj(null, configContent, MappingConfig.class, null, envProperties);
+                            .bindYmlToObj(null, configContent, MappingConfig.class, null,
+                                    envProperties);
                     if (config == null) {
                         return;
                     }
                     config.validate();
-                    if ((key == null && config.getOuterAdapterKey() == null)
-                        || (key != null && key.equals(config.getOuterAdapterKey()))) {
-                        if (rdbAdapter.getRdbMapping().containsKey(file.getName())) {
-                            deleteConfigFromCache(file);
-                        }
-                        addConfigToCache(file, config);
-                    } else {
-                        // 不能修改outerAdapterKey
-                        throw new RuntimeException("Outer adapter key not allowed modify");
-                    }
+                    rdbAdapter.updateConfig(file.getName(), config);
                     logger.info("Change a rdb mapping config: {} of canal adapter", file.getName());
                 }
             } catch (Exception e) {
@@ -127,7 +111,7 @@ public class RdbConfigMonitor {
 
             try {
                 if (rdbAdapter.getRdbMapping().containsKey(file.getName())) {
-                    deleteConfigFromCache(file);
+                    rdbAdapter.deleteConfig(file.getName());
 
                     logger.info("Delete a rdb mapping config: {} of canal adapter", file.getName());
                 }
@@ -135,47 +119,5 @@ public class RdbConfigMonitor {
                 logger.error(e.getMessage(), e);
             }
         }
-
-        private void addConfigToCache(File file, MappingConfig mappingConfig) {
-            if (mappingConfig == null || mappingConfig.getDbMapping() == null) {
-                return;
-            }
-            rdbAdapter.getRdbMapping().put(file.getName(), mappingConfig);
-            if (!mappingConfig.getDbMapping().getMirrorDb()) {
-                Map<String, MappingConfig> configMap = rdbAdapter.getMappingConfigCache()
-                    .computeIfAbsent(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
-                                     + mappingConfig.getDbMapping().getDatabase() + "-"
-                                     + mappingConfig.getDbMapping().getTable(),
-                        k1 -> new HashMap<>());
-                configMap.put(file.getName(), mappingConfig);
-            } else {
-                Map<String, MirrorDbConfig> mirrorDbConfigCache = rdbAdapter.getMirrorDbConfigCache();
-                mirrorDbConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
-                                        + mappingConfig.getDbMapping().getDatabase(),
-                    MirrorDbConfig.create(file.getName(), mappingConfig));
-            }
-        }
-
-        private void deleteConfigFromCache(File file) {
-            MappingConfig mappingConfig = rdbAdapter.getRdbMapping().remove(file.getName());
-
-            if (mappingConfig == null || mappingConfig.getDbMapping() == null) {
-                return;
-            }
-            if (!mappingConfig.getDbMapping().getMirrorDb()) {
-                for (Map<String, MappingConfig> configMap : rdbAdapter.getMappingConfigCache().values()) {
-                    if (configMap != null) {
-                        configMap.remove(file.getName());
-                    }
-                }
-            } else {
-                rdbAdapter.getMirrorDbConfigCache().forEach((key, mirrorDbConfig) -> {
-                    if (mirrorDbConfig.getFileName().equals(file.getName())) {
-                        rdbAdapter.getMirrorDbConfigCache().remove(key);
-                    }
-                });
-            }
-
-        }
     }
 }

+ 84 - 50
client-adapter/tablestore/src/main/java/com/alibaba/otter/canal/client/adapter/tablestore/TablestoreAdapter.java

@@ -1,6 +1,8 @@
 package com.alibaba.otter.canal.client.adapter.tablestore;
 
 
+import com.alibaba.otter.canal.client.adapter.support.FileName2KeyMapping;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.stream.Collectors;
@@ -55,62 +57,14 @@ public class TablestoreAdapter implements OuterAdapter {
         this.configuration = configuration;
         Map<String, MappingConfig> tablestoreMappingTmp = ConfigLoader.load(envProperties);
         // 过滤不匹配的key的配置
-        tablestoreMappingTmp.forEach((key, mappingConfig) -> {
-            if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
-                    || (mappingConfig.getOuterAdapterKey() != null && mappingConfig.getOuterAdapterKey()
-                    .equalsIgnoreCase(configuration.getKey()))) {
-                tablestoreMapping.put(key, mappingConfig);
-                mappingConfig.getDbMapping().init(mappingConfig);
-            }
+        tablestoreMappingTmp.forEach((key, config) -> {
+            addConfig(key, config);
         });
 
         if (tablestoreMapping.isEmpty()) {
             throw new RuntimeException("No tablestore adapter found for config key: " + configuration.getKey());
         }
 
-        Map<String, String> properties = configuration.getProperties();
-
-        for (Map.Entry<String, MappingConfig> entry : tablestoreMapping.entrySet()) {
-            String configName = entry.getKey();
-            MappingConfig mappingConfig = entry.getValue();
-            String key;
-            if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
-                key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
-                        + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
-                        + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable();
-            } else {
-                key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
-                        + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable();
-            }
-            Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(key,
-                    k1 -> new ConcurrentHashMap<>());
-            configMap.put(configName, mappingConfig);
-
-
-            // 构建对应的 TableStoreWriter
-            ServiceCredentials credentials = new DefaultCredentials(
-                    properties.get(PropertyConstants.TABLESTORE_ACCESSSECRETID),
-                    properties.get(PropertyConstants.TABLESTORE_ACCESSSECRETKEY)
-            );
-
-
-            WriterConfig config = getWriterConfig(mappingConfig);
-
-            TableStoreWriter writer = new DefaultTableStoreWriter(
-                    properties.get(PropertyConstants.TABLESTORE_ENDPOINT),
-                    credentials,
-                    properties.get(PropertyConstants.TABLESTORE_INSTANCENAME),
-                    mappingConfig.getDbMapping().getTargetTable(),
-                    config,
-                    null
-            );
-
-            Map<String, TableStoreWriter> config2writerMap = writerCache.computeIfAbsent(key,
-                    k1 -> new ConcurrentHashMap<>());
-            config2writerMap.put(configName, writer);
-
-        }
-
         tablestoreSyncService = new TablestoreSyncService();
     }
 
@@ -315,4 +269,84 @@ public class TablestoreAdapter implements OuterAdapter {
             }
         }
     }
+
+    private void addSyncConfigToCache(String configName, MappingConfig mappingConfig) {
+        Map<String, String> properties = configuration.getProperties();
+        String key;
+        if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+            key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
+                    + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
+                    + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable();
+        } else {
+            key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
+                    + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable();
+        }
+        Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(key,
+                k1 -> new ConcurrentHashMap<>());
+        configMap.put(configName, mappingConfig);
+
+
+        // 构建对应的 TableStoreWriter
+        ServiceCredentials credentials = new DefaultCredentials(
+                properties.get(PropertyConstants.TABLESTORE_ACCESSSECRETID),
+                properties.get(PropertyConstants.TABLESTORE_ACCESSSECRETKEY)
+        );
+
+
+        WriterConfig config = getWriterConfig(mappingConfig);
+
+        TableStoreWriter writer = new DefaultTableStoreWriter(
+                properties.get(PropertyConstants.TABLESTORE_ENDPOINT),
+                credentials,
+                properties.get(PropertyConstants.TABLESTORE_INSTANCENAME),
+                mappingConfig.getDbMapping().getTargetTable(),
+                config,
+                null
+        );
+
+        Map<String, TableStoreWriter> config2writerMap = writerCache.computeIfAbsent(key,
+                k1 -> new ConcurrentHashMap<>());
+        config2writerMap.put(configName, writer);
+    }
+
+    public boolean addConfig(String fileName, MappingConfig config) {
+        if (match(config)) {
+            tablestoreMapping.put(fileName, config);
+            addSyncConfigToCache(fileName, config);
+            FileName2KeyMapping.register(getClass().getAnnotation(SPI.class).value(), fileName,
+                    configuration.getKey());
+            return true;
+        }
+        return false;
+    }
+
+    public void updateConfig(String fileName, MappingConfig config) {
+        if (config.getOuterAdapterKey() != null && !config.getOuterAdapterKey()
+                .equals(configuration.getKey())) {
+            // 理论上不允许改这个 因为本身就是通过这个关联起Adapter和Config的
+            throw new RuntimeException("not allow to change outAdapterKey");
+        }
+        tablestoreMapping.put(fileName, config);
+        addSyncConfigToCache(fileName, config);
+    }
+
+    public void deleteConfig(String fileName) {
+        tablestoreMapping.remove(fileName);
+        for (Map<String, MappingConfig> configMap : mappingConfigCache.values()) {
+            if (configMap != null) {
+                configMap.remove(fileName);
+            }
+        }
+        FileName2KeyMapping.unregister(getClass().getAnnotation(SPI.class).value(), fileName);
+    }
+
+    private boolean match(MappingConfig config) {
+        boolean sameMatch = config.getOuterAdapterKey() != null && config.getOuterAdapterKey()
+                .equalsIgnoreCase(configuration.getKey());
+        boolean prefixMatch = config.getOuterAdapterKey() == null && configuration.getKey()
+                .startsWith(StringUtils
+                        .join(new String[]{Util.AUTO_GENERATED_PREFIX, config.getDestination(),
+                                config.getGroupId()}, '-'));
+        return sameMatch || prefixMatch;
+    }
 }