Ver código fonte

根据主键Hash增加正则表达式支持,pk通配符指定自动获取支持

machey 6 anos atrás
pai
commit
03ee9c7ecf

+ 4 - 30
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalMQConfig.java

@@ -1,16 +1,11 @@
 package com.alibaba.otter.canal.instance.core;
 
-import java.util.LinkedHashMap;
-import java.util.Map;
-
 public class CanalMQConfig {
 
-    private String                       topic;
-    private Integer                      partition;
-    private Integer                      partitionsNum;
-    private String                       partitionHash;
-
-    private volatile Map<String, String> partitionHashProperties;
+    private String  topic;
+    private Integer partition;
+    private Integer partitionsNum;
+    private String  partitionHash;
 
     public String getTopic() {
         return topic;
@@ -44,25 +39,4 @@ public class CanalMQConfig {
         this.partitionHash = partitionHash;
     }
 
-    public Map<String, String> getPartitionHashProperties() {
-        if (partitionHashProperties == null) {
-            synchronized (CanalMQConfig.class) {
-                if (partitionHashProperties == null) {
-                    if (partitionHash != null) {
-                        partitionHashProperties = new LinkedHashMap<>();
-                        String[] items = partitionHash.split(",");
-                        for (String item : items) {
-                            int i = item.indexOf(":");
-                            if (i > -1) {
-                                String dbTable = item.substring(0, i).trim();
-                                String pk = item.substring(i + 1).trim();
-                                partitionHashProperties.put(dbTable, pk);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-        return partitionHashProperties;
-    }
 }

+ 110 - 58
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -1,12 +1,10 @@
 package com.alibaba.otter.canal.protocol;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
 
 import com.google.protobuf.ByteString;
 
@@ -33,8 +31,7 @@ public class FlatMessage implements Serializable {
     private List<Map<String, String>> data;
     private List<Map<String, String>> old;
 
-    public FlatMessage(){
-    }
+    private transient Message         message;                                 // 所属message
 
     public FlatMessage(long id){
         this.id = id;
@@ -136,6 +133,14 @@ public class FlatMessage implements Serializable {
         this.es = es;
     }
 
+    public Message getMessage() {
+        return message;
+    }
+
+    public void setMessage(Message message) {
+        this.message = message;
+    }
+
     /**
      * 将Message转换为FlatMessage
      * 
@@ -187,6 +192,7 @@ public class FlatMessage implements Serializable {
                 flatMessage.setEs(entry.getHeader().getExecuteTime());
                 flatMessage.setTs(System.currentTimeMillis());
                 flatMessage.setSql(rowChange.getSql());
+                flatMessage.setMessage(message);
 
                 if (!rowChange.getIsDdl()) {
                     Map<String, Integer> sqlType = new LinkedHashMap<>();
@@ -269,70 +275,116 @@ public class FlatMessage implements Serializable {
      * 
      * @param flatMessage flatMessage
      * @param partitionsNum 分区数量
-     * @param pkHashConfig hash映射
+     * @param pkHashConfigs hash映射
      * @return 拆分后的flatMessage数组
      */
-    public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum,
-                                                 Map<String, String> pkHashConfig) {
+    public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum, String pkHashConfigs) {
         if (partitionsNum == null) {
             partitionsNum = 1;
         }
         FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];
-        String pk = pkHashConfig.get(flatMessage.getDatabase() + "." + flatMessage.getTable());
-        if (pk == null || flatMessage.getIsDdl()) {
+
+        if (flatMessage.getIsDdl()) {
             partitionMessages[0] = flatMessage;
         } else {
             if (flatMessage.getData() != null) {
-                int idx = 0;
-                for (Map<String, String> row : flatMessage.getData()) {
-                    Map<String, String> o = null;
-                    if (flatMessage.getOld() != null) {
-                        o = flatMessage.getOld().get(idx);
+                String database = flatMessage.getDatabase();
+                String table = flatMessage.getTable();
+
+                String pk = null;
+                boolean isMatch = false;
+
+                String[] pkHashConfigArray = StringUtils.split(pkHashConfigs, ",");
+                for (String pkHashConfig : pkHashConfigArray) {
+                    int i = pkHashConfig.lastIndexOf(".");
+                    if (!pkHashConfig.endsWith(".$pk$")) {
+                        // 如果指定了主键
+                        pk = pkHashConfig.substring(i + 1);
                     }
-                    String value;
-                    // 如果old中有pk值说明主键有修改, 以旧的主键值hash为准
-                    if (o != null && o.containsKey(pk)) {
-                        value = o.get(pk);
-                    } else {
-                        value = row.get(pk);
-                    }
-                    if (value == null) {
-                        value = "";
-                    }
-                    int hash = value.hashCode();
-                    int pkHash = Math.abs(hash) % partitionsNum;
-                    // math.abs可能返回负值,这里再取反,把出现负值的数据还是写到固定的分区,仍然可以保证消费顺序
-                    pkHash = Math.abs(pkHash);
-
-                    FlatMessage flatMessageTmp = partitionMessages[pkHash];
-                    if (flatMessageTmp == null) {
-                        flatMessageTmp = new FlatMessage(flatMessage.getId());
-                        partitionMessages[pkHash] = flatMessageTmp;
-                        flatMessageTmp.setDatabase(flatMessage.getDatabase());
-                        flatMessageTmp.setTable(flatMessage.getTable());
-                        flatMessageTmp.setIsDdl(flatMessage.getIsDdl());
-                        flatMessageTmp.setType(flatMessage.getType());
-                        flatMessageTmp.setSql(flatMessage.getSql());
-                        flatMessageTmp.setSqlType(flatMessage.getSqlType());
-                        flatMessageTmp.setMysqlType(flatMessage.getMysqlType());
-                        flatMessageTmp.setEs(flatMessage.getEs());
-                        flatMessageTmp.setTs(flatMessage.getTs());
+                    pkHashConfig = pkHashConfig.substring(0, i);
+                    isMatch = Pattern.matches(pkHashConfig, database + "." + table);
+                    if (isMatch) {
+                        break;
                     }
-                    List<Map<String, String>> data = flatMessageTmp.getData();
-                    if (data == null) {
-                        data = new ArrayList<>();
-                        flatMessageTmp.setData(data);
+                }
+
+                if (!isMatch) {
+                    // 如果都没有匹配,发送到第一个分区
+                    partitionMessages[0] = flatMessage;
+                } else {
+                    if (pk == null) {
+                        // 如果未指定主键(通配符主键),从原生message中取主键字段
+                        CanalEntry.Entry entry = flatMessage.getMessage().getEntries().get(0);
+                        CanalEntry.RowChange rowChange;
+                        try {
+                            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+                        } catch (Exception e) {
+                            throw new RuntimeException(e.getMessage(), e);
+                        }
+                        CanalEntry.RowData rowData = rowChange.getRowDatasList().get(0);
+                        for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
+                            if (column.getIsKey()) {
+                                pk = column.getName();
+                                break;
+                            }
+                        }
                     }
-                    data.add(row);
-                    if (flatMessage.getOld() != null && !flatMessage.getOld().isEmpty()) {
-                        List<Map<String, String>> old = flatMessageTmp.getOld();
-                        if (old == null) {
-                            old = new ArrayList<>();
-                            flatMessageTmp.setOld(old);
+                    if (pk == null || !flatMessage.getData().get(0).containsKey(pk)) {
+                        // 如果都没有匹配的主键,发送到第一个分区
+                        partitionMessages[0] = flatMessage;
+                    } else {
+                        int idx = 0;
+                        for (Map<String, String> row : flatMessage.getData()) {
+                            Map<String, String> o = null;
+                            if (flatMessage.getOld() != null) {
+                                o = flatMessage.getOld().get(idx);
+                            }
+                            String value;
+                            // 如果old中有pk值说明主键有修改, 以旧的主键值hash为准
+                            if (o != null && o.containsKey(pk)) {
+                                value = o.get(pk);
+                            } else {
+                                value = row.get(pk);
+                            }
+                            if (value == null) {
+                                value = "";
+                            }
+                            int hash = value.hashCode();
+                            int pkHash = Math.abs(hash) % partitionsNum;
+                            // math.abs可能返回负值,这里再取反,把出现负值的数据还是写到固定的分区,仍然可以保证消费顺序
+                            pkHash = Math.abs(pkHash);
+
+                            FlatMessage flatMessageTmp = partitionMessages[pkHash];
+                            if (flatMessageTmp == null) {
+                                flatMessageTmp = new FlatMessage(flatMessage.getId());
+                                partitionMessages[pkHash] = flatMessageTmp;
+                                flatMessageTmp.setDatabase(flatMessage.getDatabase());
+                                flatMessageTmp.setTable(flatMessage.getTable());
+                                flatMessageTmp.setIsDdl(flatMessage.getIsDdl());
+                                flatMessageTmp.setType(flatMessage.getType());
+                                flatMessageTmp.setSql(flatMessage.getSql());
+                                flatMessageTmp.setSqlType(flatMessage.getSqlType());
+                                flatMessageTmp.setMysqlType(flatMessage.getMysqlType());
+                                flatMessageTmp.setEs(flatMessage.getEs());
+                                flatMessageTmp.setTs(flatMessage.getTs());
+                            }
+                            List<Map<String, String>> data = flatMessageTmp.getData();
+                            if (data == null) {
+                                data = new ArrayList<>();
+                                flatMessageTmp.setData(data);
+                            }
+                            data.add(row);
+                            if (flatMessage.getOld() != null && !flatMessage.getOld().isEmpty()) {
+                                List<Map<String, String>> old = flatMessageTmp.getOld();
+                                if (old == null) {
+                                    old = new ArrayList<>();
+                                    flatMessageTmp.setOld(old);
+                                }
+                                old.add(flatMessage.getOld().get(idx));
+                            }
+                            idx++;
                         }
-                        old.add(flatMessage.getOld().get(idx));
                     }
-                    idx++;
                 }
             }
         }

+ 8 - 7
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -28,11 +28,11 @@ public class MQProperties {
 
     public static class CanalDestination {
 
-        private String              canalDestination;
-        private String              topic;
-        private Integer             partition;
-        private Integer             partitionsNum;
-        private Map<String, String> partitionHash;
+        private String  canalDestination;
+        private String  topic;
+        private Integer partition;
+        private Integer partitionsNum;
+        private String  partitionHash;
 
         public String getCanalDestination() {
             return canalDestination;
@@ -66,11 +66,11 @@ public class MQProperties {
             this.partitionsNum = partitionsNum;
         }
 
-        public Map<String, String> getPartitionHash() {
+        public String getPartitionHash() {
             return partitionHash;
         }
 
-        public void setPartitionHash(Map<String, String> partitionHash) {
+        public void setPartitionHash(String partitionHash) {
             this.partitionHash = partitionHash;
         }
     }
@@ -186,6 +186,7 @@ public class MQProperties {
     public void setAliyunSecretKey(String aliyunSecretKey) {
         this.aliyunSecretKey = aliyunSecretKey;
     }
+
     public int getMaxRequestSize() {
         return maxRequestSize;
     }

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -138,7 +138,7 @@ public class CanalMQStarter {
                 canalDestination.setTopic(mqConfig.getTopic());
                 canalDestination.setPartition(mqConfig.getPartition());
                 canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
-                canalDestination.setPartitionHash(mqConfig.getPartitionHashProperties());
+                canalDestination.setPartitionHash(mqConfig.getPartitionHash());
 
                 canalServer.subscribe(clientIdentity);
                 logger.info("## the MQ producer: {} is running now ......", destination);