
Merge pull request #1126 from rewerma/master

Relational database sync adapter, supporting MySQL, Oracle, PostgreSQL and SQL Server

agapple committed 6 years ago
commit 8c1f15a49e
36 changed files with 1892 additions and 58 deletions
  1. +1 -1    client-adapter/common/pom.xml
  2. +90 -0   client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Util.java
  3. +3 -3    client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java
  4. +2 -1    client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java
  5. +5 -4    client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java
  6. +4 -40   client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java
  7. +2 -1    client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java
  8. +14 -1   client-adapter/launcher/pom.xml
  9. +7 -0    client-adapter/launcher/src/main/assembly/dev.xml
  10. +7 -0   client-adapter/launcher/src/main/assembly/release.xml
  11. +0 -2   client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterConfig.java
  12. +2 -1   client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java
  13. +15 -0  client-adapter/launcher/src/main/resources/application.yml
  14. +4 -2   client-adapter/logger/src/main/java/com/alibaba/otter/canal/client/adapter/logger/LoggerAdapterExample.java
  15. +1 -2   client-adapter/pom.xml
  16. +85 -0  client-adapter/rdb/pom.xml
  17. +221 -0 client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java
  18. +8 -0   client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/MysqlAdapter.java
  19. +8 -0   client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/OracleAdapter.java
  20. +8 -0   client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/PostgresqlAdapter.java
  21. +8 -0   client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/SqlserverAdapter.java
  22. +162 -0 client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java
  23. +100 -0 client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfigLoader.java
  24. +290 -0 client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java
  25. +527 -0 client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java
  26. +4 -0   client-adapter/rdb/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter
  27. +15 -0  client-adapter/rdb/src/main/resources/oracle/mytest_user.yml
  28. +15 -0  client-adapter/rdb/src/main/resources/postgresql/mytest_user.yml
  29. +29 -0  client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/ConfigLoadTest.java
  30. +82 -0  client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/DBTest.java
  31. +37 -0  client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/TestConstant.java
  32. +29 -0  client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/Common.java
  33. +66 -0  client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/OracleSyncTest.java
  34. +13 -0  client-adapter/rdb/src/test/resources/log4j2-test.xml
  35. +13 -0  client-adapter/rdb/src/test/resources/logback-test.xml
  36. +15 -0  client-adapter/rdb/src/test/resources/oracle/mytest_user.yml

+ 1 - 1
client-adapter/common/pom.xml

@@ -14,7 +14,7 @@
         <dependency>
             <groupId>com.alibaba.otter</groupId>
             <artifactId>canal.protocol</artifactId>
-            <version>${canal_version}</version>
+            <version>1.1.2-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>joda-time</groupId>

+ 90 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Util.java

@@ -0,0 +1,90 @@
+package com.alibaba.otter.canal.client.adapter.support;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import javax.sql.DataSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Util {
+
+    private static final Logger logger = LoggerFactory.getLogger(Util.class);
+
+    /**
+     * 通过DS执行sql
+     */
+    public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) throws SQLException {
+        Connection conn = null;
+        Statement stmt = null;
+        ResultSet rs = null;
+        try {
+            conn = ds.getConnection();
+            stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            stmt.setFetchSize(Integer.MIN_VALUE);
+            rs = stmt.executeQuery(sql);
+            return fun.apply(rs);
+        } finally {
+            if (rs != null) {
+                try {
+                    rs.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+            if (stmt != null) {
+                try {
+                    stmt.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+            if (conn != null) {
+                try {
+                    conn.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * sql执行获取resultSet
+     *
+     * @param conn sql connection
+     * @param sql sql
+     * @param consumer 回调方法
+     */
+    public static void sqlRS(Connection conn, String sql, Consumer<ResultSet> consumer) {
+        Statement stmt = null;
+        ResultSet rs = null;
+        try {
+            stmt = conn.createStatement();
+            rs = stmt.executeQuery(sql);
+            consumer.accept(rs);
+        } catch (SQLException e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            if (rs != null) {
+                try {
+                    rs.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+            if (stmt != null) {
+                try {
+                    stmt.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+    }
+}
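
The helper above centralizes the streaming-query boilerplate that several adapters previously duplicated (the copy removed from HbaseEtlService appears further down). A minimal usage sketch, assuming an already-initialized javax.sql.DataSource named ds and a placeholder table name; it mirrors the COUNT pattern the ETL services use:

    // Hypothetical caller (assumed imports: javax.sql.DataSource, java.sql.SQLException).
    // "mytest.user" is a placeholder table, not taken from the change set.
    static long countRows(DataSource ds) throws SQLException {
        return (Long) Util.sqlRS(ds, "SELECT COUNT(1) FROM mytest.user", rs -> {
            try {
                // Util.sqlRS opened a forward-only, read-only Statement with
                // fetchSize = Integer.MIN_VALUE and closes rs/stmt/conn in its finally block.
                return rs.next() ? ((Number) rs.getObject(1)).longValue() : 0L;
            } catch (SQLException e) {
                return 0L;
            }
        });
    }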

+ 3 - 3
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java

@@ -42,7 +42,7 @@ public class ESSyncConfigLoader {
     }
 
     public static synchronized void load() {
-        logger.info("## Start loading mapping config ... ");
+        logger.info("## Start loading es mapping config ... ");
         Collection<String> configs = AdapterConfigs.get("es");
         if (configs == null) {
             return;
@@ -92,7 +92,7 @@ public class ESSyncConfigLoader {
             esSyncConfig.put(c, config);
         }
 
-        logger.info("## Mapping config loaded");
+        logger.info("## ES mapping config loaded");
     }
 
     private static String readConfigContent(String config) {
@@ -113,7 +113,7 @@ public class ESSyncConfigLoader {
             in.read(bytes);
             return new String(bytes, StandardCharsets.UTF_8);
         } catch (IOException e) {
-            throw new RuntimeException("Read yml config error ", e);
+            throw new RuntimeException("Read es mapping config error ", e);
         } finally {
             try {
                 if (in != null) {

+ 2 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -10,6 +10,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
@@ -73,7 +74,7 @@ public class ESSyncService {
                     dml.getDestination());
             }
             if (logger.isDebugEnabled()) {
-                logger.debug("DML: {}", JSON.toJSONString(dml));
+                logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
             }
         }
     }

+ 5 - 4
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java

@@ -4,6 +4,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -33,7 +34,7 @@ public class MappingConfigLoader {
      * @return 配置名/配置文件名--对象
      */
     public static Map<String, MappingConfig> load() {
-        logger.info("## Start loading mapping config ... ");
+        logger.info("## Start loading hbase mapping config ... ");
 
         Map<String, MappingConfig> result = new LinkedHashMap<>();
 
@@ -116,7 +117,7 @@ public class MappingConfigLoader {
             result.put(c, config);
         }
 
-        logger.info("## Mapping config loaded");
+        logger.info("## Hbase mapping config loaded");
         return result;
     }
 
@@ -136,9 +137,9 @@ public class MappingConfigLoader {
 
             byte[] bytes = new byte[in.available()];
             in.read(bytes);
-            return new String(bytes, "UTF-8");
+            return new String(bytes, StandardCharsets.UTF_8);
         } catch (IOException e) {
-            throw new RuntimeException("Read ds-config.yml or hbase-mappings.conf error. ", e);
+            throw new RuntimeException("Read hbase mapping config error. ", e);
         } finally {
             try {
                 if (in != null) {

+ 4 - 40
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java

@@ -14,6 +14,7 @@ import java.util.function.Function;
 
 import javax.sql.DataSource;
 
+import com.alibaba.otter.canal.client.adapter.support.Util;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,43 +35,6 @@ public class HbaseEtlService {
 
     private static Logger logger = LoggerFactory.getLogger(HbaseEtlService.class);
 
-    public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) throws SQLException {
-        Connection conn = null;
-        Statement stmt = null;
-        ResultSet rs = null;
-        try {
-            conn = ds.getConnection();
-            stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-            stmt.setFetchSize(Integer.MIN_VALUE);
-            rs = stmt.executeQuery(sql);
-            return fun.apply(rs);
-        } finally {
-            if (rs != null) {
-                try {
-                    rs.close();
-                } catch (SQLException e) {
-                    logger.error(e.getMessage(), e);
-                }
-            }
-            if (stmt != null) {
-                try {
-                    stmt.close();
-                } catch (SQLException e) {
-                    logger.error(e.getMessage(), e);
-                }
-            }
-            if (conn != null) {
-                try {
-                    conn.close();
-                } catch (SQLException e) {
-                    logger.error(e.getMessage(), e);
-                }
-            }
-            rs = null;
-            stmt = null;
-            conn = null;
-        }
-    }
 
     /**
      * 建表
@@ -138,7 +102,7 @@ public class HbaseEtlService {
             if (params != null && params.size() == 1 && hbaseMapping.getEtlCondition() == null) {
                 AtomicBoolean stExists = new AtomicBoolean(false);
                 // 验证是否有SYS_TIME字段
-                sqlRS(ds, sql, rs -> {
+                Util.sqlRS(ds, sql, rs -> {
                     try {
                         ResultSetMetaData rsmd = rs.getMetaData();
                         int cnt = rsmd.getColumnCount();
@@ -169,7 +133,7 @@ public class HbaseEtlService {
 
             // 获取总数
             String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
-            long cnt = (Long) sqlRS(ds, countSql, rs -> {
+            long cnt = (Long) Util.sqlRS(ds, countSql, rs -> {
                 Long count = null;
                 try {
                     if (rs.next()) {
@@ -244,7 +208,7 @@ public class HbaseEtlService {
     private static boolean executeSqlImport(DataSource ds, String sql, MappingConfig.HbaseMapping hbaseMapping,
                                             HbaseTemplate hbaseTemplate, AtomicLong successCount, List<String> errMsg) {
         try {
-            sqlRS(ds, sql, rs -> {
+            Util.sqlRS(ds, sql, rs -> {
                 int i = 1;
 
                 try {

+ 2 - 1
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java

@@ -7,6 +7,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.hbase.support.*;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
@@ -39,7 +40,7 @@ public class HbaseSyncService {
                     delete(config, dml);
                 }
                 if (logger.isDebugEnabled()) {
-                    logger.debug("DML: {}", JSON.toJSONString(dml));
+                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
                 }
             }
         } catch (Exception e) {

+ 14 - 1
client-adapter/launcher/pom.xml

@@ -33,7 +33,7 @@
         <dependency>
             <groupId>com.alibaba.otter</groupId>
             <artifactId>canal.client</artifactId>
-            <version>${canal_version}</version>
+            <version>1.1.2-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.yaml</groupId>
@@ -107,6 +107,19 @@
             <classifier>jar-with-dependencies</classifier>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>client-adapter.rdb</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>*</artifactId>
+                    <groupId>*</groupId>
+                </exclusion>
+            </exclusions>
+            <classifier>jar-with-dependencies</classifier>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 
     <build>

+ 7 - 0
client-adapter/launcher/src/main/assembly/dev.xml

@@ -43,6 +43,13 @@
                 <include>**/*</include>
             </includes>
         </fileSet>
+		<fileSet>
+			<directory>../rdb/src/main/resources/</directory>
+			<outputDirectory>/config</outputDirectory>
+            <excludes>
+                <exclude>META-INF/**</exclude>
+            </excludes>
+		</fileSet>
 		<fileSet>
 			<directory>target</directory>
 			<outputDirectory>logs</outputDirectory>

+ 7 - 0
client-adapter/launcher/src/main/assembly/release.xml

@@ -44,6 +44,13 @@
                 <include>**/*</include>
             </includes>
         </fileSet>
+        <fileSet>
+            <directory>../rdb/src/main/resources/</directory>
+            <outputDirectory>/config</outputDirectory>
+            <excludes>
+                <exclude>META-INF/**</exclude>
+            </excludes>
+        </fileSet>
         <fileSet>
             <directory>target</directory>
             <outputDirectory>logs</outputDirectory>

+ 0 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterConfig.java

@@ -55,8 +55,6 @@ public class AdapterConfig {
                 ds.setMaxWait(60000);
                 ds.setTimeBetweenEvictionRunsMillis(60000);
                 ds.setMinEvictableIdleTimeMillis(300000);
-                ds.setPoolPreparedStatements(false);
-                ds.setMaxPoolPreparedStatementPerConnectionSize(20);
                 ds.setValidationQuery("select 1");
                 try {
                     ds.init();

+ 2 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.errors.WakeupException;
 
@@ -36,7 +37,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
             groupId,
             canalClientConfig.getBatchSize(),
             flatMessage);
-        // connector.setSessionTimeout(1L, TimeUnit.MINUTES);
+        connector.setSessionTimeout(30L, TimeUnit.SECONDS);
     }
 
     @Override

+ 15 - 0
client-adapter/launcher/src/main/resources/application.yml

@@ -4,6 +4,7 @@ logging:
   level:
     com.alibaba.otter.canal.client.adapter.hbase: DEBUG
     com.alibaba.otter.canal.client.adapter.es: DEBUG
+    com.alibaba.otter.canal.client.adapter.rdb: DEBUG
 spring:
   jackson:
     date-format: yyyy-MM-dd HH:mm:ss
@@ -36,6 +37,18 @@ spring:
 #    - groupId: g2
 #      outAdapters:
 #      - name: logger
+#      - name: oracle
+#        properties:
+#          jdbc.driverClassName: oracle.jdbc.OracleDriver
+#          jdbc.url: jdbc:oracle:thin:@127.0.0.1:49161:XE
+#          jdbc.username: mytest
+#          jdbc.password: m121212
+#      - name: postgresql
+#        properties:
+#          jdbc.driverClassName: org.postgresql.Driver
+#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
+#          jdbc.username: postgres
+#          jdbc.password: 121212
 #  mqTopics:
 #  - mqMode: rocketmq
 #    topic: example
@@ -53,3 +66,5 @@ spring:
 #  adapterConfigs:
 #  - hbase/mytest_person2.yml
 #  - es/mytest_user.yml
+#  - oracle/mytest_user.yml
+#  - postgresql/mytest_user.yml

+ 4 - 2
client-adapter/logger/src/main/java/com/alibaba/otter/canal/client/adapter/logger/LoggerAdapterExample.java

@@ -3,9 +3,11 @@ package com.alibaba.otter.canal.client.adapter.logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.SPI;
 
 /**
@@ -27,7 +29,7 @@ public class LoggerAdapterExample implements OuterAdapter {
 
     @Override
     public void sync(Dml dml) {
-        logger.info("DML: {}", dml.toString());
+        logger.info("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
     }
 
     @Override

+ 1 - 2
client-adapter/pom.xml

@@ -14,8 +14,6 @@
         <java_source_version>1.8</java_source_version>
         <java_target_version>1.8</java_target_version>
         <file_encoding>UTF-8</file_encoding>
-
-        <canal_version>1.1.2-SNAPSHOT</canal_version>
     </properties>
 
     <modules>
@@ -24,6 +22,7 @@
         <module>hbase</module>
         <module>elasticsearch</module>
         <module>launcher</module>
+        <module>rdb</module>
     </modules>
 
     <build>

+ 85 - 0
client-adapter/rdb/pom.xml

@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>canal.client-adapter</artifactId>
+        <groupId>com.alibaba.otter</groupId>
+        <version>1.1.2-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.alibaba.otter</groupId>
+    <artifactId>client-adapter.rdb</artifactId>
+    <packaging>jar</packaging>
+    <name>canal client adapter rdb module for otter ${project.version}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>client-adapter.common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>1.19</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.40</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>42.1.4</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.oracle</groupId>
+            <artifactId>ojdbc6</artifactId>
+            <version>11.2.0.4.0-atlassian-hosted</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.microsoft.sqlserver</groupId>
+            <artifactId>mssql-jdbc</artifactId>
+            <version>7.0.0.jre8</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

+ 221 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -0,0 +1,221 @@
+package com.alibaba.otter.canal.client.adapter.rdb;
+
+import java.sql.Connection;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.*;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfigLoader;
+import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
+import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
+import com.alibaba.otter.canal.client.adapter.support.*;
+
+public abstract class RdbAdapter implements OuterAdapter {
+
+    private static Logger                       logger             = LoggerFactory.getLogger(RdbAdapter.class);
+
+    private volatile Map<String, MappingConfig> rdbMapping         = null;                                     // 文件名对应配置
+    private volatile Map<String, MappingConfig> mappingConfigCache = null;                                     // 库名-表名对应配置
+
+    private DruidDataSource                     dataSource;
+
+    private RdbSyncService                      rdbSyncService;
+
+    @Override
+    public void init(OuterAdapterConfig configuration) {
+        if (mappingConfigCache == null) {
+            synchronized (MappingConfig.class) {
+                if (mappingConfigCache == null) {
+                    SPI spi = this.getClass().getAnnotation(SPI.class);
+                    rdbMapping = MappingConfigLoader.load(spi.value());
+                    mappingConfigCache = new HashMap<>();
+                    for (MappingConfig mappingConfig : rdbMapping.values()) {
+                        mappingConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                               + mappingConfig.getDbMapping().getDatabase() + "."
+                                               + mappingConfig.getDbMapping().getTable(),
+                            mappingConfig);
+                    }
+                }
+            }
+        }
+
+        Map<String, String> properties = configuration.getProperties();
+        dataSource = new DruidDataSource();
+        dataSource.setDriverClassName(properties.get("jdbc.driverClassName"));
+        dataSource.setUrl(properties.get("jdbc.url"));
+        dataSource.setUsername(properties.get("jdbc.username"));
+        dataSource.setPassword(properties.get("jdbc.password"));
+        dataSource.setInitialSize(1);
+        dataSource.setMinIdle(1);
+        dataSource.setMaxActive(2);
+        dataSource.setMaxWait(60000);
+        dataSource.setTimeBetweenEvictionRunsMillis(60000);
+        dataSource.setMinEvictableIdleTimeMillis(300000);
+
+        try {
+            dataSource.init();
+        } catch (SQLException e) {
+            logger.error("ERROR ## failed to initial datasource: " + properties.get("jdbc.url"), e);
+        }
+
+        rdbMapping.values().forEach(config -> {
+            try {
+                MappingConfig.DbMapping dbMapping = config.getDbMapping();
+                // 从源表加载所有字段名
+                if (dbMapping.getAllColumns() == null) {
+                    synchronized (RdbSyncService.class) {
+                        if (dbMapping.getAllColumns() == null) {
+                            DataSource srcDS = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+                            Connection srcConn = srcDS.getConnection();
+                            String srcMetaSql = "SELECT * FROM " + dbMapping.getDatabase() + "." + dbMapping.getTable()
+                                                + " WHERE 1=2 ";
+                            List<String> srcColumns = new ArrayList<>();
+                            Util.sqlRS(srcConn, srcMetaSql, rs -> {
+                                try {
+                                    ResultSetMetaData rmd = rs.getMetaData();
+                                    int cnt = rmd.getColumnCount();
+                                    for (int i = 1; i <= cnt; i++) {
+                                        srcColumns.add(rmd.getColumnName(i).toLowerCase());
+                                    }
+                                } catch (SQLException e) {
+                                    logger.error(e.getMessage(), e);
+                                }
+                            });
+                            Map<String, String> columnsMap = new LinkedHashMap<>();
+
+                            for (String srcColumn : srcColumns) {
+                                String targetColumn = srcColumn;
+                                if (dbMapping.getTargetColumns() != null) {
+                                    for (Map.Entry<String, String> entry : dbMapping.getTargetColumns().entrySet()) {
+                                        String targetColumnName = entry.getKey();
+                                        String srcColumnName = entry.getValue();
+
+                                        if (srcColumnName != null
+                                            && srcColumnName.toLowerCase().equals(srcColumn)) {
+                                            targetColumn = targetColumnName;
+                                        }
+                                    }
+                                }
+                                columnsMap.put(targetColumn, srcColumn);
+                            }
+                            dbMapping.setAllColumns(columnsMap);
+                        }
+                    }
+                }
+            } catch (SQLException e) {
+                logger.error(e.getMessage(), e);
+            }
+        });
+
+        rdbSyncService = new RdbSyncService(dataSource);
+    }
+
+    @Override
+    public void sync(Dml dml) {
+        String destination = StringUtils.trimToEmpty(dml.getDestination());
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        MappingConfig config = mappingConfigCache.get(destination + "." + database + "." + table);
+
+        rdbSyncService.sync(config, dml);
+    }
+
+    @Override
+    public EtlResult etl(String task, List<String> params) {
+        EtlResult etlResult = new EtlResult();
+        MappingConfig config = rdbMapping.get(task);
+        if (config != null) {
+            DataSource srcDataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+            if (srcDataSource != null) {
+                return RdbEtlService.importData(srcDataSource, dataSource, config, params);
+            } else {
+                etlResult.setSucceeded(false);
+                etlResult.setErrorMessage("DataSource not found");
+                return etlResult;
+            }
+        } else {
+            StringBuilder resultMsg = new StringBuilder();
+            boolean resSucc = true;
+            // ds不为空说明传入的是destination
+            for (MappingConfig configTmp : rdbMapping.values()) {
+                // 取所有的destination为task的配置
+                if (configTmp.getDestination().equals(task)) {
+                    DataSource srcDataSource = DatasourceConfig.DATA_SOURCES.get(configTmp.getDataSourceKey());
+                    if (srcDataSource == null) {
+                        continue;
+                    }
+                    EtlResult etlRes = RdbEtlService.importData(srcDataSource, dataSource, configTmp, params);
+                    if (!etlRes.getSucceeded()) {
+                        resSucc = false;
+                        resultMsg.append(etlRes.getErrorMessage()).append("\n");
+                    } else {
+                        resultMsg.append(etlRes.getResultMessage()).append("\n");
+                    }
+                }
+            }
+            if (resultMsg.length() > 0) {
+                etlResult.setSucceeded(resSucc);
+                if (resSucc) {
+                    etlResult.setResultMessage(resultMsg.toString());
+                } else {
+                    etlResult.setErrorMessage(resultMsg.toString());
+                }
+                return etlResult;
+            }
+        }
+        etlResult.setSucceeded(false);
+        etlResult.setErrorMessage("Task not found");
+        return etlResult;
+    }
+
+    @Override
+    public Map<String, Object> count(String task) {
+        MappingConfig config = rdbMapping.get(task);
+        MappingConfig.DbMapping dbMapping = config.getDbMapping();
+        String sql = "SELECT COUNT(1) AS cnt FROM " + dbMapping.getTargetTable();
+        Connection conn = null;
+        Map<String, Object> res = new LinkedHashMap<>();
+        try {
+            conn = dataSource.getConnection();
+            Util.sqlRS(conn, sql, rs -> {
+                try {
+                    if (rs.next()) {
+                        Long rowCount = rs.getLong("cnt");
+                        res.put("count", rowCount);
+                    }
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            });
+        } catch (SQLException e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            if (conn != null) {
+                try {
+                    conn.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+        res.put("targetTable", dbMapping.getTargetTable());
+
+        return res;
+    }
+
+    @Override
+    public void destroy() {
+        if (dataSource != null) {
+            dataSource.close();
+        }
+    }
+}

+ 8 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/MysqlAdapter.java

@@ -0,0 +1,8 @@
+package com.alibaba.otter.canal.client.adapter.rdb.adapters;
+
+import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+
+@SPI("mysql")
+public class MysqlAdapter extends RdbAdapter {
+}

+ 8 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/OracleAdapter.java

@@ -0,0 +1,8 @@
+package com.alibaba.otter.canal.client.adapter.rdb.adapters;
+
+import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+
+@SPI("oracle")
+public class OracleAdapter extends RdbAdapter {
+}

+ 8 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/PostgresqlAdapter.java

@@ -0,0 +1,8 @@
+package com.alibaba.otter.canal.client.adapter.rdb.adapters;
+
+import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+
+@SPI("postgresql")
+public class PostgresqlAdapter extends RdbAdapter {
+}

+ 8 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/SqlserverAdapter.java

@@ -0,0 +1,8 @@
+package com.alibaba.otter.canal.client.adapter.rdb.adapters;
+
+import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+
+@SPI("sqlserver")
+public class SqlserverAdapter extends RdbAdapter {
+}
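
The four subclasses above are intentionally empty: each contributes only its @SPI value, which RdbAdapter.init() reads to select the matching mapping-config subdirectory (and which appears to correspond to the adapter name used in application.yml). A sketch of that lookup, using OracleAdapter purely as an illustration of the code path shown earlier:

    // Illustration only; mirrors the getAnnotation(SPI.class) call in RdbAdapter.init().
    SPI spi = OracleAdapter.class.getAnnotation(SPI.class);
    String adapterName = spi.value();                 // "oracle"
    Map<String, MappingConfig> mappings =             // yml mappings registered under
        MappingConfigLoader.load(adapterName);        // adapterConfigs for "oracle"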

+ 162 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java

@@ -0,0 +1,162 @@
+package com.alibaba.otter.canal.client.adapter.rdb.config;
+
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * RDB表映射配置
+ *
+ * @author rewerma 2018-11-07 下午02:41:34
+ * @version 1.0.0
+ */
+public class MappingConfig {
+
+    private String    dataSourceKey; // 数据源key
+
+    private String    destination;   // canal实例或MQ的topic
+
+    private DbMapping dbMapping;     // db映射配置
+
+    public String getDataSourceKey() {
+        return dataSourceKey;
+    }
+
+    public void setDataSourceKey(String dataSourceKey) {
+        this.dataSourceKey = dataSourceKey;
+    }
+
+    public DbMapping getDbMapping() {
+        return dbMapping;
+    }
+
+    public void setDbMapping(DbMapping dbMapping) {
+        this.dbMapping = dbMapping;
+    }
+
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
+    public void validate() {
+        if (dbMapping.database == null || dbMapping.database.isEmpty()) {
+            throw new NullPointerException("dbMapping.database");
+        }
+        if (dbMapping.table == null || dbMapping.table.isEmpty()) {
+            throw new NullPointerException("dbMapping.table");
+        }
+        if (dbMapping.targetTable == null || dbMapping.targetTable.isEmpty()) {
+            throw new NullPointerException("dbMapping.targetTable");
+        }
+    }
+
+    public static class DbMapping {
+
+        private String                       database;                            // 数据库名或schema名
+        private String                       table;                               // 表面名
+        private Map<String, String>          targetPk;                            // 目标表主键字段
+        private boolean                      mapAll      = false;                 // 映射所有字段
+        private String                       targetTable;                         // 目标表名
+        private Map<String, String>          targetColumns;                       // 目标表字段映射
+
+        private String                       etlCondition;                        // etl条件sql
+
+        private Set<String>                  families    = new LinkedHashSet<>(); // column family列表
+        private int                          readBatch   = 5000;
+        private int                          commitBatch = 5000;                  // etl等批量提交大小
+
+        private volatile Map<String, String> allColumns;                          // mapAll为true,自动设置改字段
+
+        public String getDatabase() {
+            return database;
+        }
+
+        public void setDatabase(String database) {
+            this.database = database;
+        }
+
+        public String getTable() {
+            return table;
+        }
+
+        public void setTable(String table) {
+            this.table = table;
+        }
+
+        public Map<String, String> getTargetPk() {
+            return targetPk;
+        }
+
+        public void setTargetPk(Map<String, String> targetPk) {
+            this.targetPk = targetPk;
+        }
+
+        public boolean isMapAll() {
+            return mapAll;
+        }
+
+        public void setMapAll(boolean mapAll) {
+            this.mapAll = mapAll;
+        }
+
+        public String getTargetTable() {
+            return targetTable;
+        }
+
+        public void setTargetTable(String targetTable) {
+            this.targetTable = targetTable;
+        }
+
+        public Map<String, String> getTargetColumns() {
+            return targetColumns;
+        }
+
+        public void setTargetColumns(Map<String, String> targetColumns) {
+            this.targetColumns = targetColumns;
+        }
+
+        public String getEtlCondition() {
+            return etlCondition;
+        }
+
+        public void setEtlCondition(String etlCondition) {
+            this.etlCondition = etlCondition;
+        }
+
+        public Set<String> getFamilies() {
+            return families;
+        }
+
+        public void setFamilies(Set<String> families) {
+            this.families = families;
+        }
+
+        public int getReadBatch() {
+            return readBatch;
+        }
+
+        public void setReadBatch(int readBatch) {
+            this.readBatch = readBatch;
+        }
+
+        public int getCommitBatch() {
+            return commitBatch;
+        }
+
+        public void setCommitBatch(int commitBatch) {
+            this.commitBatch = commitBatch;
+        }
+
+        public Map<String, String> getAllColumns() {
+            return allColumns;
+        }
+
+        public void setAllColumns(Map<String, String> allColumns) {
+            this.allColumns = allColumns;
+        }
+    }
+}
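
MappingConfigLoader (the next file) feeds yml files of this shape straight into SnakeYAML's loadAs(..., MappingConfig.class), so the keys mirror the bean properties above. A hypothetical mapping sketch with placeholder values (not the contents of the real oracle/mytest_user.yml, whose diff is outside this excerpt):

    # Hypothetical example; keys follow MappingConfig/DbMapping above, values are placeholders.
    dataSourceKey: defaultDS          # key into DatasourceConfig.DATA_SOURCES (source DB)
    destination: example              # canal instance or MQ topic
    dbMapping:
      database: mytest                # source database / schema
      table: user                     # source table
      targetTable: mytest_user        # target table
      targetPk:
        id: id                        # target PK column -> source column
      mapAll: true                    # map all source columns automatically
      # targetColumns:                # explicit column mapping when mapAll is false
      #   name: name
      etlCondition: "where sys_time >= '{0}'"
      commitBatch: 5000               # ETL batch commit size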

+ 100 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfigLoader.java

@@ -0,0 +1,100 @@
+package com.alibaba.otter.canal.client.adapter.rdb.config;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+
+/**
+ * RDB表映射配置加载器
+ *
+ * @author rewerma 2018-11-07 下午02:41:34
+ * @version 1.0.0
+ */
+public class MappingConfigLoader {
+
+    private static Logger       logger    = LoggerFactory.getLogger(MappingConfigLoader.class);
+
+    /**
+     * 加载HBase表映射配置
+     * 
+     * @return 配置名/配置文件名--对象
+     */
+    public static Map<String, MappingConfig> load(String name) {
+        logger.info("## Start loading rdb mapping config ... ");
+
+        Map<String, MappingConfig> result = new LinkedHashMap<>();
+
+        Collection<String> configs = AdapterConfigs.get(name);
+        if (configs == null) {
+            return result;
+        }
+        for (String c : configs) {
+            if (c == null) {
+                continue;
+            }
+            c = c.trim();
+            if (c.equals("") || c.startsWith("#")) {
+                continue;
+            }
+
+            String configContent = null;
+
+            if (c.endsWith(".yml")) {
+                configContent = readConfigContent(name + "/" + c);
+            }
+
+            MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
+
+            try {
+                config.validate();
+            } catch (Exception e) {
+                throw new RuntimeException("ERROR Config: " + c + " " + e.getMessage(), e);
+            }
+            result.put(c, config);
+        }
+
+        logger.info("## Rdb mapping config loaded");
+        return result;
+    }
+
+    public static String readConfigContent(String config) {
+        InputStream in = null;
+        try {
+            // 先取本地文件,再取类路径
+            File configFile = new File("../config/" + config);
+            if (configFile.exists()) {
+                in = new FileInputStream(configFile);
+            } else {
+                in = MappingConfigLoader.class.getClassLoader().getResourceAsStream(config);
+            }
+            if (in == null) {
+                throw new RuntimeException("Rdb mapping config file not found.");
+            }
+
+            byte[] bytes = new byte[in.available()];
+            in.read(bytes);
+            return new String(bytes, StandardCharsets.UTF_8);
+        } catch (IOException e) {
+            throw new RuntimeException("Read rdb mapping config  error. ", e);
+        } finally {
+            try {
+                if (in != null) {
+                    in.close();
+                }
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+    }
+}

+ 290 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java

@@ -0,0 +1,290 @@
+package com.alibaba.otter.canal.client.adapter.rdb.service;
+
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.sql.DataSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import com.google.common.base.Joiner;
+
+/**
+ * RDB ETL 操作业务类
+ *
+ * @author rewerma @ 2018-11-7
+ * @version 1.0.0
+ */
+public class RdbEtlService {
+
+    private static final Logger logger = LoggerFactory.getLogger(RdbEtlService.class);
+
+    /**
+     * 导入数据
+     */
+    public static EtlResult importData(DataSource srcDS, DataSource targetDS, MappingConfig config,
+                                       List<String> params) {
+        EtlResult etlResult = new EtlResult();
+        AtomicLong successCount = new AtomicLong();
+        List<String> errMsg = new ArrayList<>();
+        String hbaseTable = "";
+        try {
+            if (config == null) {
+                logger.error("Config is null!");
+                etlResult.setSucceeded(false);
+                etlResult.setErrorMessage("Config is null!");
+                return etlResult;
+            }
+            DbMapping dbMapping = config.getDbMapping();
+
+            long start = System.currentTimeMillis();
+
+            // 拼接sql
+            StringBuilder sql = new StringBuilder(
+                "SELECT * FROM " + dbMapping.getDatabase() + "." + dbMapping.getTable());
+
+            // 拼接条件
+            appendCondition(params, dbMapping, srcDS, sql);
+
+            // 获取总数
+            String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
+            long cnt = (Long) Util.sqlRS(srcDS, countSql, rs -> {
+                Long count = null;
+                try {
+                    if (rs.next()) {
+                        count = ((Number) rs.getObject(1)).longValue();
+                    }
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+                return count == null ? 0 : count;
+            });
+
+            // 当大于1万条记录时开启多线程
+            if (cnt >= 10000) {
+                int threadCount = 3;
+                long perThreadCnt = cnt / threadCount;
+                ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+                List<Future<Boolean>> futures = new ArrayList<>(threadCount);
+                for (int i = 0; i < threadCount; i++) {
+                    long offset = i * perThreadCnt;
+                    Long size = null;
+                    if (i != threadCount - 1) {
+                        size = perThreadCnt;
+                    }
+                    String sqlFinal;
+                    if (size != null) {
+                        sqlFinal = sql + " LIMIT " + offset + "," + size;
+                    } else {
+                        sqlFinal = sql + " LIMIT " + offset + "," + cnt;
+                    }
+                    Future<Boolean> future = executor.submit(
+                        () -> executeSqlImport(params, srcDS, targetDS, sqlFinal, dbMapping, successCount, errMsg));
+                    futures.add(future);
+                }
+
+                for (Future<Boolean> future : futures) {
+                    future.get();
+                }
+
+                executor.shutdown();
+            } else {
+                executeSqlImport(params, srcDS, targetDS, sql.toString(), dbMapping, successCount, errMsg);
+            }
+
+            logger.info(
+                dbMapping.getTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000 + "s!");
+
+            etlResult.setResultMessage("导入目标表 " + dbMapping.getTargetTable() + " 数据:" + successCount.get() + " 条");
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            errMsg.add(hbaseTable + " etl failed! ==>" + e.getMessage());
+        }
+
+        if (errMsg.isEmpty()) {
+            etlResult.setSucceeded(true);
+        } else {
+            etlResult.setErrorMessage(Joiner.on("\n").join(errMsg));
+        }
+        return etlResult;
+    }
+
+    private static void appendCondition(List<String> params, DbMapping dbMapping, DataSource ds,
+                                        StringBuilder sql) throws SQLException {
+        if (params != null && params.size() == 1 && dbMapping.getEtlCondition() == null) {
+            AtomicBoolean stExists = new AtomicBoolean(false);
+            // 验证是否有SYS_TIME字段
+            Util.sqlRS(ds, sql.toString(), rs -> {
+                try {
+                    ResultSetMetaData rsmd = rs.getMetaData();
+                    int cnt = rsmd.getColumnCount();
+                    for (int i = 1; i <= cnt; i++) {
+                        String columnName = rsmd.getColumnName(i);
+                        if ("SYS_TIME".equalsIgnoreCase(columnName)) {
+                            stExists.set(true);
+                            break;
+                        }
+                    }
+                } catch (Exception e) {
+                    // ignore
+                }
+                return null;
+            });
+            if (stExists.get()) {
+                sql.append(" WHERE SYS_TIME >= '").append(params.get(0)).append("' ");
+            }
+        } else if (dbMapping.getEtlCondition() != null && params != null) {
+            String etlCondition = dbMapping.getEtlCondition();
+            int size = params.size();
+            for (int i = 0; i < size; i++) {
+                etlCondition = etlCondition.replace("{" + i + "}", params.get(i));
+            }
+
+            sql.append(" ").append(etlCondition);
+        }
+    }
+
+    /**
+     * 执行导入
+     */
+    private static boolean executeSqlImport(List<String> params, DataSource srcDS, DataSource targetDS, String sql,
+                                            DbMapping dbMapping, AtomicLong successCount, List<String> errMsg) {
+        try {
+            Util.sqlRS(srcDS, sql, rs -> {
+                int idx = 1;
+
+                try {
+                    boolean completed = false;
+
+                    Map<String, Integer> columnType = new LinkedHashMap<>();
+                    ResultSetMetaData rsd = rs.getMetaData();
+                    int columnCount = rsd.getColumnCount();
+                    for (int i = 1; i <= columnCount; i++) {
+                        columnType.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
+                    }
+
+                    Map<String, String> columnsMap;
+                    if (dbMapping.isMapAll()) {
+                        columnsMap = dbMapping.getAllColumns();
+                    } else {
+                        columnsMap = dbMapping.getTargetColumns();
+                    }
+
+                    StringBuilder insertSql = new StringBuilder();
+                    insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
+                    columnsMap
+                        .forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
+
+                    int len = insertSql.length();
+                    insertSql.delete(len - 1, len).append(") VALUES (");
+                    int mapLen = columnsMap.size();
+                    for (int i = 0; i < mapLen; i++) {
+                        insertSql.append("?,");
+                    }
+                    len = insertSql.length();
+                    insertSql.delete(len - 1, len).append(")");
+                    try (Connection connTarget = targetDS.getConnection();
+                            PreparedStatement pstmt = connTarget.prepareStatement(insertSql.toString())) {
+                        connTarget.setAutoCommit(false);
+
+                        while (rs.next()) {
+                            pstmt.clearParameters();
+
+                            // 删除数据
+                            Map<String, Object> values = new LinkedHashMap<>();
+                            StringBuilder deleteSql = new StringBuilder(
+                                "DELETE FROM " + dbMapping.getTargetTable() + " WHERE ");
+                            appendCondition(dbMapping, deleteSql, values, rs);
+                            try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
+                                int k = 1;
+                                for (Object val : values.values()) {
+                                    pstmt2.setObject(k++, val);
+                                }
+                                pstmt2.execute();
+                            }
+
+                            int i = 1;
+                            for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+                                String targetClolumnName = entry.getKey();
+                                String srcColumnName = entry.getValue();
+                                if (srcColumnName == null) {
+                                    srcColumnName = targetClolumnName;
+                                }
+
+                                Integer type = columnType.get(targetClolumnName.toLowerCase());
+
+                                Object value = rs.getObject(srcColumnName);
+                                if (value != null) {
+                                    RdbSyncService.setPStmt(type, pstmt, value, i);
+                                } else {
+                                    pstmt.setNull(i, type);
+                                }
+
+                                i++;
+                            }
+
+                            pstmt.execute();
+                            if (logger.isTraceEnabled()) {
+                                logger.trace("Insert into target table, sql: {}", insertSql);
+                            }
+
+                            if (idx % dbMapping.getCommitBatch() == 0) {
+                                connTarget.commit();
+                                completed = true;
+                            }
+                            idx++;
+                            successCount.incrementAndGet();
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("successful import count:" + successCount.get());
+                            }
+                        }
+                        if (!completed) {
+                            connTarget.commit();
+                        }
+                    }
+
+                } catch (Exception e) {
+                    logger.error(dbMapping.getTable() + " etl failed! ==>" + e.getMessage(), e);
+                    errMsg.add(dbMapping.getTable() + " etl failed! ==>" + e.getMessage());
+                }
+                return idx;
+            });
+            return true;
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return false;
+        }
+    }
+
+    /**
+     * 拼接目标表主键where条件
+     */
+    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Object> values,
+                                        ResultSet rs) throws SQLException {
+        // 拼接主键
+        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
+            String targetColumnName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = targetColumnName;
+            }
+            sql.append(targetColumnName).append("=? AND ");
+            values.put(targetColumnName, rs.getObject(srcColumnName));
+        }
+        int len = sql.length();
+        sql.delete(len - 4, len);
+    }
+}

+ 527 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -0,0 +1,527 @@
+package com.alibaba.otter.canal.client.adapter.rdb.service;
+
+import java.io.Reader;
+import java.io.StringReader;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import javax.sql.DataSource;
+
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+
+/**
+ * RDB同步操作业务
+ *
+ * @author rewerma 2018-11-7 下午06:45:49
+ * @version 1.0.0
+ */
+public class RdbSyncService {
+
+    private static final Logger                     logger             = LoggerFactory.getLogger(RdbSyncService.class);
+
+    private final Map<String, Map<String, Integer>> COLUMNS_TYPE_CACHE = new ConcurrentHashMap<>();
+
+    private DataSource                              dataSource;
+
+    public RdbSyncService(DataSource dataSource){
+        this.dataSource = dataSource;
+    }
+
+    public void sync(MappingConfig config, Dml dml) {
+        try {
+            if (config != null) {
+                String type = dml.getType();
+                if (type != null && type.equalsIgnoreCase("INSERT")) {
+                    insert(config, dml);
+                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
+                    update(config, dml);
+                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
+                    delete(config, dml);
+                }
+                if (logger.isDebugEnabled()) {
+                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+                }
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Insert operation
+     *
+     * @param config mapping configuration
+     * @param dml DML data
+     */
+    private void insert(MappingConfig config, Dml dml) throws SQLException {
+        List<Map<String, Object>> data = dml.getData();
+        if (data == null || data.isEmpty()) {
+            return;
+        }
+
+        DbMapping dbMapping = config.getDbMapping();
+
+        int idx = 1;
+        boolean completed = false;
+
+        Connection conn = dataSource.getConnection();
+        conn.setAutoCommit(false);
+        try {
+            Map<String, String> columnsMap;
+            if (dbMapping.isMapAll()) {
+                columnsMap = dbMapping.getAllColumns();
+            } else {
+                columnsMap = dbMapping.getTargetColumns();
+            }
+
+            StringBuilder insertSql = new StringBuilder();
+            insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
+
+            columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
+            int len = insertSql.length();
+            insertSql.delete(len - 1, len).append(") VALUES (");
+            int mapLen = columnsMap.size();
+            for (int i = 0; i < mapLen; i++) {
+                insertSql.append("?,");
+            }
+            len = insertSql.length();
+            insertSql.delete(len - 1, len).append(")");
+
+            Map<String, Integer> ctype = getTargetColumnType(conn, config);
+
+            PreparedStatement pstmt = conn.prepareStatement(insertSql.toString());
+
+            for (Map<String, Object> d : data) {
+                pstmt.clearParameters();
+                int i = 1;
+                for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+                    String targetColumnName = entry.getKey();
+                    String srcColumnName = entry.getValue();
+                    if (srcColumnName == null) {
+                        srcColumnName = targetColumnName;
+                    }
+
+                    Integer type = ctype.get(targetColumnName.toLowerCase());
+                    if (type == null) {
+                        throw new RuntimeException("No column: " + targetColumnName + " found in target db");
+                    }
+
+                    Object value = d.get(srcColumnName);
+                    if (value != null) {
+                        setPStmt(type, pstmt, value, i);
+                    } else {
+                        pstmt.setNull(i, type);
+                    }
+                    i++;
+                }
+
+                pstmt.execute();
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Insert into target table, sql: {}", insertSql);
+                }
+
+                if (idx % config.getDbMapping().getCommitBatch() == 0) {
+                    conn.commit();
+                    completed = true;
+                } else {
+                    completed = false;
+                }
+                idx++;
+            }
+            if (!completed) {
+                conn.commit();
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            conn.rollback();
+        } finally {
+            conn.close();
+        }
+    }
+
+    /**
+     * Update operation
+     *
+     * @param config mapping configuration
+     * @param dml DML data
+     */
+    private void update(MappingConfig config, Dml dml) throws SQLException {
+        List<Map<String, Object>> data = dml.getData();
+        if (data == null || data.isEmpty()) {
+            return;
+        }
+
+        List<Map<String, Object>> old = dml.getOld();
+        if (old == null || old.isEmpty()) {
+            return;
+        }
+
+        DbMapping dbMapping = config.getDbMapping();
+
+        int idx = 1;
+        boolean completed = false;
+
+        Connection conn = dataSource.getConnection();
+        conn.setAutoCommit(false);
+
+        try {
+            Map<String, String> columnsMap;
+            if (dbMapping.isMapAll()) {
+                columnsMap = dbMapping.getAllColumns();
+            } else {
+                columnsMap = dbMapping.getTargetColumns();
+            }
+
+            Map<String, Integer> ctype = getTargetColumnType(conn, config);
+
+            for (Map<String, Object> o : old) {
+                Map<String, Object> d = data.get(idx - 1);
+                StringBuilder updateSql = new StringBuilder();
+                updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
+                Map<String, Object> values = new LinkedHashMap<>();
+                for (String srcColumnName : o.keySet()) {
+                    List<String> targetColumnNames = new ArrayList<>();
+                    columnsMap.forEach((targetColumn, srcColumn) -> {
+                        if (srcColumn == null) {
+                            srcColumn = targetColumn;
+                        }
+                        if (srcColumnName.equalsIgnoreCase(srcColumn)) {
+                            targetColumnNames.add(targetColumn);
+                        }
+                    });
+                    for (String targetColumnName : targetColumnNames) {
+                        updateSql.append(targetColumnName).append("=?, ");
+                        values.put(targetColumnName, d.get(srcColumnName));
+                    }
+                }
+                int len = updateSql.length();
+                updateSql.delete(len - 2, len).append(" WHERE ");
+
+                // Append the primary-key condition
+                appendCondition(dbMapping, updateSql, values, d);
+
+                sqlExe(conn, updateSql.toString(), ctype, values);
+
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Update target table, sql: {}", updateSql);
+                }
+                if (idx % config.getDbMapping().getCommitBatch() == 0) {
+                    conn.commit();
+                    completed = true;
+                } else {
+                    completed = false;
+                }
+                idx++;
+            }
+            if (!completed) {
+                conn.commit();
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            conn.rollback();
+        } finally {
+            conn.close();
+        }
+    }
+
+    /**
+     * Delete operation
+     *
+     * @param config mapping configuration
+     * @param dml DML data
+     * @throws SQLException if the target database rejects the delete
+     */
+    private void delete(MappingConfig config, Dml dml) throws SQLException {
+        List<Map<String, Object>> data = dml.getData();
+        if (data == null || data.isEmpty()) {
+            return;
+        }
+
+        DbMapping dbMapping = config.getDbMapping();
+
+        int idx = 1;
+        boolean completed = false;
+
+        Connection conn = dataSource.getConnection();
+        conn.setAutoCommit(false);
+
+        try {
+            Map<String, Integer> ctype = getTargetColumnType(conn, config);
+
+            for (Map<String, Object> d : data) {
+                StringBuilder sql = new StringBuilder();
+                sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE ");
+
+                Map<String, Object> values = new LinkedHashMap<>();
+                // Append the primary-key condition
+                appendCondition(dbMapping, sql, values, d);
+
+                sqlExe(conn, sql.toString(), ctype, values);
+
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Delete from target table, sql: {}", sql);
+                }
+                if (idx % config.getDbMapping().getCommitBatch() == 0) {
+                    conn.commit();
+                    completed = true;
+                } else {
+                    completed = false;
+                }
+                idx++;
+            }
+            if (!completed) {
+                conn.commit();
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            conn.rollback();
+        } finally {
+            conn.close();
+        }
+    }
+
+
+    /**
+     * Look up the JDBC types of the target table's columns
+     *
+     * @param conn sql connection
+     * @param config mapping configuration
+     * @return map of lower-cased column name to java.sql.Types value
+     */
+    private Map<String, Integer> getTargetColumnType(Connection conn, MappingConfig config) {
+        DbMapping dbMapping = config.getDbMapping();
+        String cacheKey = config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable();
+        Map<String, Integer> columnType = COLUMNS_TYPE_CACHE.get(cacheKey);
+        if (columnType == null) {
+            synchronized (RdbSyncService.class) {
+                columnType = COLUMNS_TYPE_CACHE.get(cacheKey);
+                if (columnType == null) {
+                    columnType = new LinkedHashMap<>();
+                    final Map<String, Integer> columnTypeTmp = columnType;
+                    String sql = "SELECT * FROM " + dbMapping.getTargetTable() + " WHERE 1=2";
+                    Util.sqlRS(conn, sql, rs -> {
+                        try {
+                            ResultSetMetaData rsd = rs.getMetaData();
+                            int columnCount = rsd.getColumnCount();
+                            for (int i = 1; i <= columnCount; i++) {
+                                columnTypeTmp.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
+                            }
+                            COLUMNS_TYPE_CACHE.put(cacheKey, columnTypeTmp);
+                        } catch (SQLException e) {
+                            logger.error(e.getMessage(), e);
+                        }
+                    });
+                }
+            }
+        }
+        return columnType;
+    }
+
+    /**
+     * Bind a value to a PreparedStatement parameter
+     *
+     * @param type sqlType
+     * @param pstmt the PreparedStatement to bind into
+     * @param value the value to bind
+     * @param i parameter index
+     */
+    public static void setPStmt(int type, PreparedStatement pstmt, Object value, int i) throws SQLException {
+        switch (type) {
+            case Types.BIT:
+            case Types.BOOLEAN:
+                if (value instanceof Boolean) {
+                    pstmt.setBoolean(i, (Boolean) value);
+                } else if (value instanceof String) {
+                    boolean v = !value.equals("0");
+                    pstmt.setBoolean(i, v);
+                } else if (value instanceof Number) {
+                    boolean v = ((Number) value).intValue() != 0;
+                    pstmt.setBoolean(i, v);
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.CHAR:
+            case Types.NCHAR:
+            case Types.VARCHAR:
+            case Types.LONGVARCHAR:
+                pstmt.setString(i, value.toString());
+                break;
+            case Types.TINYINT:
+                if (value instanceof Number) {
+                    pstmt.setByte(i, ((Number) value).byteValue());
+                } else if (value instanceof String) {
+                    pstmt.setByte(i, Byte.parseByte((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.SMALLINT:
+                if (value instanceof Number) {
+                    pstmt.setShort(i, ((Number) value).shortValue());
+                } else if (value instanceof String) {
+                    pstmt.setShort(i, Short.parseShort((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.INTEGER:
+                if (value instanceof Number) {
+                    pstmt.setInt(i, ((Number) value).intValue());
+                } else if (value instanceof String) {
+                    pstmt.setInt(i, Integer.parseInt((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.BIGINT:
+                if (value instanceof Number) {
+                    pstmt.setLong(i, ((Number) value).longValue());
+                } else if (value instanceof String) {
+                    pstmt.setLong(i, Long.parseLong((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.DECIMAL:
+            case Types.NUMERIC:
+                pstmt.setBigDecimal(i, new BigDecimal(value.toString()));
+                break;
+            case Types.REAL:
+                if (value instanceof Number) {
+                    pstmt.setFloat(i, ((Number) value).floatValue());
+                } else if (value instanceof String) {
+                    pstmt.setFloat(i, Float.parseFloat((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.FLOAT:
+            case Types.DOUBLE:
+                if (value instanceof Number) {
+                    pstmt.setDouble(i, ((Number) value).doubleValue());
+                } else if (value instanceof String) {
+                    pstmt.setDouble(i, Double.parseDouble((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+            case Types.BLOB:
+                if (value instanceof byte[]) {
+                    pstmt.setBytes(i, (byte[]) value);
+                } else if (value instanceof String) {
+                    pstmt.setBytes(i, ((String) value).getBytes(StandardCharsets.ISO_8859_1));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.CLOB:
+                if (value instanceof byte[]) {
+                    pstmt.setBytes(i, (byte[]) value);
+                } else if (value instanceof String) {
+                    Reader clobReader = new StringReader((String) value);
+                    pstmt.setCharacterStream(i, clobReader);
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.DATE:
+                if (value instanceof java.util.Date) {
+                    pstmt.setDate(i, new java.sql.Date(((java.util.Date) value).getTime()));
+                } else if (value instanceof String) {
+                    String v = (String) value;
+                    if (!v.startsWith("0000-00-00")) {
+                        v = v.trim().replace(" ", "T");
+                        DateTime dt = new DateTime(v);
+                        pstmt.setDate(i, new Date(dt.toDate().getTime()));
+                    } else {
+                        pstmt.setNull(i, type);
+                    }
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.TIME:
+                if (value instanceof java.util.Date) {
+                    pstmt.setTime(i, new java.sql.Time(((java.util.Date) value).getTime()));
+                } else if (value instanceof String) {
+                    String v = (String) value;
+                    v = "T" + v;
+                    DateTime dt = new DateTime(v);
+                    pstmt.setTime(i, new Time(dt.toDate().getTime()));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.TIMESTAMP:
+                if (value instanceof java.util.Date) {
+                    pstmt.setTimestamp(i, new java.sql.Timestamp(((java.util.Date) value).getTime()));
+                } else if (value instanceof String) {
+                    String v = (String) value;
+                    if (!v.startsWith("0000-00-00")) {
+                        v = v.trim().replace(" ", "T");
+                        DateTime dt = new DateTime(v);
+                        pstmt.setTimestamp(i, new Timestamp(dt.toDate().getTime()));
+                    } else {
+                        pstmt.setNull(i, type);
+                    }
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            default:
+                pstmt.setObject(i, value, type);
+        }
+    }
+
+    /**
+     * Build the primary-key WHERE condition
+     */
+    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Object> values,
+                                        Map<String, Object> d) {
+        // Append the primary key columns
+        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
+            String targetColumnName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = targetColumnName;
+            }
+            sql.append(targetColumnName).append("=? AND ");
+            values.put(targetColumnName, d.get(srcColumnName));
+        }
+        int len = sql.length();
+        sql.delete(len - 4, len);
+    }
+
+    /**
+     * Execute the SQL with the bound values
+     */
+    private static void sqlExe(Connection conn, String sql, Map<String, Integer> ctype, Map<String, Object> values) {
+        try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
+            int i = 1;
+            for (Map.Entry<String, Object> entry : values.entrySet()) {
+                String targetColumnName = entry.getKey();
+                Object value = entry.getValue();
+                Integer type = ctype.get(targetColumnName.toLowerCase());
+                if (value == null) {
+                    // bind NULL explicitly: setPStmt assumes a non-null value for most types
+                    pstmt.setNull(i++, type);
+                } else {
+                    setPStmt(type, pstmt, value, i++);
+                }
+            }
+            pstmt.execute();
+        } catch (SQLException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+}
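
For reference, a minimal sketch of calling the public RdbSyncService.setPStmt helper
directly, using the MySQL pool declared in TestConstant further down in this diff; the
target table and columns (mytest.tb_user, name, id) are assumptions taken from the
sample mappings and may not match a real schema:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.Types;

    import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
    import com.alibaba.otter.canal.client.adapter.rdb.test.TestConstant;

    public class SetPStmtSketch {

        public static void main(String[] args) throws Exception {
            try (Connection conn = TestConstant.dataSource.getConnection();
                 PreparedStatement pstmt = conn
                     .prepareStatement("UPDATE mytest.tb_user SET name=? WHERE id=?")) {
                // setPStmt maps canal's loosely typed row values onto JDBC parameter types
                RdbSyncService.setPStmt(Types.VARCHAR, pstmt, "Eric2", 1);
                RdbSyncService.setPStmt(Types.BIGINT, pstmt, "1", 2); // the String "1" is parsed to a long
                pstmt.executeUpdate();
            }
        }
    }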

+ 4 - 0
client-adapter/rdb/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter

@@ -0,0 +1,4 @@
+mysql=com.alibaba.otter.canal.client.adapter.rdb.adapters.MysqlAdapter
+postgresql=com.alibaba.otter.canal.client.adapter.rdb.adapters.PostgresqlAdapter
+oracle=com.alibaba.otter.canal.client.adapter.rdb.adapters.OracleAdapter
+sqlserver=com.alibaba.otter.canal.client.adapter.rdb.adapters.SqlserverAdapter
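
This registry maps the outerAdapter name configured in application.yml to an adapter
implementation class. A rough sketch of how such a key-to-class file can be resolved;
this is illustrative only, canal's actual extension loader is not part of this diff:

    import java.io.InputStream;
    import java.util.Properties;

    import com.alibaba.otter.canal.client.adapter.OuterAdapter;

    public class AdapterLookupSketch {

        static OuterAdapter load(String key) throws Exception {
            Properties registry = new Properties();
            try (InputStream in = AdapterLookupSketch.class.getClassLoader()
                .getResourceAsStream("META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter")) {
                registry.load(in);
            }
            // e.g. "oracle" -> com.alibaba.otter.canal.client.adapter.rdb.adapters.OracleAdapter
            String className = registry.getProperty(key);
            return (OuterAdapter) Class.forName(className).newInstance();
        }
    }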

+ 15 - 0
client-adapter/rdb/src/main/resources/oracle/mytest_user.yml

@@ -0,0 +1,15 @@
+dataSourceKey: defaultDS
+destination: example
+dbMapping:
+  database: mytest
+  table: user
+  targetTable: mytest.tb_user
+  targetPk:
+    id: id
+  mapAll: true
+#  targetColumns:
+#    id:
+#    name:
+#    role_id:
+#    c_time:
+#    test1:
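
With mapAll: true every source column is copied by name, and targetPk drives the WHERE
clause. The statements RdbSyncService builds for this mapping look roughly like the
sketch below; the concrete column list (id, name, role_id, c_time, test1) is an
assumption based on the commented-out targetColumns and the test data, since the real
list is read from the source table's schema:

    public class MappingSqlSketch {

        static final String INSERT =
            "INSERT INTO mytest.tb_user (id,name,role_id,c_time,test1) VALUES (?,?,?,?,?)";
        static final String UPDATE =
            "UPDATE mytest.tb_user SET name=? WHERE id=?";   // only changed columns are set
        static final String DELETE =
            "DELETE FROM mytest.tb_user WHERE id=?";
    }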

+ 15 - 0
client-adapter/rdb/src/main/resources/postgresql/mytest_user.yml

@@ -0,0 +1,15 @@
+dataSourceKey: defaultDS
+destination: example
+dbMapping:
+  database: mytest
+  table: user
+  targetTable: public.tb_user
+  targetPk:
+    id: id
+  mapAll: true
+#  targetColumns:
+#    id:
+#    name:
+#    role_id:
+#    c_time:
+#    test1:

+ 29 - 0
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/ConfigLoadTest.java

@@ -0,0 +1,29 @@
+package com.alibaba.otter.canal.client.adapter.rdb.test;
+
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfigLoader;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+
+public class ConfigLoadTest {
+
+    @Before
+    public void before() {
+        AdapterConfigs.put("oracle", "mytest_user.yml");
+        // Register the data source connection pool
+        DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
+    }
+
+    @Test
+    public void testLoad() {
+        Map<String, MappingConfig> configMap = MappingConfigLoader.load("oracle");
+
+        Assert.assertFalse(configMap.isEmpty());
+    }
+}

+ 82 - 0
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/DBTest.java

@@ -0,0 +1,82 @@
+package com.alibaba.otter.canal.client.adapter.rdb.test;
+
+import java.io.BufferedReader;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.sql.*;
+
+import org.junit.Test;
+
+import com.alibaba.druid.pool.DruidDataSource;
+
+public class DBTest {
+
+    @Test
+    public void test01() throws SQLException {
+        DruidDataSource dataSource = new DruidDataSource();
+        // dataSource.setDriverClassName("oracle.jdbc.OracleDriver");
+        // dataSource.setUrl("jdbc:oracle:thin:@127.0.0.1:49161:XE");
+        // dataSource.setUsername("mytest");
+        // dataSource.setPassword("m121212");
+
+        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
+        dataSource.setUrl("jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true");
+        dataSource.setUsername("root");
+        dataSource.setPassword("121212");
+
+        dataSource.setInitialSize(1);
+        dataSource.setMinIdle(1);
+        dataSource.setMaxActive(2);
+        dataSource.setMaxWait(60000);
+        dataSource.setTimeBetweenEvictionRunsMillis(60000);
+        dataSource.setMinEvictableIdleTimeMillis(300000);
+
+        dataSource.init();
+
+        Connection conn = dataSource.getConnection();
+        Statement stmt = conn.createStatement();
+        ResultSet rs = stmt.executeQuery("select * from user t where 1=2");
+
+        ResultSetMetaData rsm = rs.getMetaData();
+        int cnt = rsm.getColumnCount();
+        for (int i = 1; i <= cnt; i++) {
+            System.out.println(rsm.getColumnName(i) + " " + rsm.getColumnType(i));
+        }
+
+        rs.close();
+        stmt.close();
+
+//        PreparedStatement pstmt = conn
+//            .prepareStatement("insert into tb_user (id,name,role_id,c_time,test1,test2) values (?,?,?,?,?,?)");
+//        pstmt.setBigDecimal(1, new BigDecimal("5"));
+//        pstmt.setString(2, "test");
+//        pstmt.setBigDecimal(3, new BigDecimal("1"));
+//        pstmt.setDate(4, new Date(new java.util.Date().getTime()));
+//        byte[] a = { (byte) 1, (byte) 2 };
+//        pstmt.setBytes(5, a);
+//        pstmt.setBytes(6, a);
+//        pstmt.execute();
+//
+//        pstmt.close();
+
+        conn.close();
+        dataSource.close();
+    }
+
+    private String clob2Str(Clob clob) {
+        StringBuilder sb = new StringBuilder();
+        try (Reader is = clob.getCharacterStream(); BufferedReader buff = new BufferedReader(is)) {
+            String line;
+            while ((line = buff.readLine()) != null) {
+                sb.append(line);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return sb.toString();
+    }
+}

+ 37 - 0
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/TestConstant.java

@@ -0,0 +1,37 @@
+package com.alibaba.otter.canal.client.adapter.rdb.test;
+
+import java.sql.SQLException;
+
+import com.alibaba.druid.pool.DruidDataSource;
+
+public class TestConstant {
+
+    public final static String    jdbcUrl      = "jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true";
+    public final static String    jdbcUser     = "root";
+    public final static String    jdbcPassword = "121212";
+
+    public static DruidDataSource dataSource;
+
+    static {
+        dataSource = new DruidDataSource();
+        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
+        dataSource.setUrl(jdbcUrl);
+        dataSource.setUsername(jdbcUser);
+        dataSource.setPassword(jdbcPassword);
+        dataSource.setInitialSize(1);
+        dataSource.setMinIdle(1);
+        dataSource.setMaxActive(1);
+        dataSource.setMaxWait(60000);
+        dataSource.setTimeBetweenEvictionRunsMillis(60000);
+        dataSource.setMinEvictableIdleTimeMillis(300000);
+        dataSource.setPoolPreparedStatements(false);
+        dataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
+        dataSource.setValidationQuery("select 1");
+        try {
+            dataSource.init();
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 29 - 0
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/Common.java

@@ -0,0 +1,29 @@
+package com.alibaba.otter.canal.client.adapter.rdb.test.sync;
+
+import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
+import com.alibaba.otter.canal.client.adapter.rdb.adapters.OracleAdapter;
+import com.alibaba.otter.canal.client.adapter.rdb.test.TestConstant;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class Common {
+    public static RdbAdapter init() {
+        DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
+
+        OuterAdapterConfig outerAdapterConfig = new OuterAdapterConfig();
+        outerAdapterConfig.setName("oracle");
+        Map<String, String> properties = new HashMap<>();
+        properties.put("jdbc.driveClassName", "oracle.jdbc.OracleDriver");
+        properties.put("jdbc.url", "jdbc:oracle:thin:@127.0.0.1:49161:XE");
+        properties.put("jdbc.username", "mytest");
+        properties.put("jdbc.password", "m121212");
+        outerAdapterConfig.setProperties(properties);
+
+        RdbAdapter adapter = new OracleAdapter();
+        adapter.init(outerAdapterConfig);
+        return adapter;
+    }
+}

+ 66 - 0
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/OracleSyncTest.java

@@ -0,0 +1,66 @@
+package com.alibaba.otter.canal.client.adapter.rdb.test.sync;
+
+import java.util.*;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+
+public class OracleSyncTest {
+
+    private RdbAdapter rdbAdapter;
+
+    @Before
+    public void init() {
+        AdapterConfigs.put("oracle", "mytest_user.yml");
+        rdbAdapter = Common.init();
+    }
+
+    @Test
+    public void test01() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("INSERT");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("name", "Eric");
+        data.put("role_id", 1L);
+        data.put("c_time", new Date());
+        data.put("test1", "sdfasdfawe中国asfwef");
+        dml.setData(dataList);
+
+        rdbAdapter.sync(dml);
+    }
+
+    @Test
+    public void test02() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("UPDATE");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("name", "Eric2");
+        dml.setData(dataList);
+        List<Map<String, Object>> oldList = new ArrayList<>();
+        Map<String, Object> old = new LinkedHashMap<>();
+        oldList.add(old);
+        old.put("name", "Eric");
+        dml.setOld(oldList);
+
+        rdbAdapter.sync(dml);
+    }
+
+}
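
A delete case in the same style would cover the remaining DML branch of RdbSyncService;
a sketch of an extra method for the test class above (not part of this commit):

    @Test
    public void test03() {
        Dml dml = new Dml();
        dml.setDestination("example");
        dml.setTs(new Date().getTime());
        dml.setType("DELETE");
        dml.setDatabase("mytest");
        dml.setTable("user");
        List<Map<String, Object>> dataList = new ArrayList<>();
        Map<String, Object> data = new LinkedHashMap<>();
        dataList.add(data);
        data.put("id", 1L); // only the primary key is needed for the WHERE clause
        dml.setData(dataList);

        rdbAdapter.sync(dml);
    }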

+ 13 - 0
client-adapter/rdb/src/test/resources/log4j2-test.xml

@@ -0,0 +1,13 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="ERROR">
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</Configuration>

+ 13 - 0
client-adapter/rdb/src/test/resources/logback-test.xml

@@ -0,0 +1,13 @@
+<configuration scan="true" scanPeriod=" 5 seconds">
+	<jmxConfigurator />
+	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+		<encoder>
+			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
+			</pattern>
+		</encoder>
+	</appender>
+
+	<root level="TRACE">
+		<appender-ref ref="STDOUT"/>
+	</root>
+</configuration>

+ 15 - 0
client-adapter/rdb/src/test/resources/oracle/mytest_user.yml

@@ -0,0 +1,15 @@
+dataSourceKey: defaultDS
+destination: example
+dbMapping:
+  database: mytest
+  table: user
+  targetTable: mytest.tb_user
+  targetPk:
+    id: id
+  mapAll: true
+#  targetColumns:
+#    id:
+#    name:
+#    role_id:
+#    c_time:
+#    test1: