Forráskód Böngészése

Merge pull request #1028 from ttting/master

Fix Kafka FlatMessage Model Null Value set to ""
rewerma 6 éve
szülő
commit
2202ec451a

+ 10 - 2
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -204,7 +204,11 @@ public class FlatMessage implements Serializable {
                         for (CanalEntry.Column column : columns) {
                         for (CanalEntry.Column column : columns) {
                             sqlType.put(column.getName(), column.getSqlType());
                             sqlType.put(column.getName(), column.getSqlType());
                             mysqlType.put(column.getName(), column.getMysqlType());
                             mysqlType.put(column.getName(), column.getMysqlType());
-                            row.put(column.getName(), column.getValue());
+                            if (column.getIsNull()) {
+                                row.put(column.getName(), null);
+                            } else  {
+                                row.put(column.getName(), column.getValue());
+                            }
                             // 获取update为true的字段
                             // 获取update为true的字段
                             if (column.getUpdated()) {
                             if (column.getUpdated()) {
                                 updateSet.add(column.getName());
                                 updateSet.add(column.getName());
@@ -218,7 +222,11 @@ public class FlatMessage implements Serializable {
                             Map<String, String> rowOld = new LinkedHashMap<>();
                             Map<String, String> rowOld = new LinkedHashMap<>();
                             for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                             for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                                 if (updateSet.contains(column.getName())) {
                                 if (updateSet.contains(column.getName())) {
-                                    rowOld.put(column.getName(), column.getValue());
+                                    if (column.getIsNull()) {
+                                        rowOld.put(column.getName(), null);
+                                    } else {
+                                        rowOld.put(column.getName(), column.getValue());
+                                    }
                                 }
                                 }
                             }
                             }
                             // update操作将记录修改前的值
                             // update操作将记录修改前的值

+ 2 - 1
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -5,6 +5,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Properties;
 import java.util.concurrent.Future;
 import java.util.concurrent.Future;
 
 
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -141,7 +142,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                     canalDestination.getTopic(),
                                     canalDestination.getTopic(),
                                     0,
                                     0,
                                     null,
                                     null,
-                                    JSON.toJSONString(flatMessage));
+                                    JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
                                 producer2.send(record).get();
                                 producer2.send(record).get();
                             } catch (Exception e) {
                             } catch (Exception e) {
                                 logger.error(e.getMessage(), e);
                                 logger.error(e.getMessage(), e);