mcy 6 years ago
parent
commit
5c4b1f6e63

+ 9 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Dml.java

@@ -17,6 +17,7 @@ public class Dml implements Serializable {
     private String                    destination;                            // 对应canal的实例或者MQ的topic
     private String                    database;                               // 数据库或schema
     private String                    table;                                  // 表名
+    private List<String>              pkNames;
     private String                    type;                                   // 类型: INSERT UPDATE DELETE
     // binlog executeTime
     private Long                      es;                                     // 执行耗时
@@ -50,6 +51,14 @@ public class Dml implements Serializable {
         this.table = table;
     }
 
+    public List<String> getPkNames() {
+        return pkNames;
+    }
+
+    public void setPkNames(List<String> pkNames) {
+        this.pkNames = pkNames;
+    }
+
     public String getType() {
         return type;
     }

+ 11 - 6
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -1,11 +1,6 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.FlatMessage;
@@ -55,6 +50,8 @@ public class MessageUtil {
 
             if (!rowChange.getIsDdl()) {
                 Set<String> updateSet = new HashSet<>();
+                dml.setPkNames(new ArrayList<>());
+                int i = 0;
                 for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                     if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
                         && eventType != CanalEntry.EventType.DELETE) {
@@ -71,6 +68,11 @@ public class MessageUtil {
                     }
 
                     for (CanalEntry.Column column : columns) {
+                        if (i == 0) {
+                            if (column.getIsKey()) {
+                                dml.getPkNames().add(column.getName());
+                            }
+                        }
                         row.put(column.getName(),
                             JdbcTypeUtil.typeConvert(column.getName(),
                                 column.getValue(),
@@ -101,6 +103,8 @@ public class MessageUtil {
                             old.add(rowOld);
                         }
                     }
+
+                    i++;
                 }
                 if (!data.isEmpty()) {
                     dml.setData(data);
@@ -134,6 +138,7 @@ public class MessageUtil {
         dml.setDestination(destination);
         dml.setDatabase(flatMessage.getDatabase());
         dml.setTable(flatMessage.getTable());
+        dml.setPkNames(flatMessage.getPkNames());
         dml.setType(flatMessage.getType());
         dml.setTs(flatMessage.getTs());
         dml.setEs(flatMessage.getEs());

+ 0 - 5
client-adapter/rdb/pom.xml

@@ -24,11 +24,6 @@
             <version>1.19</version>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>com.alibaba.fastsql</groupId>
-            <artifactId>fastsql</artifactId>
-            <version>2.0.0_preview_644</version>
-        </dependency>
 
         <dependency>
             <groupId>mysql</groupId>

+ 2 - 2
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -172,7 +172,7 @@ public class RdbAdapter implements OuterAdapter {
     public Map<String, Object> count(String task) {
         MappingConfig config = rdbMapping.get(task);
         MappingConfig.DbMapping dbMapping = config.getDbMapping();
-        String sql = "SELECT COUNT(1) AS cnt FROM " + SyncUtil.dbTable(dbMapping);
+        String sql = "SELECT COUNT(1) AS cnt FROM " + SyncUtil.getDbTableName(dbMapping);
         Connection conn = null;
         Map<String, Object> res = new LinkedHashMap<>();
         try {
@@ -198,7 +198,7 @@ public class RdbAdapter implements OuterAdapter {
                 }
             }
         }
-        res.put("targetTable", SyncUtil.dbTable(dbMapping));
+        res.put("targetTable", SyncUtil.getDbTableName(dbMapping));
 
         return res;
     }

+ 11 - 10
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.client.adapter.rdb.config;
 
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 /**
@@ -74,19 +75,19 @@ public class MappingConfig {
 
     public static class DbMapping {
 
-        private Boolean             mirrorDb    = false; // 是否镜像库
-        private String              database;            // 数据库名或schema名
-        private String              table;               // 表名
-        private Map<String, String> targetPk;            // 目标表主键字段
-        private Boolean             mapAll      = false; // 映射所有字段
-        private String              targetDb;            // 目标库名
-        private String              targetTable;         // 目标表名
-        private Map<String, String> targetColumns;       // 目标表字段映射
+        private Boolean             mirrorDb    = false;                 // 是否镜像库
+        private String              database;                            // 数据库名或schema名
+        private String              table;                               // 表名
+        private Map<String, String> targetPk    = new LinkedHashMap<>(); // 目标表主键字段
+        private Boolean             mapAll      = false;                 // 映射所有字段
+        private String              targetDb;                            // 目标库名
+        private String              targetTable;                         // 目标表名
+        private Map<String, String> targetColumns;                       // 目标表字段映射
 
-        private String              etlCondition;        // etl条件sql
+        private String              etlCondition;                        // etl条件sql
 
         private int                 readBatch   = 5000;
-        private int                 commitBatch = 5000;  // etl等批量提交大小
+        private int                 commitBatch = 5000;                  // etl等批量提交大小
 
         public boolean isMirrorDb() {
             return mirrorDb == null ? false : mirrorDb;

+ 3 - 3
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java

@@ -109,7 +109,7 @@ public class RdbEtlService {
             logger.info(
                 dbMapping.getTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000 + "s!");
 
-            etlResult.setResultMessage("导入目标表 " + SyncUtil.dbTable(dbMapping) + " 数据:" + successCount.get() + " 条");
+            etlResult.setResultMessage("导入目标表 " + SyncUtil.getDbTableName(dbMapping) + " 数据:" + successCount.get() + " 条");
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             errMsg.add(hbaseTable + " etl failed! ==>" + e.getMessage());
@@ -187,7 +187,7 @@ public class RdbEtlService {
                     // }
 
                     StringBuilder insertSql = new StringBuilder();
-                    insertSql.append("INSERT INTO ").append(SyncUtil.dbTable(dbMapping)).append(" (");
+                    insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
                     columnsMap
                         .forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
 
@@ -209,7 +209,7 @@ public class RdbEtlService {
                             // 删除数据
                             Map<String, Object> values = new LinkedHashMap<>();
                             StringBuilder deleteSql = new StringBuilder(
-                                "DELETE FROM " + SyncUtil.dbTable(dbMapping) + " WHERE ");
+                                "DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
                             appendCondition(dbMapping, deleteSql, values, rs);
                             try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
                                 int k = 1;

+ 31 - 11
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java

@@ -1,20 +1,14 @@
 package com.alibaba.otter.canal.client.adapter.rdb.service;
 
-import java.io.StringWriter;
 import java.sql.Connection;
 import java.sql.Statement;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.sql.DataSource;
 
-import com.alibaba.fastsql.sql.ast.SQLName;
-import com.alibaba.fastsql.sql.ast.SQLStatement;
-import com.alibaba.fastsql.sql.ast.statement.SQLExprTableSource;
-import com.alibaba.fastsql.sql.dialect.mysql.parser.MySqlStatementParser;
-import com.alibaba.fastsql.sql.dialect.mysql.visitor.MySqlOutputVisitor;
-import com.alibaba.fastsql.sql.dialect.mysql.visitor.MySqlSchemaStatVisitor;
-import com.alibaba.fastsql.sql.parser.SQLStatementParser;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -24,10 +18,13 @@ import com.alibaba.otter.canal.client.adapter.support.Dml;
 
 public class RdbMirrorDbSyncService {
 
-    private static final Logger        logger = LoggerFactory.getLogger(RdbMirrorDbSyncService.class);
+    private static final Logger               logger             = LoggerFactory
+        .getLogger(RdbMirrorDbSyncService.class);
 
-    private Map<String, MappingConfig> mirrorDbConfigCache;                                           // 镜像库配置
-    private DataSource                 dataSource;
+    private Map<String, MappingConfig>        mirrorDbConfigCache;                           // 镜像库配置
+    private DataSource                        dataSource;
+
+    private static Map<String, MappingConfig> tableDbConfigCache = new ConcurrentHashMap<>();
 
     public RdbMirrorDbSyncService(Map<String, MappingConfig> mirrorDbConfigCache, DataSource dataSource,
                                   Integer threads){
@@ -49,6 +46,29 @@ public class RdbMirrorDbSyncService {
             } else {
                 // DML
                 // TODO
+                MappingConfig mappingConfig = tableDbConfigCache
+                    .get(destination + "." + database + "." + dml.getTable());
+                if (mappingConfig == null) {
+                    // 构造一个配置
+                    mappingConfig = new MappingConfig();
+                    mappingConfig.setDataSourceKey(configMap.getDataSourceKey());
+                    mappingConfig.setDestination(configMap.getDestination());
+                    mappingConfig.setOuterAdapterKey(configMap.getOuterAdapterKey());
+                    mappingConfig.setConcurrent(configMap.getConcurrent());
+                    MappingConfig.DbMapping dbMapping = new MappingConfig.DbMapping();
+                    mappingConfig.setDbMapping(dbMapping);
+                    dbMapping.setDatabase(dml.getDatabase());
+                    dbMapping.setTable(dml.getTable());
+                    dbMapping.setTargetDb(dml.getDatabase());
+                    dbMapping.setTargetTable(dml.getTable());
+                    dbMapping.setMapAll(true);
+                    List<String> pkNames = dml.getPkNames();
+                    Map<String, String> pkMapping = new LinkedHashMap<>();
+                    pkNames.forEach(pkName -> pkMapping.put(pkName, pkName));
+                    dbMapping.setTargetPk(pkMapping);
+
+                    tableDbConfigCache.put(destination + "." + database + "." + dml.getTable(), mappingConfig);
+                }
             }
         }
     }

+ 4 - 4
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -166,7 +166,7 @@ public class RdbSyncService {
             Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
 
             StringBuilder insertSql = new StringBuilder();
-            insertSql.append("INSERT INTO ").append(SyncUtil.dbTable(dbMapping)).append(" (");
+            insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
 
             columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
             int len = insertSql.length();
@@ -230,7 +230,7 @@ public class RdbSyncService {
             Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
 
             StringBuilder updateSql = new StringBuilder();
-            updateSql.append("UPDATE ").append(SyncUtil.dbTable(dbMapping)).append(" SET ");
+            updateSql.append("UPDATE ").append(SyncUtil.getDbTableName(dbMapping)).append(" SET ");
             List<Map<String, ?>> values = new ArrayList<>();
             for (String srcColumnName : old.keySet()) {
                 List<String> targetColumnNames = new ArrayList<>();
@@ -282,7 +282,7 @@ public class RdbSyncService {
             Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
 
             StringBuilder sql = new StringBuilder();
-            sql.append("DELETE FROM ").append(SyncUtil.dbTable(dbMapping)).append(" WHERE ");
+            sql.append("DELETE FROM ").append(SyncUtil.getDbTableName(dbMapping)).append(" WHERE ");
 
             List<Map<String, ?>> values = new ArrayList<>();
             // 拼接主键
@@ -315,7 +315,7 @@ public class RdbSyncService {
                 if (columnType == null) {
                     columnType = new LinkedHashMap<>();
                     final Map<String, Integer> columnTypeTmp = columnType;
-                    String sql = "SELECT * FROM " + SyncUtil.dbTable(dbMapping) + " WHERE 1=2";
+                    String sql = "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE 1=2";
                     Util.sqlRS(conn, sql, rs -> {
                         try {
                             ResultSetMetaData rsd = rs.getMetaData();

+ 1 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java

@@ -255,7 +255,7 @@ public class SyncUtil {
         }
     }
 
-    public static String dbTable(MappingConfig.DbMapping dbMapping) {
+    public static String getDbTableName(MappingConfig.DbMapping dbMapping) {
         String result = "";
         if (StringUtils.isNotEmpty(dbMapping.getTargetDb())) {
             result += dbMapping.getTargetDb() + ".";

+ 0 - 1
client-adapter/rdb/src/main/resources/rdb/mytest_user.yml

@@ -6,7 +6,6 @@ dbMapping:
   mirrorDb: true
   database: mytest
 #  table: user
-  targetDb: mytest2
 #  targetTable: mytest.tb_user
 #  targetPk:
 #    id: id

+ 86 - 86
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/SqlParserTest.java

@@ -1,86 +1,86 @@
-wpackage com.alibaba.otter.canal.client.adapter.rdb.test;
-
-import com.alibaba.fastsql.sql.ast.SQLName;
-import com.alibaba.fastsql.sql.ast.SQLStatement;
-import com.alibaba.fastsql.sql.ast.statement.SQLCreateTableStatement;
-import com.alibaba.fastsql.sql.ast.statement.SQLExprTableSource;
-import com.alibaba.fastsql.sql.dialect.mysql.parser.MySqlCreateTableParser;
-import com.alibaba.fastsql.sql.dialect.mysql.parser.MySqlStatementParser;
-import com.alibaba.fastsql.sql.dialect.mysql.visitor.MySqlOutputVisitor;
-import com.alibaba.fastsql.sql.dialect.mysql.visitor.MySqlSchemaStatVisitor;
-import com.alibaba.fastsql.sql.parser.SQLStatementParser;
-
-import java.io.StringWriter;
-
-public class SqlParserTest {
-
-    public static class TableNameVisitor extends MySqlOutputVisitor {
-
-        public TableNameVisitor(Appendable appender){
-            super(appender);
-        }
-
-        @Override
-        public boolean visit(SQLExprTableSource x) {
-            SQLName table = (SQLName) x.getExpr();
-            String tableName = table.getSimpleName();
-
-            // 改写tableName
-            print0("new_" + tableName.toUpperCase());
-
-            return true;
-        }
-
-    }
-
-    public static void main(String[] args) {
-        // String sql = "select * from `mytest`.`t` where id=1 and name=ming group by
-        // uid limit 1,200 order by ctime";
-
-        String sql = "CREATE TABLE `mytest`.`user` (\n" + "  `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
-                     + "  `name` varchar(30) NOT NULL,\n" + "  `c_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,\n"
-                     + "  `role_id` bigint(20) DEFAULT NULL,\n" + "  `test1` text,\n" + "  `test2` blob,\n"
-                     + "  `key` varchar(30) DEFAULT NULL,\n" + "  PRIMARY KEY (`id`)\n"
-                     + ") ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;";
-
-        // // 新建 MySQL Parser
-        // SQLStatementParser parser = new MySqlStatementParser(sql);
-        //
-        // // 使用Parser解析生成AST,这里SQLStatement就是AST
-        // SQLStatement sqlStatement = parser.parseStatement();
-        //
-        // MySqlSchemaStatVisitor visitor = new MySqlSchemaStatVisitor();
-        // sqlStatement.accept(visitor);
-        //
-        // System.out.println("getTables:" + visitor.getTables());
-        // System.out.println("getParameters:" + visitor.getParameters());
-        // System.out.println("getOrderByColumns:" + visitor.getOrderByColumns());
-        // System.out.println("getGroupByColumns:" + visitor.getGroupByColumns());
-        // System.out.println("---------------------------------------------------------------------------");
-        //
-        // // 使用select访问者进行select的关键信息打印
-        // // SelectPrintVisitor selectPrintVisitor = new SelectPrintVisitor();
-        // // sqlStatement.accept(selectPrintVisitor);
-        //
-        // System.out.println("---------------------------------------------------------------------------");
-        // // 最终sql输出
-        // StringWriter out = new StringWriter();
-        // TableNameVisitor outputVisitor = new TableNameVisitor(out);
-        // sqlStatement.accept(outputVisitor);
-        // System.out.println(out.toString());
-
-        MySqlCreateTableParser parser1 = new MySqlCreateTableParser(sql);
-        SQLCreateTableStatement createTableStatement = parser1.parseCreateTable();
-//        MySqlSchemaStatVisitor visitor1 = new MySqlSchemaStatVisitor();
-//        createTableStatement.accept(visitor1);
-        // visitor1.getTables().forEach((k, v) -> {
-        // System.out.println(k.);
-        // System.out.println(v);
-        // });
-        // 最终sql输出
-        StringWriter out = new StringWriter();
-        TableNameVisitor outputVisitor = new TableNameVisitor(out);
-        createTableStatement.accept(outputVisitor);
-        System.out.println(out.toString());
-    }
-}
+//wpackage com.alibaba.otter.canal.client.adapter.rdb.test;
+//
+//import com.alibaba.fastsql.sql.ast.SQLName;
+//import com.alibaba.fastsql.sql.ast.SQLStatement;
+//import com.alibaba.fastsql.sql.ast.statement.SQLCreateTableStatement;
+//import com.alibaba.fastsql.sql.ast.statement.SQLExprTableSource;
+//import com.alibaba.fastsql.sql.dialect.mysql.parser.MySqlCreateTableParser;
+//import com.alibaba.fastsql.sql.dialect.mysql.parser.MySqlStatementParser;
+//import com.alibaba.fastsql.sql.dialect.mysql.visitor.MySqlOutputVisitor;
+//import com.alibaba.fastsql.sql.dialect.mysql.visitor.MySqlSchemaStatVisitor;
+//import com.alibaba.fastsql.sql.parser.SQLStatementParser;
+//
+//import java.io.StringWriter;
+//
+//public class SqlParserTest {
+//
+//    public static class TableNameVisitor extends MySqlOutputVisitor {
+//
+//        public TableNameVisitor(Appendable appender){
+//            super(appender);
+//        }
+//
+//        @Override
+//        public boolean visit(SQLExprTableSource x) {
+//            SQLName table = (SQLName) x.getExpr();
+//            String tableName = table.getSimpleName();
+//
+//            // 改写tableName
+//            print0("new_" + tableName.toUpperCase());
+//
+//            return true;
+//        }
+//
+//    }
+//
+//    public static void main(String[] args) {
+//        // String sql = "select * from `mytest`.`t` where id=1 and name=ming group by
+//        // uid limit 1,200 order by ctime";
+//
+//        String sql = "CREATE TABLE `mytest`.`user` (\n" + "  `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+//                     + "  `name` varchar(30) NOT NULL,\n" + "  `c_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,\n"
+//                     + "  `role_id` bigint(20) DEFAULT NULL,\n" + "  `test1` text,\n" + "  `test2` blob,\n"
+//                     + "  `key` varchar(30) DEFAULT NULL,\n" + "  PRIMARY KEY (`id`)\n"
+//                     + ") ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;";
+//
+//        // // 新建 MySQL Parser
+//        // SQLStatementParser parser = new MySqlStatementParser(sql);
+//        //
+//        // // 使用Parser解析生成AST,这里SQLStatement就是AST
+//        // SQLStatement sqlStatement = parser.parseStatement();
+//        //
+//        // MySqlSchemaStatVisitor visitor = new MySqlSchemaStatVisitor();
+//        // sqlStatement.accept(visitor);
+//        //
+//        // System.out.println("getTables:" + visitor.getTables());
+//        // System.out.println("getParameters:" + visitor.getParameters());
+//        // System.out.println("getOrderByColumns:" + visitor.getOrderByColumns());
+//        // System.out.println("getGroupByColumns:" + visitor.getGroupByColumns());
+//        // System.out.println("---------------------------------------------------------------------------");
+//        //
+//        // // 使用select访问者进行select的关键信息打印
+//        // // SelectPrintVisitor selectPrintVisitor = new SelectPrintVisitor();
+//        // // sqlStatement.accept(selectPrintVisitor);
+//        //
+//        // System.out.println("---------------------------------------------------------------------------");
+//        // // 最终sql输出
+//        // StringWriter out = new StringWriter();
+//        // TableNameVisitor outputVisitor = new TableNameVisitor(out);
+//        // sqlStatement.accept(outputVisitor);
+//        // System.out.println(out.toString());
+//
+//        MySqlCreateTableParser parser1 = new MySqlCreateTableParser(sql);
+//        SQLCreateTableStatement createTableStatement = parser1.parseCreateTable();
+////        MySqlSchemaStatVisitor visitor1 = new MySqlSchemaStatVisitor();
+////        createTableStatement.accept(visitor1);
+//        // visitor1.getTables().forEach((k, v) -> {
+//        // System.out.println(k.);
+//        // System.out.println(v);
+//        // });
+//        // 最终sql输出
+//        StringWriter out = new StringWriter();
+//        TableNameVisitor outputVisitor = new TableNameVisitor(out);
+//        createTableStatement.accept(outputVisitor);
+//        System.out.println(out.toString());
+//    }
+//}