Browse Source

Merge pull request #1266 from rewerma/master

增加mq 分区规则配置正则表达式
agapple 6 years ago
parent
commit
bd5110b951

+ 2 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -70,6 +70,7 @@ public abstract class AbstractCanalAdapterWorker {
                     });
                     });
                     return true;
                     return true;
                 } catch (Exception e) {
                 } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
                     return false;
                     return false;
                 }
                 }
             }));
             }));
@@ -108,6 +109,7 @@ public abstract class AbstractCanalAdapterWorker {
                     });
                     });
                     return true;
                     return true;
                 } catch (Exception e) {
                 } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
                     return false;
                     return false;
                 }
                 }
             }));
             }));

+ 3 - 3
deployer/src/main/resources/example/instance.properties

@@ -1,5 +1,5 @@
 #################################################
 #################################################
-## mysql serverId , v1.0.26+ will autoGen 
+## mysql serverId , v1.0.26+ will autoGen
 # canal.instance.mysql.slaveId=0
 # canal.instance.mysql.slaveId=0
 
 
 # enable gtid use true/false
 # enable gtid use true/false
@@ -48,5 +48,5 @@ canal.mq.topic=example
 canal.mq.partition=0
 canal.mq.partition=0
 # hash partition config
 # hash partition config
 #canal.mq.partitionsNum=3
 #canal.mq.partitionsNum=3
-#canal.mq.partitionHash=mytest.person:id,mytest.role:id
-#################################################
+#canal.mq.partitionHash=.*\\..*.$pk$
+#################################################

+ 4 - 30
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalMQConfig.java

@@ -1,16 +1,11 @@
 package com.alibaba.otter.canal.instance.core;
 package com.alibaba.otter.canal.instance.core;
 
 
-import java.util.LinkedHashMap;
-import java.util.Map;
-
 public class CanalMQConfig {
 public class CanalMQConfig {
 
 
-    private String                       topic;
-    private Integer                      partition;
-    private Integer                      partitionsNum;
-    private String                       partitionHash;
-
-    private volatile Map<String, String> partitionHashProperties;
+    private String  topic;
+    private Integer partition;
+    private Integer partitionsNum;
+    private String  partitionHash;
 
 
     public String getTopic() {
     public String getTopic() {
         return topic;
         return topic;
@@ -44,25 +39,4 @@ public class CanalMQConfig {
         this.partitionHash = partitionHash;
         this.partitionHash = partitionHash;
     }
     }
 
 
-    public Map<String, String> getPartitionHashProperties() {
-        if (partitionHashProperties == null) {
-            synchronized (CanalMQConfig.class) {
-                if (partitionHashProperties == null) {
-                    if (partitionHash != null) {
-                        partitionHashProperties = new LinkedHashMap<>();
-                        String[] items = partitionHash.split(",");
-                        for (String item : items) {
-                            int i = item.indexOf(":");
-                            if (i > -1) {
-                                String dbTable = item.substring(0, i).trim();
-                                String pk = item.substring(i + 1).trim();
-                                partitionHashProperties.put(dbTable, pk);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-        return partitionHashProperties;
-    }
 }
 }

+ 8 - 0
protocol/pom.xml

@@ -26,5 +26,13 @@
 			<groupId>commons-lang</groupId>
 			<groupId>commons-lang</groupId>
 			<artifactId>commons-lang</artifactId>
 			<artifactId>commons-lang</artifactId>
 		</dependency>
 		</dependency>
+		<dependency>
+			<groupId>oro</groupId>
+			<artifactId>oro</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.googlecode.aviator</groupId>
+			<artifactId>aviator</artifactId>
+		</dependency>
 	</dependencies>
 	</dependencies>
 </project>
 </project>

+ 124 - 73
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -1,13 +1,13 @@
 package com.alibaba.otter.canal.protocol;
 package com.alibaba.otter.canal.protocol;
 
 
 import java.io.Serializable;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 
+import org.apache.commons.lang.StringUtils;
+
+import com.alibaba.otter.canal.protocol.aviater.AviaterRegexFilter;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ByteString;
 
 
 /**
 /**
@@ -16,25 +16,27 @@ import com.google.protobuf.ByteString;
  */
  */
 public class FlatMessage implements Serializable {
 public class FlatMessage implements Serializable {
 
 
-    private static final long         serialVersionUID = -3386650678735860050L;
+    private static final long                                    serialVersionUID = -3386650678735860050L;
+
+    private static ConcurrentHashMap<String, AviaterRegexFilter> regexFilters     = new ConcurrentHashMap<>();
 
 
-    private long                      id;
-    private String                    database;
-    private String                    table;
-    private Boolean                   isDdl;
-    private String                    type;
+    private long                                                 id;
+    private String                                               database;
+    private String                                               table;
+    private String                                               pk;
+    private Boolean                                              isDdl;
+    private String                                               type;
     // binlog executeTime
     // binlog executeTime
-    private Long                      es;
+    private Long                                                 es;
     // dml build timeStamp
     // dml build timeStamp
-    private Long                      ts;
-    private String                    sql;
-    private Map<String, Integer>      sqlType;
-    private Map<String, String>       mysqlType;
-    private List<Map<String, String>> data;
-    private List<Map<String, String>> old;
+    private Long                                                 ts;
+    private String                                               sql;
+    private Map<String, Integer>                                 sqlType;
+    private Map<String, String>                                  mysqlType;
+    private List<Map<String, String>>                            data;
+    private List<Map<String, String>>                            old;
 
 
-    public FlatMessage(){
-    }
+    private transient CanalEntry.Entry                           entry;                                       // 所属entry
 
 
     public FlatMessage(long id){
     public FlatMessage(long id){
         this.id = id;
         this.id = id;
@@ -64,6 +66,14 @@ public class FlatMessage implements Serializable {
         this.table = table;
         this.table = table;
     }
     }
 
 
+    public String getPk() {
+        return pk;
+    }
+
+    public void setPk(String pk) {
+        this.pk = pk;
+    }
+
     public Boolean getIsDdl() {
     public Boolean getIsDdl() {
         return isDdl;
         return isDdl;
     }
     }
@@ -136,9 +146,17 @@ public class FlatMessage implements Serializable {
         this.es = es;
         this.es = es;
     }
     }
 
 
+    public CanalEntry.Entry getEntry() {
+        return entry;
+    }
+
+    public void setEntry(CanalEntry.Entry entry) {
+        this.entry = entry;
+    }
+
     /**
     /**
      * 将Message转换为FlatMessage
      * 将Message转换为FlatMessage
-     * 
+     *
      * @param message 原生message
      * @param message 原生message
      * @return FlatMessage列表
      * @return FlatMessage列表
      */
      */
@@ -187,6 +205,7 @@ public class FlatMessage implements Serializable {
                 flatMessage.setEs(entry.getHeader().getExecuteTime());
                 flatMessage.setEs(entry.getHeader().getExecuteTime());
                 flatMessage.setTs(System.currentTimeMillis());
                 flatMessage.setTs(System.currentTimeMillis());
                 flatMessage.setSql(rowChange.getSql());
                 flatMessage.setSql(rowChange.getSql());
+                flatMessage.setEntry(entry);
 
 
                 if (!rowChange.getIsDdl()) {
                 if (!rowChange.getIsDdl()) {
                     Map<String, Integer> sqlType = new LinkedHashMap<>();
                     Map<String, Integer> sqlType = new LinkedHashMap<>();
@@ -211,6 +230,9 @@ public class FlatMessage implements Serializable {
                         }
                         }
 
 
                         for (CanalEntry.Column column : columns) {
                         for (CanalEntry.Column column : columns) {
+                            if (flatMessage.getPk() == null && column.getIsKey()) {
+                                flatMessage.setPk(column.getName());
+                            }
                             sqlType.put(column.getName(), column.getSqlType());
                             sqlType.put(column.getName(), column.getSqlType());
                             mysqlType.put(column.getName(), column.getMysqlType());
                             mysqlType.put(column.getName(), column.getMysqlType());
                             if (column.getIsNull()) {
                             if (column.getIsNull()) {
@@ -266,73 +288,102 @@ public class FlatMessage implements Serializable {
 
 
     /**
     /**
      * 将FlatMessage按指定的字段值hash拆分
      * 将FlatMessage按指定的字段值hash拆分
-     * 
+     *
      * @param flatMessage flatMessage
      * @param flatMessage flatMessage
      * @param partitionsNum 分区数量
      * @param partitionsNum 分区数量
-     * @param pkHashConfig hash映射
+     * @param pkHashConfigs hash映射
      * @return 拆分后的flatMessage数组
      * @return 拆分后的flatMessage数组
      */
      */
-    public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum,
-                                                 Map<String, String> pkHashConfig) {
+    public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum, String pkHashConfigs) {
         if (partitionsNum == null) {
         if (partitionsNum == null) {
             partitionsNum = 1;
             partitionsNum = 1;
         }
         }
         FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];
         FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];
-        String pk = pkHashConfig.get(flatMessage.getDatabase() + "." + flatMessage.getTable());
-        if (pk == null || flatMessage.getIsDdl()) {
+
+        if (flatMessage.getIsDdl()) {
             partitionMessages[0] = flatMessage;
             partitionMessages[0] = flatMessage;
         } else {
         } else {
-            if (flatMessage.getData() != null) {
-                int idx = 0;
-                for (Map<String, String> row : flatMessage.getData()) {
-                    Map<String, String> o = null;
-                    if (flatMessage.getOld() != null) {
-                        o = flatMessage.getOld().get(idx);
+            if (flatMessage.getData() != null && !flatMessage.getData().isEmpty()) {
+                String database = flatMessage.getDatabase();
+                String table = flatMessage.getTable();
+
+                String pk = null;
+                boolean isMatch = false;
+
+                String[] pkHashConfigArray = StringUtils.split(pkHashConfigs, ",");
+                for (String pkHashConfig : pkHashConfigArray) {
+                    int i = pkHashConfig.lastIndexOf(".");
+                    if (!pkHashConfig.endsWith(".$pk$")) {
+                        // 如果指定了主键
+                        pk = pkHashConfig.substring(i + 1);
                     }
                     }
-                    String value;
-                    // 如果old中有pk值说明主键有修改, 以旧的主键值hash为准
-                    if (o != null && o.containsKey(pk)) {
-                        value = o.get(pk);
-                    } else {
-                        value = row.get(pk);
-                    }
-                    if (value == null) {
-                        value = "";
+                    pkHashConfig = pkHashConfig.substring(0, i);
+
+                    AviaterRegexFilter aviaterRegexFilter = regexFilters.get(pkHashConfig);
+                    if (aviaterRegexFilter == null) {
+                        aviaterRegexFilter = new AviaterRegexFilter(pkHashConfig);
+                        regexFilters.putIfAbsent(pkHashConfig, aviaterRegexFilter);
                     }
                     }
-                    int hash = value.hashCode();
-                    int pkHash = Math.abs(hash) % partitionsNum;
-                    // math.abs可能返回负值,这里再取反,把出现负值的数据还是写到固定的分区,仍然可以保证消费顺序
-                    pkHash = Math.abs(pkHash);
-
-                    FlatMessage flatMessageTmp = partitionMessages[pkHash];
-                    if (flatMessageTmp == null) {
-                        flatMessageTmp = new FlatMessage(flatMessage.getId());
-                        partitionMessages[pkHash] = flatMessageTmp;
-                        flatMessageTmp.setDatabase(flatMessage.getDatabase());
-                        flatMessageTmp.setTable(flatMessage.getTable());
-                        flatMessageTmp.setIsDdl(flatMessage.getIsDdl());
-                        flatMessageTmp.setType(flatMessage.getType());
-                        flatMessageTmp.setSql(flatMessage.getSql());
-                        flatMessageTmp.setSqlType(flatMessage.getSqlType());
-                        flatMessageTmp.setMysqlType(flatMessage.getMysqlType());
-                        flatMessageTmp.setEs(flatMessage.getEs());
-                        flatMessageTmp.setTs(flatMessage.getTs());
+
+                    isMatch = aviaterRegexFilter.filter(database + "." + table);
+                    if (isMatch) {
+                        break;
                     }
                     }
-                    List<Map<String, String>> data = flatMessageTmp.getData();
-                    if (data == null) {
-                        data = new ArrayList<>();
-                        flatMessageTmp.setData(data);
+                }
+
+                if (!isMatch) {
+                    // 如果都没有匹配,发送到第一个分区
+                    partitionMessages[0] = flatMessage;
+                } else {
+                    if (pk == null) {
+                        pk = flatMessage.getPk();
                     }
                     }
-                    data.add(row);
-                    if (flatMessage.getOld() != null && !flatMessage.getOld().isEmpty()) {
-                        List<Map<String, String>> old = flatMessageTmp.getOld();
-                        if (old == null) {
-                            old = new ArrayList<>();
-                            flatMessageTmp.setOld(old);
+                    if (pk == null || !flatMessage.getData().get(0).containsKey(pk)) {
+                        // 如果都没有匹配的主键,发送到第一个分区
+                        partitionMessages[0] = flatMessage;
+                    } else {
+                        int idx = 0;
+                        for (Map<String, String> row : flatMessage.getData()) {
+                            String value = row.get(pk);
+                            if (value == null) {
+                                value = "";
+                            }
+                            int hash = value.hashCode();
+                            int pkHash = Math.abs(hash) % partitionsNum;
+                            // math.abs可能返回负值,这里再取反,把出现负值的数据还是写到固定的分区,仍然可以保证消费顺序
+                            pkHash = Math.abs(pkHash);
+
+                            FlatMessage flatMessageTmp = partitionMessages[pkHash];
+                            if (flatMessageTmp == null) {
+                                flatMessageTmp = new FlatMessage(flatMessage.getId());
+                                partitionMessages[pkHash] = flatMessageTmp;
+                                flatMessageTmp.setDatabase(flatMessage.getDatabase());
+                                flatMessageTmp.setTable(flatMessage.getTable());
+                                flatMessageTmp.setIsDdl(flatMessage.getIsDdl());
+                                flatMessageTmp.setType(flatMessage.getType());
+                                flatMessageTmp.setSql(flatMessage.getSql());
+                                flatMessageTmp.setSqlType(flatMessage.getSqlType());
+                                flatMessageTmp.setMysqlType(flatMessage.getMysqlType());
+                                flatMessageTmp.setEs(flatMessage.getEs());
+                                flatMessageTmp.setTs(flatMessage.getTs());
+                            }
+                            List<Map<String, String>> data = flatMessageTmp.getData();
+                            if (data == null) {
+                                data = new ArrayList<>();
+                                flatMessageTmp.setData(data);
+                            }
+                            data.add(row);
+                            if (flatMessage.getOld() != null && !flatMessage.getOld().isEmpty()) {
+                                List<Map<String, String>> old = flatMessageTmp.getOld();
+                                if (old == null) {
+                                    old = new ArrayList<>();
+                                    flatMessageTmp.setOld(old);
+                                }
+                                old.add(flatMessage.getOld().get(idx));
+                            }
+                            idx++;
                         }
                         }
-                        old.add(flatMessage.getOld().get(idx));
                     }
                     }
-                    idx++;
                 }
                 }
             }
             }
         }
         }

+ 128 - 5
protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java

@@ -3,12 +3,16 @@ package com.alibaba.otter.canal.protocol;
 import java.io.Serializable;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringBuilder;
 
 
 import com.alibaba.otter.canal.common.utils.CanalToStringStyle;
 import com.alibaba.otter.canal.common.utils.CanalToStringStyle;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
+import com.alibaba.otter.canal.protocol.aviater.AviaterRegexFilter;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
 
 
 /**
 /**
  * @author zebin.xuzb @ 2012-6-19
  * @author zebin.xuzb @ 2012-6-19
@@ -16,14 +20,16 @@ import com.google.protobuf.ByteString;
  */
  */
 public class Message implements Serializable {
 public class Message implements Serializable {
 
 
-    private static final long      serialVersionUID = 1234034768477580009L;
+    private static final long                                    serialVersionUID = 1234034768477580009L;
 
 
-    private long                   id;
-    private List<CanalEntry.Entry> entries          = new ArrayList<CanalEntry.Entry>();
+    private static ConcurrentHashMap<String, AviaterRegexFilter> regexFilters     = new ConcurrentHashMap<>();
+
+    private long                                                 id;
+    private List<CanalEntry.Entry>                               entries          = new ArrayList<CanalEntry.Entry>();
     // row data for performance, see:
     // row data for performance, see:
     // https://github.com/alibaba/canal/issues/726
     // https://github.com/alibaba/canal/issues/726
-    private boolean                raw              = true;
-    private List<ByteString>       rawEntries       = new ArrayList<ByteString>();
+    private boolean                                              raw              = true;
+    private List<ByteString>                                     rawEntries       = new ArrayList<ByteString>();
 
 
     public Message(long id, List<Entry> entries){
     public Message(long id, List<Entry> entries){
         this.id = id;
         this.id = id;
@@ -89,4 +95,121 @@ public class Message implements Serializable {
         return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
         return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
     }
     }
 
 
+    /**
+     * Splits this message into per-partition messages by hashing a key column value of each row.
+     * <p>
+     * {@code pkHashConfigs} is a comma-separated list of rules shaped like {@code <schema.table regex>.<column>}.
+     * The text after the last '.' names the column whose value is hashed; the literal {@code $pk$} means "use the
+     * first column flagged as key". Rules are tried in order and the first one whose regex matches
+     * {@code schema.table} wins.
+     * <ul>
+     * <li>DDL entries, entries that match no rule, and rows whose hash column cannot be found go to partition 0.</li>
+     * <li>Non-DDL entries that carry no row data are not emitted at all.</li>
+     * <li>An entry is never split: if its rows hash to several partitions, the whole entry is added once to each of
+     * those partitions.</li>
+     * </ul>
+     *
+     * @param partitionsNum number of partitions; {@code null} is treated as 1
+     * @param pkHashConfigs comma-separated partition rules; {@code null} or empty means no rule matches
+     * @return array of length {@code partitionsNum}; slots that received no entry are {@code null}
+     * @throws RuntimeException if a raw entry or an entry's store value cannot be parsed
+     */
+    @SuppressWarnings("unchecked")
+    public Message[] messagePartition(Integer partitionsNum, String pkHashConfigs) {
+        if (partitionsNum == null) {
+            partitionsNum = 1;
+        }
+        Message[] partitionMessages = new Message[partitionsNum];
+        // Generic array creation is not allowed, hence the raw List[] and the unchecked suppression.
+        List<Entry>[] partitionEntries = new List[partitionsNum];
+        for (int i = 0; i < partitionsNum; i++) {
+            partitionEntries[i] = new ArrayList<>();
+        }
+
+        List<CanalEntry.Entry> entries;
+        if (this.isRaw()) {
+            List<ByteString> rawEntries = this.getRawEntries();
+            entries = new ArrayList<>(rawEntries.size());
+            for (ByteString byteString : rawEntries) {
+                Entry entry;
+                try {
+                    entry = Entry.parseFrom(byteString);
+                } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                }
+                entries.add(entry);
+            }
+        } else {
+            entries = this.getEntries();
+        }
+
+        // The rule list is identical for every entry, so split it once.
+        // StringUtils.split returns null for a null input, which is handled below.
+        String[] pkHashConfigArray = StringUtils.split(pkHashConfigs, ",");
+
+        for (Entry entry : entries) {
+            CanalEntry.RowChange rowChange;
+            try {
+                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+            } catch (Exception e) {
+                throw new RuntimeException(e.getMessage(), e);
+            }
+
+            if (rowChange.getIsDdl()) {
+                partitionEntries[0].add(entry);
+                continue;
+            }
+            if (rowChange.getRowDatasList() == null || rowChange.getRowDatasList().isEmpty()) {
+                // Nothing to hash; such entries are not forwarded.
+                continue;
+            }
+
+            String tableName = entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName();
+
+            // Explicit hash column of the matching rule; stays null when the rule says $pk$.
+            String pk = null;
+            boolean isMatch = false;
+            if (pkHashConfigArray != null) {
+                for (String pkHashConfig : pkHashConfigArray) {
+                    int i = pkHashConfig.lastIndexOf(".");
+                    if (i < 0) {
+                        // Malformed rule without a ".column" suffix: treat it as non-matching.
+                        continue;
+                    }
+                    String tablePattern = pkHashConfig.substring(0, i);
+
+                    AviaterRegexFilter aviaterRegexFilter = regexFilters.get(tablePattern);
+                    if (aviaterRegexFilter == null) {
+                        aviaterRegexFilter = new AviaterRegexFilter(tablePattern);
+                        regexFilters.putIfAbsent(tablePattern, aviaterRegexFilter);
+                    }
+
+                    if (aviaterRegexFilter.filter(tableName)) {
+                        isMatch = true;
+                        // Take the column name only from the rule that actually matched, so that a
+                        // non-matching earlier rule cannot leak its column into a later $pk$ rule.
+                        if (!pkHashConfig.endsWith(".$pk$")) {
+                            pk = pkHashConfig.substring(i + 1);
+                        }
+                        break;
+                    }
+                }
+            }
+
+            if (!isMatch) {
+                // No rule matched: send to the first partition, and only once.
+                partitionEntries[0].add(entry);
+                continue;
+            }
+
+            // Tracks which partitions already hold this entry so that it is added at most once per partition.
+            boolean[] assigned = new boolean[partitionsNum];
+            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+                List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
+                if (columns.isEmpty()) {
+                    // Rows without an after-image (deletes) are hashed on the before-image so they land in
+                    // the same partition as earlier changes of the same key.
+                    columns = rowData.getBeforeColumnsList();
+                }
+
+                String pkValue = null;
+                for (CanalEntry.Column column : columns) {
+                    boolean isHashColumn = pk != null ? column.getName().equalsIgnoreCase(pk) : column.getIsKey();
+                    if (isHashColumn) {
+                        pkValue = column.getValue();
+                        break;
+                    }
+                }
+
+                // Hash column not found: fall back to the first partition.
+                int target = 0;
+                if (pkValue != null) {
+                    int hash = pkValue.hashCode();
+                    // Math.abs(Integer.MIN_VALUE) is still negative, so take abs again after the modulo;
+                    // such values keep mapping to one fixed partition.
+                    target = Math.abs(Math.abs(hash) % partitionsNum);
+                }
+                if (!assigned[target]) {
+                    assigned[target] = true;
+                    partitionEntries[target].add(entry);
+                }
+            }
+        }
+
+        for (int i = 0; i < partitionsNum; i++) {
+            List<Entry> entriesTmp = partitionEntries[i];
+            if (!entriesTmp.isEmpty()) {
+                partitionMessages[i] = new Message(this.id, entriesTmp);
+            }
+        }
+
+        return partitionMessages;
+    }
 }
 }

+ 124 - 0
protocol/src/main/java/com/alibaba/otter/canal/protocol/aviater/AviaterRegexFilter.java

@@ -0,0 +1,124 @@
+package com.alibaba.otter.canal.protocol.aviater;
+
+import java.util.*;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.googlecode.aviator.AviatorEvaluator;
+import com.googlecode.aviator.Expression;
+
+/**
+ * Filters table names against a comma-separated list of regular expressions, evaluated through the Aviator
+ * expression {@code regex(pattern,target)}.
+ *
+ * @author jianghang 2012-7-20 06:01:34 PM
+ */
+public class AviaterRegexFilter {
+
+    private static final String             SPLIT             = ",";
+    private static final String             PATTERN_SPLIT     = "|";
+    private static final String             FILTER_EXPRESSION = "regex(pattern,target)";
+    private static final RegexFunction      regexFunction     = new RegexFunction();
+    private static final Comparator<String> COMPARATOR        = new StringComparator();
+
+    static {
+        // Make the custom "regex" function available to Aviator expressions.
+        AviatorEvaluator.addFunction(regexFunction);
+    }
+
+    private final Expression                exp               = AviatorEvaluator.compile(FILTER_EXPRESSION, true);
+    private final String                    pattern;
+    private final boolean                   defaultEmptyValue;
+
+    /**
+     * Creates a filter that returns {@code true} when either the pattern or the tested name is empty.
+     *
+     * @param pattern comma-separated regular expressions
+     */
+    public AviaterRegexFilter(String pattern){
+        this(pattern, true);
+    }
+
+    /**
+     * @param pattern comma-separated regular expressions; may be null or empty
+     * @param defaultEmptyValue result of {@link #filter(String)} when the pattern or the tested name is empty
+     */
+    public AviaterRegexFilter(String pattern, boolean defaultEmptyValue){
+        this.defaultEmptyValue = defaultEmptyValue;
+        List<String> list;
+        if (StringUtils.isEmpty(pattern)) {
+            list = new ArrayList<String>();
+        } else {
+            list = Arrays.asList(StringUtils.split(pattern, SPLIT));
+        }
+
+        // Sort the alternatives from longest to shortest: with "foo|foot", matching "foot" would otherwise
+        // stop at "foo", whose length differs from the input, and the full match would fail.
+        Collections.sort(list, COMPARATOR);
+        // Anchor every alternative at both ends.
+        list = completionPattern(list);
+        this.pattern = StringUtils.join(list, PATTERN_SPLIT);
+    }
+
+    /**
+     * Tests a name against the configured patterns.
+     *
+     * @param filtered name to test
+     * @return {@code defaultEmptyValue} when the pattern or {@code filtered} is empty, otherwise the result of the
+     * regex evaluation
+     */
+    public boolean filter(String filtered)  {
+        if (StringUtils.isEmpty(pattern)) {
+            return defaultEmptyValue;
+        }
+
+        if (StringUtils.isEmpty(filtered)) {
+            return defaultEmptyValue;
+        }
+
+        Map<String, Object> env = new HashMap<String, Object>();
+        env.put("pattern", pattern);
+        // Locale.ROOT keeps the lowering independent of the JVM default locale
+        // (e.g. a Turkish locale would turn "I" into a dotless "ı").
+        env.put("target", filtered.toLowerCase(Locale.ROOT));
+        return (Boolean) exp.execute(env);
+    }
+
+    /**
+     * Orders strings from longest to shortest. Works around a matching problem of the ORO {@code matches} call:
+     *
+     * <pre>
+     * foo|foot fails on "foot": "foot" matches "foo" first and "foo" is returned, but its length differs from "foot"
+     * </pre>
+     *
+     * @author zebin.xuzb 2012-10-22 2:02:26 PM
+     * @version 1.0.0
+     */
+    private static class StringComparator implements Comparator<String> {
+
+        @Override
+        public int compare(String str1, String str2) {
+            // Longer strings sort first.
+            return Integer.compare(str2.length(), str1.length());
+        }
+    }
+
+    /**
+     * Wraps every pattern in {@code ^...$}. Sorting by descending length alone is not enough:
+     *
+     * <pre>
+     * foooo|f.*t fails on "fooooot": "foooo" matches a prefix first, but its length differs from "fooooot"
+     * </pre>
+     *
+     * so every alternative is anchored at both ends.
+     *
+     * @author simon
+     * @version 1.0.0
+     */
+    private List<String> completionPattern(List<String> patterns) {
+        List<String> result = new ArrayList<String>(patterns.size());
+        for (String pattern : patterns) {
+            result.add("^" + pattern + "$");
+        }
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return pattern;
+    }
+}

+ 38 - 0
protocol/src/main/java/com/alibaba/otter/canal/protocol/aviater/PatternUtils.java

@@ -0,0 +1,38 @@
+package com.alibaba.otter.canal.protocol.aviater;
+
+import java.util.Map;
+
+import org.apache.oro.text.regex.MalformedPatternException;
+import org.apache.oro.text.regex.Pattern;
+import org.apache.oro.text.regex.PatternCompiler;
+import org.apache.oro.text.regex.Perl5Compiler;
+
+import com.google.common.base.Function;
+import com.google.common.collect.MapMaker;
+import com.google.common.collect.MigrateMap;
+
+/**
+ * Static cache of compiled ORO (Perl5) patterns, keyed by the pattern text.
+ */
+public class PatternUtils {
+
+    /** Options applied to every compiled pattern: case-insensitive, read-only, single-line. */
+    private static final int            COMPILE_OPTIONS = Perl5Compiler.CASE_INSENSITIVE_MASK
+                                                          | Perl5Compiler.READ_ONLY_MASK
+                                                          | Perl5Compiler.SINGLELINE_MASK;
+
+    // Soft-valued map built through MigrateMap.makeComputingMap.
+    // NOTE(review): presumably the function below is invoked on a cache miss — confirm against MigrateMap.
+    @SuppressWarnings("deprecation")
+    private static Map<String, Pattern> patterns        = MigrateMap.makeComputingMap(new MapMaker().softValues(),
+        new Function<String, Pattern>() {
+
+            public Pattern apply(String regex) {
+                return compile(regex);
+            }
+        });
+
+    /**
+     * Compiles the given text with {@link #COMPILE_OPTIONS}.
+     *
+     * @throws RuntimeException wrapping the {@link MalformedPatternException} if the text is not a valid pattern
+     */
+    private static Pattern compile(String regex) {
+        try {
+            PatternCompiler compiler = new Perl5Compiler();
+            return compiler.compile(regex, COMPILE_OPTIONS);
+        } catch (MalformedPatternException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Looks up the compiled pattern for the given pattern text in the cache map.
+     */
+    public static Pattern getPattern(String pattern) {
+        return patterns.get(pattern);
+    }
+
+    /**
+     * Removes all entries from the cache map.
+     */
+    public static void clear() {
+        patterns.clear();
+    }
+}

+ 31 - 0
protocol/src/main/java/com/alibaba/otter/canal/protocol/aviater/RegexFunction.java

@@ -0,0 +1,31 @@
+package com.alibaba.otter.canal.protocol.aviater;
+
+import java.util.Map;
+
+import org.apache.oro.text.regex.Perl5Matcher;
+
+import com.googlecode.aviator.runtime.function.AbstractFunction;
+import com.googlecode.aviator.runtime.function.FunctionUtils;
+import com.googlecode.aviator.runtime.type.AviatorBoolean;
+import com.googlecode.aviator.runtime.type.AviatorObject;
+
+/**
+ * Provides a code extension that adds a {@code regex} function to aviator.
+ * <p>
+ * The first argument is read as the pattern and the second as the text; the
+ * result is whether the text matches the pattern obtained from
+ * {@link PatternUtils}.
+ *
+ * @author jianghang 2012-7-23 10:29:23 AM
+ */
+public class RegexFunction extends AbstractFunction {
+
+    /**
+     * Evaluates {@code regex(pattern, text)}.
+     *
+     * @param env the evaluation environment used to resolve both arguments
+     * @param arg1 the pattern argument
+     * @param arg2 the text argument
+     * @return the aviator boolean for whether the text matches the pattern
+     */
+    public AviatorObject call(Map<String, Object> env, AviatorObject arg1, AviatorObject arg2) {
+        final String regex = FunctionUtils.getStringValue(arg1, env);
+        final String input = FunctionUtils.getStringValue(arg2, env);
+        // A fresh matcher is created for every call, as in the original.
+        return AviatorBoolean.valueOf(new Perl5Matcher().matches(input, PatternUtils.getPattern(regex)));
+    }
+
+    /**
+     * @return the function name, {@code "regex"}
+     */
+    public String getName() {
+        return "regex";
+    }
+
+}

+ 8 - 7
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -28,11 +28,11 @@ public class MQProperties {
 
 
     public static class CanalDestination {
     public static class CanalDestination {
 
 
-        private String              canalDestination;
-        private String              topic;
-        private Integer             partition;
-        private Integer             partitionsNum;
-        private Map<String, String> partitionHash;
+        private String  canalDestination;
+        private String  topic;
+        private Integer partition;
+        private Integer partitionsNum;
+        private String  partitionHash;
 
 
         public String getCanalDestination() {
         public String getCanalDestination() {
             return canalDestination;
             return canalDestination;
@@ -66,11 +66,11 @@ public class MQProperties {
             this.partitionsNum = partitionsNum;
             this.partitionsNum = partitionsNum;
         }
         }
 
 
-        public Map<String, String> getPartitionHash() {
+        public String getPartitionHash() {
             return partitionHash;
             return partitionHash;
         }
         }
 
 
-        public void setPartitionHash(Map<String, String> partitionHash) {
+        public void setPartitionHash(String partitionHash) {
             this.partitionHash = partitionHash;
             this.partitionHash = partitionHash;
         }
         }
     }
     }
@@ -186,6 +186,7 @@ public class MQProperties {
     public void setAliyunSecretKey(String aliyunSecretKey) {
     public void setAliyunSecretKey(String aliyunSecretKey) {
         this.aliyunSecretKey = aliyunSecretKey;
         this.aliyunSecretKey = aliyunSecretKey;
     }
     }
+
     public int getMaxRequestSize() {
     public int getMaxRequestSize() {
         return maxRequestSize;
         return maxRequestSize;
     }
     }

+ 22 - 8
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -80,22 +80,36 @@ public class CanalKafkaProducer implements CanalMQProducer {
         // producer.beginTransaction();
         // producer.beginTransaction();
         if (!kafkaProperties.getFlatMessage()) {
         if (!kafkaProperties.getFlatMessage()) {
             try {
             try {
-                ProducerRecord<String, Message> record;
+                ProducerRecord<String, Message> record = null;
                 if (canalDestination.getPartition() != null) {
                 if (canalDestination.getPartition() != null) {
-                    record = new ProducerRecord<String, Message>(canalDestination.getTopic(),
+                    record = new ProducerRecord<>(canalDestination.getTopic(),
                         canalDestination.getPartition(),
                         canalDestination.getPartition(),
                         null,
                         null,
                         message);
                         message);
                 } else {
                 } else {
-                    record = new ProducerRecord<String, Message>(canalDestination.getTopic(), 0, null, message);
+                    if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
+                        Message[] messages = message.messagePartition(canalDestination.getPartitionsNum(),
+                            canalDestination.getPartitionHash());
+                        int length = messages.length;
+                        for (int i = 0; i < length; i++) {
+                            Message messagePartition = messages[i];
+                            if (messagePartition != null) {
+                                record = new ProducerRecord<>(canalDestination.getTopic(), i, null, messagePartition);
+                            }
+                        }
+                    } else {
+                        record = new ProducerRecord<>(canalDestination.getTopic(), 0, null, message);
+                    }
                 }
                 }
 
 
-                producer.send(record).get();
+                if (record != null) {
+                    producer.send(record).get();
 
 
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Send  message to kafka topic: [{}], packet: {}",
-                        canalDestination.getTopic(),
-                        message.toString());
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Send  message to kafka topic: [{}], packet: {}",
+                            canalDestination.getTopic(),
+                            message.toString());
+                    }
                 }
                 }
             } catch (Exception e) {
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
                 logger.error(e.getMessage(), e);

+ 83 - 35
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -1,14 +1,7 @@
 package com.alibaba.otter.canal.rocketmq;
 package com.alibaba.otter.canal.rocketmq;
 
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.alibaba.otter.canal.common.CanalMessageSerializer;
-import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.server.exception.CanalServerException;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
-import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
-import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
+import java.util.List;
+
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -20,7 +13,15 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
-import java.util.List;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.CanalMessageSerializer;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.server.exception.CanalServerException;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
+import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
+import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
 
 
 public class CanalRocketMQProducer implements CanalMQProducer {
 public class CanalRocketMQProducer implements CanalMQProducer {
 
 
@@ -34,8 +35,8 @@ public class CanalRocketMQProducer implements CanalMQProducer {
     public void init(MQProperties rocketMQProperties) {
     public void init(MQProperties rocketMQProperties) {
         this.mqProperties = rocketMQProperties;
         this.mqProperties = rocketMQProperties;
         RPCHook rpcHook = null;
         RPCHook rpcHook = null;
-        if(rocketMQProperties.getAliyunAccessKey().length() > 0
-            && rocketMQProperties.getAliyunSecretKey().length() > 0){
+        if (rocketMQProperties.getAliyunAccessKey().length() > 0
+            && rocketMQProperties.getAliyunSecretKey().length() > 0) {
             SessionCredentials sessionCredentials = new SessionCredentials();
             SessionCredentials sessionCredentials = new SessionCredentials();
             sessionCredentials.setAccessKey(rocketMQProperties.getAliyunAccessKey());
             sessionCredentials.setAccessKey(rocketMQProperties.getAliyunAccessKey());
             sessionCredentials.setSecretKey(rocketMQProperties.getAliyunSecretKey());
             sessionCredentials.setSecretKey(rocketMQProperties.getAliyunSecretKey());
@@ -59,23 +60,65 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                      Callback callback) {
                      Callback callback) {
         if (!mqProperties.getFlatMessage()) {
         if (!mqProperties.getFlatMessage()) {
             try {
             try {
-                Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data,
-                    mqProperties.isFilterTransactionEntry()));
-                logger.debug("send message:{} to destination:{}, partition: {}",
-                    message,
-                    destination.getCanalDestination(),
-                    destination.getPartition());
-                this.defaultMQProducer.send(message, new MessageQueueSelector() {
-
-                    @Override
-                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                        int partition = 0;
-                        if (destination.getPartition() != null) {
-                            partition = destination.getPartition();
+                if (destination.getPartition() != null) {
+                    Message message = new Message(destination.getTopic(),
+                        CanalMessageSerializer.serializer(data, mqProperties.isFilterTransactionEntry()));
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("send message:{} to destination:{}, partition: {}",
+                            message,
+                            destination.getCanalDestination(),
+                            destination.getPartition());
+                    }
+                    this.defaultMQProducer.send(message, new MessageQueueSelector() {
+
+                        @Override
+                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                            int partition = 0;
+                            if (destination.getPartition() != null) {
+                                partition = destination.getPartition();
+                            }
+                            return mqs.get(partition);
+                        }
+                    }, null);
+                } else {
+                    if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
+                        com.alibaba.otter.canal.protocol.Message[] messages = data
+                            .messagePartition(destination.getPartitionsNum(), destination.getPartitionHash());
+                        int length = messages.length;
+                        for (int i = 0; i < length; i++) {
+                            com.alibaba.otter.canal.protocol.Message dataPartition = messages[i];
+                            if (dataPartition != null) {
+                                if (logger.isDebugEnabled()) {
+                                    logger.debug("flatMessagePart: {}, partition: {}",
+                                        JSON.toJSONString(dataPartition, SerializerFeature.WriteMapNullValue),
+                                        i);
+                                }
+                                final int index = i;
+                                try {
+                                    Message message = new Message(destination.getTopic(),
+                                        CanalMessageSerializer.serializer(dataPartition,
+                                            mqProperties.isFilterTransactionEntry()));
+                                    this.defaultMQProducer.send(message, new MessageQueueSelector() {
+
+                                        /**
+                                         * Picks the queue at the captured partition index.
+                                         *
+                                         * @param mqs the candidate message queues
+                                         * @param msg the message being sent (unused)
+                                         * @param arg the selector argument (unused)
+                                         * @return the queue at position {@code index} in {@code mqs}
+                                         * @throws CanalServerException if {@code index} is not a valid position in {@code mqs}
+                                         */
+                                        @Override
+                                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                                            // Valid positions are 0..mqs.size()-1, so index == mqs.size() must be
+                                            // rejected too; the previous '>' check let it through and mqs.get(index)
+                                            // then failed with IndexOutOfBoundsException.
+                                            if (index >= mqs.size()) {
+                                                throw new CanalServerException("partition number is error,config num:"
+                                                    + destination.getPartitionsNum()
+                                                    + ", mq num: " + mqs.size());
+                                            }
+                                            return mqs.get(index);
+                                        }
+                                    }, null);
+                                } catch (Exception e) {
+                                    logger.error("send flat message to hashed partition error", e);
+                                    callback.rollback();
+                                    return;
+                                }
+                            }
                         }
                         }
-                        return mqs.get(partition);
                     }
                     }
-                }, null);
+                }
             } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
             } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                 logger.error("Send message error!", e);
                 logger.error("Send message error!", e);
                 callback.rollback();
                 callback.rollback();
@@ -87,10 +130,12 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                 for (FlatMessage flatMessage : flatMessages) {
                 for (FlatMessage flatMessage : flatMessages) {
                     if (destination.getPartition() != null) {
                     if (destination.getPartition() != null) {
                         try {
                         try {
-                            logger.info("send flat message: {} to topic: {} fixed partition: {}",
-                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue),
-                                destination.getTopic(),
-                                destination.getPartition());
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("send message: {} to topic: {} fixed partition: {}",
+                                    JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue),
+                                    destination.getTopic(),
+                                    destination.getPartition());
+                            }
                             Message message = new Message(destination.getTopic(),
                             Message message = new Message(destination.getTopic(),
                                 JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue).getBytes());
                                 JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue).getBytes());
                             this.defaultMQProducer.send(message, new MessageQueueSelector() {
                             this.defaultMQProducer.send(message, new MessageQueueSelector() {
@@ -114,13 +159,16 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                             for (int i = 0; i < length; i++) {
                             for (int i = 0; i < length; i++) {
                                 FlatMessage flatMessagePart = partitionFlatMessage[i];
                                 FlatMessage flatMessagePart = partitionFlatMessage[i];
                                 if (flatMessagePart != null) {
                                 if (flatMessagePart != null) {
-                                    logger.debug("flatMessagePart: {}, partition: {}",
-                                        JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue),
-                                        i);
+                                    if (logger.isDebugEnabled()) {
+                                        logger.debug("flatMessagePart: {}, partition: {}",
+                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue),
+                                            i);
+                                    }
                                     final int index = i;
                                     final int index = i;
                                     try {
                                     try {
                                         Message message = new Message(destination.getTopic(),
                                         Message message = new Message(destination.getTopic(),
-                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue).getBytes());
+                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue)
+                                                .getBytes());
                                         this.defaultMQProducer.send(message, new MessageQueueSelector() {
                                         this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
 
                                             @Override
                                             @Override

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -138,7 +138,7 @@ public class CanalMQStarter {
                 canalDestination.setTopic(mqConfig.getTopic());
                 canalDestination.setTopic(mqConfig.getTopic());
                 canalDestination.setPartition(mqConfig.getPartition());
                 canalDestination.setPartition(mqConfig.getPartition());
                 canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
                 canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
-                canalDestination.setPartitionHash(mqConfig.getPartitionHashProperties());
+                canalDestination.setPartitionHash(mqConfig.getPartitionHash());
 
 
                 canalServer.subscribe(clientIdentity);
                 canalServer.subscribe(clientIdentity);
                 logger.info("## the MQ producer: {} is running now ......", destination);
                 logger.info("## the MQ producer: {} is running now ......", destination);