Explorar el Código

fixed issue #1625, mq dymaic topic match

agapple hace 6 años
padre
commit
661d119265

+ 8 - 6
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java

@@ -52,6 +52,8 @@ public class CanalStater {
             // disable netty
             System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
             String autoScan = CanalController.getProperty(properties, CanalConstants.CANAL_AUTO_SCAN);
+            // 设置为raw避免ByteString->Entry的二次解析
+            System.setProperty("canal.instance.memory.rawEntry", "false");
             if ("true".equals(autoScan)) {
                 String rootDir = CanalController.getProperty(properties, CanalConstants.CANAL_CONF_DIR);
                 if (StringUtils.isEmpty(rootDir)) {
@@ -68,13 +70,13 @@ public class CanalStater {
                     });
                     if (instanceDirs != null && instanceDirs.length > 0) {
                         List<String> instances = Lists.transform(Arrays.asList(instanceDirs),
-                                new Function<File, String>() {
+                            new Function<File, String>() {
 
-                                    @Override
-                                    public String apply(File instanceDir) {
-                                        return instanceDir.getName();
-                                    }
-                                });
+                                @Override
+                                public String apply(File instanceDir) {
+                                    return instanceDir.getName();
+                                }
+                            });
                         System.setProperty(CanalConstants.CANAL_DESTINATIONS, Joiner.on(",").join(instances));
                     }
                 }

+ 1 - 1
deployer/src/main/resources/example/instance.properties

@@ -46,7 +46,7 @@ canal.instance.filter.black.regex=
 # mq config
 canal.mq.topic=example
 # dynamic topic route by schema or table regex
-#canal.mq.dynamicTopic=.*,mytest,mytest\\..*,mytest2.user
+#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
 canal.mq.partition=0
 # hash partition config
 #canal.mq.partitionsNum=4

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -509,7 +509,7 @@ public class MQMessageUtils {
     }
 
     private static Set<String> matchTopics(String name, String dynamicTopicConfigs) {
-        String[] router = StringUtils.split(dynamicTopicConfigs, ';');
+        String[] router = StringUtils.split(dynamicTopicConfigs, ',');
         Set<String> topics = new HashSet<>();
         for (String item : router) {
             int i = item.indexOf(":");

+ 0 - 3
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -53,9 +53,6 @@ public class CanalMQStarter {
                 System.setProperty("canal.instance.filter.transaction.entry", "true");
             }
 
-            // 设置为raw避免ByteString->Entry的二次解析
-            System.setProperty("canal.instance.memory.rawEntry", "false");
-
             canalServer = CanalServerWithEmbedded.instance();
 
             // 对应每个instance启动一个worker线程