Bläddra i källkod

rdb 批量提交优化

mcy 6 år sedan
förälder
incheckning
bc056288d4

+ 10 - 10
client-adapter/launcher/src/main/resources/application.yml

@@ -28,16 +28,16 @@ canal.conf:
   - instance: example
     groups:
     - outAdapters:
-#      - name: logger
-      - name: rdb
-        key: oracle1
-        properties:
-          jdbc.driverClassName: oracle.jdbc.OracleDriver
-          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
-          jdbc.username: mytest
-          jdbc.password: m121212
-          threads: 5
-          commitSize: 5000
+      - name: logger
+#      - name: rdb
+#        key: oracle1
+#        properties:
+#          jdbc.driverClassName: oracle.jdbc.OracleDriver
+#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
+#          jdbc.username: mytest
+#          jdbc.password: m121212
+#          threads: 5
+#          commitSize: 5000
 #      - name: rdb
 #        key: postgres1
 #        properties:

+ 44 - 35
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -5,9 +5,6 @@ import java.sql.SQLException;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -18,11 +15,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.rdb.config.ConfigLoader;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
+import com.alibaba.otter.canal.client.adapter.rdb.support.SimpleDml;
 import com.alibaba.otter.canal.client.adapter.support.*;
 
 @SPI("rdb")
@@ -30,8 +30,8 @@ public class RdbAdapter implements OuterAdapter {
 
     private static Logger              logger             = LoggerFactory.getLogger(RdbAdapter.class);
 
-    private Map<String, MappingConfig> rdbMapping         = new HashMap<>();                          // 文件名对应配置
-    private Map<String, MappingConfig> mappingConfigCache = new HashMap<>();                          // 库名-表名对应配置
+    private Map<String, MappingConfig> rdbMapping         = new HashMap<>();                                // 文件名对应配置
+    private Map<String, MappingConfig> mappingConfigCache = new HashMap<>();                                // 库名-表名对应配置
 
     private DruidDataSource            dataSource;
 
@@ -39,6 +39,12 @@ public class RdbAdapter implements OuterAdapter {
 
     private int                        commitSize         = 3000;
 
+    private volatile boolean           running            = false;
+
+    private List<SimpleDml>            dmlList            = Collections.synchronizedList(new ArrayList<>());
+    private Lock                       syncLock           = new ReentrantLock();
+    private ExecutorService            executor           = Executors.newFixedThreadPool(1);
+
     @Override
     public void init(OuterAdapterConfig configuration) {
         Map<String, MappingConfig> rdbMappingTmp = ConfigLoader.load();
@@ -81,52 +87,54 @@ public class RdbAdapter implements OuterAdapter {
         if (commitSize != null) {
             this.commitSize = Integer.valueOf(commitSize);
         }
-        rdbSyncService = new RdbSyncService(this.commitSize,
-            threads != null ? Integer.valueOf(threads) : null,
-            dataSource);
-    }
+        rdbSyncService = new RdbSyncService(threads != null ? Integer.valueOf(threads) : null, dataSource);
 
-    private AtomicInteger   batchRowNum = new AtomicInteger(0);
-    private List<Dml>       dmlList     = Collections.synchronizedList(new ArrayList<>());
-    private Lock            syncLock    = new ReentrantLock();
-    private Condition       condition   = syncLock.newCondition();
-    private ExecutorService executor    = Executors.newFixedThreadPool(1);
-
-    @Override
-    public void sync(Dml dml) {
-        boolean first = batchRowNum.get() == 0;
-        int currentSize = batchRowNum.addAndGet(dml.getData().size());
-        dmlList.add(dml);
+        running = true;
 
-        if (first) {
-            // 开启超时判断
-            executor.submit(() -> {
+        executor.submit(() -> {
+            while (running) {
                 try {
-                    syncLock.lock();
-                    if (!condition.await(5, TimeUnit.SECONDS)) {
-                        // 批量超时
+                    int size1 = dmlList.size();
+                    Thread.sleep(3000);
+                    int size2 = dmlList.size();
+                    if (size1 == size2) {
+                        // 超时提交
                         sync();
                     }
                 } catch (Exception e) {
                     logger.error(e.getMessage(), e);
-                } finally {
-                    syncLock.unlock();
                 }
-            });
-        }
+            }
+        });
+    }
+
+    @Override
+    public void sync(Dml dml) {
+        String destination = StringUtils.trimToEmpty(dml.getDestination());
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+        MappingConfig config = mappingConfigCache.get(destination + "." + database + "." + table);
+
+        List<SimpleDml> simpleDmlList = SimpleDml.dml2SimpleDml(dml, config);
 
-        if (currentSize > commitSize) {
+        dmlList.addAll(simpleDmlList);
+
+        if (dmlList.size() > commitSize) {
             sync();
         }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+        }
     }
 
     private void sync() {
         try {
             syncLock.lock();
-            rdbSyncService.sync(mappingConfigCache, dmlList);
-            batchRowNum.set(0);
-            dmlList.clear();
-            condition.signal();
+            if (!dmlList.isEmpty()) {
+                rdbSyncService.sync(dmlList);
+                dmlList.clear();
+            }
         } finally {
             syncLock.unlock();
         }
@@ -226,6 +234,7 @@ public class RdbAdapter implements OuterAdapter {
 
     @Override
     public void destroy() {
+        running = false;
         executor.shutdown();
 
         if (rdbSyncService != null) {

+ 1 - 28
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/BatchExecutor.java

@@ -23,18 +23,14 @@ public class BatchExecutor {
 
     private Integer             key;
     private Connection          conn;
-    private int                 commitSize = 3000;
     private AtomicInteger       idx        = new AtomicInteger(0);
     private ExecutorService     executor   = Executors.newFixedThreadPool(1);
     private Lock                commitLock = new ReentrantLock();
     private Condition           condition  = commitLock.newCondition();
 
-    public BatchExecutor(Integer key, Connection conn, Integer commitSize){
+    public BatchExecutor(Integer key, Connection conn){
         this.key = key;
         this.conn = conn;
-        if (commitSize != null) {
-            this.commitSize = commitSize;
-        }
 
         try {
             this.conn.setAutoCommit(false);
@@ -71,29 +67,6 @@ public class BatchExecutor {
         } catch (SQLException e) {
             logger.error(e.getMessage(), e);
         }
-        // int i = idx.incrementAndGet();
-        //
-        // // 批次的第一次执行设置延时
-        // if (i == 1) {
-        // executor.submit(() -> {
-        // try {
-        // commitLock.lock();
-        // conn.commit(); //直接提交一次
-        // if (!condition.await(5, TimeUnit.SECONDS)) {
-        // // 超时提交
-        // commit();
-        // }
-        // } catch (Exception e) {
-        // logger.error(e.getMessage(), e);
-        // } finally {
-        // commitLock.unlock();
-        // }
-        // });
-        // }
-        //
-        // if (i == commitSize) {
-        // commit();
-        // }
     }
 
     public void commit() {

+ 121 - 236
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -5,23 +5,22 @@ import java.io.StringReader;
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
 import java.sql.*;
-import java.sql.Date;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.*;
 
 import javax.sql.DataSource;
 
-import org.apache.commons.lang.StringUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
+import com.alibaba.otter.canal.client.adapter.rdb.support.SimpleDml;
 import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.Util;
 
 /**
@@ -42,7 +41,7 @@ public class RdbSyncService {
 
     private ExecutorService[]                       threadExecutors;
 
-    public RdbSyncService(Integer commitSize, Integer threads, DataSource dataSource){
+    public RdbSyncService(Integer threads, DataSource dataSource){
         try {
             if (threads != null && threads > 1 && threads <= 10) {
                 this.threads = threads;
@@ -51,7 +50,7 @@ public class RdbSyncService {
             for (int i = 0; i < this.threads; i++) {
                 Connection conn = dataSource.getConnection();
                 conn.setAutoCommit(false);
-                this.batchExecutors[i] = new BatchExecutor(i, conn, commitSize);
+                this.batchExecutors[i] = new BatchExecutor(i, conn);
             }
             threadExecutors = new ExecutorService[this.threads];
             for (int i = 0; i < this.threads; i++) {
@@ -63,96 +62,60 @@ public class RdbSyncService {
     }
 
     @SuppressWarnings("unchecked")
-    public void sync(Map<String, MappingConfig> mappingConfigCache, List<Dml> dmlList) {
-        try {
-            List<Map<String, Object>>[] dmlPartition = new ArrayList[threads];
-            for (int i = 0; i < threads; i++) {
-                dmlPartition[i] = new ArrayList<>();
+    private List<SimpleDml>[] simpleDmls2Partition(List<SimpleDml> simpleDmlList) {
+        List<SimpleDml>[] simpleDmlPartition = new ArrayList[threads];
+        for (int i = 0; i < threads; i++) {
+            simpleDmlPartition[i] = new ArrayList<>();
+        }
+        simpleDmlList.forEach(simpleDml -> {
+            int hash;
+            if (simpleDml.getConfig().getConcurrent()) {
+                hash = pkHash(simpleDml.getConfig().getDbMapping(), simpleDml.getData(), threads);
+            } else {
+                hash = Math.abs(Math.abs(simpleDml.getConfig().getDbMapping().getTargetTable().hashCode()) % threads);
             }
-            // 根据hash拆分
-            dmlList.forEach(dml -> {
-                String destination = StringUtils.trimToEmpty(dml.getDestination());
-                String database = dml.getDatabase();
-                String table = dml.getTable();
-                MappingConfig config = mappingConfigCache.get(destination + "." + database + "." + table);
-
-                Dml[] dmls = new Dml[threads];
-                for (int i = 0; i < threads; i++) {
-                    Dml dmlTmp = new Dml();
-                    dmlTmp.setDestination(dml.getDestination());
-                    dmlTmp.setDatabase(dml.getDatabase());
-                    dmlTmp.setTable(dml.getTable());
-                    dmlTmp.setType(dml.getType());
-                    dmlTmp.setTs(dml.getTs());
-                    dmlTmp.setEs(dml.getEs());
-                    dmlTmp.setSql(dml.getSql());
-                    dmlTmp.setData(new ArrayList<>());
-                    dmlTmp.setOld(new ArrayList<>());
-                    dmls[i] = dmlTmp;
-                }
-                int idx = 0;
-                for (Map<String, Object> data : dml.getData()) {
-                    int hash;
-                    if (config.getConcurrent()) {
-                        hash = pkHash(config.getDbMapping(), data, threads);
-                    } else {
-                        hash = Math.abs(Math.abs(config.getDbMapping().getTargetTable().hashCode()) % threads);
-                    }
-                    Dml dmlTmp = dmls[hash];
-                    dmlTmp.getData().add(data);
-                    if (dml.getOld() != null) {
-                        dmlTmp.getOld().add(dml.getOld().get(idx));
-                    }
+            simpleDmlPartition[hash].add(simpleDml);
+        });
+        return simpleDmlPartition;
+    }
 
-                    idx++;
-                }
-                for (int i = 0; i < threads; i++) {
-                    Map<String, Object> item = new HashMap<>();
-                    item.put("dml", dmls[i]);
-                    item.put("config", config);
-                    dmlPartition[i].add(item);
-                }
+    public void sync(List<SimpleDml> simpleDmlList) {
+        try {
+            List<SimpleDml>[] simpleDmlsPartition = simpleDmls2Partition(simpleDmlList);
 
-            });
             List<Future<Boolean>> futures = new ArrayList<>();
             for (int i = 0; i < threads; i++) {
-                int j = i;
-                futures.add(threadExecutors[i].submit(() -> {
-                    dmlPartition[j].forEach(item -> {
-                        MappingConfig config = (MappingConfig) item.get("config");
-                        Dml dml = (Dml) item.get("dml");
-                        sync(config, dml);
-                    });
-                    batchExecutors[j].commit();
-                    return true;
-                }));
+                if (!simpleDmlsPartition[i].isEmpty()) {
+                    int j = i;
+                    futures.add(threadExecutors[i].submit(() -> {
+                        simpleDmlsPartition[j].forEach(simpleDml -> sync(simpleDml, batchExecutors[j]));
+                        batchExecutors[j].commit();
+                        return true;
+                    }));
+                }
             }
-            for (int i = 0; i < threads; i++) {
+
+            futures.forEach(future -> {
                 try {
-                    futures.get(i).get();
+                    future.get();
                 } catch (Exception e) {
                     // ignore
                 }
-            }
+            });
         } catch (Exception e) {
             logger.error("Error rdb sync for batch", e);
         }
     }
 
-    public void sync(MappingConfig config, Dml dml) {
+    public void sync(SimpleDml dml, BatchExecutor batchExecutor) {
         try {
-            if (config != null) {
-                String type = dml.getType();
-                if (type != null && type.equalsIgnoreCase("INSERT")) {
-                    insert(config, dml);
-                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
-                    update(config, dml);
-                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
-                    delete(config, dml);
-                }
-                if (logger.isDebugEnabled()) {
-                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
-                }
+            String type = dml.getType();
+            if (type != null && type.equalsIgnoreCase("INSERT")) {
+                insert(dml, batchExecutor);
+            } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
+                update(dml, batchExecutor);
+            } else if (type != null && type.equalsIgnoreCase("DELETE")) {
+                delete(dml, batchExecutor);
             }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
@@ -162,18 +125,17 @@ public class RdbSyncService {
     /**
      * 插入操作
      *
-     * @param config 配置项
-     * @param dml DML数据
+     * @param simpleDml DML数据
      */
-    private void insert(MappingConfig config, Dml dml) {
-        List<Map<String, Object>> data = dml.getData();
+    private void insert(SimpleDml simpleDml, BatchExecutor batchExecutor) {
+        Map<String, Object> data = simpleDml.getData();
         if (data == null || data.isEmpty()) {
             return;
         }
 
-        DbMapping dbMapping = config.getDbMapping();
+        DbMapping dbMapping = simpleDml.getConfig().getDbMapping();
 
-        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data.get(0));
+        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
 
         StringBuilder insertSql = new StringBuilder();
         insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
@@ -188,185 +150,121 @@ public class RdbSyncService {
         len = insertSql.length();
         insertSql.delete(len - 1, len).append(")");
 
-        Map<String, Integer> ctype = getTargetColumnType(batchExecutors[0].getConn(), config);
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), simpleDml.getConfig());
 
         String sql = insertSql.toString();
-        Integer tbHash = null;
-        // 如果不是并行同步的表
-        if (!config.getConcurrent()) {
-            // 按表名hash到一个线程
-            tbHash = Math.abs(Math.abs(dbMapping.getTargetTable().hashCode()) % threads);
-        }
-        for (Map<String, Object> d : data) {
-            int hash;
-            if (tbHash != null) {
-                hash = tbHash;
-            } else {
-                hash = pkHash(dbMapping, d, threads);
-            }
-            // ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash];
-            // checkQueue(tpe);
-            // tpe.submit(() -> {
-            try {
-                BatchExecutor batchExecutor = batchExecutors[hash];
-                List<Map<String, ?>> values = new ArrayList<>();
-
-                for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
-                    String targetColumnName = entry.getKey();
-                    String srcColumnName = entry.getValue();
-                    if (srcColumnName == null) {
-                        srcColumnName = targetColumnName;
-                    }
 
-                    Integer type = ctype.get(targetColumnName.toLowerCase());
-                    if (type == null) {
-                        throw new RuntimeException("No column: " + targetColumnName + " found in target db");
-                    }
-                    Object value = d.get(srcColumnName);
-                    BatchExecutor.setValue(values, type, value);
+        try {
+            List<Map<String, ?>> values = new ArrayList<>();
+
+            for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+                String targetColumnName = entry.getKey();
+                String srcColumnName = entry.getValue();
+                if (srcColumnName == null) {
+                    srcColumnName = targetColumnName;
+                }
+
+                Integer type = ctype.get(targetColumnName.toLowerCase());
+                if (type == null) {
+                    throw new RuntimeException("No column: " + targetColumnName + " found in target db");
                 }
-                batchExecutor.execute(sql, values);
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
+                Object value = data.get(srcColumnName);
+                BatchExecutor.setValue(values, type, value);
             }
-            // });
+            batchExecutor.execute(sql, values);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
         }
     }
 
     /**
      * 更新操作
      * 
-     * @param config 配置项
-     * @param dml DML数据
+     * @param simpleDml DML数据
      */
-    private void update(MappingConfig config, Dml dml) {
-        List<Map<String, Object>> data = dml.getData();
+    private void update(SimpleDml simpleDml, BatchExecutor batchExecutor) {
+        Map<String, Object> data = simpleDml.getData();
         if (data == null || data.isEmpty()) {
             return;
         }
 
-        List<Map<String, Object>> old = dml.getOld();
+        Map<String, Object> old = simpleDml.getOld();
         if (old == null || old.isEmpty()) {
             return;
         }
 
-        DbMapping dbMapping = config.getDbMapping();
+        DbMapping dbMapping = simpleDml.getConfig().getDbMapping();
 
-        int idx = 1;
-        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data.get(0));
+        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
 
-        Map<String, Integer> ctype = getTargetColumnType(batchExecutors[0].getConn(), config);
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), simpleDml.getConfig());
 
-        Integer tbHash = null;
-        if (!config.getConcurrent()) {
-            // 按表名hash到一个线程
-            tbHash = Math.abs(Math.abs(dbMapping.getTargetTable().hashCode()) % threads);
-        }
-        for (Map<String, Object> o : old) {
-            Map<String, Object> d = data.get(idx - 1);
-
-            int hash;
-            if (tbHash != null) {
-                hash = tbHash;
-            } else {
-                hash = pkHash(dbMapping, d, o, threads);
-            }
-
-            // ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash];
-            // checkQueue(tpe);
-            // tpe.submit(() -> {
-            try {
-                BatchExecutor batchExecutor = batchExecutors[hash];
-
-                StringBuilder updateSql = new StringBuilder();
-                updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
-
-                List<Map<String, ?>> values = new ArrayList<>();
-                for (String srcColumnName : o.keySet()) {
-                    List<String> targetColumnNames = new ArrayList<>();
-                    columnsMap.forEach((targetColumn, srcColumn) -> {
-                        if (srcColumnName.toLowerCase().equals(srcColumn)) {
-                            targetColumnNames.add(targetColumn);
-                        }
-                    });
-                    if (!targetColumnNames.isEmpty()) {
-                        for (String targetColumnName : targetColumnNames) {
-                            updateSql.append(targetColumnName).append("=?, ");
-                            Integer type = ctype.get(targetColumnName.toLowerCase());
-                            BatchExecutor.setValue(values, type, d.get(srcColumnName));
-                        }
+        try {
+            StringBuilder updateSql = new StringBuilder();
+            updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
+
+            List<Map<String, ?>> values = new ArrayList<>();
+            for (String srcColumnName : old.keySet()) {
+                List<String> targetColumnNames = new ArrayList<>();
+                columnsMap.forEach((targetColumn, srcColumn) -> {
+                    if (srcColumnName.toLowerCase().equals(srcColumn)) {
+                        targetColumnNames.add(targetColumn);
+                    }
+                });
+                if (!targetColumnNames.isEmpty()) {
+                    for (String targetColumnName : targetColumnNames) {
+                        updateSql.append(targetColumnName).append("=?, ");
+                        Integer type = ctype.get(targetColumnName.toLowerCase());
+                        BatchExecutor.setValue(values, type, data.get(srcColumnName));
                     }
                 }
-                int len = updateSql.length();
-                updateSql.delete(len - 2, len).append(" WHERE ");
+            }
+            int len = updateSql.length();
+            updateSql.delete(len - 2, len).append(" WHERE ");
 
-                // 拼接主键
-                appendCondition(dbMapping, updateSql, ctype, values, d, o);
+            // 拼接主键
+            appendCondition(dbMapping, updateSql, ctype, values, data, old);
 
-                batchExecutor.execute(updateSql.toString(), values);
+            batchExecutor.execute(updateSql.toString(), values);
 
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Execute sql: {}", updateSql);
-                }
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
+            if (logger.isTraceEnabled()) {
+                logger.trace("Execute sql: {}", updateSql);
             }
-            // });
-
-            idx++;
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
         }
     }
 
     /**
      * 删除操作
      * 
-     * @param config
-     * @param dml
-     * @throws SQLException
+     * @param simpleDml
      */
-    private void delete(MappingConfig config, Dml dml) {
-        List<Map<String, Object>> data = dml.getData();
+    private void delete(SimpleDml simpleDml, BatchExecutor batchExecutor) {
+        Map<String, Object> data = simpleDml.getData();
         if (data == null || data.isEmpty()) {
             return;
         }
 
-        DbMapping dbMapping = config.getDbMapping();
+        DbMapping dbMapping = simpleDml.getConfig().getDbMapping();
 
-        Map<String, Integer> ctype = getTargetColumnType(batchExecutors[0].getConn(), config);
-        Integer tbHash = null;
-        if (!config.getConcurrent()) {
-            // 按表名hash到一个线程
-            tbHash = Math.abs(Math.abs(dbMapping.getTargetTable().hashCode()) % threads);
-        }
-        for (Map<String, Object> d : data) {
-            int hash;
-            if (tbHash != null) {
-                hash = tbHash;
-            } else {
-                hash = pkHash(dbMapping, d, threads);
-            }
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), simpleDml.getConfig());
 
-            // ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash];
-            // checkQueue(tpe);
-            // tpe.submit(() -> {
-            try {
-                BatchExecutor batchExecutor = batchExecutors[hash];
-                StringBuilder sql = new StringBuilder();
-                sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE ");
+        try {
+            StringBuilder sql = new StringBuilder();
+            sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE ");
 
-                List<Map<String, ?>> values = new ArrayList<>();
+            List<Map<String, ?>> values = new ArrayList<>();
 
-                // 拼接主键
-                appendCondition(dbMapping, sql, ctype, values, d);
+            // 拼接主键
+            appendCondition(dbMapping, sql, ctype, values, data);
 
-                batchExecutor.execute(sql.toString(), values);
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Execute sql: {}", sql);
-                }
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
+            batchExecutor.execute(sql.toString(), values);
+            if (logger.isTraceEnabled()) {
+                logger.trace("Execute sql: {}", sql);
             }
-            // });
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
         }
     }
 
@@ -416,7 +314,8 @@ public class RdbSyncService {
      */
     public static void setPStmt(int type, PreparedStatement pstmt, Object value, int i) throws SQLException {
         if (value == null) {
-            pstmt.setObject(i, type);
+            pstmt.setNull(i, type);
+            return;
         }
         switch (type) {
             case Types.BIT:
@@ -638,18 +537,4 @@ public class RdbSyncService {
             executorService.shutdown();
         }
     }
-
-    private void checkQueue(ThreadPoolExecutor tpe) {
-        // 防止队列过大
-        while (tpe.getQueue().size() > 10000) {
-            try {
-                Thread.sleep(3000);
-                while (tpe.getQueue().size() > 5000) {
-                    Thread.sleep(1000);
-                }
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        }
-    }
 }

+ 102 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SimpleDml.java

@@ -0,0 +1,102 @@
+package com.alibaba.otter.canal.client.adapter.rdb.support;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import org.apache.commons.lang.StringUtils;
+
+public class SimpleDml {
+
+    private String              destination;
+    private String              database;
+    private String              table;
+    private String              type;
+    private Map<String, Object> data;
+    private Map<String, Object> old;
+
+    private MappingConfig       config;
+
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public Map<String, Object> getData() {
+        return data;
+    }
+
+    public void setData(Map<String, Object> data) {
+        this.data = data;
+    }
+
+    public Map<String, Object> getOld() {
+        return old;
+    }
+
+    public void setOld(Map<String, Object> old) {
+        this.old = old;
+    }
+
+    public MappingConfig getConfig() {
+        return config;
+    }
+
+    public void setConfig(MappingConfig config) {
+        this.config = config;
+    }
+
+    public static List<SimpleDml> dml2SimpleDml(Dml dml, MappingConfig config) {
+        List<SimpleDml> simpleDmlList = new ArrayList<>();
+        int len = dml.getData().size();
+
+        String destination = StringUtils.trimToEmpty(dml.getDestination());
+        String database = dml.getDatabase();
+        String table = dml.getTable();
+
+        for (int i = 0; i < len; i++) {
+            SimpleDml simpleDml = new SimpleDml();
+            simpleDml.setDestination(dml.getDestination());
+            simpleDml.setDatabase(dml.getDatabase());
+            simpleDml.setTable(dml.getTable());
+            simpleDml.setType(dml.getType());
+            simpleDml.setData(dml.getData().get(i));
+            if (dml.getOld() != null) {
+                simpleDml.setOld(dml.getOld().get(i));
+            }
+            simpleDml.setConfig(config);
+            simpleDmlList.add(simpleDml);
+        }
+
+        return simpleDmlList;
+    }
+}