1
0
Эх сурвалжийг харах

add support for listener name which is described in https://pulsar.apache.org/docs/next/concepts-multiple-advertised-listeners/ (#5118)

JZ 1 жил өмнө
parent
commit
e2219c88dd

+ 28 - 2
client/src/main/java/com/alibaba/otter/canal/client/pulsarmq/PulsarMQCanalConnector.java

@@ -59,6 +59,12 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
      * 角色认证token
      */
     private String roleToken;
+
+    /**
+     * listener name
+     */
+    private String listenerName;
+
     /**
      * 订阅客户端名称
      */
@@ -129,6 +135,11 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
      */
     public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic
             , String subscriptName) {
+        this(isFlatMessage, serviceUrl, roleToken, topic, subscriptName,null);
+    }
+
+    public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic
+            , String subscriptName, String listenerName) {
         this.isFlatMessage = isFlatMessage;
         this.serviceUrl = serviceUrl;
         this.roleToken = roleToken;
@@ -137,6 +148,7 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
         if (StringUtils.isEmpty(this.subscriptName)) {
             throw new RuntimeException("Pulsar Consumer subscriptName required");
         }
+        this.listenerName = listenerName;
     }
 
     /**
@@ -150,6 +162,15 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
             , String subscriptName, int batchSize, int getBatchTimeoutSeconds, int batchProcessTimeoutSeconds
             , int redeliveryDelaySeconds, int ackTimeoutSeconds, boolean isRetry, boolean isRetryDLQUpperCase
             , int maxRedeliveryCount) {
+        this(isFlatMessage, serviceUrl, roleToken, topic, subscriptName, batchSize, getBatchTimeoutSeconds
+                , batchProcessTimeoutSeconds, redeliveryDelaySeconds, ackTimeoutSeconds, isRetry, isRetryDLQUpperCase
+                , maxRedeliveryCount, null);
+    }
+
+    public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic
+            , String subscriptName, int batchSize, int getBatchTimeoutSeconds, int batchProcessTimeoutSeconds
+            , int redeliveryDelaySeconds, int ackTimeoutSeconds, boolean isRetry, boolean isRetryDLQUpperCase
+            , int maxRedeliveryCount, String listenerName) {
         this.isFlatMessage = isFlatMessage;
         this.serviceUrl = serviceUrl;
         this.roleToken = roleToken;
@@ -166,15 +187,20 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
         this.isRetry = isRetry;
         this.isRetryDLQUpperCase = isRetryDLQUpperCase;
         this.maxRedeliveryCount = maxRedeliveryCount;
+        this.listenerName = listenerName;
     }
 
     @Override
     public void connect() throws CanalClientException {
         // 连接创建客户端
         try {
-            pulsarClient = PulsarClient.builder()
+            ClientBuilder builder = PulsarClient.builder()
                     .serviceUrl(serviceUrl)
-                    .authentication(AuthenticationFactory.token(roleToken))
+                    .authentication(AuthenticationFactory.token(roleToken));
+            if (StringUtils.isNotEmpty(listenerName)) {
+                builder.listenerName(listenerName);
+            }
+            pulsarClient = builder
                     .build();
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);

+ 5 - 0
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java

@@ -61,4 +61,9 @@ public class PulsarMQConstants {
      */
     public static final String PULSARMQ_ADMIN_SERVER_URL          = ROOT + "." + "adminServerUrl";
 
+    /**
+     * Pulsar admin服务器地址
+     */
+    public static final String PULSARMQ_LISTENER_NAME          = ROOT + "." + "listenerName";
+
 }

+ 13 - 0
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java

@@ -37,6 +37,11 @@ public class PulsarMQProducerConfig extends MQProperties {
      */
     private String adminServerUrl;
 
+    /**
+     * listener name
+     */
+    private String listenerName;
+
     public String getServerUrl() {
         return serverUrl;
     }
@@ -68,4 +73,12 @@ public class PulsarMQProducerConfig extends MQProperties {
     public void setAdminServerUrl(String adminServerUrl) {
         this.adminServerUrl = adminServerUrl;
     }
+
+    public String getListenerName() {
+        return listenerName;
+    }
+
+    public void setListenerName(String listenerName) {
+        this.listenerName = listenerName;
+    }
 }

+ 10 - 0
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java

@@ -54,6 +54,12 @@ public class CanalPulsarMQConsumer implements CanalMsgConsumer {
      * 角色认证token
      */
     private String                    roleToken;
+
+    /**
+     *  listener name
+     */
+    private String                    listenerName;
+
     /**
      * 订阅客户端名称
      */
@@ -110,6 +116,7 @@ public class CanalPulsarMQConsumer implements CanalMsgConsumer {
         }
         this.serviceUrl = properties.getProperty(PulsarMQConstants.PULSARMQ_SERVER_URL);
         this.roleToken = properties.getProperty(PulsarMQConstants.PULSARMQ_ROLE_TOKEN);
+        this.listenerName = properties.getProperty(PulsarMQConstants.PULSARMQ_LISTENER_NAME);
         this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME);
         // 采用groupId作为subscriptName,避免所有的都是同一个订阅者名称
         if (StringUtils.isEmpty(this.subscriptName)) {
@@ -165,6 +172,9 @@ public class CanalPulsarMQConsumer implements CanalMsgConsumer {
             if (StringUtils.isNotEmpty(roleToken)) {
                 builder.authentication(AuthenticationFactory.token(roleToken));
             }
+            if (StringUtils.isNotEmpty(listenerName)) {
+                builder.authentication(AuthenticationFactory.token(listenerName));
+            }
             pulsarClient = builder.build();
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);

+ 10 - 0
connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java

@@ -73,6 +73,11 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
                 // 角色权限认证的token
                 builder.authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken()));
             }
+            if (StringUtils.isNotEmpty(pulsarMQProducerConfig.getListenerName())) {
+                //listener name
+                builder.listenerName(pulsarMQProducerConfig.getListenerName());
+            }
+
             client = builder.build();
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);
@@ -127,6 +132,11 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
         if (!StringUtils.isEmpty(adminServerUrl)) {
             tmpProperties.setAdminServerUrl(adminServerUrl);
         }
+        String listenerName = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_LISTENER_NAME);
+        if (!StringUtils.isEmpty(listenerName)) {
+            tmpProperties.setListenerName(listenerName);
+        }
+
         if (logger.isDebugEnabled()) {
             logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties));
         }