浏览代码

Merge remote-tracking branch 'upstream/master'

winger 6 年之前
父节点
当前提交
d8d3246e30
共有 35 个文件被更改,包括 1251 次插入和 479 次删除
  1. 1 1
      README.md
  2. 18 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Dml.java
  3. 15 9
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java
  4. 5 1
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java
  5. 6 2
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java
  6. 6 2
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java
  7. 9 2
      client-adapter/launcher/src/main/resources/application.yml
  8. 1 1
      client-adapter/rdb/pom.xml
  9. 94 22
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java
  10. 6 2
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/ConfigLoader.java
  11. 34 26
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java
  12. 44 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MirrorDbConfig.java
  13. 21 7
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/monitor/RdbConfigMonitor.java
  14. 3 3
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java
  15. 158 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java
  16. 124 65
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java
  17. 15 1
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java
  18. 11 1
      client-adapter/rdb/src/main/resources/rdb/mytest_user.yml
  19. 1 1
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/DBTest.java
  20. 86 0
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/SqlParserTest.java
  21. 0 9
      common/src/main/java/com/alibaba/otter/canal/common/utils/AddressUtils.java
  22. 3 3
      deployer/src/main/resources/example/instance.properties
  23. 1 8
      filter/src/main/java/com/alibaba/otter/canal/filter/PatternUtils.java
  24. 1 0
      filter/src/main/java/com/alibaba/otter/canal/filter/aviater/AviaterRegexFilter.java
  25. 1 1
      filter/src/main/java/com/alibaba/otter/canal/filter/aviater/RegexFunction.java
  26. 4 30
      instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalMQConfig.java
  27. 1 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  28. 6 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java
  29. 18 210
      protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java
  30. 0 1
      protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java
  31. 427 0
      server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java
  32. 8 8
      server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java
  33. 29 16
      server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java
  34. 93 44
      server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java
  35. 1 1
      server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

+ 1 - 1
README.md

@@ -75,7 +75,7 @@ See the wiki page for : <a href="https://github.com/alibaba/canal/wiki" >wiki文
 <li><a class="internal present" href="https://github.com/alibaba/canal/wiki/TableMetaTSDB">TableMetaTSDB</a></li>
 <li><a href="http://alibaba.github.com/canal/release.html">ReleaseNotes</a></li>
 <li><a href="https://github.com/alibaba/canal/releases">Download</a></li>
-<li><a class="internal present" href="/alibaba/canal/wiki/FAQ">FAQ</a></li>
+<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/FAQ">FAQ</a></li>
 </ul>
 
 <h1>多语言业务</h1>

+ 18 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Dml.java

@@ -17,6 +17,8 @@ public class Dml implements Serializable {
     private String                    destination;                            // 对应canal的实例或者MQ的topic
     private String                    database;                               // 数据库或schema
     private String                    table;                                  // 表名
+    private List<String>              pkNames;
+    private Boolean                   isDdl;
     private String                    type;                                   // 类型: INSERT UPDATE DELETE
     // binlog executeTime
     private Long                      es;                                     // 执行耗时
@@ -50,6 +52,22 @@ public class Dml implements Serializable {
         this.table = table;
     }
 
+    public List<String> getPkNames() {
+        return pkNames;
+    }
+
+    public void setPkNames(List<String> pkNames) {
+        this.pkNames = pkNames;
+    }
+
+    public Boolean getIsDdl() {
+        return isDdl;
+    }
+
+    public void setIsDdl(Boolean isDdl) {
+        this.isDdl = isDdl;
+    }
+
     public String getType() {
         return type;
     }

+ 15 - 9
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -1,11 +1,6 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.FlatMessage;
@@ -55,6 +50,8 @@ public class MessageUtil {
 
             if (!rowChange.getIsDdl()) {
                 Set<String> updateSet = new HashSet<>();
+                dml.setPkNames(new ArrayList<>());
+                int i = 0;
                 for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                     if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
                         && eventType != CanalEntry.EventType.DELETE) {
@@ -71,6 +68,11 @@ public class MessageUtil {
                     }
 
                     for (CanalEntry.Column column : columns) {
+                        if (i == 0) {
+                            if (column.getIsKey()) {
+                                dml.getPkNames().add(column.getName());
+                            }
+                        }
                         row.put(column.getName(),
                             JdbcTypeUtil.typeConvert(column.getName(),
                                 column.getValue(),
@@ -101,6 +103,8 @@ public class MessageUtil {
                             old.add(rowOld);
                         }
                     }
+
+                    i++;
                 }
                 if (!data.isEmpty()) {
                     dml.setData(data);
@@ -134,13 +138,15 @@ public class MessageUtil {
         dml.setDestination(destination);
         dml.setDatabase(flatMessage.getDatabase());
         dml.setTable(flatMessage.getTable());
+        dml.setPkNames(flatMessage.getPkNames());
+        dml.setIsDdl(flatMessage.getIsDdl());
         dml.setType(flatMessage.getType());
         dml.setTs(flatMessage.getTs());
         dml.setEs(flatMessage.getEs());
         dml.setSql(flatMessage.getSql());
-        if (flatMessage.getSqlType() == null || flatMessage.getMysqlType() == null) {
-            throw new RuntimeException("SqlType or mysqlType is null");
-        }
+        // if (flatMessage.getSqlType() == null || flatMessage.getMysqlType() == null) {
+        // throw new RuntimeException("SqlType or mysqlType is null");
+        // }
         List<Map<String, String>> data = flatMessage.getData();
         if (data != null) {
             dml.setData(changeRows(data, flatMessage.getSqlType(), flatMessage.getMysqlType()));

+ 5 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.es.config;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import com.alibaba.fastjson.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
@@ -19,6 +20,7 @@ public class ESSyncConfigLoader {
 
     private static Logger logger = LoggerFactory.getLogger(ESSyncConfigLoader.class);
 
+    @SuppressWarnings("unchecked")
     public static synchronized Map<String, ESSyncConfig> load() {
         logger.info("## Start loading es mapping config ... ");
 
@@ -26,7 +28,9 @@ public class ESSyncConfigLoader {
 
         Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("es");
         configContentMap.forEach((fileName, content) -> {
-            ESSyncConfig config = new Yaml().loadAs(content, ESSyncConfig.class);
+            Map configMap = new Yaml().loadAs(content, Map.class); // yml自带的对象反射不是很稳定
+            JSONObject configJson = new JSONObject(configMap);
+            ESSyncConfig config = configJson.toJavaObject(ESSyncConfig.class);
             try {
                 config.validate();
             } catch (Exception e) {

+ 6 - 2
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.hbase.config;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import com.alibaba.fastjson.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
@@ -21,9 +22,10 @@ public class MappingConfigLoader {
 
     /**
      * 加载HBase表映射配置
-     * 
+     *
      * @return 配置名/配置文件名--对象
      */
+    @SuppressWarnings("unchecked")
     public static Map<String, MappingConfig> load() {
         logger.info("## Start loading hbase mapping config ... ");
 
@@ -31,7 +33,9 @@ public class MappingConfigLoader {
 
         Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("hbase");
         configContentMap.forEach((fileName, content) -> {
-            MappingConfig config = new Yaml().loadAs(content, MappingConfig.class);
+            Map configMap = new Yaml().loadAs(content, Map.class); // yml自带的对象反射不是很稳定
+            JSONObject configJson = new JSONObject(configMap);
+            MappingConfig config = configJson.toJavaObject(MappingConfig.class);
             try {
                 config.validate();
             } catch (Exception e) {

+ 6 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -70,6 +70,7 @@ public abstract class AbstractCanalAdapterWorker {
                     });
                     return true;
                 } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
                     return false;
                 }
             }));
@@ -108,6 +109,7 @@ public abstract class AbstractCanalAdapterWorker {
                     });
                     return true;
                 } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
                     return false;
                 }
             }));
@@ -178,7 +180,7 @@ public abstract class AbstractCanalAdapterWorker {
 
     /**
      * 分批同步
-     * 
+     *
      * @param dmls
      * @param adapter
      */
@@ -198,7 +200,9 @@ public abstract class AbstractCanalAdapterWorker {
                     len = 0;
                 }
             }
-            adapter.sync(dmlsBatch);
+            if (!dmlsBatch.isEmpty()) {
+                adapter.sync(dmlsBatch);
+            }
         }
     }
 

+ 9 - 2
client-adapter/launcher/src/main/resources/application.yml

@@ -15,7 +15,7 @@ spring:
 canal.conf:
   canalServerHost: 127.0.0.1:11111
 #  zookeeperHosts: slave1:2181
-#  mqServers: slave1:6667 #or rocketmq
+#  mqServers: 127.0.0.1:9092 #or rocketmq
 #  flatMessage: true
   batchSize: 500
   syncBatchSize: 1000
@@ -36,6 +36,13 @@ canal.conf:
       outerAdapters:
       - name: logger
 #      - name: rdb
+#        key: mysql1
+#        properties:
+#          jdbc.driverClassName: com.mysql.jdbc.Driver
+#          jdbc.url: jdbc:mysql://192.168.0.36/mytest?useUnicode=true
+#          jdbc.username: root
+#          jdbc.password: 121212
+#      - name: rdb
 #        key: oracle1
 #        properties:
 #          jdbc.driverClassName: oracle.jdbc.OracleDriver
@@ -59,4 +66,4 @@ canal.conf:
 #      - name: es
 #        hosts: 127.0.0.1:9300
 #        properties:
-#          cluster.name: elasticsearch
+#          cluster.name: elasticsearch

+ 1 - 1
client-adapter/rdb/pom.xml

@@ -100,4 +100,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
+</project>

+ 94 - 22
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -2,12 +2,10 @@ package com.alibaba.otter.canal.client.adapter.rdb;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.*;
 
 import javax.sql.DataSource;
 
@@ -19,24 +17,33 @@ import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.rdb.config.ConfigLoader;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.monitor.RdbConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
+import com.alibaba.otter.canal.client.adapter.rdb.service.RdbMirrorDbSyncService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
+import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
 import com.alibaba.otter.canal.client.adapter.support.*;
 
+/**
+ * RDB适配器实现类
+ *
+ * @author rewerma 2018-11-7 下午06:45:49
+ * @version 1.0.0
+ */
 @SPI("rdb")
 public class RdbAdapter implements OuterAdapter {
 
-    private static Logger                           logger             = LoggerFactory.getLogger(RdbAdapter.class);
+    private static Logger                           logger              = LoggerFactory.getLogger(RdbAdapter.class);
 
-    private Map<String, MappingConfig>              rdbMapping         = new HashMap<>();                          // 文件名对应配置
-    private Map<String, Map<String, MappingConfig>> mappingConfigCache = new HashMap<>();                          // 库名-表名对应配置
+    private Map<String, MappingConfig>              rdbMapping          = new ConcurrentHashMap<>();                // 文件名对应配置
+    private Map<String, Map<String, MappingConfig>> mappingConfigCache  = new ConcurrentHashMap<>();                // 库名-表名对应配置
+    private Map<String, MirrorDbConfig>             mirrorDbConfigCache = new ConcurrentHashMap<>();                // 镜像库配置
 
     private DruidDataSource                         dataSource;
 
     private RdbSyncService                          rdbSyncService;
-
-    private ExecutorService                         executor           = Executors.newFixedThreadPool(1);
+    private RdbMirrorDbSyncService                  rdbMirrorDbSyncService;
 
     private RdbConfigMonitor                        rdbConfigMonitor;
 
@@ -48,6 +55,15 @@ public class RdbAdapter implements OuterAdapter {
         return mappingConfigCache;
     }
 
+    public Map<String, MirrorDbConfig> getMirrorDbConfigCache() {
+        return mirrorDbConfigCache;
+    }
+
+    /**
+     * 初始化方法
+     *
+     * @param configuration 外部适配器配置信息
+     */
     @Override
     public void init(OuterAdapterConfig configuration) {
         Map<String, MappingConfig> rdbMappingTmp = ConfigLoader.load();
@@ -59,17 +75,30 @@ public class RdbAdapter implements OuterAdapter {
                 rdbMapping.put(key, mappingConfig);
             }
         });
+
+        if (rdbMapping.isEmpty()) {
+            throw new RuntimeException("No rdb adapter found for config key: " + configuration.getKey());
+        }
+
         for (Map.Entry<String, MappingConfig> entry : rdbMapping.entrySet()) {
             String configName = entry.getKey();
             MappingConfig mappingConfig = entry.getValue();
-            Map<String, MappingConfig> configMap = mappingConfigCache
-                .computeIfAbsent(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
-                                 + mappingConfig.getDbMapping().getDatabase() + "."
-                                 + mappingConfig.getDbMapping().getTable(),
-                    k1 -> new HashMap<>());
-            configMap.put(configName, mappingConfig);
+            if (!mappingConfig.getDbMapping().getMirrorDb()) {
+                Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(
+                    StringUtils.trimToEmpty(mappingConfig.getDestination()) + "." + mappingConfig.getDbMapping()
+                        .getDatabase() + "." + mappingConfig.getDbMapping().getTable(),
+                    k1 -> new ConcurrentHashMap<>());
+                configMap.put(configName, mappingConfig);
+            } else {
+                // mirrorDB
+
+                mirrorDbConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                        + mappingConfig.getDbMapping().getDatabase(),
+                    MirrorDbConfig.create(configName, mappingConfig));
+            }
         }
 
+        // 初始化连接池
         Map<String, String> properties = configuration.getProperties();
         dataSource = new DruidDataSource();
         dataSource.setDriverClassName(properties.get("jdbc.driverClassName"));
@@ -78,7 +107,7 @@ public class RdbAdapter implements OuterAdapter {
         dataSource.setPassword(properties.get("jdbc.password"));
         dataSource.setInitialSize(1);
         dataSource.setMinIdle(1);
-        dataSource.setMaxActive(20);
+        dataSource.setMaxActive(10);
         dataSource.setMaxWait(60000);
         dataSource.setTimeBetweenEvictionRunsMillis(60000);
         dataSource.setMinEvictableIdleTimeMillis(300000);
@@ -92,19 +121,49 @@ public class RdbAdapter implements OuterAdapter {
         String threads = properties.get("threads");
         // String commitSize = properties.get("commitSize");
 
-        rdbSyncService = new RdbSyncService(mappingConfigCache,
+        rdbSyncService = new RdbSyncService(dataSource, threads != null ? Integer.valueOf(threads) : null);
+
+        rdbMirrorDbSyncService = new RdbMirrorDbSyncService(mirrorDbConfigCache,
             dataSource,
-            threads != null ? Integer.valueOf(threads) : null);
+            threads != null ? Integer.valueOf(threads) : null,
+            rdbSyncService.getColumnsTypeCache());
 
         rdbConfigMonitor = new RdbConfigMonitor();
         rdbConfigMonitor.init(configuration.getKey(), this);
     }
 
+    /**
+     * 同步方法
+     *
+     * @param dmls 数据包
+     */
     @Override
     public void sync(List<Dml> dmls) {
-        rdbSyncService.sync(dmls);
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+
+        Future<Boolean> future1 = executorService.submit(() -> {
+            rdbSyncService.sync(mappingConfigCache, dmls);
+            return true;
+        });
+        Future<Boolean> future2 = executorService.submit(() -> {
+            rdbMirrorDbSyncService.sync(dmls);
+            return true;
+        });
+        try {
+            future1.get();
+            future2.get();
+        } catch (ExecutionException | InterruptedException e) {
+            // ignore
+        }
     }
 
+    /**
+     * ETL方法
+     *
+     * @param task 任务名, 对应配置名
+     * @param params etl筛选条件
+     * @return ETL结果
+     */
     @Override
     public EtlResult etl(String task, List<String> params) {
         EtlResult etlResult = new EtlResult();
@@ -153,11 +212,17 @@ public class RdbAdapter implements OuterAdapter {
         return etlResult;
     }
 
+    /**
+     * 获取总数方法
+     *
+     * @param task 任务名, 对应配置名
+     * @return 总数
+     */
     @Override
     public Map<String, Object> count(String task) {
         MappingConfig config = rdbMapping.get(task);
         MappingConfig.DbMapping dbMapping = config.getDbMapping();
-        String sql = "SELECT COUNT(1) AS cnt FROM " + dbMapping.getTargetTable();
+        String sql = "SELECT COUNT(1) AS cnt FROM " + SyncUtil.getDbTableName(dbMapping);
         Connection conn = null;
         Map<String, Object> res = new LinkedHashMap<>();
         try {
@@ -183,11 +248,17 @@ public class RdbAdapter implements OuterAdapter {
                 }
             }
         }
-        res.put("targetTable", dbMapping.getTargetTable());
+        res.put("targetTable", SyncUtil.getDbTableName(dbMapping));
 
         return res;
     }
 
+    /**
+     * 获取对应canal instance name 或 mq topic
+     *
+     * @param task 任务名, 对应配置名
+     * @return destination
+     */
     @Override
     public String getDestination(String task) {
         MappingConfig config = rdbMapping.get(task);
@@ -197,6 +268,9 @@ public class RdbAdapter implements OuterAdapter {
         return null;
     }
 
+    /**
+     * 销毁方法
+     */
     @Override
     public void destroy() {
         if (rdbConfigMonitor != null) {
@@ -207,8 +281,6 @@ public class RdbAdapter implements OuterAdapter {
             rdbSyncService.close();
         }
 
-        executor.shutdown();
-
         if (dataSource != null) {
             dataSource.close();
         }

+ 6 - 2
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/ConfigLoader.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.rdb.config;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import com.alibaba.fastjson.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
@@ -21,9 +22,10 @@ public class ConfigLoader {
 
     /**
      * 加载HBase表映射配置
-     * 
+     *
      * @return 配置名/配置文件名--对象
      */
+    @SuppressWarnings("unchecked")
     public static Map<String, MappingConfig> load() {
         logger.info("## Start loading rdb mapping config ... ");
 
@@ -31,7 +33,9 @@ public class ConfigLoader {
 
         Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("rdb");
         configContentMap.forEach((fileName, content) -> {
-            MappingConfig config = new Yaml().loadAs(content, MappingConfig.class);
+            Map configMap = new Yaml().loadAs(content, Map.class); // yml自带的对象反射不是很稳定
+            JSONObject configJson = new JSONObject(configMap);
+            MappingConfig config = configJson.toJavaObject(MappingConfig.class);
             try {
                 config.validate();
             } catch (Exception e) {

+ 34 - 26
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java

@@ -1,8 +1,7 @@
 package com.alibaba.otter.canal.client.adapter.rdb.config;
 
-import java.util.LinkedHashSet;
+import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * RDB表映射配置
@@ -66,30 +65,39 @@ public class MappingConfig {
         if (dbMapping.database == null || dbMapping.database.isEmpty()) {
             throw new NullPointerException("dbMapping.database");
         }
-        if (dbMapping.table == null || dbMapping.table.isEmpty()) {
+        if (!dbMapping.getMirrorDb() && (dbMapping.table == null || dbMapping.table.isEmpty())) {
             throw new NullPointerException("dbMapping.table");
         }
-        if (dbMapping.targetTable == null || dbMapping.targetTable.isEmpty()) {
+        if (!dbMapping.getMirrorDb() && (dbMapping.targetTable == null || dbMapping.targetTable.isEmpty())) {
             throw new NullPointerException("dbMapping.targetTable");
         }
     }
 
     public static class DbMapping {
 
+        private Boolean             mirrorDb    = false;                 // 是否镜像库
         private String              database;                            // 数据库名或schema名
-        private String              table;                               // 表面名
-        private Map<String, String> targetPk;                            // 目标表主键字段
-        private boolean             mapAll      = false;                 // 映射所有字段
+        private String              table;                               // 表名
+        private Map<String, String> targetPk    = new LinkedHashMap<>(); // 目标表主键字段
+        private Boolean             mapAll      = false;                 // 映射所有字段
+        private String              targetDb;                            // 目标库名
         private String              targetTable;                         // 目标表名
         private Map<String, String> targetColumns;                       // 目标表字段映射
 
         private String              etlCondition;                        // etl条件sql
 
-        private Set<String>         families    = new LinkedHashSet<>(); // column family列表
         private int                 readBatch   = 5000;
         private int                 commitBatch = 5000;                  // etl等批量提交大小
 
-        // private volatile Map<String, String> allColumns; // mapAll为true,自动设置改字段
+        private Map<String, String> allMapColumns;
+
+        public Boolean getMirrorDb() {
+            return mirrorDb;
+        }
+
+        public void setMirrorDb(Boolean mirrorDb) {
+            this.mirrorDb = mirrorDb;
+        }
 
         public String getDatabase() {
             return database;
@@ -115,14 +123,22 @@ public class MappingConfig {
             this.targetPk = targetPk;
         }
 
-        public boolean isMapAll() {
+        public Boolean getMapAll() {
             return mapAll;
         }
 
-        public void setMapAll(boolean mapAll) {
+        public void setMapAll(Boolean mapAll) {
             this.mapAll = mapAll;
         }
 
+        public String getTargetDb() {
+            return targetDb;
+        }
+
+        public void setTargetDb(String targetDb) {
+            this.targetDb = targetDb;
+        }
+
         public String getTargetTable() {
             return targetTable;
         }
@@ -147,14 +163,6 @@ public class MappingConfig {
             this.etlCondition = etlCondition;
         }
 
-        public Set<String> getFamilies() {
-            return families;
-        }
-
-        public void setFamilies(Set<String> families) {
-            this.families = families;
-        }
-
         public int getReadBatch() {
             return readBatch;
         }
@@ -171,12 +179,12 @@ public class MappingConfig {
             this.commitBatch = commitBatch;
         }
 
-        // public Map<String, String> getAllColumns() {
-        // return allColumns;
-        // }
-        //
-        // public void setAllColumns(Map<String, String> allColumns) {
-        // this.allColumns = allColumns;
-        // }
+        public Map<String, String> getAllMapColumns() {
+            return allMapColumns;
+        }
+
+        public void setAllMapColumns(Map<String, String> allMapColumns) {
+            this.allMapColumns = allMapColumns;
+        }
     }
 }

+ 44 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MirrorDbConfig.java

@@ -0,0 +1,44 @@
+package com.alibaba.otter.canal.client.adapter.rdb.config;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class MirrorDbConfig {
+
+    private String             fileName;
+    private MappingConfig      mappingConfig;
+    Map<String, MappingConfig> tableConfig = new ConcurrentHashMap<>();
+
+    public static MirrorDbConfig create(String fileName, MappingConfig mappingConfig) {
+        return new MirrorDbConfig(fileName, mappingConfig);
+    }
+
+    public MirrorDbConfig(String fileName, MappingConfig mappingConfig){
+        this.fileName = fileName;
+        this.mappingConfig = mappingConfig;
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public void setFileName(String fileName) {
+        this.fileName = fileName;
+    }
+
+    public MappingConfig getMappingConfig() {
+        return mappingConfig;
+    }
+
+    public void setMappingConfig(MappingConfig mappingConfig) {
+        this.mappingConfig = mappingConfig;
+    }
+
+    public Map<String, MappingConfig> getTableConfig() {
+        return tableConfig;
+    }
+
+    public void setTableConfig(Map<String, MappingConfig> tableConfig) {
+        this.tableConfig = tableConfig;
+    }
+}

+ 21 - 7
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/monitor/RdbConfigMonitor.java

@@ -4,6 +4,7 @@ import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
 import org.apache.commons.io.monitor.FileAlterationMonitor;
@@ -20,7 +21,7 @@ import com.alibaba.otter.canal.client.adapter.support.Util;
 
 public class RdbConfigMonitor {
 
-    private static final Logger   logger = LoggerFactory.getLogger(RdbConfigMonitor.class);
+    private static final Logger   logger      = LoggerFactory.getLogger(RdbConfigMonitor.class);
 
     private static final String   adapterName = "rdb";
 
@@ -83,7 +84,8 @@ public class RdbConfigMonitor {
             try {
                 if (rdbAdapter.getRdbMapping().containsKey(file.getName())) {
                     // 加载配置文件
-                    String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
+                    String configContent = MappingConfigsLoader
+                        .loadConfig(adapterName + File.separator + file.getName());
                     MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
                     config.validate();
                     if ((key == null && config.getOuterAdapterKey() == null)
@@ -118,13 +120,19 @@ public class RdbConfigMonitor {
             }
         }
 
-        private void addConfigToCache(File file, MappingConfig config) {
-            rdbAdapter.getRdbMapping().put(file.getName(), config);
+        private void addConfigToCache(File file, MappingConfig mappingConfig) {
+            rdbAdapter.getRdbMapping().put(file.getName(), mappingConfig);
             Map<String, MappingConfig> configMap = rdbAdapter.getMappingConfigCache()
-                .computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "."
-                                 + config.getDbMapping().getDatabase() + "." + config.getDbMapping().getTable(),
+                .computeIfAbsent(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                 + mappingConfig.getDbMapping().getDatabase() + "."
+                                 + mappingConfig.getDbMapping().getTable(),
                     k1 -> new HashMap<>());
-            configMap.put(file.getName(), config);
+            configMap.put(file.getName(), mappingConfig);
+
+            Map<String, MirrorDbConfig> mirrorDbConfigCache = rdbAdapter.getMirrorDbConfigCache();
+            mirrorDbConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                    + mappingConfig.getDbMapping().getDatabase(),
+                MirrorDbConfig.create(file.getName(), mappingConfig));
         }
 
         private void deleteConfigFromCache(File file) {
@@ -136,6 +144,12 @@ public class RdbConfigMonitor {
                 }
             }
 
+            rdbAdapter.getMirrorDbConfigCache().forEach((key, mirrorDbConfig) -> {
+                if (mirrorDbConfig.getFileName().equals(file.getName())) {
+                    rdbAdapter.getMirrorDbConfigCache().remove(key);
+                }
+            });
+
         }
     }
 }

+ 3 - 3
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java

@@ -109,7 +109,7 @@ public class RdbEtlService {
             logger.info(
                 dbMapping.getTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000 + "s!");
 
-            etlResult.setResultMessage("导入目标表 " + dbMapping.getTargetTable() + " 数据:" + successCount.get() + " 条");
+            etlResult.setResultMessage("导入目标表 " + SyncUtil.getDbTableName(dbMapping) + " 数据:" + successCount.get() + " 条");
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             errMsg.add(hbaseTable + " etl failed! ==>" + e.getMessage());
@@ -187,7 +187,7 @@ public class RdbEtlService {
                     // }
 
                     StringBuilder insertSql = new StringBuilder();
-                    insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
+                    insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
                     columnsMap
                         .forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
 
@@ -209,7 +209,7 @@ public class RdbEtlService {
                             // 删除数据
                             Map<String, Object> values = new LinkedHashMap<>();
                             StringBuilder deleteSql = new StringBuilder(
-                                "DELETE FROM " + dbMapping.getTargetTable() + " WHERE ");
+                                "DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
                             appendCondition(dbMapping, deleteSql, values, rs);
                             try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
                                 int k = 1;

+ 158 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java

@@ -0,0 +1,158 @@
+package com.alibaba.otter.canal.client.adapter.rdb.service;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.support.SingleDml;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+
+/**
+ * RDB镜像库同步操作业务
+ *
+ * @author rewerma 2018-12-12 下午11:23
+ * @version 1.0.0
+ */
+public class RdbMirrorDbSyncService {
+
+    private static final Logger         logger = LoggerFactory.getLogger(RdbMirrorDbSyncService.class);
+
+    private Map<String, MirrorDbConfig> mirrorDbConfigCache;                                           // 镜像库配置
+    private DataSource                  dataSource;
+    private RdbSyncService              rdbSyncService;                                                // rdbSyncService代理
+
+    public RdbMirrorDbSyncService(Map<String, MirrorDbConfig> mirrorDbConfigCache, DataSource dataSource,
+                                  Integer threads, Map<String, Map<String, Integer>> columnsTypeCache){
+        this.mirrorDbConfigCache = mirrorDbConfigCache;
+        this.dataSource = dataSource;
+        this.rdbSyncService = new RdbSyncService(dataSource, threads, columnsTypeCache);
+    }
+
+    /**
+     * 批量同步方法
+     *
+     * @param dmls 批量 DML
+     */
+    public void sync(List<Dml> dmls) {
+        try {
+            List<Dml> dmlList = new ArrayList<>();
+            for (Dml dml : dmls) {
+                String destination = StringUtils.trimToEmpty(dml.getDestination());
+                String database = dml.getDatabase();
+                MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(destination + "." + database);
+                if (mirrorDbConfig == null) {
+                    continue;
+                }
+                if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
+                    // DDL
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("DDL: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+                    }
+                    executeDdl(mirrorDbConfig, dml);
+                    rdbSyncService.getColumnsTypeCache().remove(destination + "." + database + "." + dml.getTable());
+                    mirrorDbConfig.getTableConfig().remove(dml.getTable()); // 删除对应库表配置
+                } else {
+                    // DML
+                    initMappingConfig(dml.getTable(), mirrorDbConfig.getMappingConfig(), mirrorDbConfig, dml);
+                    dmlList.add(dml);
+                }
+            }
+            if (!dmlList.isEmpty()) {
+                rdbSyncService.sync(dmlList, dml -> {
+                    MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache
+                        .get(dml.getDestination() + "." + dml.getDatabase());
+                    String destination = StringUtils.trimToEmpty(dml.getDestination());
+                    String database = dml.getDatabase();
+                    String table = dml.getTable();
+                    MappingConfig config = mirrorDbConfig.getTableConfig().get(table);
+
+                    if (config == null) {
+                        return false;
+                    }
+
+                    if (config.getConcurrent()) {
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            int hash = rdbSyncService.pkHash(config.getDbMapping(), singleDml.getData());
+                            RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
+                            rdbSyncService.getDmlsPartition()[hash].add(syncItem);
+                        });
+                    } else {
+                        int hash = 0;
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
+                            rdbSyncService.getDmlsPartition()[hash].add(syncItem);
+                        });
+                    }
+                    return true;
+                });
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 初始化表配置
+     *
+     * @param key 配置key: destination.database.table
+     * @param baseConfigMap db sync config
+     * @param dml DML
+     */
+    private void initMappingConfig(String key, MappingConfig baseConfigMap, MirrorDbConfig mirrorDbConfig, Dml dml) {
+        MappingConfig mappingConfig = mirrorDbConfig.getTableConfig().get(key);
+        if (mappingConfig == null) {
+            // 构造表配置
+            mappingConfig = new MappingConfig();
+            mappingConfig.setDataSourceKey(baseConfigMap.getDataSourceKey());
+            mappingConfig.setDestination(baseConfigMap.getDestination());
+            mappingConfig.setOuterAdapterKey(baseConfigMap.getOuterAdapterKey());
+            mappingConfig.setConcurrent(baseConfigMap.getConcurrent());
+            MappingConfig.DbMapping dbMapping = new MappingConfig.DbMapping();
+            mappingConfig.setDbMapping(dbMapping);
+            dbMapping.setDatabase(dml.getDatabase());
+            dbMapping.setTable(dml.getTable());
+            dbMapping.setTargetDb(dml.getDatabase());
+            dbMapping.setTargetTable(dml.getTable());
+            dbMapping.setMapAll(true);
+            List<String> pkNames = dml.getPkNames();
+            Map<String, String> pkMapping = new LinkedHashMap<>();
+            pkNames.forEach(pkName -> pkMapping.put(pkName, pkName));
+            dbMapping.setTargetPk(pkMapping);
+
+            mirrorDbConfig.getTableConfig().put(key, mappingConfig);
+        }
+    }
+
+    /**
+     * DDL 操作
+     *
+     * @param ddl DDL
+     */
+    private void executeDdl(MirrorDbConfig mirrorDbConfig, Dml ddl) {
+        try (Connection conn = dataSource.getConnection(); Statement statement = conn.createStatement()) {
+            statement.execute(ddl.getSql());
+            // 移除对应配置
+            mirrorDbConfig.getTableConfig().remove(ddl.getTable());
+            if (logger.isTraceEnabled()) {
+                logger.trace("Execute DDL sql: {} for database: {}", ddl.getSql(), ddl.getDatabase());
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+}

+ 124 - 65
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -11,6 +11,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.function.Function;
 
 import javax.sql.DataSource;
 
@@ -36,26 +37,37 @@ import com.alibaba.otter.canal.client.adapter.support.Util;
  */
 public class RdbSyncService {
 
-    private static final Logger                     logger             = LoggerFactory.getLogger(RdbSyncService.class);
+    private static final Logger               logger  = LoggerFactory.getLogger(RdbSyncService.class);
 
-    private final Map<String, Map<String, Integer>> COLUMNS_TYPE_CACHE = new ConcurrentHashMap<>();
+    // 源库表字段类型缓存: instance.schema.table -> <columnName, jdbcType>
+    private Map<String, Map<String, Integer>> columnsTypeCache;
 
-    private Map<String, Map<String, MappingConfig>> mappingConfigCache;                                                // 库名-表名对应配置
+    private int                               threads = 3;
 
-    private int                                     threads            = 3;
+    private List<SyncItem>[]                  dmlsPartition;
+    private BatchExecutor[]                   batchExecutors;
+    private ExecutorService[]                 executorThreads;
 
-    private List<SyncItem>[]                        dmlsPartition;
-    private BatchExecutor[]                         batchExecutors;
-    private ExecutorService[]                       executorThreads;
+    public List<SyncItem>[] getDmlsPartition() {
+        return dmlsPartition;
+    }
+
+    public Map<String, Map<String, Integer>> getColumnsTypeCache() {
+        return columnsTypeCache;
+    }
+
+    @SuppressWarnings("unchecked")
+    public RdbSyncService(DataSource dataSource, Integer threads){
+        this(dataSource, threads, new ConcurrentHashMap<>());
+    }
 
     @SuppressWarnings("unchecked")
-    public RdbSyncService(Map<String, Map<String, MappingConfig>> mappingConfigCache, DataSource dataSource,
-                          Integer threads){
+    public RdbSyncService(DataSource dataSource, Integer threads, Map<String, Map<String, Integer>> columnsTypeCache){
+        this.columnsTypeCache = columnsTypeCache;
         try {
             if (threads != null) {
                 this.threads = threads;
             }
-            this.mappingConfigCache = mappingConfigCache;
             this.dmlsPartition = new List[this.threads];
             this.batchExecutors = new BatchExecutor[this.threads];
             this.executorThreads = new ExecutorService[this.threads];
@@ -69,64 +81,111 @@ public class RdbSyncService {
         }
     }
 
-    public void sync(List<Dml> dmls) {
+    /**
+     * 批量同步回调
+     *
+     * @param dmls 批量 DML
+     * @param function 回调方法
+     */
+    public void sync(List<Dml> dmls, Function<Dml, Boolean> function) {
         try {
+            boolean toExecute = false;
             for (Dml dml : dmls) {
-                String destination = StringUtils.trimToEmpty(dml.getDestination());
-                String database = dml.getDatabase();
-                String table = dml.getTable();
-                Map<String, MappingConfig> configMap = mappingConfigCache
-                    .get(destination + "." + database + "." + table);
-
-                if (configMap == null) {
-                    continue;
+                if (!toExecute) {
+                    toExecute = function.apply(dml);
+                } else {
+                    function.apply(dml);
+                }
+            }
+            if (toExecute) {
+                List<Future> futures = new ArrayList<>();
+                for (int i = 0; i < threads; i++) {
+                    int j = i;
+                    futures.add(executorThreads[i].submit(() -> {
+                        dmlsPartition[j]
+                            .forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
+                        batchExecutors[j].commit();
+                        return true;
+                    }));
                 }
-                for (MappingConfig config : configMap.values()) {
-
-                    if (config.getConcurrent()) {
-                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                        singleDmls.forEach(singleDml -> {
-                            int hash = pkHash(config.getDbMapping(), singleDml.getData());
-                            SyncItem syncItem = new SyncItem(config, singleDml);
-                            dmlsPartition[hash].add(syncItem);
-                        });
-                    } else {
-                        int hash = Math.abs(Math.abs(config.getDbMapping().getTargetTable().hashCode()) % threads);
-                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                        singleDmls.forEach(singleDml -> {
-                            SyncItem syncItem = new SyncItem(config, singleDml);
-                            dmlsPartition[hash].add(syncItem);
-                        });
+
+                futures.forEach(future -> {
+                    try {
+                        future.get();
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e);
                     }
+                });
+
+                for (int i = 0; i < threads; i++) {
+                    dmlsPartition[i].clear();
                 }
             }
-            List<Future> futures = new ArrayList<>();
-            for (int i = 0; i < threads; i++) {
-                int j = i;
-                futures.add(executorThreads[i].submit(() -> {
-                    dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
-                    batchExecutors[j].commit();
-                    return true;
-                }));
-            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 批量同步
+     *
+     * @param mappingConfig 配置集合
+     * @param dmls 批量 DML
+     */
+    public void sync(Map<String, Map<String, MappingConfig>> mappingConfig, List<Dml> dmls) {
+        try {
+            sync(dmls, dml -> {
+                if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
+                    // DDL
+                    columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
+                    return false;
+                } else {
+                    // DML
+                    String destination = StringUtils.trimToEmpty(dml.getDestination());
+                    String database = dml.getDatabase();
+                    String table = dml.getTable();
+                    Map<String, MappingConfig> configMap = mappingConfig
+                        .get(destination + "." + database + "." + table);
+
+                    if (configMap == null) {
+                        return false;
+                    }
 
-            futures.forEach(future -> {
-                try {
-                    future.get();
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
+                    boolean executed = false;
+                    for (MappingConfig config : configMap.values()) {
+                        if (config.getConcurrent()) {
+                            List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                            singleDmls.forEach(singleDml -> {
+                                int hash = pkHash(config.getDbMapping(), singleDml.getData());
+                                SyncItem syncItem = new SyncItem(config, singleDml);
+                                dmlsPartition[hash].add(syncItem);
+                            });
+                        } else {
+                            int hash = 0;
+                            List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                            singleDmls.forEach(singleDml -> {
+                                SyncItem syncItem = new SyncItem(config, singleDml);
+                                dmlsPartition[hash].add(syncItem);
+                            });
+                        }
+                        executed = true;
+                    }
+                    return executed;
                 }
             });
-
-            for (int i = 0; i < threads; i++) {
-                dmlsPartition[i].clear();
-            }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
         }
     }
 
-    private void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
+    /**
+     * 单条 dml 同步
+     *
+     * @param batchExecutor 批量事务执行器
+     * @param config 对应配置对象
+     * @param dml DML
+     */
+    public void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
         try {
             if (config != null) {
                 String type = dml.getType();
@@ -164,7 +223,7 @@ public class RdbSyncService {
             Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
 
             StringBuilder insertSql = new StringBuilder();
-            insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
+            insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
 
             columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
             int len = insertSql.length();
@@ -228,7 +287,7 @@ public class RdbSyncService {
             Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
 
             StringBuilder updateSql = new StringBuilder();
-            updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
+            updateSql.append("UPDATE ").append(SyncUtil.getDbTableName(dbMapping)).append(" SET ");
             List<Map<String, ?>> values = new ArrayList<>();
             for (String srcColumnName : old.keySet()) {
                 List<String> targetColumnNames = new ArrayList<>();
@@ -280,7 +339,7 @@ public class RdbSyncService {
             Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
 
             StringBuilder sql = new StringBuilder();
-            sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE ");
+            sql.append("DELETE FROM ").append(SyncUtil.getDbTableName(dbMapping)).append(" WHERE ");
 
             List<Map<String, ?>> values = new ArrayList<>();
             // 拼接主键
@@ -306,14 +365,14 @@ public class RdbSyncService {
     private Map<String, Integer> getTargetColumnType(Connection conn, MappingConfig config) {
         DbMapping dbMapping = config.getDbMapping();
         String cacheKey = config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable();
-        Map<String, Integer> columnType = COLUMNS_TYPE_CACHE.get(cacheKey);
+        Map<String, Integer> columnType = columnsTypeCache.get(cacheKey);
         if (columnType == null) {
             synchronized (RdbSyncService.class) {
-                columnType = COLUMNS_TYPE_CACHE.get(cacheKey);
+                columnType = columnsTypeCache.get(cacheKey);
                 if (columnType == null) {
                     columnType = new LinkedHashMap<>();
                     final Map<String, Integer> columnTypeTmp = columnType;
-                    String sql = "SELECT * FROM " + dbMapping.getTargetTable() + " WHERE 1=2";
+                    String sql = "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE 1=2";
                     Util.sqlRS(conn, sql, rs -> {
                         try {
                             ResultSetMetaData rsd = rs.getMetaData();
@@ -321,7 +380,7 @@ public class RdbSyncService {
                             for (int i = 1; i <= columnCount; i++) {
                                 columnTypeTmp.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
                             }
-                            COLUMNS_TYPE_CACHE.put(cacheKey, columnTypeTmp);
+                            columnsTypeCache.put(cacheKey, columnTypeTmp);
                         } catch (SQLException e) {
                             logger.error(e.getMessage(), e);
                         }
@@ -362,12 +421,12 @@ public class RdbSyncService {
         sql.delete(len - 4, len);
     }
 
-    private class SyncItem {
+    public static class SyncItem {
 
         private MappingConfig config;
         private SingleDml     singleDml;
 
-        private SyncItem(MappingConfig config, SingleDml singleDml){
+        public SyncItem(MappingConfig config, SingleDml singleDml){
             this.config = config;
             this.singleDml = singleDml;
         }
@@ -376,11 +435,11 @@ public class RdbSyncService {
     /**
      * 取主键hash
      */
-    private int pkHash(DbMapping dbMapping, Map<String, Object> d) {
+    public int pkHash(DbMapping dbMapping, Map<String, Object> d) {
         return pkHash(dbMapping, d, null);
     }
 
-    private int pkHash(DbMapping dbMapping, Map<String, Object> d, Map<String, Object> o) {
+    public int pkHash(DbMapping dbMapping, Map<String, Object> d, Map<String, Object> o) {
         int hash = 0;
         // 取主键
         for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {

+ 15 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java

@@ -9,6 +9,7 @@ import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
 import org.joda.time.DateTime;
 
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
@@ -21,7 +22,10 @@ public class SyncUtil {
 
     public static Map<String, String> getColumnsMap(MappingConfig.DbMapping dbMapping, Collection<String> columns) {
         Map<String, String> columnsMap;
-        if (dbMapping.isMapAll()) {
+        if (dbMapping.getMapAll()) {
+            if (dbMapping.getAllMapColumns() != null) {
+                return dbMapping.getAllMapColumns();
+            }
             columnsMap = new LinkedHashMap<>();
             for (String srcColumn : columns) {
                 boolean flag = true;
@@ -38,6 +42,7 @@ public class SyncUtil {
                     columnsMap.put(srcColumn, srcColumn);
                 }
             }
+            dbMapping.setAllMapColumns(columnsMap);
         } else {
             columnsMap = dbMapping.getTargetColumns();
         }
@@ -253,4 +258,13 @@ public class SyncUtil {
                 pstmt.setObject(i, value, type);
         }
     }
+
+    public static String getDbTableName(MappingConfig.DbMapping dbMapping) {
+        String result = "";
+        if (StringUtils.isNotEmpty(dbMapping.getTargetDb())) {
+            result += dbMapping.getTargetDb() + ".";
+        }
+        result += dbMapping.getTargetTable();
+        return result;
+    }
 }

+ 11 - 1
client-adapter/rdb/src/main/resources/rdb/mytest_user.yml

@@ -14,4 +14,14 @@ dbMapping:
 #    name:
 #    role_id:
 #    c_time:
-#    test1:
+#    test1:
+
+
+# Mirror schema synchronize config
+#dataSourceKey: defaultDS
+#destination: example
+#outerAdapterKey: mysql1
+#concurrent: true
+#dbMapping:
+#  mirrorDb: true
+#  database: mytest

+ 1 - 1
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/DBTest.java

@@ -42,7 +42,7 @@ public class DBTest {
             .prepareStatement("insert into user (id,name,role_id,c_time,test1,test2) values (?,?,?,?,?,?)");
 
         java.util.Date now = new java.util.Date();
-        for (int i = 1; i <= 100000; i++) {
+        for (int i = 1; i <= 10000; i++) {
             pstmt.clearParameters();
             pstmt.setLong(1, (long) i);
             pstmt.setString(2, "test_" + i);

+ 86 - 0
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/SqlParserTest.java

@@ -0,0 +1,86 @@
+//package com.alibaba.otter.canal.client.adapter.rdb.test;
+//
+//import com.alibaba.fastsql.sql.ast.SQLName;
+//import com.alibaba.fastsql.sql.ast.SQLStatement;
+//import com.alibaba.fastsql.sql.ast.statement.SQLCreateTableStatement;
+//import com.alibaba.fastsql.sql.ast.statement.SQLExprTableSource;
+//import com.alibaba.fastsql.sql.dialect.mysql.parser.MySqlCreateTableParser;
+//import com.alibaba.fastsql.sql.dialect.mysql.parser.MySqlStatementParser;
+//import com.alibaba.fastsql.sql.dialect.mysql.visitor.MySqlOutputVisitor;
+//import com.alibaba.fastsql.sql.dialect.mysql.visitor.MySqlSchemaStatVisitor;
+//import com.alibaba.fastsql.sql.parser.SQLStatementParser;
+//
+//import java.io.StringWriter;
+//
+//public class SqlParserTest {
+//
+//    public static class TableNameVisitor extends MySqlOutputVisitor {
+//
+//        public TableNameVisitor(Appendable appender){
+//            super(appender);
+//        }
+//
+//        @Override
+//        public boolean visit(SQLExprTableSource x) {
+//            SQLName table = (SQLName) x.getExpr();
+//            String tableName = table.getSimpleName();
+//
+//            // 改写tableName
+//            print0("new_" + tableName.toUpperCase());
+//
+//            return true;
+//        }
+//
+//    }
+//
+//    public static void main(String[] args) {
+//        // String sql = "select * from `mytest`.`t` where id=1 and name=ming group by
+//        // uid limit 1,200 order by ctime";
+//
+//        String sql = "CREATE TABLE `mytest`.`user` (\n" + "  `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+//                     + "  `name` varchar(30) NOT NULL,\n" + "  `c_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,\n"
+//                     + "  `role_id` bigint(20) DEFAULT NULL,\n" + "  `test1` text,\n" + "  `test2` blob,\n"
+//                     + "  `key` varchar(30) DEFAULT NULL,\n" + "  PRIMARY KEY (`id`)\n"
+//                     + ") ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;";
+//
+//        // // 新建 MySQL Parser
+//        // SQLStatementParser parser = new MySqlStatementParser(sql);
+//        //
+//        // // 使用Parser解析生成AST,这里SQLStatement就是AST
+//        // SQLStatement sqlStatement = parser.parseStatement();
+//        //
+//        // MySqlSchemaStatVisitor visitor = new MySqlSchemaStatVisitor();
+//        // sqlStatement.accept(visitor);
+//        //
+//        // System.out.println("getTables:" + visitor.getTables());
+//        // System.out.println("getParameters:" + visitor.getParameters());
+//        // System.out.println("getOrderByColumns:" + visitor.getOrderByColumns());
+//        // System.out.println("getGroupByColumns:" + visitor.getGroupByColumns());
+//        // System.out.println("---------------------------------------------------------------------------");
+//        //
+//        // // 使用select访问者进行select的关键信息打印
+//        // // SelectPrintVisitor selectPrintVisitor = new SelectPrintVisitor();
+//        // // sqlStatement.accept(selectPrintVisitor);
+//        //
+//        // System.out.println("---------------------------------------------------------------------------");
+//        // // 最终sql输出
+//        // StringWriter out = new StringWriter();
+//        // TableNameVisitor outputVisitor = new TableNameVisitor(out);
+//        // sqlStatement.accept(outputVisitor);
+//        // System.out.println(out.toString());
+//
+//        MySqlCreateTableParser parser1 = new MySqlCreateTableParser(sql);
+//        SQLCreateTableStatement createTableStatement = parser1.parseCreateTable();
+////        MySqlSchemaStatVisitor visitor1 = new MySqlSchemaStatVisitor();
+////        createTableStatement.accept(visitor1);
+//        // visitor1.getTables().forEach((k, v) -> {
+//        // System.out.println(k.);
+//        // System.out.println(v);
+//        // });
+//        // 最终sql输出
+//        StringWriter out = new StringWriter();
+//        TableNameVisitor outputVisitor = new TableNameVisitor(out);
+//        createTableStatement.accept(outputVisitor);
+//        System.out.println(out.toString());
+//    }
+//}

+ 0 - 9
common/src/main/java/com/alibaba/otter/canal/common/utils/AddressUtils.java

@@ -54,15 +54,6 @@ public class AddressUtils {
 
     public static InetAddress getHostAddress() {
         InetAddress localAddress = null;
-        try {
-            localAddress = InetAddress.getLocalHost();
-            if (isValidHostAddress(localAddress)) {
-                return localAddress;
-            }
-        } catch (Throwable e) {
-            logger.warn("Failed to retriving local host ip address, try scan network card ip address. cause: "
-                        + e.getMessage());
-        }
         try {
             Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
             if (interfaces != null) {

+ 3 - 3
deployer/src/main/resources/example/instance.properties

@@ -1,5 +1,5 @@
 #################################################
-## mysql serverId , v1.0.26+ will autoGen 
+## mysql serverId , v1.0.26+ will autoGen
 # canal.instance.mysql.slaveId=0
 
 # enable gtid use true/false
@@ -48,5 +48,5 @@ canal.mq.topic=example
 canal.mq.partition=0
 # hash partition config
 #canal.mq.partitionsNum=3
-#canal.mq.partitionHash=mytest.person:id,mytest.role:id
-#################################################
+#canal.mq.partitionHash=test.table:id^name,.*\\..*
+#################################################

+ 1 - 8
filter/src/main/java/com/alibaba/otter/canal/filter/PatternUtils.java

@@ -7,17 +7,10 @@ import org.apache.oro.text.regex.Pattern;
 import org.apache.oro.text.regex.PatternCompiler;
 import org.apache.oro.text.regex.Perl5Compiler;
 
-import com.alibaba.otter.canal.filter.exception.CanalFilterException;
 import com.google.common.base.Function;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.MigrateMap;
 
-/**
- * 提供{@linkplain Pattern}的lazy get处理
- *
- * @author jianghang 2013-1-22 下午09:36:44
- * @version 1.0.0
- */
 public class PatternUtils {
 
     @SuppressWarnings("deprecation")
@@ -32,7 +25,7 @@ public class PatternUtils {
                                                                              | Perl5Compiler.READ_ONLY_MASK
                                                                              | Perl5Compiler.SINGLELINE_MASK);
                                                              } catch (MalformedPatternException e) {
-                                                                 throw new CanalFilterException(e);
+                                                                 throw new RuntimeException(e);
                                                              }
                                                          }
                                                      });

+ 1 - 0
filter/src/main/java/com/alibaba/otter/canal/filter/aviater/AviaterRegexFilter.java

@@ -129,4 +129,5 @@ public class AviaterRegexFilter implements CanalEventFilter<String> {
     public String toString() {
         return pattern;
     }
+
 }

+ 1 - 1
filter/src/main/java/com/alibaba/otter/canal/filter/aviater/RegexFunction.java

@@ -12,7 +12,7 @@ import com.googlecode.aviator.runtime.type.AviatorObject;
 
 /**
  * 提供aviator regex的代码扩展
- * 
+ *
  * @author jianghang 2012-7-23 上午10:29:23
  */
 public class RegexFunction extends AbstractFunction {

+ 4 - 30
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalMQConfig.java

@@ -1,16 +1,11 @@
 package com.alibaba.otter.canal.instance.core;
 
-import java.util.LinkedHashMap;
-import java.util.Map;
-
 public class CanalMQConfig {
 
-    private String                       topic;
-    private Integer                      partition;
-    private Integer                      partitionsNum;
-    private String                       partitionHash;
-
-    private volatile Map<String, String> partitionHashProperties;
+    private String  topic;
+    private Integer partition;
+    private Integer partitionsNum;
+    private String  partitionHash;
 
     public String getTopic() {
         return topic;
@@ -44,25 +39,4 @@ public class CanalMQConfig {
         this.partitionHash = partitionHash;
     }
 
-    public Map<String, String> getPartitionHashProperties() {
-        if (partitionHashProperties == null) {
-            synchronized (CanalMQConfig.class) {
-                if (partitionHashProperties == null) {
-                    if (partitionHash != null) {
-                        partitionHashProperties = new LinkedHashMap<>();
-                        String[] items = partitionHash.split(",");
-                        for (String item : items) {
-                            int i = item.indexOf(":");
-                            if (i > -1) {
-                                String dbTable = item.substring(0, i).trim();
-                                String pk = item.substring(i + 1).trim();
-                                partitionHashProperties.put(dbTable, pk);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-        return partitionHashProperties;
-    }
 }

+ 1 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -508,6 +508,7 @@ public class MysqlConnection implements ErosaConnection {
             rs = query("select @@global.binlog_checksum");
         } catch (Throwable e) {
             // ignore
+            return;
         }
 
         List<String> columnValues = rs.getFieldValues();

+ 6 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java

@@ -58,7 +58,7 @@ public class MemoryTableMeta implements TableMetaTSDB {
     public boolean init(String destination) {
         return true;
     }
-    
+
     @Override
     public void destory() {
         tableMetas.clear();
@@ -74,7 +74,11 @@ public class MemoryTableMeta implements TableMetaTSDB {
             try {
                 // druid暂时flush privileges语法解析有问题
                 if (!StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "flush")
-                    && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "grant")) {
+                    && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "grant")
+                    && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "create user")
+                    && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "drop user")
+                    && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "create database")
+                    && !StringUtils.startsWithIgnoreCase(StringUtils.trim(ddl), "drop database")) {
                     repository.console(ddl);
                 }
             } catch (Throwable e) {

+ 18 - 210
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -1,14 +1,10 @@
 package com.alibaba.otter.canal.protocol;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
-import com.google.protobuf.ByteString;
+import com.google.common.collect.Lists;
 
 /**
  * @author machengyuan 2018-9-13 下午10:31:14
@@ -17,10 +13,10 @@ import com.google.protobuf.ByteString;
 public class FlatMessage implements Serializable {
 
     private static final long         serialVersionUID = -3386650678735860050L;
-
     private long                      id;
     private String                    database;
     private String                    table;
+    private List<String>              pkNames;
     private Boolean                   isDdl;
     private String                    type;
     // binlog executeTime
@@ -33,7 +29,7 @@ public class FlatMessage implements Serializable {
     private List<Map<String, String>> data;
     private List<Map<String, String>> old;
 
-    public FlatMessage(){
+    public FlatMessage() {
     }
 
     public FlatMessage(long id){
@@ -64,6 +60,21 @@ public class FlatMessage implements Serializable {
         this.table = table;
     }
 
+    public List<String> getPkNames() {
+        return pkNames;
+    }
+
+    public void addPkName(String pkName) {
+        if (this.pkNames == null) {
+            this.pkNames = Lists.newArrayList();
+        }
+        this.pkNames.add(pkName);
+    }
+
+    public void setPkNames(List<String> pkNames) {
+        this.pkNames = pkNames;
+    }
+
     public Boolean getIsDdl() {
         return isDdl;
     }
@@ -136,209 +147,6 @@ public class FlatMessage implements Serializable {
         this.es = es;
     }
 
-    /**
-     * 将Message转换为FlatMessage
-     * 
-     * @param message 原生message
-     * @return FlatMessage列表
-     */
-    public static List<FlatMessage> messageConverter(Message message) {
-        try {
-            if (message == null) {
-                return null;
-            }
-
-            List<FlatMessage> flatMessages = new ArrayList<>();
-            List<CanalEntry.Entry> entrys = null;
-            if (message.isRaw()) {
-                List<ByteString> rawEntries = message.getRawEntries();
-                entrys = new ArrayList<CanalEntry.Entry>(rawEntries.size());
-                for (ByteString byteString : rawEntries) {
-                    CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
-                    entrys.add(entry);
-                }
-            } else {
-                entrys = message.getEntries();
-            }
-
-            for (CanalEntry.Entry entry : entrys) {
-                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
-                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
-                    continue;
-                }
-
-                CanalEntry.RowChange rowChange;
-                try {
-                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
-                } catch (Exception e) {
-                    throw new RuntimeException(
-                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
-                        e);
-                }
-
-                CanalEntry.EventType eventType = rowChange.getEventType();
-
-                FlatMessage flatMessage = new FlatMessage(message.getId());
-                flatMessages.add(flatMessage);
-                flatMessage.setDatabase(entry.getHeader().getSchemaName());
-                flatMessage.setTable(entry.getHeader().getTableName());
-                flatMessage.setIsDdl(rowChange.getIsDdl());
-                flatMessage.setType(eventType.toString());
-                flatMessage.setEs(entry.getHeader().getExecuteTime());
-                flatMessage.setTs(System.currentTimeMillis());
-                flatMessage.setSql(rowChange.getSql());
-
-                if (!rowChange.getIsDdl()) {
-                    Map<String, Integer> sqlType = new LinkedHashMap<>();
-                    Map<String, String> mysqlType = new LinkedHashMap<>();
-                    List<Map<String, String>> data = new ArrayList<>();
-                    List<Map<String, String>> old = new ArrayList<>();
-
-                    Set<String> updateSet = new HashSet<>();
-                    for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
-                        if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
-                            && eventType != CanalEntry.EventType.DELETE) {
-                            continue;
-                        }
-
-                        Map<String, String> row = new LinkedHashMap<>();
-                        List<CanalEntry.Column> columns;
-
-                        if (eventType == CanalEntry.EventType.DELETE) {
-                            columns = rowData.getBeforeColumnsList();
-                        } else {
-                            columns = rowData.getAfterColumnsList();
-                        }
-
-                        for (CanalEntry.Column column : columns) {
-                            sqlType.put(column.getName(), column.getSqlType());
-                            mysqlType.put(column.getName(), column.getMysqlType());
-                            if (column.getIsNull()) {
-                                row.put(column.getName(), null);
-                            } else {
-                                row.put(column.getName(), column.getValue());
-                            }
-                            // 获取update为true的字段
-                            if (column.getUpdated()) {
-                                updateSet.add(column.getName());
-                            }
-                        }
-                        if (!row.isEmpty()) {
-                            data.add(row);
-                        }
-
-                        if (eventType == CanalEntry.EventType.UPDATE) {
-                            Map<String, String> rowOld = new LinkedHashMap<>();
-                            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
-                                if (updateSet.contains(column.getName())) {
-                                    if (column.getIsNull()) {
-                                        rowOld.put(column.getName(), null);
-                                    } else {
-                                        rowOld.put(column.getName(), column.getValue());
-                                    }
-                                }
-                            }
-                            // update操作将记录修改前的值
-                            if (!rowOld.isEmpty()) {
-                                old.add(rowOld);
-                            }
-                        }
-                    }
-                    if (!sqlType.isEmpty()) {
-                        flatMessage.setSqlType(sqlType);
-                    }
-                    if (!mysqlType.isEmpty()) {
-                        flatMessage.setMysqlType(mysqlType);
-                    }
-                    if (!data.isEmpty()) {
-                        flatMessage.setData(data);
-                    }
-                    if (!old.isEmpty()) {
-                        flatMessage.setOld(old);
-                    }
-                }
-            }
-            return flatMessages;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * 将FlatMessage按指定的字段值hash拆分
-     * 
-     * @param flatMessage flatMessage
-     * @param partitionsNum 分区数量
-     * @param pkHashConfig hash映射
-     * @return 拆分后的flatMessage数组
-     */
-    public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum,
-                                                 Map<String, String> pkHashConfig) {
-        if (partitionsNum == null) {
-            partitionsNum = 1;
-        }
-        FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];
-        String pk = pkHashConfig.get(flatMessage.getDatabase() + "." + flatMessage.getTable());
-        if (pk == null || flatMessage.getIsDdl()) {
-            partitionMessages[0] = flatMessage;
-        } else {
-            if (flatMessage.getData() != null) {
-                int idx = 0;
-                for (Map<String, String> row : flatMessage.getData()) {
-                    Map<String, String> o = null;
-                    if (flatMessage.getOld() != null) {
-                        o = flatMessage.getOld().get(idx);
-                    }
-                    String value;
-                    // 如果old中有pk值说明主键有修改, 以旧的主键值hash为准
-                    if (o != null && o.containsKey(pk)) {
-                        value = o.get(pk);
-                    } else {
-                        value = row.get(pk);
-                    }
-                    if (value == null) {
-                        value = "";
-                    }
-                    int hash = value.hashCode();
-                    int pkHash = Math.abs(hash) % partitionsNum;
-                    // math.abs可能返回负值,这里再取反,把出现负值的数据还是写到固定的分区,仍然可以保证消费顺序
-                    pkHash = Math.abs(pkHash);
-
-                    FlatMessage flatMessageTmp = partitionMessages[pkHash];
-                    if (flatMessageTmp == null) {
-                        flatMessageTmp = new FlatMessage(flatMessage.getId());
-                        partitionMessages[pkHash] = flatMessageTmp;
-                        flatMessageTmp.setDatabase(flatMessage.getDatabase());
-                        flatMessageTmp.setTable(flatMessage.getTable());
-                        flatMessageTmp.setIsDdl(flatMessage.getIsDdl());
-                        flatMessageTmp.setType(flatMessage.getType());
-                        flatMessageTmp.setSql(flatMessage.getSql());
-                        flatMessageTmp.setSqlType(flatMessage.getSqlType());
-                        flatMessageTmp.setMysqlType(flatMessage.getMysqlType());
-                        flatMessageTmp.setEs(flatMessage.getEs());
-                        flatMessageTmp.setTs(flatMessage.getTs());
-                    }
-                    List<Map<String, String>> data = flatMessageTmp.getData();
-                    if (data == null) {
-                        data = new ArrayList<>();
-                        flatMessageTmp.setData(data);
-                    }
-                    data.add(row);
-                    if (flatMessage.getOld() != null && !flatMessage.getOld().isEmpty()) {
-                        List<Map<String, String>> old = flatMessageTmp.getOld();
-                        if (old == null) {
-                            old = new ArrayList<>();
-                            flatMessageTmp.setOld(old);
-                        }
-                        old.add(flatMessage.getOld().get(idx));
-                    }
-                    idx++;
-                }
-            }
-        }
-        return partitionMessages;
-    }
-
     @Override
     public String toString() {
         return "FlatMessage [id=" + id + ", database=" + database + ", table=" + table + ", isDdl=" + isDdl + ", type="

+ 0 - 1
protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java

@@ -17,7 +17,6 @@ import com.google.protobuf.ByteString;
 public class Message implements Serializable {
 
     private static final long      serialVersionUID = 1234034768477580009L;
-
     private long                   id;
     private List<CanalEntry.Entry> entries          = new ArrayList<CanalEntry.Entry>();
     // row data for performance, see:

+ 427 - 0
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -0,0 +1,427 @@
+package com.alibaba.otter.canal.common;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.MapMaker;
+import com.google.common.collect.MigrateMap;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * process MQ Message utils
+ * 
+ * @author agapple 2018年12月11日 下午1:28:32
+ */
+public class MQMessageUtils {
+
+    @SuppressWarnings("deprecation")
+    private static Map<String, List<PartitionData>> partitionDatas = MigrateMap.makeComputingMap(new MapMaker().softValues(),
+                                                                       new Function<String, List<PartitionData>>() {
+
+                                                                           public List<PartitionData> apply(String pkHashConfigs) {
+                                                                               List<PartitionData> datas = Lists.newArrayList();
+                                                                               String[] pkHashConfigArray = StringUtils.split(pkHashConfigs,
+                                                                                   ",");
+                                                                               // schema.table:id^name
+                                                                               for (String pkHashConfig : pkHashConfigArray) {
+                                                                                   PartitionData data = new PartitionData();
+                                                                                   int i = pkHashConfig.lastIndexOf(":");
+                                                                                   if (i > 0) {
+                                                                                       String pkStr = pkHashConfig.substring(i + 1);
+                                                                                       if (pkStr.equalsIgnoreCase("$pk$")) {
+                                                                                           data.hashMode.autoPkHash = true;
+                                                                                       } else {
+                                                                                           data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr,
+                                                                                               '^'));
+                                                                                       }
+
+                                                                                       pkHashConfig = pkHashConfig.substring(0,
+                                                                                           i);
+                                                                                   } else {
+                                                                                       data.hashMode.tableHash = true;
+                                                                                   }
+
+                                                                                   if (!isWildCard(pkHashConfig)) {
+                                                                                       data.simpleName = pkHashConfig;
+                                                                                   } else {
+                                                                                       data.regexFilter = new AviaterRegexFilter(pkHashConfig);
+                                                                                   }
+                                                                                   datas.add(data);
+                                                                               }
+
+                                                                               return datas;
+                                                                           }
+                                                                       });
+
+    /**
+     * 将 message 分区
+     *
+     * @param partitionsNum 分区数
+     * @param pkHashConfigs 分区库表主键正则表达式
+     * @return 分区message数组
+     */
+    @SuppressWarnings("unchecked")
+    public static Message[] messagePartition(Message message, Integer partitionsNum, String pkHashConfigs) {
+        if (partitionsNum == null) {
+            partitionsNum = 1;
+        }
+        Message[] partitionMessages = new Message[partitionsNum];
+        List<Entry>[] partitionEntries = new List[partitionsNum];
+        for (int i = 0; i < partitionsNum; i++) {
+            partitionEntries[i] = new ArrayList<>();
+        }
+
+        List<CanalEntry.Entry> entries;
+        if (message.isRaw()) {
+            List<ByteString> rawEntries = message.getRawEntries();
+            entries = new ArrayList<>(rawEntries.size());
+            for (ByteString byteString : rawEntries) {
+                Entry entry;
+                try {
+                    entry = Entry.parseFrom(byteString);
+                } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                }
+                entries.add(entry);
+            }
+        } else {
+            entries = message.getEntries();
+        }
+
+        for (Entry entry : entries) {
+            CanalEntry.RowChange rowChange;
+            try {
+                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+            } catch (Exception e) {
+                throw new RuntimeException(e.getMessage(), e);
+            }
+
+            if (rowChange.getIsDdl()) {
+                partitionEntries[0].add(entry);
+            } else {
+                if (rowChange.getRowDatasList() != null && !rowChange.getRowDatasList().isEmpty()) {
+                    String database = entry.getHeader().getSchemaName();
+                    String table = entry.getHeader().getTableName();
+                    HashMode hashMode = getParitionHashColumns(database + "." + table, pkHashConfigs);
+                    if (hashMode == null) {
+                        // 如果都没有匹配,发送到第一个分区
+                        partitionEntries[0].add(entry);
+                    } else {
+                        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+                            int hashCode = table.hashCode();
+                            if (hashMode.autoPkHash) {
+                                // isEmpty use default pkNames
+                                for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
+                                    if (column.getIsKey()) {
+                                        hashCode = hashCode ^ column.getValue().hashCode();
+                                    }
+                                }
+                            } else if (!hashMode.tableHash) {
+                                for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
+                                    if (checkPkNamesHasContain(hashMode.pkNames, column.getName())) {
+                                        hashCode = hashCode ^ column.getValue().hashCode();
+                                    }
+                                }
+                            }
+
+                            int pkHash = Math.abs(hashCode) % partitionsNum;
+                            pkHash = Math.abs(pkHash);
+                            partitionEntries[pkHash].add(entry);
+                        }
+                    }
+                }
+            }
+        }
+
+        for (int i = 0; i < partitionsNum; i++) {
+            List<Entry> entriesTmp = partitionEntries[i];
+            if (!entriesTmp.isEmpty()) {
+                partitionMessages[i] = new Message(message.getId(), entriesTmp);
+            }
+        }
+
+        return partitionMessages;
+    }
+
+    /**
+     * 将Message转换为FlatMessage
+     *
+     * @param message 原生message
+     * @return FlatMessage列表
+     */
+    public static List<FlatMessage> messageConverter(Message message) {
+        try {
+            if (message == null) {
+                return null;
+            }
+
+            List<FlatMessage> flatMessages = new ArrayList<>();
+            List<CanalEntry.Entry> entrys = null;
+            if (message.isRaw()) {
+                List<ByteString> rawEntries = message.getRawEntries();
+                entrys = new ArrayList<CanalEntry.Entry>(rawEntries.size());
+                for (ByteString byteString : rawEntries) {
+                    CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
+                    entrys.add(entry);
+                }
+            } else {
+                entrys = message.getEntries();
+            }
+
+            for (CanalEntry.Entry entry : entrys) {
+                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
+                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
+                    continue;
+                }
+
+                CanalEntry.RowChange rowChange;
+                try {
+                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+                } catch (Exception e) {
+                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
+                                               + entry.toString(), e);
+                }
+
+                CanalEntry.EventType eventType = rowChange.getEventType();
+
+                FlatMessage flatMessage = new FlatMessage(message.getId());
+                flatMessages.add(flatMessage);
+                flatMessage.setDatabase(entry.getHeader().getSchemaName());
+                flatMessage.setTable(entry.getHeader().getTableName());
+                flatMessage.setIsDdl(rowChange.getIsDdl());
+                flatMessage.setType(eventType.toString());
+                flatMessage.setEs(entry.getHeader().getExecuteTime());
+                flatMessage.setTs(System.currentTimeMillis());
+                flatMessage.setSql(rowChange.getSql());
+
+                if (!rowChange.getIsDdl()) {
+                    Map<String, Integer> sqlType = new LinkedHashMap<>();
+                    Map<String, String> mysqlType = new LinkedHashMap<>();
+                    List<Map<String, String>> data = new ArrayList<>();
+                    List<Map<String, String>> old = new ArrayList<>();
+
+                    Set<String> updateSet = new HashSet<>();
+                    for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+                        if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
+                            && eventType != CanalEntry.EventType.DELETE) {
+                            continue;
+                        }
+
+                        Map<String, String> row = new LinkedHashMap<>();
+                        List<CanalEntry.Column> columns;
+
+                        if (eventType == CanalEntry.EventType.DELETE) {
+                            columns = rowData.getBeforeColumnsList();
+                        } else {
+                            columns = rowData.getAfterColumnsList();
+                        }
+
+                        for (CanalEntry.Column column : columns) {
+                            if (column.getIsKey()) {
+                                flatMessage.addPkName(column.getName());
+                            }
+                            sqlType.put(column.getName(), column.getSqlType());
+                            mysqlType.put(column.getName(), column.getMysqlType());
+                            if (column.getIsNull()) {
+                                row.put(column.getName(), null);
+                            } else {
+                                row.put(column.getName(), column.getValue());
+                            }
+                            // 获取update为true的字段
+                            if (column.getUpdated()) {
+                                updateSet.add(column.getName());
+                            }
+                        }
+                        if (!row.isEmpty()) {
+                            data.add(row);
+                        }
+
+                        if (eventType == CanalEntry.EventType.UPDATE) {
+                            Map<String, String> rowOld = new LinkedHashMap<>();
+                            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
+                                if (updateSet.contains(column.getName())) {
+                                    if (column.getIsNull()) {
+                                        rowOld.put(column.getName(), null);
+                                    } else {
+                                        rowOld.put(column.getName(), column.getValue());
+                                    }
+                                }
+                            }
+                            // update操作将记录修改前的值
+                            if (!rowOld.isEmpty()) {
+                                old.add(rowOld);
+                            }
+                        }
+                    }
+                    if (!sqlType.isEmpty()) {
+                        flatMessage.setSqlType(sqlType);
+                    }
+                    if (!mysqlType.isEmpty()) {
+                        flatMessage.setMysqlType(mysqlType);
+                    }
+                    if (!data.isEmpty()) {
+                        flatMessage.setData(data);
+                    }
+                    if (!old.isEmpty()) {
+                        flatMessage.setOld(old);
+                    }
+                }
+            }
+            return flatMessages;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Splits a FlatMessage into one message per target partition by hashing the
+     * configured key columns, so rows sharing a hash land in the same partition
+     * and per-key consumption order is preserved.
+     *
+     * @param flatMessage the source message to split (DDL is never split)
+     * @param partitionsNum number of target partitions; treated as 1 when null
+     * @param pkHashConfigs raw hash-mapping config string, resolved via
+     *            getParitionHashColumns(String, String)
+     * @return array of length partitionsNum; slots that received no rows stay null
+     */
+    public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum, String pkHashConfigs) {
+        if (partitionsNum == null) {
+            partitionsNum = 1;
+        }
+        FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];
+
+        if (flatMessage.getIsDdl()) {
+            // DDL carries no row data to hash on: always deliver on partition 0
+            partitionMessages[0] = flatMessage;
+        } else {
+            if (flatMessage.getData() != null && !flatMessage.getData().isEmpty()) {
+                String database = flatMessage.getDatabase();
+                String table = flatMessage.getTable();
+                HashMode hashMode = getParitionHashColumns(database + "." + table, pkHashConfigs);
+                if (hashMode == null) {
+                    // no config entry matched "database.table": route the whole
+                    // message to the first partition
+                    partitionMessages[0] = flatMessage;
+                } else {
+                    List<String> pkNames = hashMode.pkNames;
+                    if (hashMode.autoPkHash) {
+                        // auto mode: hash on the message's own primary-key columns
+                        pkNames = flatMessage.getPkNames();
+                    }
+
+                    // NOTE(review): hashCode is seeded once per table and XOR-folded
+                    // across rows without being reset per row, so a row's partition can
+                    // depend on the rows preceding it in the same batch — confirm this
+                    // is intended (same pk may map differently across batches).
+                    int hashCode = table.hashCode();
+                    int idx = 0;
+                    for (Map<String, String> row : flatMessage.getData()) {
+                        if (!hashMode.tableHash) {
+                            // fold every configured pk column value into the hash;
+                            // when tableHash is true the plain table hash is kept,
+                            // sending all rows of the table to a single partition
+                            for (String pkName : pkNames) {
+                                String value = row.get(pkName);
+                                if (value == null) {
+                                    value = "";
+                                }
+                                hashCode = hashCode ^ value.hashCode();
+                            }
+                        }
+
+                        int pkHash = Math.abs(hashCode) % partitionsNum;
+                        // Math.abs may itself return a negative value (Integer.MIN_VALUE),
+                        // leaving a negative remainder; taking abs again pins such rows to
+                        // a fixed partition, so consumption order is still preserved
+                        pkHash = Math.abs(pkHash);
+
+                        FlatMessage flatMessageTmp = partitionMessages[pkHash];
+                        if (flatMessageTmp == null) {
+                            // lazily create the per-partition message, copying all
+                            // row-independent metadata from the source message
+                            flatMessageTmp = new FlatMessage(flatMessage.getId());
+                            partitionMessages[pkHash] = flatMessageTmp;
+                            flatMessageTmp.setDatabase(flatMessage.getDatabase());
+                            flatMessageTmp.setTable(flatMessage.getTable());
+                            flatMessageTmp.setIsDdl(flatMessage.getIsDdl());
+                            flatMessageTmp.setType(flatMessage.getType());
+                            flatMessageTmp.setSql(flatMessage.getSql());
+                            flatMessageTmp.setSqlType(flatMessage.getSqlType());
+                            flatMessageTmp.setMysqlType(flatMessage.getMysqlType());
+                            flatMessageTmp.setEs(flatMessage.getEs());
+                            flatMessageTmp.setTs(flatMessage.getTs());
+                        }
+                        List<Map<String, String>> data = flatMessageTmp.getData();
+                        if (data == null) {
+                            data = new ArrayList<>();
+                            flatMessageTmp.setData(data);
+                        }
+                        data.add(row);
+                        // for UPDATEs, old rows are index-aligned with data rows, so the
+                        // same idx selects this row's before-image
+                        if (flatMessage.getOld() != null && !flatMessage.getOld().isEmpty()) {
+                            List<Map<String, String>> old = flatMessageTmp.getOld();
+                            if (old == null) {
+                                old = new ArrayList<>();
+                                flatMessageTmp.setOld(old);
+                            }
+                            old.add(flatMessage.getOld().get(idx));
+                        }
+                        idx++;
+                    }
+                }
+            }
+        }
+        return partitionMessages;
+    }
+
+    /**
+     * Resolves the hash mode configured for a "database.table" name. Entries
+     * carrying a plain name are matched case-insensitively; entries carrying a
+     * wildcard are matched through their compiled regex filter.
+     *
+     * @param name the "database.table" identifier to look up
+     * @param pkHashConfigs raw config string, also used as the cache key
+     * @return the matched HashMode, or null when the config is empty or no entry matches
+     */
+    public static HashMode getParitionHashColumns(String name, String pkHashConfigs) {
+        if (StringUtils.isEmpty(pkHashConfigs)) {
+            return null;
+        }
+
+        // partitionDatas is a cache keyed by the raw config string, populated
+        // elsewhere in this class; assumed to never return null for a non-empty
+        // config — TODO confirm against the cache's loader
+        List<PartitionData> datas = partitionDatas.get(pkHashConfigs);
+        for (PartitionData data : datas) {
+            if (data.simpleName != null) {
+                // exact (case-insensitive) schema.table match
+                if (data.simpleName.equalsIgnoreCase(name)) {
+                    return data.hashMode;
+                }
+            } else {
+                // wildcard entry: delegate to the compiled regex filter
+                if (data.regexFilter.filter(name)) {
+                    return data.hashMode;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Case-insensitive membership test: returns true when {@code pkNames}
+     * contains {@code name}, ignoring case.
+     *
+     * @param pkNames primary-key column names to search
+     * @param name column name to look for
+     * @return true when a case-insensitive match exists, false otherwise
+     */
+    public static boolean checkPkNamesHasContain(List<String> pkNames, String name) {
+        for (String pkName : pkNames) {
+            if (pkName.equalsIgnoreCase(name)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Returns true when the value contains any regex metacharacter and should
+     * therefore be compiled as a regex filter rather than matched as a plain name.
+     */
+    private static boolean isWildCard(String value) {
+        // '.' is deliberately absent from the list — presumably because it is the
+        // schema/table separator in config entries; TODO confirm
+        return StringUtils.containsAny(value, new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$',
+                '^' });
+    }
+
+    /**
+     * One parsed entry of the partition-hash config: either an exact
+     * schema.table name or a compiled wildcard filter, plus the hash mode to
+     * apply when the entry matches.
+     */
+    public static class PartitionData {
+
+        // exact "database.table" name; null when this entry uses regexFilter
+        public String             simpleName;
+        // compiled wildcard matcher; used when simpleName is null
+        public AviaterRegexFilter regexFilter;
+        public HashMode           hashMode = new HashMode();
+    }
+
+    /**
+     * Describes how rows of a matched table are hashed onto partitions.
+     */
+    public static class HashMode {
+
+        // hash on the message's own pk columns instead of the configured ones
+        public boolean      autoPkHash = false;
+        // skip per-row hashing: all rows of the table go to a single partition
+        public boolean      tableHash  = false;
+        // configured hash columns (consulted when autoPkHash is false)
+        public List<String> pkNames    = Lists.newArrayList();
+    }
+
+}

+ 8 - 8
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -1,6 +1,5 @@
 package com.alibaba.otter.canal.common;
 
-import java.util.Map;
 
 /**
  * kafka 配置项
@@ -28,11 +27,11 @@ public class MQProperties {
 
     public static class CanalDestination {
 
-        private String              canalDestination;
-        private String              topic;
-        private Integer             partition;
-        private Integer             partitionsNum;
-        private Map<String, String> partitionHash;
+        private String  canalDestination;
+        private String  topic;
+        private Integer partition;
+        private Integer partitionsNum;
+        private String  partitionHash;
 
         public String getCanalDestination() {
             return canalDestination;
@@ -66,11 +65,11 @@ public class MQProperties {
             this.partitionsNum = partitionsNum;
         }
 
-        public Map<String, String> getPartitionHash() {
+        public String getPartitionHash() {
             return partitionHash;
         }
 
-        public void setPartitionHash(Map<String, String> partitionHash) {
+        public void setPartitionHash(String partitionHash) {
             this.partitionHash = partitionHash;
         }
     }
@@ -186,6 +185,7 @@ public class MQProperties {
     public void setAliyunSecretKey(String aliyunSecretKey) {
         this.aliyunSecretKey = aliyunSecretKey;
     }
+
     public int getMaxRequestSize() {
         return maxRequestSize;
     }

+ 29 - 16
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -12,6 +12,7 @@ import org.slf4j.LoggerFactory;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.MQMessageUtils;
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
@@ -80,22 +81,37 @@ public class CanalKafkaProducer implements CanalMQProducer {
         // producer.beginTransaction();
         if (!kafkaProperties.getFlatMessage()) {
             try {
-                ProducerRecord<String, Message> record;
+                ProducerRecord<String, Message> record = null;
                 if (canalDestination.getPartition() != null) {
-                    record = new ProducerRecord<String, Message>(canalDestination.getTopic(),
+                    record = new ProducerRecord<>(canalDestination.getTopic(),
                         canalDestination.getPartition(),
                         null,
                         message);
                 } else {
-                    record = new ProducerRecord<String, Message>(canalDestination.getTopic(), 0, null, message);
+                    if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
+                        Message[] messages = MQMessageUtils.messagePartition(message,
+                            canalDestination.getPartitionsNum(),
+                            canalDestination.getPartitionHash());
+                        int length = messages.length;
+                        for (int i = 0; i < length; i++) {
+                            Message messagePartition = messages[i];
+                            if (messagePartition != null) {
+                                record = new ProducerRecord<>(canalDestination.getTopic(), i, null, messagePartition);
+                            }
+                        }
+                    } else {
+                        record = new ProducerRecord<>(canalDestination.getTopic(), 0, null, message);
+                    }
                 }
 
-                producer.send(record).get();
+                if (record != null) {
+                    producer.send(record).get();
 
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Send  message to kafka topic: [{}], packet: {}",
-                        canalDestination.getTopic(),
-                        message.toString());
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Send  message to kafka topic: [{}], packet: {}",
+                            canalDestination.getTopic(),
+                            message.toString());
+                    }
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
@@ -105,13 +121,12 @@ public class CanalKafkaProducer implements CanalMQProducer {
             }
         } else {
             // 发送扁平数据json
-            List<FlatMessage> flatMessages = FlatMessage.messageConverter(message);
+            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
             if (flatMessages != null) {
                 for (FlatMessage flatMessage : flatMessages) {
                     if (canalDestination.getPartition() != null) {
                         try {
-                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                canalDestination.getTopic(),
+                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                 canalDestination.getPartition(),
                                 null,
                                 JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
@@ -125,7 +140,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                     } else {
                         if (canalDestination.getPartitionHash() != null
                             && !canalDestination.getPartitionHash().isEmpty()) {
-                            FlatMessage[] partitionFlatMessage = FlatMessage.messagePartition(flatMessage,
+                            FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                                 canalDestination.getPartitionsNum(),
                                 canalDestination.getPartitionHash());
                             int length = partitionFlatMessage.length;
@@ -133,8 +148,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                 FlatMessage flatMessagePart = partitionFlatMessage[i];
                                 if (flatMessagePart != null) {
                                     try {
-                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                            canalDestination.getTopic(),
+                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                             i,
                                             null,
                                             JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
@@ -149,8 +163,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                             }
                         } else {
                             try {
-                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                    canalDestination.getTopic(),
+                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                     0,
                                     null,
                                     JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));

+ 93 - 44
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -1,14 +1,7 @@
 package com.alibaba.otter.canal.rocketmq;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.alibaba.otter.canal.common.CanalMessageSerializer;
-import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.server.exception.CanalServerException;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
-import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
-import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
+import java.util.List;
+
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -20,7 +13,16 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.CanalMessageSerializer;
+import com.alibaba.otter.canal.common.MQMessageUtils;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.server.exception.CanalServerException;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
+import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
+import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
 
 public class CanalRocketMQProducer implements CanalMQProducer {
 
@@ -34,8 +36,8 @@ public class CanalRocketMQProducer implements CanalMQProducer {
     public void init(MQProperties rocketMQProperties) {
         this.mqProperties = rocketMQProperties;
         RPCHook rpcHook = null;
-        if(rocketMQProperties.getAliyunAccessKey().length() > 0
-            && rocketMQProperties.getAliyunSecretKey().length() > 0){
+        if (rocketMQProperties.getAliyunAccessKey().length() > 0
+            && rocketMQProperties.getAliyunSecretKey().length() > 0) {
             SessionCredentials sessionCredentials = new SessionCredentials();
             sessionCredentials.setAccessKey(rocketMQProperties.getAliyunAccessKey());
             sessionCredentials.setSecretKey(rocketMQProperties.getAliyunSecretKey());
@@ -59,40 +61,85 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                      Callback callback) {
         if (!mqProperties.getFlatMessage()) {
             try {
-                Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data,
-                    mqProperties.isFilterTransactionEntry()));
-                logger.debug("send message:{} to destination:{}, partition: {}",
-                    message,
-                    destination.getCanalDestination(),
-                    destination.getPartition());
-                this.defaultMQProducer.send(message, new MessageQueueSelector() {
-
-                    @Override
-                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                        int partition = 0;
-                        if (destination.getPartition() != null) {
-                            partition = destination.getPartition();
+                if (destination.getPartition() != null) {
+                    Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data,
+                        mqProperties.isFilterTransactionEntry()));
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("send message:{} to destination:{}, partition: {}",
+                            message,
+                            destination.getCanalDestination(),
+                            destination.getPartition());
+                    }
+                    this.defaultMQProducer.send(message, new MessageQueueSelector() {
+
+                        @Override
+                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                            int partition = 0;
+                            if (destination.getPartition() != null) {
+                                partition = destination.getPartition();
+                            }
+                            return mqs.get(partition);
+                        }
+                    }, null);
+                } else {
+                    if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
+                        com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(data,
+                            destination.getPartitionsNum(),
+                            destination.getPartitionHash());
+                        int length = messages.length;
+                        for (int i = 0; i < length; i++) {
+                            com.alibaba.otter.canal.protocol.Message dataPartition = messages[i];
+                            if (dataPartition != null) {
+                                if (logger.isDebugEnabled()) {
+                                    logger.debug("flatMessagePart: {}, partition: {}",
+                                        JSON.toJSONString(dataPartition, SerializerFeature.WriteMapNullValue),
+                                        i);
+                                }
+                                final int index = i;
+                                try {
+                                    Message message = new Message(destination.getTopic(),
+                                        CanalMessageSerializer.serializer(dataPartition,
+                                            mqProperties.isFilterTransactionEntry()));
+                                    this.defaultMQProducer.send(message, new MessageQueueSelector() {
+
+                                        @Override
+                                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                                            if (index > mqs.size()) {
+                                                throw new CanalServerException("partition number is error,config num:"
+                                                                               + destination.getPartitionsNum()
+                                                                               + ", mq num: " + mqs.size());
+                                            }
+                                            return mqs.get(index);
+                                        }
+                                    }, null);
+                                } catch (Exception e) {
+                                    logger.error("send flat message to hashed partition error", e);
+                                    callback.rollback();
+                                    return;
+                                }
+                            }
                         }
-                        return mqs.get(partition);
                     }
-                }, null);
+                }
             } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                 logger.error("Send message error!", e);
                 callback.rollback();
                 return;
             }
         } else {
-            List<FlatMessage> flatMessages = FlatMessage.messageConverter(data);
+            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(data);
             if (flatMessages != null) {
                 for (FlatMessage flatMessage : flatMessages) {
                     if (destination.getPartition() != null) {
                         try {
-                            logger.info("send flat message: {} to topic: {} fixed partition: {}",
-                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue),
-                                destination.getTopic(),
-                                destination.getPartition());
-                            Message message = new Message(destination.getTopic(),
-                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue).getBytes());
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("send message: {} to topic: {} fixed partition: {}",
+                                    JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue),
+                                    destination.getTopic(),
+                                    destination.getPartition());
+                            }
+                            Message message = new Message(destination.getTopic(), JSON.toJSONString(flatMessage,
+                                SerializerFeature.WriteMapNullValue).getBytes());
                             this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                 @Override
@@ -107,34 +154,36 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                         }
                     } else {
                         if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
-                            FlatMessage[] partitionFlatMessage = FlatMessage.messagePartition(flatMessage,
+                            FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                                 destination.getPartitionsNum(),
                                 destination.getPartitionHash());
                             int length = partitionFlatMessage.length;
                             for (int i = 0; i < length; i++) {
                                 FlatMessage flatMessagePart = partitionFlatMessage[i];
                                 if (flatMessagePart != null) {
-                                    logger.debug("flatMessagePart: {}, partition: {}",
-                                        JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue),
-                                        i);
+                                    if (logger.isDebugEnabled()) {
+                                        logger.debug("flatMessagePart: {}, partition: {}",
+                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue),
+                                            i);
+                                    }
                                     final int index = i;
                                     try {
                                         Message message = new Message(destination.getTopic(),
-                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue).getBytes());
+                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue)
+                                                .getBytes());
                                         this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                             @Override
-                                            public MessageQueue select(List<MessageQueue> mqs, Message msg,
-                                                                       Object arg) {
+                                            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                                                 if (index > mqs.size()) {
-                                                    throw new CanalServerException(
-                                                        "partition number is error,config num:"
+                                                    throw new CanalServerException("partition number is error,config num:"
                                                                                    + destination.getPartitionsNum()
                                                                                    + ", mq num: " + mqs.size());
                                                 }
                                                 return mqs.get(index);
                                             }
-                                        }, null);
+                                        },
+                                            null);
                                     } catch (Exception e) {
                                         logger.error("send flat message to hashed partition error", e);
                                         callback.rollback();

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -138,7 +138,7 @@ public class CanalMQStarter {
                 canalDestination.setTopic(mqConfig.getTopic());
                 canalDestination.setPartition(mqConfig.getPartition());
                 canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
-                canalDestination.setPartitionHash(mqConfig.getPartitionHashProperties());
+                canalDestination.setPartitionHash(mqConfig.getPartitionHash());
 
                 canalServer.subscribe(clientIdentity);
                 logger.info("## the MQ producer: {} is running now ......", destination);