1
0
Эх сурвалжийг харах

预留sync(MappingConfig config, List<Dml> dmlList)方法
pk hash以old pk值为准若存在

mcy 6 жил өмнө
parent
commit
915849b83a

+ 0 - 3
client-adapter/launcher/src/main/resources/application.yml

@@ -35,8 +35,6 @@ canal.conf:
 #          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
 #          jdbc.username: mytest
 #          jdbc.password: m121212
-#          threads: 5
-#          commitSize: 3000
 #      - name: rdb
 #        key: postgres1
 #        properties:
@@ -44,7 +42,6 @@ canal.conf:
 #          jdbc.url: jdbc:postgresql://localhost:5432/postgres
 #          jdbc.username: postgres
 #          jdbc.password: 121212
-#          threads: 5
 #      - name: hbase
 #        properties:
 #          hbase.zookeeper.quorum: 127.0.0.1

+ 26 - 4
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -66,6 +66,9 @@ public class RdbSyncService {
         }
     }
 
+    public void sync(MappingConfig config, List<Dml> dmlList) {
+    }
+
     public void sync(MappingConfig config, Dml dml) {
         try {
             if (config != null) {
@@ -175,7 +178,7 @@ public class RdbSyncService {
 
         for (Map<String, Object> o : old) {
             Map<String, Object> d = data.get(idx - 1);
-            int pkHash = pkHash(dbMapping, d, threads);
+            int pkHash = pkHash(dbMapping, d, o, threads);
 
             ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[pkHash];
             checkQueue(tpe);
@@ -206,7 +209,7 @@ public class RdbSyncService {
                     updateSql.delete(len - 2, len).append(" WHERE ");
 
                     // 拼接主键
-                    appendCondition(dbMapping, updateSql, ctype, values, d);
+                    appendCondition(dbMapping, updateSql, ctype, values, d, o);
 
                     if (logger.isTraceEnabled()) {
                         logger.trace("Update target table, sql: {}", updateSql);
@@ -470,6 +473,11 @@ public class RdbSyncService {
      */
     private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
                                         List<Map<String, ?>> values, Map<String, Object> d) {
+        appendCondition(dbMapping, sql, ctype, values, d, null);
+    }
+
+    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
+                                        List<Map<String, ?>> values, Map<String, Object> d, Map<String, Object> o) {
         // 拼接主键
         for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
             String targetColumnName = entry.getKey();
@@ -479,7 +487,12 @@ public class RdbSyncService {
             }
             sql.append(targetColumnName).append("=? AND ");
             Integer type = ctype.get(targetColumnName.toLowerCase());
-            BatchExecutor.setValue(values, type, d.get(srcColumnName));
+            // 如果有修改主键的情况
+            if (o != null && o.containsKey(srcColumnName)) {
+                BatchExecutor.setValue(values, type, o.get(srcColumnName));
+            } else {
+                BatchExecutor.setValue(values, type, d.get(srcColumnName));
+            }
         }
         int len = sql.length();
         sql.delete(len - 4, len);
@@ -489,6 +502,10 @@ public class RdbSyncService {
      * 取主键hash
      */
     private static int pkHash(DbMapping dbMapping, Map<String, Object> d, int threads) {
+        return pkHash(dbMapping, d, null, threads);
+    }
+
+    private static int pkHash(DbMapping dbMapping, Map<String, Object> d, Map<String, Object> o, int threads) {
         int hash = 0;
         // 取主键
         for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
@@ -497,7 +514,12 @@ public class RdbSyncService {
             if (srcColumnName == null) {
                 srcColumnName = targetColumnName;
             }
-            Object value = d.get(srcColumnName);
+            Object value;
+            if (o != null && o.containsKey(srcColumnName)) {
+                value = o.get(srcColumnName);
+            } else {
+                value = d.get(srcColumnName);
+            }
             if (value != null) {
                 hash += value.hashCode();
             }

+ 12 - 4
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -171,8 +171,9 @@ public class FlatMessage implements Serializable {
                 try {
                     rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                 } catch (Exception e) {
-                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
-                                               + entry.toString(), e);
+                    throw new RuntimeException(
+                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
+                        e);
                 }
 
                 CanalEntry.EventType eventType = rowChange.getEventType();
@@ -214,7 +215,7 @@ public class FlatMessage implements Serializable {
                             mysqlType.put(column.getName(), column.getMysqlType());
                             if (column.getIsNull()) {
                                 row.put(column.getName(), null);
-                            } else  {
+                            } else {
                                 row.put(column.getName(), column.getValue());
                             }
                             // 获取update为true的字段
@@ -284,7 +285,14 @@ public class FlatMessage implements Serializable {
             if (flatMessage.getData() != null) {
                 int idx = 0;
                 for (Map<String, String> row : flatMessage.getData()) {
-                    String value = row.get(pk);
+                    Map<String, String> o = flatMessage.getOld().get(idx);
+                    String value;
+                    // 如果old中有pk值说明主键有修改, 以旧的主键值hash为准
+                    if (o != null && o.containsKey(pk)) {
+                        value = o.get(pk);
+                    } else {
+                        value = row.get(pk);
+                    }
                     if (value == null) {
                         value = "";
                     }