mcy 6 سال پیش
والد
کامیت
fbf249580f

+ 3 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -200,7 +200,9 @@ public abstract class AbstractCanalAdapterWorker {
                     len = 0;
                     len = 0;
                 }
                 }
             }
             }
-            adapter.sync(dmlsBatch);
+            if (!dmlsBatch.isEmpty()) {
+                adapter.sync(dmlsBatch);
+            }
         }
         }
     }
     }
 
 

+ 60 - 14
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -5,9 +5,7 @@ import java.sql.SQLException;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.*;
 
 
 import javax.sql.DataSource;
 import javax.sql.DataSource;
 
 
@@ -26,6 +24,12 @@ import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
 import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
 import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
 import com.alibaba.otter.canal.client.adapter.support.*;
 import com.alibaba.otter.canal.client.adapter.support.*;
 
 
+/**
+ * RDB适配器实现类
+ *
+ * @author rewerma 2018-11-7 下午06:45:49
+ * @version 1.0.0
+ */
 @SPI("rdb")
 @SPI("rdb")
 public class RdbAdapter implements OuterAdapter {
 public class RdbAdapter implements OuterAdapter {
 
 
@@ -40,8 +44,6 @@ public class RdbAdapter implements OuterAdapter {
     private RdbSyncService                          rdbSyncService;
     private RdbSyncService                          rdbSyncService;
     private RdbMirrorDbSyncService                  rdbMirrorDbSyncService;
     private RdbMirrorDbSyncService                  rdbMirrorDbSyncService;
 
 
-    private ExecutorService                         executor            = Executors.newFixedThreadPool(1);
-
     private RdbConfigMonitor                        rdbConfigMonitor;
     private RdbConfigMonitor                        rdbConfigMonitor;
 
 
     public Map<String, MappingConfig> getRdbMapping() {
     public Map<String, MappingConfig> getRdbMapping() {
@@ -52,6 +54,11 @@ public class RdbAdapter implements OuterAdapter {
         return mappingConfigCache;
         return mappingConfigCache;
     }
     }
 
 
+    /**
+     * 初始化方法
+     *
+     * @param configuration 外部适配器配置信息
+     */
     @Override
     @Override
     public void init(OuterAdapterConfig configuration) {
     public void init(OuterAdapterConfig configuration) {
         Map<String, MappingConfig> rdbMappingTmp = ConfigLoader.load();
         Map<String, MappingConfig> rdbMappingTmp = ConfigLoader.load();
@@ -80,6 +87,7 @@ public class RdbAdapter implements OuterAdapter {
             }
             }
         }
         }
 
 
+        // 初始化连接池
         Map<String, String> properties = configuration.getProperties();
         Map<String, String> properties = configuration.getProperties();
         dataSource = new DruidDataSource();
         dataSource = new DruidDataSource();
         dataSource.setDriverClassName(properties.get("jdbc.driverClassName"));
         dataSource.setDriverClassName(properties.get("jdbc.driverClassName"));
@@ -88,7 +96,7 @@ public class RdbAdapter implements OuterAdapter {
         dataSource.setPassword(properties.get("jdbc.password"));
         dataSource.setPassword(properties.get("jdbc.password"));
         dataSource.setInitialSize(1);
         dataSource.setInitialSize(1);
         dataSource.setMinIdle(1);
         dataSource.setMinIdle(1);
-        dataSource.setMaxActive(20);
+        dataSource.setMaxActive(10);
         dataSource.setMaxWait(60000);
         dataSource.setMaxWait(60000);
         dataSource.setTimeBetweenEvictionRunsMillis(60000);
         dataSource.setTimeBetweenEvictionRunsMillis(60000);
         dataSource.setMinEvictableIdleTimeMillis(300000);
         dataSource.setMinEvictableIdleTimeMillis(300000);
@@ -102,24 +110,49 @@ public class RdbAdapter implements OuterAdapter {
         String threads = properties.get("threads");
         String threads = properties.get("threads");
         // String commitSize = properties.get("commitSize");
         // String commitSize = properties.get("commitSize");
 
 
-        rdbSyncService = new RdbSyncService(mappingConfigCache,
-            dataSource,
-            threads != null ? Integer.valueOf(threads) : null);
+        rdbSyncService = new RdbSyncService(dataSource, threads != null ? Integer.valueOf(threads) : null);
 
 
         rdbMirrorDbSyncService = new RdbMirrorDbSyncService(mirrorDbConfigCache,
         rdbMirrorDbSyncService = new RdbMirrorDbSyncService(mirrorDbConfigCache,
             dataSource,
             dataSource,
-            threads != null ? Integer.valueOf(threads) : null);
+            threads != null ? Integer.valueOf(threads) : null,
+            rdbSyncService.getColumnsTypeCache());
 
 
         rdbConfigMonitor = new RdbConfigMonitor();
         rdbConfigMonitor = new RdbConfigMonitor();
         rdbConfigMonitor.init(configuration.getKey(), this);
         rdbConfigMonitor.init(configuration.getKey(), this);
     }
     }
 
 
+    /**
+     * 同步方法
+     *
+     * @param dmls 数据包
+     */
     @Override
     @Override
     public void sync(List<Dml> dmls) {
     public void sync(List<Dml> dmls) {
-        rdbSyncService.sync(dmls);
-        rdbMirrorDbSyncService.sync(dmls);
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+
+        Future<Boolean> future1 = executorService.submit(() -> {
+            rdbSyncService.sync(mappingConfigCache, dmls);
+            return true;
+        });
+        Future<Boolean> future2 = executorService.submit(() -> {
+            rdbMirrorDbSyncService.sync(dmls);
+            return true;
+        });
+        try {
+            future1.get();
+            future2.get();
+        } catch (ExecutionException | InterruptedException e) {
+            // ignore
+        }
     }
     }
 
 
+    /**
+     * ETL方法
+     *
+     * @param task 任务名, 对应配置名
+     * @param params etl筛选条件
+     * @return ETL结果
+     */
     @Override
     @Override
     public EtlResult etl(String task, List<String> params) {
     public EtlResult etl(String task, List<String> params) {
         EtlResult etlResult = new EtlResult();
         EtlResult etlResult = new EtlResult();
@@ -168,6 +201,12 @@ public class RdbAdapter implements OuterAdapter {
         return etlResult;
         return etlResult;
     }
     }
 
 
+    /**
+     * 获取总数方法
+     *
+     * @param task 任务名, 对应配置名
+     * @return 总数
+     */
     @Override
     @Override
     public Map<String, Object> count(String task) {
     public Map<String, Object> count(String task) {
         MappingConfig config = rdbMapping.get(task);
         MappingConfig config = rdbMapping.get(task);
@@ -203,6 +242,12 @@ public class RdbAdapter implements OuterAdapter {
         return res;
         return res;
     }
     }
 
 
+    /**
+     * 获取对应canal instance name 或 mq topic
+     *
+     * @param task 任务名, 对应配置名
+     * @return destination
+     */
     @Override
     @Override
     public String getDestination(String task) {
     public String getDestination(String task) {
         MappingConfig config = rdbMapping.get(task);
         MappingConfig config = rdbMapping.get(task);
@@ -212,6 +257,9 @@ public class RdbAdapter implements OuterAdapter {
         return null;
         return null;
     }
     }
 
 
+    /**
+     * 销毁方法
+     */
     @Override
     @Override
     public void destroy() {
     public void destroy() {
         if (rdbConfigMonitor != null) {
         if (rdbConfigMonitor != null) {
@@ -222,8 +270,6 @@ public class RdbAdapter implements OuterAdapter {
             rdbSyncService.close();
             rdbSyncService.close();
         }
         }
 
 
-        executor.shutdown();
-
         if (dataSource != null) {
         if (dataSource != null) {
             dataSource.close();
             dataSource.close();
         }
         }

+ 10 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java

@@ -89,6 +89,8 @@ public class MappingConfig {
         private int                 readBatch   = 5000;
         private int                 readBatch   = 5000;
         private int                 commitBatch = 5000;                  // etl等批量提交大小
         private int                 commitBatch = 5000;                  // etl等批量提交大小
 
 
+        private Map<String, String> allMapColumns;
+
         public boolean isMirrorDb() {
         public boolean isMirrorDb() {
             return mirrorDb == null ? false : mirrorDb;
             return mirrorDb == null ? false : mirrorDb;
         }
         }
@@ -176,5 +178,13 @@ public class MappingConfig {
         public void setCommitBatch(int commitBatch) {
         public void setCommitBatch(int commitBatch) {
             this.commitBatch = commitBatch;
             this.commitBatch = commitBatch;
         }
         }
+
+        public Map<String, String> getAllMapColumns() {
+            return allMapColumns;
+        }
+
+        public void setAllMapColumns(Map<String, String> allMapColumns) {
+            this.allMapColumns = allMapColumns;
+        }
     }
     }
 }
 }

+ 110 - 43
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.client.adapter.rdb.service;
 
 
 import java.sql.Connection;
 import java.sql.Connection;
 import java.sql.Statement;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
@@ -14,70 +15,136 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.support.SingleDml;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 
 
+/**
+ * RDB镜像库同步操作业务
+ *
+ * @author rewerma 2018-12-12 下午011:23
+ * @version 1.0.0
+ */
 public class RdbMirrorDbSyncService {
 public class RdbMirrorDbSyncService {
 
 
-    private static final Logger               logger             = LoggerFactory
-        .getLogger(RdbMirrorDbSyncService.class);
+    private static final Logger              logger             = LoggerFactory.getLogger(RdbMirrorDbSyncService.class);
 
 
-    private Map<String, MappingConfig>        mirrorDbConfigCache;                           // 镜像库配置
-    private DataSource                        dataSource;
+    private Map<String, MappingConfig>       mirrorDbConfigCache;                                                       // 镜像库配置
+    private DataSource                       dataSource;
+    private RdbSyncService                   rdbSyncService;                                                            // rdbSyncService代理
 
 
-    private static Map<String, MappingConfig> tableDbConfigCache = new ConcurrentHashMap<>();
+    private final Map<String, MappingConfig> tableDbConfigCache = new ConcurrentHashMap<>();                            // 自动生成的库表配置缓存
 
 
     public RdbMirrorDbSyncService(Map<String, MappingConfig> mirrorDbConfigCache, DataSource dataSource,
     public RdbMirrorDbSyncService(Map<String, MappingConfig> mirrorDbConfigCache, DataSource dataSource,
-                                  Integer threads){
+                                  Integer threads, Map<String, Map<String, Integer>> columnsTypeCache){
         this.mirrorDbConfigCache = mirrorDbConfigCache;
         this.mirrorDbConfigCache = mirrorDbConfigCache;
         this.dataSource = dataSource;
         this.dataSource = dataSource;
+        this.rdbSyncService = new RdbSyncService(dataSource, threads, columnsTypeCache);
     }
     }
 
 
+    /**
+     * 批量同步方法
+     *
+     * @param dmls 批量 DML
+     */
     public void sync(List<Dml> dmls) {
     public void sync(List<Dml> dmls) {
-        for (Dml dml : dmls) {
-            String destination = StringUtils.trimToEmpty(dml.getDestination());
-            String database = dml.getDatabase();
-            MappingConfig configMap = mirrorDbConfigCache.get(destination + "." + database);
-            if (configMap == null) {
-                continue;
+        try {
+            List<Dml> dmlList = new ArrayList<>();
+            for (Dml dml : dmls) {
+                String destination = StringUtils.trimToEmpty(dml.getDestination());
+                String database = dml.getDatabase();
+                MappingConfig configMap = mirrorDbConfigCache.get(destination + "." + database);
+                if (configMap == null) {
+                    continue;
+                }
+                if (dml.getSql() != null) {
+                    // DDL
+                    executeDdl(dml);
+                    rdbSyncService.getColumnsTypeCache().remove(destination + "." + database + "." + dml.getTable());
+                    tableDbConfigCache.remove(destination + "." + database + "." + dml.getTable()); // 删除对应库表配置
+                } else {
+                    // DML
+                    initMappingConfig(destination + "." + database + "." + dml.getTable(), configMap, dml);
+                    dmlList.add(dml);
+                }
             }
             }
-            if (dml.getSql() != null) {
-                // DDL
-                executeDdl(database, dml.getSql());
-            } else {
-                // DML
-                // TODO
-                MappingConfig mappingConfig = tableDbConfigCache
-                    .get(destination + "." + database + "." + dml.getTable());
-                if (mappingConfig == null) {
-                    // 构造一个配置
-                    mappingConfig = new MappingConfig();
-                    mappingConfig.setDataSourceKey(configMap.getDataSourceKey());
-                    mappingConfig.setDestination(configMap.getDestination());
-                    mappingConfig.setOuterAdapterKey(configMap.getOuterAdapterKey());
-                    mappingConfig.setConcurrent(configMap.getConcurrent());
-                    MappingConfig.DbMapping dbMapping = new MappingConfig.DbMapping();
-                    mappingConfig.setDbMapping(dbMapping);
-                    dbMapping.setDatabase(dml.getDatabase());
-                    dbMapping.setTable(dml.getTable());
-                    dbMapping.setTargetDb(dml.getDatabase());
-                    dbMapping.setTargetTable(dml.getTable());
-                    dbMapping.setMapAll(true);
-                    List<String> pkNames = dml.getPkNames();
-                    Map<String, String> pkMapping = new LinkedHashMap<>();
-                    pkNames.forEach(pkName -> pkMapping.put(pkName, pkName));
-                    dbMapping.setTargetPk(pkMapping);
+            if (!dmlList.isEmpty()) {
+                rdbSyncService.sync(dmlList, dml -> {
+                    String destination = StringUtils.trimToEmpty(dml.getDestination());
+                    String database = dml.getDatabase();
+                    String table = dml.getTable();
+                    MappingConfig config = tableDbConfigCache.get(destination + "." + database + "." + table);
 
 
-                    tableDbConfigCache.put(destination + "." + database + "." + dml.getTable(), mappingConfig);
-                }
+                    if (config == null) {
+                        return false;
+                    }
+
+                    if (config.getConcurrent()) {
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            int hash = rdbSyncService.pkHash(config.getDbMapping(), singleDml.getData());
+                            RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
+                            rdbSyncService.getDmlsPartition()[hash].add(syncItem);
+                        });
+                    } else {
+                        int hash = 0;
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
+                            rdbSyncService.getDmlsPartition()[hash].add(syncItem);
+                        });
+                    }
+                    return true;
+                });
             }
             }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 初始化表配置
+     *
+     * @param key 配置key: destination.database.table
+     * @param baseConfigMap db sync config
+     * @param dml DML
+     */
+    private void initMappingConfig(String key, MappingConfig baseConfigMap, Dml dml) {
+        MappingConfig mappingConfig = tableDbConfigCache.get(key);
+        if (mappingConfig == null) {
+            // 构造一个配置
+            mappingConfig = new MappingConfig();
+            mappingConfig.setDataSourceKey(baseConfigMap.getDataSourceKey());
+            mappingConfig.setDestination(baseConfigMap.getDestination());
+            mappingConfig.setOuterAdapterKey(baseConfigMap.getOuterAdapterKey());
+            mappingConfig.setConcurrent(baseConfigMap.getConcurrent());
+            MappingConfig.DbMapping dbMapping = new MappingConfig.DbMapping();
+            mappingConfig.setDbMapping(dbMapping);
+            dbMapping.setDatabase(dml.getDatabase());
+            dbMapping.setTable(dml.getTable());
+            dbMapping.setTargetDb(dml.getDatabase());
+            dbMapping.setTargetTable(dml.getTable());
+            dbMapping.setMapAll(true);
+            List<String> pkNames = dml.getPkNames();
+            Map<String, String> pkMapping = new LinkedHashMap<>();
+            pkNames.forEach(pkName -> pkMapping.put(pkName, pkName));
+            dbMapping.setTargetPk(pkMapping);
+
+            tableDbConfigCache.put(key, mappingConfig);
         }
         }
     }
     }
 
 
-    private void executeDdl(String database, String sql) {
+    /**
+     * DDL 操作
+     *
+     * @param ddl DDL
+     */
+    private void executeDdl(Dml ddl) {
         try (Connection conn = dataSource.getConnection(); Statement statement = conn.createStatement()) {
         try (Connection conn = dataSource.getConnection(); Statement statement = conn.createStatement()) {
-            statement.execute(sql);
+            statement.execute(ddl.getSql());
+            // 移除对应配置
+            tableDbConfigCache.remove(ddl.getDatabase() + "." + ddl.getDatabase() + "." + ddl.getTable());
             if (logger.isTraceEnabled()) {
             if (logger.isTraceEnabled()) {
-                logger.trace("Execute DDL sql: {} for database: {}", sql, database);
+                logger.trace("Execute DDL sql: {} for database: {}", ddl.getSql(), ddl.getDatabase());
             }
             }
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             logger.error(e.getMessage(), e);

+ 120 - 63
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -11,6 +11,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.Future;
+import java.util.function.Function;
 
 
 import javax.sql.DataSource;
 import javax.sql.DataSource;
 
 
@@ -36,26 +37,37 @@ import com.alibaba.otter.canal.client.adapter.support.Util;
  */
  */
 public class RdbSyncService {
 public class RdbSyncService {
 
 
-    private static final Logger                     logger             = LoggerFactory.getLogger(RdbSyncService.class);
+    private static final Logger               logger  = LoggerFactory.getLogger(RdbSyncService.class);
 
 
-    private final Map<String, Map<String, Integer>> COLUMNS_TYPE_CACHE = new ConcurrentHashMap<>();
+    // 源库表字段类型缓存: instance.schema.table -> <columnName, jdbcType>
+    private Map<String, Map<String, Integer>> columnsTypeCache;
 
 
-    private Map<String, Map<String, MappingConfig>> mappingConfigCache;                                                // 库名-表名对应配置
+    private int                               threads = 3;
 
 
-    private int                                     threads            = 3;
+    private List<SyncItem>[]                  dmlsPartition;
+    private BatchExecutor[]                   batchExecutors;
+    private ExecutorService[]                 executorThreads;
 
 
-    private List<SyncItem>[]                        dmlsPartition;
-    private BatchExecutor[]                         batchExecutors;
-    private ExecutorService[]                       executorThreads;
+    public List<SyncItem>[] getDmlsPartition() {
+        return dmlsPartition;
+    }
+
+    public Map<String, Map<String, Integer>> getColumnsTypeCache() {
+        return columnsTypeCache;
+    }
+
+    @SuppressWarnings("unchecked")
+    public RdbSyncService(DataSource dataSource, Integer threads){
+        this(dataSource, threads, new ConcurrentHashMap<>());
+    }
 
 
     @SuppressWarnings("unchecked")
     @SuppressWarnings("unchecked")
-    public RdbSyncService(Map<String, Map<String, MappingConfig>> mappingConfigCache, DataSource dataSource,
-                          Integer threads){
+    public RdbSyncService(DataSource dataSource, Integer threads, Map<String, Map<String, Integer>> columnsTypeCache){
+        this.columnsTypeCache = columnsTypeCache;
         try {
         try {
             if (threads != null) {
             if (threads != null) {
                 this.threads = threads;
                 this.threads = threads;
             }
             }
-            this.mappingConfigCache = mappingConfigCache;
             this.dmlsPartition = new List[this.threads];
             this.dmlsPartition = new List[this.threads];
             this.batchExecutors = new BatchExecutor[this.threads];
             this.batchExecutors = new BatchExecutor[this.threads];
             this.executorThreads = new ExecutorService[this.threads];
             this.executorThreads = new ExecutorService[this.threads];
@@ -69,66 +81,111 @@ public class RdbSyncService {
         }
         }
     }
     }
 
 
-    public void sync(List<Dml> dmls) {
+    /**
+     * 批量同步回调
+     *
+     * @param dmls 批量 DML
+     * @param function 回调方法
+     */
+    public void sync(List<Dml> dmls, Function<Dml, Boolean> function) {
         try {
         try {
+            boolean toExecute = false;
             for (Dml dml : dmls) {
             for (Dml dml : dmls) {
-                String destination = StringUtils.trimToEmpty(dml.getDestination());
-                String database = dml.getDatabase();
-                String table = dml.getTable();
-                Map<String, MappingConfig> configMap = mappingConfigCache
-                    .get(destination + "." + database + "." + table);
-
-                if (configMap == null) {
-                    continue;
+                if (!toExecute) {
+                    toExecute = function.apply(dml);
+                } else {
+                    function.apply(dml);
+                }
+            }
+            if (toExecute) {
+                List<Future> futures = new ArrayList<>();
+                for (int i = 0; i < threads; i++) {
+                    int j = i;
+                    futures.add(executorThreads[i].submit(() -> {
+                        dmlsPartition[j]
+                            .forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
+                        batchExecutors[j].commit();
+                        return true;
+                    }));
                 }
                 }
-                for (MappingConfig config : configMap.values()) {
-
-                    if (config.getConcurrent()) {
-                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                        singleDmls.forEach(singleDml -> {
-                            int hash = pkHash(config.getDbMapping(), singleDml.getData());
-                            SyncItem syncItem = new SyncItem(config, singleDml);
-                            dmlsPartition[hash].add(syncItem);
-                        });
-                    } else {
-                        int hash = 0;
-                        // Math.abs(Math.abs(config.getDbMapping().getTargetTable().hashCode()) %
-                        // threads);
-                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
-                        singleDmls.forEach(singleDml -> {
-                            SyncItem syncItem = new SyncItem(config, singleDml);
-                            dmlsPartition[hash].add(syncItem);
-                        });
+
+                futures.forEach(future -> {
+                    try {
+                        future.get();
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e);
                     }
                     }
+                });
+
+                for (int i = 0; i < threads; i++) {
+                    dmlsPartition[i].clear();
                 }
                 }
             }
             }
-            List<Future> futures = new ArrayList<>();
-            for (int i = 0; i < threads; i++) {
-                int j = i;
-                futures.add(executorThreads[i].submit(() -> {
-                    dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
-                    batchExecutors[j].commit();
-                    return true;
-                }));
-            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 批量同步
+     *
+     * @param mappingConfig 配置集合
+     * @param dmls 批量 DML
+     */
+    public void sync(Map<String, Map<String, MappingConfig>> mappingConfig, List<Dml> dmls) {
+        try {
+            sync(dmls, dml -> {
+                if (dml.getSql() != null) {
+                    // DDL
+                    columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
+                    return false;
+                } else {
+                    // DML
+                    String destination = StringUtils.trimToEmpty(dml.getDestination());
+                    String database = dml.getDatabase();
+                    String table = dml.getTable();
+                    Map<String, MappingConfig> configMap = mappingConfig
+                        .get(destination + "." + database + "." + table);
+
+                    if (configMap == null) {
+                        return false;
+                    }
 
 
-            futures.forEach(future -> {
-                try {
-                    future.get();
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
+                    boolean executed = false;
+                    for (MappingConfig config : configMap.values()) {
+                        if (config.getConcurrent()) {
+                            List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                            singleDmls.forEach(singleDml -> {
+                                int hash = pkHash(config.getDbMapping(), singleDml.getData());
+                                SyncItem syncItem = new SyncItem(config, singleDml);
+                                dmlsPartition[hash].add(syncItem);
+                            });
+                        } else {
+                            int hash = 0;
+                            List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                            singleDmls.forEach(singleDml -> {
+                                SyncItem syncItem = new SyncItem(config, singleDml);
+                                dmlsPartition[hash].add(syncItem);
+                            });
+                        }
+                        executed = true;
+                    }
+                    return executed;
                 }
                 }
             });
             });
-
-            for (int i = 0; i < threads; i++) {
-                dmlsPartition[i].clear();
-            }
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             logger.error(e.getMessage(), e);
         }
         }
     }
     }
 
 
-    private void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
+    /**
+     * 单条 dml 同步
+     *
+     * @param batchExecutor 批量事务执行器
+     * @param config 对应配置对象
+     * @param dml DML
+     */
+    public void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
         try {
         try {
             if (config != null) {
             if (config != null) {
                 String type = dml.getType();
                 String type = dml.getType();
@@ -308,10 +365,10 @@ public class RdbSyncService {
     private Map<String, Integer> getTargetColumnType(Connection conn, MappingConfig config) {
     private Map<String, Integer> getTargetColumnType(Connection conn, MappingConfig config) {
         DbMapping dbMapping = config.getDbMapping();
         DbMapping dbMapping = config.getDbMapping();
         String cacheKey = config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable();
         String cacheKey = config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable();
-        Map<String, Integer> columnType = COLUMNS_TYPE_CACHE.get(cacheKey);
+        Map<String, Integer> columnType = columnsTypeCache.get(cacheKey);
         if (columnType == null) {
         if (columnType == null) {
             synchronized (RdbSyncService.class) {
             synchronized (RdbSyncService.class) {
-                columnType = COLUMNS_TYPE_CACHE.get(cacheKey);
+                columnType = columnsTypeCache.get(cacheKey);
                 if (columnType == null) {
                 if (columnType == null) {
                     columnType = new LinkedHashMap<>();
                     columnType = new LinkedHashMap<>();
                     final Map<String, Integer> columnTypeTmp = columnType;
                     final Map<String, Integer> columnTypeTmp = columnType;
@@ -323,7 +380,7 @@ public class RdbSyncService {
                             for (int i = 1; i <= columnCount; i++) {
                             for (int i = 1; i <= columnCount; i++) {
                                 columnTypeTmp.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
                                 columnTypeTmp.put(rsd.getColumnName(i).toLowerCase(), rsd.getColumnType(i));
                             }
                             }
-                            COLUMNS_TYPE_CACHE.put(cacheKey, columnTypeTmp);
+                            columnsTypeCache.put(cacheKey, columnTypeTmp);
                         } catch (SQLException e) {
                         } catch (SQLException e) {
                             logger.error(e.getMessage(), e);
                             logger.error(e.getMessage(), e);
                         }
                         }
@@ -364,12 +421,12 @@ public class RdbSyncService {
         sql.delete(len - 4, len);
         sql.delete(len - 4, len);
     }
     }
 
 
-    private class SyncItem {
+    public static class SyncItem {
 
 
         private MappingConfig config;
         private MappingConfig config;
         private SingleDml     singleDml;
         private SingleDml     singleDml;
 
 
-        private SyncItem(MappingConfig config, SingleDml singleDml){
+        public SyncItem(MappingConfig config, SingleDml singleDml){
             this.config = config;
             this.config = config;
             this.singleDml = singleDml;
             this.singleDml = singleDml;
         }
         }
@@ -378,11 +435,11 @@ public class RdbSyncService {
     /**
     /**
      * 取主键hash
      * 取主键hash
      */
      */
-    private int pkHash(DbMapping dbMapping, Map<String, Object> d) {
+    public int pkHash(DbMapping dbMapping, Map<String, Object> d) {
         return pkHash(dbMapping, d, null);
         return pkHash(dbMapping, d, null);
     }
     }
 
 
-    private int pkHash(DbMapping dbMapping, Map<String, Object> d, Map<String, Object> o) {
+    public int pkHash(DbMapping dbMapping, Map<String, Object> d, Map<String, Object> o) {
         int hash = 0;
         int hash = 0;
         // 取主键
         // 取主键
         for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
         for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {

+ 4 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java

@@ -23,6 +23,9 @@ public class SyncUtil {
     public static Map<String, String> getColumnsMap(MappingConfig.DbMapping dbMapping, Collection<String> columns) {
     public static Map<String, String> getColumnsMap(MappingConfig.DbMapping dbMapping, Collection<String> columns) {
         Map<String, String> columnsMap;
         Map<String, String> columnsMap;
         if (dbMapping.isMapAll()) {
         if (dbMapping.isMapAll()) {
+            if (dbMapping.getAllMapColumns() != null) {
+                return dbMapping.getAllMapColumns();
+            }
             columnsMap = new LinkedHashMap<>();
             columnsMap = new LinkedHashMap<>();
             for (String srcColumn : columns) {
             for (String srcColumn : columns) {
                 boolean flag = true;
                 boolean flag = true;
@@ -39,6 +42,7 @@ public class SyncUtil {
                     columnsMap.put(srcColumn, srcColumn);
                     columnsMap.put(srcColumn, srcColumn);
                 }
                 }
             }
             }
+            dbMapping.setAllMapColumns(columnsMap);
         } else {
         } else {
             columnsMap = dbMapping.getTargetColumns();
             columnsMap = dbMapping.getTargetColumns();
         }
         }