Bladeren bron

Merge pull request #1345 from rewerma/master

fixed 1341
agapple 6 jaren geleden
bovenliggende
commit
f100aeda39

+ 3 - 3
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -102,13 +102,13 @@ public class ESSyncService {
             }
         } catch (Exception e) {
             logger.error("sync error, es index: {}, DML : {}", config.getEsMapping().get_index(), dml);
-            logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
         }
     }
 
     /**
      * 插入操作dml
-     * 
+     *
      * @param config es配置
      * @param dml dml数据
      */
@@ -429,7 +429,7 @@ public class ESSyncService {
 
     /**
      * 主表(单表)复杂字段insert
-     * 
+     *
      * @param config es配置
      * @param dml dml信息
      * @param data 单行dml数据

+ 7 - 17
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -13,8 +13,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -30,11 +28,7 @@ import com.alibaba.otter.canal.client.adapter.hbase.monitor.HbaseConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseEtlService;
 import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseSyncService;
 import com.alibaba.otter.canal.client.adapter.hbase.support.HbaseTemplate;
-import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-import com.alibaba.otter.canal.client.adapter.support.EtlResult;
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
-import com.alibaba.otter.canal.client.adapter.support.SPI;
+import com.alibaba.otter.canal.client.adapter.support.*;
 
 /**
  * HBase外部适配器
@@ -50,7 +44,6 @@ public class HbaseAdapter implements OuterAdapter {
     private Map<String, MappingConfig>              hbaseMapping       = new ConcurrentHashMap<>();                  // 文件名对应配置
     private Map<String, Map<String, MappingConfig>> mappingConfigCache = new ConcurrentHashMap<>();                  // 库名-表名对应配置
 
-    private Connection                              conn;
     private HbaseSyncService                        hbaseSyncService;
     private HbaseTemplate                           hbaseTemplate;
 
@@ -90,8 +83,7 @@ public class HbaseAdapter implements OuterAdapter {
 
             Configuration hbaseConfig = HBaseConfiguration.create();
             properties.forEach(hbaseConfig::set);
-            conn = ConnectionFactory.createConnection(hbaseConfig);
-            hbaseTemplate = new HbaseTemplate(conn);
+            hbaseTemplate = new HbaseTemplate(hbaseConfig);
             hbaseSyncService = new HbaseSyncService(hbaseTemplate);
 
             configMonitor = new HbaseConfigMonitor();
@@ -174,7 +166,7 @@ public class HbaseAdapter implements OuterAdapter {
         String hbaseTable = config.getHbaseMapping().getHbaseTable();
         long rowCount = 0L;
         try {
-            HTable table = (HTable) conn.getTable(TableName.valueOf(hbaseTable));
+            HTable table = (HTable) hbaseTemplate.getConnection().getTable(TableName.valueOf(hbaseTable));
             Scan scan = new Scan();
             scan.setFilter(new FirstKeyOnlyFilter());
             ResultScanner resultScanner = table.getScanner(scan);
@@ -195,12 +187,10 @@ public class HbaseAdapter implements OuterAdapter {
         if (configMonitor != null) {
             configMonitor.destroy();
         }
-        if (conn != null) {
-            try {
-                conn.close();
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
+        try {
+            hbaseTemplate.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
     }
 

+ 15 - 19
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java

@@ -29,28 +29,24 @@ public class HbaseSyncService {
     }
 
     public void sync(MappingConfig config, Dml dml) {
-        try {
-            if (config != null) {
-                String type = dml.getType();
-                if (type != null && type.equalsIgnoreCase("INSERT")) {
-                    insert(config, dml);
-                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
-                    update(config, dml);
-                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
-                    delete(config, dml);
-                }
-                if (logger.isDebugEnabled()) {
-                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
-                }
+        if (config != null) {
+            String type = dml.getType();
+            if (type != null && type.equalsIgnoreCase("INSERT")) {
+                insert(config, dml);
+            } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
+                update(config, dml);
+            } else if (type != null && type.equalsIgnoreCase("DELETE")) {
+                delete(config, dml);
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
             }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
         }
     }
 
     /**
      * 插入操作
-     * 
+     *
      * @param config 配置项
      * @param dml DML数据
      */
@@ -102,7 +98,7 @@ public class HbaseSyncService {
 
     /**
      * 将Map数据转换为HRow行数据
-     * 
+     *
      * @param hbaseMapping hbase映射配置
      * @param hRow 行对象
      * @param data Map数据
@@ -160,7 +156,7 @@ public class HbaseSyncService {
 
     /**
      * 更新操作
-     * 
+     *
      * @param config 配置对象
      * @param dml dml对象
      */
@@ -380,7 +376,7 @@ public class HbaseSyncService {
 
     /**
      * 根据对应的类型进行转换
-     * 
+     *
      * @param columnItem 列项配置
      * @param hbaseMapping hbase映射配置
      * @param value 值

+ 39 - 19
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/HbaseTemplate.java

@@ -5,14 +5,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,16 +22,33 @@ import org.slf4j.LoggerFactory;
  */
 public class HbaseTemplate {
 
-    private Logger     logger = LoggerFactory.getLogger(this.getClass());
+    private Logger        logger = LoggerFactory.getLogger(this.getClass());
 
-    private Connection conn;
+    private Configuration hbaseConfig;                                      // hbase配置对象
+    private Connection    conn;                                             // hbase连接
 
-    public HbaseTemplate(Connection conn){
-        this.conn = conn;
+    public HbaseTemplate(Configuration hbaseConfig){
+        this.hbaseConfig = hbaseConfig;
+        initConn();
+    }
+
+    private void initConn() {
+        try {
+            this.conn = ConnectionFactory.createConnection(hbaseConfig);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public Connection getConnection() {
+        if (conn == null || conn.isAborted() || conn.isClosed()) {
+            initConn();
+        }
+        return conn;
     }
 
     public boolean tableExists(String tableName) {
-        try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) {
+        try (HBaseAdmin admin = (HBaseAdmin) getConnection().getAdmin()) {
 
             return admin.tableExists(TableName.valueOf(tableName));
         } catch (IOException e) {
@@ -43,7 +57,7 @@ public class HbaseTemplate {
     }
 
     public void createTable(String tableName, String... familyNames) {
-        try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) {
+        try (HBaseAdmin admin = (HBaseAdmin) getConnection().getAdmin()) {
 
             HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
             // 添加列簇
@@ -60,7 +74,7 @@ public class HbaseTemplate {
     }
 
     public void disableTable(String tableName) {
-        try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) {
+        try (HBaseAdmin admin = (HBaseAdmin) getConnection().getAdmin()) {
             admin.disableTable(tableName);
         } catch (IOException e) {
             logger.error(e.getMessage(), e);
@@ -69,7 +83,7 @@ public class HbaseTemplate {
     }
 
     public void deleteTable(String tableName) {
-        try (HBaseAdmin admin = (HBaseAdmin) conn.getAdmin()) {
+        try (HBaseAdmin admin = (HBaseAdmin) getConnection().getAdmin()) {
             if (admin.isTableEnabled(tableName)) {
                 disableTable(tableName);
             }
@@ -82,7 +96,7 @@ public class HbaseTemplate {
 
     /**
      * 插入一行数据
-     * 
+     *
      * @param tableName 表名
      * @param hRow 行数据对象
      * @return 是否成功
@@ -90,7 +104,7 @@ public class HbaseTemplate {
     public Boolean put(String tableName, HRow hRow) {
         boolean flag = false;
         try {
-            HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
+            HTable table = (HTable) getConnection().getTable(TableName.valueOf(tableName));
             Put put = new Put(hRow.getRowKey());
             for (HRow.HCell hCell : hRow.getCells()) {
                 put.addColumn(Bytes.toBytes(hCell.getFamily()), Bytes.toBytes(hCell.getQualifier()), hCell.getValue());
@@ -106,7 +120,7 @@ public class HbaseTemplate {
 
     /**
      * 批量插入
-     * 
+     *
      * @param tableName 表名
      * @param rows 行数据对象集合
      * @return 是否成功
@@ -114,7 +128,7 @@ public class HbaseTemplate {
     public Boolean puts(String tableName, List<HRow> rows) {
         boolean flag = false;
         try {
-            HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
+            HTable table = (HTable) getConnection().getTable(TableName.valueOf(tableName));
             List<Put> puts = new ArrayList<>();
             for (HRow hRow : rows) {
                 Put put = new Put(hRow.getRowKey());
@@ -137,7 +151,7 @@ public class HbaseTemplate {
 
     /**
      * 批量删除数据
-     * 
+     *
      * @param tableName 表名
      * @param rowKeys rowKey集合
      * @return 是否成功
@@ -145,7 +159,7 @@ public class HbaseTemplate {
     public Boolean deletes(String tableName, Set<byte[]> rowKeys) {
         boolean flag = false;
         try {
-            HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
+            HTable table = (HTable) getConnection().getTable(TableName.valueOf(tableName));
             List<Delete> deletes = new ArrayList<>();
             for (byte[] rowKey : rowKeys) {
                 Delete delete = new Delete(rowKey);
@@ -160,4 +174,10 @@ public class HbaseTemplate {
         }
         return flag;
     }
+
+    public void close() throws IOException {
+        if (conn != null) {
+            conn.close();
+        }
+    }
 }

+ 107 - 120
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -7,10 +7,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
 import java.util.function.Function;
 
 import javax.sql.DataSource;
@@ -88,41 +85,39 @@ public class RdbSyncService {
      * @param function 回调方法
      */
     public void sync(List<Dml> dmls, Function<Dml, Boolean> function) {
-        try {
-            boolean toExecute = false;
-            for (Dml dml : dmls) {
-                if (!toExecute) {
-                    toExecute = function.apply(dml);
-                } else {
-                    function.apply(dml);
-                }
+        boolean toExecute = false;
+        for (Dml dml : dmls) {
+            if (!toExecute) {
+                toExecute = function.apply(dml);
+            } else {
+                function.apply(dml);
             }
-            if (toExecute) {
-                List<Future> futures = new ArrayList<>();
-                for (int i = 0; i < threads; i++) {
-                    int j = i;
-                    futures.add(executorThreads[i].submit(() -> {
+        }
+        if (toExecute) {
+            List<Future> futures = new ArrayList<>();
+            for (int i = 0; i < threads; i++) {
+                int j = i;
+                futures.add(executorThreads[i].submit(() -> {
+                    try {
                         dmlsPartition[j]
                             .forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
+                        dmlsPartition[j].clear();
                         batchExecutors[j].commit();
                         return true;
-                    }));
-                }
-
-                futures.forEach(future -> {
-                    try {
-                        future.get();
                     } catch (Exception e) {
-                        logger.error(e.getMessage(), e);
+                        batchExecutors[j].rollback();
+                        throw new RuntimeException(e);
                     }
-                });
+                }));
+            }
 
-                for (int i = 0; i < threads; i++) {
-                    dmlsPartition[i].clear();
+            futures.forEach(future -> {
+                try {
+                    future.get();
+                } catch (ExecutionException | InterruptedException e) {
+                    throw new RuntimeException(e);
                 }
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
+            });
         }
     }
 
@@ -181,22 +176,18 @@ public class RdbSyncService {
      * @param dml DML
      */
     public void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
-        try {
-            if (config != null) {
-                String type = dml.getType();
-                if (type != null && type.equalsIgnoreCase("INSERT")) {
-                    insert(batchExecutor, config, dml);
-                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
-                    update(batchExecutor, config, dml);
-                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
-                    delete(batchExecutor, config, dml);
-                }
-                if (logger.isDebugEnabled()) {
-                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
-                }
+        if (config != null) {
+            String type = dml.getType();
+            if (type != null && type.equalsIgnoreCase("INSERT")) {
+                insert(batchExecutor, config, dml);
+            } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
+                update(batchExecutor, config, dml);
+            } else if (type != null && type.equalsIgnoreCase("DELETE")) {
+                delete(batchExecutor, config, dml);
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
             }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
         }
     }
 
@@ -214,47 +205,45 @@ public class RdbSyncService {
 
         DbMapping dbMapping = config.getDbMapping();
 
-        try {
-            Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
+        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
 
-            StringBuilder insertSql = new StringBuilder();
-            insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
+        StringBuilder insertSql = new StringBuilder();
+        insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
 
-            columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
-            int len = insertSql.length();
-            insertSql.delete(len - 1, len).append(") VALUES (");
-            int mapLen = columnsMap.size();
-            for (int i = 0; i < mapLen; i++) {
-                insertSql.append("?,");
-            }
-            len = insertSql.length();
-            insertSql.delete(len - 1, len).append(")");
-
-            Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
-
-            List<Map<String, ?>> values = new ArrayList<>();
-            for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
-                String targetColumnName = entry.getKey();
-                String srcColumnName = entry.getValue();
-                if (srcColumnName == null) {
-                    srcColumnName = Util.cleanColumn(targetColumnName);
-                }
-
-                Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
+        columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
+        int len = insertSql.length();
+        insertSql.delete(len - 1, len).append(") VALUES (");
+        int mapLen = columnsMap.size();
+        for (int i = 0; i < mapLen; i++) {
+            insertSql.append("?,");
+        }
+        len = insertSql.length();
+        insertSql.delete(len - 1, len).append(")");
 
-                Object value = data.get(srcColumnName);
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
 
-                BatchExecutor.setValue(values, type, value);
+        List<Map<String, ?>> values = new ArrayList<>();
+        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+            String targetColumnName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = Util.cleanColumn(targetColumnName);
             }
 
-            batchExecutor.execute(insertSql.toString(), values);
-            if (logger.isTraceEnabled()) {
-                logger.trace("Insert into target table, sql: {}", insertSql);
+            Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
+            if (type == null) {
+                throw new RuntimeException("Target column: " + targetColumnName + " not matched");
             }
+            Object value = data.get(srcColumnName);
 
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
+            BatchExecutor.setValue(values, type, value);
         }
+
+        batchExecutor.execute(insertSql.toString(), values);
+        if (logger.isTraceEnabled()) {
+            logger.trace("Insert into target table, sql: {}", insertSql);
+        }
+
     }
 
     /**
@@ -276,43 +265,42 @@ public class RdbSyncService {
 
         DbMapping dbMapping = config.getDbMapping();
 
-        try {
-            Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
-
-            Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
-
-            StringBuilder updateSql = new StringBuilder();
-            updateSql.append("UPDATE ").append(SyncUtil.getDbTableName(dbMapping)).append(" SET ");
-            List<Map<String, ?>> values = new ArrayList<>();
-            for (String srcColumnName : old.keySet()) {
-                List<String> targetColumnNames = new ArrayList<>();
-                columnsMap.forEach((targetColumn, srcColumn) -> {
-                    if (srcColumnName.toLowerCase().equals(srcColumn)) {
-                        targetColumnNames.add(targetColumn);
-                    }
-                });
-                if (!targetColumnNames.isEmpty()) {
+        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
 
-                    for (String targetColumnName : targetColumnNames) {
-                        updateSql.append(targetColumnName).append("=?, ");
-                        Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
-                        BatchExecutor.setValue(values, type, data.get(srcColumnName));
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
+
+        StringBuilder updateSql = new StringBuilder();
+        updateSql.append("UPDATE ").append(SyncUtil.getDbTableName(dbMapping)).append(" SET ");
+        List<Map<String, ?>> values = new ArrayList<>();
+        for (String srcColumnName : old.keySet()) {
+            List<String> targetColumnNames = new ArrayList<>();
+            columnsMap.forEach((targetColumn, srcColumn) -> {
+                if (srcColumnName.toLowerCase().equals(srcColumn)) {
+                    targetColumnNames.add(targetColumn);
+                }
+            });
+            if (!targetColumnNames.isEmpty()) {
+
+                for (String targetColumnName : targetColumnNames) {
+                    updateSql.append(targetColumnName).append("=?, ");
+                    Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
+                    if (type == null) {
+                        throw new RuntimeException("Target column: " + targetColumnName + " not matched");
                     }
+                    BatchExecutor.setValue(values, type, data.get(srcColumnName));
                 }
             }
-            int len = updateSql.length();
-            updateSql.delete(len - 2, len).append(" WHERE ");
+        }
+        int len = updateSql.length();
+        updateSql.delete(len - 2, len).append(" WHERE ");
 
-            // 拼接主键
-            appendCondition(dbMapping, updateSql, ctype, values, data, old);
+        // 拼接主键
+        appendCondition(dbMapping, updateSql, ctype, values, data, old);
 
-            batchExecutor.execute(updateSql.toString(), values);
+        batchExecutor.execute(updateSql.toString(), values);
 
-            if (logger.isTraceEnabled()) {
-                logger.trace("Update target table, sql: {}", updateSql);
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
+        if (logger.isTraceEnabled()) {
+            logger.trace("Update target table, sql: {}", updateSql);
         }
     }
 
@@ -330,23 +318,19 @@ public class RdbSyncService {
 
         DbMapping dbMapping = config.getDbMapping();
 
-        try {
-            Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
 
-            StringBuilder sql = new StringBuilder();
-            sql.append("DELETE FROM ").append(SyncUtil.getDbTableName(dbMapping)).append(" WHERE ");
+        StringBuilder sql = new StringBuilder();
+        sql.append("DELETE FROM ").append(SyncUtil.getDbTableName(dbMapping)).append(" WHERE ");
 
-            List<Map<String, ?>> values = new ArrayList<>();
-            // 拼接主键
-            appendCondition(dbMapping, sql, ctype, values, data);
+        List<Map<String, ?>> values = new ArrayList<>();
+        // 拼接主键
+        appendCondition(dbMapping, sql, ctype, values, data);
 
-            batchExecutor.execute(sql.toString(), values);
+        batchExecutor.execute(sql.toString(), values);
 
-            if (logger.isTraceEnabled()) {
-                logger.trace("Delete from target table, sql: {}", sql);
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
+        if (logger.isTraceEnabled()) {
+            logger.trace("Delete from target table, sql: {}", sql);
         }
     }
 
@@ -405,6 +389,9 @@ public class RdbSyncService {
             }
             sql.append(targetColumnName).append("=? AND ");
             Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
+            if (type == null) {
+                throw new RuntimeException("Target column: " + targetColumnName + " not matched");
+            }
             // 如果有修改主键的情况
             if (o != null && o.containsKey(srcColumnName)) {
                 BatchExecutor.setValue(values, type, o.get(srcColumnName));

+ 13 - 11
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/BatchExecutor.java

@@ -16,18 +16,11 @@ public class BatchExecutor implements Closeable {
 
     private static final Logger logger = LoggerFactory.getLogger(BatchExecutor.class);
 
-    private Integer             key;
     private Connection          conn;
     private AtomicInteger       idx    = new AtomicInteger(0);
 
     public BatchExecutor(Connection conn){
-        this(1, conn);
-    }
-
-    public BatchExecutor(Integer key, Connection conn){
-        this.key = key;
         this.conn = conn;
-
         try {
             this.conn.setAutoCommit(false);
         } catch (SQLException e) {
@@ -35,9 +28,6 @@ public class BatchExecutor implements Closeable {
         }
     }
 
-    public Integer getKey() {
-        return key;
-    }
 
     public Connection getConn() {
         return conn;
@@ -70,7 +60,19 @@ public class BatchExecutor implements Closeable {
         try {
             conn.commit();
             if (logger.isTraceEnabled()) {
-                logger.trace("Batch executor: " + key + " commit " + idx.get() + " rows");
+                logger.trace("Batch executor commit " + idx.get() + " rows");
+            }
+            idx.set(0);
+        } catch (SQLException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    public void rollback() {
+        try {
+            conn.rollback();
+            if (logger.isTraceEnabled()) {
+                logger.trace("Batch executor rollback " + idx.get() + " rows");
             }
             idx.set(0);
         } catch (SQLException e) {

+ 7 - 7
client-adapter/rdb/src/main/resources/rdb/mytest_user.yml

@@ -8,13 +8,13 @@ dbMapping:
   targetTable: mytest.tb_user
   targetPk:
     id: id
-  mapAll: true
-#  targetColumns:
-#    id:
-#    name:
-#    role_id:
-#    c_time:
-#    test1:
+#  mapAll: true
+  targetColumns:
+    id:
+    name:
+    role_id:
+    c_time:
+    test1:
 
 
 # Mirror schema synchronize config

+ 2 - 0
deployer/src/main/resources/example/instance.properties

@@ -45,6 +45,8 @@ canal.instance.filter.black.regex=
 
 # mq config
 canal.mq.topic=example
+# 动态topic, 需mq支持动态创建topic
+#canal.mq.dynamicTopic=.*,mytest\\..*,mytest2.user
 canal.mq.partition=0
 # hash partition config
 #canal.mq.partitionsNum=3

+ 8 - 0
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalMQConfig.java

@@ -6,6 +6,7 @@ public class CanalMQConfig {
     private Integer partition;
     private Integer partitionsNum;
     private String  partitionHash;
+    private String dynamicTopic;
 
     public String getTopic() {
         return topic;
@@ -39,4 +40,11 @@ public class CanalMQConfig {
         this.partitionHash = partitionHash;
     }
 
+    public String getDynamicTopic() {
+        return dynamicTopic;
+    }
+
+    public void setDynamicTopic(String dynamicTopic) {
+        this.dynamicTopic = dynamicTopic;
+    }
 }

+ 185 - 51
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -1,11 +1,6 @@
 package com.alibaba.otter.canal.common;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -23,49 +18,143 @@ import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * process MQ Message utils
- * 
+ *
  * @author agapple 2018年12月11日 下午1:28:32
  */
 public class MQMessageUtils {
 
     @SuppressWarnings("deprecation")
-    private static Map<String, List<PartitionData>> partitionDatas = MigrateMap.makeComputingMap(new MapMaker().softValues(),
-                                                                       new Function<String, List<PartitionData>>() {
-
-                                                                           public List<PartitionData> apply(String pkHashConfigs) {
-                                                                               List<PartitionData> datas = Lists.newArrayList();
-                                                                               String[] pkHashConfigArray = StringUtils.split(pkHashConfigs,
-                                                                                   ",");
-                                                                               // schema.table:id^name
-                                                                               for (String pkHashConfig : pkHashConfigArray) {
-                                                                                   PartitionData data = new PartitionData();
-                                                                                   int i = pkHashConfig.lastIndexOf(":");
-                                                                                   if (i > 0) {
-                                                                                       String pkStr = pkHashConfig.substring(i + 1);
-                                                                                       if (pkStr.equalsIgnoreCase("$pk$")) {
-                                                                                           data.hashMode.autoPkHash = true;
-                                                                                       } else {
-                                                                                           data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr,
-                                                                                               '^'));
-                                                                                       }
-
-                                                                                       pkHashConfig = pkHashConfig.substring(0,
-                                                                                           i);
-                                                                                   } else {
-                                                                                       data.hashMode.tableHash = true;
-                                                                                   }
-
-                                                                                   if (!isWildCard(pkHashConfig)) {
-                                                                                       data.simpleName = pkHashConfig;
-                                                                                   } else {
-                                                                                       data.regexFilter = new AviaterRegexFilter(pkHashConfig);
-                                                                                   }
-                                                                                   datas.add(data);
-                                                                               }
-
-                                                                               return datas;
-                                                                           }
-                                                                       });
+    private static Map<String, List<PartitionData>>    partitionDatas    = MigrateMap
+        .makeComputingMap(new MapMaker().softValues(), new Function<String, List<PartitionData>>() {
+
+                                                                                 public List<PartitionData> apply(String pkHashConfigs) {
+                                                                                     List<PartitionData> datas = Lists
+                                                                                         .newArrayList();
+                                                                                     String[] pkHashConfigArray = StringUtils
+                                                                                         .split(pkHashConfigs, ",");
+                                                                                     // schema.table:id^name
+                                                                                     for (String pkHashConfig : pkHashConfigArray) {
+                                                                                         PartitionData data = new PartitionData();
+                                                                                         int i = pkHashConfig
+                                                                                             .lastIndexOf(":");
+                                                                                         if (i > 0) {
+                                                                                             String pkStr = pkHashConfig
+                                                                                                 .substring(i + 1);
+                                                                                             if (pkStr.equalsIgnoreCase(
+                                                                                                 "$pk$")) {
+                                                                                                 data.hashMode.autoPkHash = true;
+                                                                                             } else {
+                                                                                                 data.hashMode.pkNames = Lists
+                                                                                                     .newArrayList(
+                                                                                                         StringUtils
+                                                                                                             .split(
+                                                                                                                 pkStr,
+                                                                                                                 '^'));
+                                                                                             }
+
+                                                                                             pkHashConfig = pkHashConfig
+                                                                                                 .substring(0, i);
+                                                                                         } else {
+                                                                                             data.hashMode.tableHash = true;
+                                                                                         }
+
+                                                                                         if (!isWildCard(
+                                                                                             pkHashConfig)) {
+                                                                                             data.simpleName = pkHashConfig;
+                                                                                         } else {
+                                                                                             data.regexFilter = new AviaterRegexFilter(
+                                                                                                 pkHashConfig);
+                                                                                         }
+                                                                                         datas.add(data);
+                                                                                     }
+
+                                                                                     return datas;
+                                                                                 }
+                                                                             });
+
+    @SuppressWarnings("deprecation")
+    private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap
+        .makeComputingMap(new MapMaker().softValues(), new Function<String, List<DynamicTopicData>>() {
+
+                                                                                 public List<DynamicTopicData> apply(String pkHashConfigs) {
+                                                                                     List<DynamicTopicData> datas = Lists
+                                                                                         .newArrayList();
+                                                                                     String[] dynamicTopicArray = StringUtils
+                                                                                         .split(pkHashConfigs, ",");
+                                                                                     // schema.table
+                                                                                     for (String dynamicTopic : dynamicTopicArray) {
+                                                                                         DynamicTopicData data = new DynamicTopicData();
+
+                                                                                         if (!isWildCard(
+                                                                                             dynamicTopic)) {
+                                                                                             data.simpleName = dynamicTopic;
+                                                                                         } else {
+                                                                                             if (dynamicTopic
+                                                                                                 .contains("\\.")) {
+                                                                                                 data.tableRegexFilter = new AviaterRegexFilter(
+                                                                                                     dynamicTopic);
+                                                                                             } else {
+                                                                                                 data.schemaRegexFilter = new AviaterRegexFilter(
+                                                                                                     dynamicTopic);
+                                                                                             }
+                                                                                         }
+                                                                                         datas.add(data);
+                                                                                     }
+
+                                                                                     return datas;
+                                                                                 }
+                                                                             });
+
+    /**
+     * 按 schema 或者 schema+table 将 message 分配到对应topic
+     *
+     * @param message 原message
+     * @param defaultTopic 默认topic
+     * @param dynamicTopicConfigs 动态topic规则
+     * @return 分隔后的message map
+     */
+    public static Map<String, Message> messageTopics(Message message, String defaultTopic, String dynamicTopicConfigs) {
+        List<CanalEntry.Entry> entries;
+        if (message.isRaw()) {
+            List<ByteString> rawEntries = message.getRawEntries();
+            entries = new ArrayList<>(rawEntries.size());
+            for (ByteString byteString : rawEntries) {
+                CanalEntry.Entry entry;
+                try {
+                    entry = CanalEntry.Entry.parseFrom(byteString);
+                } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                }
+                entries.add(entry);
+            }
+        } else {
+            entries = message.getEntries();
+        }
+        Map<String, Message> messages = new HashMap<>();
+        for (CanalEntry.Entry entry : entries) {
+            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
+                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
+                continue;
+            }
+
+            String schemaName = entry.getHeader().getSchemaName();
+            String tableName = entry.getHeader().getTableName();
+
+            if (StringUtils.isEmpty(schemaName) || StringUtils.isEmpty(tableName)) {
+                put2MapMessage(messages, message.getId(), defaultTopic, entry);
+            } else {
+                if (matchDynamicTopic(schemaName + "." + tableName, dynamicTopicConfigs)) {
+                    put2MapMessage(messages, message.getId(), schemaName + "." + tableName, entry);
+                } else if (matchDynamicTopic(schemaName, dynamicTopicConfigs)) {
+                    put2MapMessage(messages, message.getId(), schemaName, entry);
+                } else {
+                    put2MapMessage(messages, message.getId(), defaultTopic, entry);
+                }
+            }
+
+        }
+        return messages;
+    }
 
     /**
      * 将 message 分区
@@ -116,7 +205,7 @@ public class MQMessageUtils {
                 if (rowChange.getRowDatasList() != null && !rowChange.getRowDatasList().isEmpty()) {
                     String database = entry.getHeader().getSchemaName();
                     String table = entry.getHeader().getTableName();
-                    HashMode hashMode = getParitionHashColumns(database + "." + table, pkHashConfigs);
+                    HashMode hashMode = getPartitionHashColumns(database + "." + table, pkHashConfigs);
                     if (hashMode == null) {
                         // 如果都没有匹配,发送到第一个分区
                         partitionEntries[0].add(entry);
@@ -192,8 +281,9 @@ public class MQMessageUtils {
                 try {
                     rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                 } catch (Exception e) {
-                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
-                                               + entry.toString(), e);
+                    throw new RuntimeException(
+                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
+                        e);
                 }
 
                 CanalEntry.EventType eventType = rowChange.getEventType();
@@ -307,7 +397,7 @@ public class MQMessageUtils {
             if (flatMessage.getData() != null && !flatMessage.getData().isEmpty()) {
                 String database = flatMessage.getDatabase();
                 String table = flatMessage.getTable();
-                HashMode hashMode = getParitionHashColumns(database + "." + table, pkHashConfigs);
+                HashMode hashMode = getPartitionHashColumns(database + "." + table, pkHashConfigs);
                 if (hashMode == null) {
                     // 如果都没有匹配,发送到第一个分区
                     partitionMessages[0] = flatMessage;
@@ -373,7 +463,7 @@ public class MQMessageUtils {
     /**
      * match return List , not match return null
      */
-    public static HashMode getParitionHashColumns(String name, String pkHashConfigs) {
+    public static HashMode getPartitionHashColumns(String name, String pkHashConfigs) {
         if (StringUtils.isEmpty(pkHashConfigs)) {
             return null;
         }
@@ -394,6 +484,34 @@ public class MQMessageUtils {
         return null;
     }
 
+    public static boolean matchDynamicTopic(String name, String dynamicTopicConfigs) {
+        if (StringUtils.isEmpty(dynamicTopicConfigs)) {
+            return false;
+        }
+
+        boolean res = false;
+        List<DynamicTopicData> datas = dynamicTopicDatas.get(dynamicTopicConfigs);
+        for (DynamicTopicData data : datas) {
+            if (data.simpleName != null) {
+                if (data.simpleName.equalsIgnoreCase(name)) {
+                    res = true;
+                    break;
+                }
+            } else if (name.contains(".")) {
+                if (data.tableRegexFilter != null && data.tableRegexFilter.filter(name)) {
+                    res = true;
+                    break;
+                }
+            } else {
+                if (data.schemaRegexFilter != null && data.schemaRegexFilter.filter(name)) {
+                    res = true;
+                    break;
+                }
+            }
+        }
+        return res;
+    }
+
     public static boolean checkPkNamesHasContain(List<String> pkNames, String name) {
         for (String pkName : pkNames) {
             if (pkName.equalsIgnoreCase(name)) {
@@ -406,8 +524,18 @@ public class MQMessageUtils {
 
     private static boolean isWildCard(String value) {
         // not contaiins '.' ?
-        return StringUtils.containsAny(value, new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$',
-                '^' });
+        return StringUtils.containsAny(value,
+            new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$', '^' });
+    }
+
+    private static void put2MapMessage(Map<String, Message> messageMap, Long messageId, String topicName,
+                                       CanalEntry.Entry entry) {
+        Message message = messageMap.get(topicName);
+        if (message == null) {
+            message = new Message(messageId, new ArrayList<CanalEntry.Entry>());
+            messageMap.put(topicName, message);
+        }
+        message.getEntries().add(entry);
     }
 
     public static class PartitionData {
@@ -424,4 +552,10 @@ public class MQMessageUtils {
         public List<String> pkNames    = Lists.newArrayList();
     }
 
+    public static class DynamicTopicData {
+
+        public String             simpleName;
+        public AviaterRegexFilter schemaRegexFilter;
+        public AviaterRegexFilter tableRegexFilter;
+    }
 }

+ 9 - 1
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -1,6 +1,5 @@
 package com.alibaba.otter.canal.common;
 
-
 /**
  * kafka 配置项
  *
@@ -32,6 +31,7 @@ public class MQProperties {
         private Integer partition;
         private Integer partitionsNum;
         private String  partitionHash;
+        private String  dynamicTopic;
 
         public String getCanalDestination() {
             return canalDestination;
@@ -72,6 +72,14 @@ public class MQProperties {
         public void setPartitionHash(String partitionHash) {
             this.partitionHash = partitionHash;
         }
+
+        public String getDynamicTopic() {
+            return dynamicTopic;
+        }
+
+        public void setDynamicTopic(String dynamicTopic) {
+            this.dynamicTopic = dynamicTopic;
+        }
     }
 
     public String getServers() {

+ 55 - 49
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,8 +1,10 @@
 package com.alibaba.otter.canal.kafka;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -78,15 +80,32 @@ public class CanalKafkaProducer implements CanalMQProducer {
     @Override
     public void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback) {
 
+        if (!StringUtils.isEmpty(canalDestination.getDynamicTopic())) {
+            // 动态topic
+            Map<String, Message> messageMap = MQMessageUtils
+                .messageTopics(message, canalDestination.getTopic(), canalDestination.getDynamicTopic());
+
+            for (Map.Entry<String, Message> entry : messageMap.entrySet()) {
+                String topicName = entry.getKey();
+                Message messageSub = entry.getValue();
+                if (logger.isDebugEnabled()) {
+                    logger.debug("## Send message to kafka topic: " + topicName);
+                }
+                send(canalDestination, topicName, messageSub, callback);
+            }
+        } else {
+            send(canalDestination, canalDestination.getTopic(), message, callback);
+        }
+    }
+
+    private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message,
+                      Callback callback) {
         // producer.beginTransaction();
         if (!kafkaProperties.getFlatMessage()) {
             try {
                 ProducerRecord<String, Message> record = null;
                 if (canalDestination.getPartition() != null) {
-                    record = new ProducerRecord<>(canalDestination.getTopic(),
-                        canalDestination.getPartition(),
-                        null,
-                        message);
+                    record = new ProducerRecord<>(topicName, canalDestination.getPartition(), null, message);
                 } else {
                     if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                         Message[] messages = MQMessageUtils.messagePartition(message,
@@ -96,21 +115,20 @@ public class CanalKafkaProducer implements CanalMQProducer {
                         for (int i = 0; i < length; i++) {
                             Message messagePartition = messages[i];
                             if (messagePartition != null) {
-                                record = new ProducerRecord<>(canalDestination.getTopic(), i, null, messagePartition);
+                                record = new ProducerRecord<>(topicName, i, null, messagePartition);
                             }
                         }
                     } else {
-                        record = new ProducerRecord<>(canalDestination.getTopic(), 0, null, message);
+                        record = new ProducerRecord<>(topicName, 0, null, message);
                     }
                 }
 
                 if (record != null) {
+                    // 同步发送原生message
                     producer.send(record).get();
 
                     if (logger.isDebugEnabled()) {
-                        logger.debug("Send  message to kafka topic: [{}], packet: {}",
-                            canalDestination.getTopic(),
-                            message.toString());
+                        logger.debug("Send  message to kafka topic: [{}], packet: {}", topicName, message.toString());
                     }
                 }
             } catch (Exception e) {
@@ -124,13 +142,17 @@ public class CanalKafkaProducer implements CanalMQProducer {
             List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
             if (flatMessages != null) {
                 for (FlatMessage flatMessage : flatMessages) {
-                    if (canalDestination.getPartition() != null) {
+                    if (StringUtils.isEmpty(canalDestination.getPartitionHash())) {
                         try {
-                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
-                                canalDestination.getPartition(),
+                            Integer partition = canalDestination.getPartition();
+                            if (partition == null) {
+                                partition = 0;
+                            }
+                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,
+                                partition,
                                 null,
                                 JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
-                            producer2.send(record).get();
+                            producer2.send(record);
                         } catch (Exception e) {
                             logger.error(e.getMessage(), e);
                             // producer.abortTransaction();
@@ -138,47 +160,32 @@ public class CanalKafkaProducer implements CanalMQProducer {
                             return;
                         }
                     } else {
-                        if (canalDestination.getPartitionHash() != null
-                            && !canalDestination.getPartitionHash().isEmpty()) {
-                            FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
-                                canalDestination.getPartitionsNum(),
-                                canalDestination.getPartitionHash());
-                            int length = partitionFlatMessage.length;
-                            for (int i = 0; i < length; i++) {
-                                FlatMessage flatMessagePart = partitionFlatMessage[i];
-                                if (flatMessagePart != null) {
-                                    try {
-                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
-                                            i,
-                                            null,
-                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
-                                        producer2.send(record).get();
-                                    } catch (Exception e) {
-                                        logger.error(e.getMessage(), e);
-                                        // producer.abortTransaction();
-                                        callback.rollback();
-                                        return;
-                                    }
+                        FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
+                            canalDestination.getPartitionsNum(),
+                            canalDestination.getPartitionHash());
+                        int length = partitionFlatMessage.length;
+                        for (int i = 0; i < length; i++) {
+                            FlatMessage flatMessagePart = partitionFlatMessage[i];
+                            if (flatMessagePart != null) {
+                                try {
+                                    ProducerRecord<String, String> record = new ProducerRecord<String, String>(
+                                        topicName,
+                                        i,
+                                        null,
+                                        JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
+                                    producer2.send(record);
+                                } catch (Exception e) {
+                                    logger.error(e.getMessage(), e);
+                                    // producer.abortTransaction();
+                                    callback.rollback();
+                                    return;
                                 }
                             }
-                        } else {
-                            try {
-                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
-                                    0,
-                                    null,
-                                    JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
-                                producer2.send(record).get();
-                            } catch (Exception e) {
-                                logger.error(e.getMessage(), e);
-                                // producer.abortTransaction();
-                                callback.rollback();
-                                return;
-                            }
                         }
                     }
                     if (logger.isDebugEnabled()) {
                         logger.debug("Send flat message to kafka topic: [{}], packet: {}",
-                            canalDestination.getTopic(),
+                            topicName,
                             JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
                     }
                 }
@@ -187,7 +194,6 @@ public class CanalKafkaProducer implements CanalMQProducer {
 
         // producer.commitTransaction();
         callback.commit();
-
     }
 
 }

+ 33 - 14
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -1,7 +1,9 @@
 package com.alibaba.otter.canal.rocketmq;
 
 import java.util.List;
+import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -59,11 +61,28 @@ public class CanalRocketMQProducer implements CanalMQProducer {
     @Override
     public void send(final MQProperties.CanalDestination destination, com.alibaba.otter.canal.protocol.Message data,
                      Callback callback) {
+        if (!StringUtils.isEmpty(destination.getDynamicTopic())) {
+            // 动态topic
+            Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils
+                .messageTopics(data, destination.getTopic(), destination.getDynamicTopic());
+
+            for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
+                String topicName = entry.getKey();
+                com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue();
+                send(destination, topicName, messageSub, callback);
+            }
+        } else {
+            send(destination, destination.getTopic(), data, callback);
+        }
+    }
+
+    public void send(final MQProperties.CanalDestination destination, String topicName,
+                     com.alibaba.otter.canal.protocol.Message data, Callback callback) {
         if (!mqProperties.getFlatMessage()) {
             try {
                 if (destination.getPartition() != null) {
-                    Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data,
-                        mqProperties.isFilterTransactionEntry()));
+                    Message message = new Message(topicName,
+                        CanalMessageSerializer.serializer(data, mqProperties.isFilterTransactionEntry()));
                     if (logger.isDebugEnabled()) {
                         logger.debug("send message:{} to destination:{}, partition: {}",
                             message,
@@ -83,9 +102,8 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                     }, null);
                 } else {
                     if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
-                        com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(data,
-                            destination.getPartitionsNum(),
-                            destination.getPartitionHash());
+                        com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils
+                            .messagePartition(data, destination.getPartitionsNum(), destination.getPartitionHash());
                         int length = messages.length;
                         for (int i = 0; i < length; i++) {
                             com.alibaba.otter.canal.protocol.Message dataPartition = messages[i];
@@ -97,7 +115,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                 }
                                 final int index = i;
                                 try {
-                                    Message message = new Message(destination.getTopic(),
+                                    Message message = new Message(topicName,
                                         CanalMessageSerializer.serializer(dataPartition,
                                             mqProperties.isFilterTransactionEntry()));
                                     this.defaultMQProducer.send(message, new MessageQueueSelector() {
@@ -135,11 +153,11 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                             if (logger.isDebugEnabled()) {
                                 logger.debug("send message: {} to topic: {} fixed partition: {}",
                                     JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue),
-                                    destination.getTopic(),
+                                    topicName,
                                     destination.getPartition());
                             }
-                            Message message = new Message(destination.getTopic(), JSON.toJSONString(flatMessage,
-                                SerializerFeature.WriteMapNullValue).getBytes());
+                            Message message = new Message(topicName,
+                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue).getBytes());
                             this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                 @Override
@@ -168,22 +186,23 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                     }
                                     final int index = i;
                                     try {
-                                        Message message = new Message(destination.getTopic(),
+                                        Message message = new Message(topicName,
                                             JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue)
                                                 .getBytes());
                                         this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                             @Override
-                                            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                                            public MessageQueue select(List<MessageQueue> mqs, Message msg,
+                                                                       Object arg) {
                                                 if (index > mqs.size()) {
-                                                    throw new CanalServerException("partition number is error,config num:"
+                                                    throw new CanalServerException(
+                                                        "partition number is error,config num:"
                                                                                    + destination.getPartitionsNum()
                                                                                    + ", mq num: " + mqs.size());
                                                 }
                                                 return mqs.get(index);
                                             }
-                                        },
-                                            null);
+                                        }, null);
                                     } catch (Exception e) {
                                         logger.error("send flat message to hashed partition error", e);
                                         callback.rollback();

+ 1 - 0
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -137,6 +137,7 @@ public class CanalMQStarter {
                 CanalMQConfig mqConfig = canalInstance.getMqConfig();
                 canalDestination.setTopic(mqConfig.getTopic());
                 canalDestination.setPartition(mqConfig.getPartition());
+                canalDestination.setDynamicTopic(mqConfig.getDynamicTopic());
                 canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
                 canalDestination.setPartitionHash(mqConfig.getPartitionHash());