Browse Source

fixed issue #4923 , support fastJSON Feature.LargeObject

jianghang.loujh 1 year ago
parent
commit
163f719971

+ 5 - 3
connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java

@@ -9,7 +9,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-import com.alibaba.otter.canal.common.utils.PropertiesUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -21,6 +20,7 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONWriter;
 import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
+import com.alibaba.otter.canal.common.utils.PropertiesUtils;
 import com.alibaba.otter.canal.connector.core.producer.AbstractMQProducer;
 import com.alibaba.otter.canal.connector.core.producer.MQDestination;
 import com.alibaba.otter.canal.connector.core.producer.MQMessageUtils;
@@ -249,13 +249,15 @@ public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQPro
                         FlatMessage flatMessagePart = partitionFlatMessage[i];
                         if (flatMessagePart != null) {
                             records.add(new ProducerRecord<>(topicName, i, null, JSON.toJSONBytes(flatMessagePart,
-                                JSONWriter.Feature.WriteNulls)));
+                                JSONWriter.Feature.WriteNulls,
+                                JSONWriter.Feature.LargeObject)));
                         }
                     }
                 } else {
                     final int partition = mqDestination.getPartition() != null ? mqDestination.getPartition() : 0;
                     records.add(new ProducerRecord<>(topicName, partition, null, JSON.toJSONBytes(flatMessage,
-                        JSONWriter.Feature.WriteNulls)));
+                        JSONWriter.Feature.WriteNulls,
+                        JSONWriter.Feature.LargeObject)));
                 }
             }
         }

+ 2 - 1
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java

@@ -13,6 +13,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONWriter;
 import com.alibaba.fastjson2.JSONWriter.Feature;
 import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
 import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
@@ -330,7 +331,7 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
             try {
                 MessageId msgResultId = producer.newMessage()
                     .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition))
-                    .value(JSON.toJSONBytes(f, Feature.WriteNulls))
+                    .value(JSON.toJSONBytes(f, Feature.WriteNulls, JSONWriter.Feature.LargeObject))
                     .send()
                 //
                 ;

+ 2 - 1
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java

@@ -165,7 +165,8 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
             // 串行分区
             List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, messageSub.getId());
             for (FlatMessage flatMessage : flatMessages) {
-                byte[] message = JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls);
+                byte[] message = JSON
+                    .toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls, JSONWriter.Feature.LargeObject);
                 if (logger.isDebugEnabled()) {
                     logger.debug("send message:{} to destination:{}", message, canalDestination.getCanalDestination());
                 }

+ 2 - 2
connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java

@@ -263,7 +263,7 @@ import java.util.stream.Collectors;
                             List<Message> messages = flatMessagePart.stream()
                                     .map(flatMessage -> new Message(topicName,
                                             ((RocketMQProducerConfig) this.mqProperties).getTag(),
-                                            JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls)))
+                                            JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls, JSONWriter.Feature.LargeObject)))
                                     .collect(Collectors.toList());
                             // 批量发送
                             sendMessage(messages, index);
@@ -278,7 +278,7 @@ import java.util.stream.Collectors;
                 List<Message> messages = flatMessages.stream()
                         .map(flatMessage -> new Message(topicName,
                                 ((RocketMQProducerConfig) this.mqProperties).getTag(),
-                                JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls)))
+                                JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls,JSONWriter.Feature.LargeObject)))
                         .collect(Collectors.toList());
                 // 批量发送
                 sendMessage(messages, partition);