|
@@ -222,34 +222,27 @@ public class MQMessageUtils {
|
|
|
} else {
|
|
|
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
|
|
|
int hashCode = database.hashCode();
|
|
|
+ CanalEntry.EventType eventType = rowChange.getEventType();
|
|
|
+ List<CanalEntry.Column> columns = null;
|
|
|
+ if (eventType == CanalEntry.EventType.DELETE) {
|
|
|
+ columns = rowData.getBeforeColumnsList();
|
|
|
+ } else {
|
|
|
+ columns = rowData.getAfterColumnsList();
|
|
|
+ }
|
|
|
+
|
|
|
if (hashMode.autoPkHash) {
|
|
|
// isEmpty use default pkNames
|
|
|
- for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
|
|
|
+ for (CanalEntry.Column column : columns) {
|
|
|
if (column.getIsKey()) {
|
|
|
hashCode = hashCode ^ column.getValue().hashCode();
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
|
- try {
|
|
|
- CanalEntry.EventType eventType = RowChange.parseFrom(entry.getStoreValue()).getEventType();
|
|
|
- if(eventType == CanalEntry.EventType.DELETE){
|
|
|
- for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
|
|
|
- if (checkPkNamesHasContain(hashMode.pkNames, column.getName())) {
|
|
|
- hashCode = hashCode ^ column.getValue().hashCode();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- else {
|
|
|
- for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
|
|
|
- if (checkPkNamesHasContain(hashMode.pkNames, column.getName())) {
|
|
|
- hashCode = hashCode ^ column.getValue().hashCode();
|
|
|
- }
|
|
|
- }
|
|
|
+ for (CanalEntry.Column column : columns) {
|
|
|
+ if (checkPkNamesHasContain(hashMode.pkNames, column.getName())) {
|
|
|
+ hashCode = hashCode ^ column.getValue().hashCode();
|
|
|
}
|
|
|
}
|
|
|
- catch (InvalidProtocolBufferException e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
int pkHash = Math.abs(hashCode) % partitionsNum;
|