|
@@ -46,17 +46,15 @@ import java.util.stream.Collectors;
|
|
|
* @author rewerma 2020-01-27
|
|
|
* @version 1.0.0
|
|
|
*/
|
|
|
-@SPI("rocketmq")
|
|
|
-public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQProducer {
|
|
|
+@SPI("rocketmq") public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQProducer {
|
|
|
|
|
|
- private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);
|
|
|
|
|
|
- private DefaultMQProducer defaultMQProducer;
|
|
|
- private static final String CLOUD_ACCESS_CHANNEL = "cloud";
|
|
|
- protected ThreadPoolExecutor sendPartitionExecutor;
|
|
|
+ private DefaultMQProducer defaultMQProducer;
|
|
|
+ private static final String CLOUD_ACCESS_CHANNEL = "cloud";
|
|
|
+ protected ThreadPoolExecutor sendPartitionExecutor;
|
|
|
|
|
|
- @Override
|
|
|
- public void init(Properties properties) {
|
|
|
+ @Override public void init(Properties properties) {
|
|
|
RocketMQProducerConfig rocketMQProperties = new RocketMQProducerConfig();
|
|
|
this.mqProperties = rocketMQProperties;
|
|
|
super.init(properties);
|
|
@@ -71,9 +69,9 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
}
|
|
|
|
|
|
defaultMQProducer = new DefaultMQProducer(rocketMQProperties.getProducerGroup(),
|
|
|
- rpcHook,
|
|
|
- rocketMQProperties.isEnableMessageTrace(),
|
|
|
- rocketMQProperties.getCustomizedTraceTopic());
|
|
|
+ rpcHook,
|
|
|
+ rocketMQProperties.isEnableMessageTrace(),
|
|
|
+ rocketMQProperties.getCustomizedTraceTopic());
|
|
|
if (CLOUD_ACCESS_CHANNEL.equals(rocketMQProperties.getAccessChannel())) {
|
|
|
defaultMQProducer.setAccessChannel(AccessChannel.CLOUD);
|
|
|
}
|
|
@@ -92,12 +90,12 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
|
|
|
int parallelPartitionSendThreadSize = mqProperties.getParallelSendThreadSize();
|
|
|
sendPartitionExecutor = new ThreadPoolExecutor(parallelPartitionSendThreadSize,
|
|
|
- parallelPartitionSendThreadSize,
|
|
|
- 0,
|
|
|
- TimeUnit.SECONDS,
|
|
|
- new ArrayBlockingQueue<>(parallelPartitionSendThreadSize * 2),
|
|
|
- new NamedThreadFactory("MQ-Parallel-Sender-Partition"),
|
|
|
- new ThreadPoolExecutor.CallerRunsPolicy());
|
|
|
+ parallelPartitionSendThreadSize,
|
|
|
+ 0,
|
|
|
+ TimeUnit.SECONDS,
|
|
|
+ new ArrayBlockingQueue<>(parallelPartitionSendThreadSize * 2),
|
|
|
+ new NamedThreadFactory("MQ-Parallel-Sender-Partition"),
|
|
|
+ new ThreadPoolExecutor.CallerRunsPolicy());
|
|
|
}
|
|
|
|
|
|
private void loadRocketMQProperties(Properties properties) {
|
|
@@ -113,12 +111,12 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
rocketMQProperties.setProducerGroup(producerGroup);
|
|
|
}
|
|
|
String enableMessageTrace = PropertiesUtils.getProperty(properties,
|
|
|
- RocketMQConstants.ROCKETMQ_ENABLE_MESSAGE_TRACE);
|
|
|
+ RocketMQConstants.ROCKETMQ_ENABLE_MESSAGE_TRACE);
|
|
|
if (!StringUtils.isEmpty(enableMessageTrace)) {
|
|
|
rocketMQProperties.setEnableMessageTrace(Boolean.parseBoolean(enableMessageTrace));
|
|
|
}
|
|
|
String customizedTraceTopic = PropertiesUtils.getProperty(properties,
|
|
|
- RocketMQConstants.ROCKETMQ_CUSTOMIZED_TRACE_TOPIC);
|
|
|
+ RocketMQConstants.ROCKETMQ_CUSTOMIZED_TRACE_TOPIC);
|
|
|
if (!StringUtils.isEmpty(customizedTraceTopic)) {
|
|
|
rocketMQProperties.setCustomizedTraceTopic(customizedTraceTopic);
|
|
|
}
|
|
@@ -135,7 +133,7 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
rocketMQProperties.setRetryTimesWhenSendFailed(Integer.parseInt(retry));
|
|
|
}
|
|
|
String vipChannelEnabled = PropertiesUtils.getProperty(properties,
|
|
|
- RocketMQConstants.ROCKETMQ_VIP_CHANNEL_ENABLED);
|
|
|
+ RocketMQConstants.ROCKETMQ_VIP_CHANNEL_ENABLED);
|
|
|
if (!StringUtils.isEmpty(vipChannelEnabled)) {
|
|
|
rocketMQProperties.setVipChannelEnabled(Boolean.parseBoolean(vipChannelEnabled));
|
|
|
}
|
|
@@ -145,15 +143,15 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void send(MQDestination destination, com.alibaba.otter.canal.protocol.Message message, Callback callback) {
|
|
|
+ @Override public void send(MQDestination destination, com.alibaba.otter.canal.protocol.Message message,
|
|
|
+ Callback callback) {
|
|
|
ExecutorTemplate template = new ExecutorTemplate(sendExecutor);
|
|
|
try {
|
|
|
if (!StringUtils.isEmpty(destination.getDynamicTopic())) {
|
|
|
// 动态topic
|
|
|
Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils.messageTopics(message,
|
|
|
- destination.getTopic(),
|
|
|
- destination.getDynamicTopic());
|
|
|
+ destination.getTopic(),
|
|
|
+ destination.getDynamicTopic());
|
|
|
|
|
|
for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
|
|
|
String topicName = entry.getKey().replace('.', '_');
|
|
@@ -181,16 +179,17 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void send(final MQDestination destination, String topicName, com.alibaba.otter.canal.protocol.Message message) {
|
|
|
+ public void send(final MQDestination destination, String topicName,
|
|
|
+ com.alibaba.otter.canal.protocol.Message message) {
|
|
|
// 获取当前topic的分区数
|
|
|
Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
|
|
|
- destination.getDynamicTopicPartitionNum());
|
|
|
-
|
|
|
+ destination.getDynamicTopicPartitionNum());
|
|
|
+
|
|
|
// 获取topic的队列数为分区数
|
|
|
- if(partitionNum == null){
|
|
|
+ if (partitionNum == null) {
|
|
|
partitionNum = getTopicDynamicQueuesSize(destination.getEnableDynamicQueuePartition(), topicName);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if (partitionNum == null) {
|
|
|
partitionNum = destination.getPartitionsNum();
|
|
|
}
|
|
@@ -200,10 +199,10 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
|
|
|
// 串行分区
|
|
|
com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(datas,
|
|
|
- message.getId(),
|
|
|
- partitionNum,
|
|
|
- destination.getPartitionHash(),
|
|
|
- mqProperties.isDatabaseHash());
|
|
|
+ message.getId(),
|
|
|
+ partitionNum,
|
|
|
+ destination.getPartitionHash(),
|
|
|
+ mqProperties.isDatabaseHash());
|
|
|
int length = messages.length;
|
|
|
|
|
|
ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
|
|
@@ -213,9 +212,9 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
final int index = i;
|
|
|
template.submit(() -> {
|
|
|
Message data = new Message(topicName,
|
|
|
- ((RocketMQProducerConfig) this.mqProperties).getTag(),
|
|
|
- CanalMessageSerializerUtil.serializer(dataPartition,
|
|
|
- mqProperties.isFilterTransactionEntry()));
|
|
|
+ ((RocketMQProducerConfig) this.mqProperties).getTag(),
|
|
|
+ CanalMessageSerializerUtil.serializer(dataPartition,
|
|
|
+ mqProperties.isFilterTransactionEntry()));
|
|
|
sendMessage(data, index);
|
|
|
});
|
|
|
}
|
|
@@ -225,8 +224,8 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
} else {
|
|
|
final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
|
|
|
Message data = new Message(topicName,
|
|
|
- ((RocketMQProducerConfig) this.mqProperties).getTag(),
|
|
|
- CanalMessageSerializerUtil.serializer(message, mqProperties.isFilterTransactionEntry()));
|
|
|
+ ((RocketMQProducerConfig) this.mqProperties).getTag(),
|
|
|
+ CanalMessageSerializerUtil.serializer(message, mqProperties.isFilterTransactionEntry()));
|
|
|
sendMessage(data, partition);
|
|
|
}
|
|
|
} else {
|
|
@@ -243,9 +242,9 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
|
|
|
for (FlatMessage flatMessage : flatMessages) {
|
|
|
FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
|
|
|
- partitionNum,
|
|
|
- destination.getPartitionHash(),
|
|
|
- mqProperties.isDatabaseHash());
|
|
|
+ partitionNum,
|
|
|
+ destination.getPartitionHash(),
|
|
|
+ mqProperties.isDatabaseHash());
|
|
|
int length = partitionFlatMessage.length;
|
|
|
for (int i = 0; i < length; i++) {
|
|
|
// 增加null判断,issue #3267
|
|
@@ -262,10 +261,10 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
final int index = i;
|
|
|
template.submit(() -> {
|
|
|
List<Message> messages = flatMessagePart.stream()
|
|
|
- .map(flatMessage -> new Message(topicName,
|
|
|
- ((RocketMQProducerConfig) this.mqProperties).getTag(),
|
|
|
- JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
|
|
|
- .collect(Collectors.toList());
|
|
|
+ .map(flatMessage -> new Message(topicName,
|
|
|
+ ((RocketMQProducerConfig) this.mqProperties).getTag(),
|
|
|
+ JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
|
|
|
+ .collect(Collectors.toList());
|
|
|
// 批量发送
|
|
|
sendMessage(messages, index);
|
|
|
});
|
|
@@ -277,10 +276,10 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
} else {
|
|
|
final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
|
|
|
List<Message> messages = flatMessages.stream()
|
|
|
- .map(flatMessage -> new Message(topicName,
|
|
|
- ((RocketMQProducerConfig) this.mqProperties).getTag(),
|
|
|
- JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
|
|
|
- .collect(Collectors.toList());
|
|
|
+ .map(flatMessage -> new Message(topicName,
|
|
|
+ ((RocketMQProducerConfig) this.mqProperties).getTag(),
|
|
|
+ JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
|
|
|
+ .collect(Collectors.toList());
|
|
|
// 批量发送
|
|
|
sendMessage(messages, partition);
|
|
|
}
|
|
@@ -305,8 +304,7 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("deprecation")
|
|
|
- private void sendMessage(List<Message> messages, int partition) {
|
|
|
+ @SuppressWarnings("deprecation") private void sendMessage(List<Message> messages, int partition) {
|
|
|
if (messages.isEmpty()) {
|
|
|
return;
|
|
|
}
|
|
@@ -348,8 +346,7 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void stop() {
|
|
|
+ @Override public void stop() {
|
|
|
logger.info("## Stop RocketMQ producer##");
|
|
|
this.defaultMQProducer.shutdown();
|
|
|
if (sendPartitionExecutor != null) {
|
|
@@ -358,15 +355,15 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
|
|
|
super.stop();
|
|
|
}
|
|
|
-
|
|
|
- private Integer getTopicDynamicQueuesSize(Boolean enable, String topicName){
|
|
|
- if(enable!=null && enable){
|
|
|
+
|
|
|
+ private Integer getTopicDynamicQueuesSize(Boolean enable, String topicName) {
|
|
|
+ if (enable != null && enable) {
|
|
|
topicName = this.defaultMQProducer.withNamespace(topicName);
|
|
|
DefaultMQProducerImpl innerProducer = this.defaultMQProducer.getDefaultMQProducerImpl();
|
|
|
TopicPublishInfo topicInfo = innerProducer.getTopicPublishInfoTable().get(topicName);
|
|
|
- if(topicInfo == null){
|
|
|
+ if (topicInfo == null) {
|
|
|
return null;
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
return topicInfo.getMessageQueueList().size();
|
|
|
}
|
|
|
}
|