Bladeren bron

improve es etl (#1758)

* fix typo in trace log

* improve es etl

* improve es etl log

* improve es etl by adding threadCount

* improve es etl by adding threadCount

* improve es etl query

* remove comment
Jet 6 jaren geleden
bovenliggende
commit
7427a74612

+ 15 - 15
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/AbstractEtlService.java

@@ -18,6 +18,7 @@ public abstract class AbstractEtlService {
 
     private String        type;
     private AdapterConfig config;
+    private final long CNT_PER_TASK = 10000L;
 
     public AbstractEtlService(String type, AdapterConfig config){
         this.type = type;
@@ -71,22 +72,21 @@ public abstract class AbstractEtlService {
 
             // 当大于1万条记录时开启多线程
             if (cnt >= 10000) {
-                int threadCount = 3; // 从配置读取默认为3
-                long perThreadCnt = cnt / threadCount;
+                int threadCount = Runtime.getRuntime().availableProcessors();
+
+                long offset;
+                long size = CNT_PER_TASK;
+                long workerCnt = cnt / size + (cnt % size == 0 ? 0 : 1);
+
+                if (logger.isDebugEnabled()) {
+                    logger.debug("workerCnt {} for cnt {} threadCount {}", workerCnt, cnt, threadCount);
+                }
+
                 ExecutorService executor = Util.newFixedThreadPool(threadCount, 5000L);
-                List<Future<Boolean>> futures = new ArrayList<>(threadCount);
-                for (int i = 0; i < threadCount; i++) {
-                    long offset = i * perThreadCnt;
-                    Long size = null;
-                    if (i != threadCount - 1) {
-                        size = perThreadCnt;
-                    }
-                    String sqlFinal;
-                    if (size != null) {
-                        sqlFinal = sql + " LIMIT " + offset + "," + size;
-                    } else {
-                        sqlFinal = sql + " LIMIT " + offset + "," + cnt;
-                    }
+                List<Future<Boolean>> futures = new ArrayList<>();
+                for (long i = 0; i < workerCnt; i++) {
+                    offset = size * i;
+                    String sqlFinal = sql + " LIMIT " + offset + "," + size;
                     Future<Boolean> future = executor.submit(
                         () -> executeSqlImport(dataSource, sqlFinal, values, config.getMapping(), impCount, errMsg));
                     futures.add(future);