Merge pull request #1147 from rewerma/master

rdb batch parallel commit

agapple, 6 years ago
Parent commit: 5bc0c3f06f

client-adapter/README.md  (+71 -0)

@@ -251,3 +251,74 @@ bin/startup.sh
 ```
 #### Verification
 Modify data in the MySQL mytest.person table; it will be synchronized automatically into the HBase MYTEST.PERSON table, and a DML log entry will be printed
+
+
+## 4. Relational Database Adapter
+
+The RDB adapter synchronizes and imports data from MySQL into any relational database that supports JDBC.
+
+### 4.1 Modify the launcher configuration application.yml (using an Oracle target database as the example)
+```
+server:
+  port: 8081
+logging:
+  level:
+    com.alibaba.otter.canal.client.adapter.rdb: DEBUG
+......
+
+canal.conf:
+  canalServerHost: 127.0.0.1:11111
+  srcDataSources:
+    defaultDS:
+      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
+      username: root
+      password: 121212
+  canalInstances:
+  - instance: example
+    groups:
+    - outAdapters:
+      - name: rdb                                               # use the rdb sync type
+        key: oracle1                                            # unique key for this adapter; must match outerAdapterKey in the table mapping config
+        properties:
+          jdbc.driverClassName: oracle.jdbc.OracleDriver        # JDBC driver class name; put the JDBC driver jar into the lib directory yourself
+          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE        # JDBC url
+          jdbc.username: mytest                                 # JDBC username
+          jdbc.password: m121212                                # JDBC password
+          threads: 5                                            # number of parallel worker threads, default 1
+          commitSize: 3000                                      # maximum number of rows per batch commit
+```
+In the outAdapter configuration above: name is always rdb; key is the unique identifier of the corresponding data source and must match outerAdapterKey in the table mapping files below; properties holds the JDBC parameters of the target database.
+The adapter automatically loads every table mapping configuration file ending in .yml under conf/rdb.
+
+### 4.2 Adapter table mapping file
+Modify the conf/rdb/mytest_user.yml file:
+```
+dataSourceKey: defaultDS        # key of the source data source; matches an entry in srcDataSources above
+destination: example            # canal instance or MQ topic
+outerAdapterKey: oracle1        # adapter key; matches the key configured in outAdapters above
+concurrent: true                # whether to sync in parallel, hashed by primary key; concurrently synced tables must guarantee that the primary key never changes and is not a foreign key of any other synced table!!
+dbMapping:
+  database: mytest              # database/schema of the source data source
+  table: user                   # table name in the source data source
+  targetTable: mytest.tb_user   # schema.table name in the target data source
+  targetPk:                     # primary key mapping
+    id: id                      # for a composite primary key, map multiple columns on separate lines
+#  mapAll: true                 # map the whole table; requires identical column names in source and target (if targetColumns is also configured, targetColumns takes precedence)
+  targetColumns:                # column mapping, format targetColumn: sourceColumn; the source column name can be omitted when the names match
+    id:
+    name:
+    role_id:
+    c_time:
+    test1:
+```
+Values are converted automatically, using the target table's column metadata types as the source of truth.
+
+### 4.3 Start RDB data synchronization
+#### Put the target database's JDBC driver jar into the lib folder (here, ojdbc6.jar)
+
+#### Start the canal-adapter launcher
+```
+bin/startup.sh
+```
+#### Verification
+Modify data in the MySQL mytest.user table; it will be synchronized automatically into the Oracle MYTEST.TB_USER table, and a DML log entry will be printed
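
To make the `concurrent: true` caveat above concrete, here is a minimal sketch (hypothetical standalone class and sample values, not part of this PR) of the primary-key routing it enables; the formula mirrors the pkHash method added to RdbSyncService further down in this diff. Rows with the same primary key always map to the same worker thread, so per-row ordering is preserved even though commits run in parallel:

```
// Hypothetical illustration of pk-hash routing (not part of the PR).
public class PkHashSketch {

    // same formula as RdbSyncService.pkHash below
    static int route(Object pkValue, int threads) {
        int hash = pkValue != null ? pkValue.hashCode() : 0;
        return Math.abs(Math.abs(hash) % threads);
    }

    public static void main(String[] args) {
        int threads = 5; // matches the threads property above
        for (long id : new long[] { 1L, 2L, 42L, 42L }) {
            System.out.println("pk=" + id + " -> thread " + route(id, threads));
        }
        // pk=42 maps to the same thread both times: two changes to one row never race
    }
}
```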

client-adapter/launcher/src/main/resources/application.yml  (+9 -5)

@@ -23,11 +23,11 @@ canal.conf:
 #      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
 #      username: root
 #      password: 121212
-#  canalInstances:
-#  - instance: example
-#    groups:
-#    - outAdapters:
-#      - name: logger
+  canalInstances:
+  - instance: example
+    groups:
+    - outAdapters:
+      - name: logger
 #      - name: rdb
 #        key: oracle1
 #        properties:
@@ -35,6 +35,8 @@ canal.conf:
 #          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
 #          jdbc.username: mytest
 #          jdbc.password: m121212
+#          threads: 5
+#          commitSize: 3000
 #      - name: rdb
 #        key: postgres1
 #        properties:
@@ -42,6 +44,8 @@ canal.conf:
 #          jdbc.url: jdbc:postgresql://localhost:5432/postgres
 #          jdbc.username: postgres
 #          jdbc.password: 121212
+#          threads: 1
+#          commitSize: 3000
 #      - name: hbase
 #        properties:
 #          hbase.zookeeper.quorum: 127.0.0.1

client-adapter/rdb/README.md  (+0 -67)

@@ -1,67 +0,0 @@
-## RDB Adapter
-The RDB adapter synchronizes and imports data from MySQL into any relational database that supports JDBC
-### 1.1 Modify the launcher configuration application.yml (using an Oracle target database as the example)
-```
-server:
-  port: 8081
-logging:
-  level:
-    com.alibaba.otter.canal.client.adapter.hbase: DEBUG
-spring:
-  jackson:
-    date-format: yyyy-MM-dd HH:mm:ss
-    time-zone: GMT+8
-    default-property-inclusion: non_null
-
-canal.conf:
-  canalServerHost: 127.0.0.1:11111
-  srcDataSources:
-    defaultDS:
-      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
-      username: root
-      password: 121212
-  canalInstances:
-  - instance: example
-    groups:
-    - outAdapters:
-      - name: rdb
-        key: oracle1
-        properties:
-          jdbc.driverClassName: oracle.jdbc.OracleDriver
-          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
-          jdbc.username: mytest
-          jdbc.password: m121212
-```
-In the outAdapter configuration: name is always rdb; key is the unique identifier of the corresponding data source and must match outerAdapterKey in the table mapping files below!! properties holds the JDBC parameters of the target database
-The adapter automatically loads every table mapping configuration file ending in .yml under conf/rdb
-### 1.2 Adapter table mapping file
-Modify the conf/rdb/mytest_user.yml file:
-```
-dataSourceKey: defaultDS        # key of the source data source; matches an entry in srcDataSources above
-destination: example            # canal instance or MQ topic
-outerAdapterKey: oracle1        # adapter key; matches the key configured in outAdapters above
-dbMapping:
-  database: mytest              # database/schema of the source data source
-  table: user                   # table name in the source data source
-  targetTable: mytest.tb_user   # schema.table name in the target data source
-  targetPk:                     # primary key mapping
-    id: id                      # for a composite primary key, map multiple columns on separate lines
-  mapAll: true                  # map the whole table; requires identical column names in source and target (if targetColumns is also configured, targetColumns takes precedence)
-#  targetColumns:               # column mapping, format targetColumn: sourceColumn; the source column name can be omitted when the names match
-#    id:
-#    name:
-#    role_id:
-#    c_time:
-#    test1:
-```
-Values are converted automatically, using the target table's column metadata types as the source of truth
-
-### 1.3 Start RDB data synchronization
-#### Put the target database's JDBC driver jar into the lib folder (here, ojdbc6.jar)
-
-#### Start the canal-adapter launcher
-```
-bin/startup.sh
-```
-#### Verification
-Modify data in the MySQL mytest.user table; it will be synchronized automatically into the Oracle MYTEST.TB_USER table, and a DML log entry will be printed

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java  (+10 -6)

@@ -2,10 +2,7 @@ package com.alibaba.otter.canal.client.adapter.rdb;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import javax.sql.DataSource;
 
@@ -59,7 +56,7 @@ public class RdbAdapter implements OuterAdapter {
         dataSource.setPassword(properties.get("jdbc.password"));
         dataSource.setInitialSize(1);
         dataSource.setMinIdle(1);
-        dataSource.setMaxActive(2);
+        dataSource.setMaxActive(20);
         dataSource.setMaxWait(60000);
         dataSource.setTimeBetweenEvictionRunsMillis(60000);
         dataSource.setMinEvictableIdleTimeMillis(300000);
@@ -70,7 +67,11 @@ public class RdbAdapter implements OuterAdapter {
             logger.error("ERROR ## failed to initial datasource: " + properties.get("jdbc.url"), e);
         }
 
-        rdbSyncService = new RdbSyncService(dataSource);
+        String threads = properties.get("threads");
+        String commitSize = properties.get("commitSize");
+        rdbSyncService = new RdbSyncService(commitSize != null ? Integer.valueOf(commitSize) : null,
+            threads != null ? Integer.valueOf(threads) : null,
+            dataSource);
     }
 
     @Override
@@ -177,6 +178,9 @@ public class RdbAdapter implements OuterAdapter {
 
     @Override
     public void destroy() {
+        if (rdbSyncService != null) {
+            rdbSyncService.close();
+        }
         if (dataSource != null) {
             dataSource.close();
         }
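
A side note on the init() change above: the two new outAdapter properties are parsed with Integer.valueOf, which throws NumberFormatException on non-numeric input; absent values fall back to the defaults (threads = 1 in RdbSyncService, commitSize = 3000 in BatchExecutor), and RdbSyncService additionally ignores thread counts outside 2..10. A small standalone sketch (hypothetical property map, not part of the PR):

```
import java.util.HashMap;
import java.util.Map;

public class PropertyWiringSketch {

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("threads", "5");
        // "commitSize" deliberately left unset

        // same null-safe parsing as RdbAdapter.init
        String threads = properties.get("threads");
        String commitSize = properties.get("commitSize");
        Integer threadCount = threads != null ? Integer.valueOf(threads) : null;
        Integer batchSize = commitSize != null ? Integer.valueOf(commitSize) : null;

        System.out.println(threadCount + " / " + batchSize); // prints: 5 / null
    }
}
```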

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java  (+28 -18)

@@ -18,6 +18,8 @@ public class MappingConfig {
 
     private String    outerAdapterKey; // key of the corresponding adapter

+    private Boolean   concurrent;      // whether to sync concurrently
+
     private DbMapping dbMapping;       // db mapping configuration
 
     public String getDataSourceKey() {
@@ -36,6 +38,14 @@ public class MappingConfig {
         this.outerAdapterKey = outerAdapterKey;
     }
 
+    public Boolean getConcurrent() {
+        return concurrent == null ? false : concurrent;
+    }
+
+    public void setConcurrent(Boolean concurrent) {
+        this.concurrent = concurrent;
+    }
+
     public DbMapping getDbMapping() {
         return dbMapping;
     }
@@ -66,20 +76,20 @@ public class MappingConfig {
 
     public static class DbMapping {
 
-        private String                       database;                            // database or schema name
-        private String                       table;                               // table name
-        private Map<String, String>          targetPk;                            // target table primary key columns
-        private boolean                      mapAll      = false;                 // map all columns
-        private String                       targetTable;                         // target table name
-        private Map<String, String>          targetColumns;                       // target table column mapping
+        private String              database;                            // database or schema name
+        private String              table;                               // table name
+        private Map<String, String> targetPk;                            // target table primary key columns
+        private boolean             mapAll      = false;                 // map all columns
+        private String              targetTable;                         // target table name
+        private Map<String, String> targetColumns;                       // target table column mapping

-        private String                       etlCondition;                        // ETL condition SQL
+        private String              etlCondition;                        // ETL condition SQL

-        private Set<String>                  families    = new LinkedHashSet<>(); // column family list
-        private int                          readBatch   = 5000;
-        private int                          commitBatch = 5000;                  // batch commit size for ETL etc.
+        private Set<String>         families    = new LinkedHashSet<>(); // column family list
+        private int                 readBatch   = 5000;
+        private int                 commitBatch = 5000;                  // batch commit size for ETL etc.

-//        private volatile Map<String, String> allColumns;                          // set automatically when mapAll is true
+        // private volatile Map<String, String> allColumns; // set automatically when mapAll is true
 
         public String getDatabase() {
             return database;
@@ -161,12 +171,12 @@ public class MappingConfig {
             this.commitBatch = commitBatch;
         }
 
-//        public Map<String, String> getAllColumns() {
-//            return allColumns;
-//        }
-//
-//        public void setAllColumns(Map<String, String> allColumns) {
-//            this.allColumns = allColumns;
-//        }
+        // public Map<String, String> getAllColumns() {
+        // return allColumns;
+        // }
+        //
+        // public void setAllColumns(Map<String, String> allColumns) {
+        // this.allColumns = allColumns;
+        // }
     }
 }

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/BatchExecutor.java  (+125 -0)

@@ -0,0 +1,125 @@
+package com.alibaba.otter.canal.client.adapter.rdb.service;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BatchExecutor {
+
+    private static final Logger logger     = LoggerFactory.getLogger(BatchExecutor.class);
+
+    private Integer             key;
+    private Connection          conn;
+    private int                 commitSize = 3000;
+    private AtomicInteger       idx        = new AtomicInteger(0);
+    private ExecutorService     executor   = Executors.newFixedThreadPool(1);
+    private Lock                commitLock = new ReentrantLock();
+    private Condition           condition  = commitLock.newCondition();
+
+    public BatchExecutor(Integer key, Connection conn, Integer commitSize){
+        this.key = key;
+        this.conn = conn;
+        if (commitSize != null) {
+            this.commitSize = commitSize;
+        }
+
+        try {
+            this.conn.setAutoCommit(false);
+        } catch (SQLException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    public Integer getKey() {
+        return key;
+    }
+
+    public Connection getConn() {
+        return conn;
+    }
+
+    public static void setValue(List<Map<String, ?>> values, int type, Object value) {
+        Map<String, Object> valueItem = new HashMap<>();
+        valueItem.put("type", type);
+        valueItem.put("value", value);
+        values.add(valueItem);
+    }
+
+    public void execute(String sql, List<Map<String, ?>> values) {
+        try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
+            int len = values.size();
+            for (int i = 0; i < len; i++) {
+                int type = (Integer) values.get(i).get("type");
+                Object value = values.get(i).get("value");
+                RdbSyncService.setPStmt(type, pstmt, value, i + 1);
+            }
+
+            pstmt.execute();
+        } catch (SQLException e) {
+            logger.error(e.getMessage(), e);
+        }
+        int i = idx.incrementAndGet();
+
+        // the first execution in a batch schedules a timed commit
+        if (i == 1) {
+            executor.submit(() -> {
+                try {
+                    commitLock.lock();
+                    conn.commit(); // commit once immediately
+                    if (!condition.await(5, TimeUnit.SECONDS)) {
+                        // no full batch within the timeout: commit what we have
+                        commit();
+                    }
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                } finally {
+                    commitLock.unlock();
+                }
+            });
+        }
+
+        if (i == commitSize) {
+            commit();
+        }
+    }
+
+    private void commit() {
+        try {
+            commitLock.lock();
+            conn.commit();
+            if (logger.isTraceEnabled()) {
+                logger.trace("Batch executor: " + key + " commit " + idx.get() + " rows");
+            }
+            condition.signal();
+            idx.set(0);
+        } catch (SQLException e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            commitLock.unlock();
+        }
+    }
+
+    public void close() {
+        if (conn != null) {
+            try {
+                conn.close();
+            } catch (SQLException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+        executor.shutdown();
+    }
+}
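
For reviewers, a usage sketch of the commit contract above (connection details and target table are hypothetical): execute() runs each statement on the executor's own connection with auto-commit off, commits once commitSize executions accumulate, and the single-thread executor started on the first execute() also commits once right away and then forces another commit if the batch stays open past the 5-second await:

```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.alibaba.otter.canal.client.adapter.rdb.service.BatchExecutor;

public class BatchExecutorUsage {

    public static void main(String[] args) throws Exception {
        // placeholder connection details
        Connection conn = DriverManager
            .getConnection("jdbc:oracle:thin:@localhost:49161:XE", "mytest", "m121212");

        // executor key 0, commit every 3000 rows
        BatchExecutor executor = new BatchExecutor(0, conn, 3000);

        List<Map<String, ?>> values = new ArrayList<>();
        BatchExecutor.setValue(values, Types.BIGINT, 1L);
        BatchExecutor.setValue(values, Types.VARCHAR, "test_1");
        executor.execute("INSERT INTO mytest.tb_user (id, name) VALUES (?, ?)", values);

        // fewer than commitSize rows: the background task commits after its
        // 5-second await times out
        Thread.sleep(6000);
        executor.close(); // closes the connection and shuts the executor down
    }
}
```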

client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java  (+228 -161)

@@ -10,6 +10,9 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import javax.sql.DataSource;
 
@@ -37,10 +40,33 @@ public class RdbSyncService {
 
     private final Map<String, Map<String, Integer>> COLUMNS_TYPE_CACHE = new ConcurrentHashMap<>();
 
-    private DataSource                              dataSource;
+    private BatchExecutor[]                         batchExecutors;
 
-    public RdbSyncService(DataSource dataSource){
-        this.dataSource = dataSource;
+    private int                                     threads            = 1;
+
+    private ExecutorService[]                       threadExecutors;
+
+    public RdbSyncService(Integer commitSize, Integer threads, DataSource dataSource){
+        try {
+            if (threads != null && threads > 1 && threads <= 10) {
+                this.threads = threads;
+            }
+            batchExecutors = new BatchExecutor[this.threads];
+            for (int i = 0; i < this.threads; i++) {
+                Connection conn = dataSource.getConnection();
+                conn.setAutoCommit(false);
+                this.batchExecutors[i] = new BatchExecutor(i, conn, commitSize);
+            }
+            threadExecutors = new ExecutorService[this.threads];
+            for (int i = 0; i < this.threads; i++) {
+                threadExecutors[i] = Executors.newFixedThreadPool(1);
+            }
+        } catch (SQLException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    public void sync(MappingConfig config, List<Dml> dmlList) {
     }
 
     public void sync(MappingConfig config, Dml dml) {
@@ -69,7 +95,7 @@ public class RdbSyncService {
      * @param config configuration item
      * @param dml DML data
      */
-    private void insert(MappingConfig config, Dml dml) throws SQLException {
+    private void insert(MappingConfig config, Dml dml) {
         List<Map<String, Object>> data = dml.getData();
         if (data == null || data.isEmpty()) {
             return;
@@ -77,75 +103,63 @@ public class RdbSyncService {
 
         DbMapping dbMapping = config.getDbMapping();
 
-        int idx = 1;
-        boolean completed = false;
-
-        Connection conn = dataSource.getConnection();
-        conn.setAutoCommit(false);
-        try {
-            Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data.get(0));
-
-            StringBuilder insertSql = new StringBuilder();
-            insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
-
-            columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
-            int len = insertSql.length();
-            insertSql.delete(len - 1, len).append(") VALUES (");
-            int mapLen = columnsMap.size();
-            for (int i = 0; i < mapLen; i++) {
-                insertSql.append("?,");
-            }
-            len = insertSql.length();
-            insertSql.delete(len - 1, len).append(")");
+        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data.get(0));
 
-            Map<String, Integer> ctype = getTargetColumnType(conn, config);
+        StringBuilder insertSql = new StringBuilder();
+        insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
 
-            PreparedStatement pstmt = conn.prepareStatement(insertSql.toString());
+        columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
+        int len = insertSql.length();
+        insertSql.delete(len - 1, len).append(") VALUES (");
+        int mapLen = columnsMap.size();
+        for (int i = 0; i < mapLen; i++) {
+            insertSql.append("?,");
+        }
+        len = insertSql.length();
+        insertSql.delete(len - 1, len).append(")");
 
-            for (Map<String, Object> d : data) {
-                pstmt.clearParameters();
-                int i = 1;
-                for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
-                    String targetClolumnName = entry.getKey();
-                    String srcColumnName = entry.getValue();
-                    if (srcColumnName == null) {
-                        srcColumnName = targetClolumnName;
-                    }
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutors[0].getConn(), config);
 
-                    Integer type = ctype.get(targetClolumnName.toLowerCase());
+        String sql = insertSql.toString();
+        Integer tbHash = null;
+        // if this table is not synced concurrently
+        if (!config.getConcurrent()) {
+            // hash by table name so all its rows go to one thread
+            tbHash = Math.abs(Math.abs(dbMapping.getTargetTable().hashCode()) % threads);
+        }
+        for (Map<String, Object> d : data) {
+            int hash;
+            if (tbHash != null) {
+                hash = tbHash;
+            } else {
+                hash = pkHash(dbMapping, d, threads);
+            }
+            ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash];
+            checkQueue(tpe);
+            tpe.submit(() -> {
+                try {
+                    BatchExecutor batchExecutor = batchExecutors[hash];
+                    List<Map<String, ?>> values = new ArrayList<>();
+
+                    for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+                        String targetColumnName = entry.getKey();
+                        String srcColumnName = entry.getValue();
+                        if (srcColumnName == null) {
+                            srcColumnName = targetColumnName;
+                        }
 
-                    Object value = d.get(srcColumnName);
-                    if (value != null) {
+                        Integer type = ctype.get(targetColumnName.toLowerCase());
                         if (type == null) {
-                            throw new RuntimeException("No column: " + targetClolumnName + " found in target db");
+                            throw new RuntimeException("No column: " + targetColumnName + " found in target db");
                         }
-
-                        setPStmt(type, pstmt, value, i);
-                    } else {
-                        pstmt.setNull(i, type);
+                        Object value = d.get(srcColumnName);
+                        BatchExecutor.setValue(values, type, value);
                     }
-                    i++;
-                }
-
-                pstmt.execute();
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Insert into target table, sql: {}", insertSql);
+                    batchExecutor.execute(sql, values);
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
                 }
-
-                if (idx % config.getDbMapping().getCommitBatch() == 0) {
-                    conn.commit();
-                    completed = true;
-                }
-                idx++;
-            }
-            if (!completed) {
-                conn.commit();
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            conn.rollback();
-        } finally {
-            conn.close();
+            });
         }
     }
 
@@ -155,7 +169,7 @@ public class RdbSyncService {
      * @param config configuration item
      * @param dml DML data
      */
-    private void update(MappingConfig config, Dml dml) throws SQLException {
+    private void update(MappingConfig config, Dml dml) {
         List<Map<String, Object>> data = dml.getData();
         if (data == null || data.isEmpty()) {
             return;
@@ -169,61 +183,67 @@ public class RdbSyncService {
         DbMapping dbMapping = config.getDbMapping();
 
         int idx = 1;
-        boolean completed = false;
+        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data.get(0));
 
-        Connection conn = dataSource.getConnection();
-        conn.setAutoCommit(false);
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutors[0].getConn(), config);
 
-        try {
-            Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data.get(0));
-
-            Map<String, Integer> ctype = getTargetColumnType(conn, config);
-
-            for (Map<String, Object> o : old) {
-                Map<String, Object> d = data.get(idx - 1);
-                StringBuilder updateSql = new StringBuilder();
-                updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
-                Map<String, Object> values = new LinkedHashMap<>();
-                for (String srcColumnName : o.keySet()) {
-                    List<String> targetColumnNames = new ArrayList<>();
-                    columnsMap.forEach((targetColumn, srcColumn) -> {
-                        if (srcColumnName.toLowerCase().equals(srcColumn)) {
-                            targetColumnNames.add(targetColumn);
-                        }
-                    });
-                    if (!targetColumnNames.isEmpty()) {
+        Integer tbHash = null;
+        if (!config.getConcurrent()) {
+            // hash by table name so all its rows go to one thread
+            tbHash = Math.abs(Math.abs(dbMapping.getTargetTable().hashCode()) % threads);
+        }
+        for (Map<String, Object> o : old) {
+            Map<String, Object> d = data.get(idx - 1);
+
+            int hash;
+            if (tbHash != null) {
+                hash = tbHash;
+            } else {
+                hash = pkHash(dbMapping, d, o, threads);
+            }
 
-                        for (String targetColumnName : targetColumnNames) {
-                            updateSql.append(targetColumnName).append("=?, ");
-                            values.put(targetColumnName, d.get(srcColumnName));
+            ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash];
+            checkQueue(tpe);
+            tpe.submit(() -> {
+                try {
+                    BatchExecutor batchExecutor = batchExecutors[hash];
+
+                    StringBuilder updateSql = new StringBuilder();
+                    updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
+
+                    List<Map<String, ?>> values = new ArrayList<>();
+                    for (String srcColumnName : o.keySet()) {
+                        List<String> targetColumnNames = new ArrayList<>();
+                        columnsMap.forEach((targetColumn, srcColumn) -> {
+                            if (srcColumnName.toLowerCase().equals(srcColumn)) {
+                                targetColumnNames.add(targetColumn);
+                            }
+                        });
+                        if (!targetColumnNames.isEmpty()) {
+                            for (String targetColumnName : targetColumnNames) {
+                                updateSql.append(targetColumnName).append("=?, ");
+                                Integer type = ctype.get(targetColumnName.toLowerCase());
+                                BatchExecutor.setValue(values, type, d.get(srcColumnName));
+                            }
                         }
                     }
-                }
-                int len = updateSql.length();
-                updateSql.delete(len - 2, len).append(" WHERE ");
+                    int len = updateSql.length();
+                    updateSql.delete(len - 2, len).append(" WHERE ");
 
-                // append the primary-key condition
-                appendCondition(dbMapping, updateSql, values, d);
+
+                    // append the primary-key condition
 
-                sqlExe(conn, updateSql.toString(), ctype, values);
+                    batchExecutor.execute(updateSql.toString(), values);
 
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Update target table, sql: {}", updateSql);
-                }
-                if (idx % config.getDbMapping().getCommitBatch() == 0) {
-                    conn.commit();
-                    completed = true;
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Execute sql: {}", updateSql);
+                    }
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
                 }
-                idx++;
-            }
-            if (!completed) {
-                conn.commit();
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            conn.rollback();
-        } finally {
-            conn.close();
+            });
+
+            idx++;
         }
     }
 
@@ -234,7 +254,7 @@ public class RdbSyncService {
      * @param dml
      * @throws SQLException
      */
-    private void delete(MappingConfig config, Dml dml) throws SQLException {
+    private void delete(MappingConfig config, Dml dml) {
         List<Map<String, Object>> data = dml.getData();
         if (data == null || data.isEmpty()) {
             return;
@@ -242,42 +262,41 @@ public class RdbSyncService {
 
         DbMapping dbMapping = config.getDbMapping();
 
-        int idx = 1;
-        boolean completed = false;
-
-        Connection conn = dataSource.getConnection();
-        conn.setAutoCommit(false);
-
-        try {
-            Map<String, Integer> ctype = getTargetColumnType(conn, config);
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutors[0].getConn(), config);
+        Integer tbHash = null;
+        if (!config.getConcurrent()) {
+            // hash by table name so all its rows go to one thread
+            tbHash = Math.abs(Math.abs(dbMapping.getTargetTable().hashCode()) % threads);
+        }
+        for (Map<String, Object> d : data) {
+            int hash;
+            if (tbHash != null) {
+                hash = tbHash;
+            } else {
+                hash = pkHash(dbMapping, d, threads);
+            }
 
-            for (Map<String, Object> d : data) {
-                StringBuilder sql = new StringBuilder();
-                sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE ");
+            ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash];
+            checkQueue(tpe);
+            tpe.submit(() -> {
+                try {
+                    BatchExecutor batchExecutor = batchExecutors[hash];
+                    StringBuilder sql = new StringBuilder();
+                    sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE ");
 
-                Map<String, Object> values = new LinkedHashMap<>();
-                // append the primary-key condition
-                appendCondition(dbMapping, sql, values, d);
+                    List<Map<String, ?>> values = new ArrayList<>();
 
-                sqlExe(conn, sql.toString(), ctype, values);
+                    // append the primary-key condition
+                    appendCondition(dbMapping, sql, ctype, values, d);
 
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Delete from target table, sql: {}", sql);
-                }
-                if (idx % config.getDbMapping().getCommitBatch() == 0) {
-                    conn.commit();
-                    completed = true;
+                    batchExecutor.execute(sql.toString(), values);
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Execute sql: {}", sql);
+                    }
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
                 }
-                idx++;
-            }
-            if (!completed) {
-                conn.commit();
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            conn.rollback();
-        } finally {
-            conn.close();
+            });
         }
     }
 
@@ -326,6 +345,9 @@ public class RdbSyncService {
      * @param i parameter index
      */
     public static void setPStmt(int type, PreparedStatement pstmt, Object value, int i) throws SQLException {
+        if (value == null) {
+            pstmt.setNull(i, type);
+        }
         switch (type) {
             case Types.BIT:
             case Types.BOOLEAN:
@@ -431,7 +453,7 @@ public class RdbSyncService {
                 break;
             case Types.DATE:
                 if (value instanceof java.util.Date) {
-                    pstmt.setDate(i, new java.sql.Date(((java.util.Date) value).getTime()));
+                    pstmt.setDate(i, new Date(((java.util.Date) value).getTime()));
                 } else if (value instanceof String) {
                     String v = (String) value;
                     if (!v.startsWith("0000-00-00")) {
@@ -447,7 +469,7 @@ public class RdbSyncService {
                 break;
             case Types.TIME:
                 if (value instanceof java.util.Date) {
-                    pstmt.setTime(i, new java.sql.Time(((java.util.Date) value).getTime()));
+                    pstmt.setTime(i, new Time(((java.util.Date) value).getTime()));
                 } else if (value instanceof String) {
                     String v = (String) value;
                     v = "T" + v;
@@ -459,7 +481,7 @@ public class RdbSyncService {
                 break;
             case Types.TIMESTAMP:
                 if (value instanceof java.util.Date) {
-                    pstmt.setTimestamp(i, new java.sql.Timestamp(((java.util.Date) value).getTime()));
+                    pstmt.setTimestamp(i, new Timestamp(((java.util.Date) value).getTime()));
                 } else if (value instanceof String) {
                     String v = (String) value;
                     if (!v.startsWith("0000-00-00")) {
@@ -481,8 +503,13 @@ public class RdbSyncService {
     /**
      * Build the primary-key WHERE condition
      */
-    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Object> values,
-                                        Map<String, Object> d) {
+    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
+                                        List<Map<String, ?>> values, Map<String, Object> d) {
+        appendCondition(dbMapping, sql, ctype, values, d, null);
+    }
+
+    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
+                                        List<Map<String, ?>> values, Map<String, Object> d, Map<String, Object> o) {
         // append the primary-key condition
         for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
             String targetColumnName = entry.getKey();
@@ -491,28 +518,68 @@ public class RdbSyncService {
                 srcColumnName = targetColumnName;
             }
             sql.append(targetColumnName).append("=? AND ");
-            values.put(targetColumnName, d.get(srcColumnName));
+            Integer type = ctype.get(targetColumnName.toLowerCase());
+            // handle the case where the primary key itself was modified
+            if (o != null && o.containsKey(srcColumnName)) {
+                BatchExecutor.setValue(values, type, o.get(srcColumnName));
+            } else {
+                BatchExecutor.setValue(values, type, d.get(srcColumnName));
+            }
         }
         int len = sql.length();
         sql.delete(len - 4, len);
     }
 
     /**
-     * Execute SQL
+     * Compute the primary-key hash
      */
-    private static void sqlExe(Connection conn, String sql, Map<String, Integer> ctype, Map<String, Object> values) {
-        try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
-            int i = 1;
-            for (Map.Entry<String, Object> entry : values.entrySet()) {
-                String targetColumnName = entry.getKey();
-                Object value = entry.getValue();
-                Integer type = ctype.get(targetColumnName.toLowerCase());
-                setPStmt(type, pstmt, value, i++);
+    private static int pkHash(DbMapping dbMapping, Map<String, Object> d, int threads) {
+        return pkHash(dbMapping, d, null, threads);
+    }
+
+    private static int pkHash(DbMapping dbMapping, Map<String, Object> d, Map<String, Object> o, int threads) {
+        int hash = 0;
+        // collect the primary-key values
+        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
+            String targetColumnName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = targetColumnName;
             }
-            pstmt.execute();
-        } catch (SQLException e) {
-            logger.error(e.getMessage(), e);
+            Object value;
+            if (o != null && o.containsKey(srcColumnName)) {
+                value = o.get(srcColumnName);
+            } else {
+                value = d.get(srcColumnName);
+            }
+            if (value != null) {
+                hash += value.hashCode();
+            }
+        }
+        hash = Math.abs(hash) % threads;
+        return Math.abs(hash);
+    }
+
+    public void close() {
+        for (BatchExecutor batchExecutor : batchExecutors) {
+            batchExecutor.close();
+        }
+        for (ExecutorService executorService : threadExecutors) {
+            executorService.shutdown();
         }
     }
 
+    private void checkQueue(ThreadPoolExecutor tpe) {
+        // keep the task queue from growing unbounded
+        while (tpe.getQueue().size() > 10000) {
+            try {
+                Thread.sleep(3000);
+                while (tpe.getQueue().size() > 5000) {
+                    Thread.sleep(1000);
+                }
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
 }
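
The checkQueue guard above is a simple producer-side backpressure loop: the dispatching thread stalls whenever a worker's queue exceeds a high watermark and resumes once it drains. A standalone sketch of the same idea (hypothetical watermarks and workload, not part of the PR):

```
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class BackpressureSketch {

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor tpe = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
        for (int i = 0; i < 100000; i++) {
            // high watermark, as in checkQueue(): stall the producer
            // until the single worker drains its queue
            while (tpe.getQueue().size() > 10000) {
                Thread.sleep(100);
            }
            tpe.submit(() -> {
                // simulate a row write
            });
        }
        tpe.shutdown();
    }
}
```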

client-adapter/rdb/src/main/resources/rdb/mytest_user.yml  (+1 -0)

@@ -1,6 +1,7 @@
 dataSourceKey: defaultDS
 destination: example
 outerAdapterKey: oracle1
+concurrent: true
 dbMapping:
   database: mytest
   table: user

client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/DBTest.java  (+50 -23)

@@ -2,8 +2,10 @@ package com.alibaba.otter.canal.client.adapter.rdb.test;
 
 import java.io.BufferedReader;
 import java.io.Reader;
-import java.math.BigDecimal;
-import java.sql.*;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 
 import org.junit.Test;
 
@@ -34,30 +36,55 @@ public class DBTest {
         dataSource.init();
 
         Connection conn = dataSource.getConnection();
-        Statement stmt = conn.createStatement();
-        ResultSet rs = stmt.executeQuery("select * from user t where 1=2");
 
-        ResultSetMetaData rsm = rs.getMetaData();
-        int cnt = rsm.getColumnCount();
-        for (int i = 1; i <= cnt; i++) {
-            System.out.println(rsm.getColumnName(i) + " " + rsm.getColumnType(i));
+        conn.setAutoCommit(false);
+        PreparedStatement pstmt = conn
+            .prepareStatement("insert into user (id,name,role_id,c_time,test1,test2) values (?,?,?,?,?,?)");
+
+        java.util.Date now = new java.util.Date();
+        for (int i = 1; i <= 100000; i++) {
+            pstmt.clearParameters();
+            pstmt.setLong(1, (long) i);
+            pstmt.setString(2, "test_" + i);
+            pstmt.setLong(3, (long) i % 4 + 1);
+            pstmt.setDate(4, new java.sql.Date(now.getTime()));
+            pstmt.setString(5, "tttt");
+            pstmt.setBytes(6, null);
+
+            pstmt.execute();
+            if (i % 5000 == 0) {
+                conn.commit();
+            }
         }
+        conn.commit();
+
+        pstmt.close();
+
+        // Statement stmt = conn.createStatement();
+        // ResultSet rs = stmt.executeQuery("select * from user t where 1=2");
+        //
+        // ResultSetMetaData rsm = rs.getMetaData();
+        // int cnt = rsm.getColumnCount();
+        // for (int i = 1; i <= cnt; i++) {
+        // System.out.println(rsm.getColumnName(i) + " " + rsm.getColumnType(i));
+        // }
+
+        // rs.close();
+        // stmt.close();
 
-        rs.close();
-        stmt.close();
-
-//        PreparedStatement pstmt = conn
-//            .prepareStatement("insert into tb_user (id,name,role_id,c_time,test1,test2) values (?,?,?,?,?,?)");
-//        pstmt.setBigDecimal(1, new BigDecimal("5"));
-//        pstmt.setString(2, "test");
-//        pstmt.setBigDecimal(3, new BigDecimal("1"));
-//        pstmt.setDate(4, new Date(new java.util.Date().getTime()));
-//        byte[] a = { (byte) 1, (byte) 2 };
-//        pstmt.setBytes(5, a);
-//        pstmt.setBytes(6, a);
-//        pstmt.execute();
-//
-//        pstmt.close();
+        // PreparedStatement pstmt = conn
+        // .prepareStatement("insert into tb_user (id,name,role_id,c_time,test1,test2)
+        // values (?,?,?,?,?,?)");
+        // pstmt.setBigDecimal(1, new BigDecimal("5"));
+        // pstmt.setString(2, "test");
+        // pstmt.setBigDecimal(3, new BigDecimal("1"));
+        // pstmt.setDate(4, new Date(new java.util.Date().getTime()));
+        // byte[] a = { (byte) 1, (byte) 2 };
+        // pstmt.setBytes(5, a);
+        // pstmt.setBytes(6, a);
+        // pstmt.execute();
+        //
+        // pstmt.close();
 
         conn.close();
         dataSource.close();

client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/Common.java  (+5 -4)

@@ -1,20 +1,21 @@
 package com.alibaba.otter.canal.client.adapter.rdb.test.sync;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
 import com.alibaba.otter.canal.client.adapter.rdb.test.TestConstant;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 
-import java.util.HashMap;
-import java.util.Map;
-
 public class Common {
+
     public static RdbAdapter init() {
         DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
 
         OuterAdapterConfig outerAdapterConfig = new OuterAdapterConfig();
         outerAdapterConfig.setName("rdb");
-        //outerAdapterConfig.setKey("oralce1");
+        outerAdapterConfig.setKey("oracle1");
         Map<String, String> properties = new HashMap<>();
         properties.put("jdbc.driveClassName", "oracle.jdbc.OracleDriver");
         properties.put("jdbc.url", "jdbc:oracle:thin:@127.0.0.1:49161:XE");

client-adapter/rdb/src/test/resources/rdb/mytest_user.yml  (+2 -0)

@@ -1,5 +1,7 @@
 dataSourceKey: defaultDS
 destination: example
+outerAdapterKey: oracle1
+concurrent: true
 dbMapping:
   database: mytest
   table: user

protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java  (+12 -4)

@@ -171,8 +171,9 @@ public class FlatMessage implements Serializable {
                 try {
                     rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                 } catch (Exception e) {
-                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
-                                               + entry.toString(), e);
+                    throw new RuntimeException(
+                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
+                        e);
                 }
 
                 CanalEntry.EventType eventType = rowChange.getEventType();
@@ -214,7 +215,7 @@ public class FlatMessage implements Serializable {
                             mysqlType.put(column.getName(), column.getMysqlType());
                             if (column.getIsNull()) {
                                 row.put(column.getName(), null);
-                            } else  {
+                            } else {
                                 row.put(column.getName(), column.getValue());
                             }
                             // 获取update为true的字段
@@ -284,7 +285,14 @@ public class FlatMessage implements Serializable {
             if (flatMessage.getData() != null) {
                 int idx = 0;
                 for (Map<String, String> row : flatMessage.getData()) {
-                    String value = row.get(pk);
+                    Map<String, String> o = flatMessage.getOld() != null ? flatMessage.getOld().get(idx) : null;
+                    String value;
+                    // if old contains the pk value, the primary key was modified; hash by the old pk value
+                    if (o != null && o.containsKey(pk)) {
+                        value = o.get(pk);
+                    } else {
+                        value = row.get(pk);
+                    }
                     if (value == null) {
                         value = "";
                     }
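
The FlatMessage change applies the same old-pk rule as RdbSyncService: when an update modifies the primary key itself, the event is partitioned by the old pk value so it stays ordered with the row's earlier events. A sketch of just that decision (hypothetical row data and standalone class, not part of the PR):

```
import java.util.HashMap;
import java.util.Map;

public class PartitionKeySketch {

    // mirror of the choice above: prefer the old pk value when present
    static String partitionValue(Map<String, String> row, Map<String, String> old, String pk) {
        if (old != null && old.containsKey(pk)) {
            return old.get(pk);
        }
        return row.get(pk);
    }

    public static void main(String[] args) {
        Map<String, String> row = new HashMap<>();
        row.put("id", "43"); // pk value after the update
        Map<String, String> old = new HashMap<>();
        old.put("id", "42"); // pk value before the update
        System.out.println(partitionValue(row, old, "id")); // prints 42
    }
}
```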