Browse Source

指定是否按主键hash并行同步, 否则按表名hash表间并行同步

mcy 6 years ago
parent
commit
997646b2db

+ 71 - 0
client-adapter/README.md

@@ -251,3 +251,74 @@ bin/startup.sh
 ```
 #### 验证
 修改mysql mytest.person表的数据, 将会自动同步到HBase的MYTEST.PERSON表下面, 并会打出DML的log
+
+
+## 四、关系型数据库适配器
+
+RDB adapter 用于适配mysql到任意关系型数据库(需支持jdbc)的数据同步及导入
+
+### 4.1 修改启动器配置: application.yml, 这里以oracle目标库为例
+```
+server:
+  port: 8081
+logging:
+  level:
+    com.alibaba.otter.canal.client.adapter.rdb: DEBUG
+......
+
+canal.conf:
+  canalServerHost: 127.0.0.1:11111
+  srcDataSources:
+    defaultDS:
+      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
+      username: root
+      password: 121212
+  canalInstances:
+  - instance: example
+    groups:
+    - outAdapters:
+      - name: rdb                                               # 指定为rdb类型同步
+        key: oracle1                                            # 指定adapter的唯一key, 与表映射配置中outerAdapterKey对应
+        properties:
+          jdbc.driverClassName: oracle.jdbc.OracleDriver        # jdbc驱动名, jdbc的jar包需要自行放置lib目录下
+          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE        # jdbc url
+          jdbc.username: mytest                                 # jdbc username
+          jdbc.password: m121212                                # jdbc password
+          threads: 5                                            # 并行执行的线程数, 默认为1
+          commitSize: 3000                                      # 批次提交的最大行数
+```
+其中 outAdapter 的配置: name统一为rdb, key为对应的数据源的唯一标识需和下面的表映射文件中的outerAdapterKey对应, properties为目标库jdbc的相关参数
+adapter将会自动加载 conf/rdb 下的所有.yml结尾的表映射配置文件
+
+### 4.2 适配器表映射文件
+修改 conf/rdb/mytest_user.yml文件:
+```
+dataSourceKey: defaultDS        # 源数据源的key, 对应上面配置的srcDataSources中的值
+destination: example            # canal的instance或者MQ的topic
+outerAdapterKey: oracle1        # adapter key, 对应上面配置outAdapters中的key
+concurrent: true                # 是否按主键hash并行同步, 并行同步的表必须保证主键不会更改及主键不能为其他同步表的外键!!
+dbMapping:
+  database: mytest              # 源数据源的database/schema
+  table: user                   # 源数据源表名
+  targetTable: mytest.tb_user   # 目标数据源的库名.表名
+  targetPk:                     # 主键映射
+    id: id                      # 如果是复合主键可以换行映射多个
+#  mapAll: true                 # 是否整表映射, 要求源表和目标表字段名一模一样 (如果targetColumns也配置了映射,则以targetColumns配置为准)
+  targetColumns:                # 字段映射, 格式: 目标表字段: 源表字段, 如果字段名一样源表字段名可不填
+    id:
+    name:
+    role_id:
+    c_time:
+    test1: 
+```
+导入的类型以目标表的元类型为准, 将自动转换
+
+### 4.3 启动RDB数据同步
+#### 将目标库的jdbc jar包放入lib文件夹, 这里放入ojdbc6.jar
+
+#### 启动canal-adapter启动器
+```
+bin/startup.sh
+```
+#### 验证
+修改mysql mytest.user表的数据, 将会自动同步到Oracle的MYTEST.TB_USER表下面, 并会打出DML的log

+ 4 - 0
client-adapter/launcher/src/main/resources/application.yml

@@ -35,6 +35,8 @@ canal.conf:
 #          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
 #          jdbc.username: mytest
 #          jdbc.password: m121212
+#          threads: 5
+#          commitSize: 3000
 #      - name: rdb
 #        key: postgres1
 #        properties:
@@ -42,6 +44,8 @@ canal.conf:
 #          jdbc.url: jdbc:postgresql://localhost:5432/postgres
 #          jdbc.username: postgres
 #          jdbc.password: 121212
+#          threads: 1
+#          commitSize: 3000
 #      - name: hbase
 #        properties:
 #          hbase.zookeeper.quorum: 127.0.0.1

+ 0 - 67
client-adapter/rdb/README.md

@@ -1,67 +0,0 @@
-## RDB适配器
-RDB adapter 用于适配mysql到任意关系型数据库(需支持jdbc)的数据同步及导入
-### 1.1 修改启动器配置: application.yml, 这里以oracle目标库为例
-```
-server:
-  port: 8081
-logging:
-  level:
-    com.alibaba.otter.canal.client.adapter.hbase: DEBUG
-spring:
-  jackson:
-    date-format: yyyy-MM-dd HH:mm:ss
-    time-zone: GMT+8
-    default-property-inclusion: non_null
-
-canal.conf:
-  canalServerHost: 127.0.0.1:11111
-  srcDataSources:
-    defaultDS:
-      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
-      username: root
-      password: 121212
-  canalInstances:
-  - instance: example
-    groups:
-    - outAdapters:
-      - name: rdb
-        key: oracle1
-        properties:
-          jdbc.driverClassName: oracle.jdbc.OracleDriver
-          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
-          jdbc.username: mytest
-          jdbc.password: m121212
-```
-其中 outAdapter 的配置: name统一为rdb, key为对应的数据源的唯一标识需和下面的表映射文件中的outerAdapterKey对应!! properties为目标库jdb的相关参数
-adapter将会自动加载 conf/rdb 下的所有.yml结尾的表映射配置文件
-### 1.2 适配器表映射文件
-修改 conf/rdb/mytest_user.yml文件:
-```
-dataSourceKey: defaultDS        # 源数据源的key, 对应上面配置的srcDataSources中的值
-destination: example            # cannal的instance或者MQ的topic
-outerAdapterKey: oracle1        # adapter key, 对应上面配置outAdapters中的key
-dbMapping:
-  database: mytest              # 源数据源的database/shcema
-  table: user                   # 源数据源表名
-  targetTable: mytest.tb_user   # 目标数据源的库名.表名
-  targetPk:                     # 主键映射
-    id: id                      # 如果是复合主键可以换行映射多个
-  mapAll: true                  # 是否整表映射, 要求源表和目标表字段名一模一样 (如果targetColumns也配置了映射,则以targetColumns配置为准)
-#  targetColumns:               # 字段映射, 格式: 目标表字段: 源表字段, 如果字段名一样源表字段名可不填
-#    id:
-#    name:
-#    role_id:
-#    c_time:
-#    test1: 
-```
-导入的类型以目标表的元类型为准, 将自动转换
-
-### 1.3 启动RDB数据同步
-#### 将目标库的jdbc jar包放入lib文件夹, 这里放入ojdbc6.jar
-
-#### 启动canal-adapter启动器
-```
-bin/startup.sh
-```
-#### 验证
-修改mysql mytest.user表的数据, 将会自动同步到Oracle的MYTEST.TB_USER表下面, 并会打出DML的log

+ 1 - 4
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -2,10 +2,7 @@ package com.alibaba.otter.canal.client.adapter.rdb;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import javax.sql.DataSource;
 

+ 28 - 18
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java

@@ -18,6 +18,8 @@ public class MappingConfig {
 
     private String    outerAdapterKey; // 对应适配器的key
 
+    private Boolean   concurrent;      // 是否并行同步
+
     private DbMapping dbMapping;       // db映射配置
 
     public String getDataSourceKey() {
@@ -36,6 +38,14 @@ public class MappingConfig {
         this.outerAdapterKey = outerAdapterKey;
     }
 
+    public Boolean getConcurrent() {
+        return concurrent == null ? false : concurrent;
+    }
+
+    public void setConcurrent(Boolean concurrent) {
+        this.concurrent = concurrent;
+    }
+
     public DbMapping getDbMapping() {
         return dbMapping;
     }
@@ -66,20 +76,20 @@ public class MappingConfig {
 
     public static class DbMapping {
 
-        private String                       database;                            // 数据库名或schema名
-        private String                       table;                               // 表面名
-        private Map<String, String>          targetPk;                            // 目标表主键字段
-        private boolean                      mapAll      = false;                 // 映射所有字段
-        private String                       targetTable;                         // 目标表名
-        private Map<String, String>          targetColumns;                       // 目标表字段映射
+        private String              database;                            // 数据库名或schema名
+        private String              table;                               // 表面名
+        private Map<String, String> targetPk;                            // 目标表主键字段
+        private boolean             mapAll      = false;                 // 映射所有字段
+        private String              targetTable;                         // 目标表名
+        private Map<String, String> targetColumns;                       // 目标表字段映射
 
-        private String                       etlCondition;                        // etl条件sql
+        private String              etlCondition;                        // etl条件sql
 
-        private Set<String>                  families    = new LinkedHashSet<>(); // column family列表
-        private int                          readBatch   = 5000;
-        private int                          commitBatch = 5000;                  // etl等批量提交大小
+        private Set<String>         families    = new LinkedHashSet<>(); // column family列表
+        private int                 readBatch   = 5000;
+        private int                 commitBatch = 5000;                  // etl等批量提交大小
 
-//        private volatile Map<String, String> allColumns;                          // mapAll为true,自动设置改字段
+        // private volatile Map<String, String> allColumns; // mapAll为true,自动设置改字段
 
         public String getDatabase() {
             return database;
@@ -161,12 +171,12 @@ public class MappingConfig {
             this.commitBatch = commitBatch;
         }
 
-//        public Map<String, String> getAllColumns() {
-//            return allColumns;
-//        }
-//
-//        public void setAllColumns(Map<String, String> allColumns) {
-//            this.allColumns = allColumns;
-//        }
+        // public Map<String, String> getAllColumns() {
+        // return allColumns;
+        // }
+        //
+        // public void setAllColumns(Map<String, String> allColumns) {
+        // this.allColumns = allColumns;
+        // }
     }
 }

+ 3 - 2
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/BatchExecutor.java

@@ -78,12 +78,13 @@ public class BatchExecutor {
             executor.submit(() -> {
                 try {
                     commitLock.lock();
+                    conn.commit(); //直接提交一次
                     if (!condition.await(5, TimeUnit.SECONDS)) {
                         // 超时提交
                         commit();
                     }
-                } catch (InterruptedException e) {
-                    // ignore
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
                 } finally {
                     commitLock.unlock();
                 }

+ 49 - 17
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -48,7 +48,7 @@ public class RdbSyncService {
 
     public RdbSyncService(Integer commitSize, Integer threads, DataSource dataSource){
         try {
-            if (threads != null && threads > 1 && threads < 10) {
+            if (threads != null && threads > 1 && threads <= 10) {
                 this.threads = threads;
             }
             batchExecutors = new BatchExecutor[this.threads];
@@ -121,13 +121,24 @@ public class RdbSyncService {
         Map<String, Integer> ctype = getTargetColumnType(batchExecutors[0].getConn(), config);
 
         String sql = insertSql.toString();
+        Integer tbHash = null;
+        // 如果不是并行同步的表
+        if (!config.getConcurrent()) {
+            // 按表名hash到一个线程
+            tbHash = Math.abs(Math.abs(dbMapping.getTargetTable().hashCode()) % threads);
+        }
         for (Map<String, Object> d : data) {
-            int pkHash = pkHash(dbMapping, d, threads);
-            ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[pkHash];
+            int hash;
+            if (tbHash != null) {
+                hash = tbHash;
+            } else {
+                hash = pkHash(dbMapping, d, threads);
+            }
+            ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash];
             checkQueue(tpe);
             tpe.submit(() -> {
                 try {
-                    BatchExecutor batchExecutor = batchExecutors[pkHash];
+                    BatchExecutor batchExecutor = batchExecutors[hash];
                     List<Map<String, ?>> values = new ArrayList<>();
 
                     for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
@@ -176,15 +187,26 @@ public class RdbSyncService {
 
         Map<String, Integer> ctype = getTargetColumnType(batchExecutors[0].getConn(), config);
 
+        Integer tbHash = null;
+        if (!config.getConcurrent()) {
+            // 按表名hash到一个线程
+            tbHash = Math.abs(Math.abs(dbMapping.getTargetTable().hashCode()) % threads);
+        }
         for (Map<String, Object> o : old) {
             Map<String, Object> d = data.get(idx - 1);
-            int pkHash = pkHash(dbMapping, d, o, threads);
 
-            ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[pkHash];
+            int hash;
+            if (tbHash != null) {
+                hash = tbHash;
+            } else {
+                hash = pkHash(dbMapping, d, o, threads);
+            }
+
+            ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash];
             checkQueue(tpe);
             tpe.submit(() -> {
                 try {
-                    BatchExecutor batchExecutor = batchExecutors[pkHash];
+                    BatchExecutor batchExecutor = batchExecutors[hash];
 
                     StringBuilder updateSql = new StringBuilder();
                     updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
@@ -211,10 +233,11 @@ public class RdbSyncService {
                     // 拼接主键
                     appendCondition(dbMapping, updateSql, ctype, values, d, o);
 
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("Update target table, sql: {}", updateSql);
-                    }
                     batchExecutor.execute(updateSql.toString(), values);
+
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Execute sql: {}", updateSql);
+                    }
                 } catch (Exception e) {
                     logger.error(e.getMessage(), e);
                 }
@@ -240,15 +263,24 @@ public class RdbSyncService {
         DbMapping dbMapping = config.getDbMapping();
 
         Map<String, Integer> ctype = getTargetColumnType(batchExecutors[0].getConn(), config);
-
+        Integer tbHash = null;
+        if (!config.getConcurrent()) {
+            // 按表名hash到一个线程
+            tbHash = Math.abs(Math.abs(dbMapping.getTargetTable().hashCode()) % threads);
+        }
         for (Map<String, Object> d : data) {
-            int pkHash = pkHash(dbMapping, d, threads);
+            int hash;
+            if (tbHash != null) {
+                hash = tbHash;
+            } else {
+                hash = pkHash(dbMapping, d, threads);
+            }
 
-            ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[pkHash];
+            ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash];
             checkQueue(tpe);
             tpe.submit(() -> {
                 try {
-                    BatchExecutor batchExecutor = batchExecutors[pkHash];
+                    BatchExecutor batchExecutor = batchExecutors[hash];
                     StringBuilder sql = new StringBuilder();
                     sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE ");
 
@@ -257,10 +289,10 @@ public class RdbSyncService {
                     // 拼接主键
                     appendCondition(dbMapping, sql, ctype, values, d);
 
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("Delete from target table, sql: {}", sql);
-                    }
                     batchExecutor.execute(sql.toString(), values);
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Execute sql: {}", sql);
+                    }
                 } catch (Exception e) {
                     logger.error(e.getMessage(), e);
                 }

+ 1 - 0
client-adapter/rdb/src/main/resources/rdb/mytest_user.yml

@@ -1,6 +1,7 @@
 dataSourceKey: defaultDS
 destination: example
 outerAdapterKey: oracle1
+concurrent: true
 dbMapping:
   database: mytest
   table: user

+ 5 - 4
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/Common.java

@@ -1,20 +1,21 @@
 package com.alibaba.otter.canal.client.adapter.rdb.test.sync;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
 import com.alibaba.otter.canal.client.adapter.rdb.test.TestConstant;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 
-import java.util.HashMap;
-import java.util.Map;
-
 public class Common {
+
     public static RdbAdapter init() {
         DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
 
         OuterAdapterConfig outerAdapterConfig = new OuterAdapterConfig();
         outerAdapterConfig.setName("rdb");
-        //outerAdapterConfig.setKey("oralce1");
+        outerAdapterConfig.setKey("oracle1");
         Map<String, String> properties = new HashMap<>();
         properties.put("jdbc.driveClassName", "oracle.jdbc.OracleDriver");
         properties.put("jdbc.url", "jdbc:oracle:thin:@127.0.0.1:49161:XE");

+ 2 - 0
client-adapter/rdb/src/test/resources/rdb/mytest_user.yml

@@ -1,5 +1,7 @@
 dataSourceKey: defaultDS
 destination: example
+outerAdapterKey: oracle1
+concurrent: true
 dbMapping:
   database: mytest
   table: user