Bläddra i källkod

fixed Rocketmq can not batch send messages when producer set namespace (#5202)

iamssx 9 månader sedan
förälder
incheckning
2b8a309ec7

+ 6 - 1
connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java

@@ -52,6 +52,7 @@ import java.util.stream.Collectors;
 
     private              DefaultMQProducer  defaultMQProducer;
     private static final String             CLOUD_ACCESS_CHANNEL = "cloud";
+    private static final String             NAMESPACE_SEPARATOR  = "%";
     protected            ThreadPoolExecutor sendPartitionExecutor;
 
     @Override public void init(Properties properties) {
@@ -311,7 +312,11 @@ import java.util.stream.Collectors;
 
         // 获取一下messageQueue
         DefaultMQProducerImpl innerProducer = this.defaultMQProducer.getDefaultMQProducerImpl();
-        TopicPublishInfo topicInfo = innerProducer.getTopicPublishInfoTable().get(messages.get(0).getTopic());
+        String topic = messages.get(0).getTopic();
+        if (StringUtils.isNotBlank(this.defaultMQProducer.getNamespace())) {
+            topic = this.defaultMQProducer.getNamespace() + NAMESPACE_SEPARATOR + topic;
+        }
+        TopicPublishInfo topicInfo = innerProducer.getTopicPublishInfoTable().get(topic);
         if (topicInfo == null) {
             for (Message message : messages) {
                 sendMessage(message, partition);