Browse Source

fixed pull #1266 , optimizer code

agapple 6 years ago
parent
commit
277a273ebc

+ 1 - 1
deployer/src/main/resources/example/instance.properties

@@ -48,5 +48,5 @@ canal.mq.topic=example
 canal.mq.partition=0
 # hash partition config
 #canal.mq.partitionsNum=3
-#canal.mq.partitionHash=.*\\..*.$pk$
+#canal.mq.partitionHash=test.table#id^name,.*\\..*
 #################################################

+ 1 - 8
filter/src/main/java/com/alibaba/otter/canal/filter/PatternUtils.java

@@ -7,17 +7,10 @@ import org.apache.oro.text.regex.Pattern;
 import org.apache.oro.text.regex.PatternCompiler;
 import org.apache.oro.text.regex.Perl5Compiler;
 
-import com.alibaba.otter.canal.filter.exception.CanalFilterException;
 import com.google.common.base.Function;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.MigrateMap;
 
-/**
- * 提供{@linkplain Pattern}的lazy get处理
- *
- * @author jianghang 2013-1-22 下午09:36:44
- * @version 1.0.0
- */
 public class PatternUtils {
 
     @SuppressWarnings("deprecation")
@@ -32,7 +25,7 @@ public class PatternUtils {
                                                                              | Perl5Compiler.READ_ONLY_MASK
                                                                              | Perl5Compiler.SINGLELINE_MASK);
                                                              } catch (MalformedPatternException e) {
-                                                                 throw new CanalFilterException(e);
+                                                                 throw new RuntimeException(e);
                                                              }
                                                          }
                                                      });

+ 1 - 0
filter/src/main/java/com/alibaba/otter/canal/filter/aviater/AviaterRegexFilter.java

@@ -129,4 +129,5 @@ public class AviaterRegexFilter implements CanalEventFilter<String> {
     public String toString() {
         return pattern;
     }
+
 }

+ 1 - 1
filter/src/main/java/com/alibaba/otter/canal/filter/aviater/RegexFunction.java

@@ -12,7 +12,7 @@ import com.googlecode.aviator.runtime.type.AviatorObject;
 
 /**
  * 提供aviator regex的代码扩展
- * 
+ *
  * @author jianghang 2012-7-23 上午10:29:23
  */
 public class RegexFunction extends AbstractFunction {

+ 28 - 274
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -1,14 +1,10 @@
 package com.alibaba.otter.canal.protocol;
 
 import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.List;
+import java.util.Map;
 
-import org.apache.commons.lang.StringUtils;
-
-import com.alibaba.otter.canal.protocol.aviater.AviaterRegexFilter;
-import com.google.protobuf.ByteString;
+import com.google.common.collect.Lists;
 
 /**
  * @author machengyuan 2018-9-13 下午10:31:14
@@ -16,27 +12,22 @@ import com.google.protobuf.ByteString;
  */
 public class FlatMessage implements Serializable {
 
-    private static final long                                    serialVersionUID = -3386650678735860050L;
-
-    private static ConcurrentHashMap<String, AviaterRegexFilter> regexFilters     = new ConcurrentHashMap<>();
-
-    private long                                                 id;
-    private String                                               database;
-    private String                                               table;
-    private String                                               pk;
-    private Boolean                                              isDdl;
-    private String                                               type;
+    private static final long         serialVersionUID = -3386650678735860050L;
+    private long                      id;
+    private String                    database;
+    private String                    table;
+    private List<String>              pkNames;
+    private Boolean                   isDdl;
+    private String                    type;
     // binlog executeTime
-    private Long                                                 es;
+    private Long                      es;
     // dml build timeStamp
-    private Long                                                 ts;
-    private String                                               sql;
-    private Map<String, Integer>                                 sqlType;
-    private Map<String, String>                                  mysqlType;
-    private List<Map<String, String>>                            data;
-    private List<Map<String, String>>                            old;
-
-    private transient CanalEntry.Entry                           entry;                                       // 所属entry
+    private Long                      ts;
+    private String                    sql;
+    private Map<String, Integer>      sqlType;
+    private Map<String, String>       mysqlType;
+    private List<Map<String, String>> data;
+    private List<Map<String, String>> old;
 
     public FlatMessage(long id){
         this.id = id;
@@ -66,12 +57,19 @@ public class FlatMessage implements Serializable {
         this.table = table;
     }
 
-    public String getPk() {
-        return pk;
+    public List<String> getPkNames() {
+        return pkNames;
     }
 
-    public void setPk(String pk) {
-        this.pk = pk;
+    public void addPkName(String pkName) {
+        if (this.pkNames == null) {
+            this.pkNames = Lists.newArrayList();
+        }
+        this.pkNames.add(pkName);
+    }
+
+    public void setPkNames(List<String> pkNames) {
+        this.pkNames = pkNames;
     }
 
     public Boolean getIsDdl() {
@@ -146,250 +144,6 @@ public class FlatMessage implements Serializable {
         this.es = es;
     }
 
-    public CanalEntry.Entry getEntry() {
-        return entry;
-    }
-
-    public void setEntry(CanalEntry.Entry entry) {
-        this.entry = entry;
-    }
-
-    /**
-     * 将Message转换为FlatMessage
-     *
-     * @param message 原生message
-     * @return FlatMessage列表
-     */
-    public static List<FlatMessage> messageConverter(Message message) {
-        try {
-            if (message == null) {
-                return null;
-            }
-
-            List<FlatMessage> flatMessages = new ArrayList<>();
-            List<CanalEntry.Entry> entrys = null;
-            if (message.isRaw()) {
-                List<ByteString> rawEntries = message.getRawEntries();
-                entrys = new ArrayList<CanalEntry.Entry>(rawEntries.size());
-                for (ByteString byteString : rawEntries) {
-                    CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
-                    entrys.add(entry);
-                }
-            } else {
-                entrys = message.getEntries();
-            }
-
-            for (CanalEntry.Entry entry : entrys) {
-                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
-                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
-                    continue;
-                }
-
-                CanalEntry.RowChange rowChange;
-                try {
-                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
-                } catch (Exception e) {
-                    throw new RuntimeException(
-                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
-                        e);
-                }
-
-                CanalEntry.EventType eventType = rowChange.getEventType();
-
-                FlatMessage flatMessage = new FlatMessage(message.getId());
-                flatMessages.add(flatMessage);
-                flatMessage.setDatabase(entry.getHeader().getSchemaName());
-                flatMessage.setTable(entry.getHeader().getTableName());
-                flatMessage.setIsDdl(rowChange.getIsDdl());
-                flatMessage.setType(eventType.toString());
-                flatMessage.setEs(entry.getHeader().getExecuteTime());
-                flatMessage.setTs(System.currentTimeMillis());
-                flatMessage.setSql(rowChange.getSql());
-                flatMessage.setEntry(entry);
-
-                if (!rowChange.getIsDdl()) {
-                    Map<String, Integer> sqlType = new LinkedHashMap<>();
-                    Map<String, String> mysqlType = new LinkedHashMap<>();
-                    List<Map<String, String>> data = new ArrayList<>();
-                    List<Map<String, String>> old = new ArrayList<>();
-
-                    Set<String> updateSet = new HashSet<>();
-                    for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
-                        if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
-                            && eventType != CanalEntry.EventType.DELETE) {
-                            continue;
-                        }
-
-                        Map<String, String> row = new LinkedHashMap<>();
-                        List<CanalEntry.Column> columns;
-
-                        if (eventType == CanalEntry.EventType.DELETE) {
-                            columns = rowData.getBeforeColumnsList();
-                        } else {
-                            columns = rowData.getAfterColumnsList();
-                        }
-
-                        for (CanalEntry.Column column : columns) {
-                            if (flatMessage.getPk() == null && column.getIsKey()) {
-                                flatMessage.setPk(column.getName());
-                            }
-                            sqlType.put(column.getName(), column.getSqlType());
-                            mysqlType.put(column.getName(), column.getMysqlType());
-                            if (column.getIsNull()) {
-                                row.put(column.getName(), null);
-                            } else {
-                                row.put(column.getName(), column.getValue());
-                            }
-                            // 获取update为true的字段
-                            if (column.getUpdated()) {
-                                updateSet.add(column.getName());
-                            }
-                        }
-                        if (!row.isEmpty()) {
-                            data.add(row);
-                        }
-
-                        if (eventType == CanalEntry.EventType.UPDATE) {
-                            Map<String, String> rowOld = new LinkedHashMap<>();
-                            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
-                                if (updateSet.contains(column.getName())) {
-                                    if (column.getIsNull()) {
-                                        rowOld.put(column.getName(), null);
-                                    } else {
-                                        rowOld.put(column.getName(), column.getValue());
-                                    }
-                                }
-                            }
-                            // update操作将记录修改前的值
-                            if (!rowOld.isEmpty()) {
-                                old.add(rowOld);
-                            }
-                        }
-                    }
-                    if (!sqlType.isEmpty()) {
-                        flatMessage.setSqlType(sqlType);
-                    }
-                    if (!mysqlType.isEmpty()) {
-                        flatMessage.setMysqlType(mysqlType);
-                    }
-                    if (!data.isEmpty()) {
-                        flatMessage.setData(data);
-                    }
-                    if (!old.isEmpty()) {
-                        flatMessage.setOld(old);
-                    }
-                }
-            }
-            return flatMessages;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * 将FlatMessage按指定的字段值hash拆分
-     *
-     * @param flatMessage flatMessage
-     * @param partitionsNum 分区数量
-     * @param pkHashConfigs hash映射
-     * @return 拆分后的flatMessage数组
-     */
-    public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum, String pkHashConfigs) {
-        if (partitionsNum == null) {
-            partitionsNum = 1;
-        }
-        FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];
-
-        if (flatMessage.getIsDdl()) {
-            partitionMessages[0] = flatMessage;
-        } else {
-            if (flatMessage.getData() != null && !flatMessage.getData().isEmpty()) {
-                String database = flatMessage.getDatabase();
-                String table = flatMessage.getTable();
-
-                String pk = null;
-                boolean isMatch = false;
-
-                String[] pkHashConfigArray = StringUtils.split(pkHashConfigs, ",");
-                for (String pkHashConfig : pkHashConfigArray) {
-                    int i = pkHashConfig.lastIndexOf(".");
-                    if (!pkHashConfig.endsWith(".$pk$")) {
-                        // 如果指定了主键
-                        pk = pkHashConfig.substring(i + 1);
-                    }
-                    pkHashConfig = pkHashConfig.substring(0, i);
-
-                    AviaterRegexFilter aviaterRegexFilter = regexFilters.get(pkHashConfig);
-                    if (aviaterRegexFilter == null) {
-                        aviaterRegexFilter = new AviaterRegexFilter(pkHashConfig);
-                        regexFilters.putIfAbsent(pkHashConfig, aviaterRegexFilter);
-                    }
-
-                    isMatch = aviaterRegexFilter.filter(database + "." + table);
-                    if (isMatch) {
-                        break;
-                    }
-                }
-
-                if (!isMatch) {
-                    // 如果都没有匹配,发送到第一个分区
-                    partitionMessages[0] = flatMessage;
-                } else {
-                    if (pk == null) {
-                        pk = flatMessage.getPk();
-                    }
-                    if (pk == null || !flatMessage.getData().get(0).containsKey(pk)) {
-                        // 如果都没有匹配的主键,发送到第一个分区
-                        partitionMessages[0] = flatMessage;
-                    } else {
-                        int idx = 0;
-                        for (Map<String, String> row : flatMessage.getData()) {
-                            String value = row.get(pk);
-                            if (value == null) {
-                                value = "";
-                            }
-                            int hash = value.hashCode();
-                            int pkHash = Math.abs(hash) % partitionsNum;
-                            // math.abs可能返回负值,这里再取反,把出现负值的数据还是写到固定的分区,仍然可以保证消费顺序
-                            pkHash = Math.abs(pkHash);
-
-                            FlatMessage flatMessageTmp = partitionMessages[pkHash];
-                            if (flatMessageTmp == null) {
-                                flatMessageTmp = new FlatMessage(flatMessage.getId());
-                                partitionMessages[pkHash] = flatMessageTmp;
-                                flatMessageTmp.setDatabase(flatMessage.getDatabase());
-                                flatMessageTmp.setTable(flatMessage.getTable());
-                                flatMessageTmp.setIsDdl(flatMessage.getIsDdl());
-                                flatMessageTmp.setType(flatMessage.getType());
-                                flatMessageTmp.setSql(flatMessage.getSql());
-                                flatMessageTmp.setSqlType(flatMessage.getSqlType());
-                                flatMessageTmp.setMysqlType(flatMessage.getMysqlType());
-                                flatMessageTmp.setEs(flatMessage.getEs());
-                                flatMessageTmp.setTs(flatMessage.getTs());
-                            }
-                            List<Map<String, String>> data = flatMessageTmp.getData();
-                            if (data == null) {
-                                data = new ArrayList<>();
-                                flatMessageTmp.setData(data);
-                            }
-                            data.add(row);
-                            if (flatMessage.getOld() != null && !flatMessage.getOld().isEmpty()) {
-                                List<Map<String, String>> old = flatMessageTmp.getOld();
-                                if (old == null) {
-                                    old = new ArrayList<>();
-                                    flatMessageTmp.setOld(old);
-                                }
-                                old.add(flatMessage.getOld().get(idx));
-                            }
-                            idx++;
-                        }
-                    }
-                }
-            }
-        }
-        return partitionMessages;
-    }
-
     @Override
     public String toString() {
         return "FlatMessage [id=" + id + ", database=" + database + ", table=" + table + ", isDdl=" + isDdl + ", type="

+ 5 - 129
protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java

@@ -3,16 +3,12 @@ package com.alibaba.otter.canal.protocol;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.builder.ToStringBuilder;
 
 import com.alibaba.otter.canal.common.utils.CanalToStringStyle;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
-import com.alibaba.otter.canal.protocol.aviater.AviaterRegexFilter;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * @author zebin.xuzb @ 2012-6-19
@@ -20,16 +16,13 @@ import com.google.protobuf.InvalidProtocolBufferException;
  */
 public class Message implements Serializable {
 
-    private static final long                                    serialVersionUID = 1234034768477580009L;
-
-    private static ConcurrentHashMap<String, AviaterRegexFilter> regexFilters     = new ConcurrentHashMap<>();
-
-    private long                                                 id;
-    private List<CanalEntry.Entry>                               entries          = new ArrayList<CanalEntry.Entry>();
+    private static final long      serialVersionUID = 1234034768477580009L;
+    private long                   id;
+    private List<CanalEntry.Entry> entries          = new ArrayList<CanalEntry.Entry>();
     // row data for performance, see:
     // https://github.com/alibaba/canal/issues/726
-    private boolean                                              raw              = true;
-    private List<ByteString>                                     rawEntries       = new ArrayList<ByteString>();
+    private boolean                raw              = true;
+    private List<ByteString>       rawEntries       = new ArrayList<ByteString>();
 
     public Message(long id, List<Entry> entries){
         this.id = id;
@@ -95,121 +88,4 @@ public class Message implements Serializable {
         return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
     }
 
-    /**
-     * 将 message 分区
-     *
-     * @param partitionsNum 分区数
-     * @param pkHashConfigs 分区库表主键正则表达式
-     * @return 分区message数组
-     */
-    @SuppressWarnings("unchecked")
-    public Message[] messagePartition(Integer partitionsNum, String pkHashConfigs) {
-        if (partitionsNum == null) {
-            partitionsNum = 1;
-        }
-        Message[] partitionMessages = new Message[partitionsNum];
-        List<Entry>[] partitionEntries = new List[partitionsNum];
-        for (int i = 0; i < partitionsNum; i++) {
-            partitionEntries[i] = new ArrayList<>();
-        }
-
-        List<CanalEntry.Entry> entries;
-        if (this.isRaw()) {
-            List<ByteString> rawEntries = this.getRawEntries();
-            entries = new ArrayList<>(rawEntries.size());
-            for (ByteString byteString : rawEntries) {
-                Entry entry;
-                try {
-                    entry = Entry.parseFrom(byteString);
-                } catch (InvalidProtocolBufferException e) {
-                    throw new RuntimeException(e);
-                }
-                entries.add(entry);
-            }
-        } else {
-            entries = this.getEntries();
-        }
-
-        for (Entry entry : entries) {
-            CanalEntry.RowChange rowChange;
-            try {
-                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
-            } catch (Exception e) {
-                throw new RuntimeException(e.getMessage(), e);
-            }
-            if (rowChange.getIsDdl()) {
-                partitionEntries[0].add(entry);
-            } else {
-                if (rowChange.getRowDatasList() != null && !rowChange.getRowDatasList().isEmpty()) {
-                    String database = entry.getHeader().getSchemaName();
-                    String table = entry.getHeader().getTableName();
-
-                    String pk = null;
-                    boolean isMatch = false;
-
-                    String[] pkHashConfigArray = StringUtils.split(pkHashConfigs, ",");
-                    for (String pkHashConfig : pkHashConfigArray) {
-                        int i = pkHashConfig.lastIndexOf(".");
-                        if (!pkHashConfig.endsWith(".$pk$")) {
-                            // 如果指定了主键
-                            pk = pkHashConfig.substring(i + 1);
-                        }
-                        pkHashConfig = pkHashConfig.substring(0, i);
-
-                        AviaterRegexFilter aviaterRegexFilter = regexFilters.get(pkHashConfig);
-                        if (aviaterRegexFilter == null) {
-                            aviaterRegexFilter = new AviaterRegexFilter(pkHashConfig);
-                            regexFilters.putIfAbsent(pkHashConfig, aviaterRegexFilter);
-                        }
-
-                        isMatch = aviaterRegexFilter.filter(database + "." + table);
-                        if (isMatch) {
-                            break;
-                        }
-                    }
-
-                    if (!isMatch) {
-                        // 如果都没有匹配,发送到第一个分区
-                        partitionEntries[0].add(entry);
-                    }
-                    for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
-                        String pkValue = null;
-                        if (pk != null) {
-                            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
-                                if (column.getName().equalsIgnoreCase(pk)) {
-                                    pkValue = column.getValue();
-                                    break;
-                                }
-                            }
-                        } else {
-                            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
-                                if (column.getIsKey()) {
-                                    pkValue = column.getValue();
-                                    break;
-                                }
-                            }
-                        }
-                        if (pkValue != null) {
-                            int hash = pkValue.hashCode();
-                            int pkHash = Math.abs(hash) % partitionsNum;
-                            pkHash = Math.abs(pkHash);
-                            partitionEntries[pkHash].add(entry);
-                        } else {
-                            // 没有找到主键
-                            partitionEntries[0].add(entry);
-                        }
-                    }
-                }
-            }
-        }
-
-        for (int i = 0; i < partitionsNum; i++) {
-            List<Entry> entriesTmp = partitionEntries[i];
-            if (!entriesTmp.isEmpty()) {
-                partitionMessages[i] = new Message(this.id, entriesTmp);
-            }
-        }
-
-        return partitionMessages;
-    }
 }

+ 0 - 124
protocol/src/main/java/com/alibaba/otter/canal/protocol/aviater/AviaterRegexFilter.java

@@ -1,124 +0,0 @@
-package com.alibaba.otter.canal.protocol.aviater;
-
-import java.util.*;
-
-import org.apache.commons.lang.StringUtils;
-
-import com.googlecode.aviator.AviatorEvaluator;
-import com.googlecode.aviator.Expression;
-
-/**
- * 基于aviater进行tableName正则匹配的过滤算法
- *
- * @author jianghang 2012-7-20 下午06:01:34
- */
-public class AviaterRegexFilter {
-
-    private static final String             SPLIT             = ",";
-    private static final String             PATTERN_SPLIT     = "|";
-    private static final String             FILTER_EXPRESSION = "regex(pattern,target)";
-    private static final RegexFunction regexFunction     = new RegexFunction();
-    private final Expression                exp               = AviatorEvaluator.compile(FILTER_EXPRESSION, true);
-    static {
-        AviatorEvaluator.addFunction(regexFunction);
-    }
-
-    private static final Comparator<String> COMPARATOR        = new StringComparator();
-
-    final private String                    pattern;
-    final private boolean                   defaultEmptyValue;
-
-    public AviaterRegexFilter(String pattern){
-        this(pattern, true);
-    }
-
-    public AviaterRegexFilter(String pattern, boolean defaultEmptyValue){
-        this.defaultEmptyValue = defaultEmptyValue;
-        List<String> list = null;
-        if (StringUtils.isEmpty(pattern)) {
-            list = new ArrayList<String>();
-        } else {
-            String[] ss = StringUtils.split(pattern, SPLIT);
-            list = Arrays.asList(ss);
-        }
-
-        // 对pattern按照从长到短的排序
-        // 因为 foo|foot 匹配 foot 会出错,原因是 foot 匹配了 foo 之后,会返回 foo,但是 foo 的长度和 foot
-        // 的长度不一样
-        Collections.sort(list, COMPARATOR);
-        // 对pattern进行头尾完全匹配
-        list = completionPattern(list);
-        this.pattern = StringUtils.join(list, PATTERN_SPLIT);
-    }
-
-    public boolean filter(String filtered)  {
-        if (StringUtils.isEmpty(pattern)) {
-            return defaultEmptyValue;
-        }
-
-        if (StringUtils.isEmpty(filtered)) {
-            return defaultEmptyValue;
-        }
-
-        Map<String, Object> env = new HashMap<String, Object>();
-        env.put("pattern", pattern);
-        env.put("target", filtered.toLowerCase());
-        return (Boolean) exp.execute(env);
-    }
-
-    /**
-     * 修复正则表达式匹配的问题,因为使用了 oro 的 matches,会出现:
-     *
-     * <pre>
-     * foo|foot 匹配 foot 出错,原因是 foot 匹配了 foo 之后,会返回 foo,但是 foo 的长度和 foot 的长度不一样
-     * </pre>
-     *
-     * 因此此类对正则表达式进行了从长到短的排序
-     *
-     * @author zebin.xuzb 2012-10-22 下午2:02:26
-     * @version 1.0.0
-     */
-    private static class StringComparator implements Comparator<String> {
-
-        @Override
-        public int compare(String str1, String str2) {
-            if (str1.length() > str2.length()) {
-                return -1;
-            } else if (str1.length() < str2.length()) {
-                return 1;
-            } else {
-                return 0;
-            }
-        }
-    }
-
-    /**
-     * 修复正则表达式匹配的问题,即使按照长度递减排序,还是会出现以下问题:
-     *
-     * <pre>
-     * foooo|f.*t 匹配 fooooot 出错,原因是 fooooot 匹配了 foooo 之后,会将 fooo 和数据进行匹配,但是 foooo 的长度和 fooooot 的长度不一样
-     * </pre>
-     *
-     * 因此此类对正则表达式进行头尾完全匹配
-     *
-     * @author simon
-     * @version 1.0.0
-     */
-
-    private List<String> completionPattern(List<String> patterns) {
-        List<String> result = new ArrayList<String>();
-        for (String pattern : patterns) {
-            StringBuffer stringBuffer = new StringBuffer();
-            stringBuffer.append("^");
-            stringBuffer.append(pattern);
-            stringBuffer.append("$");
-            result.add(stringBuffer.toString());
-        }
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return pattern;
-    }
-}

+ 0 - 38
protocol/src/main/java/com/alibaba/otter/canal/protocol/aviater/PatternUtils.java

@@ -1,38 +0,0 @@
-package com.alibaba.otter.canal.protocol.aviater;
-
-import java.util.Map;
-
-import org.apache.oro.text.regex.MalformedPatternException;
-import org.apache.oro.text.regex.Pattern;
-import org.apache.oro.text.regex.PatternCompiler;
-import org.apache.oro.text.regex.Perl5Compiler;
-
-import com.google.common.base.Function;
-import com.google.common.collect.MapMaker;
-import com.google.common.collect.MigrateMap;
-
-public class PatternUtils {
-    @SuppressWarnings("deprecation")
-    private static Map<String, Pattern> patterns = MigrateMap.makeComputingMap(new MapMaker().softValues(),
-        new Function<String, Pattern>() {
-
-            public Pattern apply(String pattern) {
-                try {
-                    PatternCompiler pc = new Perl5Compiler();
-                    return pc.compile(pattern,
-                        Perl5Compiler.CASE_INSENSITIVE_MASK | Perl5Compiler.READ_ONLY_MASK
-                                               | Perl5Compiler.SINGLELINE_MASK);
-                } catch (MalformedPatternException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
-
-    public static Pattern getPattern(String pattern) {
-        return patterns.get(pattern);
-    }
-
-    public static void clear() {
-        patterns.clear();
-    }
-}

+ 0 - 31
protocol/src/main/java/com/alibaba/otter/canal/protocol/aviater/RegexFunction.java

@@ -1,31 +0,0 @@
-package com.alibaba.otter.canal.protocol.aviater;
-
-import java.util.Map;
-
-import org.apache.oro.text.regex.Perl5Matcher;
-
-import com.googlecode.aviator.runtime.function.AbstractFunction;
-import com.googlecode.aviator.runtime.function.FunctionUtils;
-import com.googlecode.aviator.runtime.type.AviatorBoolean;
-import com.googlecode.aviator.runtime.type.AviatorObject;
-
-/**
- * 提供aviator regex的代码扩展
- *
- * @author jianghang 2012-7-23 上午10:29:23
- */
-public class RegexFunction extends AbstractFunction {
-
-    public AviatorObject call(Map<String, Object> env, AviatorObject arg1, AviatorObject arg2) {
-        String pattern = FunctionUtils.getStringValue(arg1, env);
-        String text = FunctionUtils.getStringValue(arg2, env);
-        Perl5Matcher matcher = new Perl5Matcher();
-        boolean isMatch = matcher.matches(text, PatternUtils.getPattern(pattern));
-        return AviatorBoolean.valueOf(isMatch);
-    }
-
-    public String getName() {
-        return "regex";
-    }
-
-}

+ 405 - 0
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -0,0 +1,405 @@
+package com.alibaba.otter.canal.common;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.MapMaker;
+import com.google.common.collect.MigrateMap;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * process MQ Message utils
+ * 
+ * @author agapple 2018年12月11日 下午1:28:32
+ */
+public class MQMessageUtils {
+
+    @SuppressWarnings("deprecation")
+    private static Map<String, List<PartitionData>> partitionDatas = MigrateMap.makeComputingMap(new MapMaker().softValues(),
+                                                                       new Function<String, List<PartitionData>>() {
+
+                                                                           public List<PartitionData> apply(String pkHashConfigs) {
+                                                                               List<PartitionData> datas = Lists.newArrayList();
+                                                                               String[] pkHashConfigArray = StringUtils.split(pkHashConfigs,
+                                                                                   ",");
+                                                                               // schema.table#id^name
+                                                                               for (String pkHashConfig : pkHashConfigArray) {
+                                                                                   PartitionData data = new PartitionData();
+                                                                                   int i = pkHashConfig.lastIndexOf("#");
+                                                                                   if (i > 0) {
+                                                                                       data.pkNames = Lists.newArrayList(StringUtils.split(pkHashConfig.substring(i + 1),
+                                                                                           '^'));
+                                                                                       pkHashConfig = pkHashConfig.substring(0,
+                                                                                           i);
+                                                                                   }
+                                                                                   if (!isWildCard(pkHashConfig)) {
+                                                                                       data.simpleName = pkHashConfig;
+                                                                                   } else {
+                                                                                       data.regexFilter = new AviaterRegexFilter(pkHashConfig);
+                                                                                   }
+                                                                                   datas.add(data);
+                                                                               }
+
+                                                                               return datas;
+                                                                           }
+                                                                       });
+
+    /**
+     * 将 message 分区
+     *
+     * @param partitionsNum 分区数
+     * @param pkHashConfigs 分区库表主键正则表达式
+     * @return 分区message数组
+     */
+    @SuppressWarnings("unchecked")
+    public static Message[] messagePartition(Message message, Integer partitionsNum, String pkHashConfigs) {
+        if (partitionsNum == null) {
+            partitionsNum = 1;
+        }
+        Message[] partitionMessages = new Message[partitionsNum];
+        List<Entry>[] partitionEntries = new List[partitionsNum];
+        for (int i = 0; i < partitionsNum; i++) {
+            partitionEntries[i] = new ArrayList<>();
+        }
+
+        List<CanalEntry.Entry> entries;
+        if (message.isRaw()) {
+            List<ByteString> rawEntries = message.getRawEntries();
+            entries = new ArrayList<>(rawEntries.size());
+            for (ByteString byteString : rawEntries) {
+                Entry entry;
+                try {
+                    entry = Entry.parseFrom(byteString);
+                } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                }
+                entries.add(entry);
+            }
+        } else {
+            entries = message.getEntries();
+        }
+
+        for (Entry entry : entries) {
+            CanalEntry.RowChange rowChange;
+            try {
+                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+            } catch (Exception e) {
+                throw new RuntimeException(e.getMessage(), e);
+            }
+
+            if (rowChange.getIsDdl()) {
+                partitionEntries[0].add(entry);
+            } else {
+                if (rowChange.getRowDatasList() != null && !rowChange.getRowDatasList().isEmpty()) {
+                    String database = entry.getHeader().getSchemaName();
+                    String table = entry.getHeader().getTableName();
+                    List<String> pkNames = getParitionHashColumns(database + "." + table, pkHashConfigs);
+
+                    if (pkNames == null) {
+                        // 如果都没有匹配,发送到第一个分区
+                        partitionEntries[0].add(entry);
+                    } else {
+                        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+                            int hashCode = table.hashCode();
+                            if (pkNames.isEmpty()) {
+                                // isEmpty use default pkNames
+                                for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
+                                    if (column.getIsKey()) {
+                                        hashCode = hashCode ^ column.getValue().hashCode();
+                                    }
+                                }
+                            } else {
+                                for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
+                                    if (checkPkNamesHasContain(pkNames, column.getName())) {
+                                        hashCode = hashCode ^ column.getValue().hashCode();
+                                    }
+                                }
+                            }
+
+                            int pkHash = Math.abs(hashCode) % partitionsNum;
+                            pkHash = Math.abs(pkHash);
+                            partitionEntries[pkHash].add(entry);
+                        }
+                    }
+                }
+            }
+        }
+
+        for (int i = 0; i < partitionsNum; i++) {
+            List<Entry> entriesTmp = partitionEntries[i];
+            if (!entriesTmp.isEmpty()) {
+                partitionMessages[i] = new Message(message.getId(), entriesTmp);
+            }
+        }
+
+        return partitionMessages;
+    }
+
+    /**
+     * 将Message转换为FlatMessage
+     *
+     * @param message 原生message
+     * @return FlatMessage列表
+     */
+    public static List<FlatMessage> messageConverter(Message message) {
+        try {
+            if (message == null) {
+                return null;
+            }
+
+            List<FlatMessage> flatMessages = new ArrayList<>();
+            List<CanalEntry.Entry> entrys = null;
+            if (message.isRaw()) {
+                List<ByteString> rawEntries = message.getRawEntries();
+                entrys = new ArrayList<CanalEntry.Entry>(rawEntries.size());
+                for (ByteString byteString : rawEntries) {
+                    CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
+                    entrys.add(entry);
+                }
+            } else {
+                entrys = message.getEntries();
+            }
+
+            for (CanalEntry.Entry entry : entrys) {
+                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
+                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
+                    continue;
+                }
+
+                CanalEntry.RowChange rowChange;
+                try {
+                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+                } catch (Exception e) {
+                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
+                                               + entry.toString(), e);
+                }
+
+                CanalEntry.EventType eventType = rowChange.getEventType();
+
+                FlatMessage flatMessage = new FlatMessage(message.getId());
+                flatMessages.add(flatMessage);
+                flatMessage.setDatabase(entry.getHeader().getSchemaName());
+                flatMessage.setTable(entry.getHeader().getTableName());
+                flatMessage.setIsDdl(rowChange.getIsDdl());
+                flatMessage.setType(eventType.toString());
+                flatMessage.setEs(entry.getHeader().getExecuteTime());
+                flatMessage.setTs(System.currentTimeMillis());
+                flatMessage.setSql(rowChange.getSql());
+
+                if (!rowChange.getIsDdl()) {
+                    Map<String, Integer> sqlType = new LinkedHashMap<>();
+                    Map<String, String> mysqlType = new LinkedHashMap<>();
+                    List<Map<String, String>> data = new ArrayList<>();
+                    List<Map<String, String>> old = new ArrayList<>();
+
+                    Set<String> updateSet = new HashSet<>();
+                    for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+                        if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
+                            && eventType != CanalEntry.EventType.DELETE) {
+                            continue;
+                        }
+
+                        Map<String, String> row = new LinkedHashMap<>();
+                        List<CanalEntry.Column> columns;
+
+                        if (eventType == CanalEntry.EventType.DELETE) {
+                            columns = rowData.getBeforeColumnsList();
+                        } else {
+                            columns = rowData.getAfterColumnsList();
+                        }
+
+                        for (CanalEntry.Column column : columns) {
+                            if (column.getIsKey()) {
+                                flatMessage.addPkName(column.getName());
+                            }
+                            sqlType.put(column.getName(), column.getSqlType());
+                            mysqlType.put(column.getName(), column.getMysqlType());
+                            if (column.getIsNull()) {
+                                row.put(column.getName(), null);
+                            } else {
+                                row.put(column.getName(), column.getValue());
+                            }
+                            // 获取update为true的字段
+                            if (column.getUpdated()) {
+                                updateSet.add(column.getName());
+                            }
+                        }
+                        if (!row.isEmpty()) {
+                            data.add(row);
+                        }
+
+                        if (eventType == CanalEntry.EventType.UPDATE) {
+                            Map<String, String> rowOld = new LinkedHashMap<>();
+                            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
+                                if (updateSet.contains(column.getName())) {
+                                    if (column.getIsNull()) {
+                                        rowOld.put(column.getName(), null);
+                                    } else {
+                                        rowOld.put(column.getName(), column.getValue());
+                                    }
+                                }
+                            }
+                            // update操作将记录修改前的值
+                            if (!rowOld.isEmpty()) {
+                                old.add(rowOld);
+                            }
+                        }
+                    }
+                    if (!sqlType.isEmpty()) {
+                        flatMessage.setSqlType(sqlType);
+                    }
+                    if (!mysqlType.isEmpty()) {
+                        flatMessage.setMysqlType(mysqlType);
+                    }
+                    if (!data.isEmpty()) {
+                        flatMessage.setData(data);
+                    }
+                    if (!old.isEmpty()) {
+                        flatMessage.setOld(old);
+                    }
+                }
+            }
+            return flatMessages;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * 将FlatMessage按指定的字段值hash拆分
+     *
+     * @param flatMessage flatMessage
+     * @param partitionsNum 分区数量
+     * @param pkHashConfigs hash映射
+     * @return 拆分后的flatMessage数组
+     */
+    public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum, String pkHashConfigs) {
+        if (partitionsNum == null) {
+            partitionsNum = 1;
+        }
+        FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];
+
+        if (flatMessage.getIsDdl()) {
+            partitionMessages[0] = flatMessage;
+        } else {
+            if (flatMessage.getData() != null && !flatMessage.getData().isEmpty()) {
+                String database = flatMessage.getDatabase();
+                String table = flatMessage.getTable();
+                List<String> pkNames = getParitionHashColumns(database + "." + table, pkHashConfigs);
+                if (pkNames == null) {
+                    // 如果都没有匹配,发送到第一个分区
+                    partitionMessages[0] = flatMessage;
+                } else {
+                    if (pkNames.isEmpty()) {
+                        pkNames = flatMessage.getPkNames();
+                    }
+
+                    int hashCode = table.hashCode();
+                    int idx = 0;
+                    for (Map<String, String> row : flatMessage.getData()) {
+                        for (String pkName : pkNames) {
+                            String value = row.get(pkName);
+                            if (value == null) {
+                                value = "";
+                            }
+                            hashCode = hashCode ^ value.hashCode();
+                        }
+
+                        int pkHash = Math.abs(hashCode) % partitionsNum;
+                        // math.abs可能返回负值,这里再取反,把出现负值的数据还是写到固定的分区,仍然可以保证消费顺序
+                        pkHash = Math.abs(pkHash);
+
+                        FlatMessage flatMessageTmp = partitionMessages[pkHash];
+                        if (flatMessageTmp == null) {
+                            flatMessageTmp = new FlatMessage(flatMessage.getId());
+                            partitionMessages[pkHash] = flatMessageTmp;
+                            flatMessageTmp.setDatabase(flatMessage.getDatabase());
+                            flatMessageTmp.setTable(flatMessage.getTable());
+                            flatMessageTmp.setIsDdl(flatMessage.getIsDdl());
+                            flatMessageTmp.setType(flatMessage.getType());
+                            flatMessageTmp.setSql(flatMessage.getSql());
+                            flatMessageTmp.setSqlType(flatMessage.getSqlType());
+                            flatMessageTmp.setMysqlType(flatMessage.getMysqlType());
+                            flatMessageTmp.setEs(flatMessage.getEs());
+                            flatMessageTmp.setTs(flatMessage.getTs());
+                        }
+                        List<Map<String, String>> data = flatMessageTmp.getData();
+                        if (data == null) {
+                            data = new ArrayList<>();
+                            flatMessageTmp.setData(data);
+                        }
+                        data.add(row);
+                        if (flatMessage.getOld() != null && !flatMessage.getOld().isEmpty()) {
+                            List<Map<String, String>> old = flatMessageTmp.getOld();
+                            if (old == null) {
+                                old = new ArrayList<>();
+                                flatMessageTmp.setOld(old);
+                            }
+                            old.add(flatMessage.getOld().get(idx));
+                        }
+                        idx++;
+                    }
+                }
+            }
+        }
+        return partitionMessages;
+    }
+
+    /**
+     * match return List , not match return null
+     */
+    public static List<String> getParitionHashColumns(String name, String pkHashConfigs) {
+        List<PartitionData> datas = partitionDatas.get(pkHashConfigs);
+        for (PartitionData data : datas) {
+            if (data.simpleName != null) {
+                if (data.simpleName.equalsIgnoreCase(name)) {
+                    return data.pkNames;
+                }
+            } else {
+                if (data.regexFilter.filter(name)) {
+                    return data.pkNames;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    public static boolean checkPkNamesHasContain(List<String> pkNames, String name) {
+        for (String pkName : pkNames) {
+            if (pkName.equalsIgnoreCase(name)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private static boolean isWildCard(String value) {
+        // not contaiins '.' ?
+        return StringUtils.containsAny(value, new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$',
+                '^' });
+    }
+
+    public static class PartitionData {
+
+        public String             simpleName;
+        public AviaterRegexFilter regexFilter;
+        public List<String>       pkNames = Lists.newArrayList();
+    }
+
+}

+ 0 - 1
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -1,6 +1,5 @@
 package com.alibaba.otter.canal.common;
 
-import java.util.Map;
 
 /**
  * kafka 配置项

+ 8 - 9
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -12,6 +12,7 @@ import org.slf4j.LoggerFactory;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.MQMessageUtils;
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
@@ -88,7 +89,8 @@ public class CanalKafkaProducer implements CanalMQProducer {
                         message);
                 } else {
                     if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
-                        Message[] messages = message.messagePartition(canalDestination.getPartitionsNum(),
+                        Message[] messages = MQMessageUtils.messagePartition(message,
+                            canalDestination.getPartitionsNum(),
                             canalDestination.getPartitionHash());
                         int length = messages.length;
                         for (int i = 0; i < length; i++) {
@@ -119,13 +121,12 @@ public class CanalKafkaProducer implements CanalMQProducer {
             }
         } else {
             // 发送扁平数据json
-            List<FlatMessage> flatMessages = FlatMessage.messageConverter(message);
+            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
             if (flatMessages != null) {
                 for (FlatMessage flatMessage : flatMessages) {
                     if (canalDestination.getPartition() != null) {
                         try {
-                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                canalDestination.getTopic(),
+                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                 canalDestination.getPartition(),
                                 null,
                                 JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
@@ -139,7 +140,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                     } else {
                         if (canalDestination.getPartitionHash() != null
                             && !canalDestination.getPartitionHash().isEmpty()) {
-                            FlatMessage[] partitionFlatMessage = FlatMessage.messagePartition(flatMessage,
+                            FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                                 canalDestination.getPartitionsNum(),
                                 canalDestination.getPartitionHash());
                             int length = partitionFlatMessage.length;
@@ -147,8 +148,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                 FlatMessage flatMessagePart = partitionFlatMessage[i];
                                 if (flatMessagePart != null) {
                                     try {
-                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                            canalDestination.getTopic(),
+                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                             i,
                                             null,
                                             JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
@@ -163,8 +163,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                             }
                         } else {
                             try {
-                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                    canalDestination.getTopic(),
+                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                     0,
                                     null,
                                     JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));

+ 14 - 13
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.common.CanalMessageSerializer;
+import com.alibaba.otter.canal.common.MQMessageUtils;
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.server.exception.CanalServerException;
@@ -61,8 +62,8 @@ public class CanalRocketMQProducer implements CanalMQProducer {
         if (!mqProperties.getFlatMessage()) {
             try {
                 if (destination.getPartition() != null) {
-                    Message message = new Message(destination.getTopic(),
-                        CanalMessageSerializer.serializer(data, mqProperties.isFilterTransactionEntry()));
+                    Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data,
+                        mqProperties.isFilterTransactionEntry()));
                     if (logger.isDebugEnabled()) {
                         logger.debug("send message:{} to destination:{}, partition: {}",
                             message,
@@ -82,8 +83,9 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                     }, null);
                 } else {
                     if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
-                        com.alibaba.otter.canal.protocol.Message[] messages = data
-                            .messagePartition(destination.getPartitionsNum(), destination.getPartitionHash());
+                        com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(data,
+                            destination.getPartitionsNum(),
+                            destination.getPartitionHash());
                         int length = messages.length;
                         for (int i = 0; i < length; i++) {
                             com.alibaba.otter.canal.protocol.Message dataPartition = messages[i];
@@ -125,7 +127,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                 return;
             }
         } else {
-            List<FlatMessage> flatMessages = FlatMessage.messageConverter(data);
+            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(data);
             if (flatMessages != null) {
                 for (FlatMessage flatMessage : flatMessages) {
                     if (destination.getPartition() != null) {
@@ -136,8 +138,8 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                     destination.getTopic(),
                                     destination.getPartition());
                             }
-                            Message message = new Message(destination.getTopic(),
-                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue).getBytes());
+                            Message message = new Message(destination.getTopic(), JSON.toJSONString(flatMessage,
+                                SerializerFeature.WriteMapNullValue).getBytes());
                             this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                 @Override
@@ -152,7 +154,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                         }
                     } else {
                         if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
-                            FlatMessage[] partitionFlatMessage = FlatMessage.messagePartition(flatMessage,
+                            FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                                 destination.getPartitionsNum(),
                                 destination.getPartitionHash());
                             int length = partitionFlatMessage.length;
@@ -172,17 +174,16 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                         this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                             @Override
-                                            public MessageQueue select(List<MessageQueue> mqs, Message msg,
-                                                                       Object arg) {
+                                            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                                                 if (index > mqs.size()) {
-                                                    throw new CanalServerException(
-                                                        "partition number is error,config num:"
+                                                    throw new CanalServerException("partition number is error,config num:"
                                                                                    + destination.getPartitionsNum()
                                                                                    + ", mq num: " + mqs.size());
                                                 }
                                                 return mqs.get(index);
                                             }
-                                        }, null);
+                                        },
+                                            null);
                                     } catch (Exception e) {
                                         logger.error("send flat message to hashed partition error", e);
                                         callback.rollback();