Forráskód Böngészése

optimize test & connected mistake

jianghang.loujh 1 éve
szülő
commit
307547746c

+ 3 - 2
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -78,14 +78,15 @@ public class KafkaCanalConnector implements CanalMQConnector {
             return;
         }
 
-        connected = true;
         if (kafkaConsumer == null && !flatMessage) {
             kafkaConsumer = new KafkaConsumer<>(properties);
-
         }
+
         if (kafkaConsumer2 == null && flatMessage) {
             kafkaConsumer2 = new KafkaConsumer<>(properties);
         }
+
+        connected = true;
     }
 
     /**

+ 9 - 8
client/src/main/java/com/alibaba/otter/canal/client/pulsarmq/PulsarMQCanalConnector.java

@@ -1,5 +1,13 @@
 package com.alibaba.otter.canal.client.pulsarmq;
 
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.client.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.otter.canal.client.CanalMQConnector;
 import com.alibaba.otter.canal.client.CanalMessageDeserializer;
@@ -9,13 +17,6 @@ import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.pulsar.client.api.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 /**
  * PulsarMQ的连接
@@ -256,10 +257,10 @@ public class PulsarMQCanalConnector implements CanalMQConnector {
 
         try {
             this.consumer = builder.subscribe();
+            connected = true;
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);
         }
-        connected = true;
     }
 
 

+ 62 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/FastsqlSchemaTest.java

@@ -265,4 +265,66 @@ public class FastsqlSchemaTest {
         System.out.println(data.toString());
     }
 
+
+    @Test
+    public void test_escapse_sql2() {
+        SchemaRepository repository = new SchemaRepository(JdbcConstants.MYSQL);
+        repository.setDefaultSchema("test");
+        String sql = "CREATE TABLE test1 (\n" + "id int(11) NOT NULL AUTO_INCREMENT,\n"
+                     + "uid int(11) NOT NULL DEFAULT '0' COMMENT '用户ID',\n"
+                     + "`from` tinyint(3) NOT NULL DEFAULT '0' COMMENT '来源 1QQ 2微信 3微博',\n"
+                     + "access_code varchar(20) NOT NULL DEFAULT '' COMMENT '临时code',\n"
+                     + "access_token varchar(200) NOT NULL DEFAULT '' COMMENT '授权token',\n"
+                     + "expires_in int(11) NOT NULL DEFAULT '0' COMMENT '过期时间',\n"
+                     + "refresh_token varchar(200) NOT NULL DEFAULT '' COMMENT '刷新token',\n"
+                     + "openid varchar(255) NOT NULL DEFAULT '' COMMENT '授权用户唯一标识',\n"
+                     + "scope varchar(60) NOT NULL COMMENT '授权范围',\n"
+                     + "status tinyint(3) NOT NULL DEFAULT '1' COMMENT '1 ',\n"
+                     + "json_info text NOT NULL COMMENT '个人信息',\n"
+                     + "create_time timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',\n"
+                     + "unionid varchar(128) NOT NULL DEFAULT '' COMMENT 'UnionID',\n"
+                     + "session_key varchar(128) NOT NULL DEFAULT '' COMMENT '小程序session_key',\n"
+                     + "PRIMARY KEY (id),\n" + "KEY uid (uid),\n"
+                     + "KEY idx_openid_from_status (openid, `from`, status),\n" + "KEY idx_access_token (access_token),\n"
+                     + "KEY idx_uid_from_status (uid, `from`, status),\n"
+                     + "KEY idx_unionid_from_status (unionid, `from`, status)\n"
+                     + ") ENGINE = InnoDB AUTO_INCREMENT = 7831300 CHARSET = utf8 COMMENT '用户第三方登录表'";
+        repository.console(sql);
+        SchemaObject table = repository.findTable("test1");
+        Assert.assertTrue(table != null);
+
+        // 应用到新的schema
+        Schema schema = repository.findSchema("test");
+        StringBuilder data = new StringBuilder(4 * 1024);
+        for (String tableIn : schema.showTables()) {
+            SchemaObject schemaObject = schema.findTable(tableIn);
+
+            SQLASTOutputVisitor visitor = SQLUtils.createOutputVisitor(data, DbType.mysql);
+            visitor.config(VisitorFeature.OutputNameQuote, true);
+
+            schemaObject.getStatement().accept(visitor);
+            data.append("; \n");
+        }
+
+        repository.setDefaultSchema("test_new");
+        repository.console(data.toString());
+        table = repository.findTable("test1");
+        Assert.assertTrue(table != null);
+
+        // 打印新的schema的内容
+        schema = repository.findSchema("test_new");
+        data = new StringBuilder(4 * 1024);
+        for (String tableIn : schema.showTables()) {
+            SchemaObject schemaObject = schema.findTable(tableIn);
+
+            SQLASTOutputVisitor visitor = SQLUtils.createOutputVisitor(data, DbType.mysql);
+            visitor.config(VisitorFeature.OutputNameQuote, true);
+
+            schemaObject.getStatement().accept(visitor);
+            data.append("; \n");
+        }
+
+        System.out.println(data.toString());
+    }
+
 }