Browse Source

flagMessage json 序列化无null值的bug

mcy 6 years ago
parent
commit
6fa111866f

+ 2 - 2
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -104,7 +104,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                             ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                 canalDestination.getPartition(),
                                 null,
-                                JSON.toJSONString(flatMessage));
+                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
                             producer2.send(record).get();
                         } catch (Exception e) {
                             logger.error(e.getMessage(), e);
@@ -126,7 +126,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                         ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                             i,
                                             null,
-                                            JSON.toJSONString(flatMessagePart));
+                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
                                         producer2.send(record).get();
                                     } catch (Exception e) {
                                         logger.error(e.getMessage(), e);

+ 5 - 4
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -1,6 +1,7 @@
 package com.alibaba.otter.canal.rocketmq;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.common.CanalMessageSerializer;
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.protocol.FlatMessage;
@@ -75,11 +76,11 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                     if (destination.getPartition() != null) {
                         try {
                             logger.info("send flat message: {} to topic: {} fixed partition: {}",
-                                JSON.toJSONString(flatMessage),
+                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue),
                                 destination.getTopic(),
                                 destination.getPartition());
                             Message message = new Message(destination.getTopic(),
-                                JSON.toJSONString(flatMessage).getBytes());
+                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue).getBytes());
                             this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                 @Override
@@ -102,12 +103,12 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                 FlatMessage flatMessagePart = partitionFlatMessage[i];
                                 if (flatMessagePart != null) {
                                     logger.debug("flatMessagePart: {}, partition: {}",
-                                        JSON.toJSONString(flatMessagePart),
+                                        JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue),
                                         i);
                                     final int index = i;
                                     try {
                                         Message message = new Message(destination.getTopic(),
-                                            JSON.toJSONString(flatMessagePart).getBytes());
+                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue).getBytes());
                                         this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                             @Override