Browse Source

rdb etl count destination 接口适配
es hbase 多源适配

mcy 6 years ago
parent
commit
66912d3d05
18 changed files with 333 additions and 208 deletions
  1. 11 25
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java
  2. 58 8
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java
  3. 17 7
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java
  4. 6 34
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java
  5. 1 4
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java
  6. 4 4
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/ConfigLoadTest.java
  7. 24 13
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSub2Test.java
  8. 24 13
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSubTest.java
  9. 14 6
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOne2Test.java
  10. 44 26
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOneTest.java
  11. 14 6
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncJoinOneTest.java
  12. 16 5
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncSingleTest.java
  13. 19 18
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java
  14. 13 3
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java
  15. 3 5
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java
  16. 10 10
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java
  17. 38 13
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java
  18. 17 8
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

+ 11 - 25
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java

@@ -13,6 +13,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Pattern;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +41,9 @@ public class ExtensionLoader<T> {
 
     private static final ConcurrentMap<Class<?>, Object>             EXTENSION_INSTANCES        = new ConcurrentHashMap<>();
 
-    private static final ConcurrentMap<String, Object>               EXTENSION_KEY_INSTANCES    = new ConcurrentHashMap<>();
+    private static final ConcurrentMap<String, Object>               EXTENSION_KEY_INSTANCE     = new ConcurrentHashMap<>();
+
+    private static final ConcurrentMap<String, List<?>>              EXTENSION_KEY_INSTANCES    = new ConcurrentHashMap<>();
 
     private final Class<?>                                           type;
 
@@ -124,10 +127,11 @@ public class ExtensionLoader<T> {
         if ("true".equals(name)) {
             return getDefaultExtension();
         }
-        Holder<Object> holder = cachedInstances.get(name + "-" + key);
+        String extKey = name + "-" + StringUtils.trimToEmpty(key);
+        Holder<Object> holder = cachedInstances.get(extKey);
         if (holder == null) {
-            cachedInstances.putIfAbsent(name + "-" + key, new Holder<>());
-            holder = cachedInstances.get(name + "-" + key);
+            cachedInstances.putIfAbsent(extKey, new Holder<>());
+            holder = cachedInstances.get(extKey);
         }
         Object instance = holder.get();
         if (instance == null) {
@@ -182,10 +186,10 @@ public class ExtensionLoader<T> {
                                             + ")  could not be instantiated: class could not be found");
         }
         try {
-            T instance = (T) EXTENSION_KEY_INSTANCES.get(name + "-" + key);
+            T instance = (T) EXTENSION_KEY_INSTANCE.get(name + "-" + key);
             if (instance == null) {
-                EXTENSION_KEY_INSTANCES.putIfAbsent(name + "-" + key, clazz.newInstance());
-                instance = (T) EXTENSION_KEY_INSTANCES.get(name + "-" + key);
+                EXTENSION_KEY_INSTANCE.putIfAbsent(name + "-" + key, clazz.newInstance());
+                instance = (T) EXTENSION_KEY_INSTANCE.get(name + "-" + key);
             }
             return instance;
         } catch (Throwable t) {
@@ -195,24 +199,6 @@ public class ExtensionLoader<T> {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    // public T newInstance(String name) {
-    // Class<?> clazz = getExtensionClasses().get(name);
-    // if (clazz == null) {
-    // throw new IllegalStateException("Extension instance(name: " + name + ",
-    // class: " + type
-    // + ") could not be instantiated: class could not be found");
-    // }
-    // try {
-    // return (T) clazz.newInstance();
-    // } catch (Throwable t) {
-    // throw new IllegalStateException("Extension instance(name: " + name + ",
-    // class: " + type
-    // + ") could not be instantiated: " + t.getMessage(),
-    // t);
-    // }
-    // }
-
     private Map<String, Class<?>> getExtensionClasses() {
         Map<String, Class<?>> classes = cachedClasses.get();
         if (classes == null) {

+ 58 - 8
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -1,9 +1,12 @@
 package com.alibaba.otter.canal.client.adapter.es;
 
 import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import javax.sql.DataSource;
 
@@ -13,10 +16,13 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.transport.client.PreBuiltTransportClient;
 
+import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
+import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
+import com.alibaba.otter.canal.client.adapter.es.config.SqlParser;
 import com.alibaba.otter.canal.client.adapter.es.service.ESEtlService;
 import com.alibaba.otter.canal.client.adapter.es.service.ESSyncService;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
@@ -31,9 +37,12 @@ import com.alibaba.otter.canal.client.adapter.support.*;
 @SPI("es")
 public class ESAdapter implements OuterAdapter {
 
-    private TransportClient transportClient;
+    private Map<String, ESSyncConfig>       esSyncConfig        = new LinkedHashMap<>(); // 文件名对应配置
+    private Map<String, List<ESSyncConfig>> dbTableEsSyncConfig = new LinkedHashMap<>(); // schema-table对应配置
 
-    private ESSyncService   esSyncService;
+    private TransportClient                 transportClient;
+
+    private ESSyncService                   esSyncService;
 
     public TransportClient getTransportClient() {
         return transportClient;
@@ -43,10 +52,48 @@ public class ESAdapter implements OuterAdapter {
         return esSyncService;
     }
 
+    public Map<String, ESSyncConfig> getEsSyncConfig() {
+        return esSyncConfig;
+    }
+
+    public Map<String, List<ESSyncConfig>> getDbTableEsSyncConfig() {
+        return dbTableEsSyncConfig;
+    }
+
     @Override
     public void init(OuterAdapterConfig configuration) {
         try {
-            ESSyncConfigLoader.load();
+            SPI spi = this.getClass().getAnnotation(SPI.class);
+            Map<String, ESSyncConfig> esSyncConfigTmp = ESSyncConfigLoader.load(spi.value());
+            // 过滤不匹配的key的配置
+            esSyncConfigTmp.forEach((key, config) -> {
+                if ((config.getOuterAdapterKey() == null && configuration.getKey() == null)
+                    || config.getOuterAdapterKey().equalsIgnoreCase(configuration.getKey())) {
+                    esSyncConfig.put(key, config);
+                }
+            });
+
+            for (ESSyncConfig config : esSyncConfig.values()) {
+                SchemaItem schemaItem = SqlParser.parse(config.getEsMapping().getSql());
+                config.getEsMapping().setSchemaItem(schemaItem);
+
+                DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+                if (dataSource == null || dataSource.getUrl() == null) {
+                    throw new RuntimeException("No data source found: " + config.getDataSourceKey());
+                }
+                Pattern pattern = Pattern.compile(".*:(.*)://.*/(.*)\\?.*$");
+                Matcher matcher = pattern.matcher(dataSource.getUrl());
+                if (!matcher.find()) {
+                    throw new RuntimeException("Not found the schema of jdbc-url: " + config.getDataSourceKey());
+                }
+                String schema = matcher.group(2);
+
+                schemaItem.getAliasTableItems().values().forEach(tableItem -> {
+                    List<ESSyncConfig> esSyncConfigs = dbTableEsSyncConfig
+                        .computeIfAbsent(schema + "-" + tableItem.getTableName(), k -> new ArrayList<>());
+                    esSyncConfigs.add(config);
+                });
+            }
 
             Map<String, String> properties = configuration.getProperties();
             Settings.Builder settingBuilder = Settings.builder();
@@ -68,13 +115,16 @@ public class ESAdapter implements OuterAdapter {
 
     @Override
     public void sync(Dml dml) {
-        esSyncService.sync(dml);
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = dbTableEsSyncConfig.get(database + "-" + table);
+        esSyncService.sync(esSyncConfigs, dml);
     }
 
     @Override
     public EtlResult etl(String task, List<String> params) {
         EtlResult etlResult = new EtlResult();
-        ESSyncConfig config = ESSyncConfigLoader.getEsSyncConfig().get(task);
+        ESSyncConfig config = esSyncConfig.get(task);
         if (config != null) {
             DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
             ESEtlService esEtlService = new ESEtlService(transportClient, config);
@@ -89,7 +139,7 @@ public class ESAdapter implements OuterAdapter {
             StringBuilder resultMsg = new StringBuilder();
             boolean resSuccess = true;
             // ds不为空说明传入的是datasourceKey
-            for (ESSyncConfig configTmp : ESSyncConfigLoader.getEsSyncConfig().values()) {
+            for (ESSyncConfig configTmp : esSyncConfig.values()) {
                 // 取所有的destination为task的配置
                 if (configTmp.getDestination().equals(task)) {
                     ESEtlService esEtlService = new ESEtlService(transportClient, configTmp);
@@ -119,7 +169,7 @@ public class ESAdapter implements OuterAdapter {
 
     @Override
     public Map<String, Object> count(String task) {
-        ESSyncConfig config = ESSyncConfigLoader.getEsSyncConfig().get(task);
+        ESSyncConfig config = esSyncConfig.get(task);
         ESMapping mapping = config.getEsMapping();
         SearchResponse response = transportClient.prepareSearch(mapping.get_index())
             .setTypes(mapping.get_type())
@@ -142,7 +192,7 @@ public class ESAdapter implements OuterAdapter {
 
     @Override
     public String getDestination(String task) {
-        ESSyncConfig config = ESSyncConfigLoader.getEsSyncConfig().get(task);
+        ESSyncConfig config = esSyncConfig.get(task);
         if (config != null) {
             return config.getDestination();
         }

+ 17 - 7
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfig.java

@@ -13,9 +13,11 @@ import java.util.Map;
  */
 public class ESSyncConfig {
 
-    private String    dataSourceKey; // 数据源key
+    private String    dataSourceKey;   // 数据源key
 
-    private String    destination;   // canal destination
+    private String    outerAdapterKey; // adapter key
+
+    private String    destination;     // canal destination
 
     private ESMapping esMapping;
 
@@ -42,6 +44,14 @@ public class ESSyncConfig {
         this.dataSourceKey = dataSourceKey;
     }
 
+    public String getOuterAdapterKey() {
+        return outerAdapterKey;
+    }
+
+    public void setOuterAdapterKey(String outerAdapterKey) {
+        this.outerAdapterKey = outerAdapterKey;
+    }
+
     public String getDestination() {
         return destination;
     }
@@ -67,15 +77,15 @@ public class ESSyncConfig {
         private String              parent;
         private String              sql;
         // 对象字段, 例: objFields:
-        //              - _labels: array:;
-        private Map<String, String> objFields     = new LinkedHashMap<>();
+        // - _labels: array:;
+        private Map<String, String> objFields       = new LinkedHashMap<>();
         private List<String>        skips           = new ArrayList<>();
         private int                 commitBatch     = 1000;
         private String              etlCondition;
-        private boolean             syncByTimestamp = false;                 // 是否按时间戳定时同步
-        private Long                syncInterval;                            // 同步时间间隔
+        private boolean             syncByTimestamp = false;                // 是否按时间戳定时同步
+        private Long                syncInterval;                           // 同步时间间隔
 
-        private SchemaItem          schemaItem;                              // sql解析结果模型
+        private SchemaItem          schemaItem;                             // sql解析结果模型
 
         public String get_index() {
             return _index;

+ 6 - 34
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java

@@ -32,24 +32,14 @@ public class ESSyncConfigLoader {
     private static Logger                                   logger              = LoggerFactory
         .getLogger(ESSyncConfigLoader.class);
 
-    private static final String                             BASE_PATH           = "es";
-
-    private static volatile Map<String, ESSyncConfig>       esSyncConfig        = new LinkedHashMap<>(); // 文件名对应配置
-    private static volatile Map<String, List<ESSyncConfig>> dbTableEsSyncConfig = new LinkedHashMap<>(); // schema-table对应配置
-
-    public static Map<String, ESSyncConfig> getEsSyncConfig() {
-        return esSyncConfig;
-    }
+    public static synchronized Map<String, ESSyncConfig> load(String name) {
+        logger.info("## Start loading es mapping config ... ");
 
-    public static Map<String, List<ESSyncConfig>> getDbTableEsSyncConfig() {
-        return dbTableEsSyncConfig;
-    }
+        Map<String, ESSyncConfig> esSyncConfig = new LinkedHashMap<>();
 
-    public static synchronized void load() {
-        logger.info("## Start loading es mapping config ... ");
         Collection<String> configs = AdapterConfigs.get("es");
         if (configs == null) {
-            return;
+            return esSyncConfig;
         }
         for (String c : configs) {
             if (c == null) {
@@ -64,32 +54,13 @@ public class ESSyncConfigLoader {
             String configContent = null;
 
             if (c.endsWith(".yml")) {
-                configContent = readConfigContent(BASE_PATH + "/" + c);
+                configContent = readConfigContent(name + "/" + c);
             }
 
             config = new Yaml().loadAs(configContent, ESSyncConfig.class);
 
             try {
                 config.validate();
-                SchemaItem schemaItem = SqlParser.parse(config.getEsMapping().getSql());
-                config.getEsMapping().setSchemaItem(schemaItem);
-
-                DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
-                if (dataSource == null || dataSource.getUrl() == null) {
-                    throw new RuntimeException("No data source found: " + config.getDataSourceKey());
-                }
-                Pattern pattern = Pattern.compile(".*:(.*)://.*/(.*)\\?.*$");
-                Matcher matcher = pattern.matcher(dataSource.getUrl());
-                if (!matcher.find()) {
-                    throw new RuntimeException("Not found the schema of jdbc-url: " + config.getDataSourceKey());
-                }
-                String schema = matcher.group(2);
-
-                schemaItem.getAliasTableItems().values().forEach(tableItem -> {
-                    List<ESSyncConfig> esSyncConfigs = dbTableEsSyncConfig
-                        .computeIfAbsent(schema + "-" + tableItem.getTableName(), k -> new ArrayList<>());
-                    esSyncConfigs.add(config);
-                });
             } catch (Exception e) {
                 throw new RuntimeException("ERROR Config: " + c, e);
             }
@@ -97,6 +68,7 @@ public class ESSyncConfigLoader {
         }
 
         logger.info("## ES mapping config loaded");
+        return esSyncConfig;
     }
 
     private static String readConfigContent(String config) {

+ 1 - 4
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -39,11 +39,8 @@ public class ESSyncService {
         this.esTemplate = esTemplate;
     }
 
-    public void sync(Dml dml) {
+    public void sync(List<ESSyncConfig> esSyncConfigs, Dml dml) {
         long begin = System.currentTimeMillis();
-        String database = dml.getDatabase();
-        String table = dml.getTable();
-        List<ESSyncConfig> esSyncConfigs = ESSyncConfigLoader.getDbTableEsSyncConfig().get(database + "-" + table);
         if (esSyncConfigs != null) {
             if (logger.isTraceEnabled()) {
                 logger.trace("Destination: {}, database:{}, table:{}, type:{}, effect index count: {}",

+ 4 - 4
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/ConfigLoadTest.java

@@ -23,8 +23,7 @@ public class ConfigLoadTest {
 
     @Test
     public void testLoad() {
-        ESSyncConfigLoader.load();
-        Map<String, ESSyncConfig> configMap = ESSyncConfigLoader.getEsSyncConfig();
+        Map<String, ESSyncConfig> configMap = ESSyncConfigLoader.load("es");
         ESSyncConfig config = configMap.get("mytest_user.yml");
         Assert.assertNotNull(config);
         Assert.assertEquals("defaultDS", config.getDataSourceKey());
@@ -34,7 +33,8 @@ public class ConfigLoadTest {
         Assert.assertEquals("id", esMapping.get_id());
         Assert.assertNotNull(esMapping.getSql());
 
-        Map<String, List<ESSyncConfig>> dbTableEsSyncConfig = ESSyncConfigLoader.getDbTableEsSyncConfig();
-        Assert.assertFalse(dbTableEsSyncConfig.isEmpty());
+        // Map<String, List<ESSyncConfig>> dbTableEsSyncConfig =
+        // ESSyncConfigLoader.getDbTableEsSyncConfig();
+        // Assert.assertFalse(dbTableEsSyncConfig.isEmpty());
     }
 }

+ 24 - 13
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSub2Test.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.client.adapter.es.test.sync;
 
 import java.util.*;
 
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import org.elasticsearch.action.get.GetResponse;
 import org.junit.After;
 import org.junit.Assert;
@@ -32,9 +33,9 @@ public class LabelSyncJoinSub2Test {
     @Test
     public void test01() {
         DataSource ds = DatasourceConfig.DATA_SOURCES.get("defaultDS");
-        Common.sqlExe(ds,"delete from label where id=1 or id=2");
-        Common.sqlExe(ds,"insert into label (id,user_id,label) values (1,1,'a')");
-        Common.sqlExe(ds,"insert into label (id,user_id,label) values (2,1,'b')");
+        Common.sqlExe(ds, "delete from label where id=1 or id=2");
+        Common.sqlExe(ds, "insert into label (id,user_id,label) values (1,1,'a')");
+        Common.sqlExe(ds, "insert into label (id,user_id,label) values (2,1,'b')");
 
         Dml dml = new Dml();
         dml.setDestination("example");
@@ -46,12 +47,15 @@ public class LabelSyncJoinSub2Test {
         Map<String, Object> data = new LinkedHashMap<>();
         dataList.add(data);
         data.put("id", 2L);
-        data.put("user_id",1L);
+        data.put("user_id", 1L);
         data.put("label", "b");
-
         dml.setData(dataList);
 
-        esAdapter.getEsSyncService().sync(dml);
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+
+        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("b;a_", response.getSource().get("_labels"));
@@ -63,7 +67,7 @@ public class LabelSyncJoinSub2Test {
     @Test
     public void test02() {
         DataSource ds = DatasourceConfig.DATA_SOURCES.get("defaultDS");
-        Common.sqlExe(ds,"update label set label='aa' where id=1");
+        Common.sqlExe(ds, "update label set label='aa' where id=1");
 
         Dml dml = new Dml();
         dml.setDestination("example");
@@ -75,7 +79,7 @@ public class LabelSyncJoinSub2Test {
         Map<String, Object> data = new LinkedHashMap<>();
         dataList.add(data);
         data.put("id", 1L);
-        data.put("user_id",1L);
+        data.put("user_id", 1L);
         data.put("label", "aa");
         dml.setData(dataList);
 
@@ -85,7 +89,11 @@ public class LabelSyncJoinSub2Test {
         old.put("label", "v");
         dml.setOld(oldList);
 
-        esAdapter.getEsSyncService().sync(dml);
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+
+        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("b;aa_", response.getSource().get("_labels"));
@@ -97,7 +105,7 @@ public class LabelSyncJoinSub2Test {
     @Test
     public void test03() {
         DataSource ds = DatasourceConfig.DATA_SOURCES.get("defaultDS");
-        Common.sqlExe(ds,"delete from label where id=1");
+        Common.sqlExe(ds, "delete from label where id=1");
 
         Dml dml = new Dml();
         dml.setDestination("example");
@@ -109,12 +117,15 @@ public class LabelSyncJoinSub2Test {
         Map<String, Object> data = new LinkedHashMap<>();
         dataList.add(data);
         data.put("id", 1L);
-        data.put("user_id",1L);
+        data.put("user_id", 1L);
         data.put("label", "a");
-
         dml.setData(dataList);
 
-        esAdapter.getEsSyncService().sync(dml);
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+
+        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("b_", response.getSource().get("_labels"));

+ 24 - 13
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/LabelSyncJoinSubTest.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.client.adapter.es.test.sync;
 
 import java.util.*;
 
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import org.elasticsearch.action.get.GetResponse;
 import org.junit.After;
 import org.junit.Assert;
@@ -32,9 +33,9 @@ public class LabelSyncJoinSubTest {
     @Test
     public void test01() {
         DataSource ds = DatasourceConfig.DATA_SOURCES.get("defaultDS");
-        Common.sqlExe(ds,"delete from label where id=1 or id=2");
-        Common.sqlExe(ds,"insert into label (id,user_id,label) values (1,1,'a')");
-        Common.sqlExe(ds,"insert into label (id,user_id,label) values (2,1,'b')");
+        Common.sqlExe(ds, "delete from label where id=1 or id=2");
+        Common.sqlExe(ds, "insert into label (id,user_id,label) values (1,1,'a')");
+        Common.sqlExe(ds, "insert into label (id,user_id,label) values (2,1,'b')");
 
         Dml dml = new Dml();
         dml.setDestination("example");
@@ -46,12 +47,15 @@ public class LabelSyncJoinSubTest {
         Map<String, Object> data = new LinkedHashMap<>();
         dataList.add(data);
         data.put("id", 2L);
-        data.put("user_id",1L);
+        data.put("user_id", 1L);
         data.put("label", "b");
-
         dml.setData(dataList);
 
-        esAdapter.getEsSyncService().sync(dml);
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+
+        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("b;a", response.getSource().get("_labels"));
@@ -63,7 +67,7 @@ public class LabelSyncJoinSubTest {
     @Test
     public void test02() {
         DataSource ds = DatasourceConfig.DATA_SOURCES.get("defaultDS");
-        Common.sqlExe(ds,"update label set label='aa' where id=1");
+        Common.sqlExe(ds, "update label set label='aa' where id=1");
 
         Dml dml = new Dml();
         dml.setDestination("example");
@@ -75,7 +79,7 @@ public class LabelSyncJoinSubTest {
         Map<String, Object> data = new LinkedHashMap<>();
         dataList.add(data);
         data.put("id", 1L);
-        data.put("user_id",1L);
+        data.put("user_id", 1L);
         data.put("label", "aa");
         dml.setData(dataList);
 
@@ -85,7 +89,11 @@ public class LabelSyncJoinSubTest {
         old.put("label", "a");
         dml.setOld(oldList);
 
-        esAdapter.getEsSyncService().sync(dml);
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+
+        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("b;aa", response.getSource().get("_labels"));
@@ -97,7 +105,7 @@ public class LabelSyncJoinSubTest {
     @Test
     public void test03() {
         DataSource ds = DatasourceConfig.DATA_SOURCES.get("defaultDS");
-        Common.sqlExe(ds,"delete from label where id=1");
+        Common.sqlExe(ds, "delete from label where id=1");
 
         Dml dml = new Dml();
         dml.setDestination("example");
@@ -109,12 +117,15 @@ public class LabelSyncJoinSubTest {
         Map<String, Object> data = new LinkedHashMap<>();
         dataList.add(data);
         data.put("id", 1L);
-        data.put("user_id",1L);
+        data.put("user_id", 1L);
         data.put("label", "a");
-
         dml.setData(dataList);
 
-        esAdapter.getEsSyncService().sync(dml);
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+
+        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("b", response.getSource().get("_labels"));

+ 14 - 6
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOne2Test.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.client.adapter.es.test.sync;
 
 import java.util.*;
 
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import org.elasticsearch.action.get.GetResponse;
 import org.junit.After;
 import org.junit.Assert;
@@ -32,8 +33,8 @@ public class RoleSyncJoinOne2Test {
     @Test
     public void test01() {
         DataSource ds = DatasourceConfig.DATA_SOURCES.get("defaultDS");
-        Common.sqlExe(ds,"delete from role where id=1");
-        Common.sqlExe(ds,"insert into role (id,role_name) values (1,'admin')");
+        Common.sqlExe(ds, "delete from role where id=1");
+        Common.sqlExe(ds, "insert into role (id,role_name) values (1,'admin')");
 
         Dml dml = new Dml();
         dml.setDestination("example");
@@ -46,10 +47,13 @@ public class RoleSyncJoinOne2Test {
         dataList.add(data);
         data.put("id", 1L);
         data.put("role_name", "admin");
-
         dml.setData(dataList);
 
-        esAdapter.getEsSyncService().sync(dml);
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+
+        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("admin_", response.getSource().get("_role_name"));
@@ -61,7 +65,7 @@ public class RoleSyncJoinOne2Test {
     @Test
     public void test02() {
         DataSource ds = DatasourceConfig.DATA_SOURCES.get("defaultDS");
-        Common.sqlExe(ds,"update role set role_name='admin3' where id=1");
+        Common.sqlExe(ds, "update role set role_name='admin3' where id=1");
 
         Dml dml = new Dml();
         dml.setDestination("example");
@@ -82,7 +86,11 @@ public class RoleSyncJoinOne2Test {
         old.put("role_name", "admin");
         dml.setOld(oldList);
 
-        esAdapter.getEsSyncService().sync(dml);
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+
+        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("admin3_", response.getSource().get("_role_name"));

+ 44 - 26
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/RoleSyncJoinOneTest.java

@@ -4,6 +4,7 @@ import java.util.*;
 
 import javax.sql.DataSource;
 
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import org.elasticsearch.action.get.GetResponse;
 import org.junit.Assert;
 import org.junit.Before;
@@ -47,7 +48,11 @@ public class RoleSyncJoinOneTest {
 
         dml.setData(dataList);
 
-        esAdapter.getEsSyncService().sync(dml);
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+
+        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("admin", response.getSource().get("_role_name"));
@@ -80,7 +85,11 @@ public class RoleSyncJoinOneTest {
         old.put("role_name", "admin");
         dml.setOld(oldList);
 
-        esAdapter.getEsSyncService().sync(dml);
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+
+        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("admin2", response.getSource().get("_role_name"));
@@ -96,28 +105,32 @@ public class RoleSyncJoinOneTest {
         Common.sqlExe(ds, "insert into role (id,role_name) values (2,'operator')");
         Common.sqlExe(ds, "update user set role_id=2 where id=1");
 
-         Dml dml = new Dml();
-         dml.setDestination("example");
-         dml.setTs(new Date().getTime());
-         dml.setType("UPDATE");
-         dml.setDatabase("mytest");
-         dml.setTable("user");
-         List<Map<String, Object>> dataList = new ArrayList<>();
-         Map<String, Object> data = new LinkedHashMap<>();
-         dataList.add(data);
-         data.put("id", 1L);
-         data.put("role_id", 2L);
-         dml.setData(dataList);
-         List<Map<String, Object>> oldList = new ArrayList<>();
-         Map<String, Object> old = new LinkedHashMap<>();
-         oldList.add(old);
-         old.put("role_id", 1L);
-         dml.setOld(oldList);
-         esAdapter.getEsSyncService().sync(dml);
-
-         GetResponse response =
-         esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
-         Assert.assertEquals("operator", response.getSource().get("_role_name"));
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("UPDATE");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("role_id", 2L);
+        dml.setData(dataList);
+        List<Map<String, Object>> oldList = new ArrayList<>();
+        Map<String, Object> old = new LinkedHashMap<>();
+        oldList.add(old);
+        old.put("role_id", 1L);
+        dml.setOld(oldList);
+
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+
+        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
+
+        GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
+        Assert.assertEquals("operator", response.getSource().get("_role_name"));
 
         Common.sqlExe(ds, "update user set role_id=1 where id=1");
 
@@ -138,7 +151,8 @@ public class RoleSyncJoinOneTest {
         oldList2.add(old2);
         old2.put("role_id", 2L);
         dml2.setOld(oldList2);
-        esAdapter.getEsSyncService().sync(dml2);
+
+        esAdapter.getEsSyncService().sync(esSyncConfigs, dml2);
 
         GetResponse response2 = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("admin2", response2.getSource().get("_role_name"));
@@ -166,7 +180,11 @@ public class RoleSyncJoinOneTest {
 
         dml.setData(dataList);
 
-        esAdapter.getEsSyncService().sync(dml);
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+
+        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertNull(response.getSource().get("_role_name"));

+ 14 - 6
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncJoinOneTest.java

@@ -4,6 +4,7 @@ import java.util.*;
 
 import javax.sql.DataSource;
 
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import org.elasticsearch.action.get.GetResponse;
 import org.junit.After;
 import org.junit.Assert;
@@ -32,8 +33,8 @@ public class UserSyncJoinOneTest {
     @Test
     public void test01() {
         DataSource ds = DatasourceConfig.DATA_SOURCES.get("defaultDS");
-        Common.sqlExe(ds,"delete from user where id=1");
-        Common.sqlExe(ds,"insert into user (id,name,role_id) values (1,'Eric',1)");
+        Common.sqlExe(ds, "delete from user where id=1");
+        Common.sqlExe(ds, "insert into user (id,name,role_id) values (1,'Eric',1)");
 
         Dml dml = new Dml();
         dml.setDestination("example");
@@ -48,10 +49,13 @@ public class UserSyncJoinOneTest {
         data.put("name", "Eric");
         data.put("role_id", 1L);
         data.put("c_time", new Date());
-
         dml.setData(dataList);
 
-        esAdapter.getEsSyncService().sync(dml);
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+
+        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("Eric_", response.getSource().get("_name"));
@@ -63,7 +67,7 @@ public class UserSyncJoinOneTest {
     @Test
     public void test02() {
         DataSource ds = DatasourceConfig.DATA_SOURCES.get("defaultDS");
-        Common.sqlExe(ds,"update user set name='Eric2' where id=1");
+        Common.sqlExe(ds, "update user set name='Eric2' where id=1");
 
         Dml dml = new Dml();
         dml.setDestination("example");
@@ -83,7 +87,11 @@ public class UserSyncJoinOneTest {
         old.put("name", "Eric");
         dml.setOld(oldList);
 
-        esAdapter.getEsSyncService().sync(dml);
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+
+        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("Eric2_", response.getSource().get("_name"));

+ 16 - 5
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/UserSyncSingleTest.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.client.adapter.es.test.sync;
 
 import java.util.*;
 
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import org.elasticsearch.action.get.GetResponse;
 import org.junit.After;
 import org.junit.Assert;
@@ -42,10 +43,13 @@ public class UserSyncSingleTest {
         data.put("name", "Eric");
         data.put("role_id", 1L);
         data.put("c_time", new Date());
-
         dml.setData(dataList);
 
-        esAdapter.getEsSyncService().sync(dml);
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+
+        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("Eric", response.getSource().get("_name"));
@@ -74,7 +78,11 @@ public class UserSyncSingleTest {
         old.put("name", "Eric");
         dml.setOld(oldList);
 
-        esAdapter.getEsSyncService().sync(dml);
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+
+        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertEquals("Eric2", response.getSource().get("_name"));
@@ -98,10 +106,13 @@ public class UserSyncSingleTest {
         data.put("name", "Eric");
         data.put("role_id", 1L);
         data.put("c_time", new Date());
-
         dml.setData(dataList);
 
-        esAdapter.getEsSyncService().sync(dml);
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        List<ESSyncConfig> esSyncConfigs = esAdapter.getDbTableEsSyncConfig().get(database + "-" + table);
+
+        esAdapter.getEsSyncService().sync(esSyncConfigs, dml);
 
         GetResponse response = esAdapter.getTransportClient().prepareGet("mytest_user", "_doc", "1").get();
         Assert.assertNull(response.getSource());

+ 19 - 18
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -35,31 +35,32 @@ import com.alibaba.otter.canal.client.adapter.support.*;
 @SPI("hbase")
 public class HbaseAdapter implements OuterAdapter {
 
-    private static Logger                              logger             = LoggerFactory.getLogger(HbaseAdapter.class);
+    private static Logger              logger             = LoggerFactory.getLogger(HbaseAdapter.class);
 
-    private static volatile Map<String, MappingConfig> hbaseMapping       = null;                                       // 文件名对应配置
-    private static volatile Map<String, MappingConfig> mappingConfigCache = null;                                       // 库名-表名对应配置
+    private Map<String, MappingConfig> hbaseMapping       = new HashMap<>();                            // 文件名对应配置
+    private Map<String, MappingConfig> mappingConfigCache = new HashMap<>();                            // 库名-表名对应配置
 
-    private Connection                                 conn;
-    private HbaseSyncService                           hbaseSyncService;
-    private HbaseTemplate                              hbaseTemplate;
+    private Connection                 conn;
+    private HbaseSyncService           hbaseSyncService;
+    private HbaseTemplate              hbaseTemplate;
 
     @Override
     public void init(OuterAdapterConfig configuration) {
         try {
-            if (mappingConfigCache == null) {
-                synchronized (MappingConfig.class) {
-                    if (mappingConfigCache == null) {
-                        hbaseMapping = MappingConfigLoader.load();
-                        mappingConfigCache = new HashMap<>();
-                        for (MappingConfig mappingConfig : hbaseMapping.values()) {
-                            mappingConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
-                                                   + mappingConfig.getHbaseMapping().getDatabase() + "."
-                                                   + mappingConfig.getHbaseMapping().getTable(),
-                                mappingConfig);
-                        }
-                    }
+            SPI spi = this.getClass().getAnnotation(SPI.class);
+            Map<String, MappingConfig> hbaseMappingTmp = MappingConfigLoader.load(spi.value());
+            // 过滤不匹配的key的配置
+            hbaseMappingTmp.forEach((key, mappingConfig) -> {
+                if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
+                    || mappingConfig.getOuterAdapterKey().equalsIgnoreCase(configuration.getKey())) {
+                    hbaseMapping.put(key, mappingConfig);
                 }
+            });
+            for (MappingConfig mappingConfig : hbaseMapping.values()) {
+                mappingConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                       + mappingConfig.getHbaseMapping().getDatabase() + "."
+                                       + mappingConfig.getHbaseMapping().getTable(),
+                    mappingConfig);
             }
 
             Map<String, String> properties = configuration.getProperties();

+ 13 - 3
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java

@@ -10,11 +10,13 @@ import java.util.*;
  */
 public class MappingConfig {
 
-    private String       dataSourceKey; // 数据源key
+    private String       dataSourceKey;   // 数据源key
 
-    private String       destination;   // canal实例或MQ的topic
+    private String       outerAdapterKey; // adapter key
 
-    private HbaseMapping hbaseMapping;  // hbase映射配置
+    private String       destination;     // canal实例或MQ的topic
+
+    private HbaseMapping hbaseMapping;    // hbase映射配置
 
     public String getDataSourceKey() {
         return dataSourceKey;
@@ -24,6 +26,14 @@ public class MappingConfig {
         this.dataSourceKey = dataSourceKey;
     }
 
+    public String getOuterAdapterKey() {
+        return outerAdapterKey;
+    }
+
+    public void setOuterAdapterKey(String outerAdapterKey) {
+        this.outerAdapterKey = outerAdapterKey;
+    }
+
     public String getDestination() {
         return destination;
     }

+ 3 - 5
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java

@@ -26,19 +26,17 @@ public class MappingConfigLoader {
 
     private static Logger       logger    = LoggerFactory.getLogger(MappingConfigLoader.class);
 
-    private static final String BASE_PATH = "hbase";
-
     /**
      * 加载HBase表映射配置
      * 
      * @return 配置名/配置文件名--对象
      */
-    public static Map<String, MappingConfig> load() {
+    public static Map<String, MappingConfig> load(String name) {
         logger.info("## Start loading hbase mapping config ... ");
 
         Map<String, MappingConfig> result = new LinkedHashMap<>();
 
-        Collection<String> configs = AdapterConfigs.get("hbase");
+        Collection<String> configs = AdapterConfigs.get(name);
         if (configs == null) {
             return result;
         }
@@ -55,7 +53,7 @@ public class MappingConfigLoader {
             String configContent = null;
 
             if (c.endsWith(".yml")) {
-                configContent = readConfigContent(BASE_PATH + "/" + c);
+                configContent = readConfigContent(name + "/" + c);
             }
 
             // 简单配置database.table@datasourcekey?rowKey=key1,key2

+ 10 - 10
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -33,6 +33,8 @@ public class CanalAdapterLoader {
 
     private Map<String, AbstractCanalAdapterWorker> canalMQWorker = new HashMap<>();
 
+    private Map<String, OuterAdapter>               outerAdapters = new HashMap<>();                                  // 配置文件对应adapter
+
     private ExtensionLoader<OuterAdapter>           loader;
 
     public CanalAdapterLoader(CanalClientConfig canalClientConfig){
@@ -53,8 +55,8 @@ public class CanalAdapterLoader {
         }
         String zkHosts = this.canalClientConfig.getZookeeperHosts();
 
-        // 初始化canal-client的适配器
         if (canalClientConfig.getCanalInstances() != null) {
+            // 初始化canal-client的适配器
             for (CanalClientConfig.CanalInstance instance : canalClientConfig.getCanalInstances()) {
                 List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
 
@@ -83,10 +85,8 @@ public class CanalAdapterLoader {
                 worker.start();
                 logger.info("Start adapter for canal instance: {} succeed", instance.getInstance());
             }
-        }
-
-        // 初始化canal-client-mq的适配器
-        if (canalClientConfig.getMqTopics() != null) {
+        } else if (canalClientConfig.getMqTopics() != null) {
+            // 初始化canal-client-mq的适配器
             for (CanalClientConfig.MQTopic topic : canalClientConfig.getMqTopics()) {
                 for (CanalClientConfig.MQGroup group : topic.getGroups()) {
                     List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
@@ -124,11 +124,11 @@ public class CanalAdapterLoader {
     private void loadConnector(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) {
         try {
             OuterAdapter adapter;
-            if ("rdb".equalsIgnoreCase(config.getName())) {
-                adapter = loader.getExtension(config.getName(), config.getKey());
-            } else {
-                adapter = loader.getExtension(config.getName());
-            }
+            // if ("rdb".equalsIgnoreCase(config.getName())) {
+            adapter = loader.getExtension(config.getName(), StringUtils.trimToEmpty(config.getKey()));
+            // } else {
+            // adapter = loader.getExtension(config.getName());
+            // }
             ClassLoader cl = Thread.currentThread().getContextClassLoader();
             // 替换ClassLoader
             Thread.currentThread().setContextClassLoader(adapter.getClass().getClassLoader());

+ 38 - 13
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java

@@ -46,17 +46,17 @@ public class CommonRest {
     }
 
     /**
-     * ETL curl http://127.0.0.1:8081/etl/hbase/mytest_person2.yml -X POST
-     * 
+     * ETL curl http://127.0.0.1:8081/etl/rdb/oracle1/mytest_user.yml -X POST
+     *
      * @param type 类型 hbase, es
-     * @param task 任务名对应配置文件名 mytest_person2.yml
+     * @param key adapter key
+     * @param task 任务名对应配置文件名 mytest_user.yml
      * @param params etl where条件参数, 为空全部导入
-     * @return
      */
-    @PostMapping("/etl/{type}/{task}")
-    public EtlResult etl(@PathVariable String type, @PathVariable String task,
+    @PostMapping("/etl/{type}/{key}/{task}")
+    public EtlResult etl(@PathVariable String type, @PathVariable String key, @PathVariable String task,
                          @RequestParam(name = "params", required = false) String params) {
-        OuterAdapter adapter = loader.getExtension(type);
+        OuterAdapter adapter = loader.getExtension(type, key);
         String destination = adapter.getDestination(task);
         String lockKey = destination == null ? task : destination;
 
@@ -83,12 +83,11 @@ public class CommonRest {
                 }
             }
             try {
-                List<String> paramArr = null;
+                List<String> paramArray = null;
                 if (params != null) {
-                    String[] parmaArray = params.trim().split(";");
-                    paramArr = Arrays.asList(parmaArray);
+                    paramArray = Arrays.asList(params.trim().split(";"));
                 }
-                return adapter.etl(task, paramArr);
+                return adapter.etl(task, paramArray);
             } finally {
                 if (destination != null && oriSwitchStatus != null && oriSwitchStatus) {
                     syncSwitch.on(destination);
@@ -101,6 +100,33 @@ public class CommonRest {
         }
     }
 
+    /**
+     * ETL curl http://127.0.0.1:8081/etl/hbase/mytest_person2.yml -X POST
+     * 
+     * @param type 类型 hbase, es
+     * @param task 任务名对应配置文件名 mytest_person2.yml
+     * @param params etl where条件参数, 为空全部导入
+     */
+    @PostMapping("/etl/{type}/{task}")
+    public EtlResult etl(@PathVariable String type, @PathVariable String task,
+                         @RequestParam(name = "params", required = false) String params) {
+        return etl(type, null, task, params);
+    }
+
+    /**
+     * 统计总数 curl http://127.0.0.1:8081/count/rdb/oracle1/mytest_user.yml
+     *
+     * @param type 类型 hbase, es
+     * @param key adapter key
+     * @param task 任务名对应配置文件名 mytest_person2.yml
+     * @return
+     */
+    @GetMapping("/count/{type}/{key}/{task}")
+    public Map<String, Object> count(@PathVariable String type, @PathVariable String key, @PathVariable String task) {
+        OuterAdapter adapter = loader.getExtension(type, key);
+        return adapter.count(task);
+    }
+
     /**
      * 统计总数 curl http://127.0.0.1:8081/count/hbase/mytest_person2.yml
      * 
@@ -110,8 +136,7 @@ public class CommonRest {
      */
     @GetMapping("/count/{type}/{task}")
     public Map<String, Object> count(@PathVariable String type, @PathVariable String task) {
-        OuterAdapter adapter = loader.getExtension(type);
-        return adapter.count(task);
+        return count(type, null, task);
     }
 
     /**

+ 17 - 8
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -22,26 +22,26 @@ import com.alibaba.otter.canal.client.adapter.support.*;
 @SPI("rdb")
 public class RdbAdapter implements OuterAdapter {
 
-    private static Logger                       logger     = LoggerFactory.getLogger(RdbAdapter.class);
+    private static Logger              logger             = LoggerFactory.getLogger(RdbAdapter.class);
 
-    private volatile Map<String, MappingConfig> rdbMapping = new HashMap<>();                          // 文件名对应配置
-    private Map<String, MappingConfig>          mappingConfigCache;                                    // 库名-表名对应配置
+    private Map<String, MappingConfig> rdbMapping         = new HashMap<>();                          // 文件名对应配置
+    private Map<String, MappingConfig> mappingConfigCache = new HashMap<>();                          // 库名-表名对应配置
 
-    private DruidDataSource                     dataSource;
+    private DruidDataSource            dataSource;
 
-    private RdbSyncService                      rdbSyncService;
+    private RdbSyncService             rdbSyncService;
 
     @Override
     public void init(OuterAdapterConfig configuration) {
         SPI spi = this.getClass().getAnnotation(SPI.class);
         Map<String, MappingConfig> rdbMappingTmp = MappingConfigLoader.load(spi.value());
-        // 过滤其他key的配置
+        // 过滤不匹配的key的配置
         rdbMappingTmp.forEach((key, mappingConfig) -> {
-            if (mappingConfig.getOuterAdapterKey().equalsIgnoreCase(configuration.getKey())) {
+            if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
+                || mappingConfig.getOuterAdapterKey().equalsIgnoreCase(configuration.getKey())) {
                 rdbMapping.put(key, mappingConfig);
             }
         });
-        mappingConfigCache = new HashMap<>();
         for (MappingConfig mappingConfig : rdbMapping.values()) {
             mappingConfigCache
                 .put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
@@ -213,6 +213,15 @@ public class RdbAdapter implements OuterAdapter {
         return res;
     }
 
+    @Override
+    public String getDestination(String task) {
+        MappingConfig config = rdbMapping.get(task);
+        if (config != null) {
+            return config.getDestination();
+        }
+        return null;
+    }
+
     @Override
     public void destroy() {
         if (dataSource != null) {