Selaa lähdekoodia

Merge pull request #36 from alibaba/master

merge
rewerma 6 vuotta sitten
vanhempi
commit
9db2bd291b

+ 2 - 2
README.md

@@ -82,7 +82,7 @@ See the wiki page for : <a href="https://github.com/alibaba/canal/wiki" >wiki文
 
 1. canal整体交互协议设计上使用了protobuf3.0,理论上可以支持绝大部分的多语言场景,欢迎大家提交多客户端的PR
     * canal java客户端: <a href="https://github.com/alibaba/canal/wiki/ClientExample"> https://github.com/alibaba/canal/wiki/ClientExample </a>
-    * canal c#客户端开源项目地址:<a href="https://github.com/CanalClient/CanalSharp"> https://github.com/CanalSharp/CanalSharp </a>
+    * canal c#客户端开源项目地址:<a href="https://github.com/dotnetcore/CanalSharp"> https://github.com/dotnetcore/CanalSharp </a>
     * canal go客户端开源项目地址:<a href="https://github.com/CanalClient/canal-go"> https://github.com/CanalClient/canal-go </a>
 2. canal作为MySQL binlog的增量获取工具,可以将数据投递到MQ系统中,比如Kafka/RocketMQ,可以借助于MQ的多语言能力 
     * 参考文档: [Canal Kafka/RocketMQ QuickStart](https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart)
@@ -117,7 +117,7 @@ See the wiki page for : <a href="https://github.com/alibaba/canal/wiki" >wiki文
 <h3>最新更新</h3>
 <ol>
 <li>canal发布重大版本更新1.1.0,具体releaseNode参考:<a href="https://github.com/alibaba/canal/releases/tag/canal-1.1.0">https://github.com/alibaba/canal/releases/tag/canal-1.1.0</a></li>
-<li>canal c#客户端开源项目地址:<a href="https://github.com/CanalClient/CanalSharp"> https://github.com/CanalClient/CanalSharp </a>,推荐! </li>
+<li>canal c#客户端开源项目地址:<a href="https://github.com/dotnetcore/CanalSharp"> https://github.com/dotnetcore/CanalSharp </a>,推荐! </li>
 <li>canal QQ讨论群已经建立,群号:161559791 ,欢迎加入进行技术讨论。</li>
 <li>canal消费端项目开源: Otter(分布式数据库同步系统),地址:<a href="https://github.com/alibaba/otter">https://github.com/alibaba/otter</a></li>
 

+ 2 - 2
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/JdbcTypeUtil.java

@@ -76,7 +76,7 @@ public class JdbcTypeUtil {
                || "TEXT".equalsIgnoreCase(columnType) || "TINYTEXT".equalsIgnoreCase(columnType);
     }
 
-    public static Object typeConvert(String columnName, String value, int sqlType, String mysqlType) {
+    public static Object typeConvert(String tableName ,String columnName, String value, int sqlType, String mysqlType) {
         if (value == null
             || (value.equals("") && !(isText(mysqlType) || sqlType == Types.CHAR || sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR))) {
             return null;
@@ -161,7 +161,7 @@ public class JdbcTypeUtil {
             }
             return res;
         } catch (Exception e) {
-            logger.error("table: {} column: {}, failed convert type {} to {}", columnName, value, sqlType);
+            logger.error("table: {} column: {}, failed convert type {} to {}", tableName, columnName, value, sqlType);
             return value;
         }
     }

+ 6 - 6
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -77,7 +77,7 @@ public class MessageUtil {
                             }
                         }
                         row.put(column.getName(),
-                            JdbcTypeUtil.typeConvert(column.getName(),
+                            JdbcTypeUtil.typeConvert(dml.getTable(),column.getName(),
                                 column.getValue(),
                                 column.getSqlType(),
                                 column.getMysqlType()));
@@ -95,7 +95,7 @@ public class MessageUtil {
                         for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                             if (updateSet.contains(column.getName())) {
                                 rowOld.put(column.getName(),
-                                    JdbcTypeUtil.typeConvert(column.getName(),
+                                    JdbcTypeUtil.typeConvert(dml.getTable(),column.getName(),
                                         column.getValue(),
                                         column.getSqlType(),
                                         column.getMysqlType()));
@@ -153,16 +153,16 @@ public class MessageUtil {
         // }
         List<Map<String, String>> data = flatMessage.getData();
         if (data != null) {
-            dml.setData(changeRows(data, flatMessage.getSqlType(), flatMessage.getMysqlType()));
+            dml.setData(changeRows(dml.getTable(), data, flatMessage.getSqlType(), flatMessage.getMysqlType()));
         }
         List<Map<String, String>> old = flatMessage.getOld();
         if (old != null) {
-            dml.setOld(changeRows(old, flatMessage.getSqlType(), flatMessage.getMysqlType()));
+            dml.setOld(changeRows(dml.getTable(), old, flatMessage.getSqlType(), flatMessage.getMysqlType()));
         }
         return dml;
     }
 
-    private static List<Map<String, Object>> changeRows(List<Map<String, String>> rows, Map<String, Integer> sqlTypes,
+    private static List<Map<String, Object>> changeRows(String table, List<Map<String, String>> rows, Map<String, Integer> sqlTypes,
                                                         Map<String, String> mysqlTypes) {
         List<Map<String, Object>> result = new ArrayList<>();
         for (Map<String, String> row : rows) {
@@ -181,7 +181,7 @@ public class MessageUtil {
                     continue;
                 }
 
-                Object finalValue = JdbcTypeUtil.typeConvert(columnName, columnValue, sqlType, mysqlType);
+                Object finalValue = JdbcTypeUtil.typeConvert(table, columnName, columnValue, sqlType, mysqlType);
                 resultRow.put(columnName, finalValue);
             }
             result.add(resultRow);

+ 41 - 40
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -107,8 +107,9 @@ public class RdbSyncService {
                     int j = i;
                     futures.add(executorThreads[i].submit(() -> {
                         try {
-                            dmlsPartition[j]
-                                .forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
+                            dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j],
+                                syncItem.config,
+                                syncItem.singleDml));
                             dmlsPartition[j].clear();
                             batchExecutors[j].commit();
                             return true;
@@ -146,49 +147,49 @@ public class RdbSyncService {
         sync(dmls, dml -> {
             if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
                 // DDL
-                columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
-                return false;
+            columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
+            return false;
+        } else {
+            // DML
+            String destination = StringUtils.trimToEmpty(dml.getDestination());
+            String groupId = StringUtils.trimToEmpty(dml.getGroupId());
+            String database = dml.getDatabase();
+            String table = dml.getTable();
+            Map<String, MappingConfig> configMap;
+            if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+                configMap = mappingConfig.get(destination + "-" + groupId + "_" + database + "-" + table);
             } else {
-                // DML
-                String destination = StringUtils.trimToEmpty(dml.getDestination());
-                String groupId = StringUtils.trimToEmpty(dml.getGroupId());
-                String database = dml.getDatabase();
-                String table = dml.getTable();
-                Map<String, MappingConfig> configMap;
-                if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
-                    configMap = mappingConfig.get(destination + "-" + groupId + "_" + database + "-" + table);
-                } else {
-                    configMap = mappingConfig.get(destination + "_" + database + "-" + table);
-                }
+                configMap = mappingConfig.get(destination + "_" + database + "-" + table);
+            }
 
-                if (configMap == null) {
-                    return false;
-                }
+            if (configMap == null) {
+                return false;
+            }
 
-                if (configMap.values().isEmpty()) {
-                    return false;
-                }
+            if (configMap.values().isEmpty()) {
+                return false;
+            }
 
-                for (MappingConfig config : configMap.values()) {
-                    if (config.getConcurrent()) {
-                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                        singleDmls.forEach(singleDml -> {
-                            int hash = pkHash(config.getDbMapping(), singleDml.getData());
-                            SyncItem syncItem = new SyncItem(config, singleDml);
-                            dmlsPartition[hash].add(syncItem);
-                        });
-                    } else {
-                        int hash = 0;
-                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                        singleDmls.forEach(singleDml -> {
-                            SyncItem syncItem = new SyncItem(config, singleDml);
-                            dmlsPartition[hash].add(syncItem);
-                        });
-                    }
+            for (MappingConfig config : configMap.values()) {
+                if (config.getConcurrent()) {
+                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                    singleDmls.forEach(singleDml -> {
+                        int hash = pkHash(config.getDbMapping(), singleDml.getData());
+                        SyncItem syncItem = new SyncItem(config, singleDml);
+                        dmlsPartition[hash].add(syncItem);
+                    });
+                } else {
+                    int hash = 0;
+                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                    singleDmls.forEach(singleDml -> {
+                        SyncItem syncItem = new SyncItem(config, singleDml);
+                        dmlsPartition[hash].add(syncItem);
+                    });
                 }
-                return true;
             }
-        });
+            return true;
+        }
+    }   );
     }
 
     /**
@@ -314,7 +315,7 @@ public class RdbSyncService {
         for (String srcColumnName : old.keySet()) {
             List<String> targetColumnNames = new ArrayList<>();
             columnsMap.forEach((targetColumn, srcColumn) -> {
-                if (srcColumnName.toLowerCase().equals(srcColumn.toLowerCase())) {
+                if (srcColumnName.equalsIgnoreCase(srcColumn)) {
                     targetColumnNames.add(targetColumn);
                 }
             });

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -106,8 +106,8 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
         // stage 3
         SequenceBarrier dmlParserSequenceBarrier = disruptorMsgBuffer.newBarrier(simpleParserStage.getSequence());
-        WorkHandler<MessageEvent>[] workHandlers = new DmlParserStage[parserThreadCount];
-        for (int i = 0; i < parserThreadCount; i++) {
+        WorkHandler<MessageEvent>[] workHandlers = new DmlParserStage[tc];
+        for (int i = 0; i < tc; i++) {
             workHandlers[i] = new DmlParserStage();
         }
         workerPool = new WorkerPool<MessageEvent>(disruptorMsgBuffer,