Browse Source

修改convert时设置pk

machey 6 years ago
parent
commit
eac3f5ce5f

+ 14 - 21
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -5,9 +5,9 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import com.alibaba.otter.canal.protocol.aviater.AviaterRegexFilter;
 import org.apache.commons.lang.StringUtils;
 
+import com.alibaba.otter.canal.protocol.aviater.AviaterRegexFilter;
 import com.google.protobuf.ByteString;
 
 /**
@@ -18,12 +18,12 @@ public class FlatMessage implements Serializable {
 
     private static final long                                    serialVersionUID = -3386650678735860050L;
 
-    private static ConcurrentMap<String, String>                 schemaTabPk      = new ConcurrentHashMap<>();
     private static ConcurrentHashMap<String, AviaterRegexFilter> regexFilters     = new ConcurrentHashMap<>();
 
     private long                                                 id;
     private String                                               database;
     private String                                               table;
+    private String                                               pk;
     private Boolean                                              isDdl;
     private String                                               type;
     // binlog executeTime
@@ -66,6 +66,14 @@ public class FlatMessage implements Serializable {
         this.table = table;
     }
 
+    public String getPk() {
+        return pk;
+    }
+
+    public void setPk(String pk) {
+        this.pk = pk;
+    }
+
     public Boolean getIsDdl() {
         return isDdl;
     }
@@ -222,6 +230,9 @@ public class FlatMessage implements Serializable {
                         }
 
                         for (CanalEntry.Column column : columns) {
+                            if (flatMessage.getPk() == null && column.getIsKey()) {
+                                flatMessage.setPk(column.getName());
+                            }
                             sqlType.put(column.getName(), column.getSqlType());
                             mysqlType.put(column.getName(), column.getMysqlType());
                             if (column.getIsNull()) {
@@ -325,25 +336,7 @@ public class FlatMessage implements Serializable {
                     partitionMessages[0] = flatMessage;
                 } else {
                     if (pk == null) {
-                        pk = schemaTabPk.get(database + "." + table);
-                        if (pk == null) {
-                            // 如果未指定主键(通配符主键),从原生message中取主键字段
-                            CanalEntry.Entry entry = flatMessage.getEntry();
-                            CanalEntry.RowChange rowChange;
-                            try {
-                                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
-                            } catch (Exception e) {
-                                throw new RuntimeException(e.getMessage(), e);
-                            }
-                            CanalEntry.RowData rowData = rowChange.getRowDatasList().get(0);
-                            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
-                                if (column.getIsKey()) {
-                                    pk = column.getName();
-                                    schemaTabPk.putIfAbsent(database + "." + table, pk);
-                                    break;
-                                }
-                            }
-                        }
+                        pk = flatMessage.getPk();
                     }
                     if (pk == null || !flatMessage.getData().get(0).containsKey(pk)) {
                         // 如果都没有匹配的主键,发送到第一个分区

+ 35 - 42
protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java

@@ -4,14 +4,13 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
-import com.alibaba.otter.canal.protocol.aviater.AviaterRegexFilter;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.builder.ToStringBuilder;
 
 import com.alibaba.otter.canal.common.utils.CanalToStringStyle;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
+import com.alibaba.otter.canal.protocol.aviater.AviaterRegexFilter;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 
@@ -23,7 +22,6 @@ public class Message implements Serializable {
 
     private static final long                                    serialVersionUID = 1234034768477580009L;
 
-    private static ConcurrentMap<String, String>                 schemaTabPk      = new ConcurrentHashMap<>();
     private static ConcurrentHashMap<String, AviaterRegexFilter> regexFilters     = new ConcurrentHashMap<>();
 
     private long                                                 id;
@@ -173,55 +171,50 @@ public class Message implements Serializable {
                     if (!isMatch) {
                         // 如果都没有匹配,发送到第一个分区
                         partitionEntries[0].add(entry);
-                    } else {
-                        if (pk == null) {
-                            pk = schemaTabPk.get(database + "." + table);
-                            if (pk == null) {
-                                // 如果未指定主键(通配符主键),取主键字段
-                                try {
-                                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
-                                } catch (Exception e) {
-                                    throw new RuntimeException(e.getMessage(), e);
+                    }
+                    for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+                        boolean hasOldPk = false;
+                        // 如果before中有pk值说明主键有修改, 以旧的主键值hash为准
+                        for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
+                            CanalEntry.Column pkColumn = null;
+                            if (pk != null) {
+                                if (column.getName().equalsIgnoreCase(pk)) {
+                                    pkColumn = column;
                                 }
-                                CanalEntry.RowData rowData = rowChange.getRowDatasList().get(0);
-                                for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
-                                    if (column.getIsKey()) {
-                                        pk = column.getName();
-                                        schemaTabPk.putIfAbsent(database + "." + table, pk);
-                                        break;
-                                    }
+                            } else {
+                                if (column.getIsKey()) {
+                                    pkColumn = column;
                                 }
                             }
+                            if (pkColumn != null) {
+                                int hash = pkColumn.getValue().hashCode();
+                                int pkHash = Math.abs(hash) % partitionsNum;
+                                pkHash = Math.abs(pkHash);
+                                partitionEntries[pkHash].add(entry);
+                                hasOldPk = true;
+                                break;
+                            }
                         }
-                    }
-                    if (pk == null) {
-                        // 如果都没有匹配的主键,发送到第一个分区
-                        partitionEntries[0].add(entry);
-                    } else {
-                        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
-                            boolean hasOldPk = false;
-                            // 如果before中有pk值说明主键有修改, 以旧的主键值hash为准
-                            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
-                                if (column.getName().equalsIgnoreCase(pk)) {
-                                    int hash = column.getValue().hashCode();
+                        if (!hasOldPk) {
+                            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
+                                CanalEntry.Column pkColumn = null;
+                                if (pk != null) {
+                                    if (column.getName().equalsIgnoreCase(pk)) {
+                                        pkColumn = column;
+                                    }
+                                } else {
+                                    if (column.getIsKey()) {
+                                        pkColumn = column;
+                                    }
+                                }
+                                if (pkColumn != null) {
+                                    int hash = pkColumn.getValue().hashCode();
                                     int pkHash = Math.abs(hash) % partitionsNum;
                                     pkHash = Math.abs(pkHash);
                                     partitionEntries[pkHash].add(entry);
-                                    hasOldPk = true;
                                     break;
                                 }
                             }
-                            if (!hasOldPk) {
-                                for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
-                                    if (column.getName().equalsIgnoreCase(pk)) {
-                                        int hash = column.getValue().hashCode();
-                                        int pkHash = Math.abs(hash) % partitionsNum;
-                                        pkHash = Math.abs(pkHash);
-                                        partitionEntries[pkHash].add(entry);
-                                        break;
-                                    }
-                                }
-                            }
                         }
                     }
                 }