1
0
Эх сурвалжийг харах

rdb 批量提交超时提交

mcy 6 жил өмнө
parent
commit
fcdf95b5ec

+ 5 - 2
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -1,14 +1,16 @@
 package com.alibaba.otter.canal.client.adapter.es;
 
 import java.net.InetAddress;
-import java.util.*;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import javax.sql.DataSource;
 
-import com.alibaba.otter.canal.client.adapter.es.monitor.ESConfigMonitor;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
@@ -22,6 +24,7 @@ import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SqlParser;
+import com.alibaba.otter.canal.client.adapter.es.monitor.ESConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.es.service.ESEtlService;
 import com.alibaba.otter.canal.client.adapter.es.service.ESSyncService;
 import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;

+ 0 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -14,7 +14,6 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
-import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;

+ 9 - 12
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -5,7 +5,6 @@ import java.sql.SQLException;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -47,7 +46,6 @@ public class RdbAdapter implements OuterAdapter {
     private List<SimpleDml>                         dmlList            = Collections
         .synchronizedList(new ArrayList<>());
     private Lock                                    syncLock           = new ReentrantLock();
-    private Condition                               condition          = syncLock.newCondition();
     private ExecutorService                         executor           = Executors.newFixedThreadPool(1);
 
     private RdbConfigMonitor                        rdbConfigMonitor;
@@ -112,16 +110,16 @@ public class RdbAdapter implements OuterAdapter {
 
         executor.submit(() -> {
             while (running) {
+                int beginSize = dmlList.size();
                 try {
-                    syncLock.lock();
-                    if (!condition.await(3, TimeUnit.SECONDS)) {
-                        // 超时提交
-                        sync();
-                    }
-                } catch (Exception e) {
-                    logger.error(e.getMessage(), e);
-                } finally {
-                    syncLock.unlock();
+                    Thread.sleep(3000);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+                int endSize = dmlList.size();
+
+                if (endSize - beginSize < 300) {
+                    sync();
                 }
             }
         });
@@ -156,7 +154,6 @@ public class RdbAdapter implements OuterAdapter {
         try {
             syncLock.lock();
             if (!dmlList.isEmpty()) {
-                condition.signal();
                 rdbSyncService.sync(dmlList);
                 dmlList.clear();
             }