1
0
Эх сурвалжийг харах

Merge branch 'master' into feature/rdb

mcy 6 жил өмнө
parent
commit
89a4943349
20 өөрчлөгдсөн 679 нэмэгдсэн , 349 устгасан
  1. 13 0
      .github/issue_template.md
  2. 3 2
      client/src/main/java/com/alibaba/otter/canal/client/impl/ClusterNodeAccessStrategy.java
  3. 29 0
      client/src/main/java/com/alibaba/otter/canal/client/impl/ServerNotFoundException.java
  4. 10 0
      client/src/main/java/com/alibaba/otter/canal/client/impl/running/ClientRunningMonitor.java
  5. 3 3
      deployer/src/main/resources/example/instance.properties
  6. 1 8
      filter/src/main/java/com/alibaba/otter/canal/filter/PatternUtils.java
  7. 5 0
      filter/src/main/java/com/alibaba/otter/canal/filter/aviater/AviaterRegexFilter.java
  8. 1 1
      filter/src/main/java/com/alibaba/otter/canal/filter/aviater/RegexFunction.java
  9. 4 30
      instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalMQConfig.java
  10. 24 12
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java
  11. 2 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  12. 29 9
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  13. 2 0
      pom.xml
  14. 17 212
      protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java
  15. 0 1
      protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java
  16. 405 0
      server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java
  17. 8 8
      server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java
  18. 29 16
      server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java
  19. 93 44
      server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java
  20. 1 1
      server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

+ 13 - 0
.github/issue_template.md

@@ -0,0 +1,13 @@
+### environment
+
+* canal version
+* mysql version
+
+### Issue Description
+
+
+### Steps to reproduce
+
+### Expected behaviour
+
+### Actual behaviour

+ 3 - 2
client/src/main/java/com/alibaba/otter/canal/client/impl/ClusterNodeAccessStrategy.java

@@ -15,7 +15,6 @@ import com.alibaba.otter.canal.common.utils.JsonUtils;
 import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
 import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
 import com.alibaba.otter.canal.common.zookeeper.running.ServerRunningData;
-import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 
 /**
  * 集群模式的调度策略
@@ -25,6 +24,7 @@ import com.alibaba.otter.canal.protocol.exception.CanalClientException;
  */
 public class ClusterNodeAccessStrategy implements CanalNodeAccessStrategy {
 
+    private String                           destination;
     private IZkChildListener                 childListener;                                      // 监听所有的服务器列表
     private IZkDataListener                  dataListener;                                       // 监听当前的工作节点
     private ZkClientx                        zkClient;
@@ -32,6 +32,7 @@ public class ClusterNodeAccessStrategy implements CanalNodeAccessStrategy {
     private volatile InetSocketAddress       runningAddress = null;
 
     public ClusterNodeAccessStrategy(String destination, ZkClientx zkClient){
+        this.destination = destination;
         this.zkClient = zkClient;
         childListener = new IZkChildListener() {
 
@@ -73,7 +74,7 @@ public class ClusterNodeAccessStrategy implements CanalNodeAccessStrategy {
         } else if (!currentAddress.isEmpty()) { // 如果不存在已经启动的服务,可能服务是一种lazy启动,随机选择一台触发服务器进行启动
             return currentAddress.get(0);// 默认返回第一个节点,之前已经做过shuffle
         } else {
-            throw new CanalClientException("no alive canal server");
+            throw new ServerNotFoundException("no alive canal server for " + destination);
         }
     }
 

+ 29 - 0
client/src/main/java/com/alibaba/otter/canal/client/impl/ServerNotFoundException.java

@@ -0,0 +1,29 @@
+package com.alibaba.otter.canal.client.impl;
+
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+
+public class ServerNotFoundException extends CanalClientException {
+
+    private static final long serialVersionUID = -3471518241911601774L;
+
+    public ServerNotFoundException(String errorCode, String errorDesc, Throwable cause){
+        super(errorCode, errorDesc, cause);
+    }
+
+    public ServerNotFoundException(String errorCode, String errorDesc){
+        super(errorCode, errorDesc);
+    }
+
+    public ServerNotFoundException(String errorCode, Throwable cause){
+        super(errorCode, cause);
+    }
+
+    public ServerNotFoundException(String errorCode){
+        super(errorCode);
+    }
+
+    public ServerNotFoundException(Throwable cause){
+        super(cause);
+    }
+
+}

+ 10 - 0
client/src/main/java/com/alibaba/otter/canal/client/impl/running/ClientRunningMonitor.java

@@ -16,6 +16,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
+import com.alibaba.otter.canal.client.impl.ServerNotFoundException;
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.common.utils.BooleanMutex;
 import com.alibaba.otter.canal.common.utils.JsonUtils;
@@ -141,6 +142,15 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
             logger.error(MessageFormat.format("There is an error when execute initRunning method, with destination [{0}].",
                 destination),
                 t);
+
+            // fixed issue 1220, 针对server节点不工作避免死循环
+            if (t instanceof ServerNotFoundException) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                }
+            }
+
             // 出现任何异常尝试release
             releaseRunning();
             throw new CanalClientException("something goes wrong in initRunning method. ", t);

+ 3 - 3
deployer/src/main/resources/example/instance.properties

@@ -1,5 +1,5 @@
 #################################################
-## mysql serverId , v1.0.26+ will autoGen 
+## mysql serverId , v1.0.26+ will autoGen
 # canal.instance.mysql.slaveId=0
 
 # enable gtid use true/false
@@ -48,5 +48,5 @@ canal.mq.topic=example
 canal.mq.partition=0
 # hash partition config
 #canal.mq.partitionsNum=3
-#canal.mq.partitionHash=mytest.person:id,mytest.role:id
-#################################################
+#canal.mq.partitionHash=test.table#id^name,.*\\..*
+#################################################

+ 1 - 8
filter/src/main/java/com/alibaba/otter/canal/filter/PatternUtils.java

@@ -7,17 +7,10 @@ import org.apache.oro.text.regex.Pattern;
 import org.apache.oro.text.regex.PatternCompiler;
 import org.apache.oro.text.regex.Perl5Compiler;
 
-import com.alibaba.otter.canal.filter.exception.CanalFilterException;
 import com.google.common.base.Function;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.MigrateMap;
 
-/**
- * 提供{@linkplain Pattern}的lazy get处理
- *
- * @author jianghang 2013-1-22 下午09:36:44
- * @version 1.0.0
- */
 public class PatternUtils {
 
     @SuppressWarnings("deprecation")
@@ -32,7 +25,7 @@ public class PatternUtils {
                                                                              | Perl5Compiler.READ_ONLY_MASK
                                                                              | Perl5Compiler.SINGLELINE_MASK);
                                                              } catch (MalformedPatternException e) {
-                                                                 throw new CanalFilterException(e);
+                                                                 throw new RuntimeException(e);
                                                              }
                                                          }
                                                      });

+ 5 - 0
filter/src/main/java/com/alibaba/otter/canal/filter/aviater/AviaterRegexFilter.java

@@ -125,4 +125,9 @@ public class AviaterRegexFilter implements CanalEventFilter<String> {
         return result;
     }
 
+    @Override
+    public String toString() {
+        return pattern;
+    }
+
 }

+ 1 - 1
filter/src/main/java/com/alibaba/otter/canal/filter/aviater/RegexFunction.java

@@ -12,7 +12,7 @@ import com.googlecode.aviator.runtime.type.AviatorObject;
 
 /**
  * 提供aviator regex的代码扩展
- * 
+ *
  * @author jianghang 2012-7-23 上午10:29:23
  */
 public class RegexFunction extends AbstractFunction {

+ 4 - 30
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalMQConfig.java

@@ -1,16 +1,11 @@
 package com.alibaba.otter.canal.instance.core;
 
-import java.util.LinkedHashMap;
-import java.util.Map;
-
 public class CanalMQConfig {
 
-    private String                       topic;
-    private Integer                      partition;
-    private Integer                      partitionsNum;
-    private String                       partitionHash;
-
-    private volatile Map<String, String> partitionHashProperties;
+    private String  topic;
+    private Integer partition;
+    private Integer partitionsNum;
+    private String  partitionHash;
 
     public String getTopic() {
         return topic;
@@ -44,25 +39,4 @@ public class CanalMQConfig {
         this.partitionHash = partitionHash;
     }
 
-    public Map<String, String> getPartitionHashProperties() {
-        if (partitionHashProperties == null) {
-            synchronized (CanalMQConfig.class) {
-                if (partitionHashProperties == null) {
-                    if (partitionHash != null) {
-                        partitionHashProperties = new LinkedHashMap<>();
-                        String[] items = partitionHash.split(",");
-                        for (String item : items) {
-                            int i = item.indexOf(":");
-                            if (i > -1) {
-                                String dbTable = item.substring(0, i).trim();
-                                String pk = item.substring(i + 1).trim();
-                                partitionHashProperties.put(dbTable, pk);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-        return partitionHashProperties;
-    }
 }

+ 24 - 12
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -11,6 +11,7 @@ import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
 import com.alibaba.otter.canal.parse.inbound.BinlogParser;
 import com.alibaba.otter.canal.parse.inbound.MultiStageCoprocessor;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DefaultTableMetaTSDBFactory;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB;
 import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDBFactory;
@@ -64,8 +65,29 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         super.setEventFilter(eventFilter);
 
         // 触发一下filter变更
-        if (eventFilter != null && eventFilter instanceof AviaterRegexFilter && binlogParser instanceof LogEventConvert) {
-            ((LogEventConvert) binlogParser).setNameFilter((AviaterRegexFilter) eventFilter);
+        if (eventFilter != null && eventFilter instanceof AviaterRegexFilter) {
+            if (binlogParser instanceof LogEventConvert) {
+                ((LogEventConvert) binlogParser).setNameFilter((AviaterRegexFilter) eventFilter);
+            }
+
+            if (tableMetaTSDB != null && tableMetaTSDB instanceof DatabaseTableMeta) {
+                ((DatabaseTableMeta) tableMetaTSDB).setFilter(eventFilter);
+            }
+        }
+    }
+
+    public void setEventBlackFilter(CanalEventFilter eventBlackFilter) {
+        super.setEventBlackFilter(eventBlackFilter);
+
+        // 触发一下filter变更
+        if (eventBlackFilter != null && eventBlackFilter instanceof AviaterRegexFilter) {
+            if (binlogParser instanceof LogEventConvert) {
+                ((LogEventConvert) binlogParser).setNameBlackFilter((AviaterRegexFilter) eventBlackFilter);
+            }
+
+            if (tableMetaTSDB != null && tableMetaTSDB instanceof DatabaseTableMeta) {
+                ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);
+            }
         }
     }
 
@@ -115,16 +137,6 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         super.stop();
     }
 
-    public void setEventBlackFilter(CanalEventFilter eventBlackFilter) {
-        super.setEventBlackFilter(eventBlackFilter);
-
-        // 触发一下filter变更
-        if (eventBlackFilter != null && eventBlackFilter instanceof AviaterRegexFilter
-            && binlogParser instanceof LogEventConvert) {
-            ((LogEventConvert) binlogParser).setNameBlackFilter((AviaterRegexFilter) eventBlackFilter);
-        }
-    }
-
     protected MultiStageCoprocessor buildMultiStageCoprocessor() {
         MysqlMultiStageCoprocessor mysqlMultiStageCoprocessor = new MysqlMultiStageCoprocessor(parallelBufferSize,
             parallelThreadSize,

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -506,8 +506,8 @@ public class MysqlConnection implements ErosaConnection {
         ResultSetPacket rs = null;
         try {
             rs = query("select @@global.binlog_checksum");
-        } catch (IOException e) {
-            throw new CanalParseException(e);
+        } catch (Throwable e) {
+            // ignore
         }
 
         List<String> columnValues = rs.getFieldValues();

+ 29 - 9
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -621,9 +621,10 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                                                       + "," + columnInfo.length + " vs " + tableMeta.getFields().size());
                     }
                 }
-            } else {
-                logger.warn("[" + event.getTable().getDbName() + "." + event.getTable().getTableName()
-                            + "] is no primary key , skip alibaba_rds_row_id column");
+                // } else {
+                // logger.warn("[" + event.getTable().getDbName() + "." +
+                // event.getTable().getTableName()
+                // + "] is no primary key , skip alibaba_rds_row_id column");
             }
         }
 
@@ -634,18 +635,35 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 continue;
             }
 
+            if (existRDSNoPrimaryKey && i == columnCnt - 1 && info.type == LogEvent.MYSQL_TYPE_LONGLONG) {
+                // 不解析最后一列
+                String rdsRowIdColumnName = "#alibaba_rds_row_id#";
+                buffer.nextValue(rdsRowIdColumnName, i, info.type, info.meta, false);
+                Column.Builder columnBuilder = Column.newBuilder();
+                columnBuilder.setName(rdsRowIdColumnName);
+                columnBuilder.setIsKey(true);
+                columnBuilder.setMysqlType("bigint");
+                columnBuilder.setIndex(i);
+                columnBuilder.setIsNull(false);
+                Serializable value = buffer.getValue();
+                columnBuilder.setValue(value.toString());
+                columnBuilder.setSqlType(Types.BIGINT);
+                columnBuilder.setUpdated(false);
+
+                if (isAfter) {
+                    rowDataBuilder.addAfterColumns(columnBuilder.build());
+                } else {
+                    rowDataBuilder.addBeforeColumns(columnBuilder.build());
+                }
+                continue;
+            }
+
             FieldMeta fieldMeta = null;
             if (tableMeta != null && !tableError) {
                 // 处理file meta
                 fieldMeta = tableMeta.getFields().get(i);
             }
 
-            if (existRDSNoPrimaryKey && i == columnCnt - 1 && info.type == LogEvent.MYSQL_TYPE_LONGLONG) {
-                // 不解析最后一列
-                buffer.nextValue(fieldMeta.getColumnName(), i, info.type, info.meta, false);
-                continue;
-            }
-
             if (fieldMeta != null && existOptionalMetaData && tableMetaCache.isOnTSDB()) {
                 // check column info
                 boolean check = StringUtils.equalsIgnoreCase(fieldMeta.getColumnName(), info.name);
@@ -974,10 +992,12 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
     public void setNameFilter(AviaterRegexFilter nameFilter) {
         this.nameFilter = nameFilter;
+        logger.warn("--> init table filter : " + nameFilter.toString());
     }
 
     public void setNameBlackFilter(AviaterRegexFilter nameBlackFilter) {
         this.nameBlackFilter = nameBlackFilter;
+        logger.warn("--> init table black filter : " + nameBlackFilter.toString());
     }
 
     public void setTableMetaCache(TableMetaCache tableMetaCache) {

+ 2 - 0
pom.xml

@@ -381,6 +381,7 @@
                 </configuration>
             </plugin>
             <!-- javadoc -->
+            <!--
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-javadoc-plugin</artifactId>
@@ -399,6 +400,7 @@
                   <additionalparam>-Xdoclint:none</additionalparam>
                 </configuration>
             </plugin>
+            -->
         </plugins>
         <sourceDirectory>src/main/java</sourceDirectory>
         <testSourceDirectory>src/test/java</testSourceDirectory>

+ 17 - 212
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -1,14 +1,10 @@
 package com.alibaba.otter.canal.protocol;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
-import com.google.protobuf.ByteString;
+import com.google.common.collect.Lists;
 
 /**
  * @author machengyuan 2018-9-13 下午10:31:14
@@ -17,10 +13,10 @@ import com.google.protobuf.ByteString;
 public class FlatMessage implements Serializable {
 
     private static final long         serialVersionUID = -3386650678735860050L;
-
     private long                      id;
     private String                    database;
     private String                    table;
+    private List<String>              pkNames;
     private Boolean                   isDdl;
     private String                    type;
     // binlog executeTime
@@ -33,9 +29,6 @@ public class FlatMessage implements Serializable {
     private List<Map<String, String>> data;
     private List<Map<String, String>> old;
 
-    public FlatMessage(){
-    }
-
     public FlatMessage(long id){
         this.id = id;
     }
@@ -64,6 +57,21 @@ public class FlatMessage implements Serializable {
         this.table = table;
     }
 
+    public List<String> getPkNames() {
+        return pkNames;
+    }
+
+    public void addPkName(String pkName) {
+        if (this.pkNames == null) {
+            this.pkNames = Lists.newArrayList();
+        }
+        this.pkNames.add(pkName);
+    }
+
+    public void setPkNames(List<String> pkNames) {
+        this.pkNames = pkNames;
+    }
+
     public Boolean getIsDdl() {
         return isDdl;
     }
@@ -136,209 +144,6 @@ public class FlatMessage implements Serializable {
         this.es = es;
     }
 
-    /**
-     * 将Message转换为FlatMessage
-     * 
-     * @param message 原生message
-     * @return FlatMessage列表
-     */
-    public static List<FlatMessage> messageConverter(Message message) {
-        try {
-            if (message == null) {
-                return null;
-            }
-
-            List<FlatMessage> flatMessages = new ArrayList<>();
-            List<CanalEntry.Entry> entrys = null;
-            if (message.isRaw()) {
-                List<ByteString> rawEntries = message.getRawEntries();
-                entrys = new ArrayList<CanalEntry.Entry>(rawEntries.size());
-                for (ByteString byteString : rawEntries) {
-                    CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
-                    entrys.add(entry);
-                }
-            } else {
-                entrys = message.getEntries();
-            }
-
-            for (CanalEntry.Entry entry : entrys) {
-                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
-                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
-                    continue;
-                }
-
-                CanalEntry.RowChange rowChange;
-                try {
-                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
-                } catch (Exception e) {
-                    throw new RuntimeException(
-                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
-                        e);
-                }
-
-                CanalEntry.EventType eventType = rowChange.getEventType();
-
-                FlatMessage flatMessage = new FlatMessage(message.getId());
-                flatMessages.add(flatMessage);
-                flatMessage.setDatabase(entry.getHeader().getSchemaName());
-                flatMessage.setTable(entry.getHeader().getTableName());
-                flatMessage.setIsDdl(rowChange.getIsDdl());
-                flatMessage.setType(eventType.toString());
-                flatMessage.setEs(entry.getHeader().getExecuteTime());
-                flatMessage.setTs(System.currentTimeMillis());
-                flatMessage.setSql(rowChange.getSql());
-
-                if (!rowChange.getIsDdl()) {
-                    Map<String, Integer> sqlType = new LinkedHashMap<>();
-                    Map<String, String> mysqlType = new LinkedHashMap<>();
-                    List<Map<String, String>> data = new ArrayList<>();
-                    List<Map<String, String>> old = new ArrayList<>();
-
-                    Set<String> updateSet = new HashSet<>();
-                    for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
-                        if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
-                            && eventType != CanalEntry.EventType.DELETE) {
-                            continue;
-                        }
-
-                        Map<String, String> row = new LinkedHashMap<>();
-                        List<CanalEntry.Column> columns;
-
-                        if (eventType == CanalEntry.EventType.DELETE) {
-                            columns = rowData.getBeforeColumnsList();
-                        } else {
-                            columns = rowData.getAfterColumnsList();
-                        }
-
-                        for (CanalEntry.Column column : columns) {
-                            sqlType.put(column.getName(), column.getSqlType());
-                            mysqlType.put(column.getName(), column.getMysqlType());
-                            if (column.getIsNull()) {
-                                row.put(column.getName(), null);
-                            } else {
-                                row.put(column.getName(), column.getValue());
-                            }
-                            // 获取update为true的字段
-                            if (column.getUpdated()) {
-                                updateSet.add(column.getName());
-                            }
-                        }
-                        if (!row.isEmpty()) {
-                            data.add(row);
-                        }
-
-                        if (eventType == CanalEntry.EventType.UPDATE) {
-                            Map<String, String> rowOld = new LinkedHashMap<>();
-                            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
-                                if (updateSet.contains(column.getName())) {
-                                    if (column.getIsNull()) {
-                                        rowOld.put(column.getName(), null);
-                                    } else {
-                                        rowOld.put(column.getName(), column.getValue());
-                                    }
-                                }
-                            }
-                            // update操作将记录修改前的值
-                            if (!rowOld.isEmpty()) {
-                                old.add(rowOld);
-                            }
-                        }
-                    }
-                    if (!sqlType.isEmpty()) {
-                        flatMessage.setSqlType(sqlType);
-                    }
-                    if (!mysqlType.isEmpty()) {
-                        flatMessage.setMysqlType(mysqlType);
-                    }
-                    if (!data.isEmpty()) {
-                        flatMessage.setData(data);
-                    }
-                    if (!old.isEmpty()) {
-                        flatMessage.setOld(old);
-                    }
-                }
-            }
-            return flatMessages;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * 将FlatMessage按指定的字段值hash拆分
-     * 
-     * @param flatMessage flatMessage
-     * @param partitionsNum 分区数量
-     * @param pkHashConfig hash映射
-     * @return 拆分后的flatMessage数组
-     */
-    public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum,
-                                                 Map<String, String> pkHashConfig) {
-        if (partitionsNum == null) {
-            partitionsNum = 1;
-        }
-        FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];
-        String pk = pkHashConfig.get(flatMessage.getDatabase() + "." + flatMessage.getTable());
-        if (pk == null || flatMessage.getIsDdl()) {
-            partitionMessages[0] = flatMessage;
-        } else {
-            if (flatMessage.getData() != null) {
-                int idx = 0;
-                for (Map<String, String> row : flatMessage.getData()) {
-                    Map<String, String> o = null;
-                    if (flatMessage.getOld() != null) {
-                        o = flatMessage.getOld().get(idx);
-                    }
-                    String value;
-                    // 如果old中有pk值说明主键有修改, 以旧的主键值hash为准
-                    if (o != null && o.containsKey(pk)) {
-                        value = o.get(pk);
-                    } else {
-                        value = row.get(pk);
-                    }
-                    if (value == null) {
-                        value = "";
-                    }
-                    int hash = value.hashCode();
-                    int pkHash = Math.abs(hash) % partitionsNum;
-                    // math.abs可能返回负值,这里再取反,把出现负值的数据还是写到固定的分区,仍然可以保证消费顺序
-                    pkHash = Math.abs(pkHash);
-
-                    FlatMessage flatMessageTmp = partitionMessages[pkHash];
-                    if (flatMessageTmp == null) {
-                        flatMessageTmp = new FlatMessage(flatMessage.getId());
-                        partitionMessages[pkHash] = flatMessageTmp;
-                        flatMessageTmp.setDatabase(flatMessage.getDatabase());
-                        flatMessageTmp.setTable(flatMessage.getTable());
-                        flatMessageTmp.setIsDdl(flatMessage.getIsDdl());
-                        flatMessageTmp.setType(flatMessage.getType());
-                        flatMessageTmp.setSql(flatMessage.getSql());
-                        flatMessageTmp.setSqlType(flatMessage.getSqlType());
-                        flatMessageTmp.setMysqlType(flatMessage.getMysqlType());
-                        flatMessageTmp.setEs(flatMessage.getEs());
-                        flatMessageTmp.setTs(flatMessage.getTs());
-                    }
-                    List<Map<String, String>> data = flatMessageTmp.getData();
-                    if (data == null) {
-                        data = new ArrayList<>();
-                        flatMessageTmp.setData(data);
-                    }
-                    data.add(row);
-                    if (flatMessage.getOld() != null && !flatMessage.getOld().isEmpty()) {
-                        List<Map<String, String>> old = flatMessageTmp.getOld();
-                        if (old == null) {
-                            old = new ArrayList<>();
-                            flatMessageTmp.setOld(old);
-                        }
-                        old.add(flatMessage.getOld().get(idx));
-                    }
-                    idx++;
-                }
-            }
-        }
-        return partitionMessages;
-    }
-
     @Override
     public String toString() {
         return "FlatMessage [id=" + id + ", database=" + database + ", table=" + table + ", isDdl=" + isDdl + ", type="

+ 0 - 1
protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java

@@ -17,7 +17,6 @@ import com.google.protobuf.ByteString;
 public class Message implements Serializable {
 
     private static final long      serialVersionUID = 1234034768477580009L;
-
     private long                   id;
     private List<CanalEntry.Entry> entries          = new ArrayList<CanalEntry.Entry>();
     // row data for performance, see:

+ 405 - 0
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -0,0 +1,405 @@
+package com.alibaba.otter.canal.common;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.MapMaker;
+import com.google.common.collect.MigrateMap;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * process MQ Message utils
+ * 
+ * @author agapple 2018年12月11日 下午1:28:32
+ */
+public class MQMessageUtils {
+
+    @SuppressWarnings("deprecation")
+    private static Map<String, List<PartitionData>> partitionDatas = MigrateMap.makeComputingMap(new MapMaker().softValues(),
+                                                                       new Function<String, List<PartitionData>>() {
+
+                                                                           public List<PartitionData> apply(String pkHashConfigs) {
+                                                                               List<PartitionData> datas = Lists.newArrayList();
+                                                                               String[] pkHashConfigArray = StringUtils.split(pkHashConfigs,
+                                                                                   ",");
+                                                                               // schema.table#id^name
+                                                                               for (String pkHashConfig : pkHashConfigArray) {
+                                                                                   PartitionData data = new PartitionData();
+                                                                                   int i = pkHashConfig.lastIndexOf("#");
+                                                                                   if (i > 0) {
+                                                                                       data.pkNames = Lists.newArrayList(StringUtils.split(pkHashConfig.substring(i + 1),
+                                                                                           '^'));
+                                                                                       pkHashConfig = pkHashConfig.substring(0,
+                                                                                           i);
+                                                                                   }
+                                                                                   if (!isWildCard(pkHashConfig)) {
+                                                                                       data.simpleName = pkHashConfig;
+                                                                                   } else {
+                                                                                       data.regexFilter = new AviaterRegexFilter(pkHashConfig);
+                                                                                   }
+                                                                                   datas.add(data);
+                                                                               }
+
+                                                                               return datas;
+                                                                           }
+                                                                       });
+
+    /**
+     * 将 message 分区
+     *
+     * @param partitionsNum 分区数
+     * @param pkHashConfigs 分区库表主键正则表达式
+     * @return 分区message数组
+     */
+    @SuppressWarnings("unchecked")
+    public static Message[] messagePartition(Message message, Integer partitionsNum, String pkHashConfigs) {
+        if (partitionsNum == null) {
+            partitionsNum = 1;
+        }
+        Message[] partitionMessages = new Message[partitionsNum];
+        List<Entry>[] partitionEntries = new List[partitionsNum];
+        for (int i = 0; i < partitionsNum; i++) {
+            partitionEntries[i] = new ArrayList<>();
+        }
+
+        List<CanalEntry.Entry> entries;
+        if (message.isRaw()) {
+            List<ByteString> rawEntries = message.getRawEntries();
+            entries = new ArrayList<>(rawEntries.size());
+            for (ByteString byteString : rawEntries) {
+                Entry entry;
+                try {
+                    entry = Entry.parseFrom(byteString);
+                } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                }
+                entries.add(entry);
+            }
+        } else {
+            entries = message.getEntries();
+        }
+
+        for (Entry entry : entries) {
+            CanalEntry.RowChange rowChange;
+            try {
+                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+            } catch (Exception e) {
+                throw new RuntimeException(e.getMessage(), e);
+            }
+
+            if (rowChange.getIsDdl()) {
+                partitionEntries[0].add(entry);
+            } else {
+                if (rowChange.getRowDatasList() != null && !rowChange.getRowDatasList().isEmpty()) {
+                    String database = entry.getHeader().getSchemaName();
+                    String table = entry.getHeader().getTableName();
+                    List<String> pkNames = getParitionHashColumns(database + "." + table, pkHashConfigs);
+
+                    if (pkNames == null) {
+                        // 如果都没有匹配,发送到第一个分区
+                        partitionEntries[0].add(entry);
+                    } else {
+                        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+                            int hashCode = table.hashCode();
+                            if (pkNames.isEmpty()) {
+                                // isEmpty use default pkNames
+                                for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
+                                    if (column.getIsKey()) {
+                                        hashCode = hashCode ^ column.getValue().hashCode();
+                                    }
+                                }
+                            } else {
+                                for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
+                                    if (checkPkNamesHasContain(pkNames, column.getName())) {
+                                        hashCode = hashCode ^ column.getValue().hashCode();
+                                    }
+                                }
+                            }
+
+                            int pkHash = Math.abs(hashCode) % partitionsNum;
+                            pkHash = Math.abs(pkHash);
+                            partitionEntries[pkHash].add(entry);
+                        }
+                    }
+                }
+            }
+        }
+
+        for (int i = 0; i < partitionsNum; i++) {
+            List<Entry> entriesTmp = partitionEntries[i];
+            if (!entriesTmp.isEmpty()) {
+                partitionMessages[i] = new Message(message.getId(), entriesTmp);
+            }
+        }
+
+        return partitionMessages;
+    }
+
+    /**
+     * 将Message转换为FlatMessage
+     *
+     * @param message 原生message
+     * @return FlatMessage列表
+     */
+    public static List<FlatMessage> messageConverter(Message message) {
+        try {
+            if (message == null) {
+                return null;
+            }
+
+            List<FlatMessage> flatMessages = new ArrayList<>();
+            List<CanalEntry.Entry> entrys = null;
+            if (message.isRaw()) {
+                List<ByteString> rawEntries = message.getRawEntries();
+                entrys = new ArrayList<CanalEntry.Entry>(rawEntries.size());
+                for (ByteString byteString : rawEntries) {
+                    CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
+                    entrys.add(entry);
+                }
+            } else {
+                entrys = message.getEntries();
+            }
+
+            for (CanalEntry.Entry entry : entrys) {
+                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
+                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
+                    continue;
+                }
+
+                CanalEntry.RowChange rowChange;
+                try {
+                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+                } catch (Exception e) {
+                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
+                                               + entry.toString(), e);
+                }
+
+                CanalEntry.EventType eventType = rowChange.getEventType();
+
+                FlatMessage flatMessage = new FlatMessage(message.getId());
+                flatMessages.add(flatMessage);
+                flatMessage.setDatabase(entry.getHeader().getSchemaName());
+                flatMessage.setTable(entry.getHeader().getTableName());
+                flatMessage.setIsDdl(rowChange.getIsDdl());
+                flatMessage.setType(eventType.toString());
+                flatMessage.setEs(entry.getHeader().getExecuteTime());
+                flatMessage.setTs(System.currentTimeMillis());
+                flatMessage.setSql(rowChange.getSql());
+
+                if (!rowChange.getIsDdl()) {
+                    Map<String, Integer> sqlType = new LinkedHashMap<>();
+                    Map<String, String> mysqlType = new LinkedHashMap<>();
+                    List<Map<String, String>> data = new ArrayList<>();
+                    List<Map<String, String>> old = new ArrayList<>();
+
+                    Set<String> updateSet = new HashSet<>();
+                    for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+                        if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
+                            && eventType != CanalEntry.EventType.DELETE) {
+                            continue;
+                        }
+
+                        Map<String, String> row = new LinkedHashMap<>();
+                        List<CanalEntry.Column> columns;
+
+                        if (eventType == CanalEntry.EventType.DELETE) {
+                            columns = rowData.getBeforeColumnsList();
+                        } else {
+                            columns = rowData.getAfterColumnsList();
+                        }
+
+                        for (CanalEntry.Column column : columns) {
+                            if (column.getIsKey()) {
+                                flatMessage.addPkName(column.getName());
+                            }
+                            sqlType.put(column.getName(), column.getSqlType());
+                            mysqlType.put(column.getName(), column.getMysqlType());
+                            if (column.getIsNull()) {
+                                row.put(column.getName(), null);
+                            } else {
+                                row.put(column.getName(), column.getValue());
+                            }
+                            // 获取update为true的字段
+                            if (column.getUpdated()) {
+                                updateSet.add(column.getName());
+                            }
+                        }
+                        if (!row.isEmpty()) {
+                            data.add(row);
+                        }
+
+                        if (eventType == CanalEntry.EventType.UPDATE) {
+                            Map<String, String> rowOld = new LinkedHashMap<>();
+                            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
+                                if (updateSet.contains(column.getName())) {
+                                    if (column.getIsNull()) {
+                                        rowOld.put(column.getName(), null);
+                                    } else {
+                                        rowOld.put(column.getName(), column.getValue());
+                                    }
+                                }
+                            }
+                            // update操作将记录修改前的值
+                            if (!rowOld.isEmpty()) {
+                                old.add(rowOld);
+                            }
+                        }
+                    }
+                    if (!sqlType.isEmpty()) {
+                        flatMessage.setSqlType(sqlType);
+                    }
+                    if (!mysqlType.isEmpty()) {
+                        flatMessage.setMysqlType(mysqlType);
+                    }
+                    if (!data.isEmpty()) {
+                        flatMessage.setData(data);
+                    }
+                    if (!old.isEmpty()) {
+                        flatMessage.setOld(old);
+                    }
+                }
+            }
+            return flatMessages;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * 将FlatMessage按指定的字段值hash拆分
+     *
+     * @param flatMessage flatMessage
+     * @param partitionsNum 分区数量
+     * @param pkHashConfigs hash映射
+     * @return 拆分后的flatMessage数组
+     */
+    public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum, String pkHashConfigs) {
+        if (partitionsNum == null) {
+            partitionsNum = 1;
+        }
+        FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];
+
+        if (flatMessage.getIsDdl()) {
+            partitionMessages[0] = flatMessage;
+        } else {
+            if (flatMessage.getData() != null && !flatMessage.getData().isEmpty()) {
+                String database = flatMessage.getDatabase();
+                String table = flatMessage.getTable();
+                List<String> pkNames = getParitionHashColumns(database + "." + table, pkHashConfigs);
+                if (pkNames == null) {
+                    // 如果都没有匹配,发送到第一个分区
+                    partitionMessages[0] = flatMessage;
+                } else {
+                    if (pkNames.isEmpty()) {
+                        pkNames = flatMessage.getPkNames();
+                    }
+
+                    int hashCode = table.hashCode();
+                    int idx = 0;
+                    for (Map<String, String> row : flatMessage.getData()) {
+                        for (String pkName : pkNames) {
+                            String value = row.get(pkName);
+                            if (value == null) {
+                                value = "";
+                            }
+                            hashCode = hashCode ^ value.hashCode();
+                        }
+
+                        int pkHash = Math.abs(hashCode) % partitionsNum;
+                        // math.abs可能返回负值,这里再取反,把出现负值的数据还是写到固定的分区,仍然可以保证消费顺序
+                        pkHash = Math.abs(pkHash);
+
+                        FlatMessage flatMessageTmp = partitionMessages[pkHash];
+                        if (flatMessageTmp == null) {
+                            flatMessageTmp = new FlatMessage(flatMessage.getId());
+                            partitionMessages[pkHash] = flatMessageTmp;
+                            flatMessageTmp.setDatabase(flatMessage.getDatabase());
+                            flatMessageTmp.setTable(flatMessage.getTable());
+                            flatMessageTmp.setIsDdl(flatMessage.getIsDdl());
+                            flatMessageTmp.setType(flatMessage.getType());
+                            flatMessageTmp.setSql(flatMessage.getSql());
+                            flatMessageTmp.setSqlType(flatMessage.getSqlType());
+                            flatMessageTmp.setMysqlType(flatMessage.getMysqlType());
+                            flatMessageTmp.setEs(flatMessage.getEs());
+                            flatMessageTmp.setTs(flatMessage.getTs());
+                        }
+                        List<Map<String, String>> data = flatMessageTmp.getData();
+                        if (data == null) {
+                            data = new ArrayList<>();
+                            flatMessageTmp.setData(data);
+                        }
+                        data.add(row);
+                        if (flatMessage.getOld() != null && !flatMessage.getOld().isEmpty()) {
+                            List<Map<String, String>> old = flatMessageTmp.getOld();
+                            if (old == null) {
+                                old = new ArrayList<>();
+                                flatMessageTmp.setOld(old);
+                            }
+                            old.add(flatMessage.getOld().get(idx));
+                        }
+                        idx++;
+                    }
+                }
+            }
+        }
+        return partitionMessages;
+    }
+
+    /**
+     * match return List , not match return null
+     */
+    public static List<String> getParitionHashColumns(String name, String pkHashConfigs) {
+        List<PartitionData> datas = partitionDatas.get(pkHashConfigs);
+        for (PartitionData data : datas) {
+            if (data.simpleName != null) {
+                if (data.simpleName.equalsIgnoreCase(name)) {
+                    return data.pkNames;
+                }
+            } else {
+                if (data.regexFilter.filter(name)) {
+                    return data.pkNames;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    public static boolean checkPkNamesHasContain(List<String> pkNames, String name) {
+        for (String pkName : pkNames) {
+            if (pkName.equalsIgnoreCase(name)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private static boolean isWildCard(String value) {
+        // not contaiins '.' ?
+        return StringUtils.containsAny(value, new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$',
+                '^' });
+    }
+
+    public static class PartitionData {
+
+        public String             simpleName;
+        public AviaterRegexFilter regexFilter;
+        public List<String>       pkNames = Lists.newArrayList();
+    }
+
+}

+ 8 - 8
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -1,6 +1,5 @@
 package com.alibaba.otter.canal.common;
 
-import java.util.Map;
 
 /**
  * kafka 配置项
@@ -28,11 +27,11 @@ public class MQProperties {
 
     public static class CanalDestination {
 
-        private String              canalDestination;
-        private String              topic;
-        private Integer             partition;
-        private Integer             partitionsNum;
-        private Map<String, String> partitionHash;
+        private String  canalDestination;
+        private String  topic;
+        private Integer partition;
+        private Integer partitionsNum;
+        private String  partitionHash;
 
         public String getCanalDestination() {
             return canalDestination;
@@ -66,11 +65,11 @@ public class MQProperties {
             this.partitionsNum = partitionsNum;
         }
 
-        public Map<String, String> getPartitionHash() {
+        public String getPartitionHash() {
             return partitionHash;
         }
 
-        public void setPartitionHash(Map<String, String> partitionHash) {
+        public void setPartitionHash(String partitionHash) {
             this.partitionHash = partitionHash;
         }
     }
@@ -186,6 +185,7 @@ public class MQProperties {
     public void setAliyunSecretKey(String aliyunSecretKey) {
         this.aliyunSecretKey = aliyunSecretKey;
     }
+
     public int getMaxRequestSize() {
         return maxRequestSize;
     }

+ 29 - 16
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -12,6 +12,7 @@ import org.slf4j.LoggerFactory;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.MQMessageUtils;
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
@@ -80,22 +81,37 @@ public class CanalKafkaProducer implements CanalMQProducer {
         // producer.beginTransaction();
         if (!kafkaProperties.getFlatMessage()) {
             try {
-                ProducerRecord<String, Message> record;
+                ProducerRecord<String, Message> record = null;
                 if (canalDestination.getPartition() != null) {
-                    record = new ProducerRecord<String, Message>(canalDestination.getTopic(),
+                    record = new ProducerRecord<>(canalDestination.getTopic(),
                         canalDestination.getPartition(),
                         null,
                         message);
                 } else {
-                    record = new ProducerRecord<String, Message>(canalDestination.getTopic(), 0, null, message);
+                    if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
+                        Message[] messages = MQMessageUtils.messagePartition(message,
+                            canalDestination.getPartitionsNum(),
+                            canalDestination.getPartitionHash());
+                        int length = messages.length;
+                        for (int i = 0; i < length; i++) {
+                            Message messagePartition = messages[i];
+                            if (messagePartition != null) {
+                                record = new ProducerRecord<>(canalDestination.getTopic(), i, null, messagePartition);
+                            }
+                        }
+                    } else {
+                        record = new ProducerRecord<>(canalDestination.getTopic(), 0, null, message);
+                    }
                 }
 
-                producer.send(record).get();
+                if (record != null) {
+                    producer.send(record).get();
 
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Send  message to kafka topic: [{}], packet: {}",
-                        canalDestination.getTopic(),
-                        message.toString());
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Send  message to kafka topic: [{}], packet: {}",
+                            canalDestination.getTopic(),
+                            message.toString());
+                    }
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
@@ -105,13 +121,12 @@ public class CanalKafkaProducer implements CanalMQProducer {
             }
         } else {
             // 发送扁平数据json
-            List<FlatMessage> flatMessages = FlatMessage.messageConverter(message);
+            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
             if (flatMessages != null) {
                 for (FlatMessage flatMessage : flatMessages) {
                     if (canalDestination.getPartition() != null) {
                         try {
-                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                canalDestination.getTopic(),
+                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                 canalDestination.getPartition(),
                                 null,
                                 JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
@@ -125,7 +140,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                     } else {
                         if (canalDestination.getPartitionHash() != null
                             && !canalDestination.getPartitionHash().isEmpty()) {
-                            FlatMessage[] partitionFlatMessage = FlatMessage.messagePartition(flatMessage,
+                            FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                                 canalDestination.getPartitionsNum(),
                                 canalDestination.getPartitionHash());
                             int length = partitionFlatMessage.length;
@@ -133,8 +148,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                 FlatMessage flatMessagePart = partitionFlatMessage[i];
                                 if (flatMessagePart != null) {
                                     try {
-                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                            canalDestination.getTopic(),
+                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                             i,
                                             null,
                                             JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
@@ -149,8 +163,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                             }
                         } else {
                             try {
-                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                    canalDestination.getTopic(),
+                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                     0,
                                     null,
                                     JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));

+ 93 - 44
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -1,14 +1,7 @@
 package com.alibaba.otter.canal.rocketmq;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.alibaba.otter.canal.common.CanalMessageSerializer;
-import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.server.exception.CanalServerException;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
-import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
-import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
+import java.util.List;
+
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -20,7 +13,16 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.CanalMessageSerializer;
+import com.alibaba.otter.canal.common.MQMessageUtils;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.server.exception.CanalServerException;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
+import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
+import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
 
 public class CanalRocketMQProducer implements CanalMQProducer {
 
@@ -34,8 +36,8 @@ public class CanalRocketMQProducer implements CanalMQProducer {
     public void init(MQProperties rocketMQProperties) {
         this.mqProperties = rocketMQProperties;
         RPCHook rpcHook = null;
-        if(rocketMQProperties.getAliyunAccessKey().length() > 0
-            && rocketMQProperties.getAliyunSecretKey().length() > 0){
+        if (rocketMQProperties.getAliyunAccessKey().length() > 0
+            && rocketMQProperties.getAliyunSecretKey().length() > 0) {
             SessionCredentials sessionCredentials = new SessionCredentials();
             sessionCredentials.setAccessKey(rocketMQProperties.getAliyunAccessKey());
             sessionCredentials.setSecretKey(rocketMQProperties.getAliyunSecretKey());
@@ -59,40 +61,85 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                      Callback callback) {
         if (!mqProperties.getFlatMessage()) {
             try {
-                Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data,
-                    mqProperties.isFilterTransactionEntry()));
-                logger.debug("send message:{} to destination:{}, partition: {}",
-                    message,
-                    destination.getCanalDestination(),
-                    destination.getPartition());
-                this.defaultMQProducer.send(message, new MessageQueueSelector() {
-
-                    @Override
-                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                        int partition = 0;
-                        if (destination.getPartition() != null) {
-                            partition = destination.getPartition();
+                if (destination.getPartition() != null) {
+                    Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data,
+                        mqProperties.isFilterTransactionEntry()));
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("send message:{} to destination:{}, partition: {}",
+                            message,
+                            destination.getCanalDestination(),
+                            destination.getPartition());
+                    }
+                    this.defaultMQProducer.send(message, new MessageQueueSelector() {
+
+                        @Override
+                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                            int partition = 0;
+                            if (destination.getPartition() != null) {
+                                partition = destination.getPartition();
+                            }
+                            return mqs.get(partition);
+                        }
+                    }, null);
+                } else {
+                    if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
+                        com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(data,
+                            destination.getPartitionsNum(),
+                            destination.getPartitionHash());
+                        int length = messages.length;
+                        for (int i = 0; i < length; i++) {
+                            com.alibaba.otter.canal.protocol.Message dataPartition = messages[i];
+                            if (dataPartition != null) {
+                                if (logger.isDebugEnabled()) {
+                                    logger.debug("flatMessagePart: {}, partition: {}",
+                                        JSON.toJSONString(dataPartition, SerializerFeature.WriteMapNullValue),
+                                        i);
+                                }
+                                final int index = i;
+                                try {
+                                    Message message = new Message(destination.getTopic(),
+                                        CanalMessageSerializer.serializer(dataPartition,
+                                            mqProperties.isFilterTransactionEntry()));
+                                    this.defaultMQProducer.send(message, new MessageQueueSelector() {
+
+                                        @Override
+                                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                                            if (index > mqs.size()) {
+                                                throw new CanalServerException("partition number is error,config num:"
+                                                                               + destination.getPartitionsNum()
+                                                                               + ", mq num: " + mqs.size());
+                                            }
+                                            return mqs.get(index);
+                                        }
+                                    }, null);
+                                } catch (Exception e) {
+                                    logger.error("send flat message to hashed partition error", e);
+                                    callback.rollback();
+                                    return;
+                                }
+                            }
                         }
-                        return mqs.get(partition);
                     }
-                }, null);
+                }
             } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                 logger.error("Send message error!", e);
                 callback.rollback();
                 return;
             }
         } else {
-            List<FlatMessage> flatMessages = FlatMessage.messageConverter(data);
+            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(data);
             if (flatMessages != null) {
                 for (FlatMessage flatMessage : flatMessages) {
                     if (destination.getPartition() != null) {
                         try {
-                            logger.info("send flat message: {} to topic: {} fixed partition: {}",
-                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue),
-                                destination.getTopic(),
-                                destination.getPartition());
-                            Message message = new Message(destination.getTopic(),
-                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue).getBytes());
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("send message: {} to topic: {} fixed partition: {}",
+                                    JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue),
+                                    destination.getTopic(),
+                                    destination.getPartition());
+                            }
+                            Message message = new Message(destination.getTopic(), JSON.toJSONString(flatMessage,
+                                SerializerFeature.WriteMapNullValue).getBytes());
                             this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                 @Override
@@ -107,34 +154,36 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                         }
                     } else {
                         if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
-                            FlatMessage[] partitionFlatMessage = FlatMessage.messagePartition(flatMessage,
+                            FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                                 destination.getPartitionsNum(),
                                 destination.getPartitionHash());
                             int length = partitionFlatMessage.length;
                             for (int i = 0; i < length; i++) {
                                 FlatMessage flatMessagePart = partitionFlatMessage[i];
                                 if (flatMessagePart != null) {
-                                    logger.debug("flatMessagePart: {}, partition: {}",
-                                        JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue),
-                                        i);
+                                    if (logger.isDebugEnabled()) {
+                                        logger.debug("flatMessagePart: {}, partition: {}",
+                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue),
+                                            i);
+                                    }
                                     final int index = i;
                                     try {
                                         Message message = new Message(destination.getTopic(),
-                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue).getBytes());
+                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue)
+                                                .getBytes());
                                         this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                             @Override
-                                            public MessageQueue select(List<MessageQueue> mqs, Message msg,
-                                                                       Object arg) {
+                                            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                                                 if (index > mqs.size()) {
-                                                    throw new CanalServerException(
-                                                        "partition number is error,config num:"
+                                                    throw new CanalServerException("partition number is error,config num:"
                                                                                    + destination.getPartitionsNum()
                                                                                    + ", mq num: " + mqs.size());
                                                 }
                                                 return mqs.get(index);
                                             }
-                                        }, null);
+                                        },
+                                            null);
                                     } catch (Exception e) {
                                         logger.error("send flat message to hashed partition error", e);
                                         callback.rollback();

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -138,7 +138,7 @@ public class CanalMQStarter {
                 canalDestination.setTopic(mqConfig.getTopic());
                 canalDestination.setPartition(mqConfig.getPartition());
                 canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
-                canalDestination.setPartitionHash(mqConfig.getPartitionHashProperties());
+                canalDestination.setPartitionHash(mqConfig.getPartitionHash());
 
                 canalServer.subscribe(clientIdentity);
                 logger.info("## the MQ producer: {} is running now ......", destination);