|
@@ -249,20 +249,27 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
// 批量发送
|
|
|
List<MessageQueue> queues = topicInfo.getMessageQueueList();
|
|
|
int size = queues.size();
|
|
|
- MessageQueue queue = null;
|
|
|
- if (partition > queues.size()) {
|
|
|
- queue = queues.get(partition % size);
|
|
|
+ if (size <= 0) {
|
|
|
+ // 可能是第一次创建
|
|
|
+ for (Message message : messages) {
|
|
|
+ sendMessage(message, partition);
|
|
|
+ }
|
|
|
} else {
|
|
|
- queue = queues.get(partition);
|
|
|
- }
|
|
|
+ MessageQueue queue = null;
|
|
|
+ if (partition > size) {
|
|
|
+ queue = queues.get(partition % size);
|
|
|
+ } else {
|
|
|
+ queue = queues.get(partition);
|
|
|
+ }
|
|
|
|
|
|
- try {
|
|
|
- SendResult sendResult = this.defaultMQProducer.send(messages, queue);
|
|
|
- if (logger.isDebugEnabled()) {
|
|
|
- logger.debug("Send Message Result: {}", sendResult);
|
|
|
+ try {
|
|
|
+ SendResult sendResult = this.defaultMQProducer.send(messages, queue);
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ logger.debug("Send Message Result: {}", sendResult);
|
|
|
+ }
|
|
|
+ } catch (Throwable e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
}
|
|
|
- } catch (Throwable e) {
|
|
|
- throw new RuntimeException(e);
|
|
|
}
|
|
|
}
|
|
|
}
|