Browse Source

去掉对before的判断,分区发送的表pk不能修改

machey 6 years ago
parent
commit
aeecf87490

+ 1 - 11
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -344,17 +344,7 @@ public class FlatMessage implements Serializable {
                     } else {
                         int idx = 0;
                         for (Map<String, String> row : flatMessage.getData()) {
-                            Map<String, String> o = null;
-                            if (flatMessage.getOld() != null) {
-                                o = flatMessage.getOld().get(idx);
-                            }
-                            String value;
-                            // 如果old中有pk值说明主键有修改, 以旧的主键值hash为准
-                            if (o != null && o.containsKey(pk)) {
-                                value = o.get(pk);
-                            } else {
-                                value = row.get(pk);
-                            }
+                            String value = row.get(pk);
                             if (value == null) {
                                 value = "";
                             }

+ 17 - 35
protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java

@@ -173,49 +173,31 @@ public class Message implements Serializable {
                         partitionEntries[0].add(entry);
                     }
                     for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
-                        boolean hasOldPk = false;
-                        // 如果before中有pk值说明主键有修改, 以旧的主键值hash为准
-                        for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
-                            CanalEntry.Column pkColumn = null;
-                            if (pk != null) {
+                        String pkValue = null;
+                        if (pk != null) {
+                            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                                 if (column.getName().equalsIgnoreCase(pk)) {
-                                    pkColumn = column;
-                                }
-                            } else {
-                                if (column.getIsKey()) {
-                                    pkColumn = column;
+                                    pkValue = column.getValue();
+                                    break;
                                 }
                             }
-                            if (pkColumn != null) {
-                                int hash = pkColumn.getValue().hashCode();
-                                int pkHash = Math.abs(hash) % partitionsNum;
-                                pkHash = Math.abs(pkHash);
-                                partitionEntries[pkHash].add(entry);
-                                hasOldPk = true;
-                                break;
-                            }
-                        }
-                        if (!hasOldPk) {
+                        } else {
                             for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
-                                CanalEntry.Column pkColumn = null;
-                                if (pk != null) {
-                                    if (column.getName().equalsIgnoreCase(pk)) {
-                                        pkColumn = column;
-                                    }
-                                } else {
-                                    if (column.getIsKey()) {
-                                        pkColumn = column;
-                                    }
-                                }
-                                if (pkColumn != null) {
-                                    int hash = pkColumn.getValue().hashCode();
-                                    int pkHash = Math.abs(hash) % partitionsNum;
-                                    pkHash = Math.abs(pkHash);
-                                    partitionEntries[pkHash].add(entry);
+                                if (column.getIsKey()) {
+                                    pkValue = column.getValue();
                                     break;
                                 }
                             }
                         }
+                        if (pkValue != null) {
+                            int hash = pkValue.hashCode();
+                            int pkHash = Math.abs(hash) % partitionsNum;
+                            pkHash = Math.abs(pkHash);
+                            partitionEntries[pkHash].add(entry);
+                        } else {
+                            // 没有找到主键
+                            partitionEntries[0].add(entry);
+                        }
                     }
                 }
             }