|
@@ -1,5 +1,17 @@
|
|
|
package com.alibaba.otter.canal.connector.pulsarmq.producer;
|
|
|
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.ArrayBlockingQueue;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+import org.apache.commons.lang.StringUtils;
|
|
|
+import org.apache.pulsar.client.admin.PulsarAdmin;
|
|
|
+import org.apache.pulsar.client.admin.PulsarAdminException;
|
|
|
+import org.apache.pulsar.client.api.*;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
import com.alibaba.fastjson2.JSON;
|
|
|
import com.alibaba.fastjson2.JSONWriter.Feature;
|
|
|
import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
|
|
@@ -16,19 +28,6 @@ import com.alibaba.otter.canal.connector.pulsarmq.config.PulsarMQConstants;
|
|
|
import com.alibaba.otter.canal.connector.pulsarmq.config.PulsarMQProducerConfig;
|
|
|
import com.alibaba.otter.canal.protocol.CanalEntry;
|
|
|
import com.alibaba.otter.canal.protocol.FlatMessage;
|
|
|
-import org.apache.commons.lang.StringUtils;
|
|
|
-import org.apache.pulsar.client.admin.PulsarAdmin;
|
|
|
-import org.apache.pulsar.client.admin.PulsarAdminException;
|
|
|
-import org.apache.pulsar.client.api.*;
|
|
|
-import org.apache.pulsar.shade.com.google.gson.JsonParser;
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
-
|
|
|
-import java.util.*;
|
|
|
-import java.util.concurrent.ArrayBlockingQueue;
|
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
* PulsarMQ Producer SPI 实现
|
|
@@ -39,23 +38,22 @@ import java.util.stream.Collectors;
|
|
|
@SPI("pulsarmq")
|
|
|
public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQProducer {
|
|
|
|
|
|
- private static final Logger logger = LoggerFactory.getLogger(CanalPulsarMQProducer.class);
|
|
|
-
|
|
|
- private static final Map<String, Producer<byte[]>> PRODUCERS = new HashMap<>();
|
|
|
-
|
|
|
- protected ThreadPoolExecutor sendPartitionExecutor;
|
|
|
/**
|
|
|
* 消息体分区属性名称
|
|
|
*/
|
|
|
- public static final String MSG_PROPERTY_PARTITION_NAME = "partitionNum";
|
|
|
+ public static final String MSG_PROPERTY_PARTITION_NAME = "partitionNum";
|
|
|
+ private static final Logger logger = LoggerFactory
|
|
|
+ .getLogger(CanalPulsarMQProducer.class);
|
|
|
+ private static final Map<String, Producer<byte[]>> PRODUCERS = new HashMap<>();
|
|
|
+ protected ThreadPoolExecutor sendPartitionExecutor;
|
|
|
/**
|
|
|
* pulsar客户端,管理连接
|
|
|
*/
|
|
|
- protected PulsarClient client;
|
|
|
+ protected PulsarClient client;
|
|
|
/**
|
|
|
* Pulsar admin 客户端
|
|
|
*/
|
|
|
- protected PulsarAdmin pulsarAdmin;
|
|
|
+ protected PulsarAdmin pulsarAdmin;
|
|
|
|
|
|
@Override
|
|
|
public void init(Properties properties) {
|
|
@@ -68,9 +66,9 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
// 初始化连接客户端
|
|
|
try {
|
|
|
ClientBuilder builder = PulsarClient.builder()
|
|
|
- // 填写pulsar的连接地址
|
|
|
- .serviceUrl(pulsarMQProducerConfig.getServerUrl());
|
|
|
- if(StringUtils.isNotEmpty(pulsarMQProducerConfig.getRoleToken())) {
|
|
|
+ // 填写pulsar的连接地址
|
|
|
+ .serviceUrl(pulsarMQProducerConfig.getServerUrl());
|
|
|
+ if (StringUtils.isNotEmpty(pulsarMQProducerConfig.getRoleToken())) {
|
|
|
// 角色权限认证的token
|
|
|
builder.authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken()));
|
|
|
}
|
|
@@ -80,7 +78,7 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
}
|
|
|
|
|
|
// 初始化Pulsar admin
|
|
|
- if(StringUtils.isNotEmpty(pulsarMQProducerConfig.getAdminServerUrl())) {
|
|
|
+ if (StringUtils.isNotEmpty(pulsarMQProducerConfig.getAdminServerUrl())) {
|
|
|
try {
|
|
|
pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsarMQProducerConfig.getAdminServerUrl()).build();
|
|
|
} catch (PulsarClientException e) {
|
|
@@ -91,12 +89,12 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
// 加载所有生产者 --> topic可能有正则或表名,无法确认所有topic,在使用时再加载
|
|
|
int parallelPartitionSendThreadSize = mqProperties.getParallelSendThreadSize();
|
|
|
sendPartitionExecutor = new ThreadPoolExecutor(parallelPartitionSendThreadSize,
|
|
|
- parallelPartitionSendThreadSize,
|
|
|
- 0,
|
|
|
- TimeUnit.SECONDS,
|
|
|
- new ArrayBlockingQueue<>(parallelPartitionSendThreadSize * 2),
|
|
|
- new NamedThreadFactory("MQ-Parallel-Sender-Partition"),
|
|
|
- new ThreadPoolExecutor.CallerRunsPolicy());
|
|
|
+ parallelPartitionSendThreadSize,
|
|
|
+ 0,
|
|
|
+ TimeUnit.SECONDS,
|
|
|
+ new ArrayBlockingQueue<>(parallelPartitionSendThreadSize * 2),
|
|
|
+ new NamedThreadFactory("MQ-Parallel-Sender-Partition"),
|
|
|
+ new ThreadPoolExecutor.CallerRunsPolicy());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -119,12 +117,13 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
if (!StringUtils.isEmpty(roleToken)) {
|
|
|
tmpProperties.setRoleToken(roleToken);
|
|
|
}
|
|
|
- String topicTenantPrefix = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_TOPIC_TENANT_PREFIX);
|
|
|
+ String topicTenantPrefix = PropertiesUtils.getProperty(properties,
|
|
|
+ PulsarMQConstants.PULSARMQ_TOPIC_TENANT_PREFIX);
|
|
|
if (!StringUtils.isEmpty(topicTenantPrefix)) {
|
|
|
tmpProperties.setTopicTenantPrefix(topicTenantPrefix);
|
|
|
}
|
|
|
String adminServerUrl = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ADMIN_SERVER_URL);
|
|
|
- if(!StringUtils.isEmpty(adminServerUrl)) {
|
|
|
+ if (!StringUtils.isEmpty(adminServerUrl)) {
|
|
|
tmpProperties.setAdminServerUrl(adminServerUrl);
|
|
|
}
|
|
|
if (logger.isDebugEnabled()) {
|
|
@@ -140,8 +139,8 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
* </p>
|
|
|
*
|
|
|
* @param destination 消息目标信息
|
|
|
- * @param message 消息
|
|
|
- * @param callback 消息发送结果回调
|
|
|
+ * @param message 消息
|
|
|
+ * @param callback 消息发送结果回调
|
|
|
* @return void
|
|
|
* @date 2021/9/2 22:01
|
|
|
* @author chad
|
|
@@ -154,9 +153,8 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
try {
|
|
|
if (!StringUtils.isEmpty(destination.getDynamicTopic())) {
|
|
|
// 动态topic
|
|
|
- Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils.messageTopics(message,
|
|
|
- destination.getTopic(),
|
|
|
- destination.getDynamicTopic());
|
|
|
+ Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils
|
|
|
+ .messageTopics(message, destination.getTopic(), destination.getDynamicTopic());
|
|
|
|
|
|
for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
|
|
|
String topicName = entry.getKey().replace('.', '_');
|
|
@@ -195,16 +193,17 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
* @author chad
|
|
|
* @since 1.0.0 by chad at 2021/9/2: 新增
|
|
|
*/
|
|
|
- public void send(final MQDestination destination, String topicName, com.alibaba.otter.canal.protocol.Message message) {
|
|
|
+ public void send(final MQDestination destination, String topicName,
|
|
|
+ com.alibaba.otter.canal.protocol.Message message) {
|
|
|
|
|
|
// 获取当前topic的分区数
|
|
|
Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
|
|
|
- destination.getDynamicTopicPartitionNum());
|
|
|
+ destination.getDynamicTopicPartitionNum());
|
|
|
if (partitionNum == null) {
|
|
|
partitionNum = destination.getPartitionsNum();
|
|
|
}
|
|
|
// 创建多分区topic
|
|
|
- if(pulsarAdmin!=null && partitionNum!=null && partitionNum>0 && PRODUCERS.get(topicName)==null) {
|
|
|
+ if (pulsarAdmin != null && partitionNum != null && partitionNum > 0 && PRODUCERS.get(topicName) == null) {
|
|
|
createMultipleTopic(topicName, partitionNum);
|
|
|
}
|
|
|
|
|
@@ -221,10 +220,10 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
}
|
|
|
// 串行分区
|
|
|
com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(datas,
|
|
|
- message.getId(),
|
|
|
- partitionNum,
|
|
|
- destination.getPartitionHash(),
|
|
|
- mqProperties.isDatabaseHash());
|
|
|
+ message.getId(),
|
|
|
+ partitionNum,
|
|
|
+ destination.getPartitionHash(),
|
|
|
+ mqProperties.isDatabaseHash());
|
|
|
// 发送
|
|
|
int len = messages.length;
|
|
|
for (int i = 0; i < len; i++) {
|
|
@@ -254,9 +253,9 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
|
|
|
for (FlatMessage flatMessage : flatMessages) {
|
|
|
FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
|
|
|
- partitionNum,
|
|
|
- destination.getPartitionHash(),
|
|
|
- mqProperties.isDatabaseHash());
|
|
|
+ partitionNum,
|
|
|
+ destination.getPartitionHash(),
|
|
|
+ mqProperties.isDatabaseHash());
|
|
|
int length = partitionFlatMessage.length;
|
|
|
for (int i = 0; i < length; i++) {
|
|
|
// 增加null判断,issue #3267
|
|
@@ -290,9 +289,9 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
/**
|
|
|
* 发送原始消息,需要做分区处理
|
|
|
*
|
|
|
- * @param topic topic
|
|
|
+ * @param topic topic
|
|
|
* @param partitionNum 目标分区
|
|
|
- * @param msg 原始消息内容
|
|
|
+ * @param msg 原始消息内容
|
|
|
* @return void
|
|
|
* @date 2021/9/10 17:55
|
|
|
* @author chad
|
|
@@ -303,8 +302,9 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
byte[] msgBytes = CanalMessageSerializerUtil.serializer(msg, mqProperties.isFilterTransactionEntry());
|
|
|
try {
|
|
|
MessageId msgResultId = producer.newMessage()
|
|
|
- .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum))
|
|
|
- .value(msgBytes).send();
|
|
|
+ .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum))
|
|
|
+ .value(msgBytes)
|
|
|
+ .send();
|
|
|
// todo 判断发送结果
|
|
|
if (logger.isDebugEnabled()) {
|
|
|
logger.debug("Send Message to topic:{} Result: {}", topic, msgResultId);
|
|
@@ -317,7 +317,7 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
/**
|
|
|
* 发送扁平消息
|
|
|
*
|
|
|
- * @param topic topic主题
|
|
|
+ * @param topic topic主题
|
|
|
* @param flatMessages 扁平消息
|
|
|
* @return void
|
|
|
* @date 2021/9/10 18:22
|
|
@@ -328,13 +328,12 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
Producer<byte[]> producer = getProducer(topic);
|
|
|
for (FlatMessage f : flatMessages) {
|
|
|
try {
|
|
|
- MessageId msgResultId = producer
|
|
|
- .newMessage()
|
|
|
- .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition))
|
|
|
- .value(JSON.toJSONBytes(f, Feature.WriteNulls))
|
|
|
- .send()
|
|
|
- //
|
|
|
- ;
|
|
|
+ MessageId msgResultId = producer.newMessage()
|
|
|
+ .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition))
|
|
|
+ .value(JSON.toJSONBytes(f, Feature.WriteNulls))
|
|
|
+ .send()
|
|
|
+ //
|
|
|
+ ;
|
|
|
if (logger.isDebugEnabled()) {
|
|
|
logger.debug("Send Messages to topic:{} Result: {}", topic, msgResultId);
|
|
|
}
|
|
@@ -346,10 +345,11 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
|
|
|
/**
|
|
|
* 创建多分区topic
|
|
|
+ *
|
|
|
* @param topic
|
|
|
* @param partitionNum
|
|
|
*/
|
|
|
- private void createMultipleTopic(String topic,Integer partitionNum) {
|
|
|
+ private void createMultipleTopic(String topic, Integer partitionNum) {
|
|
|
// 拼接topic前缀
|
|
|
PulsarMQProducerConfig pulsarMQProperties = (PulsarMQProducerConfig) this.mqProperties;
|
|
|
String prefix = pulsarMQProperties.getTopicTenantPrefix();
|
|
@@ -368,8 +368,10 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
// TODO 无论是否报错,都继续后续的操作,此处不进行阻塞
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
/**
|
|
|
* 获取topic
|
|
|
+ *
|
|
|
* @param topic
|
|
|
* @return
|
|
|
*/
|
|
@@ -396,10 +398,10 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
|
|
|
// 创建指定topic的生产者
|
|
|
producer = client.newProducer()
|
|
|
- .topic(fullTopic)
|
|
|
- // 指定路由器
|
|
|
- .messageRouter(new MessageRouterImpl(topic))
|
|
|
- .create();
|
|
|
+ .topic(fullTopic)
|
|
|
+ // 指定路由器
|
|
|
+ .messageRouter(new MessageRouterImpl(topic))
|
|
|
+ .create();
|
|
|
// 放入缓存
|
|
|
PRODUCERS.put(topic, producer);
|
|
|
}
|
|
@@ -412,6 +414,26 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
return producer;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public void stop() {
|
|
|
+ logger.info("## Stop PulsarMQ producer##");
|
|
|
+
|
|
|
+ for (Producer p : PRODUCERS.values()) {
|
|
|
+ try {
|
|
|
+ if (null != p && p.isConnected()) {
|
|
|
+ p.close();
|
|
|
+ }
|
|
|
+ } catch (PulsarClientException e) {
|
|
|
+ logger.warn("close producer name: {}, topic: {}, error: {}",
|
|
|
+ p.getProducerName(),
|
|
|
+ p.getTopic(),
|
|
|
+ e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ super.stop();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Pulsar自定义路由策略
|
|
|
*
|
|
@@ -421,9 +443,10 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
* @since 2 by chad at 2021/9/17 修改为msg自带目标分区
|
|
|
*/
|
|
|
private static class MessageRouterImpl implements MessageRouter {
|
|
|
+
|
|
|
private String topicLocal;
|
|
|
|
|
|
- public MessageRouterImpl(String topicLocal) {
|
|
|
+ public MessageRouterImpl(String topicLocal){
|
|
|
this.topicLocal = topicLocal;
|
|
|
}
|
|
|
|
|
@@ -435,7 +458,8 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
try {
|
|
|
partition = Integer.parseInt(partitionStr);
|
|
|
} catch (NumberFormatException e) {
|
|
|
- logger.warn("Parse msg {} property failed for value: {}", MSG_PROPERTY_PARTITION_NAME, partitionStr);
|
|
|
+ logger
|
|
|
+ .warn("Parse msg {} property failed for value: {}", MSG_PROPERTY_PARTITION_NAME, partitionStr);
|
|
|
}
|
|
|
}
|
|
|
// topic创建时设置的分区数
|
|
@@ -447,21 +471,4 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
return partition;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- @Override
|
|
|
- public void stop() {
|
|
|
- logger.info("## Stop PulsarMQ producer##");
|
|
|
-
|
|
|
- for (Producer p : PRODUCERS.values()) {
|
|
|
- try {
|
|
|
- if (null != p && p.isConnected()) {
|
|
|
- p.close();
|
|
|
- }
|
|
|
- } catch (PulsarClientException e) {
|
|
|
- logger.warn("close producer name: {}, topic: {}, error: {}", p.getProducerName(), p.getTopic(), e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- super.stop();
|
|
|
- }
|
|
|
}
|