Browse Source

新增配置 canal.mq.enableDynamicQueuePartition,获取topic对应的队列的数量为分区的数量,以支持动态队列的场景(如阿里云的rocketmq,无法人工设置队列数量,可能会动态伸缩) (#3670)

kkjinping 3 years ago
parent
commit
997957d6e0

+ 9 - 0
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/producer/MQDestination.java

@@ -15,6 +15,7 @@ public class MQDestination {
     private String  partitionHash;
     private String  dynamicTopic;
     private String  dynamicTopicPartitionNum;
+    private Boolean enableDynamicQueuePartition;
 
     public String getCanalDestination() {
         return canalDestination;
@@ -71,4 +72,12 @@ public class MQDestination {
     public void setDynamicTopicPartitionNum(String dynamicTopicPartitionNum) {
         this.dynamicTopicPartitionNum = dynamicTopicPartitionNum;
     }
+
+    public Boolean getEnableDynamicQueuePartition() {
+        return enableDynamicQueuePartition;
+    }
+
+    public void setEnableDynamicQueuePartition(Boolean enableDynamicQueuePartition) {
+        this.enableDynamicQueuePartition = enableDynamicQueuePartition;
+    }
 }

+ 43 - 24
connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java

@@ -1,29 +1,5 @@
 package com.alibaba.otter.canal.connector.rocketmq.producer;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.rocketmq.acl.common.AclClientRPCHook;
-import org.apache.rocketmq.acl.common.SessionCredentials;
-import org.apache.rocketmq.client.AccessChannel;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
-import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.common.CanalException;
@@ -40,6 +16,29 @@ import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
 import com.alibaba.otter.canal.connector.rocketmq.config.RocketMQConstants;
 import com.alibaba.otter.canal.connector.rocketmq.config.RocketMQProducerConfig;
 import com.alibaba.otter.canal.protocol.FlatMessage;
+import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.AccessChannel;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * RocketMQ Producer SPI 实现
@@ -186,6 +185,12 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
         // 获取当前topic的分区数
         Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
             destination.getDynamicTopicPartitionNum());
+        
+        // 获取topic的队列数为分区数
+        if(partitionNum == null){
+            partitionNum = getTopicDynamicQueuesSize(destination.getEnableDynamicQueuePartition(), topicName);
+        }
+        
         if (partitionNum == null) {
             partitionNum = destination.getPartitionsNum();
         }
@@ -353,4 +358,18 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
 
         super.stop();
     }
+    
+    private Integer getTopicDynamicQueuesSize(Boolean enable, String topicName){
+        if(enable!=null && enable){
+            topicName = this.defaultMQProducer.withNamespace(topicName);
+            DefaultMQProducerImpl innerProducer = this.defaultMQProducer.getDefaultMQProducerImpl();
+            TopicPublishInfo topicInfo = innerProducer.getTopicPublishInfoTable().get(topicName);
+            if(topicInfo == null){
+                return null;
+            }else{
+                return topicInfo.getMessageQueueList().size();
+            }
+        }
+        return null;
+    }
 }

+ 1 - 0
deployer/src/main/resources/spring/default-instance.xml

@@ -206,5 +206,6 @@
 		<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
 		<property name="partitionHash" value="${canal.mq.partitionHash}" />
 		<property name="dynamicTopicPartitionNum" value="${canal.mq.dynamicTopicPartitionNum}" />
+		<property name="enableDynamicQueuePartition" value="${canal.mq.enableDynamicQueuePartition}" />
 	</bean>
 </beans>

+ 1 - 0
deployer/src/main/resources/spring/file-instance.xml

@@ -192,5 +192,6 @@
         <property name="partitionsNum" value="${canal.mq.partitionsNum}" />
         <property name="partitionHash" value="${canal.mq.partitionHash}" />
 		<property name="dynamicTopicPartitionNum" value="${canal.mq.dynamicTopicPartitionNum}" />
+		<property name="enableDynamicQueuePartition" value="${canal.mq.enableDynamicQueuePartition}" />
 	</bean>
 </beans>

+ 1 - 0
deployer/src/main/resources/spring/group-instance.xml

@@ -287,5 +287,6 @@
         <property name="partitionsNum" value="${canal.mq.partitionsNum}" />
         <property name="partitionHash" value="${canal.mq.partitionHash}" />
 		<property name="dynamicTopicPartitionNum" value="${canal.mq.dynamicTopicPartitionNum}" />
+		<property name="enableDynamicQueuePartition" value="${canal.mq.enableDynamicQueuePartition}" />
     </bean>
 </beans>

+ 1 - 0
deployer/src/main/resources/spring/memory-instance.xml

@@ -180,5 +180,6 @@
 		<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
 		<property name="partitionHash" value="${canal.mq.partitionHash}" />
 		<property name="dynamicTopicPartitionNum" value="${canal.mq.dynamicTopicPartitionNum}" />
+		<property name="enableDynamicQueuePartition" value="${canal.mq.enableDynamicQueuePartition}" />
 	</bean>
 </beans>

+ 9 - 0
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalMQConfig.java

@@ -8,6 +8,7 @@ public class CanalMQConfig {
     private String  partitionHash;
     private String  dynamicTopic;
     private String  dynamicTopicPartitionNum;
+    private Boolean enableDynamicQueuePartition;
 
     public String getTopic() {
         return topic;
@@ -56,4 +57,12 @@ public class CanalMQConfig {
     public void setDynamicTopicPartitionNum(String dynamicTopicPartitionNum) {
         this.dynamicTopicPartitionNum = dynamicTopicPartitionNum;
     }
+
+    public Boolean getEnableDynamicQueuePartition() {
+        return enableDynamicQueuePartition;
+    }
+
+    public void setEnableDynamicQueuePartition(Boolean enableDynamicQueuePartition) {
+        this.enableDynamicQueuePartition = enableDynamicQueuePartition;
+    }
 }

+ 12 - 12
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -1,17 +1,5 @@
 package com.alibaba.otter.canal.server;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
 import com.alibaba.otter.canal.connector.core.config.MQProperties;
 import com.alibaba.otter.canal.connector.core.producer.MQDestination;
 import com.alibaba.otter.canal.connector.core.spi.CanalMQProducer;
@@ -21,6 +9,17 @@ import com.alibaba.otter.canal.instance.core.CanalMQConfig;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class CanalMQStarter {
 
@@ -157,6 +156,7 @@ public class CanalMQStarter {
                 canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
                 canalDestination.setPartitionHash(mqConfig.getPartitionHash());
                 canalDestination.setDynamicTopicPartitionNum(mqConfig.getDynamicTopicPartitionNum());
+                canalDestination.setEnableDynamicQueuePartition(mqConfig.getEnableDynamicQueuePartition());
 
                 canalServer.subscribe(clientIdentity);
                 logger.info("## the MQ producer: {} is running now ......", destination);