浏览代码

Some tweaks for pulsarmq-connector (#4060)

* 1. Add the com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer file to the META-INF/canal directory under the pulsarmq-connector project
2. Introduce pulsar-client-admin for Canal to automatically create multi-partition topics
3. Add a check for a null/empty roleToken (token authentication is only configured when a roleToken is set)
4. Remove this.pulsarMQConsumer.unsubscribe(); from the disconnect method in CanalPulsarMQConsumer, because unsubscribing causes data loss while the consumer is stopped
5. When receiving Pulsar messages, process all of them as flat messages, because CanalMessageSerializerUtil.deserializer(data) fails to deserialize them
6. Use groupId as subscriptName instead of the pulsarmq.subscriptName parameter; with pulsarmq.subscriptName, the entire adapter would share the same subscriber name

* 逻辑优化

* 代码优化

* 针对pulsar的调整

* 恢复代码

* 恢复代码

* 处理Producer失效的情况
zhangjukai 3 年之前
父节点
当前提交
25be0259a4

+ 5 - 0
client/pom.xml

@@ -134,6 +134,11 @@
 			<artifactId>pulsar-client</artifactId>
 			<scope>provided</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.pulsar</groupId>
+			<artifactId>pulsar-client-admin</artifactId>
+			<scope>provided</scope>
+		</dependency>
 
 		<!-- junit -->
 		<dependency>

+ 4 - 0
connector/pulsarmq-connector/pom.xml

@@ -38,6 +38,10 @@
             <groupId>org.apache.pulsar</groupId>
             <artifactId>pulsar-client</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-admin</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

+ 4 - 0
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java

@@ -57,6 +57,10 @@ public class PulsarMQConstants {
      * 最大重试次数
      */
     public static final String PULSARMQ_MAX_REDELIVERY_COUNT = ROOT + "." + "maxRedeliveryCount";
+    /**
+     * Pulsar admin服务器地址
+     */
+    public static final String PULSARMQ_ADMIN_SERVER_URL = ROOT + "." + "adminServerUrl";
 
 
 }

+ 12 - 0
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java

@@ -30,6 +30,10 @@ public class PulsarMQProducerConfig extends MQProperties {
      * 生产者角色权限,请确保该角色有canal使用的所有topic生产者权限(最低要求)
      */
     private String roleToken;
+    /**
+     * admin服务器地址
+     */
+    private String adminServerUrl;
 
     public String getServerUrl() {
         return serverUrl;
@@ -54,4 +58,12 @@ public class PulsarMQProducerConfig extends MQProperties {
     public void setTopicTenantPrefix(String topicTenantPrefix) {
         this.topicTenantPrefix = topicTenantPrefix;
     }
+
+    public String getAdminServerUrl() {
+        return adminServerUrl;
+    }
+
+    public void setAdminServerUrl(String adminServerUrl) {
+        this.adminServerUrl = adminServerUrl;
+    }
 }

+ 16 - 15
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java

@@ -6,20 +6,14 @@ import com.alibaba.otter.canal.connector.core.config.CanalConstants;
 import com.alibaba.otter.canal.connector.core.consumer.CommonMessage;
 import com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer;
 import com.alibaba.otter.canal.connector.core.spi.SPI;
-import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
-import com.alibaba.otter.canal.connector.core.util.MessageUtil;
 import com.alibaba.otter.canal.connector.pulsarmq.config.PulsarMQConstants;
-import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.client.api.*;
 
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
 import java.util.List;
 import java.util.Properties;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -112,7 +106,9 @@ public class CanalPulsarMQConsumer implements CanalMsgConsumer {
         }
         this.serviceUrl = properties.getProperty(PulsarMQConstants.PULSARMQ_SERVER_URL);
         this.roleToken = properties.getProperty(PulsarMQConstants.PULSARMQ_ROLE_TOKEN);
-        this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME);
+        //this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME);
+        // 采用groupId作为subscriptName,避免所有的都是同一个订阅者名称
+        this.subscriptName = groupId;
         if (StringUtils.isEmpty(this.subscriptName)) {
             throw new RuntimeException("Pulsar Consumer subscriptName required");
         }
@@ -157,10 +153,12 @@ public class CanalPulsarMQConsumer implements CanalMsgConsumer {
         }
         // 连接创建客户端
         try {
-            pulsarClient = PulsarClient.builder()
-                    .serviceUrl(serviceUrl)
-                    .authentication(AuthenticationFactory.token(roleToken))
-                    .build();
+            //AuthenticationDataProvider
+            ClientBuilder builder = PulsarClient.builder().serviceUrl(serviceUrl);
+            if(StringUtils.isNotEmpty(roleToken)) {
+                builder.authentication(AuthenticationFactory.token(roleToken));
+            }
+            pulsarClient = builder.build();
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);
         }
@@ -220,7 +218,6 @@ public class CanalPulsarMQConsumer implements CanalMsgConsumer {
         List<CommonMessage> messageList = Lists.newArrayList();
         try {
             Messages<byte[]> messages = pulsarMQConsumer.batchReceive();
-
             if (null == messages || messages.size() == 0) {
                 return messageList;
             }
@@ -228,14 +225,17 @@ public class CanalPulsarMQConsumer implements CanalMsgConsumer {
             this.lastGetBatchMessage = messages;
             for (org.apache.pulsar.client.api.Message<byte[]> msg : messages) {
                 byte[] data = msg.getData();
-                if (!this.flatMessage) {
+                /*if (!this.flatMessage) {
                     Message message = CanalMessageSerializerUtil.deserializer(data);
                     List<CommonMessage> list = MessageUtil.convert(message);
                     messageList.addAll(list);
                 } else {
                     CommonMessage commonMessage = JSON.parseObject(data, CommonMessage.class);
                     messageList.add(commonMessage);
-                }
+                }*/
+                // CanalMessageSerializerUtil.deserializer(data) 会转换失败
+                CommonMessage commonMessage = JSON.parseObject(data, CommonMessage.class);
+                messageList.add(commonMessage);
             }
         } catch (PulsarClientException e) {
             throw new CanalClientException("Receive pulsar batch message error", e);
@@ -278,7 +278,8 @@ public class CanalPulsarMQConsumer implements CanalMsgConsumer {
             return;
         }
         try {
-            this.pulsarMQConsumer.unsubscribe();
+            // 会导致暂停期间数据丢失
+            // this.pulsarMQConsumer.unsubscribe();
             this.pulsarClient.close();
         } catch (PulsarClientException e) {
             throw new CanalClientException("Disconnect pulsar consumer error", e);

+ 61 - 16
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java

@@ -17,6 +17,8 @@ import com.alibaba.otter.canal.connector.pulsarmq.config.PulsarMQProducerConfig;
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.shade.com.google.gson.JsonParser;
 import org.slf4j.Logger;
@@ -50,6 +52,10 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
      * pulsar客户端,管理连接
      */
     protected PulsarClient client;
+    /**
+     * Pulsar admin 客户端
+     */
+    protected PulsarAdmin pulsarAdmin;
 
     @Override
     public void init(Properties properties) {
@@ -61,17 +67,28 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
 
         // 初始化连接客户端
         try {
-            client = PulsarClient.builder()
+            ClientBuilder builder = PulsarClient.builder()
                     // 填写pulsar的连接地址
-                    .serviceUrl(pulsarMQProducerConfig.getServerUrl())
-                    // 角色权限认证的token
-                    .authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken()))
-                    .build();
+                    .serviceUrl(pulsarMQProducerConfig.getServerUrl());
+            if(StringUtils.isNotEmpty(pulsarMQProducerConfig.getRoleToken())) {
+                // 角色权限认证的token
+                builder.authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken()));
+            }
+            client = builder.build();
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);
         }
-        // 加载所有生产者 --> topic可能有正则或表名,无法确认所有topic,在使用时再加载
 
+        // 初始化Pulsar admin
+        if(StringUtils.isNotEmpty(pulsarMQProducerConfig.getAdminServerUrl())) {
+            try {
+                pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsarMQProducerConfig.getAdminServerUrl()).build();
+            } catch (PulsarClientException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        // 加载所有生产者 --> topic可能有正则或表名,无法确认所有topic,在使用时再加载
         int parallelPartitionSendThreadSize = mqProperties.getParallelSendThreadSize();
         sendPartitionExecutor = new ThreadPoolExecutor(parallelPartitionSendThreadSize,
                 parallelPartitionSendThreadSize,
@@ -106,6 +123,10 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
         if (!StringUtils.isEmpty(topicTenantPrefix)) {
             tmpProperties.setTopicTenantPrefix(topicTenantPrefix);
         }
+        String adminServerUrl = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ADMIN_SERVER_URL);
+        if(!StringUtils.isEmpty(adminServerUrl)) {
+            tmpProperties.setAdminServerUrl(adminServerUrl);
+        }
         if (logger.isDebugEnabled()) {
             logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties));
         }
@@ -182,6 +203,11 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
         if (partitionNum == null) {
             partitionNum = destination.getPartitionsNum();
         }
+        // 创建多分区topic
+        if(pulsarAdmin!=null && partitionNum!=null && partitionNum>0 && PRODUCERS.get(topicName)==null) {
+            createMultipleTopic(topicName, partitionNum);
+        }
+
         ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
         // 并发构造
         MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
@@ -319,22 +345,41 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
     }
 
     /**
-     * 获取指定topic的生产者,并且使用缓存
-     *
+     * 创建多分区topic
      * @param topic
-     * @return org.apache.pulsar.client.api.Producer<byte [ ]>
-     * @date 2021/9/10 11:21
-     * @author chad
-     * @since 1 by chad at 2021/9/10 新增
+     * @param partitionNum
+     */
+    private void createMultipleTopic(String topic,Integer partitionNum) {
+        // Build the full topic name: topicTenantPrefix, a "/" separator if the prefix lacks one, then topic.
+        PulsarMQProducerConfig pulsarMQProperties = (PulsarMQProducerConfig) this.mqProperties;
+        String prefix = pulsarMQProperties.getTopicTenantPrefix();
+        String fullTopic = topic;
+        if (!StringUtils.isEmpty(prefix)) {
+            if (!prefix.endsWith("/")) {
+                fullTopic = "/" + fullTopic;
+            }
+            fullTopic = prefix + fullTopic;
+        }
+        // Ask the Pulsar admin client to create the partitioned topic.
+        try {
+            pulsarAdmin.topics().createPartitionedTopic(fullTopic, partitionNum);
+        } catch (PulsarAdminException e) {
+            // Best effort: never block the send path, but log the failure instead of swallowing it silently.
+            logger.warn("Create pulsar partitioned topic {} ({} partitions) failed, continue", fullTopic, partitionNum, e);
+        }
+    }
+    /**
+     * Get the producer for the given topic, using the cache
+     * @param topic
+     * @return
      */
     private Producer<byte[]> getProducer(String topic) {
         Producer producer = PRODUCERS.get(topic);
-
-        if (null == producer) {
+        if (null == producer || !producer.isConnected()) {
             try {
                 synchronized (PRODUCERS) {
                     producer = PRODUCERS.get(topic);
-                    if (null != producer) {
+                    if (null != producer && producer.isConnected()) {
                         return producer;
                     }
 
@@ -405,7 +450,7 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
 
     @Override
     public void stop() {
-        logger.info("## Stop RocketMQ producer##");
+        logger.info("## Stop PulsarMQ producer##");
 
         for (Producer p : PRODUCERS.values()) {
             try {

+ 1 - 0
connector/pulsarmq-connector/src/main/resources/META-INF/canal/com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer

@@ -0,0 +1 @@
+pulsarmq=com.alibaba.otter.canal.connector.pulsarmq.consumer.CanalPulsarMQConsumer

+ 6 - 1
pom.xml

@@ -260,7 +260,7 @@
             <dependency>
                 <groupId>com.alibaba</groupId>
                 <artifactId>druid</artifactId>
-                <version>1.2.6</version>
+                <version>1.2.8</version>
             </dependency>
             <dependency>
                 <groupId>com.lmax</groupId>
@@ -346,6 +346,11 @@
                 <artifactId>pulsar-client</artifactId>
                 <version>2.8.1</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.pulsar</groupId>
+                <artifactId>pulsar-client-admin</artifactId>
+                <version>2.8.1</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>