Browse Source

init rdb sync

mcy 6 years ago
parent
commit
5ae7112192
19 changed files with 1068 additions and 9 deletions
  1. 3 3
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java
  2. 5 4
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java
  3. 0 2
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterConfig.java
  4. 1 0
      client-adapter/pom.xml
  5. 85 0
      client-adapter/rdb/pom.xml
  6. 86 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java
  7. 142 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java
  8. 102 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfigLoader.java
  9. 368 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java
  10. 1 0
      client-adapter/rdb/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter
  11. 12 0
      client-adapter/rdb/src/main/resources/rdb/mytest_user.yml
  12. 29 0
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/ConfigLoadTest.java
  13. 82 0
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/DBTest.java
  14. 37 0
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/TestConstant.java
  15. 28 0
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/Common.java
  16. 47 0
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/OracleSyncTest.java
  17. 13 0
      client-adapter/rdb/src/test/resources/log4j2-test.xml
  18. 13 0
      client-adapter/rdb/src/test/resources/logback-test.xml
  19. 14 0
      client-adapter/rdb/src/test/resources/rdb/mytest_user.yml

+ 3 - 3
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java

@@ -42,7 +42,7 @@ public class ESSyncConfigLoader {
     }
 
     public static synchronized void load() {
-        logger.info("## Start loading mapping config ... ");
+        logger.info("## Start loading es mapping config ... ");
         Collection<String> configs = AdapterConfigs.get("es");
         if (configs == null) {
             return;
@@ -92,7 +92,7 @@ public class ESSyncConfigLoader {
             esSyncConfig.put(c, config);
         }
 
-        logger.info("## Mapping config loaded");
+        logger.info("## ES mapping config loaded");
     }
 
     private static String readConfigContent(String config) {
@@ -113,7 +113,7 @@ public class ESSyncConfigLoader {
             in.read(bytes);
             return new String(bytes, StandardCharsets.UTF_8);
         } catch (IOException e) {
-            throw new RuntimeException("Read yml config error ", e);
+            throw new RuntimeException("Read es mapping config error ", e);
         } finally {
             try {
                 if (in != null) {

+ 5 - 4
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java

@@ -4,6 +4,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -33,7 +34,7 @@ public class MappingConfigLoader {
      * @return 配置名/配置文件名--对象
      */
     public static Map<String, MappingConfig> load() {
-        logger.info("## Start loading mapping config ... ");
+        logger.info("## Start loading hbase mapping config ... ");
 
         Map<String, MappingConfig> result = new LinkedHashMap<>();
 
@@ -116,7 +117,7 @@ public class MappingConfigLoader {
             result.put(c, config);
         }
 
-        logger.info("## Mapping config loaded");
+        logger.info("## Hbase mapping config loaded");
         return result;
     }
 
@@ -136,9 +137,9 @@ public class MappingConfigLoader {
 
             byte[] bytes = new byte[in.available()];
             in.read(bytes);
-            return new String(bytes, "UTF-8");
+            return new String(bytes, StandardCharsets.UTF_8);
         } catch (IOException e) {
-            throw new RuntimeException("Read ds-config.yml or hbase-mappings.conf error. ", e);
+            throw new RuntimeException("Read hbase mapping config error. ", e);
         } finally {
             try {
                 if (in != null) {

+ 0 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterConfig.java

@@ -55,8 +55,6 @@ public class AdapterConfig {
                 ds.setMaxWait(60000);
                 ds.setTimeBetweenEvictionRunsMillis(60000);
                 ds.setMinEvictableIdleTimeMillis(300000);
-                ds.setPoolPreparedStatements(false);
-                ds.setMaxPoolPreparedStatementPerConnectionSize(20);
                 ds.setValidationQuery("select 1");
                 try {
                     ds.init();

+ 1 - 0
client-adapter/pom.xml

@@ -22,6 +22,7 @@
         <module>hbase</module>
         <module>elasticsearch</module>
         <module>launcher</module>
+        <module>rdb</module>
     </modules>
 
     <build>

+ 85 - 0
client-adapter/rdb/pom.xml

@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>canal.client-adapter</artifactId>
+        <groupId>com.alibaba.otter</groupId>
+        <version>1.1.2-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.alibaba.otter</groupId>
+    <artifactId>client-adapter.rdb</artifactId>
+    <packaging>jar</packaging>
+    <name>canal client adapter rdb module for otter ${project.version}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>client-adapter.common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>1.19</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.40</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>42.1.4</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.oracle</groupId>
+            <artifactId>ojdbc6</artifactId>
+            <version>11.2.0.4.0-atlassian-hosted</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.microsoft.sqlserver</groupId>
+            <artifactId>mssql-jdbc</artifactId>
+            <version>7.0.0.jre8</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

+ 86 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -0,0 +1,86 @@
+package com.alibaba.otter.canal.client.adapter.rdb;
+
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfigLoader;
+import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+
+@SPI("rdb")
+public class RdbAdapter implements OuterAdapter {
+
+    private static Logger                              logger             = LoggerFactory.getLogger(RdbAdapter.class);
+
+    private static volatile Map<String, MappingConfig> rdbMapping         = null;                                     // 文件名对应配置
+    private static volatile Map<String, MappingConfig> mappingConfigCache = null;                                     // 库名-表名对应配置
+
+    private DruidDataSource                            dataSource;
+
+    private RdbSyncService                             rdbSyncService;
+
+    @Override
+    public void init(OuterAdapterConfig configuration) {
+        if (mappingConfigCache == null) {
+            synchronized (MappingConfig.class) {
+                if (mappingConfigCache == null) {
+                    rdbMapping = MappingConfigLoader.load();
+                    mappingConfigCache = new HashMap<>();
+                    for (MappingConfig mappingConfig : rdbMapping.values()) {
+                        mappingConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                               + mappingConfig.getDbMapping().getDatabase() + "."
+                                               + mappingConfig.getDbMapping().getTable(),
+                            mappingConfig);
+                    }
+                }
+            }
+        }
+
+        Map<String, String> properties = configuration.getProperties();
+        dataSource = new DruidDataSource();
+        dataSource.setDriverClassName(properties.get("jdbc.driverClassName"));
+        dataSource.setUrl(properties.get("jdbc.url"));
+        dataSource.setUsername(properties.get("jdbc.username"));
+        dataSource.setPassword(properties.get("jdbc.password"));
+        dataSource.setInitialSize(1);
+        dataSource.setMinIdle(1);
+        dataSource.setMaxActive(2);
+        dataSource.setMaxWait(60000);
+        dataSource.setTimeBetweenEvictionRunsMillis(60000);
+        dataSource.setMinEvictableIdleTimeMillis(300000);
+
+        try {
+            dataSource.init();
+        } catch (SQLException e) {
+            logger.error("ERROR ## failed to initial datasource: " + properties.get("jdbc.url"), e);
+        }
+
+        rdbSyncService = new RdbSyncService(dataSource);
+    }
+
+    @Override
+    public void sync(Dml dml) {
+        String destination = StringUtils.trimToEmpty(dml.getDestination());
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        MappingConfig config = mappingConfigCache.get(destination + "." + database + "." + table);
+        rdbSyncService.sync(config, dml);
+    }
+
+    @Override
+    public void destroy() {
+        if (dataSource != null) {
+            dataSource.close();
+        }
+    }
+}

+ 142 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java

@@ -0,0 +1,142 @@
+package com.alibaba.otter.canal.client.adapter.rdb.config;
+
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * RDB表映射配置
+ *
+ * @author rewerma 2018-11-07 下午02:41:34
+ * @version 1.0.0
+ */
+public class MappingConfig {
+
+    private String    dataSourceKey; // 数据源key
+
+    private String    destination;   // canal实例或MQ的topic
+
+    private DbMapping dbMapping;     // db映射配置
+
+    public String getDataSourceKey() {
+        return dataSourceKey;
+    }
+
+    public void setDataSourceKey(String dataSourceKey) {
+        this.dataSourceKey = dataSourceKey;
+    }
+
+    public DbMapping getDbMapping() {
+        return dbMapping;
+    }
+
+    public void setDbMapping(DbMapping dbMapping) {
+        this.dbMapping = dbMapping;
+    }
+
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
+    public void validate() {
+        if (dbMapping.database == null || dbMapping.database.isEmpty()) {
+            throw new NullPointerException("dbMapping.database");
+        }
+        if (dbMapping.table == null || dbMapping.table.isEmpty()) {
+            throw new NullPointerException("dbMapping.table");
+        }
+        if (dbMapping.targetTable == null || dbMapping.targetTable.isEmpty()) {
+            throw new NullPointerException("dbMapping.targetTable");
+        }
+    }
+
+    public static class DbMapping {
+
+        private String              database;                            // 数据库名或schema名
+        private String              table;                               // 表面名
+        private boolean             mapAll      = false;                 // 映射所有字段
+        private String              targetTable;                         // 目标表名
+        private Map<String, String> targetColums;                        // 目标表字段映射
+        private String              etlCondition;                        // etl条件sql
+
+        private Set<String>         families    = new LinkedHashSet<>(); // column family列表
+        private int                 readBatch   = 5000;
+        private int                 commitBatch = 5000;                  // etl等批量提交大小
+
+        public String getDatabase() {
+            return database;
+        }
+
+        public void setDatabase(String database) {
+            this.database = database;
+        }
+
+        public String getTable() {
+            return table;
+        }
+
+        public void setTable(String table) {
+            this.table = table;
+        }
+
+        public boolean isMapAll() {
+            return mapAll;
+        }
+
+        public void setMapAll(boolean mapAll) {
+            this.mapAll = mapAll;
+        }
+
+        public String getTargetTable() {
+            return targetTable;
+        }
+
+        public void setTargetTable(String targetTable) {
+            this.targetTable = targetTable;
+        }
+
+        public Map<String, String> getTargetColums() {
+            return targetColums;
+        }
+
+        public void setTargetColums(Map<String, String> targetColums) {
+            this.targetColums = targetColums;
+        }
+
+        public String getEtlCondition() {
+            return etlCondition;
+        }
+
+        public void setEtlCondition(String etlCondition) {
+            this.etlCondition = etlCondition;
+        }
+
+        public Set<String> getFamilies() {
+            return families;
+        }
+
+        public void setFamilies(Set<String> families) {
+            this.families = families;
+        }
+
+        public int getReadBatch() {
+            return readBatch;
+        }
+
+        public void setReadBatch(int readBatch) {
+            this.readBatch = readBatch;
+        }
+
+        public int getCommitBatch() {
+            return commitBatch;
+        }
+
+        public void setCommitBatch(int commitBatch) {
+            this.commitBatch = commitBatch;
+        }
+    }
+}

+ 102 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfigLoader.java

@@ -0,0 +1,102 @@
+package com.alibaba.otter.canal.client.adapter.rdb.config;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+
+/**
+ * RDB表映射配置加载器
+ *
+ * @author rewerma 2018-11-07 下午02:41:34
+ * @version 1.0.0
+ */
+public class MappingConfigLoader {
+
+    private static Logger       logger    = LoggerFactory.getLogger(MappingConfigLoader.class);
+
+    private static final String BASE_PATH = "rdb";
+
+    /**
+     * 加载HBase表映射配置
+     * 
+     * @return 配置名/配置文件名--对象
+     */
+    public static Map<String, MappingConfig> load() {
+        logger.info("## Start loading rdb mapping config ... ");
+
+        Map<String, MappingConfig> result = new LinkedHashMap<>();
+
+        Collection<String> configs = AdapterConfigs.get("rdb");
+        if (configs == null) {
+            return result;
+        }
+        for (String c : configs) {
+            if (c == null) {
+                continue;
+            }
+            c = c.trim();
+            if (c.equals("") || c.startsWith("#")) {
+                continue;
+            }
+
+            String configContent = null;
+
+            if (c.endsWith(".yml")) {
+                configContent = readConfigContent(BASE_PATH + "/" + c);
+            }
+
+            MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
+
+            try {
+                config.validate();
+            } catch (Exception e) {
+                throw new RuntimeException("ERROR Config: " + c + " " + e.getMessage(), e);
+            }
+            result.put(c, config);
+        }
+
+        logger.info("## Rdb mapping config loaded");
+        return result;
+    }
+
+    public static String readConfigContent(String config) {
+        InputStream in = null;
+        try {
+            // 先取本地文件,再取类路径
+            File configFile = new File("../config/" + config);
+            if (configFile.exists()) {
+                in = new FileInputStream(configFile);
+            } else {
+                in = MappingConfigLoader.class.getClassLoader().getResourceAsStream(config);
+            }
+            if (in == null) {
+                throw new RuntimeException("Rdb mapping config file not found.");
+            }
+
+            byte[] bytes = new byte[in.available()];
+            in.read(bytes);
+            return new String(bytes, StandardCharsets.UTF_8);
+        } catch (IOException e) {
+            throw new RuntimeException("Read rdb mapping config  error. ", e);
+        } finally {
+            try {
+                if (in != null) {
+                    in.close();
+                }
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+    }
+}

+ 368 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -0,0 +1,368 @@
+package com.alibaba.otter.canal.client.adapter.rdb.service;
+
+import java.io.Reader;
+import java.io.StringReader;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.*;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import javax.sql.DataSource;
+
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+
+/**
+ * RDB同步操作业务
+ *
+ * @author rewerma 2018-11-7 下午06:45:49
+ * @version 1.0.0
+ */
+public class RdbSyncService {
+
+    private static Logger logger = LoggerFactory.getLogger(RdbSyncService.class);
+
+    private DataSource    dataSource;
+
+    public RdbSyncService(DataSource dataSource){
+        this.dataSource = dataSource;
+    }
+
+    public void sync(MappingConfig config, Dml dml) {
+        try {
+            if (config != null) {
+                String type = dml.getType();
+                if (type != null && type.equalsIgnoreCase("INSERT")) {
+                    insert(config, dml);
+                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
+                    // update(config, dml);
+                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
+                    // delete(config, dml);
+                }
+                if (logger.isDebugEnabled()) {
+                    logger.debug("DML: {}", JSON.toJSONString(dml));
+                }
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 插入操作
+     *
+     * @param config 配置项
+     * @param dml DML数据
+     */
+    private void insert(MappingConfig config, Dml dml) throws SQLException {
+        List<Map<String, Object>> data = dml.getData();
+        if (data == null || data.isEmpty()) {
+            return;
+        }
+
+        DbMapping dbMapping = config.getDbMapping();
+
+        int idx = 1;
+        boolean complete = false;
+
+        Connection conn = dataSource.getConnection();
+
+        conn.setAutoCommit(false);
+
+        StringBuilder insertSql = new StringBuilder();
+        insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
+        if (!dbMapping.isMapAll()) {
+            dbMapping.getTargetColums().forEach((targetColumnName, srcColumnName) -> {
+                insertSql.append(targetColumnName).append(",");
+            });
+            int len = insertSql.length();
+            insertSql.delete(len - 1, len).append(") VALUES (");
+            int mapLen = dbMapping.getTargetColums().size();
+            for (int i = 0; i < mapLen; i++) {
+                insertSql.append("?,");
+            }
+            len = insertSql.length();
+            insertSql.delete(len - 1, len).append(")");
+        }
+
+        PreparedStatement pstmt = conn.prepareStatement(insertSql.toString());
+
+        for (Map<String, Object> r : data) {
+            pstmt.clearParameters();
+            convertData2DbRow(conn, config, r, pstmt);
+
+            pstmt.execute();
+
+            if (idx % config.getDbMapping().getCommitBatch() == 0) {
+                conn.commit();
+                complete = true;
+            }
+            idx++;
+        }
+        if (!complete) {
+            conn.commit();
+        }
+        conn.close();
+    }
+
+    private static void sqlRS(Connection conn, String sql, Consumer<ResultSet> consumer) {
+        Statement stmt = null;
+        ResultSet rs = null;
+        try {
+            stmt = conn.createStatement();
+            rs = stmt.executeQuery(sql);
+            consumer.accept(rs);
+        } catch (SQLException e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            if (rs != null) {
+                try {
+                    rs.close();
+                    rs = null;
+                } catch (SQLException e) {
+                    // ignore
+                }
+            }
+            if (stmt != null) {
+                try {
+                    stmt.close();
+                    stmt = null;
+                } catch (SQLException e) {
+                    // ignore
+                }
+            }
+        }
+    }
+
+    private static Map<String, Map<String, Integer>> COLUMNS_TYPE_CACHE = new ConcurrentHashMap<>();
+
+    private Map<String, Integer> getTargetColumnType(Connection conn, MappingConfig config) {
+        DbMapping dbMapping = config.getDbMapping();
+        String cacheKey = config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable();
+        Map<String, Integer> columnType = COLUMNS_TYPE_CACHE.get(cacheKey);
+        if (columnType == null) {
+            synchronized (RdbSyncService.class) {
+                columnType = COLUMNS_TYPE_CACHE.get(cacheKey);
+                if (columnType == null) {
+                    columnType = new LinkedHashMap<>();
+                    final Map<String, Integer> columnTypeTmp = columnType;
+                    String sql = "SELECT * FROM " + dbMapping.getTargetTable() + " WHERE 1=2";
+                    sqlRS(conn, sql, rs -> {
+                        try {
+                            ResultSetMetaData rsd = rs.getMetaData();
+                            int columnCount = rsd.getColumnCount();
+                            for (int i = 1; i <= columnCount; i++) {
+                                columnTypeTmp.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
+                            }
+                            COLUMNS_TYPE_CACHE.put(cacheKey, columnTypeTmp);
+                        } catch (SQLException e) {
+                            logger.error(e.getMessage(), e);
+                        }
+                    });
+                }
+            }
+        }
+        return columnType;
+    }
+
+    /**
+     * 新增类型转换
+     *
+     * @param config
+     * @param data
+     * @param pstmt
+     * @throws SQLException
+     */
+    private void convertData2DbRow(Connection conn, MappingConfig config, Map<String, Object> data,
+                                   PreparedStatement pstmt) throws SQLException {
+        DbMapping dbMapping = config.getDbMapping();
+        if (dbMapping.isMapAll()) {
+
+        } else {
+            Map<String, Integer> ctype = getTargetColumnType(conn, config);
+
+            int i = 1;
+            for (Map.Entry<String, String> entry : dbMapping.getTargetColums().entrySet()) {
+                String targetClassName = entry.getKey();
+                String srcColumnName = entry.getValue();
+                if (srcColumnName == null) {
+                    srcColumnName = targetClassName;
+                }
+
+                Integer type = ctype.get(targetClassName.toLowerCase());
+
+                Object value = data.get(srcColumnName);
+                if (value != null) {
+                    if (type == null) {
+                        throw new RuntimeException("No column: " + targetClassName + " found in target db");
+                    }
+                    switch (type) {
+                        case Types.BIT:
+                        case Types.BOOLEAN:
+                            if (value instanceof Boolean) {
+                                pstmt.setBoolean(i, (Boolean) value);
+                            } else if (value instanceof String) {
+                                boolean v = !value.equals("0");
+                                pstmt.setBoolean(i, v);
+                            } else if (value instanceof Number) {
+                                boolean v = ((Number) value).intValue() != 0;
+                                pstmt.setBoolean(i, v);
+                            } else {
+                                pstmt.setNull(i, type);
+                            }
+                            break;
+                        case Types.CHAR:
+                        case Types.NCHAR:
+                        case Types.VARCHAR:
+                        case Types.LONGVARCHAR:
+                            pstmt.setString(i, value.toString());
+                            break;
+                        case Types.TINYINT:
+                            if (value instanceof Number) {
+                                pstmt.setByte(i, ((Number) value).byteValue());
+                            } else if (value instanceof String) {
+                                pstmt.setByte(i, Byte.parseByte((String) value));
+                            } else {
+                                pstmt.setNull(i, type);
+                            }
+                            break;
+                        case Types.SMALLINT:
+                            if (value instanceof Number) {
+                                pstmt.setShort(i, ((Number) value).shortValue());
+                            } else if (value instanceof String) {
+                                pstmt.setShort(i, Short.parseShort((String) value));
+                            } else {
+                                pstmt.setNull(i, type);
+                            }
+                            break;
+                        case Types.INTEGER:
+                            if (value instanceof Number) {
+                                pstmt.setInt(i, ((Number) value).intValue());
+                            } else if (value instanceof String) {
+                                pstmt.setInt(i, Integer.parseInt((String) value));
+                            } else {
+                                pstmt.setNull(i, type);
+                            }
+                            break;
+                        case Types.BIGINT:
+                            if (value instanceof Number) {
+                                pstmt.setLong(i, ((Number) value).longValue());
+                            } else if (value instanceof String) {
+                                pstmt.setLong(i, Long.parseLong((String) value));
+                            } else {
+                                pstmt.setNull(i, type);
+                            }
+                            break;
+                        case Types.DECIMAL:
+                        case Types.NUMERIC:
+                            pstmt.setBigDecimal(i, new BigDecimal(value.toString()));
+                            break;
+                        case Types.REAL:
+                            if (value instanceof Number) {
+                                pstmt.setFloat(i, ((Number) value).floatValue());
+                            } else if (value instanceof String) {
+                                pstmt.setFloat(i, Float.parseFloat((String) value));
+                            } else {
+                                pstmt.setNull(i, type);
+                            }
+                            break;
+                        case Types.FLOAT:
+                        case Types.DOUBLE:
+                            if (value instanceof Number) {
+                                pstmt.setDouble(i, ((Number) value).doubleValue());
+                            } else if (value instanceof String) {
+                                pstmt.setDouble(i, Double.parseDouble((String) value));
+                            } else {
+                                pstmt.setNull(i, type);
+                            }
+                            break;
+                        case Types.BINARY:
+                        case Types.VARBINARY:
+                        case Types.LONGVARBINARY:
+                        case Types.BLOB:
+
+                            if (value instanceof byte[]) {
+                                pstmt.setBytes(i, (byte[]) value);
+                            } else if (value instanceof String) {
+                                pstmt.setBytes(i, ((String) value).getBytes(StandardCharsets.ISO_8859_1));
+                            } else {
+                                pstmt.setNull(i, type);
+                            }
+                            break;
+                        case Types.CLOB:
+                            if (value instanceof byte[]) {
+                                pstmt.setBytes(i, (byte[]) value);
+                            } else if (value instanceof String) {
+                                Reader clobReader = new StringReader((String) value);
+                                pstmt.setCharacterStream(i, clobReader);
+                            } else {
+                                pstmt.setNull(i, type);
+                            }
+                            break;
+                        case Types.DATE:
+                            if (value instanceof Date) {
+                                pstmt.setDate(i, new java.sql.Date(((Date) value).getTime()));
+                            } else if (value instanceof String) {
+                                String v = (String) value;
+                                if (!v.startsWith("0000-00-00")) {
+                                    v = v.trim().replace(" ", "T");
+                                    DateTime dt = new DateTime(v);
+                                    pstmt.setDate(i, new java.sql.Date(dt.toDate().getTime()));
+                                } else {
+                                    pstmt.setNull(i, type);
+                                }
+                            } else {
+                                pstmt.setNull(i, type);
+                            }
+                            break;
+                        case Types.TIME:
+                            if (value instanceof Date) {
+                                pstmt.setTime(i, new java.sql.Time(((Date) value).getTime()));
+                            } else if (value instanceof String) {
+                                String v = (String) value;
+                                v = "T" + v;
+                                DateTime dt = new DateTime(v);
+                                pstmt.setTime(i, new Time(dt.toDate().getTime()));
+                            } else {
+                                pstmt.setNull(i, type);
+                            }
+                            break;
+                        case Types.TIMESTAMP:
+                            if (value instanceof Date) {
+                                pstmt.setTimestamp(i, new java.sql.Timestamp(((Date) value).getTime()));
+                            } else if (value instanceof String) {
+                                String v = (String) value;
+                                if (!v.startsWith("0000-00-00")) {
+                                    v = v.trim().replace(" ", "T");
+                                    DateTime dt = new DateTime(v);
+                                    pstmt.setTimestamp(i, new Timestamp(dt.toDate().getTime()));
+                                } else {
+                                    pstmt.setNull(i, type);
+                                }
+                            } else {
+                                pstmt.setNull(i, type);
+                            }
+                            break;
+                        default:
+                            pstmt.setObject(i, value, type);
+                    }
+
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                i++;
+            }
+        }
+    }
+}

+ 1 - 0
client-adapter/rdb/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter

@@ -0,0 +1 @@
+rdb=com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter

+ 12 - 0
client-adapter/rdb/src/main/resources/rdb/mytest_user.yml

@@ -0,0 +1,12 @@
+dataSourceKey: defaultDS
+destination: example
+dbMapping:
+  database: mytest
+  table: user
+  targetTable: mytest.tb_user
+  commitBatch: 3000
+  mapAll: false
+  columns:
+    id:
+    name:
+    role_id:

+ 29 - 0
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/ConfigLoadTest.java

@@ -0,0 +1,29 @@
+package com.alibaba.otter.canal.client.adapter.rdb.test;
+
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfigLoader;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+
+public class ConfigLoadTest {
+
+    @Before
+    public void before() {
+        AdapterConfigs.put("rdb", "mytest_user.yml");
+        // 加载数据源连接池
+        DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
+    }
+
+    @Test
+    public void testLoad() {
+        Map<String, MappingConfig> configMap =  MappingConfigLoader.load();
+
+        Assert.assertFalse(configMap.isEmpty());
+    }
+}

+ 82 - 0
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/DBTest.java

@@ -0,0 +1,82 @@
+package com.alibaba.otter.canal.client.adapter.rdb.test;
+
+import java.io.BufferedReader;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.sql.*;
+
+import org.junit.Test;
+
+import com.alibaba.druid.pool.DruidDataSource;
+
+public class DBTest {
+
+    @Test
+    public void test01() throws SQLException {
+        DruidDataSource dataSource = new DruidDataSource();
+        // dataSource.setDriverClassName("oracle.jdbc.OracleDriver");
+        // dataSource.setUrl("jdbc:oracle:thin:@127.0.0.1:49161:XE");
+        // dataSource.setUsername("mytest");
+        // dataSource.setPassword("m121212");
+
+        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
+        dataSource.setUrl("jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true");
+        dataSource.setUsername("root");
+        dataSource.setPassword("121212");
+
+        dataSource.setInitialSize(1);
+        dataSource.setMinIdle(1);
+        dataSource.setMaxActive(2);
+        dataSource.setMaxWait(60000);
+        dataSource.setTimeBetweenEvictionRunsMillis(60000);
+        dataSource.setMinEvictableIdleTimeMillis(300000);
+
+        dataSource.init();
+
+        Connection conn = dataSource.getConnection();
+        Statement stmt = conn.createStatement();
+        ResultSet rs = stmt.executeQuery("select * from user t where 1=2");
+
+        ResultSetMetaData rsm = rs.getMetaData();
+        int cnt = rsm.getColumnCount();
+        for (int i = 1; i <= cnt; i++) {
+            System.out.println(rsm.getColumnName(i) + " " + rsm.getColumnType(i));
+        }
+
+        rs.close();
+        stmt.close();
+
+//        PreparedStatement pstmt = conn
+//            .prepareStatement("insert into tb_user (id,name,role_id,c_time,test1,test2) values (?,?,?,?,?,?)");
+//        pstmt.setBigDecimal(1, new BigDecimal("5"));
+//        pstmt.setString(2, "test");
+//        pstmt.setBigDecimal(3, new BigDecimal("1"));
+//        pstmt.setDate(4, new Date(new java.util.Date().getTime()));
+//        byte[] a = { (byte) 1, (byte) 2 };
+//        pstmt.setBytes(5, a);
+//        pstmt.setBytes(6, a);
+//        pstmt.execute();
+//
+//        pstmt.close();
+
+        conn.close();
+        dataSource.close();
+    }
+
+    private String clob2Str(Clob clob) {
+        String content = "";
+        try {
+            Reader is = clob.getCharacterStream();
+            BufferedReader buff = new BufferedReader(is);
+            String line = buff.readLine();
+            StringBuffer sb = new StringBuffer();
+            while (line != null) {
+                sb.append(line);
+                line = buff.readLine();
+            }
+            content = sb.toString();
+        } catch (Exception e) {
+        }
+        return content;
+    }
+}

+ 37 - 0
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/TestConstant.java

@@ -0,0 +1,37 @@
+package com.alibaba.otter.canal.client.adapter.rdb.test;
+
+import java.sql.SQLException;
+
+import com.alibaba.druid.pool.DruidDataSource;
+
+public class TestConstant {
+
+    public final static String    jdbcUrl      = "jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true";
+    public final static String    jdbcUser     = "root";
+    public final static String    jdbcPassword = "121212";
+
+    public static DruidDataSource dataSource;
+
+    static {
+        dataSource = new DruidDataSource();
+        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
+        dataSource.setUrl(jdbcUrl);
+        dataSource.setUsername(jdbcUser);
+        dataSource.setPassword(jdbcPassword);
+        dataSource.setInitialSize(1);
+        dataSource.setMinIdle(1);
+        dataSource.setMaxActive(1);
+        dataSource.setMaxWait(60000);
+        dataSource.setTimeBetweenEvictionRunsMillis(60000);
+        dataSource.setMinEvictableIdleTimeMillis(300000);
+        dataSource.setPoolPreparedStatements(false);
+        dataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
+        dataSource.setValidationQuery("select 1");
+        try {
+            dataSource.init();
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 28 - 0
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/Common.java

@@ -0,0 +1,28 @@
+package com.alibaba.otter.canal.client.adapter.rdb.test.sync;
+
+import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
+import com.alibaba.otter.canal.client.adapter.rdb.test.TestConstant;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class Common {
+    public static RdbAdapter init() {
+        DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
+
+        OuterAdapterConfig outerAdapterConfig = new OuterAdapterConfig();
+        outerAdapterConfig.setName("rdb");
+        Map<String, String> properties = new HashMap<>();
+        properties.put("jdbc.driveClassName", "oracle.jdbc.OracleDriver");
+        properties.put("jdbc.url", "jdbc:oracle:thin:@127.0.0.1:49161:XE");
+        properties.put("jdbc.username", "mytest");
+        properties.put("jdbc.password", "m121212");
+        outerAdapterConfig.setProperties(properties);
+
+        RdbAdapter adapter = new RdbAdapter();
+        adapter.init(outerAdapterConfig);
+        return adapter;
+    }
+}

+ 47 - 0
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/OracleSyncTest.java

@@ -0,0 +1,47 @@
+package com.alibaba.otter.canal.client.adapter.rdb.test.sync;
+
+import java.io.BufferedReader;
+import java.io.Reader;
+import java.sql.Clob;
+import java.util.*;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+
+public class OracleSyncTest {
+
+    private RdbAdapter rdbAdapter;
+
+    @Before
+    public void init() {
+        AdapterConfigs.put("rdb", "mytest_user.yml");
+        rdbAdapter = Common.init();
+    }
+
+    @Test
+    public void test01() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("INSERT");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("name", "Eric");
+        data.put("role_id", 1L);
+        data.put("c_time", new Date());
+        data.put("test1", "sdfasdfawe中国asfwef");
+        dml.setData(dataList);
+
+        rdbAdapter.sync(dml);
+    }
+
+
+}

+ 13 - 0
client-adapter/rdb/src/test/resources/log4j2-test.xml

@@ -0,0 +1,13 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="ERROR">
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</Configuration>

+ 13 - 0
client-adapter/rdb/src/test/resources/logback-test.xml

@@ -0,0 +1,13 @@
+<configuration scan="true" scanPeriod=" 5 seconds">
+	<jmxConfigurator />
+	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+		<encoder>
+			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
+			</pattern>
+		</encoder>
+	</appender>
+
+	<root level="TRACE">
+		<appender-ref ref="STDOUT"/>
+	</root>
+</configuration>

+ 14 - 0
client-adapter/rdb/src/test/resources/rdb/mytest_user.yml

@@ -0,0 +1,14 @@
+dataSourceKey: defaultDS
+destination: example
+dbMapping:
+  database: mytest
+  table: user
+  targetTable: mytest.tb_user
+  commitBatch: 3000
+  mapAll: false
+  targetColums:
+    id:
+    name:
+    role_id:
+    c_time:
+    test1: