Browse Source

rdb 批量提交

mcy 6 years ago
parent
commit
d396a0b65c

+ 1 - 0
client-adapter/launcher/src/main/resources/application.yml

@@ -36,6 +36,7 @@ canal.conf:
 #          jdbc.username: mytest
 #          jdbc.password: m121212
 #          threads: 5
+#          commitSize: 3000
 #      - name: rdb
 #        key: postgres1
 #        properties:

+ 4 - 5
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -71,11 +71,10 @@ public class RdbAdapter implements OuterAdapter {
         }
 
         String threads = properties.get("threads");
-        if (threads != null) {
-            rdbSyncService = new RdbSyncService(Integer.valueOf(threads), dataSource);
-        } else {
-            rdbSyncService = new RdbSyncService(null, dataSource);
-        }
+        String commitSize = properties.get("commitSize");
+        rdbSyncService = new RdbSyncService(commitSize != null ? Integer.valueOf(commitSize) : null,
+            threads != null ? Integer.valueOf(threads) : null,
+            dataSource);
     }
 
     @Override

+ 6 - 3
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -9,7 +9,10 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import javax.sql.DataSource;
 
@@ -43,7 +46,7 @@ public class RdbSyncService {
 
     private ExecutorService[]                       threadExecutors;
 
-    public RdbSyncService(Integer threads, DataSource dataSource){
+    public RdbSyncService(Integer commitSize, Integer threads, DataSource dataSource){
         try {
             if (threads != null && threads > 1 && threads < 10) {
                 this.threads = threads;
@@ -52,7 +55,7 @@ public class RdbSyncService {
             for (int i = 0; i < this.threads; i++) {
                 Connection conn = dataSource.getConnection();
                 conn.setAutoCommit(false);
-                this.batchExecutors[i] = new BatchExecutor(i, conn, null);
+                this.batchExecutors[i] = new BatchExecutor(i, conn, commitSize);
             }
             threadExecutors = new ExecutorService[this.threads];
             for (int i = 0; i < this.threads; i++) {