فهرست منبع

Merge pull request #13 from alibaba/master

merge
rewerma 6 سال پیش
والد
کامیت
741f6310b3

+ 4 - 3
README.md

@@ -44,7 +44,7 @@ canal 1.1.x系列,参考release文档:<a href="https://github.com/alibaba/ca
 
 1. 整体性能测试&优化,提升了150%. #726 参考: 【[Performance](https://github.com/alibaba/canal/wiki/Performance)】
 2. 原生支持prometheus监控 #765 【[Prometheus QuickStart](https://github.com/alibaba/canal/wiki/Prometheus-QuickStart)】
-3. 原生支持kafka消息投递 #695 【[Canal Kafka QuickStart](https://github.com/alibaba/canal/wiki/Canal-Kafka-QuickStart)】
+3. 原生支持kafka消息投递 #695 【[Canal Kafka/RocketMQ QuickStart](https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart)】
 4. 原生支持aliyun rds的binlog订阅 (解决自动主备切换/oss binlog离线解析) 参考: 【[Aliyun RDS QuickStart](https://github.com/alibaba/canal/wiki/aliyun-RDS-QuickStart)】
 5. 原生支持docker镜像 #801 参考:  【[Docker QuickStart](https://github.com/alibaba/canal/wiki/Docker-QuickStart)】
 
@@ -60,7 +60,7 @@ See the wiki page for : <a href="https://github.com/alibaba/canal/wiki" >wiki文
 <a class="internal present" href="https://github.com/alibaba/canal/wiki/QuickStart">QuickStart</a>
 <ul>
 <li><a class="internal present" href="https://github.com/alibaba/canal/wiki/Docker-QuickStart">Docker QuickStart</a></li>
-<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/Canal-Kafka-QuickStart">Canal Kafka QuickStart</a></li>
+<li><a class="internal present" href="https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart">Canal Kafka/RocketMQ QuickStart</a></li>
 <li><a class="internal present" href="https://github.com/alibaba/canal/wiki/aliyun-RDS-QuickStart">Aliyun RDS QuickStart</a></li>
 <li><a class="internal present" href="https://github.com/alibaba/canal/wiki/Prometheus-QuickStart">Prometheus QuickStart</a></li>
 </ul>
@@ -85,11 +85,12 @@ See the wiki page for : <a href="https://github.com/alibaba/canal/wiki" >wiki文
     * canal c#客户端开源项目地址:<a href="https://github.com/CanalClient/CanalSharp"> https://github.com/CanalSharp/CanalSharp </a>
     * canal go客户端开源项目地址:<a href="https://github.com/CanalClient/canal-go"> https://github.com/CanalClient/canal-go </a>
 2. canal作为MySQL binlog的增量获取工具,可以将数据投递到MQ系统中,比如Kafka/RocketMQ,可以借助于MQ的多语言能力 
+    * 参考文档: [Canal Kafka/RocketMQ QuickStart](https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart)
 
 <h1>相关资料</h1>
 
 * ADC阿里技术嘉年华分享ppt (放在google docs上,可能需要翻墙): <a href="https://docs.google.com/presentation/d/1MkszUPYRDkfVPz9IqOT1LLT5d9tuwde_WC8GZvjaDRg/edit?usp=sharing">ppt下载</href>  
-* [与阿里巴巴的RocketMQ配合使用](https://github.com/alibaba/RocketMQ)
+* [与阿里巴巴的RocketMQ配合使用](https://github.com/apache/RocketMQ)
 
 <h1>相关开源</h1>
 <ol>

+ 10 - 2
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -204,7 +204,11 @@ public class FlatMessage implements Serializable {
                         for (CanalEntry.Column column : columns) {
                             sqlType.put(column.getName(), column.getSqlType());
                             mysqlType.put(column.getName(), column.getMysqlType());
-                            row.put(column.getName(), column.getValue());
+                            if (column.getIsNull()) {
+                                row.put(column.getName(), null);
+                            } else  {
+                                row.put(column.getName(), column.getValue());
+                            }
                             // 获取update为true的字段
                             if (column.getUpdated()) {
                                 updateSet.add(column.getName());
@@ -218,7 +222,11 @@ public class FlatMessage implements Serializable {
                             Map<String, String> rowOld = new LinkedHashMap<>();
                             for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                                 if (updateSet.contains(column.getName())) {
-                                    rowOld.put(column.getName(), column.getValue());
+                                    if (column.getIsNull()) {
+                                        rowOld.put(column.getName(), null);
+                                    } else {
+                                        rowOld.put(column.getName(), column.getValue());
+                                    }
                                 }
                             }
                             // update操作将记录修改前的值

+ 2 - 1
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -5,6 +5,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.Future;
 
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -141,7 +142,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                     canalDestination.getTopic(),
                                     0,
                                     null,
-                                    JSON.toJSONString(flatMessage));
+                                    JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
                                 producer2.send(record).get();
                             } catch (Exception e) {
                                 logger.error(e.getMessage(), e);