Ver código fonte

fixed hash mode

agapple 6 anos atrás
pai
commit
35e0e882c1

+ 42 - 20
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -41,11 +41,20 @@ public class MQMessageUtils {
                                                                                    PartitionData data = new PartitionData();
                                                                                    int i = pkHashConfig.lastIndexOf(":");
                                                                                    if (i > 0) {
-                                                                                       data.pkNames = Lists.newArrayList(StringUtils.split(pkHashConfig.substring(i + 1),
-                                                                                           '^'));
+                                                                                       String pkStr = pkHashConfig.substring(i + 1);
+                                                                                       if (pkStr.equalsIgnoreCase("$pk$")) {
+                                                                                           data.hashMode.autoPkHash = true;
+                                                                                       } else {
+                                                                                           data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr,
+                                                                                               '^'));
+                                                                                       }
+
                                                                                        pkHashConfig = pkHashConfig.substring(0,
                                                                                            i);
+                                                                                   } else {
+                                                                                       data.hashMode.tableHash = true;
                                                                                    }
+
                                                                                    if (!isWildCard(pkHashConfig)) {
                                                                                        data.simpleName = pkHashConfig;
                                                                                    } else {
@@ -107,24 +116,23 @@ public class MQMessageUtils {
                 if (rowChange.getRowDatasList() != null && !rowChange.getRowDatasList().isEmpty()) {
                     String database = entry.getHeader().getSchemaName();
                     String table = entry.getHeader().getTableName();
-                    List<String> pkNames = getParitionHashColumns(database + "." + table, pkHashConfigs);
-
-                    if (pkNames == null) {
+                    HashMode hashMode = getParitionHashColumns(database + "." + table, pkHashConfigs);
+                    if (hashMode == null) {
                         // 如果都没有匹配,发送到第一个分区
                         partitionEntries[0].add(entry);
                     } else {
                         for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                             int hashCode = table.hashCode();
-                            if (pkNames.isEmpty()) {
+                            if (hashMode.autoPkHash) {
                                 // isEmpty use default pkNames
                                 for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                                     if (column.getIsKey()) {
                                         hashCode = hashCode ^ column.getValue().hashCode();
                                     }
                                 }
-                            } else {
+                            } else if (!hashMode.tableHash) {
                                 for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
-                                    if (checkPkNamesHasContain(pkNames, column.getName())) {
+                                    if (checkPkNamesHasContain(hashMode.pkNames, column.getName())) {
                                         hashCode = hashCode ^ column.getValue().hashCode();
                                     }
                                 }
@@ -299,24 +307,27 @@ public class MQMessageUtils {
             if (flatMessage.getData() != null && !flatMessage.getData().isEmpty()) {
                 String database = flatMessage.getDatabase();
                 String table = flatMessage.getTable();
-                List<String> pkNames = getParitionHashColumns(database + "." + table, pkHashConfigs);
-                if (pkNames == null) {
+                HashMode hashMode = getParitionHashColumns(database + "." + table, pkHashConfigs);
+                if (hashMode == null) {
                     // 如果都没有匹配,发送到第一个分区
                     partitionMessages[0] = flatMessage;
                 } else {
-                    if (pkNames.isEmpty()) {
+                    List<String> pkNames = hashMode.pkNames;
+                    if (hashMode.autoPkHash) {
                         pkNames = flatMessage.getPkNames();
                     }
 
                     int hashCode = table.hashCode();
                     int idx = 0;
                     for (Map<String, String> row : flatMessage.getData()) {
-                        for (String pkName : pkNames) {
-                            String value = row.get(pkName);
-                            if (value == null) {
-                                value = "";
+                        if (!hashMode.tableHash) {
+                            for (String pkName : pkNames) {
+                                String value = row.get(pkName);
+                                if (value == null) {
+                                    value = "";
+                                }
+                                hashCode = hashCode ^ value.hashCode();
                             }
-                            hashCode = hashCode ^ value.hashCode();
                         }
 
                         int pkHash = Math.abs(hashCode) % partitionsNum;
@@ -362,16 +373,20 @@ public class MQMessageUtils {
     /**
      * match return List , not match return null
      */
-    public static List<String> getParitionHashColumns(String name, String pkHashConfigs) {
+    public static HashMode getParitionHashColumns(String name, String pkHashConfigs) {
+        if (StringUtils.isEmpty(pkHashConfigs)) {
+            return null;
+        }
+
         List<PartitionData> datas = partitionDatas.get(pkHashConfigs);
         for (PartitionData data : datas) {
             if (data.simpleName != null) {
                 if (data.simpleName.equalsIgnoreCase(name)) {
-                    return data.pkNames;
+                    return data.hashMode;
                 }
             } else {
                 if (data.regexFilter.filter(name)) {
-                    return data.pkNames;
+                    return data.hashMode;
                 }
             }
         }
@@ -399,7 +414,14 @@ public class MQMessageUtils {
 
         public String             simpleName;
         public AviaterRegexFilter regexFilter;
-        public List<String>       pkNames = Lists.newArrayList();
+        public HashMode           hashMode = new HashMode();
+    }
+
+    public static class HashMode {
+
+        public boolean      autoPkHash = false;
+        public boolean      tableHash  = false;
+        public List<String> pkNames    = Lists.newArrayList();
     }
 
 }