Procházet zdrojové kódy

增加Apache Pulsar MQ支持 (#3791)

* done pulsarmq

* del pulsar consumer

* pulsar partition

* done pulsar consumer&connector

* update pulsar-client version to 2.8.1

Co-authored-by: agapple <jianghang115@gmail.com>
Chad2li před 3 roky
rodič
revize
5b6fd8094a
16 změnil soubory, kde provedl 1619 přidání a 10 odebrání
  1. 6 0
      client/pom.xml
  2. 458 0
      client/src/main/java/com/alibaba/otter/canal/client/pulsarmq/PulsarMQCanalConnector.java
  3. 97 0
      common/src/main/java/com/alibaba/otter/canal/common/utils/MQUtil.java
  4. 19 2
      connector/core/src/main/java/com/alibaba/otter/canal/connector/core/producer/MQMessageUtils.java
  5. 1 0
      connector/pom.xml
  6. 94 0
      connector/pulsarmq-connector/pom.xml
  7. 62 0
      connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java
  8. 57 0
      connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java
  9. 305 0
      connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java
  10. 422 0
      connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java
  11. 1 0
      connector/pulsarmq-connector/src/main/resources/META-INF/canal/com.alibaba.otter.canal.connector.core.spi.CanalMQProducer
  12. 76 0
      connector/pulsarmq-connector/src/test/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumerTest.java
  13. 1 1
      connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java
  14. 10 2
      deployer/src/main/resources/canal.properties
  15. 4 0
      deployer/src/main/resources/logback.xml
  16. 6 5
      pom.xml

+ 6 - 0
client/pom.xml

@@ -139,6 +139,12 @@
 			<artifactId>junit</artifactId>
 			<scope>test</scope>
 		</dependency>
+
+		<!-- Pulsar -->
+		<dependency>
+			<groupId>org.apache.pulsar</groupId>
+			<artifactId>pulsar-client</artifactId>
+		</dependency>
 	</dependencies>
 
 	<build>

+ 458 - 0
client/src/main/java/com/alibaba/otter/canal/client/pulsarmq/PulsarMQCanalConnector.java

@@ -0,0 +1,458 @@
+package com.alibaba.otter.canal.client.pulsarmq;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.client.CanalMQConnector;
+import com.alibaba.otter.canal.client.CanalMessageDeserializer;
+import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
+import com.alibaba.otter.canal.common.utils.MQUtil;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.client.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * PulsarMQ的连接
+ *
+ * <pre>
+ * 注意点:
+ * 1. 相比于canal {@linkplain SimpleCanalConnector}, 这里get和ack操作不能有并发, 必须是一个线程执行get后,内存里执行完毕ack后再取下一个get
+ * </pre>
+ * todo 重复消费的概率相当高。一次批处理中,只要有一个消息处理失败,则该批次全部重试
+ *
+ * @since 1.1.1
+ */
+public class PulsarMQCanalConnector implements CanalMQConnector {
+
+    private static final Logger logger = LoggerFactory.getLogger(PulsarMQCanalConnector.class);
+
+    private volatile Messages<byte[]> lastGetBatchMessage;
+
+    /**
+     * 连接pulsar客户端
+     */
+    private PulsarClient pulsarClient;
+    /**
+     * 消费者
+     */
+    private Consumer<byte[]> consumer;
+    /**
+     * 是否扁平化Canal消息内容
+     */
+    private boolean isFlatMessage = false;
+    /**
+     * 主题名称
+     */
+    private String topic;
+    /**
+     * 环境连接URL
+     */
+    private String serviceUrl;
+    /**
+     * 角色认证token
+     */
+    private String roleToken;
+    /**
+     * 订阅客户端名称
+     */
+    private String subscriptName;
+    /**
+     * 每次批量获取数据的最大条目数,默认30
+     */
+    private int batchSize = 30;
+    /**
+     * 与{@code batchSize}一起决定批量获取的数据大小
+     * 当:
+     * <p>
+     * 1. {@code batchSize} 条消息未消费时<br/>
+     * 2. 距上一次批量消费时间达到{@code batchTimeoutSeconds}秒时
+     * </p>
+     * 任一条件满足,即执行批量消费
+     */
+    private int getBatchTimeoutSeconds = 30;
+    /**
+     * 批量处理消息时,一次批量处理的超时时间秒数
+     * <p>
+     * 该时间应该根据{@code batchSize}和{@code batchTimeoutSeconds}合理设置
+     * </p>
+     */
+    private int batchProcessTimeoutSeconds = 60;
+    /**
+     * 消费失败后的重试秒数,默认60秒
+     */
+    private int redeliveryDelaySeconds = 60;
+    /**
+     * 当客户端接收到消息,30秒还没有返回ack给服务端时,ack超时,会重新消费该消息
+     */
+    private int ackTimeoutSeconds = 30;
+    /**
+     * 是否开启消息失败重试功能,默认开启
+     */
+    private boolean isRetry = true;
+    /**
+     * <p>
+     * true重试(-RETRY)和死信队列(-DLQ)后缀为大写,有些地方创建的为小写,需确保正确
+     * </p>
+     */
+    private boolean isRetryDLQUpperCase = false;
+    /**
+     * 最大重试次数
+     */
+    private int maxRedeliveryCount = 128;
+    /**
+     * 连接标识位,在连接或关闭连接后改变值
+     */
+    private boolean connected = false;
+
+    /**
+     * 除必要参数外,其他参数使用默认值
+     * <p>
+     * 由于pulsar会根据subscriptName来区分消费实例,并且已经分配的指定实例的消息会固定到该实例的retry(重试)和dlq(死信)队列中,
+     * 所以subscriptName必传,且必须跟之前的一致,否则会导致之前消费失败的消息不会重消费。
+     * </p>
+     *
+     * @param isFlatMessage true使用扁平消息
+     * @param serviceUrl    pulsar服务连接地址,通常为:pulsar:host:ip或http://host:ip
+     * @param roleToken     有对应topic的消费者权限的角色token
+     * @param topic         订阅主题
+     * @param subscriptName 订阅和客户端名称,同一个订阅名视为同一个消费实例
+     * @date 2021/9/18 08:54
+     * @author chad
+     * @since 1 by chad at 2021/9/18 完善
+     */
+    public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic
+            , String subscriptName) {
+        this.isFlatMessage = isFlatMessage;
+        this.serviceUrl = serviceUrl;
+        this.roleToken = roleToken;
+        this.topic = topic;
+        this.subscriptName = subscriptName;
+        if (StringUtils.isEmpty(this.subscriptName)) {
+            throw new RuntimeException("Pulsar Consumer subscriptName required");
+        }
+    }
+
+    /**
+     * 完全自定义的消费实例参数
+     *
+     * @date 2021/9/18 10:20
+     * @author chad
+     * @since 1 by chad at 2021/9/18 完善
+     */
+    public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic
+            , String subscriptName, int batchSize, int getBatchTimeoutSeconds, int batchProcessTimeoutSeconds
+            , int redeliveryDelaySeconds, int ackTimeoutSeconds, boolean isRetry, boolean isRetryDLQUpperCase
+            , int maxRedeliveryCount) {
+        this.isFlatMessage = isFlatMessage;
+        this.serviceUrl = serviceUrl;
+        this.roleToken = roleToken;
+        this.topic = topic;
+        this.subscriptName = subscriptName;
+        if (StringUtils.isEmpty(this.subscriptName)) {
+            throw new RuntimeException("Pulsar Consumer subscriptName required");
+        }
+        this.batchSize = batchSize;
+        this.getBatchTimeoutSeconds = getBatchTimeoutSeconds;
+        this.batchProcessTimeoutSeconds = batchProcessTimeoutSeconds;
+        this.redeliveryDelaySeconds = redeliveryDelaySeconds;
+        this.ackTimeoutSeconds = ackTimeoutSeconds;
+        this.isRetry = isRetry;
+        this.isRetryDLQUpperCase = isRetryDLQUpperCase;
+        this.maxRedeliveryCount = maxRedeliveryCount;
+    }
+
+    @Override
+    public void connect() throws CanalClientException {
+        // 连接创建客户端
+        try {
+            pulsarClient = PulsarClient.builder()
+                    .serviceUrl(serviceUrl)
+                    .authentication(AuthenticationFactory.token(roleToken))
+                    .build();
+        } catch (PulsarClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void disconnect() throws CanalClientException {
+        try {
+            if (null != this.consumer && this.consumer.isConnected()) {
+                this.consumer.close();
+            }
+        } catch (PulsarClientException e) {
+            logger.error("close pulsar consumer error", e);
+        }
+        try {
+            if (null != this.pulsarClient) {
+                this.pulsarClient.close();
+            }
+        } catch (PulsarClientException e) {
+            logger.error("close pulsar client error", e);
+        }
+
+        this.connected = false;
+    }
+
+    @Override
+    public boolean checkValid() throws CanalClientException {
+        return connected;
+    }
+
+    @Override
+    public synchronized void subscribe(String filter) throws CanalClientException {
+        if (connected) {
+            return;
+        }
+        ConsumerBuilder<byte[]> builder = pulsarClient.newConsumer();
+
+        if (MQUtil.isPatternTopic(this.topic)) {
+            // 正则
+            builder.topicsPattern(this.topic);
+        } else {// 多个topic
+            builder.topic(this.topic);
+        }
+        // 为保证消息的有序性,仅支持单消费实例模式
+        // 灾备模式,一个分区只能有一个消费者,如果当前消费者不可用,自动切换到其他消费者
+        builder.subscriptionType(SubscriptionType.Failover);
+
+        builder
+                // 调用consumer.negativeAcknowledge(message) (即nack)来表示消费失败的消息
+                // 在指定的时间进行重新消费,默认是1分钟。
+                .negativeAckRedeliveryDelay(this.redeliveryDelaySeconds, TimeUnit.SECONDS)
+                .subscriptionName(this.subscriptName)
+        ;
+        if (this.isRetry) {
+            DeadLetterPolicy.DeadLetterPolicyBuilder dlqBuilder = DeadLetterPolicy.builder()
+                    // 最大重试次数
+                    .maxRedeliverCount(this.maxRedeliveryCount);
+            // 指定重试队列,不是多个或通配符topic才能判断重试队列
+            if (!MQUtil.isPatternTag(this.topic)) {
+                String retryTopic = this.topic + (this.isRetryDLQUpperCase ? "-RETRY" : "-retry");
+                dlqBuilder.retryLetterTopic(retryTopic);
+                String dlqTopic = this.topic + (this.isRetryDLQUpperCase ? "-DLQ" : "-dlq");
+                dlqBuilder.deadLetterTopic(dlqTopic);
+            }
+
+            //默认关闭,如果需要重试则开启
+            builder.enableRetry(true)
+                    .deadLetterPolicy(dlqBuilder.build());
+        }
+
+        // ack超时
+        builder.ackTimeout(this.ackTimeoutSeconds, TimeUnit.SECONDS);
+
+        // pulsar批量获取消息设置
+        builder.batchReceivePolicy(new BatchReceivePolicy.Builder()
+                .maxNumMessages(this.batchSize)
+                .timeout(this.getBatchTimeoutSeconds, TimeUnit.SECONDS)
+                .build());
+
+        try {
+            this.consumer = builder.subscribe();
+        } catch (PulsarClientException e) {
+            throw new RuntimeException(e);
+        }
+        connected = true;
+    }
+
+
+    @Override
+    public void subscribe() throws CanalClientException {
+        this.subscribe(null);
+    }
+
+    @Override
+    public void unsubscribe() throws CanalClientException {
+        try {
+            if (null != this.consumer) {
+                this.consumer.unsubscribe();
+            }
+        } catch (PulsarClientException e) {
+            throw new CanalClientException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 不关注业务执行结果,只要收到消息即认识消费成功,自动ack
+     *
+     * @param timeout 阻塞获取消息的超时时间
+     * @param unit    时间单位
+     * @return java.util.List<com.alibaba.otter.canal.protocol.Message>
+     * @date 2021/9/13 22:24
+     * @author chad
+     * @since 1 by chad at 2021/9/13 添加注释
+     */
+    @Override
+    public List<Message> getList(Long timeout, TimeUnit unit) throws CanalClientException {
+        List<Message> messages = getListWithoutAck(timeout, unit);
+        if (messages != null && !messages.isEmpty()) {
+            ack();
+        }
+        return messages;
+    }
+
+    /**
+     * 关心业务执行结果,业务侧根据执行结果调用 {@link PulsarMQCanalConnector#ack()}或{@link PulsarMQCanalConnector#rollback()}
+     * <p>
+     * 本方法示支持多线程,在MQ保障顺序的前提下,也无法提供单Topic多线程
+     * </p>
+     *
+     * @param timeout 阻塞获取消息的超时时间
+     * @param unit    时间单位
+     * @return java.util.List<com.alibaba.otter.canal.protocol.Message>
+     * @date 2021/9/13 22:26
+     * @author chad
+     * @since 1 by chad at 2021/9/13 添加注释
+     */
+    @Override
+    public List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
+        return getListWithoutAck();
+    }
+
+    @Override
+    public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
+        List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);
+        if (messages != null && !messages.isEmpty()) {
+            ack();
+        }
+        return messages;
+    }
+
+    @Override
+    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
+        return getListWithoutAck();
+    }
+
+    /**
+     * 获取泛型数据,供其他方法调用
+     * <p>
+     * 不支持多线程调用
+     * </p>
+     *
+     * @return java.util.List<T>
+     * @date 2021/9/14 15:20
+     * @author chad
+     * @since 1 by chad at 2021/9/14 供{@link PulsarMQCanalConnector#getListWithoutAck(Long, TimeUnit)}
+     * 和{@link PulsarMQCanalConnector#getFlatListWithoutAck(Long, TimeUnit)}调用
+     */
+    private <T> List<T> getListWithoutAck() {
+        if (null != this.lastGetBatchMessage) {
+            throw new CanalClientException("mq get/ack not support concurrent & async ack");
+        }
+        List messageList = Lists.newArrayList();
+
+        try {
+            this.lastGetBatchMessage = consumer.batchReceive();
+            if (null == this.lastGetBatchMessage || this.lastGetBatchMessage.size() < 1) {
+                this.lastGetBatchMessage = null;
+                return messageList;
+            }
+        } catch (PulsarClientException e) {
+            logger.error("Receiver Pulsar MQ message error", e);
+            throw new CanalClientException(e);
+        }
+
+        for (org.apache.pulsar.client.api.Message<byte[]> msgExt : this.lastGetBatchMessage) {
+            byte[] data = msgExt.getData();
+            if (data == null) {
+                logger.warn("Received message data is null");
+                continue;
+            }
+            try {
+                if (isFlatMessage) {
+                    FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);
+                    messageList.add(flatMessage);
+                } else {
+                    Message message = CanalMessageDeserializer.deserializer(data);
+                    messageList.add(message);
+                }
+            } catch (Exception ex) {
+                logger.error("Add message error", ex);
+                throw new CanalClientException(ex);
+            }
+        }
+
+        return messageList;
+    }
+
+    /**
+     * 当业务侧执行成功时,需要手动执行消息的ack操作
+     *
+     * @return void
+     * @date 2021/9/13 22:27
+     * @author chad
+     * @since 1 by chad at 2021/9/13 添加注释
+     */
+    @Override
+    public void ack() throws CanalClientException {
+        // 为什么要一个批次要么全部成功要么全部失败
+        try {
+            if (this.lastGetBatchMessage != null) {
+                this.consumer.acknowledge(this.lastGetBatchMessage);
+            }
+        } catch (Throwable e) {
+            if (this.lastGetBatchMessage != null) {
+                this.consumer.negativeAcknowledge(this.lastGetBatchMessage);
+            }
+        } finally {
+            this.lastGetBatchMessage = null;
+        }
+    }
+
+    /**
+     * 当业务侧执行失败时,需要手动执行消息的rollback操作,从而让消息重新消费
+     *
+     * @return void
+     * @date 2021/9/13 22:28
+     * @author chad
+     * @since 1 by chad at 2021/9/13 添加注释
+     */
+    @Override
+    public void rollback() throws CanalClientException {
+        try {
+            if (this.lastGetBatchMessage != null) {
+                this.consumer.negativeAcknowledge(this.lastGetBatchMessage);
+            }
+        } finally {
+            this.lastGetBatchMessage = null;
+        }
+    }
+
+    @Override
+    public Message get(int batchSize) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public Message getWithoutAck(int batchSize) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public void ack(long batchId) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public void rollback(long batchId) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+}

+ 97 - 0
common/src/main/java/com/alibaba/otter/canal/common/utils/MQUtil.java

@@ -0,0 +1,97 @@
+package com.alibaba.otter.canal.common.utils;
+
+
+public class MQUtil {
+    /**
+     * topic是否是正则表达式
+     *
+     * @param topic
+     * @return true是正则表达式
+     */
+    public static boolean isPatternTopic(String topic) {
+        return !topic.matches("^[0-9a-z:/-]+$");
+    }
+
+    /**
+     * 检查topic有效性
+     *
+     * @param topic
+     * @return
+     */
+    public static boolean checkTopic(String topic) {
+        return topic.matches("^[0-9a-z:/.*-]+$");
+    }
+
+    /**
+     * 检查tag有效性
+     *
+     * @param tag
+     * @return
+     */
+    public static boolean checkTag(String tag) {
+        return tag.matches("^[0-9a-zA-Z.*]+$");
+    }
+
+    /**
+     * 判断tag是否是正则
+     *
+     * @param tag
+     * @return
+     */
+    public static boolean isPatternTag(String tag) {
+        return !tag.matches("^[0-9a-zA-Z]+$");
+    }
+
+    /**
+     * 检查topic有效性
+     *
+     * @param topics
+     */
+    public static void checkTopicWithErr(String... topics) {
+        if (null == topics || 0 == topics.length) {
+            throw new NullPointerException("topic cannot null");
+        }
+
+        if (1 == topics.length) {
+            boolean ok = checkTopic(topics[0]);
+            if (ok) {
+                return;
+            }
+            throw new RuntimeException("topic invalid: " + topics[0]);
+        }
+
+        for (String t : topics) {
+            if (!checkTopic(t)) {
+                throw new IllegalArgumentException("topic invalid: " + t);
+            }
+            if (isPatternTopic(t)) {
+                throw new RuntimeException("pattern topic cannot multi: " + t);
+            }
+        }
+    }
+
+    /**
+     * 检查tag有效性
+     *
+     * @param tags
+     */
+    public static void checkTagWithErr(String... tags) {
+        // 空表示不使用tag
+        if (null == tags || 0 == tags.length) {
+            return;
+        }
+
+//        if (1 == tags.length && (null == tags[0] || 0 == tags[0].trim().length())) {
+//            throw new NullPointerException("tag cannot null");
+//        }
+
+        for (String t : tags) {
+            if (!checkTag(t)) {
+                throw new IllegalArgumentException("tag invalid: " + t);
+            }
+            if (isPatternTag(t)) {
+                throw new RuntimeException("pattern tag cannot multi: " + t);
+            }
+        }
+    }
+}

+ 19 - 2
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/producer/MQMessageUtils.java

@@ -671,16 +671,33 @@ public class MQMessageUtils {
     }
 
     public static class PartitionData {
-
+        /**
+         * 当{schemaName.tableName}没有正则时,就匹配{schemaName.tableName}
+         */
         public String             simpleName;
+        /**
+         * 当{schemaName.tableName}有正则时,匹配正则
+         */
         public AviaterRegexFilter regexFilter;
+        /**
+         * hash模式
+         */
         public HashMode           hashMode = new HashMode();
     }
 
     public static class HashMode {
-
+        /**
+         * 当{schemaName.tableName}:$pk$时,使用自动主键hash
+         */
         public boolean      autoPkHash = false;
+        /**
+         * 当表达式仅为{schemaName.tableName},没有:时,仅使用table hash
+         */
         public boolean      tableHash  = false;
+
+        /**
+         * 当表达式为{schemaName.tableName}:id^name^age时,pkNames为:id, name, age三个
+         */
         public List<String> pkNames    = new ArrayList<>();
     }
 

+ 1 - 0
connector/pom.xml

@@ -97,6 +97,7 @@
         <module>rocketmq-connector</module>
         <module>rabbitmq-connector</module>
         <module>tcp-connector</module>
+        <module>pulsarmq-connector</module>
     </modules>
 
     <build>

+ 94 - 0
connector/pulsarmq-connector/pom.xml

@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>canal.connector</artifactId>
+        <groupId>com.alibaba.otter</groupId>
+        <version>1.1.6-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.alibaba.otter</groupId>
+    <artifactId>connector.pulsarmq</artifactId>
+    <packaging>jar</packaging>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.protocol</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>connector.core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <version>1.8</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <tasks>
+                                <copy todir="${project.basedir}/../../deployer/target/canal/plugin" overwrite="true">
+                                    <fileset dir="${project.basedir}/target" erroronmissingdir="true">
+                                        <include name="connector.pulsarmq-${project.version}-jar-with-dependencies.jar" />
+                                    </fileset>
+                                </copy>
+                                <copy todir="${project.basedir}/../../client-adapter/launcher/target/canal-adapter/plugin" overwrite="true">
+                                    <fileset dir="${project.basedir}/target" erroronmissingdir="true">
+                                        <include name="connector.pulsarmq-${project.version}-jar-with-dependencies.jar" />
+                                    </fileset>
+                                </copy>
+                            </tasks>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 62 - 0
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java

@@ -0,0 +1,62 @@
+package com.alibaba.otter.canal.connector.pulsarmq.config;
+
+import org.joda.time.Seconds;
+
+/**
+ * PulsarMQ配置
+ *
+ * @author chad
+ * @date 2021/9/15 11:13
+ * @since 1 by chad at 2021/9/15 新增配置文件
+ */
+public class PulsarMQConstants {
+    public static final String ROOT = "pulsarmq";
+    /**
+     * pulsar服务连接地址
+     */
+    public static final String PULSARMQ_SERVER_URL = ROOT + "." + "serverUrl";
+    /**
+     * pulsar服务角色token,需要有对应token的生产者权限
+     */
+    public static final String PULSARMQ_ROLE_TOKEN = ROOT + "." + "roleToken";
+    /**
+     * topic前缀
+     */
+    public static final String PULSARMQ_TOPIC_TENANT_PREFIX = ROOT + "." + "topicTenantPrefix";
+
+    /**** 消费者 *****/
+    /**
+     * 获取批量消息超时等待时间
+     */
+    public static final String PULSARMQ_GET_BATCH_TIMEOUT_SECONDS = ROOT + "." + "getBatchTimeoutSeconds";
+    /**
+     * 批量处理超时时间
+     */
+    public static final String PULSARMQ_BATCH_PROCESS_TIMEOUT = ROOT + "." + "batchProcessTimeout";
+    /**
+     * 消费都订阅名称,将以该名称为消费者身份标识,同一个subscriptName,认为是同一个消费实例
+     */
+    public static final String PULSARMQ_SUBSCRIPT_NAME = ROOT + "." + "subscriptName";
+    /**
+     * 重试间隔秒数
+     */
+    public static final String PULSARMQ_REDELIVERY_DELAY_SECONDS = ROOT + "." + "redeliveryDelaySeconds";
+    /**
+     * ACK超时秒数
+     */
+    public static final String PULSARMQ_ACK_TIMEOUT_SECONDS = ROOT + "." + "ackTimeoutSeconds";
+    /**
+     * 是否开启消费重试
+     */
+    public static final String PULSARMQ_IS_RETRY = ROOT + "." + "isRetry";
+    /**
+     * 自动生成的 retry dlq队列名称后缀是否大写
+     */
+    public static final String PULSARMQ_IS_RETRY_DLQ_UPPERCASE = ROOT + "." + "isRetryDLQUpperCase";
+    /**
+     * 最大重试次数
+     */
+    public static final String PULSARMQ_MAX_REDELIVERY_COUNT = ROOT + "." + "maxRedeliveryCount";
+
+
+}

+ 57 - 0
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java

@@ -0,0 +1,57 @@
+package com.alibaba.otter.canal.connector.pulsarmq.config;
+
+import com.alibaba.otter.canal.connector.core.config.MQProperties;
+
+/**
+ * Pulsar生产者配置
+ * @author chad
+ * @date 2021/9/15 11:23
+ * @since 1 by chad at 2021/9/15 新增
+ */
+public class PulsarMQProducerConfig extends MQProperties {
+    /**
+     * pulsar服务连接地址
+     * <p>
+     *     broker: pulsar://localhost:6650<br/>
+     *     httpUrl: http://localhost:8080
+     * </p>
+     */
+    private String serverUrl;
+    /**
+     * pulsar topic前缀
+     * <p>
+     * 正常的pulsar topic全路径为:persistent://{tenant}/{namespace}/{TOPIC},
+     * 而为了方便,在配置文件中仅需要配置 {TOPIC} 段即可,persistent://{tenant}/{namespace}由此参数控制。
+     * 在发送消息时会自动拼接上
+     * </p>
+     */
+    private String topicTenantPrefix;
+    /**
+     * 生产者角色权限,请确保该角色有canal使用的所有topic生产者权限(最低要求)
+     */
+    private String roleToken;
+
+    public String getServerUrl() {
+        return serverUrl;
+    }
+
+    public void setServerUrl(String serverUrl) {
+        this.serverUrl = serverUrl;
+    }
+
+    public String getRoleToken() {
+        return roleToken;
+    }
+
+    public void setRoleToken(String roleToken) {
+        this.roleToken = roleToken;
+    }
+
+    public String getTopicTenantPrefix() {
+        return topicTenantPrefix;
+    }
+
+    public void setTopicTenantPrefix(String topicTenantPrefix) {
+        this.topicTenantPrefix = topicTenantPrefix;
+    }
+}

+ 305 - 0
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java

@@ -0,0 +1,305 @@
+package com.alibaba.otter.canal.connector.pulsarmq.consumer;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.common.utils.MQUtil;
+import com.alibaba.otter.canal.connector.core.config.CanalConstants;
+import com.alibaba.otter.canal.connector.core.consumer.CommonMessage;
+import com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer;
+import com.alibaba.otter.canal.connector.core.spi.SPI;
+import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
+import com.alibaba.otter.canal.connector.core.util.MessageUtil;
+import com.alibaba.otter.canal.connector.pulsarmq.config.PulsarMQConstants;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.client.api.*;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Pulsar consumer SPI 实现
+ *
+ * @author rewerma @ 2020-02-01
+ * @version 1.0.0
+ */
+@SPI("pulsarmq")
+public class CanalPulsarMQConsumer implements CanalMsgConsumer {
+    /**
+     * 连接pulsar客户端
+     */
+    private PulsarClient pulsarClient;
+    private Consumer<byte[]> pulsarMQConsumer;
+    /**
+     * 是否为扁平消息
+     */
+    private boolean flatMessage = false;
+    /**
+     * 主题名称
+     */
+    private String topic;
+    /**
+     * 单线程控制
+     */
+    private volatile Messages<byte[]> lastGetBatchMessage;
+    /**
+     * 环境连接URL
+     */
+    private String serviceUrl;
+    /**
+     * 角色认证token
+     */
+    private String roleToken;
+    /**
+     * 订阅客户端名称
+     */
+    private String subscriptName;
+    /**
+     * 每次批量获取数据的最大条目数,默认30
+     */
+    private int batchSize = 30;
+    /**
+     * 与{@code batchSize}一起决定批量获取的数据大小
+     * 当:
+     * <p>
+     * 1. {@code batchSize} 条消息未消费时<br/>
+     * 2. 距上一次批量消费时间达到{@code batchTimeoutSeconds}秒时
+     * </p>
+     * 任一条件满足,即执行批量消费
+     */
+    private int getBatchTimeoutSeconds = 30;
+    /**
+     * 批量处理消息时,一次批量处理的超时时间
+     * <p>
+     * 该时间应该根据{@code batchSize}和{@code batchTimeoutSeconds}合理设置
+     * </p>
+     */
+    private long batchProcessTimeout = 60 * 1000;
+    /**
+     * 消费失败后的重试秒数,默认60秒
+     */
+    private int redeliveryDelaySeconds = 60;
+    /**
+     * 当客户端接收到消息,30秒还没有返回ack给服务端时,ack超时,会重新消费该消息
+     */
+    private int ackTimeoutSeconds = 30;
+    /**
+     * 是否开启消息失败重试功能,默认开启
+     */
+    private boolean isRetry = true;
+    /**
+     * <p>
+     * true重试(-RETRY)和死信队列(-DLQ)后缀为大写,有些地方创建的为小写,需确保正确
+     * </p>
+     */
+    private boolean isRetryDLQUpperCase = false;
+    /**
+     * 最大重试次数
+     */
+    private int maxRedeliveryCount = 128;
+
+    @Override
+    public void init(Properties properties, String topic, String groupId) {
+        this.topic = topic;
+        String flatMessageStr = properties.getProperty(CanalConstants.CANAL_MQ_FLAT_MESSAGE);
+        if (StringUtils.isNotEmpty(flatMessageStr)) {
+            this.flatMessage = Boolean.parseBoolean(flatMessageStr);
+        }
+        this.serviceUrl = properties.getProperty(PulsarMQConstants.PULSARMQ_SERVER_URL);
+        this.roleToken = properties.getProperty(PulsarMQConstants.PULSARMQ_ROLE_TOKEN);
+        this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME);
+        if (StringUtils.isEmpty(this.subscriptName)) {
+            throw new RuntimeException("Pulsar Consumer subscriptName required");
+        }
+        String batchSizeStr = properties.getProperty(CanalConstants.CANAL_MQ_CANAL_BATCH_SIZE);
+        if (StringUtils.isNotEmpty(batchSizeStr)) {
+            this.batchSize = Integer.parseInt(batchSizeStr);
+        }
+        String getBatchTimeoutSecondsStr = properties.getProperty(PulsarMQConstants.PULSARMQ_GET_BATCH_TIMEOUT_SECONDS);
+        if (StringUtils.isNotEmpty(getBatchTimeoutSecondsStr)) {
+            this.getBatchTimeoutSeconds = Integer.parseInt(getBatchTimeoutSecondsStr);
+        }
+        String batchProcessTimeoutStr = properties.getProperty(PulsarMQConstants.PULSARMQ_BATCH_PROCESS_TIMEOUT);
+        if (StringUtils.isNotEmpty(batchProcessTimeoutStr)) {
+            this.batchProcessTimeout = Integer.parseInt(batchProcessTimeoutStr);
+        }
+        String redeliveryDelaySecondsStr = properties.getProperty(PulsarMQConstants.PULSARMQ_REDELIVERY_DELAY_SECONDS);
+        if (StringUtils.isNotEmpty(redeliveryDelaySecondsStr)) {
+            this.redeliveryDelaySeconds = Integer.parseInt(redeliveryDelaySecondsStr);
+        }
+        String ackTimeoutSecondsStr = properties.getProperty(PulsarMQConstants.PULSARMQ_ACK_TIMEOUT_SECONDS);
+        if (StringUtils.isNotEmpty(ackTimeoutSecondsStr)) {
+            this.ackTimeoutSeconds = Integer.parseInt(ackTimeoutSecondsStr);
+        }
+        String isRetryStr = properties.getProperty(PulsarMQConstants.PULSARMQ_IS_RETRY);
+        if (StringUtils.isNotEmpty(isRetryStr)) {
+            this.isRetry = Boolean.parseBoolean(isRetryStr);
+        }
+        String isRetryDLQUpperCaseStr = properties.getProperty(PulsarMQConstants.PULSARMQ_IS_RETRY_DLQ_UPPERCASE);
+        if (StringUtils.isNotEmpty(isRetryDLQUpperCaseStr)) {
+            this.isRetryDLQUpperCase = Boolean.parseBoolean(isRetryDLQUpperCaseStr);
+        }
+        String maxRedeliveryCountStr = properties.getProperty(PulsarMQConstants.PULSARMQ_MAX_REDELIVERY_COUNT);
+        if (StringUtils.isNotEmpty(maxRedeliveryCountStr)) {
+            this.maxRedeliveryCount = Integer.parseInt(maxRedeliveryCountStr);
+        }
+    }
+
+    @Override
+    public void connect() {
+        if (isConsumerActive()) {
+            return;
+        }
+        // 连接创建客户端
+        try {
+            pulsarClient = PulsarClient.builder()
+                    .serviceUrl(serviceUrl)
+                    .authentication(AuthenticationFactory.token(roleToken))
+                    .build();
+        } catch (PulsarClientException e) {
+            throw new RuntimeException(e);
+        }
+        ConsumerBuilder<byte[]> builder = pulsarClient.newConsumer();
+        if (MQUtil.isPatternTopic(this.topic)) {
+            // 正则只支持一个
+            builder.topicsPattern(this.topic);
+        } else {// 多个topic
+            builder.topic(this.topic);
+        }
+        // 为保证消息的有序性,仅支持单消费实例模式
+        // 灾备模式,一个分区只能有一个消费者,如果当前消费者不可用,自动切换到其他消费者
+        builder.subscriptionType(SubscriptionType.Failover);
+
+        builder
+                // 调用consumer.negativeAcknowledge(message) (即nack)来表示消费失败的消息
+                // 在指定的时间进行重新消费,默认是1分钟。
+                .negativeAckRedeliveryDelay(this.redeliveryDelaySeconds, TimeUnit.SECONDS)
+                .subscriptionName(this.subscriptName)
+        ;
+        if (this.isRetry) {
+            DeadLetterPolicy.DeadLetterPolicyBuilder dlqBuilder = DeadLetterPolicy.builder()
+                    // 最大重试次数
+                    .maxRedeliverCount(this.maxRedeliveryCount);
+            // 指定重试队列,不是多个或通配符topic才能判断重试队列
+            if (!MQUtil.isPatternTag(this.topic)) {
+                String retryTopic = this.topic + (this.isRetryDLQUpperCase ? "-RETRY" : "-retry");
+                dlqBuilder.retryLetterTopic(retryTopic);
+                String dlqTopic = this.topic + (this.isRetryDLQUpperCase ? "-DLQ" : "-dlq");
+                dlqBuilder.deadLetterTopic(dlqTopic);
+            }
+
+            //默认关闭,如果需要重试则开启
+            builder.enableRetry(true)
+                    .deadLetterPolicy(dlqBuilder.build());
+        }
+
+        // ack超时
+        builder.ackTimeout(this.ackTimeoutSeconds, TimeUnit.SECONDS);
+
+        // pulsar批量获取消息设置
+        builder.batchReceivePolicy(new BatchReceivePolicy.Builder()
+                .maxNumMessages(this.batchSize)
+                .timeout(this.getBatchTimeoutSeconds, TimeUnit.SECONDS)
+                .build());
+
+        try {
+            this.pulsarMQConsumer = builder.subscribe();
+        } catch (PulsarClientException e) {
+            throw new CanalClientException("Subscript pulsar consumer error", e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public List<CommonMessage> getMessage(Long timeout, TimeUnit unit) {
+        List<CommonMessage> messageList = Lists.newArrayList();
+        try {
+            Messages<byte[]> messages = pulsarMQConsumer.batchReceive();
+
+            if (null == messages || messages.size() == 0) {
+                return messageList;
+            }
+            // 保存当前消费记录,用于ack和rollback
+            this.lastGetBatchMessage = messages;
+            for (org.apache.pulsar.client.api.Message<byte[]> msg : messages) {
+                byte[] data = msg.getData();
+                if (!this.flatMessage) {
+                    Message message = CanalMessageSerializerUtil.deserializer(data);
+                    List<CommonMessage> list = MessageUtil.convert(message);
+                    messageList.addAll(list);
+                } else {
+                    CommonMessage commonMessage = JSON.parseObject(data, CommonMessage.class);
+                    messageList.add(commonMessage);
+                }
+            }
+        } catch (PulsarClientException e) {
+            throw new CanalClientException("Receive pulsar batch message error", e);
+        }
+
+        return messageList;
+    }
+
+    @Override
+    public void rollback() {
+        try {
+            if (isConsumerActive() && hasLastMessages()) {
+                // 回滚所有消息
+                this.pulsarMQConsumer.negativeAcknowledge(this.lastGetBatchMessage);
+            }
+        } finally {
+            this.lastGetBatchMessage = null;
+        }
+    }
+
+    @Override
+    public void ack() {
+        try {
+            if (isConsumerActive() && hasLastMessages()) {
+                // 确认所有消息
+                this.pulsarMQConsumer.acknowledge(this.lastGetBatchMessage);
+            }
+        } catch (PulsarClientException e) {
+            if (isConsumerActive() && hasLastMessages()) {
+                this.pulsarMQConsumer.negativeAcknowledge(this.lastGetBatchMessage);
+            }
+        } finally {
+            this.lastGetBatchMessage = null;
+        }
+    }
+
+    @Override
+    public void disconnect() {
+        if (null == this.pulsarMQConsumer || !this.pulsarMQConsumer.isConnected()) {
+            return;
+        }
+        try {
+            this.pulsarMQConsumer.unsubscribe();
+            this.pulsarClient.close();
+        } catch (PulsarClientException e) {
+            throw new CanalClientException("Disconnect pulsar consumer error", e);
+        }
+    }
+
+    /**
+     * 是否消费可用
+     *
+     * @return true消费者可用
+     */
+    private boolean isConsumerActive() {
+        return null != this.pulsarMQConsumer && this.pulsarMQConsumer.isConnected();
+    }
+
+    /**
+     * 是否有未确认消息
+     *
+     * @return true有正在消费的待确认消息
+     */
+    private boolean hasLastMessages() {
+        return null != this.lastGetBatchMessage && this.lastGetBatchMessage.size() > 0;
+    }
+}

+ 422 - 0
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java

@@ -0,0 +1,422 @@
+package com.alibaba.otter.canal.connector.pulsarmq.producer;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
+import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
+import com.alibaba.otter.canal.common.utils.PropertiesUtils;
+import com.alibaba.otter.canal.connector.core.producer.AbstractMQProducer;
+import com.alibaba.otter.canal.connector.core.producer.MQDestination;
+import com.alibaba.otter.canal.connector.core.producer.MQMessageUtils;
+import com.alibaba.otter.canal.connector.core.spi.CanalMQProducer;
+import com.alibaba.otter.canal.connector.core.spi.SPI;
+import com.alibaba.otter.canal.connector.core.util.Callback;
+import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
+import com.alibaba.otter.canal.connector.pulsarmq.config.PulsarMQConstants;
+import com.alibaba.otter.canal.connector.pulsarmq.config.PulsarMQProducerConfig;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.shade.com.google.gson.JsonParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * PulsarMQ Producer SPI 实现
+ *
+ * @author chad 2021/9/2
+ * @version 1.0.0
+ */
+@SPI("pulsarmq")
+public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQProducer {
+
+    private static final Logger logger = LoggerFactory.getLogger(CanalPulsarMQProducer.class);
+
+    private static final Map<String, Producer<byte[]>> PRODUCERS = new HashMap<>();
+
+    protected ThreadPoolExecutor sendPartitionExecutor;
+    /**
+     * 消息体分区属性名称
+     */
+    public static final String MSG_PROPERTY_PARTITION_NAME = "partitionNum";
+    /**
+     * pulsar客户端,管理连接
+     */
+    protected PulsarClient client;
+
+    @Override
+    public void init(Properties properties) {
+        // 加载配置
+        PulsarMQProducerConfig pulsarMQProducerConfig = new PulsarMQProducerConfig();
+        this.mqProperties = pulsarMQProducerConfig;
+        super.init(properties);
+        loadPulsarMQProperties(properties);
+
+        // 初始化连接客户端
+        try {
+            client = PulsarClient.builder()
+                    // 填写pulsar的连接地址
+                    .serviceUrl(pulsarMQProducerConfig.getServerUrl())
+                    // 角色权限认证的token
+                    .authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken()))
+                    .build();
+        } catch (PulsarClientException e) {
+            throw new RuntimeException(e);
+        }
+        // 加载所有生产者 --> topic可能有正则或表名,无法确认所有topic,在使用时再加载
+
+        int parallelPartitionSendThreadSize = mqProperties.getParallelSendThreadSize();
+        sendPartitionExecutor = new ThreadPoolExecutor(parallelPartitionSendThreadSize,
+                parallelPartitionSendThreadSize,
+                0,
+                TimeUnit.SECONDS,
+                new ArrayBlockingQueue<>(parallelPartitionSendThreadSize * 2),
+                new NamedThreadFactory("MQ-Parallel-Sender-Partition"),
+                new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    /**
+     * 加载配置
+     *
+     * @param properties
+     * @return void
+     * @date 2021/9/15 11:22
+     * @author chad
+     * @since 1 by chad at 2021/9/15 新增
+     */
+    private void loadPulsarMQProperties(Properties properties) {
+        PulsarMQProducerConfig tmpProperties = (PulsarMQProducerConfig) this.mqProperties;
+        String serverUrl = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_SERVER_URL);
+        if (!StringUtils.isEmpty(serverUrl)) {
+            tmpProperties.setServerUrl(serverUrl);
+        }
+
+        String roleToken = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ROLE_TOKEN);
+        if (!StringUtils.isEmpty(roleToken)) {
+            tmpProperties.setRoleToken(roleToken);
+        }
+        String topicTenantPrefix = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_TOPIC_TENANT_PREFIX);
+        if (!StringUtils.isEmpty(topicTenantPrefix)) {
+            tmpProperties.setTopicTenantPrefix(topicTenantPrefix);
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties));
+        }
+    }
+
+    /**
+     * 发送消息,处理的任务:
+     * <p>
+     * 1. 动态 Topic,根据schema.table或schema来匹配topic配置,将改变发送到指定的一个或多个具体的Topic<br/>
+     * 2. 使用线程池发送多个消息,单个消息不使用线程池
+     * </p>
+     *
+     * @param destination 消息目标信息
+     * @param message     消息
+     * @param callback    消息发送结果回调
+     * @return void
+     * @date 2021/9/2 22:01
+     * @author chad
+     * @since 1.0.0 by chad at 2021/9/2: 新增
+     */
+    @Override
+    public void send(MQDestination destination, com.alibaba.otter.canal.protocol.Message message, Callback callback) {
+
+        ExecutorTemplate template = new ExecutorTemplate(sendExecutor);
+        try {
+            if (!StringUtils.isEmpty(destination.getDynamicTopic())) {
+                // 动态topic
+                Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils.messageTopics(message,
+                        destination.getTopic(),
+                        destination.getDynamicTopic());
+
+                for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
+                    String topicName = entry.getKey().replace('.', '_');
+                    com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue();
+                    template.submit(() -> {
+                        try {
+                            send(destination, topicName, messageSub);
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+                }
+
+                template.waitForResult();
+            } else {
+                send(destination, destination.getTopic(), message);
+            }
+
+            callback.commit();
+        } catch (Throwable e) {
+            logger.error(e.getMessage(), e);
+            callback.rollback();
+        } finally {
+            template.clear();
+        }
+    }
+
+    /**
+     * 发送单条消息到指定topic。区分是否发送扁平消息
+     *
+     * @param destination
+     * @param topicName
+     * @param message
+     * @return void
+     * @date 2021/9/2 22:05
+     * @author chad
+     * @since 1.0.0 by chad at 2021/9/2: 新增
+     */
+    public void send(final MQDestination destination, String topicName, com.alibaba.otter.canal.protocol.Message message) {
+
+        // 获取当前topic的分区数
+        Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
+                destination.getDynamicTopicPartitionNum());
+        if (partitionNum == null) {
+            partitionNum = destination.getPartitionsNum();
+        }
+        ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
+        // 并发构造
+        MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
+        if (!mqProperties.isFlatMessage()) {
+            // 动态计算目标分区
+            if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
+                for (MQMessageUtils.EntryRowData r : datas) {
+                    CanalEntry.Entry entry = r.entry;
+                    if (null == entry) {
+                        continue;
+                    }
+                    // 串行分区
+                    com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(datas,
+                            message.getId(),
+                            partitionNum,
+                            destination.getPartitionHash(),
+                            mqProperties.isDatabaseHash());
+                    // 发送
+                    int len = messages.length;
+                    for (int i = 0; i < len; i++) {
+                        final int partition = i;
+                        com.alibaba.otter.canal.protocol.Message m = messages[i];
+                        template.submit(() -> {
+                            sendMessage(topicName, partition, m);
+                        });
+                    }
+                }
+            } else {
+                // 默认分区
+                final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
+                sendMessage(topicName, partition, message);
+            }
+        } else {
+            // 串行分区
+            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());
+
+            // 初始化分区合并队列
+            if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
+                List<List<FlatMessage>> partitionFlatMessages = new ArrayList<>();
+                int len = partitionNum;
+                for (int i = 0; i < len; i++) {
+                    partitionFlatMessages.add(new ArrayList<>());
+                }
+
+                for (FlatMessage flatMessage : flatMessages) {
+                    FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
+                            partitionNum,
+                            destination.getPartitionHash(),
+                            mqProperties.isDatabaseHash());
+                    int length = partitionFlatMessage.length;
+                    for (int i = 0; i < length; i++) {
+                        // 增加null判断,issue #3267
+                        if (partitionFlatMessage[i] != null) {
+                            partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
+                        }
+                    }
+                }
+
+                for (int i = 0; i < len; i++) {
+                    final List<FlatMessage> flatMessagePart = partitionFlatMessages.get(i);
+                    if (flatMessagePart != null && flatMessagePart.size() > 0) {
+                        final int partition = i;
+                        template.submit(() -> {
+                            // 批量发送
+                            sendMessage(topicName, partition, flatMessagePart);
+                        });
+                    }
+                }
+
+                // 批量等所有分区的结果
+                template.waitForResult();
+            } else {
+                // 默认分区
+                final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
+                sendMessage(topicName, partition, flatMessages);
+            }
+        }
+    }
+
+    /**
+     * 发送原始消息,需要做分区处理
+     *
+     * @param topic        topic
+     * @param partitionNum 目标分区
+     * @param msg          原始消息内容
+     * @return void
+     * @date 2021/9/10 17:55
+     * @author chad
+     * @since 1 by chad at 2021/9/10 新增
+     */
+    private void sendMessage(String topic, int partitionNum, com.alibaba.otter.canal.protocol.Message msg) {
+        Producer<byte[]> producer = getProducer(topic);
+        byte[] msgBytes = CanalMessageSerializerUtil.serializer(msg, mqProperties.isFilterTransactionEntry());
+        try {
+            MessageId msgResultId = producer.newMessage()
+                    .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum))
+                    .value(msgBytes).send();
+            // todo 判断发送结果
+            if (logger.isDebugEnabled()) {
+                logger.debug("Send Message to topic:{} Result: {}", topic, msgResultId);
+            }
+        } catch (Throwable e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * 发送扁平消息
+     *
+     * @param topic        topic主题
+     * @param flatMessages 扁平消息
+     * @return void
+     * @date 2021/9/10 18:22
+     * @author chad
+     * @since 1 by chad at 2021/9/10 新增
+     */
+    private void sendMessage(String topic, int partition, List<FlatMessage> flatMessages) {
+        Producer<byte[]> producer = getProducer(topic);
+        for (FlatMessage f : flatMessages) {
+            try {
+                MessageId msgResultId = producer
+                        .newMessage()
+                        .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition))
+                        .value(JSON.toJSONBytes(f, SerializerFeature.WriteMapNullValue))
+                        .send()
+                        //
+                        ;
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Send Messages to topic:{} Result: {}", topic, msgResultId);
+                }
+            } catch (Throwable e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * 获取指定topic的生产者,并且使用缓存
+     *
+     * @param topic
+     * @return org.apache.pulsar.client.api.Producer<byte [ ]>
+     * @date 2021/9/10 11:21
+     * @author chad
+     * @since 1 by chad at 2021/9/10 新增
+     */
+    private Producer<byte[]> getProducer(String topic) {
+        Producer producer = PRODUCERS.get(topic);
+
+        if (null == producer) {
+            try {
+                synchronized (PRODUCERS) {
+                    producer = PRODUCERS.get(topic);
+                    if (null != producer) {
+                        return producer;
+                    }
+
+                    // 拼接topic前缀
+                    PulsarMQProducerConfig pulsarMQProperties = (PulsarMQProducerConfig) this.mqProperties;
+                    String prefix = pulsarMQProperties.getTopicTenantPrefix();
+                    String fullTopic = topic;
+                    if (!StringUtils.isEmpty(prefix)) {
+                        if (!prefix.endsWith("/")) {
+                            fullTopic = "/" + fullTopic;
+                        }
+                        fullTopic = pulsarMQProperties.getTopicTenantPrefix() + fullTopic;
+                    }
+
+                    // 创建指定topic的生产者
+                    producer = client.newProducer()
+                            .topic(fullTopic)
+                            // 指定路由器
+                            .messageRouter(new MessageRouterImpl(topic))
+                            .create();
+                    // 放入缓存
+                    PRODUCERS.put(topic, producer);
+                }
+            } catch (PulsarClientException e) {
+                logger.error("create producer failed for topic: " + topic, e);
+                throw new RuntimeException(e);
+            }
+        }
+
+        return producer;
+    }
+
+    /**
+     * Pulsar自定义路由策略
+     *
+     * @author chad
+     * @version 1
+     * @since 1 by chad at 2021/9/10 新增
+     * @since 2 by chad at 2021/9/17 修改为msg自带目标分区
+     */
+    private static class MessageRouterImpl implements MessageRouter {
+        private String topicLocal;
+
+        public MessageRouterImpl(String topicLocal) {
+            this.topicLocal = topicLocal;
+        }
+
+        @Override
+        public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+            String partitionStr = msg.getProperty(MSG_PROPERTY_PARTITION_NAME);
+            int partition = 0;
+            if (!StringUtils.isEmpty(partitionStr)) {
+                try {
+                    partition = Integer.parseInt(partitionStr);
+                } catch (NumberFormatException e) {
+                    logger.warn("Parse msg {} property failed for value: {}", MSG_PROPERTY_PARTITION_NAME, partitionStr);
+                }
+            }
+            // topic创建时设置的分区数
+            Integer partitionNum = metadata.numPartitions();
+            // 如果 partition 超出 partitionNum,取余数
+            if (null != partitionNum && partition >= partitionNum) {
+                partition = partition % partitionNum;
+            }
+            return partition;
+        }
+    }
+
+    @Override
+    public void stop() {
+        logger.info("## Stop RocketMQ producer##");
+
+        for (Producer p : PRODUCERS.values()) {
+            try {
+                if (null != p && p.isConnected()) {
+                    p.close();
+                }
+            } catch (PulsarClientException e) {
+                logger.warn("close producer name: {}, topic: {}, error: {}", p.getProducerName(), p.getTopic(), e.getMessage());
+            }
+        }
+
+        super.stop();
+    }
+}

+ 1 - 0
connector/pulsarmq-connector/src/main/resources/META-INF/canal/com.alibaba.otter.canal.connector.core.spi.CanalMQProducer

@@ -0,0 +1 @@
+pulsarmq=com.alibaba.otter.canal.connector.pulsarmq.producer.CanalPulsarMQProducer

+ 76 - 0
connector/pulsarmq-connector/src/test/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumerTest.java

@@ -0,0 +1,76 @@
+package com.alibaba.otter.canal.connector.pulsarmq.consumer;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.connector.core.config.CanalConstants;
+import com.alibaba.otter.canal.connector.core.consumer.CommonMessage;
+import com.alibaba.otter.canal.connector.pulsarmq.config.PulsarMQConstants;
+import org.apache.commons.lang.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author chad
+ * @date 2021/9/17 22:32
+ * @since
+ */
+public class CanalPulsarMQConsumerTest {
+
+    private Properties properties;
+
+    private CanalPulsarMQConsumer consumer;
+
+    @Before
+    public void before() {
+        properties = new Properties();
+        properties.setProperty(CanalConstants.CANAL_MQ_FLAT_MESSAGE, String.valueOf(true));
+        properties.setProperty(CanalConstants.CANAL_MQ_CANAL_BATCH_SIZE, String.valueOf(30));
+        properties.setProperty(PulsarMQConstants.PULSARMQ_GET_BATCH_TIMEOUT_SECONDS, String.valueOf(5));
+        properties.setProperty(PulsarMQConstants.PULSARMQ_BATCH_PROCESS_TIMEOUT, String.valueOf(30));
+        properties.setProperty(PulsarMQConstants.PULSARMQ_SERVER_URL, "pulsar://192.168.1.200:6650");
+        properties.setProperty(PulsarMQConstants.PULSARMQ_ROLE_TOKEN, "123456");
+        properties.setProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME, "test-for-canal-pulsar-consumer");
+        properties.setProperty(PulsarMQConstants.PULSARMQ_REDELIVERY_DELAY_SECONDS, String.valueOf(30));
+        properties.setProperty(PulsarMQConstants.PULSARMQ_ACK_TIMEOUT_SECONDS, String.valueOf(30));
+        properties.setProperty(PulsarMQConstants.PULSARMQ_IS_RETRY, String.valueOf(true));
+        properties.setProperty(PulsarMQConstants.PULSARMQ_IS_RETRY_DLQ_UPPERCASE, String.valueOf(true));
+        properties.setProperty(PulsarMQConstants.PULSARMQ_MAX_REDELIVERY_COUNT, String.valueOf(16));
+
+        consumer = new CanalPulsarMQConsumer();
+        consumer.init(this.properties, "persistent://public/canal/test-topics-partition", "groupid");
+    }
+
+    @After
+    public void after() {
+        consumer.disconnect();
+    }
+
+    @Test
+    public void getMessage() {
+        consumer.connect();
+
+        while (true) {
+            List<CommonMessage> list = consumer.getMessage(-1L, TimeUnit.SECONDS);
+            String time = LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
+            if (null == list || list.isEmpty()) {
+                System.out.println(time + " Receive empty");
+                continue;
+            }
+            for (CommonMessage m : list) {
+                System.out.println(time + " Receive ==> " + JSON.toJSONString(m));
+            }
+
+            consumer.ack();
+        }
+    }
+}

+ 1 - 1
connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java

@@ -237,7 +237,7 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
             // 初始化分区合并队列
             if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
                 List<List<FlatMessage>> partitionFlatMessages = new ArrayList<>();
-                for (int i = 0; i < destination.getPartitionsNum(); i++) {
+                for (int i = 0; i < partitionNum; i++) {
                     partitionFlatMessages.add(new ArrayList<>());
                 }
 

+ 10 - 2
deployer/src/main/resources/canal.properties

@@ -25,7 +25,7 @@ canal.zkServers =
 # flush data to zk
 canal.zookeeper.flush.period = 1000
 canal.withoutNetty = false
-# tcp, kafka, rocketMQ, rabbitMQ
+# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
 canal.serverMode = tcp
 # flush meta cursor/parse position to file
 canal.file.data.dir = ${canal.conf.dir}
@@ -171,4 +171,12 @@ rabbitmq.virtual.host =
 rabbitmq.exchange =
 rabbitmq.username =
 rabbitmq.password =
-rabbitmq.deliveryMode =
+rabbitmq.deliveryMode =
+
+
+##################################################
+######### 		      Pulsar         #############
+##################################################
+pulsarmq.serverUrl =
+pulsarmq.roleToken =
+pulsarmq.topicTenantPrefix =

+ 4 - 0
deployer/src/main/resources/logback.xml

@@ -85,6 +85,10 @@
 		<level value="INFO" />
 		<appender-ref ref="CANAL-ROOT" />
 	</logger>
+	<logger name="com.alibaba.otter.canal.connector.pulsarmq" additivity="false">
+		<level value="INFO" />
+		<appender-ref ref="CANAL-ROOT" />
+	</logger>
 	<root level="WARN">
 		<!-- <appender-ref ref="STDOUT"/>  -->
 		<appender-ref ref="CANAL-ROOT" />

+ 6 - 5
pom.xml

@@ -232,11 +232,6 @@
                 <artifactId>aviator</artifactId>
                 <version>2.2.1</version>
             </dependency>
-            <dependency>
-                <groupId>com.alibaba</groupId>
-                <artifactId>fastjson</artifactId>
-                <version>1.2.58.sec06</version>
-            </dependency>
             <dependency>
                 <groupId>oro</groupId>
                 <artifactId>oro</artifactId>
@@ -345,6 +340,12 @@
                 <artifactId>javax.annotation-api</artifactId>
                 <version>${javax.annotation-api.version}</version>
             </dependency>
+            <!-- Pulsar -->
+            <dependency>
+                <groupId>org.apache.pulsar</groupId>
+                <artifactId>pulsar-client</artifactId>
+                <version>2.8.1</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>