Browse Source

支持不同topic指定专属partitionNum (#2479)

liguangyao 5 years ago
parent
commit
b81e9dfeb8

+ 9 - 0
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/producer/MQDestination.java

@@ -14,6 +14,7 @@ public class MQDestination {
     private Integer partitionsNum;
     private Integer partitionsNum;
     private String  partitionHash;
     private String  partitionHash;
     private String  dynamicTopic;
     private String  dynamicTopic;
+    private String  dynamicTopicPartitionNum;
 
 
     public String getCanalDestination() {
     public String getCanalDestination() {
         return canalDestination;
         return canalDestination;
@@ -62,4 +63,12 @@ public class MQDestination {
     public void setDynamicTopic(String dynamicTopic) {
     public void setDynamicTopic(String dynamicTopic) {
         this.dynamicTopic = dynamicTopic;
         this.dynamicTopic = dynamicTopic;
     }
     }
+
+    public String getDynamicTopicPartitionNum() {
+        return dynamicTopicPartitionNum;
+    }
+
+    public void setDynamicTopicPartitionNum(String dynamicTopicPartitionNum) {
+        this.dynamicTopicPartitionNum = dynamicTopicPartitionNum;
+    }
 }
 }

+ 57 - 0
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/producer/MQMessageUtils.java

@@ -103,6 +103,38 @@ public class MQMessageUtils {
                                                                                  }
                                                                                  }
                                                                              });
                                                                              });
 
 
+    private static Map<String, List<TopicPartitionData>> topicPartitionDatas = MigrateMap.makeComputingMap(CacheBuilder.newBuilder()
+                                                                                .softValues(),
+                                                                                new Function<String, List<TopicPartitionData>>() {
+
+                                                                                    public List<TopicPartitionData> apply(String tPConfigs) {
+                                                                                        List<TopicPartitionData> datas = Lists.newArrayList();
+                                                                                        String[] tPArray = StringUtils.split(StringUtils.replace(tPConfigs,
+                                                                                                ",",
+                                                                                                ";"),
+                                                                                                ";");
+                                                                                        for (String tPConfig : tPArray) {
+                                                                                            TopicPartitionData data = new TopicPartitionData();
+                                                                                            int i = tPConfig.lastIndexOf(":");
+                                                                                            if (i > 0) {
+                                                                                                String tStr = tPConfig.substring(0, i);
+                                                                                                String pStr = tPConfig.substring(i + 1);
+                                                                                                if (!isWildCard(tStr)) {
+                                                                                                    data.simpleName = tStr;
+                                                                                                } else {
+                                                                                                    data.regexFilter = new AviaterRegexFilter(tStr);
+                                                                                                }
+                                                                                                if (!StringUtils.isEmpty(pStr) && StringUtils.isNumeric(pStr)) {
+                                                                                                    data.partitionNum = Integer.valueOf(pStr);
+                                                                                                }
+                                                                                                datas.add(data);
+                                                                                            }
+                                                                                        }
+
+                                                                                        return datas;
+                                                                                    }
+                                                                                });
+
     /**
     /**
      * 按 schema 或者 schema+table 将 message 分配到对应topic
      * 按 schema 或者 schema+table 将 message 分配到对应topic
      *
      *
@@ -619,6 +651,24 @@ public class MQMessageUtils {
         return false;
         return false;
     }
     }
 
 
+    public static Integer parseDynamicTopicPartition(String name, String tPConfigs) {
+        if (!StringUtils.isEmpty(tPConfigs)) {
+            List<TopicPartitionData> datas = topicPartitionDatas.get(tPConfigs);
+            for (TopicPartitionData data : datas) {
+                if (data.simpleName != null) {
+                    if (data.simpleName.equalsIgnoreCase(name)) {
+                        return data.partitionNum;
+                    }
+                } else {
+                    if (data.regexFilter.filter(name)) {
+                        return data.partitionNum;
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
     private static boolean isWildCard(String value) {
     private static boolean isWildCard(String value) {
         // not contaiins '.' ?
         // not contaiins '.' ?
         return StringUtils.containsAny(value, new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$',
         return StringUtils.containsAny(value, new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$',
@@ -656,6 +706,13 @@ public class MQMessageUtils {
         public AviaterRegexFilter tableRegexFilter;
         public AviaterRegexFilter tableRegexFilter;
     }
     }
 
 
+    public static class TopicPartitionData {
+
+        public String             simpleName;
+        public AviaterRegexFilter regexFilter;
+        public Integer            partitionNum;
+    }
+
     public static class EntryRowData {
     public static class EntryRowData {
 
 
         public Entry     entry;
         public Entry     entry;

+ 7 - 2
connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java

@@ -196,6 +196,11 @@ public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQPro
 
 
     private List<Future> send(MQDestination mqDestination, String topicName, Message message, boolean flat) {
     private List<Future> send(MQDestination mqDestination, String topicName, Message message, boolean flat) {
         List<ProducerRecord<String, byte[]>> records = new ArrayList<>();
         List<ProducerRecord<String, byte[]>> records = new ArrayList<>();
+        // 获取当前topic的分区数
+        Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName, mqDestination.getDynamicTopicPartitionNum());
+        if (partitionNum == null) {
+            partitionNum = mqDestination.getPartitionsNum();
+        }
         if (!flat) {
         if (!flat) {
             if (mqDestination.getPartitionHash() != null && !mqDestination.getPartitionHash().isEmpty()) {
             if (mqDestination.getPartitionHash() != null && !mqDestination.getPartitionHash().isEmpty()) {
                 // 并发构造
                 // 并发构造
@@ -203,7 +208,7 @@ public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQPro
                 // 串行分区
                 // 串行分区
                 Message[] messages = MQMessageUtils.messagePartition(datas,
                 Message[] messages = MQMessageUtils.messagePartition(datas,
                     message.getId(),
                     message.getId(),
-                    mqDestination.getPartitionsNum(),
+                    partitionNum,
                     mqDestination.getPartitionHash(),
                     mqDestination.getPartitionHash(),
                     this.mqProperties.isDatabaseHash());
                     this.mqProperties.isDatabaseHash());
                 int length = messages.length;
                 int length = messages.length;
@@ -233,7 +238,7 @@ public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQPro
             for (FlatMessage flatMessage : flatMessages) {
             for (FlatMessage flatMessage : flatMessages) {
                 if (mqDestination.getPartitionHash() != null && !mqDestination.getPartitionHash().isEmpty()) {
                 if (mqDestination.getPartitionHash() != null && !mqDestination.getPartitionHash().isEmpty()) {
                     FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                     FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
-                        mqDestination.getPartitionsNum(),
+                        partitionNum,
                         mqDestination.getPartitionHash(),
                         mqDestination.getPartitionHash(),
                         this.mqProperties.isDatabaseHash());
                         this.mqProperties.isDatabaseHash());
                     int length = partitionFlatMessage.length;
                     int length = partitionFlatMessage.length;

+ 7 - 2
connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java

@@ -161,6 +161,11 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
     }
     }
 
 
     public void send(final MQDestination destination, String topicName, com.alibaba.otter.canal.protocol.Message message) {
     public void send(final MQDestination destination, String topicName, com.alibaba.otter.canal.protocol.Message message) {
+        // 获取当前topic的分区数
+        Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName, destination.getDynamicTopicPartitionNum());
+        if (partitionNum == null) {
+            partitionNum = destination.getPartitionsNum();
+        }
         if (!mqProperties.isFlatMessage()) {
         if (!mqProperties.isFlatMessage()) {
             if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
             if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
                 // 并发构造
                 // 并发构造
@@ -168,7 +173,7 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
                 // 串行分区
                 // 串行分区
                 com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(datas,
                 com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(datas,
                     message.getId(),
                     message.getId(),
-                    destination.getPartitionsNum(),
+                    partitionNum,
                     destination.getPartitionHash(),
                     destination.getPartitionHash(),
                     mqProperties.isDatabaseHash());
                     mqProperties.isDatabaseHash());
                 int length = messages.length;
                 int length = messages.length;
@@ -207,7 +212,7 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
 
 
                 for (FlatMessage flatMessage : flatMessages) {
                 for (FlatMessage flatMessage : flatMessages) {
                     FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                     FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
-                        destination.getPartitionsNum(),
+                        partitionNum,
                         destination.getPartitionHash(),
                         destination.getPartitionHash(),
                         mqProperties.isDatabaseHash());
                         mqProperties.isDatabaseHash());
                     int length = partitionFlatMessage.length;
                     int length = partitionFlatMessage.length;

+ 1 - 0
deployer/src/main/resources/example/instance.properties

@@ -54,4 +54,5 @@ canal.mq.partition=0
 # hash partition config
 # hash partition config
 #canal.mq.partitionsNum=3
 #canal.mq.partitionsNum=3
 #canal.mq.partitionHash=test.table:id^name,.*\\..*
 #canal.mq.partitionHash=test.table:id^name,.*\\..*
+#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
 #################################################
 #################################################

+ 1 - 0
deployer/src/main/resources/spring/default-instance.xml

@@ -200,5 +200,6 @@
 		<property name="partition" value="${canal.mq.partition}" />
 		<property name="partition" value="${canal.mq.partition}" />
 		<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
 		<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
 		<property name="partitionHash" value="${canal.mq.partitionHash}" />
 		<property name="partitionHash" value="${canal.mq.partitionHash}" />
+		<property name="dynamicTopicPartitionNum" value="${canal.mq.dynamicTopicPartitionNum}" />
 	</bean>
 	</bean>
 </beans>
 </beans>

+ 1 - 0
deployer/src/main/resources/spring/file-instance.xml

@@ -186,5 +186,6 @@
         <property name="partition" value="${canal.mq.partition}" />
         <property name="partition" value="${canal.mq.partition}" />
         <property name="partitionsNum" value="${canal.mq.partitionsNum}" />
         <property name="partitionsNum" value="${canal.mq.partitionsNum}" />
         <property name="partitionHash" value="${canal.mq.partitionHash}" />
         <property name="partitionHash" value="${canal.mq.partitionHash}" />
+		<property name="dynamicTopicPartitionNum" value="${canal.mq.dynamicTopicPartitionNum}" />
 	</bean>
 	</bean>
 </beans>
 </beans>

+ 1 - 0
deployer/src/main/resources/spring/group-instance.xml

@@ -276,5 +276,6 @@
         <property name="partition" value="${canal.mq.partition}" />
         <property name="partition" value="${canal.mq.partition}" />
         <property name="partitionsNum" value="${canal.mq.partitionsNum}" />
         <property name="partitionsNum" value="${canal.mq.partitionsNum}" />
         <property name="partitionHash" value="${canal.mq.partitionHash}" />
         <property name="partitionHash" value="${canal.mq.partitionHash}" />
+		<property name="dynamicTopicPartitionNum" value="${canal.mq.dynamicTopicPartitionNum}" />
     </bean>
     </bean>
 </beans>
 </beans>

+ 1 - 0
deployer/src/main/resources/spring/memory-instance.xml

@@ -174,5 +174,6 @@
 		<property name="partition" value="${canal.mq.partition}" />
 		<property name="partition" value="${canal.mq.partition}" />
 		<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
 		<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
 		<property name="partitionHash" value="${canal.mq.partitionHash}" />
 		<property name="partitionHash" value="${canal.mq.partitionHash}" />
+		<property name="dynamicTopicPartitionNum" value="${canal.mq.dynamicTopicPartitionNum}" />
 	</bean>
 	</bean>
 </beans>
 </beans>

+ 9 - 0
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalMQConfig.java

@@ -7,6 +7,7 @@ public class CanalMQConfig {
     private Integer partitionsNum;
     private Integer partitionsNum;
     private String  partitionHash;
     private String  partitionHash;
     private String  dynamicTopic;
     private String  dynamicTopic;
+    private String  dynamicTopicPartitionNum;
 
 
     public String getTopic() {
     public String getTopic() {
         return topic;
         return topic;
@@ -47,4 +48,12 @@ public class CanalMQConfig {
     public void setDynamicTopic(String dynamicTopic) {
     public void setDynamicTopic(String dynamicTopic) {
         this.dynamicTopic = dynamicTopic;
         this.dynamicTopic = dynamicTopic;
     }
     }
+
+    public String getDynamicTopicPartitionNum() {
+        return dynamicTopicPartitionNum;
+    }
+
+    public void setDynamicTopicPartitionNum(String dynamicTopicPartitionNum) {
+        this.dynamicTopicPartitionNum = dynamicTopicPartitionNum;
+    }
 }
 }

+ 1 - 0
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -160,6 +160,7 @@ public class CanalMQStarter {
                 canalDestination.setDynamicTopic(mqConfig.getDynamicTopic());
                 canalDestination.setDynamicTopic(mqConfig.getDynamicTopic());
                 canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
                 canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
                 canalDestination.setPartitionHash(mqConfig.getPartitionHash());
                 canalDestination.setPartitionHash(mqConfig.getPartitionHash());
+                canalDestination.setDynamicTopicPartitionNum(mqConfig.getDynamicTopicPartitionNum());
 
 
                 canalServer.subscribe(clientIdentity);
                 canalServer.subscribe(clientIdentity);
                 logger.info("## the MQ producer: {} is running now ......", destination);
                 logger.info("## the MQ producer: {} is running now ......", destination);