浏览代码

specified mq dynamic topic

mcy 6 年之前
父节点
当前提交
18b304f99d

+ 8 - 10
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -189,7 +189,7 @@ public class RdbSyncService {
                 } else if (type != null && type.equalsIgnoreCase("DELETE")) {
                     delete(batchExecutor, config, dml);
                 } else if (type != null && type.equalsIgnoreCase("TRUNCATE")) {
-                    truncate(batchExecutor, config, dml);
+                    truncate(batchExecutor, config);
                 }
                 if (logger.isDebugEnabled()) {
                     logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
@@ -353,15 +353,13 @@ public class RdbSyncService {
      *
      * @param config
      */
-    private void truncate(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
-        if (dml.getIsTruncate()) {
-            DbMapping dbMapping = config.getDbMapping();
-            StringBuilder sql = new StringBuilder();
-            sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping));
-            batchExecutor.execute(sql.toString(), new ArrayList<>());
-            if (logger.isTraceEnabled()) {
-                logger.trace("Truncate target table, sql: {}", sql);
-            }
+    private void truncate(BatchExecutor batchExecutor, MappingConfig config) throws SQLException {
+        DbMapping dbMapping = config.getDbMapping();
+        StringBuilder sql = new StringBuilder();
+        sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping));
+        batchExecutor.execute(sql.toString(), new ArrayList<>());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Truncate target table, sql: {}", sql);
         }
     }
 

+ 2 - 12
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SingleDml.java

@@ -14,7 +14,6 @@ public class SingleDml {
     private String              type;
     private Map<String, Object> data;
     private Map<String, Object> old;
-    private boolean             isTruncate = false;
 
     public String getDestination() {
         return destination;
@@ -64,18 +63,10 @@ public class SingleDml {
         this.old = old;
     }
 
-    public boolean getIsTruncate() {
-        return isTruncate;
-    }
-
-    public void setIsTruncate(boolean isTruncate) {
-        this.isTruncate = isTruncate;
-    }
-
     public static List<SingleDml> dml2SingleDmls(Dml dml) {
-        int size = dml.getData() == null ? 0 : dml.getData().size();
         List<SingleDml> singleDmls = new ArrayList<>();
-        if (size > 0) {
+        if (dml.getData() != null) {
+            int size = dml.getData().size();
             for (int i = 0; i < size; i++) {
                 SingleDml singleDml = new SingleDml();
                 singleDml.setDestination(dml.getDestination());
@@ -94,7 +85,6 @@ public class SingleDml {
             singleDml.setDatabase(dml.getDatabase());
             singleDml.setTable(dml.getTable());
             singleDml.setType(dml.getType());
-            singleDml.setIsTruncate(true);
             singleDmls.add(singleDml);
         }
         return singleDmls;

+ 75 - 40
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -1,12 +1,6 @@
 package com.alibaba.otter.canal.common;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -30,36 +24,46 @@ import com.google.protobuf.InvalidProtocolBufferException;
 public class MQMessageUtils {
 
     @SuppressWarnings("deprecation")
-    private static Map<String, List<PartitionData>>    partitionDatas    = MigrateMap.makeComputingMap(new MapMaker().softValues(),
-                                                                             new Function<String, List<PartitionData>>() {
+    private static Map<String, List<PartitionData>>    partitionDatas    = MigrateMap
+        .makeComputingMap(new MapMaker().softValues(), new Function<String, List<PartitionData>>() {
 
                                                                                  public List<PartitionData> apply(String pkHashConfigs) {
-                                                                                     List<PartitionData> datas = Lists.newArrayList();
-                                                                                     String[] pkHashConfigArray = StringUtils.split(pkHashConfigs,
-                                                                                         ",");
+                                                                                     List<PartitionData> datas = Lists
+                                                                                         .newArrayList();
+                                                                                     String[] pkHashConfigArray = StringUtils
+                                                                                         .split(pkHashConfigs, ",");
                                                                                      // schema.table:id^name
                                                                                      for (String pkHashConfig : pkHashConfigArray) {
                                                                                          PartitionData data = new PartitionData();
-                                                                                         int i = pkHashConfig.lastIndexOf(":");
+                                                                                         int i = pkHashConfig
+                                                                                             .lastIndexOf(":");
                                                                                          if (i > 0) {
-                                                                                             String pkStr = pkHashConfig.substring(i + 1);
-                                                                                             if (pkStr.equalsIgnoreCase("$pk$")) {
+                                                                                             String pkStr = pkHashConfig
+                                                                                                 .substring(i + 1);
+                                                                                             if (pkStr.equalsIgnoreCase(
+                                                                                                 "$pk$")) {
                                                                                                  data.hashMode.autoPkHash = true;
                                                                                              } else {
-                                                                                                 data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr,
-                                                                                                     '^'));
+                                                                                                 data.hashMode.pkNames = Lists
+                                                                                                     .newArrayList(
+                                                                                                         StringUtils
+                                                                                                             .split(
+                                                                                                                 pkStr,
+                                                                                                                 '^'));
                                                                                              }
 
-                                                                                             pkHashConfig = pkHashConfig.substring(0,
-                                                                                                 i);
+                                                                                             pkHashConfig = pkHashConfig
+                                                                                                 .substring(0, i);
                                                                                          } else {
                                                                                              data.hashMode.tableHash = true;
                                                                                          }
 
-                                                                                         if (!isWildCard(pkHashConfig)) {
+                                                                                         if (!isWildCard(
+                                                                                             pkHashConfig)) {
                                                                                              data.simpleName = pkHashConfig;
                                                                                          } else {
-                                                                                             data.regexFilter = new AviaterRegexFilter(pkHashConfig);
+                                                                                             data.regexFilter = new AviaterRegexFilter(
+                                                                                                 pkHashConfig);
                                                                                          }
                                                                                          datas.add(data);
                                                                                      }
@@ -69,24 +73,29 @@ public class MQMessageUtils {
                                                                              });
 
     @SuppressWarnings("deprecation")
-    private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap.makeComputingMap(new MapMaker().softValues(),
-                                                                             new Function<String, List<DynamicTopicData>>() {
+    private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap
+        .makeComputingMap(new MapMaker().softValues(), new Function<String, List<DynamicTopicData>>() {
 
                                                                                  public List<DynamicTopicData> apply(String pkHashConfigs) {
-                                                                                     List<DynamicTopicData> datas = Lists.newArrayList();
-                                                                                     String[] dynamicTopicArray = StringUtils.split(pkHashConfigs,
-                                                                                         ",");
+                                                                                     List<DynamicTopicData> datas = Lists
+                                                                                         .newArrayList();
+                                                                                     String[] dynamicTopicArray = StringUtils
+                                                                                         .split(pkHashConfigs, ",");
                                                                                      // schema.table
                                                                                      for (String dynamicTopic : dynamicTopicArray) {
                                                                                          DynamicTopicData data = new DynamicTopicData();
 
-                                                                                         if (!isWildCard(dynamicTopic)) {
+                                                                                         if (!isWildCard(
+                                                                                             dynamicTopic)) {
                                                                                              data.simpleName = dynamicTopic;
                                                                                          } else {
-                                                                                             if (dynamicTopic.contains("\\.")) {
-                                                                                                 data.tableRegexFilter = new AviaterRegexFilter(dynamicTopic);
+                                                                                             if (dynamicTopic
+                                                                                                 .contains("\\.")) {
+                                                                                                 data.tableRegexFilter = new AviaterRegexFilter(
+                                                                                                     dynamicTopic);
                                                                                              } else {
-                                                                                                 data.schemaRegexFilter = new AviaterRegexFilter(dynamicTopic);
+                                                                                                 data.schemaRegexFilter = new AviaterRegexFilter(
+                                                                                                     dynamicTopic);
                                                                                              }
                                                                                          }
                                                                                          datas.add(data);
@@ -134,15 +143,22 @@ public class MQMessageUtils {
             if (StringUtils.isEmpty(schemaName) || StringUtils.isEmpty(tableName)) {
                 put2MapMessage(messages, message.getId(), defaultTopic, entry);
             } else {
-                if (matchDynamicTopic(schemaName + "." + tableName, dynamicTopicConfigs)) {
-                    put2MapMessage(messages, message.getId(), schemaName + "." + tableName, entry);
-                } else if (matchDynamicTopic(schemaName, dynamicTopicConfigs)) {
-                    put2MapMessage(messages, message.getId(), schemaName, entry);
+                Set<String> topics = matchTopics(schemaName + "." + tableName, dynamicTopicConfigs);
+                if (topics != null) {
+                    for (String topic : topics) {
+                        put2MapMessage(messages, message.getId(), topic, entry);
+                    }
                 } else {
-                    put2MapMessage(messages, message.getId(), defaultTopic, entry);
+                    topics = matchTopics(schemaName, dynamicTopicConfigs);
+                    if (topics != null) {
+                        for (String topic : topics) {
+                            put2MapMessage(messages, message.getId(), topic, entry);
+                        }
+                    } else {
+                        put2MapMessage(messages, message.getId(), defaultTopic, entry);
+                    }
                 }
             }
-
         }
         return messages;
     }
@@ -275,8 +291,9 @@ public class MQMessageUtils {
                 try {
                     rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                 } catch (Exception e) {
-                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
-                                               + entry.toString(), e);
+                    throw new RuntimeException(
+                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
+                        e);
                 }
 
                 CanalEntry.EventType eventType = rowChange.getEventType();
@@ -480,6 +497,24 @@ public class MQMessageUtils {
         return null;
     }
 
+    private static Set<String> matchTopics(String name, String dynamicTopicConfigs) {
+        String[] router = StringUtils.split(dynamicTopicConfigs, ';');
+        Set<String> topics = new HashSet<>();
+        for (String item : router) {
+            int i = item.indexOf(":");
+            if (i > -1) {
+                String topic = item.substring(0, i).trim();
+                String topicConfigs = item.substring(i + 1).trim();
+                if (matchDynamicTopic(name, topicConfigs)) {
+                    topics.add(topic);
+                }
+            } else if (matchDynamicTopic(name, item)) {
+                topics.add(name);
+            }
+        }
+        return topics.isEmpty() ? null : topics;
+    }
+
     public static boolean matchDynamicTopic(String name, String dynamicTopicConfigs) {
         if (StringUtils.isEmpty(dynamicTopicConfigs)) {
             return false;
@@ -520,8 +555,8 @@ public class MQMessageUtils {
 
     private static boolean isWildCard(String value) {
         // not contaiins '.' ?
-        return StringUtils.containsAny(value, new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$',
-                '^' });
+        return StringUtils.containsAny(value,
+            new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$', '^' });
     }
 
     private static void put2MapMessage(Map<String, Message> messageMap, Long messageId, String topicName,

+ 23 - 23
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -79,27 +79,31 @@ public class CanalKafkaProducer implements CanalMQProducer {
 
     @Override
     public void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback) {
-
-        if (!StringUtils.isEmpty(canalDestination.getDynamicTopic())) {
-            // 动态topic
-            Map<String, Message> messageMap = MQMessageUtils
-                .messageTopics(message, canalDestination.getTopic(), canalDestination.getDynamicTopic());
-
-            for (Map.Entry<String, Message> entry : messageMap.entrySet()) {
-                String topicName = entry.getKey().replace('.', '_');
-                Message messageSub = entry.getValue();
-                if (logger.isDebugEnabled()) {
-                    logger.debug("## Send message to kafka topic: " + topicName);
+        try {
+            if (!StringUtils.isEmpty(canalDestination.getDynamicTopic())) {
+                // 动态topic
+                Map<String, Message> messageMap = MQMessageUtils
+                    .messageTopics(message, canalDestination.getTopic(), canalDestination.getDynamicTopic());
+
+                for (Map.Entry<String, Message> entry : messageMap.entrySet()) {
+                    String topicName = entry.getKey().replace('.', '_');
+                    Message messageSub = entry.getValue();
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("## Send message to kafka topic: " + topicName);
+                    }
+                    send(canalDestination, topicName, messageSub);
                 }
-                send(canalDestination, topicName, messageSub, callback);
+            } else {
+                send(canalDestination, canalDestination.getTopic(), message);
             }
-        } else {
-            send(canalDestination, canalDestination.getTopic(), message, callback);
+            callback.commit();
+        } catch (Exception e) {
+            callback.rollback();
         }
     }
 
-    private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message,
-                      Callback callback) {
+    private void send(MQProperties.CanalDestination canalDestination, String topicName,
+                      Message message) throws Exception {
         // producer.beginTransaction();
         if (!kafkaProperties.getFlatMessage()) {
             try {
@@ -134,8 +138,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
                 // producer.abortTransaction();
-                callback.rollback();
-                return;
+                throw e;
             }
         } else {
             // 发送扁平数据json
@@ -156,8 +159,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                         } catch (Exception e) {
                             logger.error(e.getMessage(), e);
                             // producer.abortTransaction();
-                            callback.rollback();
-                            return;
+                            throw e;
                         }
                     } else {
                         FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
@@ -177,8 +179,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                 } catch (Exception e) {
                                     logger.error(e.getMessage(), e);
                                     // producer.abortTransaction();
-                                    callback.rollback();
-                                    return;
+                                    throw e;
                                 }
                             }
                         }
@@ -193,7 +194,6 @@ public class CanalKafkaProducer implements CanalMQProducer {
         }
 
         // producer.commitTransaction();
-        callback.commit();
     }
 
 }

+ 21 - 21
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -61,23 +61,28 @@ public class CanalRocketMQProducer implements CanalMQProducer {
     @Override
     public void send(final MQProperties.CanalDestination destination, com.alibaba.otter.canal.protocol.Message data,
                      Callback callback) {
-        if (!StringUtils.isEmpty(destination.getDynamicTopic())) {
-            // 动态topic
-            Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils
-                .messageTopics(data, destination.getTopic(), destination.getDynamicTopic());
-
-            for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
-                String topicName = entry.getKey().replace('.', '_');
-                com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue();
-                send(destination, topicName, messageSub, callback);
+        try {
+            if (!StringUtils.isEmpty(destination.getDynamicTopic())) {
+                // 动态topic
+                Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils
+                    .messageTopics(data, destination.getTopic(), destination.getDynamicTopic());
+
+                for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
+                    String topicName = entry.getKey().replace('.', '_');
+                    com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue();
+                    send(destination, topicName, messageSub);
+                }
+            } else {
+                send(destination, destination.getTopic(), data);
             }
-        } else {
-            send(destination, destination.getTopic(), data, callback);
+            callback.commit();
+        } catch (Exception e) {
+            callback.rollback();
         }
     }
 
     public void send(final MQProperties.CanalDestination destination, String topicName,
-                     com.alibaba.otter.canal.protocol.Message data, Callback callback) {
+                     com.alibaba.otter.canal.protocol.Message data) throws Exception {
         if (!mqProperties.getFlatMessage()) {
             try {
                 if (destination.getPartition() != null) {
@@ -132,8 +137,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                     }, null);
                                 } catch (Exception e) {
                                     logger.error("send flat message to hashed partition error", e);
-                                    callback.rollback();
-                                    return;
+                                    throw e;
                                 }
                             }
                         }
@@ -141,8 +145,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                 }
             } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                 logger.error("Send message error!", e);
-                callback.rollback();
-                return;
+                throw e;
             }
         } else {
             List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(data);
@@ -167,8 +170,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                             }, null);
                         } catch (Exception e) {
                             logger.error("send flat message to fixed partition error", e);
-                            callback.rollback();
-                            return;
+                            throw e;
                         }
                     } else {
                         if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
@@ -205,8 +207,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                         }, null);
                                     } catch (Exception e) {
                                         logger.error("send flat message to hashed partition error", e);
-                                        callback.rollback();
-                                        return;
+                                        throw e;
                                     }
                                 }
                             }
@@ -216,7 +217,6 @@ public class CanalRocketMQProducer implements CanalMQProducer {
             }
         }
 
-        callback.commit();
         if (logger.isDebugEnabled()) {
             logger.debug("send message to rocket topic: {}", destination.getTopic());
         }