Browse Source

rdb 并行提交

mcy 6 years ago
parent
commit
e711f1a091

+ 8 - 2
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -89,10 +89,12 @@ public class RdbAdapter implements OuterAdapter {
             logger.error("ERROR ## failed to initial datasource: " + properties.get("jdbc.url"), e);
             logger.error("ERROR ## failed to initial datasource: " + properties.get("jdbc.url"), e);
         }
         }
 
 
-        // String threads = properties.get("threads");
+        String threads = properties.get("threads");
         // String commitSize = properties.get("commitSize");
         // String commitSize = properties.get("commitSize");
 
 
-        rdbSyncService = new RdbSyncService(mappingConfigCache, dataSource);
+        rdbSyncService = new RdbSyncService(mappingConfigCache,
+            dataSource,
+            threads != null ? Integer.valueOf(threads) : null);
 
 
         rdbConfigMonitor = new RdbConfigMonitor();
         rdbConfigMonitor = new RdbConfigMonitor();
         rdbConfigMonitor.init(configuration.getKey(), this);
         rdbConfigMonitor.init(configuration.getKey(), this);
@@ -201,6 +203,10 @@ public class RdbAdapter implements OuterAdapter {
             rdbConfigMonitor.destroy();
             rdbConfigMonitor.destroy();
         }
         }
 
 
+        if (rdbSyncService != null) {
+            rdbSyncService.close();
+        }
+
         executor.shutdown();
         executor.shutdown();
 
 
         if (dataSource != null) {
         if (dataSource != null) {

+ 172 - 75
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -8,6 +8,9 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 
 import javax.sql.DataSource;
 import javax.sql.DataSource;
 
 
@@ -20,6 +23,7 @@ import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
 import com.alibaba.otter.canal.client.adapter.rdb.support.BatchExecutor;
 import com.alibaba.otter.canal.client.adapter.rdb.support.BatchExecutor;
+import com.alibaba.otter.canal.client.adapter.rdb.support.SingleDml;
 import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
 import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.Util;
 import com.alibaba.otter.canal.client.adapter.support.Util;
@@ -38,15 +42,35 @@ public class RdbSyncService {
 
 
     private Map<String, Map<String, MappingConfig>> mappingConfigCache;                                                // 库名-表名对应配置
     private Map<String, Map<String, MappingConfig>> mappingConfigCache;                                                // 库名-表名对应配置
 
 
-    private DataSource                              dataSource;
+    private int                                     threads            = 3;
 
 
-    public RdbSyncService(Map<String, Map<String, MappingConfig>> mappingConfigCache, DataSource dataSource){
-        this.mappingConfigCache = mappingConfigCache;
-        this.dataSource = dataSource;
+    private List<SyncItem>[]                        dmlsPartition;
+    private BatchExecutor[]                         batchExecutors;
+    private ExecutorService[]                       executorThreads;
+
+    @SuppressWarnings("unchecked")
+    public RdbSyncService(Map<String, Map<String, MappingConfig>> mappingConfigCache, DataSource dataSource,
+                          Integer threads){
+        try {
+            if (threads != null) {
+                this.threads = threads;
+            }
+            this.mappingConfigCache = mappingConfigCache;
+            this.dmlsPartition = new List[this.threads];
+            this.batchExecutors = new BatchExecutor[this.threads];
+            this.executorThreads = new ExecutorService[this.threads];
+            for (int i = 0; i < this.threads; i++) {
+                dmlsPartition[i] = new ArrayList<>();
+                batchExecutors[i] = new BatchExecutor(dataSource.getConnection());
+                executorThreads[i] = Executors.newSingleThreadExecutor();
+            }
+        } catch (SQLException e) {
+            logger.error(e.getMessage(), e);
+        }
     }
     }
 
 
     public void sync(List<Dml> dmls) {
     public void sync(List<Dml> dmls) {
-        try (BatchExecutor batchExecutor = new BatchExecutor(dataSource.getConnection())) {
+        try {
             for (Dml dml : dmls) {
             for (Dml dml : dmls) {
                 String destination = StringUtils.trimToEmpty(dml.getDestination());
                 String destination = StringUtils.trimToEmpty(dml.getDestination());
                 String database = dml.getDatabase();
                 String database = dml.getDatabase();
@@ -55,16 +79,51 @@ public class RdbSyncService {
                     .get(destination + "." + database + "." + table);
                     .get(destination + "." + database + "." + table);
 
 
                 for (MappingConfig config : configMap.values()) {
                 for (MappingConfig config : configMap.values()) {
-                    sync(batchExecutor, config, dml);
+
+                    if (config.getConcurrent()) {
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            int hash = pkHash(config.getDbMapping(), singleDml.getData());
+                            SyncItem syncItem = new SyncItem(config, singleDml);
+                            dmlsPartition[hash].add(syncItem);
+                        });
+                    } else {
+                        int hash = Math.abs(Math.abs(config.getDbMapping().getTargetTable().hashCode()) % threads);
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            SyncItem syncItem = new SyncItem(config, singleDml);
+                            dmlsPartition[hash].add(syncItem);
+                        });
+                    }
                 }
                 }
             }
             }
-            batchExecutor.commit();
-        } catch (SQLException e) {
-            throw new RuntimeException(e);
+            List<Future> futures = new ArrayList<>();
+            for (int i = 0; i < threads; i++) {
+                int j = i;
+                futures.add(executorThreads[i].submit(() -> {
+                    dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
+                    batchExecutors[j].commit();
+                    return true;
+                }));
+            }
+
+            futures.forEach(future -> {
+                try {
+                    future.get();
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+            });
+
+            for (int i = 0; i < threads; i++) {
+                dmlsPartition[i].clear();
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
         }
         }
     }
     }
 
 
-    private void sync(BatchExecutor batchExecutor, MappingConfig config, Dml dml) {
+    private void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
         try {
         try {
             if (config != null) {
             if (config != null) {
                 String type = dml.getType();
                 String type = dml.getType();
@@ -90,8 +149,8 @@ public class RdbSyncService {
      * @param config 配置项
      * @param config 配置项
      * @param dml DML数据
      * @param dml DML数据
      */
      */
-    private void insert(BatchExecutor batchExecutor, MappingConfig config, Dml dml) {
-        List<Map<String, Object>> data = dml.getData();
+    private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
+        Map<String, Object> data = dml.getData();
         if (data == null || data.isEmpty()) {
         if (data == null || data.isEmpty()) {
             return;
             return;
         }
         }
@@ -99,7 +158,7 @@ public class RdbSyncService {
         DbMapping dbMapping = config.getDbMapping();
         DbMapping dbMapping = config.getDbMapping();
 
 
         try {
         try {
-            Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data.get(0));
+            Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
 
 
             StringBuilder insertSql = new StringBuilder();
             StringBuilder insertSql = new StringBuilder();
             insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
             insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
@@ -116,28 +175,26 @@ public class RdbSyncService {
 
 
             Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
             Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
 
 
-            for (Map<String, Object> d : data) {
-                List<Map<String, ?>> values = new ArrayList<>();
-                for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
-                    String targetClolumnName = entry.getKey();
-                    String srcColumnName = entry.getValue();
-                    if (srcColumnName == null) {
-                        srcColumnName = targetClolumnName;
-                    }
-
-                    Integer type = ctype.get(targetClolumnName.toLowerCase());
+            List<Map<String, ?>> values = new ArrayList<>();
+            for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+                String targetClolumnName = entry.getKey();
+                String srcColumnName = entry.getValue();
+                if (srcColumnName == null) {
+                    srcColumnName = targetClolumnName;
+                }
 
 
-                    Object value = d.get(srcColumnName);
+                Integer type = ctype.get(targetClolumnName.toLowerCase());
 
 
-                    BatchExecutor.setValue(values, type, value);
-                }
+                Object value = data.get(srcColumnName);
 
 
-                batchExecutor.execute(insertSql.toString(), values);
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Insert into target table, sql: {}", insertSql);
-                }
+                BatchExecutor.setValue(values, type, value);
+            }
 
 
+            batchExecutor.execute(insertSql.toString(), values);
+            if (logger.isTraceEnabled()) {
+                logger.trace("Insert into target table, sql: {}", insertSql);
             }
             }
+
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             logger.error(e.getMessage(), e);
         }
         }
@@ -149,59 +206,53 @@ public class RdbSyncService {
      * @param config 配置项
      * @param config 配置项
      * @param dml DML数据
      * @param dml DML数据
      */
      */
-    private void update(BatchExecutor batchExecutor, MappingConfig config, Dml dml) {
-        List<Map<String, Object>> data = dml.getData();
+    private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
+        Map<String, Object> data = dml.getData();
         if (data == null || data.isEmpty()) {
         if (data == null || data.isEmpty()) {
             return;
             return;
         }
         }
 
 
-        List<Map<String, Object>> old = dml.getOld();
+        Map<String, Object> old = dml.getOld();
         if (old == null || old.isEmpty()) {
         if (old == null || old.isEmpty()) {
             return;
             return;
         }
         }
 
 
         DbMapping dbMapping = config.getDbMapping();
         DbMapping dbMapping = config.getDbMapping();
 
 
-        int idx = 1;
-
         try {
         try {
-            Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data.get(0));
+            Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
 
 
             Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
             Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
 
 
-            for (Map<String, Object> o : old) {
-                Map<String, Object> d = data.get(idx - 1);
-                StringBuilder updateSql = new StringBuilder();
-                updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
-                List<Map<String, ?>> values = new ArrayList<>();
-                for (String srcColumnName : o.keySet()) {
-                    List<String> targetColumnNames = new ArrayList<>();
-                    columnsMap.forEach((targetColumn, srcColumn) -> {
-                        if (srcColumnName.toLowerCase().equals(srcColumn)) {
-                            targetColumnNames.add(targetColumn);
-                        }
-                    });
-                    if (!targetColumnNames.isEmpty()) {
+            StringBuilder updateSql = new StringBuilder();
+            updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
+            List<Map<String, ?>> values = new ArrayList<>();
+            for (String srcColumnName : old.keySet()) {
+                List<String> targetColumnNames = new ArrayList<>();
+                columnsMap.forEach((targetColumn, srcColumn) -> {
+                    if (srcColumnName.toLowerCase().equals(srcColumn)) {
+                        targetColumnNames.add(targetColumn);
+                    }
+                });
+                if (!targetColumnNames.isEmpty()) {
 
 
-                        for (String targetColumnName : targetColumnNames) {
-                            updateSql.append(targetColumnName).append("=?, ");
-                            Integer type = ctype.get(targetColumnName.toLowerCase());
-                            BatchExecutor.setValue(values, type, d.get(srcColumnName));
-                        }
+                    for (String targetColumnName : targetColumnNames) {
+                        updateSql.append(targetColumnName).append("=?, ");
+                        Integer type = ctype.get(targetColumnName.toLowerCase());
+                        BatchExecutor.setValue(values, type, data.get(srcColumnName));
                     }
                     }
                 }
                 }
-                int len = updateSql.length();
-                updateSql.delete(len - 2, len).append(" WHERE ");
+            }
+            int len = updateSql.length();
+            updateSql.delete(len - 2, len).append(" WHERE ");
 
 
-                // 拼接主键
-                appendCondition(dbMapping, updateSql, ctype, values, d, o);
+            // 拼接主键
+            appendCondition(dbMapping, updateSql, ctype, values, data, old);
 
 
-                batchExecutor.execute(updateSql.toString(), values);
+            batchExecutor.execute(updateSql.toString(), values);
 
 
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Update target table, sql: {}", updateSql);
-                }
-                idx++;
+            if (logger.isTraceEnabled()) {
+                logger.trace("Update target table, sql: {}", updateSql);
             }
             }
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             logger.error(e.getMessage(), e);
@@ -214,8 +265,8 @@ public class RdbSyncService {
      * @param config
      * @param config
      * @param dml
      * @param dml
      */
      */
-    private void delete(BatchExecutor batchExecutor, MappingConfig config, Dml dml) {
-        List<Map<String, Object>> data = dml.getData();
+    private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
+        Map<String, Object> data = dml.getData();
         if (data == null || data.isEmpty()) {
         if (data == null || data.isEmpty()) {
             return;
             return;
         }
         }
@@ -225,19 +276,17 @@ public class RdbSyncService {
         try {
         try {
             Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
             Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
 
 
-            for (Map<String, Object> d : data) {
-                StringBuilder sql = new StringBuilder();
-                sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE ");
+            StringBuilder sql = new StringBuilder();
+            sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE ");
 
 
-                List<Map<String, ?>> values = new ArrayList<>();
-                // 拼接主键
-                appendCondition(dbMapping, sql, ctype, values, d);
+            List<Map<String, ?>> values = new ArrayList<>();
+            // 拼接主键
+            appendCondition(dbMapping, sql, ctype, values, data);
 
 
-                batchExecutor.execute(sql.toString(), values);
+            batchExecutor.execute(sql.toString(), values);
 
 
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Delete from target table, sql: {}", sql);
-                }
+            if (logger.isTraceEnabled()) {
+                logger.trace("Delete from target table, sql: {}", sql);
             }
             }
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             logger.error(e.getMessage(), e);
@@ -309,4 +358,52 @@ public class RdbSyncService {
         int len = sql.length();
         int len = sql.length();
         sql.delete(len - 4, len);
         sql.delete(len - 4, len);
     }
     }
+
+    private class SyncItem {
+
+        private MappingConfig config;
+        private SingleDml     singleDml;
+
+        private SyncItem(MappingConfig config, SingleDml singleDml){
+            this.config = config;
+            this.singleDml = singleDml;
+        }
+    }
+
+    /**
+     * 取主键hash
+     */
+    private int pkHash(DbMapping dbMapping, Map<String, Object> d) {
+        return pkHash(dbMapping, d, null);
+    }
+
+    private int pkHash(DbMapping dbMapping, Map<String, Object> d, Map<String, Object> o) {
+        int hash = 0;
+        // 取主键
+        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
+            String targetColumnName = entry.getKey();
+            String srcColumnName = entry.getValue();
+            if (srcColumnName == null) {
+                srcColumnName = targetColumnName;
+            }
+            Object value;
+            if (o != null && o.containsKey(srcColumnName)) {
+                value = o.get(srcColumnName);
+            } else {
+                value = d.get(srcColumnName);
+            }
+            if (value != null) {
+                hash += value.hashCode();
+            }
+        }
+        hash = Math.abs(hash) % threads;
+        return Math.abs(hash);
+    }
+
+    public void close() {
+        for (int i = 0; i < threads; i++) {
+            batchExecutors[i].close();
+            executorThreads[i].shutdown();
+        }
+    }
 }
 }

+ 83 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SingleDml.java

@@ -0,0 +1,83 @@
+package com.alibaba.otter.canal.client.adapter.rdb.support;
+
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class SingleDml {
+
+    private String              destination;
+    private String              database;
+    private String              table;
+    private String              type;
+    private Map<String, Object> data;
+    private Map<String, Object> old;
+
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public Map<String, Object> getData() {
+        return data;
+    }
+
+    public void setData(Map<String, Object> data) {
+        this.data = data;
+    }
+
+    public Map<String, Object> getOld() {
+        return old;
+    }
+
+    public void setOld(Map<String, Object> old) {
+        this.old = old;
+    }
+
+    public static List<SingleDml> dml2SingleDmls(Dml dml) {
+        int size = dml.getData().size();
+        List<SingleDml> singleDmls = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            SingleDml singleDml = new SingleDml();
+            singleDml.setDestination(dml.getDestination());
+            singleDml.setDatabase(dml.getDatabase());
+            singleDml.setTable(dml.getTable());
+            singleDml.setType(dml.getType());
+            singleDml.setData(dml.getData().get(i));
+            if (dml.getOld() != null) {
+                singleDml.setOld(dml.getOld().get(i));
+            }
+            singleDmls.add(singleDml);
+        }
+        return singleDmls;
+    }
+}