瀏覽代碼

Merge pull request #1289 from rewerma/master

rdb adapter增加mysql镜像db(schema)同步
agapple 6 年之前
父節點
當前提交
689366248d
共有 19 個文件被更改,包括 637 次插入142 次删除
  1. 9 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Dml.java
  2. 11 6
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java
  3. 5 1
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java
  4. 6 2
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java
  5. 4 2
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java
  6. 9 2
      client-adapter/launcher/src/main/resources/application.yml
  7. 1 1
      client-adapter/rdb/pom.xml
  8. 89 22
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java
  9. 6 2
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/ConfigLoader.java
  10. 34 26
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java
  11. 44 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MirrorDbConfig.java
  12. 21 7
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/monitor/RdbConfigMonitor.java
  13. 3 3
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java
  14. 158 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java
  15. 124 65
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java
  16. 15 1
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java
  17. 11 1
      client-adapter/rdb/src/main/resources/rdb/mytest_user.yml
  18. 1 1
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/DBTest.java
  19. 86 0
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/SqlParserTest.java

+ 9 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Dml.java

@@ -17,6 +17,7 @@ public class Dml implements Serializable {
     private String                    destination;                            // 对应canal的实例或者MQ的topic
     private String                    destination;                            // 对应canal的实例或者MQ的topic
     private String                    database;                               // 数据库或schema
     private String                    database;                               // 数据库或schema
     private String                    table;                                  // 表名
     private String                    table;                                  // 表名
+    private List<String>              pkNames;
     private String                    type;                                   // 类型: INSERT UPDATE DELETE
     private String                    type;                                   // 类型: INSERT UPDATE DELETE
     // binlog executeTime
     // binlog executeTime
     private Long                      es;                                     // 执行耗时
     private Long                      es;                                     // 执行耗时
@@ -50,6 +51,14 @@ public class Dml implements Serializable {
         this.table = table;
         this.table = table;
     }
     }
 
 
+    public List<String> getPkNames() {
+        return pkNames;
+    }
+
+    public void setPkNames(List<String> pkNames) {
+        this.pkNames = pkNames;
+    }
+
     public String getType() {
     public String getType() {
         return type;
         return type;
     }
     }

+ 11 - 6
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -1,11 +1,6 @@
 package com.alibaba.otter.canal.client.adapter.support;
 package com.alibaba.otter.canal.client.adapter.support;
 
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.FlatMessage;
@@ -55,6 +50,8 @@ public class MessageUtil {
 
 
             if (!rowChange.getIsDdl()) {
             if (!rowChange.getIsDdl()) {
                 Set<String> updateSet = new HashSet<>();
                 Set<String> updateSet = new HashSet<>();
+                dml.setPkNames(new ArrayList<>());
+                int i = 0;
                 for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                 for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                     if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
                     if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
                         && eventType != CanalEntry.EventType.DELETE) {
                         && eventType != CanalEntry.EventType.DELETE) {
@@ -71,6 +68,11 @@ public class MessageUtil {
                     }
                     }
 
 
                     for (CanalEntry.Column column : columns) {
                     for (CanalEntry.Column column : columns) {
+                        if (i == 0) {
+                            if (column.getIsKey()) {
+                                dml.getPkNames().add(column.getName());
+                            }
+                        }
                         row.put(column.getName(),
                         row.put(column.getName(),
                             JdbcTypeUtil.typeConvert(column.getName(),
                             JdbcTypeUtil.typeConvert(column.getName(),
                                 column.getValue(),
                                 column.getValue(),
@@ -101,6 +103,8 @@ public class MessageUtil {
                             old.add(rowOld);
                             old.add(rowOld);
                         }
                         }
                     }
                     }
+
+                    i++;
                 }
                 }
                 if (!data.isEmpty()) {
                 if (!data.isEmpty()) {
                     dml.setData(data);
                     dml.setData(data);
@@ -134,6 +138,7 @@ public class MessageUtil {
         dml.setDestination(destination);
         dml.setDestination(destination);
         dml.setDatabase(flatMessage.getDatabase());
         dml.setDatabase(flatMessage.getDatabase());
         dml.setTable(flatMessage.getTable());
         dml.setTable(flatMessage.getTable());
+        dml.setPkNames(flatMessage.getPkNames());
         dml.setType(flatMessage.getType());
         dml.setType(flatMessage.getType());
         dml.setTs(flatMessage.getTs());
         dml.setTs(flatMessage.getTs());
         dml.setEs(flatMessage.getEs());
         dml.setEs(flatMessage.getEs());

+ 5 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.es.config;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map;
 
 
+import com.alibaba.fastjson.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.Yaml;
@@ -19,6 +20,7 @@ public class ESSyncConfigLoader {
 
 
     private static Logger logger = LoggerFactory.getLogger(ESSyncConfigLoader.class);
     private static Logger logger = LoggerFactory.getLogger(ESSyncConfigLoader.class);
 
 
+    @SuppressWarnings("unchecked")
     public static synchronized Map<String, ESSyncConfig> load() {
     public static synchronized Map<String, ESSyncConfig> load() {
         logger.info("## Start loading es mapping config ... ");
         logger.info("## Start loading es mapping config ... ");
 
 
@@ -26,7 +28,9 @@ public class ESSyncConfigLoader {
 
 
         Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("es");
         Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("es");
         configContentMap.forEach((fileName, content) -> {
         configContentMap.forEach((fileName, content) -> {
-            ESSyncConfig config = new Yaml().loadAs(content, ESSyncConfig.class);
+            Map configMap = new Yaml().loadAs(content, Map.class); // yml自带的对象反射不是很稳定
+            JSONObject configJson = new JSONObject(configMap);
+            ESSyncConfig config = configJson.toJavaObject(ESSyncConfig.class);
             try {
             try {
                 config.validate();
                 config.validate();
             } catch (Exception e) {
             } catch (Exception e) {

+ 6 - 2
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.hbase.config;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map;
 
 
+import com.alibaba.fastjson.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.Yaml;
@@ -21,9 +22,10 @@ public class MappingConfigLoader {
 
 
     /**
     /**
      * 加载HBase表映射配置
      * 加载HBase表映射配置
-     * 
+     *
      * @return 配置名/配置文件名--对象
      * @return 配置名/配置文件名--对象
      */
      */
+    @SuppressWarnings("unchecked")
     public static Map<String, MappingConfig> load() {
     public static Map<String, MappingConfig> load() {
         logger.info("## Start loading hbase mapping config ... ");
         logger.info("## Start loading hbase mapping config ... ");
 
 
@@ -31,7 +33,9 @@ public class MappingConfigLoader {
 
 
         Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("hbase");
         Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("hbase");
         configContentMap.forEach((fileName, content) -> {
         configContentMap.forEach((fileName, content) -> {
-            MappingConfig config = new Yaml().loadAs(content, MappingConfig.class);
+            Map configMap = new Yaml().loadAs(content, Map.class); // yml自带的对象反射不是很稳定
+            JSONObject configJson = new JSONObject(configMap);
+            MappingConfig config = configJson.toJavaObject(MappingConfig.class);
             try {
             try {
                 config.validate();
                 config.validate();
             } catch (Exception e) {
             } catch (Exception e) {

+ 4 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -180,7 +180,7 @@ public abstract class AbstractCanalAdapterWorker {
 
 
     /**
     /**
      * 分批同步
      * 分批同步
-     * 
+     *
      * @param dmls
      * @param dmls
      * @param adapter
      * @param adapter
      */
      */
@@ -200,7 +200,9 @@ public abstract class AbstractCanalAdapterWorker {
                     len = 0;
                     len = 0;
                 }
                 }
             }
             }
-            adapter.sync(dmlsBatch);
+            if (!dmlsBatch.isEmpty()) {
+                adapter.sync(dmlsBatch);
+            }
         }
         }
     }
     }
 
 

+ 9 - 2
client-adapter/launcher/src/main/resources/application.yml

@@ -15,7 +15,7 @@ spring:
 canal.conf:
 canal.conf:
   canalServerHost: 127.0.0.1:11111
   canalServerHost: 127.0.0.1:11111
 #  zookeeperHosts: slave1:2181
 #  zookeeperHosts: slave1:2181
-#  mqServers: slave1:6667 #or rocketmq
+#  mqServers: 127.0.0.1:9092 #or rocketmq
 #  flatMessage: true
 #  flatMessage: true
   batchSize: 500
   batchSize: 500
   syncBatchSize: 1000
   syncBatchSize: 1000
@@ -36,6 +36,13 @@ canal.conf:
       outerAdapters:
       outerAdapters:
       - name: logger
       - name: logger
 #      - name: rdb
 #      - name: rdb
+#        key: mysql1
+#        properties:
+#          jdbc.driverClassName: com.mysql.jdbc.Driver
+#          jdbc.url: jdbc:mysql://192.168.0.36/mytest?useUnicode=true
+#          jdbc.username: root
+#          jdbc.password: 121212
+#      - name: rdb
 #        key: oracle1
 #        key: oracle1
 #        properties:
 #        properties:
 #          jdbc.driverClassName: oracle.jdbc.OracleDriver
 #          jdbc.driverClassName: oracle.jdbc.OracleDriver
@@ -59,4 +66,4 @@ canal.conf:
 #      - name: es
 #      - name: es
 #        hosts: 127.0.0.1:9300
 #        hosts: 127.0.0.1:9300
 #        properties:
 #        properties:
-#          cluster.name: elasticsearch
+#          cluster.name: elasticsearch

+ 1 - 1
client-adapter/rdb/pom.xml

@@ -100,4 +100,4 @@
             </plugin>
             </plugin>
         </plugins>
         </plugins>
     </build>
     </build>
-</project>
+</project>

+ 89 - 22
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -2,12 +2,10 @@ package com.alibaba.otter.canal.client.adapter.rdb;
 
 
 import java.sql.Connection;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.SQLException;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.*;
 
 
 import javax.sql.DataSource;
 import javax.sql.DataSource;
 
 
@@ -19,24 +17,33 @@ import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.rdb.config.ConfigLoader;
 import com.alibaba.otter.canal.client.adapter.rdb.config.ConfigLoader;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.monitor.RdbConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.rdb.monitor.RdbConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
+import com.alibaba.otter.canal.client.adapter.rdb.service.RdbMirrorDbSyncService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
+import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
 import com.alibaba.otter.canal.client.adapter.support.*;
 import com.alibaba.otter.canal.client.adapter.support.*;
 
 
+/**
+ * RDB适配器实现类
+ *
+ * @author rewerma 2018-11-7 下午06:45:49
+ * @version 1.0.0
+ */
 @SPI("rdb")
 @SPI("rdb")
 public class RdbAdapter implements OuterAdapter {
 public class RdbAdapter implements OuterAdapter {
 
 
-    private static Logger                           logger             = LoggerFactory.getLogger(RdbAdapter.class);
+    private static Logger                           logger              = LoggerFactory.getLogger(RdbAdapter.class);
 
 
-    private Map<String, MappingConfig>              rdbMapping         = new HashMap<>();                          // 文件名对应配置
-    private Map<String, Map<String, MappingConfig>> mappingConfigCache = new HashMap<>();                          // 库名-表名对应配置
+    private Map<String, MappingConfig>              rdbMapping          = new ConcurrentHashMap<>();                // 文件名对应配置
+    private Map<String, Map<String, MappingConfig>> mappingConfigCache  = new ConcurrentHashMap<>();                // 库名-表名对应配置
+    private Map<String, MirrorDbConfig>             mirrorDbConfigCache = new ConcurrentHashMap<>();                // 镜像库配置
 
 
     private DruidDataSource                         dataSource;
     private DruidDataSource                         dataSource;
 
 
     private RdbSyncService                          rdbSyncService;
     private RdbSyncService                          rdbSyncService;
-
-    private ExecutorService                         executor           = Executors.newFixedThreadPool(1);
+    private RdbMirrorDbSyncService                  rdbMirrorDbSyncService;
 
 
     private RdbConfigMonitor                        rdbConfigMonitor;
     private RdbConfigMonitor                        rdbConfigMonitor;
 
 
@@ -48,6 +55,15 @@ public class RdbAdapter implements OuterAdapter {
         return mappingConfigCache;
         return mappingConfigCache;
     }
     }
 
 
+    public Map<String, MirrorDbConfig> getMirrorDbConfigCache() {
+        return mirrorDbConfigCache;
+    }
+
+    /**
+     * 初始化方法
+     *
+     * @param configuration 外部适配器配置信息
+     */
     @Override
     @Override
     public void init(OuterAdapterConfig configuration) {
     public void init(OuterAdapterConfig configuration) {
         Map<String, MappingConfig> rdbMappingTmp = ConfigLoader.load();
         Map<String, MappingConfig> rdbMappingTmp = ConfigLoader.load();
@@ -62,14 +78,22 @@ public class RdbAdapter implements OuterAdapter {
         for (Map.Entry<String, MappingConfig> entry : rdbMapping.entrySet()) {
         for (Map.Entry<String, MappingConfig> entry : rdbMapping.entrySet()) {
             String configName = entry.getKey();
             String configName = entry.getKey();
             MappingConfig mappingConfig = entry.getValue();
             MappingConfig mappingConfig = entry.getValue();
-            Map<String, MappingConfig> configMap = mappingConfigCache
-                .computeIfAbsent(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
-                                 + mappingConfig.getDbMapping().getDatabase() + "."
-                                 + mappingConfig.getDbMapping().getTable(),
-                    k1 -> new HashMap<>());
-            configMap.put(configName, mappingConfig);
+            if (!mappingConfig.getDbMapping().getMirrorDb()) {
+                Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(
+                    StringUtils.trimToEmpty(mappingConfig.getDestination()) + "." + mappingConfig.getDbMapping()
+                        .getDatabase() + "." + mappingConfig.getDbMapping().getTable(),
+                    k1 -> new ConcurrentHashMap<>());
+                configMap.put(configName, mappingConfig);
+            } else {
+                // mirrorDB
+
+                mirrorDbConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                        + mappingConfig.getDbMapping().getDatabase(),
+                        MirrorDbConfig.create(configName, mappingConfig));
+            }
         }
         }
 
 
+        // 初始化连接池
         Map<String, String> properties = configuration.getProperties();
         Map<String, String> properties = configuration.getProperties();
         dataSource = new DruidDataSource();
         dataSource = new DruidDataSource();
         dataSource.setDriverClassName(properties.get("jdbc.driverClassName"));
         dataSource.setDriverClassName(properties.get("jdbc.driverClassName"));
@@ -78,7 +102,7 @@ public class RdbAdapter implements OuterAdapter {
         dataSource.setPassword(properties.get("jdbc.password"));
         dataSource.setPassword(properties.get("jdbc.password"));
         dataSource.setInitialSize(1);
         dataSource.setInitialSize(1);
         dataSource.setMinIdle(1);
         dataSource.setMinIdle(1);
-        dataSource.setMaxActive(20);
+        dataSource.setMaxActive(10);
         dataSource.setMaxWait(60000);
         dataSource.setMaxWait(60000);
         dataSource.setTimeBetweenEvictionRunsMillis(60000);
         dataSource.setTimeBetweenEvictionRunsMillis(60000);
         dataSource.setMinEvictableIdleTimeMillis(300000);
         dataSource.setMinEvictableIdleTimeMillis(300000);
@@ -92,19 +116,49 @@ public class RdbAdapter implements OuterAdapter {
         String threads = properties.get("threads");
         String threads = properties.get("threads");
         // String commitSize = properties.get("commitSize");
         // String commitSize = properties.get("commitSize");
 
 
-        rdbSyncService = new RdbSyncService(mappingConfigCache,
+        rdbSyncService = new RdbSyncService(dataSource, threads != null ? Integer.valueOf(threads) : null);
+
+        rdbMirrorDbSyncService = new RdbMirrorDbSyncService(mirrorDbConfigCache,
             dataSource,
             dataSource,
-            threads != null ? Integer.valueOf(threads) : null);
+            threads != null ? Integer.valueOf(threads) : null,
+            rdbSyncService.getColumnsTypeCache());
 
 
         rdbConfigMonitor = new RdbConfigMonitor();
         rdbConfigMonitor = new RdbConfigMonitor();
         rdbConfigMonitor.init(configuration.getKey(), this);
         rdbConfigMonitor.init(configuration.getKey(), this);
     }
     }
 
 
+    /**
+     * 同步方法
+     *
+     * @param dmls 数据包
+     */
     @Override
     @Override
     public void sync(List<Dml> dmls) {
     public void sync(List<Dml> dmls) {
-        rdbSyncService.sync(dmls);
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+
+        Future<Boolean> future1 = executorService.submit(() -> {
+            rdbSyncService.sync(mappingConfigCache, dmls);
+            return true;
+        });
+        Future<Boolean> future2 = executorService.submit(() -> {
+            rdbMirrorDbSyncService.sync(dmls);
+            return true;
+        });
+        try {
+            future1.get();
+            future2.get();
+        } catch (ExecutionException | InterruptedException e) {
+            // ignore
+        }
     }
     }
 
 
+    /**
+     * ETL方法
+     *
+     * @param task 任务名, 对应配置名
+     * @param params etl筛选条件
+     * @return ETL结果
+     */
     @Override
     @Override
     public EtlResult etl(String task, List<String> params) {
     public EtlResult etl(String task, List<String> params) {
         EtlResult etlResult = new EtlResult();
         EtlResult etlResult = new EtlResult();
@@ -153,11 +207,17 @@ public class RdbAdapter implements OuterAdapter {
         return etlResult;
         return etlResult;
     }
     }
 
 
+    /**
+     * 获取总数方法
+     *
+     * @param task 任务名, 对应配置名
+     * @return 总数
+     */
     @Override
     @Override
     public Map<String, Object> count(String task) {
     public Map<String, Object> count(String task) {
         MappingConfig config = rdbMapping.get(task);
         MappingConfig config = rdbMapping.get(task);
         MappingConfig.DbMapping dbMapping = config.getDbMapping();
         MappingConfig.DbMapping dbMapping = config.getDbMapping();
-        String sql = "SELECT COUNT(1) AS cnt FROM " + dbMapping.getTargetTable();
+        String sql = "SELECT COUNT(1) AS cnt FROM " + SyncUtil.getDbTableName(dbMapping);
         Connection conn = null;
         Connection conn = null;
         Map<String, Object> res = new LinkedHashMap<>();
         Map<String, Object> res = new LinkedHashMap<>();
         try {
         try {
@@ -183,11 +243,17 @@ public class RdbAdapter implements OuterAdapter {
                 }
                 }
             }
             }
         }
         }
-        res.put("targetTable", dbMapping.getTargetTable());
+        res.put("targetTable", SyncUtil.getDbTableName(dbMapping));
 
 
         return res;
         return res;
     }
     }
 
 
+    /**
+     * 获取对应canal instance name 或 mq topic
+     *
+     * @param task 任务名, 对应配置名
+     * @return destination
+     */
     @Override
     @Override
     public String getDestination(String task) {
     public String getDestination(String task) {
         MappingConfig config = rdbMapping.get(task);
         MappingConfig config = rdbMapping.get(task);
@@ -197,6 +263,9 @@ public class RdbAdapter implements OuterAdapter {
         return null;
         return null;
     }
     }
 
 
+    /**
+     * 销毁方法
+     */
     @Override
     @Override
     public void destroy() {
     public void destroy() {
         if (rdbConfigMonitor != null) {
         if (rdbConfigMonitor != null) {
@@ -207,8 +276,6 @@ public class RdbAdapter implements OuterAdapter {
             rdbSyncService.close();
             rdbSyncService.close();
         }
         }
 
 
-        executor.shutdown();
-
         if (dataSource != null) {
         if (dataSource != null) {
             dataSource.close();
             dataSource.close();
         }
         }

+ 6 - 2
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/ConfigLoader.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.rdb.config;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map;
 
 
+import com.alibaba.fastjson.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.Yaml;
@@ -21,9 +22,10 @@ public class ConfigLoader {
 
 
     /**
     /**
      * 加载HBase表映射配置
      * 加载HBase表映射配置
-     * 
+     *
      * @return 配置名/配置文件名--对象
      * @return 配置名/配置文件名--对象
      */
      */
+    @SuppressWarnings("unchecked")
     public static Map<String, MappingConfig> load() {
     public static Map<String, MappingConfig> load() {
         logger.info("## Start loading rdb mapping config ... ");
         logger.info("## Start loading rdb mapping config ... ");
 
 
@@ -31,7 +33,9 @@ public class ConfigLoader {
 
 
         Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("rdb");
         Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("rdb");
         configContentMap.forEach((fileName, content) -> {
         configContentMap.forEach((fileName, content) -> {
-            MappingConfig config = new Yaml().loadAs(content, MappingConfig.class);
+            Map configMap = new Yaml().loadAs(content, Map.class); // yml自带的对象反射不是很稳定
+            JSONObject configJson = new JSONObject(configMap);
+            MappingConfig config = configJson.toJavaObject(MappingConfig.class);
             try {
             try {
                 config.validate();
                 config.validate();
             } catch (Exception e) {
             } catch (Exception e) {

+ 34 - 26
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java

@@ -1,8 +1,7 @@
 package com.alibaba.otter.canal.client.adapter.rdb.config;
 package com.alibaba.otter.canal.client.adapter.rdb.config;
 
 
-import java.util.LinkedHashSet;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map;
-import java.util.Set;
 
 
 /**
 /**
  * RDB表映射配置
  * RDB表映射配置
@@ -66,30 +65,39 @@ public class MappingConfig {
         if (dbMapping.database == null || dbMapping.database.isEmpty()) {
         if (dbMapping.database == null || dbMapping.database.isEmpty()) {
             throw new NullPointerException("dbMapping.database");
             throw new NullPointerException("dbMapping.database");
         }
         }
-        if (dbMapping.table == null || dbMapping.table.isEmpty()) {
+        if (!dbMapping.getMirrorDb() && (dbMapping.table == null || dbMapping.table.isEmpty())) {
             throw new NullPointerException("dbMapping.table");
             throw new NullPointerException("dbMapping.table");
         }
         }
-        if (dbMapping.targetTable == null || dbMapping.targetTable.isEmpty()) {
+        if (!dbMapping.getMirrorDb() && (dbMapping.targetTable == null || dbMapping.targetTable.isEmpty())) {
             throw new NullPointerException("dbMapping.targetTable");
             throw new NullPointerException("dbMapping.targetTable");
         }
         }
     }
     }
 
 
     public static class DbMapping {
     public static class DbMapping {
 
 
+        private Boolean             mirrorDb    = false;                 // 是否镜像库
         private String              database;                            // 数据库名或schema名
         private String              database;                            // 数据库名或schema名
-        private String              table;                               // 表面名
-        private Map<String, String> targetPk;                            // 目标表主键字段
-        private boolean             mapAll      = false;                 // 映射所有字段
+        private String              table;                               // 表名
+        private Map<String, String> targetPk    = new LinkedHashMap<>(); // 目标表主键字段
+        private Boolean             mapAll      = false;                 // 映射所有字段
+        private String              targetDb;                            // 目标库名
         private String              targetTable;                         // 目标表名
         private String              targetTable;                         // 目标表名
         private Map<String, String> targetColumns;                       // 目标表字段映射
         private Map<String, String> targetColumns;                       // 目标表字段映射
 
 
         private String              etlCondition;                        // etl条件sql
         private String              etlCondition;                        // etl条件sql
 
 
-        private Set<String>         families    = new LinkedHashSet<>(); // column family列表
         private int                 readBatch   = 5000;
         private int                 readBatch   = 5000;
         private int                 commitBatch = 5000;                  // etl等批量提交大小
         private int                 commitBatch = 5000;                  // etl等批量提交大小
 
 
-        // private volatile Map<String, String> allColumns; // mapAll为true,自动设置改字段
+        private Map<String, String> allMapColumns;
+
+        public Boolean getMirrorDb() {
+            return mirrorDb;
+        }
+
+        public void setMirrorDb(Boolean mirrorDb) {
+            this.mirrorDb = mirrorDb;
+        }
 
 
         public String getDatabase() {
         public String getDatabase() {
             return database;
             return database;
@@ -115,14 +123,22 @@ public class MappingConfig {
             this.targetPk = targetPk;
             this.targetPk = targetPk;
         }
         }
 
 
-        public boolean isMapAll() {
+        public Boolean getMapAll() {
             return mapAll;
             return mapAll;
         }
         }
 
 
-        public void setMapAll(boolean mapAll) {
+        public void setMapAll(Boolean mapAll) {
             this.mapAll = mapAll;
             this.mapAll = mapAll;
         }
         }
 
 
+        public String getTargetDb() {
+            return targetDb;
+        }
+
+        public void setTargetDb(String targetDb) {
+            this.targetDb = targetDb;
+        }
+
         public String getTargetTable() {
         public String getTargetTable() {
             return targetTable;
             return targetTable;
         }
         }
@@ -147,14 +163,6 @@ public class MappingConfig {
             this.etlCondition = etlCondition;
             this.etlCondition = etlCondition;
         }
         }
 
 
-        public Set<String> getFamilies() {
-            return families;
-        }
-
-        public void setFamilies(Set<String> families) {
-            this.families = families;
-        }
-
         public int getReadBatch() {
         public int getReadBatch() {
             return readBatch;
             return readBatch;
         }
         }
@@ -171,12 +179,12 @@ public class MappingConfig {
             this.commitBatch = commitBatch;
             this.commitBatch = commitBatch;
         }
         }
 
 
-        // public Map<String, String> getAllColumns() {
-        // return allColumns;
-        // }
-        //
-        // public void setAllColumns(Map<String, String> allColumns) {
-        // this.allColumns = allColumns;
-        // }
+        public Map<String, String> getAllMapColumns() {
+            return allMapColumns;
+        }
+
+        public void setAllMapColumns(Map<String, String> allMapColumns) {
+            this.allMapColumns = allMapColumns;
+        }
     }
     }
 }
 }

+ 44 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MirrorDbConfig.java

@@ -0,0 +1,44 @@
+package com.alibaba.otter.canal.client.adapter.rdb.config;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class MirrorDbConfig {
+
+    private String             fileName;
+    private MappingConfig      mappingConfig;
+    Map<String, MappingConfig> tableConfig = new ConcurrentHashMap<>();
+
+    public static MirrorDbConfig create(String fileName, MappingConfig mappingConfig) {
+        return new MirrorDbConfig(fileName, mappingConfig);
+    }
+
+    public MirrorDbConfig(String fileName, MappingConfig mappingConfig){
+        this.fileName = fileName;
+        this.mappingConfig = mappingConfig;
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public void setFileName(String fileName) {
+        this.fileName = fileName;
+    }
+
+    public MappingConfig getMappingConfig() {
+        return mappingConfig;
+    }
+
+    public void setMappingConfig(MappingConfig mappingConfig) {
+        this.mappingConfig = mappingConfig;
+    }
+
+    public Map<String, MappingConfig> getTableConfig() {
+        return tableConfig;
+    }
+
+    public void setTableConfig(Map<String, MappingConfig> tableConfig) {
+        this.tableConfig = tableConfig;
+    }
+}

+ 21 - 7
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/monitor/RdbConfigMonitor.java

@@ -4,6 +4,7 @@ import java.io.File;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map;
 
 
+import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
 import org.apache.commons.io.monitor.FileAlterationMonitor;
 import org.apache.commons.io.monitor.FileAlterationMonitor;
@@ -20,7 +21,7 @@ import com.alibaba.otter.canal.client.adapter.support.Util;
 
 
 public class RdbConfigMonitor {
 public class RdbConfigMonitor {
 
 
-    private static final Logger   logger = LoggerFactory.getLogger(RdbConfigMonitor.class);
+    private static final Logger   logger      = LoggerFactory.getLogger(RdbConfigMonitor.class);
 
 
     private static final String   adapterName = "rdb";
     private static final String   adapterName = "rdb";
 
 
@@ -83,7 +84,8 @@ public class RdbConfigMonitor {
             try {
             try {
                 if (rdbAdapter.getRdbMapping().containsKey(file.getName())) {
                 if (rdbAdapter.getRdbMapping().containsKey(file.getName())) {
                     // 加载配置文件
                     // 加载配置文件
-                    String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
+                    String configContent = MappingConfigsLoader
+                        .loadConfig(adapterName + File.separator + file.getName());
                     MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
                     MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
                     config.validate();
                     config.validate();
                     if ((key == null && config.getOuterAdapterKey() == null)
                     if ((key == null && config.getOuterAdapterKey() == null)
@@ -118,13 +120,19 @@ public class RdbConfigMonitor {
             }
             }
         }
         }
 
 
-        private void addConfigToCache(File file, MappingConfig config) {
-            rdbAdapter.getRdbMapping().put(file.getName(), config);
+        private void addConfigToCache(File file, MappingConfig mappingConfig) {
+            rdbAdapter.getRdbMapping().put(file.getName(), mappingConfig);
             Map<String, MappingConfig> configMap = rdbAdapter.getMappingConfigCache()
             Map<String, MappingConfig> configMap = rdbAdapter.getMappingConfigCache()
-                .computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "."
-                                 + config.getDbMapping().getDatabase() + "." + config.getDbMapping().getTable(),
+                .computeIfAbsent(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                 + mappingConfig.getDbMapping().getDatabase() + "."
+                                 + mappingConfig.getDbMapping().getTable(),
                     k1 -> new HashMap<>());
                     k1 -> new HashMap<>());
-            configMap.put(file.getName(), config);
+            configMap.put(file.getName(), mappingConfig);
+
+            Map<String, MirrorDbConfig> mirrorDbConfigCache = rdbAdapter.getMirrorDbConfigCache();
+            mirrorDbConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                    + mappingConfig.getDbMapping().getDatabase(),
+                MirrorDbConfig.create(file.getName(), mappingConfig));
         }
         }
 
 
         private void deleteConfigFromCache(File file) {
         private void deleteConfigFromCache(File file) {
@@ -136,6 +144,12 @@ public class RdbConfigMonitor {
                 }
                 }
             }
             }
 
 
+            rdbAdapter.getMirrorDbConfigCache().forEach((key, mirrorDbConfig) -> {
+                if (mirrorDbConfig.getFileName().equals(file.getName())) {
+                    rdbAdapter.getMirrorDbConfigCache().remove(key);
+                }
+            });
+
         }
         }
     }
     }
 }
 }

+ 3 - 3
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java

@@ -109,7 +109,7 @@ public class RdbEtlService {
             logger.info(
             logger.info(
                 dbMapping.getTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000 + "s!");
                 dbMapping.getTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000 + "s!");
 
 
-            etlResult.setResultMessage("导入目标表 " + dbMapping.getTargetTable() + " 数据:" + successCount.get() + " 条");
+            etlResult.setResultMessage("导入目标表 " + SyncUtil.getDbTableName(dbMapping) + " 数据:" + successCount.get() + " 条");
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             logger.error(e.getMessage(), e);
             errMsg.add(hbaseTable + " etl failed! ==>" + e.getMessage());
             errMsg.add(hbaseTable + " etl failed! ==>" + e.getMessage());
@@ -187,7 +187,7 @@ public class RdbEtlService {
                     // }
                     // }
 
 
                     StringBuilder insertSql = new StringBuilder();
                     StringBuilder insertSql = new StringBuilder();
-                    insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
+                    insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
                     columnsMap
                     columnsMap
                         .forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
                         .forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
 
 
@@ -209,7 +209,7 @@ public class RdbEtlService {
                             // 删除数据
                             // 删除数据
                             Map<String, Object> values = new LinkedHashMap<>();
                             Map<String, Object> values = new LinkedHashMap<>();
                             StringBuilder deleteSql = new StringBuilder(
                             StringBuilder deleteSql = new StringBuilder(
-                                "DELETE FROM " + dbMapping.getTargetTable() + " WHERE ");
+                                "DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
                             appendCondition(dbMapping, deleteSql, values, rs);
                             appendCondition(dbMapping, deleteSql, values, rs);
                             try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
                             try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
                                 int k = 1;
                                 int k = 1;

+ 158 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java

@@ -0,0 +1,158 @@
+package com.alibaba.otter.canal.client.adapter.rdb.service;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.support.SingleDml;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+
+/**
+ * RDB镜像库同步操作业务
+ *
+ * @author rewerma 2018-12-12 下午011:23
+ * @version 1.0.0
+ */
+public class RdbMirrorDbSyncService {
+
+    private static final Logger         logger = LoggerFactory.getLogger(RdbMirrorDbSyncService.class);
+
+    private Map<String, MirrorDbConfig> mirrorDbConfigCache;                                           // 镜像库配置
+    private DataSource                  dataSource;
+    private RdbSyncService              rdbSyncService;                                                // rdbSyncService代理
+
+    public RdbMirrorDbSyncService(Map<String, MirrorDbConfig> mirrorDbConfigCache, DataSource dataSource,
+                                  Integer threads, Map<String, Map<String, Integer>> columnsTypeCache){
+        this.mirrorDbConfigCache = mirrorDbConfigCache;
+        this.dataSource = dataSource;
+        this.rdbSyncService = new RdbSyncService(dataSource, threads, columnsTypeCache);
+    }
+
+    /**
+     * 批量同步方法
+     *
+     * @param dmls 批量 DML
+     */
+    public void sync(List<Dml> dmls) {
+        try {
+            List<Dml> dmlList = new ArrayList<>();
+            for (Dml dml : dmls) {
+                String destination = StringUtils.trimToEmpty(dml.getDestination());
+                String database = dml.getDatabase();
+                MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(destination + "." + database);
+                if (mirrorDbConfig == null) {
+                    continue;
+                }
+                if (StringUtils.isNotEmpty(dml.getSql())) {
+                    // DDL
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("DDL: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+                    }
+                    executeDdl(mirrorDbConfig, dml);
+                    rdbSyncService.getColumnsTypeCache().remove(destination + "." + database + "." + dml.getTable());
+                    mirrorDbConfig.getTableConfig().remove(dml.getTable()); // 删除对应库表配置
+                } else {
+                    // DML
+                    initMappingConfig(dml.getTable(), mirrorDbConfig.getMappingConfig(), mirrorDbConfig, dml);
+                    dmlList.add(dml);
+                }
+            }
+            if (!dmlList.isEmpty()) {
+                rdbSyncService.sync(dmlList, dml -> {
+                    MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache
+                        .get(dml.getDestination() + "." + dml.getDatabase());
+                    String destination = StringUtils.trimToEmpty(dml.getDestination());
+                    String database = dml.getDatabase();
+                    String table = dml.getTable();
+                    MappingConfig config = mirrorDbConfig.getTableConfig().get(table);
+
+                    if (config == null) {
+                        return false;
+                    }
+
+                    if (config.getConcurrent()) {
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            int hash = rdbSyncService.pkHash(config.getDbMapping(), singleDml.getData());
+                            RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
+                            rdbSyncService.getDmlsPartition()[hash].add(syncItem);
+                        });
+                    } else {
+                        int hash = 0;
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
+                            rdbSyncService.getDmlsPartition()[hash].add(syncItem);
+                        });
+                    }
+                    return true;
+                });
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 初始化表配置
+     *
+     * @param key 配置key: destination.database.table
+     * @param baseConfigMap db sync config
+     * @param dml DML
+     */
+    private void initMappingConfig(String key, MappingConfig baseConfigMap, MirrorDbConfig mirrorDbConfig, Dml dml) {
+        MappingConfig mappingConfig = mirrorDbConfig.getTableConfig().get(key);
+        if (mappingConfig == null) {
+            // 构造表配置
+            mappingConfig = new MappingConfig();
+            mappingConfig.setDataSourceKey(baseConfigMap.getDataSourceKey());
+            mappingConfig.setDestination(baseConfigMap.getDestination());
+            mappingConfig.setOuterAdapterKey(baseConfigMap.getOuterAdapterKey());
+            mappingConfig.setConcurrent(baseConfigMap.getConcurrent());
+            MappingConfig.DbMapping dbMapping = new MappingConfig.DbMapping();
+            mappingConfig.setDbMapping(dbMapping);
+            dbMapping.setDatabase(dml.getDatabase());
+            dbMapping.setTable(dml.getTable());
+            dbMapping.setTargetDb(dml.getDatabase());
+            dbMapping.setTargetTable(dml.getTable());
+            dbMapping.setMapAll(true);
+            List<String> pkNames = dml.getPkNames();
+            Map<String, String> pkMapping = new LinkedHashMap<>();
+            pkNames.forEach(pkName -> pkMapping.put(pkName, pkName));
+            dbMapping.setTargetPk(pkMapping);
+
+            mirrorDbConfig.getTableConfig().put(key, mappingConfig);
+        }
+    }
+
+    /**
+     * DDL 操作
+     *
+     * @param ddl DDL
+     */
+    private void executeDdl(MirrorDbConfig mirrorDbConfig, Dml ddl) {
+        try (Connection conn = dataSource.getConnection(); Statement statement = conn.createStatement()) {
+            statement.execute(ddl.getSql());
+            // 移除对应配置
+            mirrorDbConfig.getTableConfig().remove(ddl.getTable());
+            if (logger.isTraceEnabled()) {
+                logger.trace("Execute DDL sql: {} for database: {}", ddl.getSql(), ddl.getDatabase());
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+}

+ 124 - 65
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -11,6 +11,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.Future;
+import java.util.function.Function;
 
 
 import javax.sql.DataSource;
 import javax.sql.DataSource;
 
 
@@ -36,26 +37,37 @@ import com.alibaba.otter.canal.client.adapter.support.Util;
  */
  */
 public class RdbSyncService {
 public class RdbSyncService {
 
 
-    private static final Logger                     logger             = LoggerFactory.getLogger(RdbSyncService.class);
+    private static final Logger               logger  = LoggerFactory.getLogger(RdbSyncService.class);
 
 
-    private final Map<String, Map<String, Integer>> COLUMNS_TYPE_CACHE = new ConcurrentHashMap<>();
+    // 源库表字段类型缓存: instance.schema.table -> <columnName, jdbcType>
+    private Map<String, Map<String, Integer>> columnsTypeCache;
 
 
-    private Map<String, Map<String, MappingConfig>> mappingConfigCache;                                                // 库名-表名对应配置
+    private int                               threads = 3;
 
 
-    private int                                     threads            = 3;
+    private List<SyncItem>[]                  dmlsPartition;
+    private BatchExecutor[]                   batchExecutors;
+    private ExecutorService[]                 executorThreads;
 
 
-    private List<SyncItem>[]                        dmlsPartition;
-    private BatchExecutor[]                         batchExecutors;
-    private ExecutorService[]                       executorThreads;
+    public List<SyncItem>[] getDmlsPartition() {
+        return dmlsPartition;
+    }
+
+    public Map<String, Map<String, Integer>> getColumnsTypeCache() {
+        return columnsTypeCache;
+    }
+
+    @SuppressWarnings("unchecked")
+    public RdbSyncService(DataSource dataSource, Integer threads){
+        this(dataSource, threads, new ConcurrentHashMap<>());
+    }
 
 
     @SuppressWarnings("unchecked")
     @SuppressWarnings("unchecked")
-    public RdbSyncService(Map<String, Map<String, MappingConfig>> mappingConfigCache, DataSource dataSource,
-                          Integer threads){
+    public RdbSyncService(DataSource dataSource, Integer threads, Map<String, Map<String, Integer>> columnsTypeCache){
+        this.columnsTypeCache = columnsTypeCache;
         try {
         try {
             if (threads != null) {
             if (threads != null) {
                 this.threads = threads;
                 this.threads = threads;
             }
             }
-            this.mappingConfigCache = mappingConfigCache;
             this.dmlsPartition = new List[this.threads];
             this.dmlsPartition = new List[this.threads];
             this.batchExecutors = new BatchExecutor[this.threads];
             this.batchExecutors = new BatchExecutor[this.threads];
             this.executorThreads = new ExecutorService[this.threads];
             this.executorThreads = new ExecutorService[this.threads];
@@ -69,64 +81,111 @@ public class RdbSyncService {
         }
         }
     }
     }
 
 
-    public void sync(List<Dml> dmls) {
+    /**
+     * 批量同步回调
+     *
+     * @param dmls 批量 DML
+     * @param function 回调方法
+     */
+    public void sync(List<Dml> dmls, Function<Dml, Boolean> function) {
         try {
         try {
+            boolean toExecute = false;
             for (Dml dml : dmls) {
             for (Dml dml : dmls) {
-                String destination = StringUtils.trimToEmpty(dml.getDestination());
-                String database = dml.getDatabase();
-                String table = dml.getTable();
-                Map<String, MappingConfig> configMap = mappingConfigCache
-                    .get(destination + "." + database + "." + table);
-
-                if (configMap == null) {
-                    continue;
+                if (!toExecute) {
+                    toExecute = function.apply(dml);
+                } else {
+                    function.apply(dml);
+                }
+            }
+            if (toExecute) {
+                List<Future> futures = new ArrayList<>();
+                for (int i = 0; i < threads; i++) {
+                    int j = i;
+                    futures.add(executorThreads[i].submit(() -> {
+                        dmlsPartition[j]
+                            .forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
+                        batchExecutors[j].commit();
+                        return true;
+                    }));
                 }
                 }
-                for (MappingConfig config : configMap.values()) {
-
-                    if (config.getConcurrent()) {
-                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                        singleDmls.forEach(singleDml -> {
-                            int hash = pkHash(config.getDbMapping(), singleDml.getData());
-                            SyncItem syncItem = new SyncItem(config, singleDml);
-                            dmlsPartition[hash].add(syncItem);
-                        });
-                    } else {
-                        int hash = Math.abs(Math.abs(config.getDbMapping().getTargetTable().hashCode()) % threads);
-                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                        singleDmls.forEach(singleDml -> {
-                            SyncItem syncItem = new SyncItem(config, singleDml);
-                            dmlsPartition[hash].add(syncItem);
-                        });
+
+                futures.forEach(future -> {
+                    try {
+                        future.get();
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e);
                     }
                     }
+                });
+
+                for (int i = 0; i < threads; i++) {
+                    dmlsPartition[i].clear();
                 }
                 }
             }
             }
-            List<Future> futures = new ArrayList<>();
-            for (int i = 0; i < threads; i++) {
-                int j = i;
-                futures.add(executorThreads[i].submit(() -> {
-                    dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
-                    batchExecutors[j].commit();
-                    return true;
-                }));
-            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 批量同步
+     *
+     * @param mappingConfig 配置集合
+     * @param dmls 批量 DML
+     */
+    public void sync(Map<String, Map<String, MappingConfig>> mappingConfig, List<Dml> dmls) {
+        try {
+            sync(dmls, dml -> {
+                if (StringUtils.isNotEmpty(dml.getSql())) {
+                    // DDL
+                    columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
+                    return false;
+                } else {
+                    // DML
+                    String destination = StringUtils.trimToEmpty(dml.getDestination());
+                    String database = dml.getDatabase();
+                    String table = dml.getTable();
+                    Map<String, MappingConfig> configMap = mappingConfig
+                        .get(destination + "." + database + "." + table);
+
+                    if (configMap == null) {
+                        return false;
+                    }
 
 
-            futures.forEach(future -> {
-                try {
-                    future.get();
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
+                    boolean executed = false;
+                    for (MappingConfig config : configMap.values()) {
+                        if (config.getConcurrent()) {
+                            List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                            singleDmls.forEach(singleDml -> {
+                                int hash = pkHash(config.getDbMapping(), singleDml.getData());
+                                SyncItem syncItem = new SyncItem(config, singleDml);
+                                dmlsPartition[hash].add(syncItem);
+                            });
+                        } else {
+                            int hash = 0;
+                            List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                            singleDmls.forEach(singleDml -> {
+                                SyncItem syncItem = new SyncItem(config, singleDml);
+                                dmlsPartition[hash].add(syncItem);
+                            });
+                        }
+                        executed = true;
+                    }
+                    return executed;
                 }
                 }
             });
             });
-
-            for (int i = 0; i < threads; i++) {
-                dmlsPartition[i].clear();
-            }
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             logger.error(e.getMessage(), e);
         }
         }
     }
     }
 
 
-    private void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
+    /**
+     * 单条 dml 同步
+     *
+     * @param batchExecutor 批量事务执行器
+     * @param config 对应配置对象
+     * @param dml DML
+     */
+    public void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
         try {
         try {
             if (config != null) {
             if (config != null) {
                 String type = dml.getType();
                 String type = dml.getType();
@@ -164,7 +223,7 @@ public class RdbSyncService {
             Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
             Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
 
 
             StringBuilder insertSql = new StringBuilder();
             StringBuilder insertSql = new StringBuilder();
-            insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
+            insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
 
 
             columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
             columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
             int len = insertSql.length();
             int len = insertSql.length();
@@ -228,7 +287,7 @@ public class RdbSyncService {
             Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
             Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
 
 
             StringBuilder updateSql = new StringBuilder();
             StringBuilder updateSql = new StringBuilder();
-            updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
+            updateSql.append("UPDATE ").append(SyncUtil.getDbTableName(dbMapping)).append(" SET ");
             List<Map<String, ?>> values = new ArrayList<>();
             List<Map<String, ?>> values = new ArrayList<>();
             for (String srcColumnName : old.keySet()) {
             for (String srcColumnName : old.keySet()) {
                 List<String> targetColumnNames = new ArrayList<>();
                 List<String> targetColumnNames = new ArrayList<>();
@@ -280,7 +339,7 @@ public class RdbSyncService {
             Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
             Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
 
 
             StringBuilder sql = new StringBuilder();
             StringBuilder sql = new StringBuilder();
-            sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE ");
+            sql.append("DELETE FROM ").append(SyncUtil.getDbTableName(dbMapping)).append(" WHERE ");
 
 
             List<Map<String, ?>> values = new ArrayList<>();
             List<Map<String, ?>> values = new ArrayList<>();
             // 拼接主键
             // 拼接主键
@@ -306,14 +365,14 @@ public class RdbSyncService {
     private Map<String, Integer> getTargetColumnType(Connection conn, MappingConfig config) {
     private Map<String, Integer> getTargetColumnType(Connection conn, MappingConfig config) {
         DbMapping dbMapping = config.getDbMapping();
         DbMapping dbMapping = config.getDbMapping();
         String cacheKey = config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable();
         String cacheKey = config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable();
-        Map<String, Integer> columnType = COLUMNS_TYPE_CACHE.get(cacheKey);
+        Map<String, Integer> columnType = columnsTypeCache.get(cacheKey);
         if (columnType == null) {
         if (columnType == null) {
             synchronized (RdbSyncService.class) {
             synchronized (RdbSyncService.class) {
-                columnType = COLUMNS_TYPE_CACHE.get(cacheKey);
+                columnType = columnsTypeCache.get(cacheKey);
                 if (columnType == null) {
                 if (columnType == null) {
                     columnType = new LinkedHashMap<>();
                     columnType = new LinkedHashMap<>();
                     final Map<String, Integer> columnTypeTmp = columnType;
                     final Map<String, Integer> columnTypeTmp = columnType;
-                    String sql = "SELECT * FROM " + dbMapping.getTargetTable() + " WHERE 1=2";
+                    String sql = "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE 1=2";
                     Util.sqlRS(conn, sql, rs -> {
                     Util.sqlRS(conn, sql, rs -> {
                         try {
                         try {
                             ResultSetMetaData rsd = rs.getMetaData();
                             ResultSetMetaData rsd = rs.getMetaData();
@@ -321,7 +380,7 @@ public class RdbSyncService {
                             for (int i = 1; i <= columnCount; i++) {
                             for (int i = 1; i <= columnCount; i++) {
                                 columnTypeTmp.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
                                 columnTypeTmp.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
                             }
                             }
-                            COLUMNS_TYPE_CACHE.put(cacheKey, columnTypeTmp);
+                            columnsTypeCache.put(cacheKey, columnTypeTmp);
                         } catch (SQLException e) {
                         } catch (SQLException e) {
                             logger.error(e.getMessage(), e);
                             logger.error(e.getMessage(), e);
                         }
                         }
@@ -362,12 +421,12 @@ public class RdbSyncService {
         sql.delete(len - 4, len);
         sql.delete(len - 4, len);
     }
     }
 
 
-    private class SyncItem {
+    public static class SyncItem {
 
 
         private MappingConfig config;
         private MappingConfig config;
         private SingleDml     singleDml;
         private SingleDml     singleDml;
 
 
-        private SyncItem(MappingConfig config, SingleDml singleDml){
+        public SyncItem(MappingConfig config, SingleDml singleDml){
             this.config = config;
             this.config = config;
             this.singleDml = singleDml;
             this.singleDml = singleDml;
         }
         }
@@ -376,11 +435,11 @@ public class RdbSyncService {
     /**
     /**
      * 取主键hash
      * 取主键hash
      */
      */
-    private int pkHash(DbMapping dbMapping, Map<String, Object> d) {
+    public int pkHash(DbMapping dbMapping, Map<String, Object> d) {
         return pkHash(dbMapping, d, null);
         return pkHash(dbMapping, d, null);
     }
     }
 
 
-    private int pkHash(DbMapping dbMapping, Map<String, Object> d, Map<String, Object> o) {
+    public int pkHash(DbMapping dbMapping, Map<String, Object> d, Map<String, Object> o) {
         int hash = 0;
         int hash = 0;
         // 取主键
         // 取主键
         for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
         for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {

+ 15 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java

@@ -9,6 +9,7 @@ import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map;
 
 
+import org.apache.commons.lang.StringUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTime;
 
 
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
@@ -21,7 +22,10 @@ public class SyncUtil {
 
 
     public static Map<String, String> getColumnsMap(MappingConfig.DbMapping dbMapping, Collection<String> columns) {
     public static Map<String, String> getColumnsMap(MappingConfig.DbMapping dbMapping, Collection<String> columns) {
         Map<String, String> columnsMap;
         Map<String, String> columnsMap;
-        if (dbMapping.isMapAll()) {
+        if (dbMapping.getMapAll()) {
+            if (dbMapping.getAllMapColumns() != null) {
+                return dbMapping.getAllMapColumns();
+            }
             columnsMap = new LinkedHashMap<>();
             columnsMap = new LinkedHashMap<>();
             for (String srcColumn : columns) {
             for (String srcColumn : columns) {
                 boolean flag = true;
                 boolean flag = true;
@@ -38,6 +42,7 @@ public class SyncUtil {
                     columnsMap.put(srcColumn, srcColumn);
                     columnsMap.put(srcColumn, srcColumn);
                 }
                 }
             }
             }
+            dbMapping.setAllMapColumns(columnsMap);
         } else {
         } else {
             columnsMap = dbMapping.getTargetColumns();
             columnsMap = dbMapping.getTargetColumns();
         }
         }
@@ -253,4 +258,13 @@ public class SyncUtil {
                 pstmt.setObject(i, value, type);
                 pstmt.setObject(i, value, type);
         }
         }
     }
     }
+
+    public static String getDbTableName(MappingConfig.DbMapping dbMapping) {
+        String result = "";
+        if (StringUtils.isNotEmpty(dbMapping.getTargetDb())) {
+            result += dbMapping.getTargetDb() + ".";
+        }
+        result += dbMapping.getTargetTable();
+        return result;
+    }
 }
 }

+ 11 - 1
client-adapter/rdb/src/main/resources/rdb/mytest_user.yml

@@ -14,4 +14,14 @@ dbMapping:
 #    name:
 #    name:
 #    role_id:
 #    role_id:
 #    c_time:
 #    c_time:
-#    test1:
+#    test1:
+
+
+# Mirror schema synchronize config
+#dataSourceKey: defaultDS
+#destination: example
+#outerAdapterKey: mysql1
+#concurrent: true
+#dbMapping:
+#  mirrorDb: true
+#  database: mytest

+ 1 - 1
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/DBTest.java

@@ -42,7 +42,7 @@ public class DBTest {
             .prepareStatement("insert into user (id,name,role_id,c_time,test1,test2) values (?,?,?,?,?,?)");
             .prepareStatement("insert into user (id,name,role_id,c_time,test1,test2) values (?,?,?,?,?,?)");
 
 
         java.util.Date now = new java.util.Date();
         java.util.Date now = new java.util.Date();
-        for (int i = 1; i <= 100000; i++) {
+        for (int i = 1; i <= 10000; i++) {
             pstmt.clearParameters();
             pstmt.clearParameters();
             pstmt.setLong(1, (long) i);
             pstmt.setLong(1, (long) i);
             pstmt.setString(2, "test_" + i);
             pstmt.setString(2, "test_" + i);

+ 86 - 0
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/SqlParserTest.java

@@ -0,0 +1,86 @@
+//wpackage com.alibaba.otter.canal.client.adapter.rdb.test;
+//
+//import com.alibaba.fastsql.sql.ast.SQLName;
+//import com.alibaba.fastsql.sql.ast.SQLStatement;
+//import com.alibaba.fastsql.sql.ast.statement.SQLCreateTableStatement;
+//import com.alibaba.fastsql.sql.ast.statement.SQLExprTableSource;
+//import com.alibaba.fastsql.sql.dialect.mysql.parser.MySqlCreateTableParser;
+//import com.alibaba.fastsql.sql.dialect.mysql.parser.MySqlStatementParser;
+//import com.alibaba.fastsql.sql.dialect.mysql.visitor.MySqlOutputVisitor;
+//import com.alibaba.fastsql.sql.dialect.mysql.visitor.MySqlSchemaStatVisitor;
+//import com.alibaba.fastsql.sql.parser.SQLStatementParser;
+//
+//import java.io.StringWriter;
+//
+//public class SqlParserTest {
+//
+//    public static class TableNameVisitor extends MySqlOutputVisitor {
+//
+//        public TableNameVisitor(Appendable appender){
+//            super(appender);
+//        }
+//
+//        @Override
+//        public boolean visit(SQLExprTableSource x) {
+//            SQLName table = (SQLName) x.getExpr();
+//            String tableName = table.getSimpleName();
+//
+//            // 改写tableName
+//            print0("new_" + tableName.toUpperCase());
+//
+//            return true;
+//        }
+//
+//    }
+//
+//    public static void main(String[] args) {
+//        // String sql = "select * from `mytest`.`t` where id=1 and name=ming group by
+//        // uid limit 1,200 order by ctime";
+//
+//        String sql = "CREATE TABLE `mytest`.`user` (\n" + "  `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+//                     + "  `name` varchar(30) NOT NULL,\n" + "  `c_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,\n"
+//                     + "  `role_id` bigint(20) DEFAULT NULL,\n" + "  `test1` text,\n" + "  `test2` blob,\n"
+//                     + "  `key` varchar(30) DEFAULT NULL,\n" + "  PRIMARY KEY (`id`)\n"
+//                     + ") ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;";
+//
+//        // // 新建 MySQL Parser
+//        // SQLStatementParser parser = new MySqlStatementParser(sql);
+//        //
+//        // // 使用Parser解析生成AST,这里SQLStatement就是AST
+//        // SQLStatement sqlStatement = parser.parseStatement();
+//        //
+//        // MySqlSchemaStatVisitor visitor = new MySqlSchemaStatVisitor();
+//        // sqlStatement.accept(visitor);
+//        //
+//        // System.out.println("getTables:" + visitor.getTables());
+//        // System.out.println("getParameters:" + visitor.getParameters());
+//        // System.out.println("getOrderByColumns:" + visitor.getOrderByColumns());
+//        // System.out.println("getGroupByColumns:" + visitor.getGroupByColumns());
+//        // System.out.println("---------------------------------------------------------------------------");
+//        //
+//        // // 使用select访问者进行select的关键信息打印
+//        // // SelectPrintVisitor selectPrintVisitor = new SelectPrintVisitor();
+//        // // sqlStatement.accept(selectPrintVisitor);
+//        //
+//        // System.out.println("---------------------------------------------------------------------------");
+//        // // 最终sql输出
+//        // StringWriter out = new StringWriter();
+//        // TableNameVisitor outputVisitor = new TableNameVisitor(out);
+//        // sqlStatement.accept(outputVisitor);
+//        // System.out.println(out.toString());
+//
+//        MySqlCreateTableParser parser1 = new MySqlCreateTableParser(sql);
+//        SQLCreateTableStatement createTableStatement = parser1.parseCreateTable();
+////        MySqlSchemaStatVisitor visitor1 = new MySqlSchemaStatVisitor();
+////        createTableStatement.accept(visitor1);
+//        // visitor1.getTables().forEach((k, v) -> {
+//        // System.out.println(k.);
+//        // System.out.println(v);
+//        // });
+//        // 最终sql输出
+//        StringWriter out = new StringWriter();
+//        TableNameVisitor outputVisitor = new TableNameVisitor(out);
+//        createTableStatement.accept(outputVisitor);
+//        System.out.println(out.toString());
+//    }
+//}