Explorar o código

增加支持按库名、表名动态发送topic支持

mcy %!s(int64=6) %!d(string=hai) anos
pai
achega
d9e259dde4

+ 2 - 0
deployer/src/main/resources/example/instance.properties

@@ -45,6 +45,8 @@ canal.instance.filter.black.regex=
 
 # mq config
 canal.mq.topic=example
+# 动态topic, 需mq支持动态创建topic
+#canal.mq.dynamicTopic=.*,mytest\\..*,mytest2.user
 canal.mq.partition=0
 # hash partition config
 #canal.mq.partitionsNum=3

+ 8 - 0
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalMQConfig.java

@@ -6,6 +6,7 @@ public class CanalMQConfig {
     private Integer partition;
     private Integer partitionsNum;
     private String  partitionHash;
+    private String dynamicTopic;
 
     public String getTopic() {
         return topic;
@@ -39,4 +40,11 @@ public class CanalMQConfig {
         this.partitionHash = partitionHash;
     }
 
+    public String getDynamicTopic() {
+        return dynamicTopic;
+    }
+
+    public void setDynamicTopic(String dynamicTopic) {
+        this.dynamicTopic = dynamicTopic;
+    }
 }

+ 185 - 51
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -1,11 +1,6 @@
 package com.alibaba.otter.canal.common;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -23,49 +18,143 @@ import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * process MQ Message utils
- * 
+ *
  * @author agapple 2018年12月11日 下午1:28:32
  */
 public class MQMessageUtils {
 
     @SuppressWarnings("deprecation")
-    private static Map<String, List<PartitionData>> partitionDatas = MigrateMap.makeComputingMap(new MapMaker().softValues(),
-                                                                       new Function<String, List<PartitionData>>() {
-
-                                                                           public List<PartitionData> apply(String pkHashConfigs) {
-                                                                               List<PartitionData> datas = Lists.newArrayList();
-                                                                               String[] pkHashConfigArray = StringUtils.split(pkHashConfigs,
-                                                                                   ",");
-                                                                               // schema.table:id^name
-                                                                               for (String pkHashConfig : pkHashConfigArray) {
-                                                                                   PartitionData data = new PartitionData();
-                                                                                   int i = pkHashConfig.lastIndexOf(":");
-                                                                                   if (i > 0) {
-                                                                                       String pkStr = pkHashConfig.substring(i + 1);
-                                                                                       if (pkStr.equalsIgnoreCase("$pk$")) {
-                                                                                           data.hashMode.autoPkHash = true;
-                                                                                       } else {
-                                                                                           data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr,
-                                                                                               '^'));
-                                                                                       }
-
-                                                                                       pkHashConfig = pkHashConfig.substring(0,
-                                                                                           i);
-                                                                                   } else {
-                                                                                       data.hashMode.tableHash = true;
-                                                                                   }
-
-                                                                                   if (!isWildCard(pkHashConfig)) {
-                                                                                       data.simpleName = pkHashConfig;
-                                                                                   } else {
-                                                                                       data.regexFilter = new AviaterRegexFilter(pkHashConfig);
-                                                                                   }
-                                                                                   datas.add(data);
-                                                                               }
-
-                                                                               return datas;
-                                                                           }
-                                                                       });
+    private static Map<String, List<PartitionData>>    partitionDatas    = MigrateMap
+        .makeComputingMap(new MapMaker().softValues(), new Function<String, List<PartitionData>>() {
+
+                                                                                 public List<PartitionData> apply(String pkHashConfigs) {
+                                                                                     List<PartitionData> datas = Lists
+                                                                                         .newArrayList();
+                                                                                     String[] pkHashConfigArray = StringUtils
+                                                                                         .split(pkHashConfigs, ",");
+                                                                                     // schema.table:id^name
+                                                                                     for (String pkHashConfig : pkHashConfigArray) {
+                                                                                         PartitionData data = new PartitionData();
+                                                                                         int i = pkHashConfig
+                                                                                             .lastIndexOf(":");
+                                                                                         if (i > 0) {
+                                                                                             String pkStr = pkHashConfig
+                                                                                                 .substring(i + 1);
+                                                                                             if (pkStr.equalsIgnoreCase(
+                                                                                                 "$pk$")) {
+                                                                                                 data.hashMode.autoPkHash = true;
+                                                                                             } else {
+                                                                                                 data.hashMode.pkNames = Lists
+                                                                                                     .newArrayList(
+                                                                                                         StringUtils
+                                                                                                             .split(
+                                                                                                                 pkStr,
+                                                                                                                 '^'));
+                                                                                             }
+
+                                                                                             pkHashConfig = pkHashConfig
+                                                                                                 .substring(0, i);
+                                                                                         } else {
+                                                                                             data.hashMode.tableHash = true;
+                                                                                         }
+
+                                                                                         if (!isWildCard(
+                                                                                             pkHashConfig)) {
+                                                                                             data.simpleName = pkHashConfig;
+                                                                                         } else {
+                                                                                             data.regexFilter = new AviaterRegexFilter(
+                                                                                                 pkHashConfig);
+                                                                                         }
+                                                                                         datas.add(data);
+                                                                                     }
+
+                                                                                     return datas;
+                                                                                 }
+                                                                             });
+
+    @SuppressWarnings("deprecation")
+    private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap
+        .makeComputingMap(new MapMaker().softValues(), new Function<String, List<DynamicTopicData>>() {
+
+                                                                                 public List<DynamicTopicData> apply(String pkHashConfigs) {
+                                                                                     List<DynamicTopicData> datas = Lists
+                                                                                         .newArrayList();
+                                                                                     String[] dynamicTopicArray = StringUtils
+                                                                                         .split(pkHashConfigs, ",");
+                                                                                     // schema.table
+                                                                                     for (String dynamicTopic : dynamicTopicArray) {
+                                                                                         DynamicTopicData data = new DynamicTopicData();
+
+                                                                                         if (!isWildCard(
+                                                                                             dynamicTopic)) {
+                                                                                             data.simpleName = dynamicTopic;
+                                                                                         } else {
+                                                                                             if (dynamicTopic
+                                                                                                 .contains("\\.")) {
+                                                                                                 data.tableRegexFilter = new AviaterRegexFilter(
+                                                                                                     dynamicTopic);
+                                                                                             } else {
+                                                                                                 data.schemaRegexFilter = new AviaterRegexFilter(
+                                                                                                     dynamicTopic);
+                                                                                             }
+                                                                                         }
+                                                                                         datas.add(data);
+                                                                                     }
+
+                                                                                     return datas;
+                                                                                 }
+                                                                             });
+
+    /**
+     * 按 schema 或者 schema+table 将 message 分配到对应topic
+     *
+     * @param message 原message
+     * @param defaultTopic 默认topic
+     * @param dynamicTopicConfigs 动态topic规则
+     * @return 分隔后的message map
+     */
+    public static Map<String, Message> messageTopics(Message message, String defaultTopic, String dynamicTopicConfigs) {
+        List<CanalEntry.Entry> entries;
+        if (message.isRaw()) {
+            List<ByteString> rawEntries = message.getRawEntries();
+            entries = new ArrayList<>(rawEntries.size());
+            for (ByteString byteString : rawEntries) {
+                CanalEntry.Entry entry;
+                try {
+                    entry = CanalEntry.Entry.parseFrom(byteString);
+                } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                }
+                entries.add(entry);
+            }
+        } else {
+            entries = message.getEntries();
+        }
+        Map<String, Message> messages = new HashMap<>();
+        for (CanalEntry.Entry entry : entries) {
+            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
+                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
+                continue;
+            }
+
+            String schemaName = entry.getHeader().getSchemaName();
+            String tableName = entry.getHeader().getTableName();
+
+            if (StringUtils.isEmpty(schemaName) || StringUtils.isEmpty(tableName)) {
+                put2MapMessage(messages, message.getId(), defaultTopic, entry);
+            } else {
+                if (matchDynamicTopic(schemaName + "." + tableName, dynamicTopicConfigs)) {
+                    put2MapMessage(messages, message.getId(), schemaName + "." + tableName, entry);
+                } else if (matchDynamicTopic(schemaName, dynamicTopicConfigs)) {
+                    put2MapMessage(messages, message.getId(), schemaName, entry);
+                } else {
+                    put2MapMessage(messages, message.getId(), defaultTopic, entry);
+                }
+            }
+
+        }
+        return messages;
+    }
 
     /**
      * 将 message 分区
@@ -116,7 +205,7 @@ public class MQMessageUtils {
                 if (rowChange.getRowDatasList() != null && !rowChange.getRowDatasList().isEmpty()) {
                     String database = entry.getHeader().getSchemaName();
                     String table = entry.getHeader().getTableName();
-                    HashMode hashMode = getParitionHashColumns(database + "." + table, pkHashConfigs);
+                    HashMode hashMode = getPartitionHashColumns(database + "." + table, pkHashConfigs);
                     if (hashMode == null) {
                         // 如果都没有匹配,发送到第一个分区
                         partitionEntries[0].add(entry);
@@ -192,8 +281,9 @@ public class MQMessageUtils {
                 try {
                     rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                 } catch (Exception e) {
-                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
-                                               + entry.toString(), e);
+                    throw new RuntimeException(
+                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
+                        e);
                 }
 
                 CanalEntry.EventType eventType = rowChange.getEventType();
@@ -307,7 +397,7 @@ public class MQMessageUtils {
             if (flatMessage.getData() != null && !flatMessage.getData().isEmpty()) {
                 String database = flatMessage.getDatabase();
                 String table = flatMessage.getTable();
-                HashMode hashMode = getParitionHashColumns(database + "." + table, pkHashConfigs);
+                HashMode hashMode = getPartitionHashColumns(database + "." + table, pkHashConfigs);
                 if (hashMode == null) {
                     // 如果都没有匹配,发送到第一个分区
                     partitionMessages[0] = flatMessage;
@@ -373,7 +463,7 @@ public class MQMessageUtils {
     /**
      * match return List , not match return null
      */
-    public static HashMode getParitionHashColumns(String name, String pkHashConfigs) {
+    public static HashMode getPartitionHashColumns(String name, String pkHashConfigs) {
         if (StringUtils.isEmpty(pkHashConfigs)) {
             return null;
         }
@@ -394,6 +484,34 @@ public class MQMessageUtils {
         return null;
     }
 
+    public static boolean matchDynamicTopic(String name, String dynamicTopicConfigs) {
+        if (StringUtils.isEmpty(dynamicTopicConfigs)) {
+            return false;
+        }
+
+        boolean res = false;
+        List<DynamicTopicData> datas = dynamicTopicDatas.get(dynamicTopicConfigs);
+        for (DynamicTopicData data : datas) {
+            if (data.simpleName != null) {
+                if (data.simpleName.equalsIgnoreCase(name)) {
+                    res = true;
+                    break;
+                }
+            } else if (name.contains(".")) {
+                if (data.tableRegexFilter != null && data.tableRegexFilter.filter(name)) {
+                    res = true;
+                    break;
+                }
+            } else {
+                if (data.schemaRegexFilter != null && data.schemaRegexFilter.filter(name)) {
+                    res = true;
+                    break;
+                }
+            }
+        }
+        return res;
+    }
+
     public static boolean checkPkNamesHasContain(List<String> pkNames, String name) {
         for (String pkName : pkNames) {
             if (pkName.equalsIgnoreCase(name)) {
@@ -406,8 +524,18 @@ public class MQMessageUtils {
 
     private static boolean isWildCard(String value) {
         // not contaiins '.' ?
-        return StringUtils.containsAny(value, new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$',
-                '^' });
+        return StringUtils.containsAny(value,
+            new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$', '^' });
+    }
+
+    private static void put2MapMessage(Map<String, Message> messageMap, Long messageId, String topicName,
+                                       CanalEntry.Entry entry) {
+        Message message = messageMap.get(topicName);
+        if (message == null) {
+            message = new Message(messageId, new ArrayList<CanalEntry.Entry>());
+            messageMap.put(topicName, message);
+        }
+        message.getEntries().add(entry);
     }
 
     public static class PartitionData {
@@ -424,4 +552,10 @@ public class MQMessageUtils {
         public List<String> pkNames    = Lists.newArrayList();
     }
 
+    public static class DynamicTopicData {
+
+        public String             simpleName;
+        public AviaterRegexFilter schemaRegexFilter;
+        public AviaterRegexFilter tableRegexFilter;
+    }
 }

+ 9 - 1
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -1,6 +1,5 @@
 package com.alibaba.otter.canal.common;
 
-
 /**
  * kafka 配置项
  *
@@ -32,6 +31,7 @@ public class MQProperties {
         private Integer partition;
         private Integer partitionsNum;
         private String  partitionHash;
+        private String  dynamicTopic;
 
         public String getCanalDestination() {
             return canalDestination;
@@ -72,6 +72,14 @@ public class MQProperties {
         public void setPartitionHash(String partitionHash) {
             this.partitionHash = partitionHash;
         }
+
+        public String getDynamicTopic() {
+            return dynamicTopic;
+        }
+
+        public void setDynamicTopic(String dynamicTopic) {
+            this.dynamicTopic = dynamicTopic;
+        }
     }
 
     public String getServers() {

+ 31 - 14
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,8 +1,10 @@
 package com.alibaba.otter.canal.kafka;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -78,15 +80,32 @@ public class CanalKafkaProducer implements CanalMQProducer {
     @Override
     public void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback) {
 
+        if (!StringUtils.isEmpty(canalDestination.getDynamicTopic())) {
+            // 动态topic
+            Map<String, Message> messageMap = MQMessageUtils
+                .messageTopics(message, canalDestination.getTopic(), canalDestination.getDynamicTopic());
+
+            for (Map.Entry<String, Message> entry : messageMap.entrySet()) {
+                String topicName = entry.getKey();
+                Message messageSub = entry.getValue();
+                if (logger.isDebugEnabled()) {
+                    logger.debug("## Send message to kafka topic: " + topicName);
+                }
+                send(canalDestination, topicName, messageSub, callback);
+            }
+        } else {
+            send(canalDestination, canalDestination.getTopic(), message, callback);
+        }
+    }
+
+    private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message,
+                      Callback callback) {
         // producer.beginTransaction();
         if (!kafkaProperties.getFlatMessage()) {
             try {
                 ProducerRecord<String, Message> record = null;
                 if (canalDestination.getPartition() != null) {
-                    record = new ProducerRecord<>(canalDestination.getTopic(),
-                        canalDestination.getPartition(),
-                        null,
-                        message);
+                    record = new ProducerRecord<>(topicName, canalDestination.getPartition(), null, message);
                 } else {
                     if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                         Message[] messages = MQMessageUtils.messagePartition(message,
@@ -96,11 +115,11 @@ public class CanalKafkaProducer implements CanalMQProducer {
                         for (int i = 0; i < length; i++) {
                             Message messagePartition = messages[i];
                             if (messagePartition != null) {
-                                record = new ProducerRecord<>(canalDestination.getTopic(), i, null, messagePartition);
+                                record = new ProducerRecord<>(topicName, i, null, messagePartition);
                             }
                         }
                     } else {
-                        record = new ProducerRecord<>(canalDestination.getTopic(), 0, null, message);
+                        record = new ProducerRecord<>(topicName, 0, null, message);
                     }
                 }
 
@@ -108,9 +127,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                     producer.send(record).get();
 
                     if (logger.isDebugEnabled()) {
-                        logger.debug("Send  message to kafka topic: [{}], packet: {}",
-                            canalDestination.getTopic(),
-                            message.toString());
+                        logger.debug("Send  message to kafka topic: [{}], packet: {}", topicName, message.toString());
                     }
                 }
             } catch (Exception e) {
@@ -126,7 +143,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 for (FlatMessage flatMessage : flatMessages) {
                     if (canalDestination.getPartition() != null) {
                         try {
-                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
+                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,
                                 canalDestination.getPartition(),
                                 null,
                                 JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
@@ -148,7 +165,8 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                 FlatMessage flatMessagePart = partitionFlatMessage[i];
                                 if (flatMessagePart != null) {
                                     try {
-                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
+                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
+                                            topicName,
                                             i,
                                             null,
                                             JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
@@ -163,7 +181,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                             }
                         } else {
                             try {
-                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
+                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,
                                     0,
                                     null,
                                     JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
@@ -178,7 +196,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                     }
                     if (logger.isDebugEnabled()) {
                         logger.debug("Send flat message to kafka topic: [{}], packet: {}",
-                            canalDestination.getTopic(),
+                            topicName,
                             JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
                     }
                 }
@@ -187,7 +205,6 @@ public class CanalKafkaProducer implements CanalMQProducer {
 
         // producer.commitTransaction();
         callback.commit();
-
     }
 
 }

+ 33 - 14
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -1,7 +1,9 @@
 package com.alibaba.otter.canal.rocketmq;
 
 import java.util.List;
+import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -59,11 +61,28 @@ public class CanalRocketMQProducer implements CanalMQProducer {
     @Override
     public void send(final MQProperties.CanalDestination destination, com.alibaba.otter.canal.protocol.Message data,
                      Callback callback) {
+        if (!StringUtils.isEmpty(destination.getDynamicTopic())) {
+            // 动态topic
+            Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils
+                .messageTopics(data, destination.getTopic(), destination.getDynamicTopic());
+
+            for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
+                String topicName = entry.getKey();
+                com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue();
+                send(destination, topicName, messageSub, callback);
+            }
+        } else {
+            send(destination, destination.getTopic(), data, callback);
+        }
+    }
+
+    public void send(final MQProperties.CanalDestination destination, String topicName,
+                     com.alibaba.otter.canal.protocol.Message data, Callback callback) {
         if (!mqProperties.getFlatMessage()) {
             try {
                 if (destination.getPartition() != null) {
-                    Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data,
-                        mqProperties.isFilterTransactionEntry()));
+                    Message message = new Message(topicName,
+                        CanalMessageSerializer.serializer(data, mqProperties.isFilterTransactionEntry()));
                     if (logger.isDebugEnabled()) {
                         logger.debug("send message:{} to destination:{}, partition: {}",
                             message,
@@ -83,9 +102,8 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                     }, null);
                 } else {
                     if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
-                        com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(data,
-                            destination.getPartitionsNum(),
-                            destination.getPartitionHash());
+                        com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils
+                            .messagePartition(data, destination.getPartitionsNum(), destination.getPartitionHash());
                         int length = messages.length;
                         for (int i = 0; i < length; i++) {
                             com.alibaba.otter.canal.protocol.Message dataPartition = messages[i];
@@ -97,7 +115,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                 }
                                 final int index = i;
                                 try {
-                                    Message message = new Message(destination.getTopic(),
+                                    Message message = new Message(topicName,
                                         CanalMessageSerializer.serializer(dataPartition,
                                             mqProperties.isFilterTransactionEntry()));
                                     this.defaultMQProducer.send(message, new MessageQueueSelector() {
@@ -135,11 +153,11 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                             if (logger.isDebugEnabled()) {
                                 logger.debug("send message: {} to topic: {} fixed partition: {}",
                                     JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue),
-                                    destination.getTopic(),
+                                    topicName,
                                     destination.getPartition());
                             }
-                            Message message = new Message(destination.getTopic(), JSON.toJSONString(flatMessage,
-                                SerializerFeature.WriteMapNullValue).getBytes());
+                            Message message = new Message(topicName,
+                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue).getBytes());
                             this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                 @Override
@@ -168,22 +186,23 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                     }
                                     final int index = i;
                                     try {
-                                        Message message = new Message(destination.getTopic(),
+                                        Message message = new Message(topicName,
                                             JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue)
                                                 .getBytes());
                                         this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                             @Override
-                                            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                                            public MessageQueue select(List<MessageQueue> mqs, Message msg,
+                                                                       Object arg) {
                                                 if (index > mqs.size()) {
-                                                    throw new CanalServerException("partition number is error,config num:"
+                                                    throw new CanalServerException(
+                                                        "partition number is error,config num:"
                                                                                    + destination.getPartitionsNum()
                                                                                    + ", mq num: " + mqs.size());
                                                 }
                                                 return mqs.get(index);
                                             }
-                                        },
-                                            null);
+                                        }, null);
                                     } catch (Exception e) {
                                         logger.error("send flat message to hashed partition error", e);
                                         callback.rollback();

+ 1 - 0
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -137,6 +137,7 @@ public class CanalMQStarter {
                 CanalMQConfig mqConfig = canalInstance.getMqConfig();
                 canalDestination.setTopic(mqConfig.getTopic());
                 canalDestination.setPartition(mqConfig.getPartition());
+                canalDestination.setDynamicTopic(mqConfig.getDynamicTopic());
                 canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
                 canalDestination.setPartitionHash(mqConfig.getPartitionHash());