Browse Source

Merge pull request #1153 from rewerma/master

 rdb 批量提交优化
agapple 6 years ago
parent
commit
0a3c1645e8
15 changed files with 521 additions and 243 deletions
  1. 6 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Constant.java
  2. 1 1
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MappingConfigsLoader.java
  3. 30 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Util.java
  4. 3 2
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java
  5. 5 0
      client-adapter/launcher/pom.xml
  6. 5 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/CanalAdapterApplication.java
  7. 22 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/RefresherConfig.java
  8. 28 5
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java
  9. 30 19
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java
  10. 90 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/ApplicationRunningMonitor.java
  11. 2 1
      client-adapter/launcher/src/main/resources/application.yml
  12. 66 6
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java
  13. 2 29
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/BatchExecutor.java
  14. 134 179
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java
  15. 97 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SimpleDml.java

+ 6 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Constant.java

@@ -0,0 +1,6 @@
+package com.alibaba.otter.canal.client.adapter.support;
+
+public class Constant {
+
+    public static final String CONF_DIR = "conf";
+}

+ 1 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MappingConfigsLoader.java

@@ -15,7 +15,7 @@ public class MappingConfigsLoader {
         Map<String, String> configContentMap = new HashMap<>();
 
         // 先取本地文件,再取类路径
-        File configDir = new File("../conf/" + name);
+        File configDir = new File(".." + File.separator + Constant.CONF_DIR + File.separator + name);
         if (!configDir.exists()) {
             URL url = MappingConfigsLoader.class.getClassLoader().getResource("");
             if (url != null) {

+ 30 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Util.java

@@ -1,5 +1,7 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
+import java.io.File;
+import java.net.URL;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -9,6 +11,7 @@ import java.util.function.Function;
 
 import javax.sql.DataSource;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,4 +90,31 @@ public class Util {
             }
         }
     }
+
+    public static File getConfDirPath() {
+        return getConfDirPath("");
+    }
+
+    public static File getConfDirPath(String subConf) {
+        URL url = Util.class.getClassLoader().getResource("");
+        String path;
+        if (url != null) {
+            path = url.getPath();
+        } else {
+            path = new File("").getAbsolutePath();
+        }
+        File file = null;
+        if (path != null) {
+            file = new File(
+                path + ".." + File.separator + Constant.CONF_DIR + File.separator + StringUtils.trimToEmpty(subConf));
+            if (!file.exists()) {
+                file = new File(path + StringUtils.trimToEmpty(subConf));
+            }
+        }
+        if (file == null || !file.exists()) {
+            throw new RuntimeException("Config dir not found.");
+        }
+
+        return file;
+    }
 }

+ 3 - 2
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -5,6 +5,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.sql.DataSource;
 
@@ -37,8 +38,8 @@ public class HbaseAdapter implements OuterAdapter {
 
     private static Logger              logger             = LoggerFactory.getLogger(HbaseAdapter.class);
 
-    private Map<String, MappingConfig> hbaseMapping       = new HashMap<>();                            // 文件名对应配置
-    private Map<String, MappingConfig> mappingConfigCache = new HashMap<>();                            // 库名-表名对应配置
+    private Map<String, MappingConfig> hbaseMapping       = new ConcurrentHashMap<>();                            // 文件名对应配置
+    private Map<String, MappingConfig> mappingConfigCache = new ConcurrentHashMap<>();                            // 库名-表名对应配置
 
     private Connection                 conn;
     private HbaseSyncService           hbaseSyncService;

+ 5 - 0
client-adapter/launcher/pom.xml

@@ -49,6 +49,11 @@
             <artifactId>spring-boot-configuration-processor</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-context</artifactId>
+            <version>2.0.0.RELEASE</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.curator</groupId>
             <artifactId>curator-recipes</artifactId>

+ 5 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/CanalAdapterApplication.java

@@ -1,5 +1,7 @@
 package com.alibaba.otter.canal.adapter.launcher;
 
+import org.springframework.boot.Banner;
+import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.builder.SpringApplicationBuilder;
 
@@ -13,6 +15,8 @@ import org.springframework.boot.builder.SpringApplicationBuilder;
 public class CanalAdapterApplication {
 
     public static void main(String[] args) {
-        new SpringApplicationBuilder(CanalAdapterApplication.class).run(args);
+        SpringApplication application = new SpringApplication(CanalAdapterApplication.class);
+        application.setBannerMode(Banner.Mode.OFF);
+        application.run(args);
     }
 }

+ 22 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/RefresherConfig.java

@@ -0,0 +1,22 @@
+package com.alibaba.otter.canal.adapter.launcher.config;
+
+import org.springframework.cloud.context.refresh.ContextRefresher;
+import org.springframework.cloud.context.scope.refresh.RefreshScope;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class RefresherConfig {
+
+    @Bean
+    public RefreshScope refreshScope() {
+        return new RefreshScope();
+    }
+
+    @Bean
+    public ContextRefresher contextRefresher(ConfigurableApplicationContext configurableApplicationContext,
+                                             RefreshScope refreshScope) {
+        return new ContextRefresher(configurableApplicationContext, refreshScope);
+    }
+}

+ 28 - 5
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -6,8 +6,10 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -136,7 +138,6 @@ public class CanalAdapterLoader {
             logger.info("Load canal adapter: {} succeed", config.getName());
         } catch (Exception e) {
             logger.error("Load canal adapter: {} failed", config.getName(), e);
-            System.exit(0);
         }
     }
 
@@ -144,19 +145,41 @@ public class CanalAdapterLoader {
      * 销毁所有适配器 为防止canal实例太多造成销毁阻塞, 并行销毁
      */
     public void destroy() {
-        if (canalWorkers.size() > 0) {
+        if (!canalWorkers.isEmpty()) {
             ExecutorService stopExecutorService = Executors.newFixedThreadPool(canalWorkers.size());
+            List<Future<Boolean>> futures = new ArrayList<>();
             for (CanalAdapterWorker canalAdapterWorker : canalWorkers.values()) {
-                stopExecutorService.submit(canalAdapterWorker::stop);
+                futures.add(stopExecutorService.submit(() -> {
+                    canalAdapterWorker.stop();
+                    return true;
+                }));
             }
+            futures.forEach(future -> {
+                try {
+                    future.get();
+                } catch (Exception e) {
+                    // ignore
+                }
+            });
             stopExecutorService.shutdown();
         }
 
-        if (canalMQWorker.size() > 0) {
+        if (!canalMQWorker.isEmpty()) {
             ExecutorService stopMQWorkerService = Executors.newFixedThreadPool(canalMQWorker.size());
+            List<Future<Boolean>> futures = new ArrayList<>();
             for (AbstractCanalAdapterWorker canalAdapterMQWorker : canalMQWorker.values()) {
-                stopMQWorkerService.submit(canalAdapterMQWorker::stop);
+                futures.add(stopMQWorkerService.submit(() -> {
+                    canalAdapterMQWorker.stop();
+                    return true;
+                }));
             }
+            futures.forEach(future -> {
+                try {
+                    future.get();
+                } catch (Exception e) {
+                    // ignore
+                }
+            });
             stopMQWorkerService.shutdown();
         }
         logger.info("All canal adapters destroyed");

+ 30 - 19
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java

@@ -4,14 +4,15 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.annotation.Resource;
 
-import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
-import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.cloud.context.config.annotation.RefreshScope;
 import org.springframework.stereotype.Component;
 
 import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
 import com.alibaba.otter.canal.adapter.launcher.config.AdapterCanalConfig;
+import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 
 /**
@@ -21,42 +22,51 @@ import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
  * @version 1.0.0
  */
 @Component
+@RefreshScope
 public class CanalAdapterService {
 
-    private static final Logger       logger = LoggerFactory.getLogger(CanalAdapterService.class);
+    private static final Logger logger  = LoggerFactory.getLogger(CanalAdapterService.class);
 
-    private static CanalAdapterLoader adapterLoader;
+    private CanalAdapterLoader  adapterLoader;
 
     @Resource
-    private AdapterCanalConfig        adapterCanalConfig;
+    private AdapterCanalConfig  adapterCanalConfig;
 
     // 注入bean保证优先注册
     @Resource
-    private SpringContext             springContext;
+    private SpringContext       springContext;
     @Resource
-    private SyncSwitch                syncSwitch;
+    private SyncSwitch          syncSwitch;
+
+    private volatile boolean    running = false;
 
     @PostConstruct
-    public void init() {
-        if (adapterLoader == null) {
-            try {
-                logger.info("## start the canal client adapters.");
-                adapterLoader = new CanalAdapterLoader(adapterCanalConfig);
-                adapterLoader.init();
-                logger.info("## the canal client adapters are running now ......");
-            } catch (Throwable e) {
-                logger.error("## something goes wrong when starting up the canal client adapters:", e);
-                System.exit(0);
-            }
+    public synchronized void init() {
+        if (running) {
+            return;
+        }
+        try {
+            logger.info("## start the canal client adapters.");
+            adapterLoader = new CanalAdapterLoader(adapterCanalConfig);
+            adapterLoader.init();
+            running = true;
+            logger.info("## the canal client adapters are running now ......");
+        } catch (Exception e) {
+            logger.error("## something goes wrong when starting up the canal client adapters:", e);
         }
     }
 
     @PreDestroy
-    public void destroy() {
+    public synchronized void destroy() {
+        if (!running) {
+            return;
+        }
         try {
+            running = false;
             logger.info("## stop the canal client adapters");
             if (adapterLoader != null) {
                 adapterLoader.destroy();
+                adapterLoader = null;
             }
             for (DruidDataSource druidDataSource : DatasourceConfig.DATA_SOURCES.values()) {
                 try {
@@ -65,6 +75,7 @@ public class CanalAdapterService {
                     logger.error(e.getMessage(), e);
                 }
             }
+            DatasourceConfig.DATA_SOURCES.clear();
         } catch (Throwable e) {
             logger.warn("## something goes wrong when stopping canal client adapters:", e);
         } finally {

+ 90 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/ApplicationRunningMonitor.java

@@ -0,0 +1,90 @@
+package com.alibaba.otter.canal.adapter.launcher.monitor;
+
+import java.io.File;
+import java.io.FileReader;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.annotation.Resource;
+
+import org.apache.commons.io.filefilter.FileFilterUtils;
+import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
+import org.apache.commons.io.monitor.FileAlterationMonitor;
+import org.apache.commons.io.monitor.FileAlterationObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.context.refresh.ContextRefresher;
+import org.springframework.stereotype.Component;
+import org.yaml.snakeyaml.Yaml;
+
+import com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+
+@Component
+public class ApplicationRunningMonitor {
+
+    private static final Logger   logger = LoggerFactory.getLogger(ApplicationRunningMonitor.class);
+
+    @Resource
+    private ContextRefresher      contextRefresher;
+
+    @Resource
+    private CanalAdapterService   canalAdapterService;
+
+    private FileAlterationMonitor fileMonitor;
+
+    @PostConstruct
+    public void init() {
+        File confDir = Util.getConfDirPath();
+        try {
+            FileAlterationObserver observer = new FileAlterationObserver(confDir,
+                FileFilterUtils.and(FileFilterUtils.fileFileFilter(),
+                    FileFilterUtils.prefixFileFilter("application"),
+                    FileFilterUtils.suffixFileFilter("yml")));
+            FileListener listener = new FileListener();
+            observer.addListener(listener);
+            fileMonitor = new FileAlterationMonitor(3000, observer);
+            fileMonitor.start();
+
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    @PreDestroy
+    public void destroy() {
+        try {
+            fileMonitor.stop();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private class FileListener extends FileAlterationListenerAdaptor {
+
+        @Override
+        public void onFileChange(File file) {
+            super.onFileChange(file);
+            try {
+                // 检查yml格式
+                new Yaml().loadAs(new FileReader(file), Map.class);
+
+                canalAdapterService.destroy();
+
+                // refresh context
+                contextRefresher.refresh();
+
+                try {
+                    Thread.sleep(2000);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+                canalAdapterService.init();
+                logger.info("## adapter application config reloaded.");
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+    }
+}

+ 2 - 1
client-adapter/launcher/src/main/resources/application.yml

@@ -2,6 +2,7 @@ server:
   port: 8081
 logging:
   level:
+    org.springframework: WARN
     com.alibaba.otter.canal.client.adapter.hbase: DEBUG
     com.alibaba.otter.canal.client.adapter.es: DEBUG
     com.alibaba.otter.canal.client.adapter.rdb: DEBUG
@@ -36,7 +37,7 @@ canal.conf:
 #          jdbc.username: mytest
 #          jdbc.password: m121212
 #          threads: 5
-#          commitSize: 3000
+#          commitSize: 5000
 #      - name: rdb
 #        key: postgres1
 #        properties:

+ 66 - 6
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -3,6 +3,10 @@ package com.alibaba.otter.canal.client.adapter.rdb;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.sql.DataSource;
 
@@ -11,11 +15,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.rdb.config.ConfigLoader;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
+import com.alibaba.otter.canal.client.adapter.rdb.support.SimpleDml;
 import com.alibaba.otter.canal.client.adapter.support.*;
 
 @SPI("rdb")
@@ -23,13 +30,21 @@ public class RdbAdapter implements OuterAdapter {
 
     private static Logger              logger             = LoggerFactory.getLogger(RdbAdapter.class);
 
-    private Map<String, MappingConfig> rdbMapping         = new HashMap<>();                          // 文件名对应配置
-    private Map<String, MappingConfig> mappingConfigCache = new HashMap<>();                          // 库名-表名对应配置
+    private Map<String, MappingConfig> rdbMapping         = new HashMap<>();                                // 文件名对应配置
+    private Map<String, MappingConfig> mappingConfigCache = new HashMap<>();                                // 库名-表名对应配置
 
     private DruidDataSource            dataSource;
 
     private RdbSyncService             rdbSyncService;
 
+    private int                        commitSize         = 3000;
+
+    private volatile boolean           running            = false;
+
+    private List<SimpleDml>            dmlList            = Collections.synchronizedList(new ArrayList<>());
+    private Lock                       syncLock           = new ReentrantLock();
+    private ExecutorService            executor           = Executors.newFixedThreadPool(1);
+
     @Override
     public void init(OuterAdapterConfig configuration) {
         Map<String, MappingConfig> rdbMappingTmp = ConfigLoader.load();
@@ -69,9 +84,28 @@ public class RdbAdapter implements OuterAdapter {
 
         String threads = properties.get("threads");
         String commitSize = properties.get("commitSize");
-        rdbSyncService = new RdbSyncService(commitSize != null ? Integer.valueOf(commitSize) : null,
-            threads != null ? Integer.valueOf(threads) : null,
-            dataSource);
+        if (commitSize != null) {
+            this.commitSize = Integer.valueOf(commitSize);
+        }
+        rdbSyncService = new RdbSyncService(threads != null ? Integer.valueOf(threads) : null, dataSource);
+
+        running = true;
+
+        executor.submit(() -> {
+            while (running) {
+                try {
+                    int size1 = dmlList.size();
+                    Thread.sleep(3000);
+                    int size2 = dmlList.size();
+                    if (size1 == size2) {
+                        // 超时提交
+                        sync();
+                    }
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        });
     }
 
     @Override
@@ -81,7 +115,29 @@ public class RdbAdapter implements OuterAdapter {
         String table = dml.getTable();
         MappingConfig config = mappingConfigCache.get(destination + "." + database + "." + table);
 
-        rdbSyncService.sync(config, dml);
+        List<SimpleDml> simpleDmlList = SimpleDml.dml2SimpleDml(dml, config);
+
+        dmlList.addAll(simpleDmlList);
+
+        if (dmlList.size() > commitSize) {
+            sync();
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+        }
+    }
+
+    private void sync() {
+        try {
+            syncLock.lock();
+            if (!dmlList.isEmpty()) {
+                rdbSyncService.sync(dmlList);
+                dmlList.clear();
+            }
+        } finally {
+            syncLock.unlock();
+        }
     }
 
     @Override
@@ -178,9 +234,13 @@ public class RdbAdapter implements OuterAdapter {
 
     @Override
     public void destroy() {
+        running = false;
+        executor.shutdown();
+
         if (rdbSyncService != null) {
             rdbSyncService.close();
         }
+
         if (dataSource != null) {
             dataSource.close();
         }

+ 2 - 29
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/BatchExecutor.java

@@ -23,18 +23,14 @@ public class BatchExecutor {
 
     private Integer             key;
     private Connection          conn;
-    private int                 commitSize = 3000;
     private AtomicInteger       idx        = new AtomicInteger(0);
     private ExecutorService     executor   = Executors.newFixedThreadPool(1);
     private Lock                commitLock = new ReentrantLock();
     private Condition           condition  = commitLock.newCondition();
 
-    public BatchExecutor(Integer key, Connection conn, Integer commitSize){
+    public BatchExecutor(Integer key, Connection conn){
         this.key = key;
         this.conn = conn;
-        if (commitSize != null) {
-            this.commitSize = commitSize;
-        }
 
         try {
             this.conn.setAutoCommit(false);
@@ -71,32 +67,9 @@ public class BatchExecutor {
         } catch (SQLException e) {
             logger.error(e.getMessage(), e);
         }
-        int i = idx.incrementAndGet();
-
-        // 批次的第一次执行设置延时
-        if (i == 1) {
-            executor.submit(() -> {
-                try {
-                    commitLock.lock();
-                    conn.commit(); //直接提交一次
-                    if (!condition.await(5, TimeUnit.SECONDS)) {
-                        // 超时提交
-                        commit();
-                    }
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
-                } finally {
-                    commitLock.unlock();
-                }
-            });
-        }
-
-        if (i == commitSize) {
-            commit();
-        }
     }
 
-    private void commit() {
+    public void commit() {
         try {
             commitLock.lock();
             conn.commit();

+ 134 - 179
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -9,10 +9,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.*;
 
 import javax.sql.DataSource;
 
@@ -20,12 +17,10 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
+import com.alibaba.otter.canal.client.adapter.rdb.support.SimpleDml;
 import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.Util;
 
 /**
@@ -46,7 +41,7 @@ public class RdbSyncService {
 
     private ExecutorService[]                       threadExecutors;
 
-    public RdbSyncService(Integer commitSize, Integer threads, DataSource dataSource){
+    public RdbSyncService(Integer threads, DataSource dataSource){
         try {
             if (threads != null && threads > 1 && threads <= 10) {
                 this.threads = threads;
@@ -55,7 +50,7 @@ public class RdbSyncService {
             for (int i = 0; i < this.threads; i++) {
                 Connection conn = dataSource.getConnection();
                 conn.setAutoCommit(false);
-                this.batchExecutors[i] = new BatchExecutor(i, conn, commitSize);
+                this.batchExecutors[i] = new BatchExecutor(i, conn);
             }
             threadExecutors = new ExecutorService[this.threads];
             for (int i = 0; i < this.threads; i++) {
@@ -66,23 +61,61 @@ public class RdbSyncService {
         }
     }
 
-    public void sync(MappingConfig config, List<Dml> dmlList) {
+    @SuppressWarnings("unchecked")
+    private List<SimpleDml>[] simpleDmls2Partition(List<SimpleDml> simpleDmlList) {
+        List<SimpleDml>[] simpleDmlPartition = new ArrayList[threads];
+        for (int i = 0; i < threads; i++) {
+            simpleDmlPartition[i] = new ArrayList<>();
+        }
+        simpleDmlList.forEach(simpleDml -> {
+            int hash;
+            if (simpleDml.getConfig().getConcurrent()) {
+                hash = pkHash(simpleDml.getConfig().getDbMapping(), simpleDml.getData(), threads);
+            } else {
+                hash = Math.abs(Math.abs(simpleDml.getConfig().getDbMapping().getTargetTable().hashCode()) % threads);
+            }
+            simpleDmlPartition[hash].add(simpleDml);
+        });
+        return simpleDmlPartition;
     }
 
-    public void sync(MappingConfig config, Dml dml) {
+    public void sync(List<SimpleDml> simpleDmlList) {
         try {
-            if (config != null) {
-                String type = dml.getType();
-                if (type != null && type.equalsIgnoreCase("INSERT")) {
-                    insert(config, dml);
-                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
-                    update(config, dml);
-                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
-                    delete(config, dml);
+            List<SimpleDml>[] simpleDmlsPartition = simpleDmls2Partition(simpleDmlList);
+
+            List<Future<Boolean>> futures = new ArrayList<>();
+            for (int i = 0; i < threads; i++) {
+                if (!simpleDmlsPartition[i].isEmpty()) {
+                    int j = i;
+                    futures.add(threadExecutors[i].submit(() -> {
+                        simpleDmlsPartition[j].forEach(simpleDml -> sync(simpleDml, batchExecutors[j]));
+                        batchExecutors[j].commit();
+                        return true;
+                    }));
                 }
-                if (logger.isDebugEnabled()) {
-                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+            }
+
+            futures.forEach(future -> {
+                try {
+                    future.get();
+                } catch (Exception e) {
+                    // ignore
                 }
+            });
+        } catch (Exception e) {
+            logger.error("Error rdb sync for batch", e);
+        }
+    }
+
+    public void sync(SimpleDml dml, BatchExecutor batchExecutor) {
+        try {
+            String type = dml.getType();
+            if (type != null && type.equalsIgnoreCase("INSERT")) {
+                insert(dml, batchExecutor);
+            } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
+                update(dml, batchExecutor);
+            } else if (type != null && type.equalsIgnoreCase("DELETE")) {
+                delete(dml, batchExecutor);
             }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
@@ -92,18 +125,17 @@ public class RdbSyncService {
     /**
      * 插入操作
      *
-     * @param config 配置项
-     * @param dml DML数据
+     * @param simpleDml DML数据
      */
-    private void insert(MappingConfig config, Dml dml) {
-        List<Map<String, Object>> data = dml.getData();
+    private void insert(SimpleDml simpleDml, BatchExecutor batchExecutor) {
+        Map<String, Object> data = simpleDml.getData();
         if (data == null || data.isEmpty()) {
             return;
         }
 
-        DbMapping dbMapping = config.getDbMapping();
+        DbMapping dbMapping = simpleDml.getConfig().getDbMapping();
 
-        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data.get(0));
+        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
 
         StringBuilder insertSql = new StringBuilder();
         insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
@@ -118,185 +150,121 @@ public class RdbSyncService {
         len = insertSql.length();
         insertSql.delete(len - 1, len).append(")");
 
-        Map<String, Integer> ctype = getTargetColumnType(batchExecutors[0].getConn(), config);
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), simpleDml.getConfig());
 
         String sql = insertSql.toString();
-        Integer tbHash = null;
-        // 如果不是并行同步的表
-        if (!config.getConcurrent()) {
-            // 按表名hash到一个线程
-            tbHash = Math.abs(Math.abs(dbMapping.getTargetTable().hashCode()) % threads);
-        }
-        for (Map<String, Object> d : data) {
-            int hash;
-            if (tbHash != null) {
-                hash = tbHash;
-            } else {
-                hash = pkHash(dbMapping, d, threads);
-            }
-            ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash];
-            checkQueue(tpe);
-            tpe.submit(() -> {
-                try {
-                    BatchExecutor batchExecutor = batchExecutors[hash];
-                    List<Map<String, ?>> values = new ArrayList<>();
-
-                    for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
-                        String targetColumnName = entry.getKey();
-                        String srcColumnName = entry.getValue();
-                        if (srcColumnName == null) {
-                            srcColumnName = targetColumnName;
-                        }
 
-                        Integer type = ctype.get(targetColumnName.toLowerCase());
-                        if (type == null) {
-                            throw new RuntimeException("No column: " + targetColumnName + " found in target db");
-                        }
-                        Object value = d.get(srcColumnName);
-                        BatchExecutor.setValue(values, type, value);
-                    }
-                    batchExecutor.execute(sql, values);
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
+        try {
+            List<Map<String, ?>> values = new ArrayList<>();
+
+            for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+                String targetColumnName = entry.getKey();
+                String srcColumnName = entry.getValue();
+                if (srcColumnName == null) {
+                    srcColumnName = targetColumnName;
                 }
-            });
+
+                Integer type = ctype.get(targetColumnName.toLowerCase());
+                if (type == null) {
+                    throw new RuntimeException("No column: " + targetColumnName + " found in target db");
+                }
+                Object value = data.get(srcColumnName);
+                BatchExecutor.setValue(values, type, value);
+            }
+            batchExecutor.execute(sql, values);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
         }
     }
 
     /**
      * 更新操作
      * 
-     * @param config 配置项
-     * @param dml DML数据
+     * @param simpleDml DML数据
      */
-    private void update(MappingConfig config, Dml dml) {
-        List<Map<String, Object>> data = dml.getData();
+    private void update(SimpleDml simpleDml, BatchExecutor batchExecutor) {
+        Map<String, Object> data = simpleDml.getData();
         if (data == null || data.isEmpty()) {
             return;
         }
 
-        List<Map<String, Object>> old = dml.getOld();
+        Map<String, Object> old = simpleDml.getOld();
         if (old == null || old.isEmpty()) {
             return;
         }
 
-        DbMapping dbMapping = config.getDbMapping();
-
-        int idx = 1;
-        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data.get(0));
+        DbMapping dbMapping = simpleDml.getConfig().getDbMapping();
 
-        Map<String, Integer> ctype = getTargetColumnType(batchExecutors[0].getConn(), config);
+        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
 
-        Integer tbHash = null;
-        if (!config.getConcurrent()) {
-            // 按表名hash到一个线程
-            tbHash = Math.abs(Math.abs(dbMapping.getTargetTable().hashCode()) % threads);
-        }
-        for (Map<String, Object> o : old) {
-            Map<String, Object> d = data.get(idx - 1);
-
-            int hash;
-            if (tbHash != null) {
-                hash = tbHash;
-            } else {
-                hash = pkHash(dbMapping, d, o, threads);
-            }
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), simpleDml.getConfig());
 
-            ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash];
-            checkQueue(tpe);
-            tpe.submit(() -> {
-                try {
-                    BatchExecutor batchExecutor = batchExecutors[hash];
-
-                    StringBuilder updateSql = new StringBuilder();
-                    updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
-
-                    List<Map<String, ?>> values = new ArrayList<>();
-                    for (String srcColumnName : o.keySet()) {
-                        List<String> targetColumnNames = new ArrayList<>();
-                        columnsMap.forEach((targetColumn, srcColumn) -> {
-                            if (srcColumnName.toLowerCase().equals(srcColumn)) {
-                                targetColumnNames.add(targetColumn);
-                            }
-                        });
-                        if (!targetColumnNames.isEmpty()) {
-                            for (String targetColumnName : targetColumnNames) {
-                                updateSql.append(targetColumnName).append("=?, ");
-                                Integer type = ctype.get(targetColumnName.toLowerCase());
-                                BatchExecutor.setValue(values, type, d.get(srcColumnName));
-                            }
-                        }
+        try {
+            StringBuilder updateSql = new StringBuilder();
+            updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
+
+            List<Map<String, ?>> values = new ArrayList<>();
+            for (String srcColumnName : old.keySet()) {
+                List<String> targetColumnNames = new ArrayList<>();
+                columnsMap.forEach((targetColumn, srcColumn) -> {
+                    if (srcColumnName.toLowerCase().equals(srcColumn)) {
+                        targetColumnNames.add(targetColumn);
                     }
-                    int len = updateSql.length();
-                    updateSql.delete(len - 2, len).append(" WHERE ");
-
-                    // 拼接主键
-                    appendCondition(dbMapping, updateSql, ctype, values, d, o);
-
-                    batchExecutor.execute(updateSql.toString(), values);
-
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Execute sql: {}", updateSql);
+                });
+                if (!targetColumnNames.isEmpty()) {
+                    for (String targetColumnName : targetColumnNames) {
+                        updateSql.append(targetColumnName).append("=?, ");
+                        Integer type = ctype.get(targetColumnName.toLowerCase());
+                        BatchExecutor.setValue(values, type, data.get(srcColumnName));
                     }
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
                 }
-            });
+            }
+            int len = updateSql.length();
+            updateSql.delete(len - 2, len).append(" WHERE ");
+
+            // 拼接主键
+            appendCondition(dbMapping, updateSql, ctype, values, data, old);
 
-            idx++;
+            batchExecutor.execute(updateSql.toString(), values);
+
+            if (logger.isTraceEnabled()) {
+                logger.trace("Execute sql: {}", updateSql);
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
         }
     }
 
     /**
      * 删除操作
      * 
-     * @param config
-     * @param dml
-     * @throws SQLException
+     * @param simpleDml
      */
-    private void delete(MappingConfig config, Dml dml) {
-        List<Map<String, Object>> data = dml.getData();
+    private void delete(SimpleDml simpleDml, BatchExecutor batchExecutor) {
+        Map<String, Object> data = simpleDml.getData();
         if (data == null || data.isEmpty()) {
             return;
         }
 
-        DbMapping dbMapping = config.getDbMapping();
+        DbMapping dbMapping = simpleDml.getConfig().getDbMapping();
 
-        Map<String, Integer> ctype = getTargetColumnType(batchExecutors[0].getConn(), config);
-        Integer tbHash = null;
-        if (!config.getConcurrent()) {
-            // 按表名hash到一个线程
-            tbHash = Math.abs(Math.abs(dbMapping.getTargetTable().hashCode()) % threads);
-        }
-        for (Map<String, Object> d : data) {
-            int hash;
-            if (tbHash != null) {
-                hash = tbHash;
-            } else {
-                hash = pkHash(dbMapping, d, threads);
-            }
+        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), simpleDml.getConfig());
 
-            ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash];
-            checkQueue(tpe);
-            tpe.submit(() -> {
-                try {
-                    BatchExecutor batchExecutor = batchExecutors[hash];
-                    StringBuilder sql = new StringBuilder();
-                    sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE ");
+        try {
+            StringBuilder sql = new StringBuilder();
+            sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE ");
 
-                    List<Map<String, ?>> values = new ArrayList<>();
+            List<Map<String, ?>> values = new ArrayList<>();
 
-                    // 拼接主键
-                    appendCondition(dbMapping, sql, ctype, values, d);
+            // 拼接主键
+            appendCondition(dbMapping, sql, ctype, values, data);
 
-                    batchExecutor.execute(sql.toString(), values);
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Execute sql: {}", sql);
-                    }
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
-                }
-            });
+            batchExecutor.execute(sql.toString(), values);
+            if (logger.isTraceEnabled()) {
+                logger.trace("Execute sql: {}", sql);
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
         }
     }
 
@@ -346,7 +314,8 @@ public class RdbSyncService {
      */
     public static void setPStmt(int type, PreparedStatement pstmt, Object value, int i) throws SQLException {
         if (value == null) {
-            pstmt.setObject(i, type);
+            pstmt.setNull(i, type);
+            return;
         }
         switch (type) {
             case Types.BIT:
@@ -568,18 +537,4 @@ public class RdbSyncService {
             executorService.shutdown();
         }
     }
-
-    private void checkQueue(ThreadPoolExecutor tpe) {
-        // 防止队列过大
-        while (tpe.getQueue().size() > 10000) {
-            try {
-                Thread.sleep(3000);
-                while (tpe.getQueue().size() > 5000) {
-                    Thread.sleep(1000);
-                }
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        }
-    }
 }

+ 97 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SimpleDml.java

@@ -0,0 +1,97 @@
+package com.alibaba.otter.canal.client.adapter.rdb.support;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+
+public class SimpleDml {
+
+    private String              destination;
+    private String              database;
+    private String              table;
+    private String              type;
+    private Map<String, Object> data;
+    private Map<String, Object> old;
+
+    private MappingConfig       config;
+
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public Map<String, Object> getData() {
+        return data;
+    }
+
+    public void setData(Map<String, Object> data) {
+        this.data = data;
+    }
+
+    public Map<String, Object> getOld() {
+        return old;
+    }
+
+    public void setOld(Map<String, Object> old) {
+        this.old = old;
+    }
+
+    public MappingConfig getConfig() {
+        return config;
+    }
+
+    public void setConfig(MappingConfig config) {
+        this.config = config;
+    }
+
+    public static List<SimpleDml> dml2SimpleDml(Dml dml, MappingConfig config) {
+        List<SimpleDml> simpleDmlList = new ArrayList<>();
+        int len = dml.getData().size();
+
+        for (int i = 0; i < len; i++) {
+            SimpleDml simpleDml = new SimpleDml();
+            simpleDml.setDestination(dml.getDestination());
+            simpleDml.setDatabase(dml.getDatabase());
+            simpleDml.setTable(dml.getTable());
+            simpleDml.setType(dml.getType());
+            simpleDml.setData(dml.getData().get(i));
+            if (dml.getOld() != null) {
+                simpleDml.setOld(dml.getOld().get(i));
+            }
+            simpleDml.setConfig(config);
+            simpleDmlList.add(simpleDml);
+        }
+
+        return simpleDmlList;
+    }
+}