Browse Source

关系型数据同步adapter, 支持 mysql oracle postgresql sqlserver

mcy 6 years ago
parent
commit
2ac84b4320
26 changed files with 884 additions and 229 deletions
  1. 90 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Util.java
  2. 2 1
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java
  3. 4 40
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java
  4. 2 1
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java
  5. 13 0
      client-adapter/launcher/pom.xml
  6. 7 0
      client-adapter/launcher/src/main/assembly/dev.xml
  7. 7 0
      client-adapter/launcher/src/main/assembly/release.xml
  8. 2 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java
  9. 15 0
      client-adapter/launcher/src/main/resources/application.yml
  10. 4 2
      client-adapter/logger/src/main/java/com/alibaba/otter/canal/client/adapter/logger/LoggerAdapterExample.java
  11. 148 13
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java
  12. 8 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/MysqlAdapter.java
  13. 8 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/OracleAdapter.java
  14. 8 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/PostgresqlAdapter.java
  15. 8 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/SqlserverAdapter.java
  16. 9 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java
  17. 3 5
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfigLoader.java
  18. 290 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java
  19. 198 150
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java
  20. 4 1
      client-adapter/rdb/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter
  21. 2 1
      client-adapter/rdb/src/main/resources/oracle/mytest_user.yml
  22. 15 0
      client-adapter/rdb/src/main/resources/postgresql/mytest_user.yml
  23. 2 2
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/ConfigLoadTest.java
  24. 3 2
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/Common.java
  25. 23 4
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/OracleSyncTest.java
  26. 9 6
      client-adapter/rdb/src/test/resources/oracle/mytest_user.yml

+ 90 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Util.java

@@ -0,0 +1,90 @@
+package com.alibaba.otter.canal.client.adapter.support;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import javax.sql.DataSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Util {
+
+    private static final Logger logger = LoggerFactory.getLogger(Util.class);
+
+    /**
+     * 通过DS执行sql
+     */
+    public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) throws SQLException {
+        Connection conn = null;
+        Statement stmt = null;
+        ResultSet rs = null;
+        try {
+            conn = ds.getConnection();
+            stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            stmt.setFetchSize(Integer.MIN_VALUE);
+            rs = stmt.executeQuery(sql);
+            return fun.apply(rs);
+        } finally {
+            if (rs != null) {
+                try {
+                    rs.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+            if (stmt != null) {
+                try {
+                    stmt.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+            if (conn != null) {
+                try {
+                    conn.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * sql执行获取resultSet
+     *
+     * @param conn sql connection
+     * @param sql sql
+     * @param consumer 回调方法
+     */
+    public static void sqlRS(Connection conn, String sql, Consumer<ResultSet> consumer) {
+        Statement stmt = null;
+        ResultSet rs = null;
+        try {
+            stmt = conn.createStatement();
+            rs = stmt.executeQuery(sql);
+            consumer.accept(rs);
+        } catch (SQLException e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            if (rs != null) {
+                try {
+                    rs.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+            if (stmt != null) {
+                try {
+                    stmt.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+    }
+}

+ 2 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -10,6 +10,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
@@ -73,7 +74,7 @@ public class ESSyncService {
                     dml.getDestination());
             }
             if (logger.isDebugEnabled()) {
-                logger.debug("DML: {}", JSON.toJSONString(dml));
+                logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
             }
         }
     }

+ 4 - 40
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java

@@ -14,6 +14,7 @@ import java.util.function.Function;
 
 import javax.sql.DataSource;
 
+import com.alibaba.otter.canal.client.adapter.support.Util;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,43 +35,6 @@ public class HbaseEtlService {
 
     private static Logger logger = LoggerFactory.getLogger(HbaseEtlService.class);
 
-    public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) throws SQLException {
-        Connection conn = null;
-        Statement stmt = null;
-        ResultSet rs = null;
-        try {
-            conn = ds.getConnection();
-            stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-            stmt.setFetchSize(Integer.MIN_VALUE);
-            rs = stmt.executeQuery(sql);
-            return fun.apply(rs);
-        } finally {
-            if (rs != null) {
-                try {
-                    rs.close();
-                } catch (SQLException e) {
-                    logger.error(e.getMessage(), e);
-                }
-            }
-            if (stmt != null) {
-                try {
-                    stmt.close();
-                } catch (SQLException e) {
-                    logger.error(e.getMessage(), e);
-                }
-            }
-            if (conn != null) {
-                try {
-                    conn.close();
-                } catch (SQLException e) {
-                    logger.error(e.getMessage(), e);
-                }
-            }
-            rs = null;
-            stmt = null;
-            conn = null;
-        }
-    }
 
     /**
      * 建表
@@ -138,7 +102,7 @@ public class HbaseEtlService {
             if (params != null && params.size() == 1 && hbaseMapping.getEtlCondition() == null) {
                 AtomicBoolean stExists = new AtomicBoolean(false);
                 // 验证是否有SYS_TIME字段
-                sqlRS(ds, sql, rs -> {
+                Util.sqlRS(ds, sql, rs -> {
                     try {
                         ResultSetMetaData rsmd = rs.getMetaData();
                         int cnt = rsmd.getColumnCount();
@@ -169,7 +133,7 @@ public class HbaseEtlService {
 
             // 获取总数
             String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
-            long cnt = (Long) sqlRS(ds, countSql, rs -> {
+            long cnt = (Long) Util.sqlRS(ds, countSql, rs -> {
                 Long count = null;
                 try {
                     if (rs.next()) {
@@ -244,7 +208,7 @@ public class HbaseEtlService {
     private static boolean executeSqlImport(DataSource ds, String sql, MappingConfig.HbaseMapping hbaseMapping,
                                             HbaseTemplate hbaseTemplate, AtomicLong successCount, List<String> errMsg) {
         try {
-            sqlRS(ds, sql, rs -> {
+            Util.sqlRS(ds, sql, rs -> {
                 int i = 1;
 
                 try {

+ 2 - 1
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java

@@ -7,6 +7,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.hbase.support.*;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
@@ -39,7 +40,7 @@ public class HbaseSyncService {
                     delete(config, dml);
                 }
                 if (logger.isDebugEnabled()) {
-                    logger.debug("DML: {}", JSON.toJSONString(dml));
+                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
                 }
             }
         } catch (Exception e) {

+ 13 - 0
client-adapter/launcher/pom.xml

@@ -107,6 +107,19 @@
             <classifier>jar-with-dependencies</classifier>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>client-adapter.rdb</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>*</artifactId>
+                    <groupId>*</groupId>
+                </exclusion>
+            </exclusions>
+            <classifier>jar-with-dependencies</classifier>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 
     <build>

+ 7 - 0
client-adapter/launcher/src/main/assembly/dev.xml

@@ -43,6 +43,13 @@
                 <include>**/*</include>
             </includes>
         </fileSet>
+		<fileSet>
+			<directory>../rdb/src/main/resources/</directory>
+			<outputDirectory>/config</outputDirectory>
+            <excludes>
+                <exclude>META-INF/**</exclude>
+            </excludes>
+		</fileSet>
 		<fileSet>
 			<directory>target</directory>
 			<outputDirectory>logs</outputDirectory>

+ 7 - 0
client-adapter/launcher/src/main/assembly/release.xml

@@ -44,6 +44,13 @@
                 <include>**/*</include>
             </includes>
         </fileSet>
+        <fileSet>
+            <directory>../rdb/src/main/resources/</directory>
+            <outputDirectory>/config</outputDirectory>
+            <excludes>
+                <exclude>META-INF/**</exclude>
+            </excludes>
+        </fileSet>
         <fileSet>
             <directory>target</directory>
             <outputDirectory>logs</outputDirectory>

+ 2 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.errors.WakeupException;
 
@@ -36,7 +37,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
             groupId,
             canalClientConfig.getBatchSize(),
             flatMessage);
-        // connector.setSessionTimeout(1L, TimeUnit.MINUTES);
+        connector.setSessionTimeout(30L, TimeUnit.SECONDS);
     }
 
     @Override

+ 15 - 0
client-adapter/launcher/src/main/resources/application.yml

@@ -4,6 +4,7 @@ logging:
   level:
     com.alibaba.otter.canal.client.adapter.hbase: DEBUG
     com.alibaba.otter.canal.client.adapter.es: DEBUG
+    com.alibaba.otter.canal.client.adapter.rdb: DEBUG
 spring:
   jackson:
     date-format: yyyy-MM-dd HH:mm:ss
@@ -36,6 +37,18 @@ spring:
 #    - groupId: g2
 #      outAdapters:
 #      - name: logger
+#      - name: oracle
+#        properties:
+#          jdbc.driverClassName: oracle.jdbc.OracleDriver
+#          jdbc.url: jdbc:oracle:thin:@127.0.0.1:49161:XE
+#          jdbc.username: mytest
+#          jdbc.password: m121212
+#      - name: postgresql
+#        properties:
+#          jdbc.driverClassName: org.postgresql.Driver
+#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
+#          jdbc.username: postgres
+#          jdbc.password: 121212
 #  mqTopics:
 #  - mqMode: rocketmq
 #    topic: example
@@ -53,3 +66,5 @@ spring:
 #  adapterConfigs:
 #  - hbase/mytest_person2.yml
 #  - es/mytest_user.yml
+#  - oracle/mytest_user.yml
+#  - postgresql/mytest_user.yml

+ 4 - 2
client-adapter/logger/src/main/java/com/alibaba/otter/canal/client/adapter/logger/LoggerAdapterExample.java

@@ -3,9 +3,11 @@ package com.alibaba.otter.canal.client.adapter.logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.SPI;
 
 /**
@@ -27,7 +29,7 @@ public class LoggerAdapterExample implements OuterAdapter {
 
     @Override
     public void sync(Dml dml) {
-        logger.info("DML: {}", dml.toString());
+        logger.info("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
     }
 
     @Override

+ 148 - 13
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -1,8 +1,11 @@
 package com.alibaba.otter.canal.client.adapter.rdb;
 
+import java.sql.Connection;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
+
+import javax.sql.DataSource;
 
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -12,29 +15,28 @@ import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfigLoader;
+import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
-import com.alibaba.otter.canal.client.adapter.support.SPI;
+import com.alibaba.otter.canal.client.adapter.support.*;
 
-@SPI("rdb")
-public class RdbAdapter implements OuterAdapter {
+public abstract class RdbAdapter implements OuterAdapter {
 
-    private static Logger                              logger             = LoggerFactory.getLogger(RdbAdapter.class);
+    private static Logger                       logger             = LoggerFactory.getLogger(RdbAdapter.class);
 
-    private static volatile Map<String, MappingConfig> rdbMapping         = null;                                     // 文件名对应配置
-    private static volatile Map<String, MappingConfig> mappingConfigCache = null;                                     // 库名-表名对应配置
+    private volatile Map<String, MappingConfig> rdbMapping         = null;                                     // 文件名对应配置
+    private volatile Map<String, MappingConfig> mappingConfigCache = null;                                     // 库名-表名对应配置
 
-    private DruidDataSource                            dataSource;
+    private DruidDataSource                     dataSource;
 
-    private RdbSyncService                             rdbSyncService;
+    private RdbSyncService                      rdbSyncService;
 
     @Override
     public void init(OuterAdapterConfig configuration) {
         if (mappingConfigCache == null) {
             synchronized (MappingConfig.class) {
                 if (mappingConfigCache == null) {
-                    rdbMapping = MappingConfigLoader.load();
+                    SPI spi = this.getClass().getAnnotation(SPI.class);
+                    rdbMapping = MappingConfigLoader.load(spi.value());
                     mappingConfigCache = new HashMap<>();
                     for (MappingConfig mappingConfig : rdbMapping.values()) {
                         mappingConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
@@ -65,6 +67,55 @@ public class RdbAdapter implements OuterAdapter {
             logger.error("ERROR ## failed to initial datasource: " + properties.get("jdbc.url"), e);
         }
 
+        rdbMapping.values().forEach(config -> {
+            try {
+                MappingConfig.DbMapping dbMapping = config.getDbMapping();
+                // 从源表加载所有字段名
+                if (dbMapping.getAllColumns() == null) {
+                    synchronized (RdbSyncService.class) {
+                        if (dbMapping.getAllColumns() == null) {
+                            DataSource srcDS = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+                            Connection srcConn = srcDS.getConnection();
+                            String srcMetaSql = "SELECT * FROM " + dbMapping.getDatabase() + "." + dbMapping.getTable()
+                                                + " WHERE 1=2 ";
+                            List<String> srcColumns = new ArrayList<>();
+                            Util.sqlRS(srcConn, srcMetaSql, rs -> {
+                                try {
+                                    ResultSetMetaData rmd = rs.getMetaData();
+                                    int cnt = rmd.getColumnCount();
+                                    for (int i = 1; i <= cnt; i++) {
+                                        srcColumns.add(rmd.getColumnName(i).toLowerCase());
+                                    }
+                                } catch (SQLException e) {
+                                    logger.error(e.getMessage(), e);
+                                }
+                            });
+                            Map<String, String> columnsMap = new LinkedHashMap<>();
+
+                            for (String srcColumn : srcColumns) {
+                                String targetColumn = srcColumn;
+                                if (dbMapping.getTargetColumns() != null) {
+                                    for (Map.Entry<String, String> entry : dbMapping.getTargetColumns().entrySet()) {
+                                        String targetColumnName = entry.getKey();
+                                        String srcColumnName = entry.getValue();
+
+                                        if (srcColumnName != null
+                                            && srcColumnName.toLowerCase().equals(srcColumn.toUpperCase())) {
+                                            targetColumn = targetColumnName;
+                                        }
+                                    }
+                                }
+                                columnsMap.put(targetColumn, srcColumn);
+                            }
+                            dbMapping.setAllColumns(columnsMap);
+                        }
+                    }
+                }
+            } catch (SQLException e) {
+                logger.error(e.getMessage(), e);
+            }
+        });
+
         rdbSyncService = new RdbSyncService(dataSource);
     }
 
@@ -74,9 +125,93 @@ public class RdbAdapter implements OuterAdapter {
         String database = dml.getDatabase();
         String table = dml.getTable();
         MappingConfig config = mappingConfigCache.get(destination + "." + database + "." + table);
+
         rdbSyncService.sync(config, dml);
     }
 
+    @Override
+    public EtlResult etl(String task, List<String> params) {
+        EtlResult etlResult = new EtlResult();
+        MappingConfig config = rdbMapping.get(task);
+        if (config != null) {
+            DataSource srcDataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+            if (srcDataSource != null) {
+                return RdbEtlService.importData(srcDataSource, dataSource, config, params);
+            } else {
+                etlResult.setSucceeded(false);
+                etlResult.setErrorMessage("DataSource not found");
+                return etlResult;
+            }
+        } else {
+            StringBuilder resultMsg = new StringBuilder();
+            boolean resSucc = true;
+            // ds不为空说明传入的是destination
+            for (MappingConfig configTmp : rdbMapping.values()) {
+                // 取所有的destination为task的配置
+                if (configTmp.getDestination().equals(task)) {
+                    DataSource srcDataSource = DatasourceConfig.DATA_SOURCES.get(configTmp.getDataSourceKey());
+                    if (srcDataSource == null) {
+                        continue;
+                    }
+                    EtlResult etlRes = RdbEtlService.importData(srcDataSource, dataSource, configTmp, params);
+                    if (!etlRes.getSucceeded()) {
+                        resSucc = false;
+                        resultMsg.append(etlRes.getErrorMessage()).append("\n");
+                    } else {
+                        resultMsg.append(etlRes.getResultMessage()).append("\n");
+                    }
+                }
+            }
+            if (resultMsg.length() > 0) {
+                etlResult.setSucceeded(resSucc);
+                if (resSucc) {
+                    etlResult.setResultMessage(resultMsg.toString());
+                } else {
+                    etlResult.setErrorMessage(resultMsg.toString());
+                }
+                return etlResult;
+            }
+        }
+        etlResult.setSucceeded(false);
+        etlResult.setErrorMessage("Task not found");
+        return etlResult;
+    }
+
+    @Override
+    public Map<String, Object> count(String task) {
+        MappingConfig config = rdbMapping.get(task);
+        MappingConfig.DbMapping dbMapping = config.getDbMapping();
+        String sql = "SELECT COUNT(1) AS cnt FROM " + dbMapping.getTargetTable();
+        Connection conn = null;
+        Map<String, Object> res = new LinkedHashMap<>();
+        try {
+            conn = dataSource.getConnection();
+            Util.sqlRS(conn, sql, rs -> {
+                try {
+                    if (rs.next()) {
+                        Long rowCount = rs.getLong("cnt");
+                        res.put("count", rowCount);
+                    }
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            });
+        } catch (SQLException e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            if (conn != null) {
+                try {
+                    conn.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+        res.put("targetTable", dbMapping.getTargetTable());
+
+        return res;
+    }
+
     @Override
     public void destroy() {
         if (dataSource != null) {

+ 8 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/MysqlAdapter.java

@@ -0,0 +1,8 @@
+package com.alibaba.otter.canal.client.adapter.rdb.adapters;
+
+import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+
+@SPI("mysql")
+public class MysqlAdapter extends RdbAdapter {
+}

+ 8 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/OracleAdapter.java

@@ -0,0 +1,8 @@
+package com.alibaba.otter.canal.client.adapter.rdb.adapters;
+
+import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+
+@SPI("oracle")
+public class OracleAdapter extends RdbAdapter {
+}

+ 8 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/PostgresqlAdapter.java

@@ -0,0 +1,8 @@
+package com.alibaba.otter.canal.client.adapter.rdb.adapters;
+
+import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+
+@SPI("postgresql")
+public class PostgresqlAdapter extends RdbAdapter {
+}

+ 8 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/SqlserverAdapter.java

@@ -0,0 +1,8 @@
+package com.alibaba.otter.canal.client.adapter.rdb.adapters;
+
+import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+
+@SPI("sqlserver")
+public class SqlserverAdapter extends RdbAdapter {
+}

+ 9 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java

@@ -58,6 +58,7 @@ public class MappingConfig {
 
         private String                       database;                            // 数据库名或schema名
         private String                       table;                               // 表名
+        private Map<String, String>          targetPk;                            // 目标表主键字段
         private boolean                      mapAll      = false;                 // 映射所有字段
         private String                       targetTable;                         // 目标表名
         private Map<String, String>          targetColumns;                       // 目标表字段映射
@@ -86,6 +87,14 @@ public class MappingConfig {
             this.table = table;
         }
 
        /**
         * Primary-key column mapping for the target table.
         * NOTE(review): presumably keyed target PK column -> source column;
         * the direction is not visible here — confirm against RdbSyncService usage.
         */
        public Map<String, String> getTargetPk() {
            return targetPk;
        }

        public void setTargetPk(Map<String, String> targetPk) {
            this.targetPk = targetPk;
        }
+
         public boolean isMapAll() {
             return mapAll;
         }

+ 3 - 5
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfigLoader.java

@@ -25,19 +25,17 @@ public class MappingConfigLoader {
 
     private static Logger       logger    = LoggerFactory.getLogger(MappingConfigLoader.class);
 
-    private static final String BASE_PATH = "rdb";
-
     /**
      * 加载HBase表映射配置
      * 
      * @return 配置名/配置文件名--对象
      */
-    public static Map<String, MappingConfig> load() {
+    public static Map<String, MappingConfig> load(String name) {
         logger.info("## Start loading rdb mapping config ... ");
 
         Map<String, MappingConfig> result = new LinkedHashMap<>();
 
-        Collection<String> configs = AdapterConfigs.get("rdb");
+        Collection<String> configs = AdapterConfigs.get(name);
         if (configs == null) {
             return result;
         }
@@ -53,7 +51,7 @@ public class MappingConfigLoader {
             String configContent = null;
 
             if (c.endsWith(".yml")) {
-                configContent = readConfigContent(BASE_PATH + "/" + c);
+                configContent = readConfigContent(name + "/" + c);
             }
 
             MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);

+ 290 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java

@@ -0,0 +1,290 @@
+package com.alibaba.otter.canal.client.adapter.rdb.service;
+
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.sql.DataSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import com.google.common.base.Joiner;
+
+/**
+ * RDB ETL 操作业务类
+ *
+ * @author rewerma @ 2018-11-7
+ * @version 1.0.0
+ */
+public class RdbEtlService {
+
+    private static final Logger logger = LoggerFactory.getLogger(RdbEtlService.class);
+
+    /**
+     * 导入数据
+     */
+    public static EtlResult importData(DataSource srcDS, DataSource targetDS, MappingConfig config,
+                                       List<String> params) {
+        EtlResult etlResult = new EtlResult();
+        AtomicLong successCount = new AtomicLong();
+        List<String> errMsg = new ArrayList<>();
+        String hbaseTable = "";
+        try {
+            if (config == null) {
+                logger.error("Config is null!");
+                etlResult.setSucceeded(false);
+                etlResult.setErrorMessage("Config is null!");
+                return etlResult;
+            }
+            DbMapping dbMapping = config.getDbMapping();
+
+            long start = System.currentTimeMillis();
+
+            // 拼接sql
+            StringBuilder sql = new StringBuilder(
+                "SELECT * FROM " + dbMapping.getDatabase() + "." + dbMapping.getTable());
+
+            // 拼接条件
+            appendCondition(params, dbMapping, srcDS, sql);
+
+            // 获取总数
+            String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
+            long cnt = (Long) Util.sqlRS(srcDS, countSql, rs -> {
+                Long count = null;
+                try {
+                    if (rs.next()) {
+                        count = ((Number) rs.getObject(1)).longValue();
+                    }
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+                return count == null ? 0 : count;
+            });
+
+            // 当大于1万条记录时开启多线程
+            if (cnt >= 10000) {
+                int threadCount = 3;
+                long perThreadCnt = cnt / threadCount;
+                ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+                List<Future<Boolean>> futures = new ArrayList<>(threadCount);
+                for (int i = 0; i < threadCount; i++) {
+                    long offset = i * perThreadCnt;
+                    Long size = null;
+                    if (i != threadCount - 1) {
+                        size = perThreadCnt;
+                    }
+                    String sqlFinal;
+                    if (size != null) {
+                        sqlFinal = sql + " LIMIT " + offset + "," + size;
+                    } else {
+                        sqlFinal = sql + " LIMIT " + offset + "," + cnt;
+                    }
+                    Future<Boolean> future = executor.submit(
+                        () -> executeSqlImport(params, srcDS, targetDS, sqlFinal, dbMapping, successCount, errMsg));
+                    futures.add(future);
+                }
+
+                for (Future<Boolean> future : futures) {
+                    future.get();
+                }
+
+                executor.shutdown();
+            } else {
+                executeSqlImport(params, srcDS, targetDS, sql.toString(), dbMapping, successCount, errMsg);
+            }
+
+            logger.info(
+                dbMapping.getTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000 + "s!");
+
+            etlResult.setResultMessage("导入目标表 " + dbMapping.getTargetTable() + " 数据:" + successCount.get() + " 条");
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            errMsg.add(hbaseTable + " etl failed! ==>" + e.getMessage());
+        }
+
+        if (errMsg.isEmpty()) {
+            etlResult.setSucceeded(true);
+        } else {
+            etlResult.setErrorMessage(Joiner.on("\n").join(errMsg));
+        }
+        return etlResult;
+    }
+
+    private static void appendCondition(List<String> params, DbMapping dbMapping, DataSource ds,
+                                        StringBuilder sql) throws SQLException {
+        if (params != null && params.size() == 1 && dbMapping.getEtlCondition() == null) {
+            AtomicBoolean stExists = new AtomicBoolean(false);
+            // 验证是否有SYS_TIME字段
+            Util.sqlRS(ds, sql.toString(), rs -> {
+                try {
+                    ResultSetMetaData rsmd = rs.getMetaData();
+                    int cnt = rsmd.getColumnCount();
+                    for (int i = 1; i <= cnt; i++) {
+                        String columnName = rsmd.getColumnName(i);
+                        if ("SYS_TIME".equalsIgnoreCase(columnName)) {
+                            stExists.set(true);
+                            break;
+                        }
+                    }
+                } catch (Exception e) {
+                    // ignore
+                }
+                return null;
+            });
+            if (stExists.get()) {
+                sql.append(" WHERE SYS_TIME >= '").append(params.get(0)).append("' ");
+            }
+        } else if (dbMapping.getEtlCondition() != null && params != null) {
+            String etlCondition = dbMapping.getEtlCondition();
+            int size = params.size();
+            for (int i = 0; i < size; i++) {
+                etlCondition = etlCondition.replace("{" + i + "}", params.get(i));
+            }
+
+            sql.append(" ").append(etlCondition);
+        }
+    }
+
+    /**
+     * 执行导入
+     */
+    private static boolean executeSqlImport(List<String> params, DataSource srcDS, DataSource targetDS, String sql,
+                                            DbMapping dbMapping, AtomicLong successCount, List<String> errMsg) {
+        try {
+            Util.sqlRS(srcDS, sql, rs -> {
+                int idx = 1;
+
+                try {
+                    boolean completed = false;
+
+                    Map<String, Integer> columnType = new LinkedHashMap<>();
+                    ResultSetMetaData rsd = rs.getMetaData();
+                    int columnCount = rsd.getColumnCount();
+                    for (int i = 1; i <= columnCount; i++) {
+                        columnType.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
+                    }
+
+                    Map<String, String> columnsMap;
+                    if (dbMapping.isMapAll()) {
+                        columnsMap = dbMapping.getAllColumns();
+                    } else {
+                        columnsMap = dbMapping.getTargetColumns();
+                    }
+
+                    StringBuilder insertSql = new StringBuilder();
+                    insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
+                    columnsMap
+                        .forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
+
+                    int len = insertSql.length();
+                    insertSql.delete(len - 1, len).append(") VALUES (");
+                    int mapLen = columnsMap.size();
+                    for (int i = 0; i < mapLen; i++) {
+                        insertSql.append("?,");
+                    }
+                    len = insertSql.length();
+                    insertSql.delete(len - 1, len).append(")");
+                    try (Connection connTarget = targetDS.getConnection();
+                            PreparedStatement pstmt = connTarget.prepareStatement(insertSql.toString())) {
+                        connTarget.setAutoCommit(false);
+
+                        while (rs.next()) {
+                            pstmt.clearParameters();
+
+                            // 删除数据
+                            Map<String, Object> values = new LinkedHashMap<>();
+                            StringBuilder deleteSql = new StringBuilder(
+                                "DELETE FROM " + dbMapping.getTargetTable() + " WHERE ");
+                            appendCondition(dbMapping, deleteSql, values, rs);
+                            try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
+                                int k = 1;
+                                for (Object val : values.values()) {
+                                    pstmt2.setObject(k++, val);
+                                }
+                                pstmt2.execute();
+                            }
+
+                            int i = 1;
+                            for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+                                String targetClolumnName = entry.getKey();
+                                String srcColumnName = entry.getValue();
+                                if (srcColumnName == null) {
+                                    srcColumnName = targetClolumnName;
+                                }
+
+                                Integer type = columnType.get(targetClolumnName.toLowerCase());
+
+                                Object value = rs.getObject(srcColumnName);
+                                if (value != null) {
+                                    RdbSyncService.setPStmt(type, pstmt, value, i);
+                                } else {
+                                    pstmt.setNull(i, type);
+                                }
+
+                                i++;
+                            }
+
+                            pstmt.execute();
+                            if (logger.isTraceEnabled()) {
+                                logger.trace("Insert into target table, sql: {}", insertSql);
+                            }
+
+                            if (idx % dbMapping.getCommitBatch() == 0) {
+                                connTarget.commit();
+                                completed = true;
+                            }
+                            idx++;
+                            successCount.incrementAndGet();
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("successful import count:" + successCount.get());
+                            }
+                        }
+                        if (!completed) {
+                            connTarget.commit();
+                        }
+                    }
+
+                } catch (Exception e) {
+                    logger.error(dbMapping.getTable() + " etl failed! ==>" + e.getMessage(), e);
+                    errMsg.add(dbMapping.getTable() + " etl failed! ==>" + e.getMessage());
+                }
+                return idx;
+            });
+            return true;
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return false;
+        }
+    }
+
+    /**
+     * 拼接目标表主键where条件
+     */
+    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Object> values,
+                                        ResultSet rs) throws SQLException {
+        // 拼接主键
+        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
+            String targetColumnName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = targetColumnName;
+            }
+            sql.append(targetColumnName).append("=? AND ");
+            values.put(targetColumnName, rs.getObject(srcColumnName));
+        }
+        int len = sql.length();
+        sql.delete(len - 4, len);
+    }
+}

+ 198 - 150
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -14,11 +14,13 @@ import java.util.function.Consumer;
 
 import javax.sql.DataSource;
 
+import com.alibaba.otter.canal.client.adapter.support.Util;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
@@ -32,12 +34,11 @@ import com.alibaba.otter.canal.client.adapter.support.Dml;
  */
 public class RdbSyncService {
 
-    private static final Logger                            logger             = LoggerFactory
-        .getLogger(RdbSyncService.class);
+    private static final Logger                     logger             = LoggerFactory.getLogger(RdbSyncService.class);
 
-    private static final Map<String, Map<String, Integer>> COLUMNS_TYPE_CACHE = new ConcurrentHashMap<>();
+    private final Map<String, Map<String, Integer>> COLUMNS_TYPE_CACHE = new ConcurrentHashMap<>();
 
-    private DataSource                                     dataSource;
+    private DataSource                              dataSource;
 
     public RdbSyncService(DataSource dataSource){
         this.dataSource = dataSource;
@@ -46,62 +47,16 @@ public class RdbSyncService {
     public void sync(MappingConfig config, Dml dml) {
         try {
             if (config != null) {
-                {
-                    DbMapping dbMapping = config.getDbMapping();
-                    // 从源表加载所有字段名
-                    if (dbMapping.getAllColumns() == null) {
-                        synchronized (RdbSyncService.class) {
-                            if (dbMapping.getAllColumns() == null) {
-                                DataSource srcDS = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
-                                Connection srcConn = srcDS.getConnection();
-                                String srcMetaSql = "SELECT * FROM " + dbMapping.getDatabase() + "."
-                                                    + dbMapping.getTable() + " WHERE 1=2 ";
-                                List<String> srcColumns = new ArrayList<>();
-                                sqlRS(srcConn, srcMetaSql, rs -> {
-                                    try {
-                                        ResultSetMetaData rmd = rs.getMetaData();
-                                        int cnt = rmd.getColumnCount();
-                                        for (int i = 1; i <= cnt; i++) {
-                                            srcColumns.add(rmd.getColumnName(i).toLowerCase());
-                                        }
-                                    } catch (SQLException e) {
-                                        logger.error(e.getMessage(), e);
-                                    }
-                                });
-                                Map<String, String> columnsMap = new LinkedHashMap<>();
-
-                                for (String srcColumn : srcColumns) {
-                                    String targetColumn = srcColumn;
-                                    if (dbMapping.getTargetColumns() != null) {
-                                        for (Map.Entry<String, String> entry : dbMapping.getTargetColumns()
-                                            .entrySet()) {
-                                            String targetColumnName = entry.getKey();
-                                            String srcColumnName = entry.getValue();
-
-                                            if (srcColumnName != null
-                                                && srcColumnName.toLowerCase().equals(srcColumn.toUpperCase())) {
-                                                targetColumn = targetColumnName;
-                                            }
-                                        }
-                                    }
-                                    columnsMap.put(targetColumn, srcColumn);
-                                }
-                                dbMapping.setAllColumns(columnsMap);
-                            }
-                        }
-                    }
-                }
-
                 String type = dml.getType();
                 if (type != null && type.equalsIgnoreCase("INSERT")) {
                     insert(config, dml);
                 } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
-                    // update(config, dml);
+                    update(config, dml);
                 } else if (type != null && type.equalsIgnoreCase("DELETE")) {
-                    // delete(config, dml);
+                    delete(config, dml);
                 }
                 if (logger.isDebugEnabled()) {
-                    logger.debug("DML: {}", JSON.toJSONString(dml));
+                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
                 }
             }
         } catch (Exception e) {
@@ -109,6 +64,102 @@ public class RdbSyncService {
         }
     }
 
+    /**
+     * 插入操作
+     *
+     * @param config 配置项
+     * @param dml DML数据
+     */
+    private void insert(MappingConfig config, Dml dml) throws SQLException {
+        List<Map<String, Object>> data = dml.getData();
+        if (data == null || data.isEmpty()) {
+            return;
+        }
+
+        DbMapping dbMapping = config.getDbMapping();
+
+        int idx = 1;
+        boolean completed = false;
+
+        Connection conn = dataSource.getConnection();
+        conn.setAutoCommit(false);
+        try {
+            Map<String, String> columnsMap;
+            if (dbMapping.isMapAll()) {
+                columnsMap = dbMapping.getAllColumns();
+            } else {
+                columnsMap = dbMapping.getTargetColumns();
+            }
+
+            StringBuilder insertSql = new StringBuilder();
+            insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
+
+            columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
+            int len = insertSql.length();
+            insertSql.delete(len - 1, len).append(") VALUES (");
+            int mapLen = columnsMap.size();
+            for (int i = 0; i < mapLen; i++) {
+                insertSql.append("?,");
+            }
+            len = insertSql.length();
+            insertSql.delete(len - 1, len).append(")");
+
+            Map<String, Integer> ctype = getTargetColumnType(conn, config);
+
+            PreparedStatement pstmt = conn.prepareStatement(insertSql.toString());
+
+            for (Map<String, Object> d : data) {
+                pstmt.clearParameters();
+                int i = 1;
+                for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+                    String targetClolumnName = entry.getKey();
+                    String srcColumnName = entry.getValue();
+                    if (srcColumnName == null) {
+                        srcColumnName = targetClolumnName;
+                    }
+
+                    Integer type = ctype.get(targetClolumnName.toLowerCase());
+
+                    Object value = d.get(srcColumnName);
+                    if (value != null) {
+                        if (type == null) {
+                            throw new RuntimeException("No column: " + targetClolumnName + " found in target db");
+                        }
+
+                        setPStmt(type, pstmt, value, i);
+                    } else {
+                        pstmt.setNull(i, type);
+                    }
+                    i++;
+                }
+
+                pstmt.execute();
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Insert into target table, sql: {}", insertSql);
+                }
+
+                if (idx % config.getDbMapping().getCommitBatch() == 0) {
+                    conn.commit();
+                    completed = true;
+                }
+                idx++;
+            }
+            if (!completed) {
+                conn.commit();
+            }
+        } catch (Exception e) {
+            conn.rollback();
+        } finally {
+            conn.close();
+        }
+    }
+
+    /**
+     * 更新操作
+     * 
+     * @param config 配置项
+     * @param dml DML数据
+     */
     private void update(MappingConfig config, Dml dml) throws SQLException {
         List<Map<String, Object>> data = dml.getData();
         if (data == null || data.isEmpty()) {
@@ -123,10 +174,9 @@ public class RdbSyncService {
         DbMapping dbMapping = config.getDbMapping();
 
         int idx = 1;
-        boolean complete = false;
+        boolean completed = false;
 
         Connection conn = dataSource.getConnection();
-        boolean oriAutoCommit = conn.getAutoCommit();
         conn.setAutoCommit(false);
 
         try {
@@ -137,13 +187,13 @@ public class RdbSyncService {
                 columnsMap = dbMapping.getTargetColumns();
             }
 
-            int i = 0;
+            Map<String, Integer> ctype = getTargetColumnType(conn, config);
+
             for (Map<String, Object> o : old) {
-                Map<String, Object> d = data.get(i);
+                Map<String, Object> d = data.get(idx - 1);
                 StringBuilder updateSql = new StringBuilder();
                 updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
-                List<Object> values = new ArrayList<>();
-                boolean flag = false;
+                Map<String, Object> values = new LinkedHashMap<>();
                 for (String srcColumnName : o.keySet()) {
                     List<String> targetColumnNames = new ArrayList<>();
                     columnsMap.forEach((targetColumn, srcColumn) -> {
@@ -152,36 +202,48 @@ public class RdbSyncService {
                         }
                     });
                     if (!targetColumnNames.isEmpty()) {
-                        if (!flag) {
-                            flag = true;
-                        }
+
                         for (String targetColumnName : targetColumnNames) {
                             updateSql.append(targetColumnName).append("=?, ");
-                            values.add(data.get(i));
+                            values.put(targetColumnName, d.get(srcColumnName));
                         }
                     }
                 }
-                if (flag) {
-                    int len = updateSql.length();
-                    updateSql.delete(len - 2, len).append(" WHERE ");
+                int len = updateSql.length();
+                updateSql.delete(len - 2, len).append(" WHERE ");
+
+                // 拼接主键
+                appendCondition(dbMapping, updateSql, values, d);
+
+                sqlExe(conn, updateSql.toString(), ctype, values);
+
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Update target table, sql: {}", updateSql);
                 }
-                i++;
+                if (idx % config.getDbMapping().getCommitBatch() == 0) {
+                    conn.commit();
+                    completed = true;
+                }
+                idx++;
+            }
+            if (!completed) {
+                conn.commit();
             }
         } catch (Exception e) {
             conn.rollback();
         } finally {
-            conn.setAutoCommit(oriAutoCommit);
             conn.close();
         }
     }
 
     /**
-     * 插入操作
-     *
-     * @param config 配置项
-     * @param dml DML数据
+     * 删除操作
+     * 
+     * @param config
+     * @param dml
+     * @throws SQLException
      */
-    private void insert(MappingConfig config, Dml dml) throws SQLException {
+    private void delete(MappingConfig config, Dml dml) throws SQLException {
         List<Map<String, Object>> data = dml.getData();
         if (data == null || data.isEmpty()) {
             return;
@@ -193,40 +255,24 @@ public class RdbSyncService {
         boolean completed = false;
 
         Connection conn = dataSource.getConnection();
-        boolean oriAutoCommit = conn.getAutoCommit();
         conn.setAutoCommit(false);
-        try {
-            Map<String, String> columnsMap;
-            if (dbMapping.isMapAll()) {
-                columnsMap = dbMapping.getAllColumns();
-            } else {
-                columnsMap = dbMapping.getTargetColumns();
-            }
 
-            StringBuilder insertSql = new StringBuilder();
-            insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
+        try {
+            Map<String, Integer> ctype = getTargetColumnType(conn, config);
 
-            columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
-            int len = insertSql.length();
-            insertSql.delete(len - 1, len).append(") VALUES (");
-            int mapLen = columnsMap.size();
-            for (int i = 0; i < mapLen; i++) {
-                insertSql.append("?,");
-            }
-            len = insertSql.length();
-            insertSql.delete(len - 1, len).append(")");
+            for (Map<String, Object> d : data) {
+                StringBuilder sql = new StringBuilder();
+                sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE ");
 
-            PreparedStatement pstmt = conn.prepareStatement(insertSql.toString());
+                Map<String, Object> values = new LinkedHashMap<>();
+                // 拼接主键
+                appendCondition(dbMapping, sql, values, d);
 
-            for (Map<String, Object> r : data) {
-                pstmt.clearParameters();
-                convertData2DbRow(conn, config, r, pstmt);
+                sqlExe(conn, sql.toString(), ctype, values);
 
-                pstmt.execute();
                 if (logger.isTraceEnabled()) {
-                    logger.trace("Insert into target db, sql: {}", insertSql);
+                    logger.trace("Delete from target table, sql: {}", sql);
                 }
-
                 if (idx % config.getDbMapping().getCommitBatch() == 0) {
                     conn.commit();
                     completed = true;
@@ -239,19 +285,19 @@ public class RdbSyncService {
         } catch (Exception e) {
             conn.rollback();
         } finally {
-            conn.setAutoCommit(oriAutoCommit);
             conn.close();
         }
     }
 
-    private static void sqlRS(Connection conn, String sql, Consumer<ResultSet> consumer) {
-        try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
-            consumer.accept(rs);
-        } catch (SQLException e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
 
+
+    /**
+     * 获取目标字段类型
+     * 
+     * @param conn sql connection
+     * @param config 映射配置
+     * @return 字段sqlType
+     */
     private Map<String, Integer> getTargetColumnType(Connection conn, MappingConfig config) {
         DbMapping dbMapping = config.getDbMapping();
         String cacheKey = config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable();
@@ -263,7 +309,7 @@ public class RdbSyncService {
                     columnType = new LinkedHashMap<>();
                     final Map<String, Integer> columnTypeTmp = columnType;
                     String sql = "SELECT * FROM " + dbMapping.getTargetTable() + " WHERE 1=2";
-                    sqlRS(conn, sql, rs -> {
+                   Util.sqlRS(conn, sql, rs -> {
                         try {
                             ResultSetMetaData rsd = rs.getMetaData();
                             int columnCount = rsd.getColumnCount();
@@ -282,49 +328,14 @@ public class RdbSyncService {
     }
 
     /**
-     * 新增类型转换
-     *
-     * @param config
-     * @param data
-     * @param pstmt
-     * @throws SQLException
+     * 设置 preparedStatement
+     * 
+     * @param type sqlType
+     * @param pstmt 需要设置的preparedStatement
+     * @param value 值
+     * @param i 索引号
      */
-    private void convertData2DbRow(Connection conn, MappingConfig config, Map<String, Object> data,
-                                   PreparedStatement pstmt) throws SQLException {
-        DbMapping dbMapping = config.getDbMapping();
-        Map<String, String> columnsMap;
-        if (dbMapping.isMapAll()) {
-            columnsMap = dbMapping.getAllColumns();
-        } else {
-            columnsMap = dbMapping.getTargetColumns();
-        }
-        Map<String, Integer> ctype = getTargetColumnType(conn, config);
-
-        int i = 1;
-        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
-            String targetClassName = entry.getKey();
-            String srcColumnName = entry.getValue();
-            if (srcColumnName == null) {
-                srcColumnName = targetClassName;
-            }
-
-            Integer type = ctype.get(targetClassName.toLowerCase());
-
-            Object value = data.get(srcColumnName);
-            if (value != null) {
-                if (type == null) {
-                    throw new RuntimeException("No column: " + targetClassName + " found in target db");
-                }
-
-                setPStmt(type, pstmt, value, i);
-            } else {
-                pstmt.setNull(i, type);
-            }
-            i++;
-        }
-    }
-
-    private void setPStmt(int type, PreparedStatement pstmt, Object value, int i) throws SQLException {
+    public static void setPStmt(int type, PreparedStatement pstmt, Object value, int i) throws SQLException {
         switch (type) {
             case Types.BIT:
             case Types.BOOLEAN:
@@ -476,4 +487,41 @@ public class RdbSyncService {
                 pstmt.setObject(i, value, type);
         }
     }
+
+    /**
+     * 拼接主键 where条件
+     */
+    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Object> values,
+                                        Map<String, Object> d) {
+        // 拼接主键
+        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
+            String targetColumnName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = targetColumnName;
+            }
+            sql.append(targetColumnName).append("=? AND ");
+            values.put(targetColumnName, d.get(srcColumnName));
+        }
+        int len = sql.length();
+        sql.delete(len - 4, len);
+    }
+
+    /**
+     * 执行sql
+     */
+    private static void sqlExe(Connection conn, String sql, Map<String, Integer> ctype, Map<String, Object> values) {
+        try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
+            int i = 1;
+            for (Map.Entry<String, Object> entry : values.entrySet()) {
+                String targetColumnName = entry.getKey();
+                Object value = entry.getValue();
+                Integer type = ctype.get(targetColumnName.toLowerCase());
+                setPStmt(type, pstmt, value, i++);
+            }
+            pstmt.execute();
+        } catch (SQLException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
 }

+ 4 - 1
client-adapter/rdb/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter

@@ -1 +1,4 @@
-rdb=com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter
+mysql=com.alibaba.otter.canal.client.adapter.rdb.adapters.MysqlAdapter
+postgresql=com.alibaba.otter.canal.client.adapter.rdb.adapters.PostgresqlAdapter
+oracle=com.alibaba.otter.canal.client.adapter.rdb.adapters.OracleAdapter
+sqlserver=com.alibaba.otter.canal.client.adapter.rdb.adapters.SqlserverAdapter

+ 2 - 1
client-adapter/rdb/src/test/resources/rdb/mytest_user.yml → client-adapter/rdb/src/main/resources/oracle/mytest_user.yml

@@ -4,7 +4,8 @@ dbMapping:
   database: mytest
   table: user
   targetTable: mytest.tb_user
-  commitBatch: 3000
+  targetPk:
+    id: id
   mapAll: true
 #  targetColumns:
 #    id:

+ 15 - 0
client-adapter/rdb/src/main/resources/postgresql/mytest_user.yml

@@ -0,0 +1,15 @@
+dataSourceKey: defaultDS
+destination: example
+dbMapping:
+  database: mytest
+  table: user
+  targetTable: public.tb_user
+  targetPk:
+    id: id
+  mapAll: true
+#  targetColumns:
+#    id:
+#    name:
+#    role_id:
+#    c_time:
+#    test1:

+ 2 - 2
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/ConfigLoadTest.java

@@ -15,14 +15,14 @@ public class ConfigLoadTest {
 
     @Before
     public void before() {
-        AdapterConfigs.put("rdb", "mytest_user.yml");
+        AdapterConfigs.put("oracle", "mytest_user.yml");
         // 加载数据源连接池
         DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
     }
 
     @Test
     public void testLoad() {
-        Map<String, MappingConfig> configMap =  MappingConfigLoader.load();
+        Map<String, MappingConfig> configMap =  MappingConfigLoader.load("oracle");
 
         Assert.assertFalse(configMap.isEmpty());
     }

+ 3 - 2
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/Common.java

@@ -1,6 +1,7 @@
 package com.alibaba.otter.canal.client.adapter.rdb.test.sync;
 
 import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
+import com.alibaba.otter.canal.client.adapter.rdb.adapters.OracleAdapter;
 import com.alibaba.otter.canal.client.adapter.rdb.test.TestConstant;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
@@ -13,7 +14,7 @@ public class Common {
         DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
 
         OuterAdapterConfig outerAdapterConfig = new OuterAdapterConfig();
-        outerAdapterConfig.setName("rdb");
+        outerAdapterConfig.setName("oracle");
         Map<String, String> properties = new HashMap<>();
         properties.put("jdbc.driveClassName", "oracle.jdbc.OracleDriver");
         properties.put("jdbc.url", "jdbc:oracle:thin:@127.0.0.1:49161:XE");
@@ -21,7 +22,7 @@ public class Common {
         properties.put("jdbc.password", "m121212");
         outerAdapterConfig.setProperties(properties);
 
-        RdbAdapter adapter = new RdbAdapter();
+        RdbAdapter adapter = new OracleAdapter();
         adapter.init(outerAdapterConfig);
         return adapter;
     }

+ 23 - 4
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/OracleSyncTest.java

@@ -1,8 +1,5 @@
 package com.alibaba.otter.canal.client.adapter.rdb.test.sync;
 
-import java.io.BufferedReader;
-import java.io.Reader;
-import java.sql.Clob;
 import java.util.*;
 
 import org.junit.Before;
@@ -18,7 +15,7 @@ public class OracleSyncTest {
 
     @Before
     public void init() {
-        AdapterConfigs.put("rdb", "mytest_user.yml");
+        AdapterConfigs.put("oracle", "mytest_user.yml");
         rdbAdapter = Common.init();
     }
 
@@ -43,5 +40,27 @@ public class OracleSyncTest {
         rdbAdapter.sync(dml);
     }
 
+    @Test
+    public void test02() {
+        Dml dml = new Dml();
+        dml.setDestination("example");
+        dml.setTs(new Date().getTime());
+        dml.setType("UPDATE");
+        dml.setDatabase("mytest");
+        dml.setTable("user");
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        Map<String, Object> data = new LinkedHashMap<>();
+        dataList.add(data);
+        data.put("id", 1L);
+        data.put("name", "Eric2");
+        dml.setData(dataList);
+        List<Map<String, Object>> oldList = new ArrayList<>();
+        Map<String, Object> old = new LinkedHashMap<>();
+        oldList.add(old);
+        old.put("name", "Eric");
+        dml.setOld(oldList);
+
+        rdbAdapter.sync(dml);
+    }
 
 }

+ 9 - 6
client-adapter/rdb/src/main/resources/rdb/mytest_user.yml → client-adapter/rdb/src/test/resources/oracle/mytest_user.yml

@@ -4,9 +4,12 @@ dbMapping:
   database: mytest
   table: user
   targetTable: mytest.tb_user
-  commitBatch: 3000
-  mapAll: false
-  targetColumns:
-    id:
-    name:
-    role_id:
+  targetPk:
+    id: id
+  mapAll: true
+#  targetColumns:
+#    id:
+#    name:
+#    role_id:
+#    c_time:
+#    test1: