|
@@ -372,6 +372,10 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
topicName = this.defaultMQProducer.withNamespace(topicName);
|
|
|
DefaultMQProducerImpl innerProducer = this.defaultMQProducer.getDefaultMQProducerImpl();
|
|
|
TopicPublishInfo topicInfo = innerProducer.getTopicPublishInfoTable().get(topicName);
|
|
|
+ if (topicInfo == null) {
|
|
|
+ innerProducer.getMqClientFactory().updateTopicRouteInfoFromNameServer(topicName);
|
|
|
+ }
|
|
|
+ topicInfo = innerProducer.getTopicPublishInfoTable().get(topicName);
|
|
|
if (topicInfo == null) {
|
|
|
return null;
|
|
|
} else {
|