Browse Source

fixed issue #4899 , druid SchemaRepository support quota

jianghang.loujh 1 year ago
parent
commit
1413cc9196

+ 12 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java

@@ -6,6 +6,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.alibaba.druid.DbType;
+import com.alibaba.druid.sql.SQLUtils;
+import com.alibaba.druid.sql.visitor.SQLASTOutputVisitor;
+import com.alibaba.druid.sql.visitor.VisitorFeature;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -149,7 +153,14 @@ public class MemoryTableMeta implements TableMetaTSDB {
             StringBuilder data = new StringBuilder(4 * 1024);
             for (String table : schema.showTables()) {
                 SchemaObject schemaObject = schema.findTable(table);
-                schemaObject.getStatement().output(data);
+                // fixed issue #4899
+                // snapshot输出的DDL语句未正确处理mysql keyword
+                // 导致canal重启回滚时会出现ddl解析失败的问题
+                // schemaObject.getStatement().output(data);
+                SQLASTOutputVisitor visitor = SQLUtils.createOutputVisitor(data, DbType.mysql);
+                visitor.config(VisitorFeature.OutputNameQuote, true);
+                schemaObject.getStatement().accept(visitor);
+
                 data.append("; \n");
             }
             schemaDdls.put(schema.getName(), data.toString());

+ 80 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/FastsqlSchemaTest.java

@@ -3,8 +3,13 @@ package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.alibaba.druid.DbType;
+import com.alibaba.druid.sql.SQLUtils;
+import com.alibaba.druid.sql.repository.Schema;
 import com.alibaba.druid.sql.repository.SchemaObject;
 import com.alibaba.druid.sql.repository.SchemaRepository;
+import com.alibaba.druid.sql.visitor.SQLASTOutputVisitor;
+import com.alibaba.druid.sql.visitor.VisitorFeature;
 import com.alibaba.druid.util.JdbcConstants;
 
 public class FastsqlSchemaTest {
@@ -184,5 +189,80 @@ public class FastsqlSchemaTest {
         repository.console(sql4);
         table = repository.findTable("test4");
         Assert.assertTrue(table != null);
+
+        String sql5 = "CREATE TABLE `test5` (\n" + "\t`Id` varchar(32) NOT NULL COMMENT '',\n"
+                      + "\t`ExitId` varchar(32) NOT NULL COMMENT '',\n"
+                      + "\t`CreateTime` datetime NOT NULL COMMENT '创建时间',\n"
+                      + "\t`archive_date` datetime NOT NULL DEFAULT '2099-01-01 00:00:00',\n"
+                      + "\tPRIMARY KEY (`Id`, `archive_date`),\n"
+                      + "\tGLOBAL INDEX `g_i_id` (`id`) COVERING (`ExitId`) \n" + "\t\tPARTITION BY KEY(`Id`)\n"
+                      + "\t\tPARTITIONS 16,\n" + "\tKEY `ExitId` USING BTREE (`ExitId`),\n"
+                      + "\tKEY `CreateTime` (`CreateTime`),\n" + "\tKEY `i_id_ExitId` USING BTREE (`Id`, `ExitId`),\n"
+                      + "\tKEY `auto_shard_key_ExitId_id` USING BTREE (`ExitId`, `Id`)\n"
+                      + ") ENGINE = InnoDB DEFAULT CHARSET = utf8\n" + "PARTITION BY KEY(`ExitId`,`Id`)\n"
+                      + "PARTITIONS 16\n" + "LOCAL PARTITION BY RANGE (archive_date)\n" + "INTERVAL 1 MONTH\n"
+                      + "EXPIRE AFTER 27\n" + "PRE ALLOCATE 2\n" + "PIVOTDATE NOW()";
+        repository.console(sql5);
+        table = repository.findTable("test5");
+        Assert.assertTrue(table != null);
     }
+
+    @Test
+    public void test_escapse_sql() {
+        SchemaRepository repository = new SchemaRepository(JdbcConstants.MYSQL);
+        repository.setDefaultSchema("test");
+        String sql = "CREATE TABLE test1 (\n" + "\tid int(11) NOT NULL AUTO_INCREMENT,\n"
+                     + "\tcluster_id int(11) NOT NULL COMMENT '集群id',\n"
+                     + "\tcomponent_id int(11) NOT NULL COMMENT '组件id',\n"
+                     + "\tcomponent_type_code tinyint(1) NOT NULL COMMENT '组件类型',\n"
+                     + "\ttype varchar(128) COLLATE utf8_bin NOT NULL COMMENT '配置类型',\n"
+                     + "\trequired tinyint(1) NOT NULL COMMENT 'true/false',\n"
+                     + "\t`key` varchar(256) COLLATE utf8_bin NOT NULL COMMENT '配置键',\n"
+                     + "\t`value` text COLLATE utf8_bin COMMENT '默认配置项',\n"
+                     + "\t`values` varchar(512) COLLATE utf8_bin DEFAULT NULL COMMENT '可配置项',\n"
+                     + "\tdependencyKey varchar(256) COLLATE utf8_bin DEFAULT NULL COMMENT '依赖键',\n"
+                     + "\tdependencyValue varchar(256) COLLATE utf8_bin DEFAULT NULL COMMENT '依赖值',\n"
+                     + "\t`desc` varchar(512) COLLATE utf8_bin DEFAULT NULL COMMENT '描述',\n"
+                     + "\tgmt_create datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',\n"
+                     + "\tgmt_modified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',\n"
+                     + "\tis_deleted tinyint(1) NOT NULL DEFAULT '0' COMMENT '0正常 1逻辑删除',\n" + "\tPRIMARY KEY (id),\n"
+                     + "\tKEY index_cluster_id (cluster_id),\n" + "\tKEY index_componentId (component_id)\n"
+                     + ") ENGINE = InnoDB AUTO_INCREMENT = 1140 CHARSET = utf8 COLLATE = utf8_bin;";
+        repository.console(sql);
+        SchemaObject table = repository.findTable("test1");
+        Assert.assertTrue(table != null);
+        // 应用到新的schema
+        Schema schema = repository.findSchema("test");
+        StringBuilder data = new StringBuilder(4 * 1024);
+        for (String tableIn : schema.showTables()) {
+            SchemaObject schemaObject = schema.findTable(tableIn);
+
+            SQLASTOutputVisitor visitor = SQLUtils.createOutputVisitor(data, DbType.mysql);
+            visitor.config(VisitorFeature.OutputNameQuote, true);
+
+            schemaObject.getStatement().accept(visitor);
+            data.append("; \n");
+        }
+
+        repository.setDefaultSchema("test_new");
+        repository.console(data.toString());
+        table = repository.findTable("test1");
+        Assert.assertTrue(table != null);
+
+        // 打印新的schema的内容
+        schema = repository.findSchema("test_new");
+        data = new StringBuilder(4 * 1024);
+        for (String tableIn : schema.showTables()) {
+            SchemaObject schemaObject = schema.findTable(tableIn);
+
+            SQLASTOutputVisitor visitor = SQLUtils.createOutputVisitor(data, DbType.mysql);
+            visitor.config(VisitorFeature.OutputNameQuote, true);
+
+            schemaObject.getStatement().accept(visitor);
+            data.append("; \n");
+        }
+
+        System.out.println(data.toString());
+    }
+
 }

+ 1 - 1
pom.xml

@@ -326,7 +326,7 @@
             <dependency>
                 <groupId>com.alibaba</groupId>
                 <artifactId>druid</artifactId>
-                <version>1.2.17</version>
+                <version>1.2.21</version>
             </dependency>
             <dependency>
                 <groupId>com.lmax</groupId>