Browse Source

rdb 批量提交

mcy 6 years ago
parent
commit
7ee7ba0e05

+ 30 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Util.java

@@ -1,5 +1,7 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
+import java.io.File;
+import java.net.URL;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -9,6 +11,7 @@ import java.util.function.Function;
 
 import javax.sql.DataSource;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,4 +90,31 @@ public class Util {
             }
         }
     }
+
+    public static File getConfDirPath() {
+        return getConfDirPath("");
+    }
+
+    public static File getConfDirPath(String subConf) {
+        URL url = Util.class.getClassLoader().getResource("");
+        String path;
+        if (url != null) {
+            path = url.getPath();
+        } else {
+            path = new File("").getAbsolutePath();
+        }
+        File file = null;
+        if (path != null) {
+            file = new File(
+                path + ".." + File.separator + Constant.CONF_DIR + File.separator + StringUtils.trimToEmpty(subConf));
+            if (!file.exists()) {
+                file = new File(path + StringUtils.trimToEmpty(subConf));
+            }
+        }
+        if (file == null || !file.exists()) {
+            throw new RuntimeException("Config dir not found.");
+        }
+
+        return file;
+    }
 }

+ 3 - 2
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -5,6 +5,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.sql.DataSource;
 
@@ -37,8 +38,8 @@ public class HbaseAdapter implements OuterAdapter {
 
     private static Logger              logger             = LoggerFactory.getLogger(HbaseAdapter.class);
 
-    private Map<String, MappingConfig> hbaseMapping       = new HashMap<>();                            // 文件名对应配置
-    private Map<String, MappingConfig> mappingConfigCache = new HashMap<>();                            // 库名-表名对应配置
+    private Map<String, MappingConfig> hbaseMapping       = new ConcurrentHashMap<>();                            // 文件名对应配置
+    private Map<String, MappingConfig> mappingConfigCache = new ConcurrentHashMap<>();                            // 库名-表名对应配置
 
     private Connection                 conn;
     private HbaseSyncService           hbaseSyncService;

+ 54 - 58
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/ApplicationRunningMonitor.java

@@ -2,16 +2,16 @@ package com.alibaba.otter.canal.adapter.launcher.monitor;
 
 import java.io.File;
 import java.io.FileReader;
-import java.net.URL;
 import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.annotation.Resource;
 
+import org.apache.commons.io.filefilter.FileFilterUtils;
+import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
+import org.apache.commons.io.monitor.FileAlterationMonitor;
+import org.apache.commons.io.monitor.FileAlterationObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.cloud.context.refresh.ContextRefresher;
@@ -19,76 +19,72 @@ import org.springframework.stereotype.Component;
 import org.yaml.snakeyaml.Yaml;
 
 import com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService;
-import com.alibaba.otter.canal.client.adapter.support.Constant;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 
 @Component
 public class ApplicationRunningMonitor {
 
-    private static final Logger      logger                = LoggerFactory.getLogger(ApplicationRunningMonitor.class);
+    private static final Logger   logger = LoggerFactory.getLogger(ApplicationRunningMonitor.class);
 
     @Resource
-    private ContextRefresher         contextRefresher;
+    private ContextRefresher      contextRefresher;
 
     @Resource
-    private CanalAdapterService      canalAdapterService;
+    private CanalAdapterService   canalAdapterService;
 
-    private ScheduledExecutorService scheduled             = Executors.newScheduledThreadPool(1);
-
-    private static volatile long     applicationLastModify = -1L;
+    private FileAlterationMonitor fileMonitor;
 
     @PostConstruct
     public void init() {
-        URL url = this.getClass().getClassLoader().getResource("");
-        String path;
-        if (url != null) {
-            path = url.getPath();
-        } else {
-            path = new File("").getAbsolutePath();
-        }
-        File file = null;
-        if (path != null) {
-            file = new File(path + ".." + File.separator + Constant.CONF_DIR + File.separator + "application.yml");
-            if (!file.exists()) {
-                file = new File(path + "application.yml");
-            }
-        }
-        if (file == null || !file.exists()) {
-            throw new RuntimeException("application.yml config file not found.");
+        File confDir = Util.getConfDirPath();
+        try {
+            FileAlterationObserver observer = new FileAlterationObserver(confDir,
+                FileFilterUtils.and(FileFilterUtils.fileFileFilter(),
+                    FileFilterUtils.prefixFileFilter("application"),
+                    FileFilterUtils.suffixFileFilter("yml")));
+            FileListener listener = new FileListener();
+            observer.addListener(listener);
+            fileMonitor = new FileAlterationMonitor(3000, observer);
+            fileMonitor.start();
+
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
         }
-        File configFile = file;
-        scheduled.scheduleWithFixedDelay(() -> {
-            if (applicationLastModify == -1L) {
-                applicationLastModify = configFile.lastModified();
-            } else {
-                if (configFile.lastModified() != applicationLastModify) {
-                    applicationLastModify = configFile.lastModified();
-
-                    try {
-                        // 检查yml格式
-                        new Yaml().loadAs(new FileReader(configFile), Map.class);
-
-                        canalAdapterService.destroy();
-
-                        // refresh context
-                        contextRefresher.refresh();
-
-                        try {
-                            Thread.sleep(2000);
-                        } catch (InterruptedException e) {
-                            // ignore
-                        }
-                        canalAdapterService.init();
-                        logger.info("## adapter application config reloaded.");
-                    } catch (Exception e) {
-                        logger.error(e.getMessage(), e);
-                    }
-                }
-            }
-        }, 5, 5, TimeUnit.SECONDS);
     }
 
     @PreDestroy
     public void destroy() {
-        scheduled.shutdown();
+        try {
+            fileMonitor.stop();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private class FileListener extends FileAlterationListenerAdaptor {
+
+        @Override
+        public void onFileChange(File file) {
+            super.onFileChange(file);
+            try {
+                // 检查yml格式
+                new Yaml().loadAs(new FileReader(file), Map.class);
+
+                canalAdapterService.destroy();
+
+                // refresh context
+                contextRefresher.refresh();
+
+                try {
+                    Thread.sleep(2000);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+                canalAdapterService.init();
+                logger.info("## adapter application config reloaded.");
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
     }
 }

+ 11 - 11
client-adapter/launcher/src/main/resources/application.yml

@@ -2,7 +2,7 @@ server:
   port: 8081
 logging:
   level:
-#    org.springframework: WARN
+    org.springframework: WARN
     com.alibaba.otter.canal.client.adapter.hbase: DEBUG
     com.alibaba.otter.canal.client.adapter.es: DEBUG
     com.alibaba.otter.canal.client.adapter.rdb: DEBUG
@@ -28,16 +28,16 @@ canal.conf:
   - instance: example
     groups:
     - outAdapters:
-      - name: logger
-#      - name: rdb
-#        key: oracle1
-#        properties:
-#          jdbc.driverClassName: oracle.jdbc.OracleDriver
-#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
-#          jdbc.username: mytest
-#          jdbc.password: m121212
-#          threads: 5
-#          commitSize: 3000
+#      - name: logger
+      - name: rdb
+        key: oracle1
+        properties:
+          jdbc.driverClassName: oracle.jdbc.OracleDriver
+          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
+          jdbc.username: mytest
+          jdbc.password: m121212
+          threads: 5
+          commitSize: 5000
 #      - name: rdb
 #        key: postgres1
 #        properties:

+ 5 - 5
client-adapter/rdb/pom.xml

@@ -39,12 +39,12 @@
             <!--<version>42.1.4</version>-->
             <!--<scope>test</scope>-->
         <!--</dependency>-->
-        <!--<dependency>-->
-            <!--<groupId>com.oracle</groupId>-->
-            <!--<artifactId>ojdbc6</artifactId>-->
-            <!--<version>11.2.0.4.0-atlassian-hosted</version>-->
+        <dependency>
+            <groupId>com.oracle</groupId>
+            <artifactId>ojdbc6</artifactId>
+            <version>11.2.0.4.0-atlassian-hosted</version>
             <!--<scope>test</scope>-->
-        <!--</dependency>-->
+        </dependency>
         <!--<dependency>-->
             <!--<groupId>com.microsoft.sqlserver</groupId>-->
             <!--<artifactId>mssql-jdbc</artifactId>-->

+ 57 - 6
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -3,6 +3,13 @@ package com.alibaba.otter.canal.client.adapter.rdb;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.sql.DataSource;
 
@@ -30,6 +37,8 @@ public class RdbAdapter implements OuterAdapter {
 
     private RdbSyncService             rdbSyncService;
 
+    private int                        commitSize         = 3000;
+
     @Override
     public void init(OuterAdapterConfig configuration) {
         Map<String, MappingConfig> rdbMappingTmp = ConfigLoader.load();
@@ -69,19 +78,58 @@ public class RdbAdapter implements OuterAdapter {
 
         String threads = properties.get("threads");
         String commitSize = properties.get("commitSize");
-        rdbSyncService = new RdbSyncService(commitSize != null ? Integer.valueOf(commitSize) : null,
+        if (commitSize != null) {
+            this.commitSize = Integer.valueOf(commitSize);
+        }
+        rdbSyncService = new RdbSyncService(this.commitSize,
             threads != null ? Integer.valueOf(threads) : null,
             dataSource);
     }
 
+    private AtomicInteger   batchRowNum = new AtomicInteger(0);
+    private List<Dml>       dmlList     = Collections.synchronizedList(new ArrayList<>());
+    private Lock            syncLock    = new ReentrantLock();
+    private Condition       condition   = syncLock.newCondition();
+    private ExecutorService executor    = Executors.newFixedThreadPool(1);
+
     @Override
     public void sync(Dml dml) {
-        String destination = StringUtils.trimToEmpty(dml.getDestination());
-        String database = dml.getDatabase();
-        String table = dml.getTable();
-        MappingConfig config = mappingConfigCache.get(destination + "." + database + "." + table);
+        boolean first = batchRowNum.get() == 0;
+        int currentSize = batchRowNum.addAndGet(dml.getData().size());
+        dmlList.add(dml);
+
+        if (first) {
+            // 开启超时判断
+            executor.submit(() -> {
+                try {
+                    syncLock.lock();
+                    if (!condition.await(5, TimeUnit.SECONDS)) {
+                        // 批量超时
+                        sync();
+                    }
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                } finally {
+                    syncLock.unlock();
+                }
+            });
+        }
 
-        rdbSyncService.sync(config, dml);
+        if (currentSize > commitSize) {
+            sync();
+        }
+    }
+
+    private void sync() {
+        try {
+            syncLock.lock();
+            rdbSyncService.sync(mappingConfigCache, dmlList);
+            batchRowNum.set(0);
+            dmlList.clear();
+            condition.signal();
+        } finally {
+            syncLock.unlock();
+        }
     }
 
     @Override
@@ -178,9 +226,12 @@ public class RdbAdapter implements OuterAdapter {
 
     @Override
     public void destroy() {
+        executor.shutdown();
+
         if (rdbSyncService != null) {
             rdbSyncService.close();
         }
+
         if (dataSource != null) {
             dataSource.close();
         }

+ 24 - 24
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/BatchExecutor.java

@@ -71,32 +71,32 @@ public class BatchExecutor {
         } catch (SQLException e) {
             logger.error(e.getMessage(), e);
         }
-        int i = idx.incrementAndGet();
-
-        // 批次的第一次执行设置延时
-        if (i == 1) {
-            executor.submit(() -> {
-                try {
-                    commitLock.lock();
-                    conn.commit(); //直接提交一次
-                    if (!condition.await(5, TimeUnit.SECONDS)) {
-                        // 超时提交
-                        commit();
-                    }
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
-                } finally {
-                    commitLock.unlock();
-                }
-            });
-        }
-
-        if (i == commitSize) {
-            commit();
-        }
+        // int i = idx.incrementAndGet();
+        //
+        // // 批次的第一次执行设置延时
+        // if (i == 1) {
+        // executor.submit(() -> {
+        // try {
+        // commitLock.lock();
+        // conn.commit(); //直接提交一次
+        // if (!condition.await(5, TimeUnit.SECONDS)) {
+        // // 超时提交
+        // commit();
+        // }
+        // } catch (Exception e) {
+        // logger.error(e.getMessage(), e);
+        // } finally {
+        // commitLock.unlock();
+        // }
+        // });
+        // }
+        //
+        // if (i == commitSize) {
+        // commit();
+        // }
     }
 
-    private void commit() {
+    public void commit() {
         try {
             commitLock.lock();
             conn.commit();

+ 151 - 81
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -5,17 +5,13 @@ import java.io.StringReader;
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
 import java.sql.*;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.sql.Date;
+import java.util.*;
+import java.util.concurrent.*;
 
 import javax.sql.DataSource;
 
+import org.apache.commons.lang.StringUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,7 +62,81 @@ public class RdbSyncService {
         }
     }
 
-    public void sync(MappingConfig config, List<Dml> dmlList) {
+    @SuppressWarnings("unchecked")
+    public void sync(Map<String, MappingConfig> mappingConfigCache, List<Dml> dmlList) {
+        try {
+            List<Map<String, Object>>[] dmlPartition = new ArrayList[threads];
+            for (int i = 0; i < threads; i++) {
+                dmlPartition[i] = new ArrayList<>();
+            }
+            // 根据hash拆分
+            dmlList.forEach(dml -> {
+                String destination = StringUtils.trimToEmpty(dml.getDestination());
+                String database = dml.getDatabase();
+                String table = dml.getTable();
+                MappingConfig config = mappingConfigCache.get(destination + "." + database + "." + table);
+
+                Dml[] dmls = new Dml[threads];
+                for (int i = 0; i < threads; i++) {
+                    Dml dmlTmp = new Dml();
+                    dmlTmp.setDestination(dml.getDestination());
+                    dmlTmp.setDatabase(dml.getDatabase());
+                    dmlTmp.setTable(dml.getTable());
+                    dmlTmp.setType(dml.getType());
+                    dmlTmp.setTs(dml.getTs());
+                    dmlTmp.setEs(dml.getEs());
+                    dmlTmp.setSql(dml.getSql());
+                    dmlTmp.setData(new ArrayList<>());
+                    dmlTmp.setOld(new ArrayList<>());
+                    dmls[i] = dmlTmp;
+                }
+                int idx = 0;
+                for (Map<String, Object> data : dml.getData()) {
+                    int hash;
+                    if (config.getConcurrent()) {
+                        hash = pkHash(config.getDbMapping(), data, threads);
+                    } else {
+                        hash = Math.abs(Math.abs(config.getDbMapping().getTargetTable().hashCode()) % threads);
+                    }
+                    Dml dmlTmp = dmls[hash];
+                    dmlTmp.getData().add(data);
+                    if (dml.getOld() != null) {
+                        dmlTmp.getOld().add(dml.getOld().get(idx));
+                    }
+
+                    idx++;
+                }
+                for (int i = 0; i < threads; i++) {
+                    Map<String, Object> item = new HashMap<>();
+                    item.put("dml", dmls[i]);
+                    item.put("config", config);
+                    dmlPartition[i].add(item);
+                }
+
+            });
+            List<Future<Boolean>> futures = new ArrayList<>();
+            for (int i = 0; i < threads; i++) {
+                int j = i;
+                futures.add(threadExecutors[i].submit(() -> {
+                    dmlPartition[j].forEach(item -> {
+                        MappingConfig config = (MappingConfig) item.get("config");
+                        Dml dml = (Dml) item.get("dml");
+                        sync(config, dml);
+                    });
+                    batchExecutors[j].commit();
+                    return true;
+                }));
+            }
+            for (int i = 0; i < threads; i++) {
+                try {
+                    futures.get(i).get();
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Error rdb sync for batch", e);
+        }
     }
 
     public void sync(MappingConfig config, Dml dml) {
@@ -134,32 +204,32 @@ public class RdbSyncService {
             } else {
                 hash = pkHash(dbMapping, d, threads);
             }
-            ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash];
-            checkQueue(tpe);
-            tpe.submit(() -> {
-                try {
-                    BatchExecutor batchExecutor = batchExecutors[hash];
-                    List<Map<String, ?>> values = new ArrayList<>();
-
-                    for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
-                        String targetColumnName = entry.getKey();
-                        String srcColumnName = entry.getValue();
-                        if (srcColumnName == null) {
-                            srcColumnName = targetColumnName;
-                        }
+            // ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash];
+            // checkQueue(tpe);
+            // tpe.submit(() -> {
+            try {
+                BatchExecutor batchExecutor = batchExecutors[hash];
+                List<Map<String, ?>> values = new ArrayList<>();
+
+                for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+                    String targetColumnName = entry.getKey();
+                    String srcColumnName = entry.getValue();
+                    if (srcColumnName == null) {
+                        srcColumnName = targetColumnName;
+                    }
 
-                        Integer type = ctype.get(targetColumnName.toLowerCase());
-                        if (type == null) {
-                            throw new RuntimeException("No column: " + targetColumnName + " found in target db");
-                        }
-                        Object value = d.get(srcColumnName);
-                        BatchExecutor.setValue(values, type, value);
+                    Integer type = ctype.get(targetColumnName.toLowerCase());
+                    if (type == null) {
+                        throw new RuntimeException("No column: " + targetColumnName + " found in target db");
                     }
-                    batchExecutor.execute(sql, values);
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
+                    Object value = d.get(srcColumnName);
+                    BatchExecutor.setValue(values, type, value);
                 }
-            });
+                batchExecutor.execute(sql, values);
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+            // });
         }
     }
 
@@ -202,46 +272,46 @@ public class RdbSyncService {
                 hash = pkHash(dbMapping, d, o, threads);
             }
 
-            ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash];
-            checkQueue(tpe);
-            tpe.submit(() -> {
-                try {
-                    BatchExecutor batchExecutor = batchExecutors[hash];
+            // ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash];
+            // checkQueue(tpe);
+            // tpe.submit(() -> {
+            try {
+                BatchExecutor batchExecutor = batchExecutors[hash];
 
-                    StringBuilder updateSql = new StringBuilder();
-                    updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
+                StringBuilder updateSql = new StringBuilder();
+                updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
 
-                    List<Map<String, ?>> values = new ArrayList<>();
-                    for (String srcColumnName : o.keySet()) {
-                        List<String> targetColumnNames = new ArrayList<>();
-                        columnsMap.forEach((targetColumn, srcColumn) -> {
-                            if (srcColumnName.toLowerCase().equals(srcColumn)) {
-                                targetColumnNames.add(targetColumn);
-                            }
-                        });
-                        if (!targetColumnNames.isEmpty()) {
-                            for (String targetColumnName : targetColumnNames) {
-                                updateSql.append(targetColumnName).append("=?, ");
-                                Integer type = ctype.get(targetColumnName.toLowerCase());
-                                BatchExecutor.setValue(values, type, d.get(srcColumnName));
-                            }
+                List<Map<String, ?>> values = new ArrayList<>();
+                for (String srcColumnName : o.keySet()) {
+                    List<String> targetColumnNames = new ArrayList<>();
+                    columnsMap.forEach((targetColumn, srcColumn) -> {
+                        if (srcColumnName.toLowerCase().equals(srcColumn)) {
+                            targetColumnNames.add(targetColumn);
+                        }
+                    });
+                    if (!targetColumnNames.isEmpty()) {
+                        for (String targetColumnName : targetColumnNames) {
+                            updateSql.append(targetColumnName).append("=?, ");
+                            Integer type = ctype.get(targetColumnName.toLowerCase());
+                            BatchExecutor.setValue(values, type, d.get(srcColumnName));
                         }
                     }
-                    int len = updateSql.length();
-                    updateSql.delete(len - 2, len).append(" WHERE ");
+                }
+                int len = updateSql.length();
+                updateSql.delete(len - 2, len).append(" WHERE ");
 
-                    // 拼接主键
-                    appendCondition(dbMapping, updateSql, ctype, values, d, o);
+                // 拼接主键
+                appendCondition(dbMapping, updateSql, ctype, values, d, o);
 
-                    batchExecutor.execute(updateSql.toString(), values);
+                batchExecutor.execute(updateSql.toString(), values);
 
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Execute sql: {}", updateSql);
-                    }
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Execute sql: {}", updateSql);
                 }
-            });
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+            // });
 
             idx++;
         }
@@ -276,27 +346,27 @@ public class RdbSyncService {
                 hash = pkHash(dbMapping, d, threads);
             }
 
-            ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash];
-            checkQueue(tpe);
-            tpe.submit(() -> {
-                try {
-                    BatchExecutor batchExecutor = batchExecutors[hash];
-                    StringBuilder sql = new StringBuilder();
-                    sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE ");
+            // ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash];
+            // checkQueue(tpe);
+            // tpe.submit(() -> {
+            try {
+                BatchExecutor batchExecutor = batchExecutors[hash];
+                StringBuilder sql = new StringBuilder();
+                sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE ");
 
-                    List<Map<String, ?>> values = new ArrayList<>();
+                List<Map<String, ?>> values = new ArrayList<>();
 
-                    // 拼接主键
-                    appendCondition(dbMapping, sql, ctype, values, d);
+                // 拼接主键
+                appendCondition(dbMapping, sql, ctype, values, d);
 
-                    batchExecutor.execute(sql.toString(), values);
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Execute sql: {}", sql);
-                    }
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
+                batchExecutor.execute(sql.toString(), values);
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Execute sql: {}", sql);
                 }
-            });
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+            // });
         }
     }