Selaa lähdekoodia

fixed adapter

agapple 6 vuotta sitten
vanhempi
commit
aa89453bf6

+ 23 - 33
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -12,25 +12,26 @@ import java.util.Map;
  */
 public class CanalClientConfig {
 
-    private String             canalServerHost;       // 单机模式下canal server的 ip:port
-
-    private String             zookeeperHosts;        // 集群模式下的zk地址, 如果配置了单机地址则以单机为准!!
-
-    private String             mqServers;             // kafka or rocket mq 地址
-
-    private Boolean            flatMessage   = true;  // 是否已flatMessage模式传输, 只适用于mq模式
-
-    private Integer            batchSize;             // 批大小
-
-    private Integer            syncBatchSize = 1000;  // 同步分批提交大小
-
-    private Integer            retries;               // 重试次数
-
-    private Long               timeout;               // 消费超时时间
-
-    private String             mode          = "tcp"; // 模式 tcp kafka rocketMQ
-
-    private List<CanalAdapter> canalAdapters;         // canal adapters 配置
+    // 单机模式下canal server的ip:port
+    private String             canalServerHost;
+    // 集群模式下的zk地址,如果配置了单机地址则以单机为准!!
+    private String             zookeeperHosts;
+    // kafka or rocket mq 地址
+    private String             mqServers;
+    // 是否已flatMessage模式传输,只适用于mq模式
+    private Boolean            flatMessage   = true;
+    // 批大小
+    private Integer            batchSize;
+    // 同步分批提交大小
+    private Integer            syncBatchSize = 1000;
+    // 重试次数
+    private Integer            retries;
+    // 消费超时时间
+    private Long               timeout;
+    // 模式 tcp kafka rocketMQ
+    private String             mode          = "tcp";
+    // canal adapters 配置
+    private List<CanalAdapter> canalAdapters;
 
     public String getCanalServerHost() {
         return canalServerHost;
@@ -116,9 +117,7 @@ public class CanalClientConfig {
 
         private String      instance; // 实例名
 
-        private String      topic;    // mq topic
-
-        private List<Group> groups;   // 适配器分组列表
+        private List<Group> groups;  // 适配器分组列表
 
         public String getInstance() {
             return instance;
@@ -137,22 +136,13 @@ public class CanalClientConfig {
         public void setGroups(List<Group> groups) {
             this.groups = groups;
         }
-
-        public String getTopic() {
-            return topic;
-        }
-
-        public void setTopic(String topic) {
-            this.topic = topic;
-        }
     }
 
     public static class Group {
 
-        private String                          groupId;                               // group id
-
+        // group id
+        private String                          groupId          = "default";
         private List<OuterAdapterConfig>        outerAdapters;                           // 适配器列表
-
         private Map<String, OuterAdapterConfig> outerAdaptersMap = new LinkedHashMap<>();
 
         public String getGroupId() {

+ 0 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterCanalConfig.java

@@ -37,8 +37,6 @@ public class AdapterCanalConfig extends CanalClientConfig {
                 for (CanalAdapter canalAdapter : canalAdapters) {
                     if (canalAdapter.getInstance() != null) {
                         DESTINATIONS.add(canalAdapter.getInstance());
-                    } else if (canalAdapter.getTopic() != null) {
-                        DESTINATIONS.add(canalAdapter.getInstance());
                     }
                 }
             }

+ 20 - 12
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -2,9 +2,12 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
-import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -12,6 +15,7 @@ import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
 import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
 import com.alibaba.otter.canal.client.CanalMQConnector;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.MessageUtil;
 import com.alibaba.otter.canal.protocol.FlatMessage;
@@ -180,18 +184,22 @@ public abstract class AbstractCanalAdapterWorker {
      */
     private void batchSync(List<Dml> dmls, OuterAdapter adapter) {
         // 分批同步
-        int len = 0;
-        List<Dml> dmlsBatch = new ArrayList<>();
-        for (Dml dml : dmls) {
-            dmlsBatch.add(dml);
-            len += dml.getData().size();
-            if (len >= canalClientConfig.getSyncBatchSize()) {
-                adapter.sync(dmlsBatch);
-                dmlsBatch.clear();
-                len = 0;
+        if (dmls.size() <= canalClientConfig.getSyncBatchSize()) {
+            adapter.sync(dmls);
+        } else {
+            int len = 0;
+            List<Dml> dmlsBatch = new ArrayList<>();
+            for (Dml dml : dmls) {
+                dmlsBatch.add(dml);
+                len += dml.getData().size();
+                if (len >= canalClientConfig.getSyncBatchSize()) {
+                    adapter.sync(dmlsBatch);
+                    dmlsBatch.clear();
+                    len = 0;
+                }
             }
+            adapter.sync(dmlsBatch);
         }
-        adapter.sync(dmlsBatch);
     }
 
     public void start() {

+ 8 - 8
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -97,14 +97,14 @@ public class CanalAdapterLoader {
 
                     CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(canalClientConfig,
                         canalClientConfig.getMqServers(),
-                        canalAdapter.getTopic(),
+                        canalAdapter.getInstance(),
                         group.getGroupId(),
                         canalOuterAdapterGroups,
                         canalClientConfig.getFlatMessage());
-                    canalMQWorker.put(canalAdapter.getTopic() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
+                    canalMQWorker.put(canalAdapter.getInstance() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
                     canalKafkaWorker.start();
-                    logger.info("Start adapter for canal-client mq topic: {} succeed",
-                        canalAdapter.getTopic() + "-" + group.getGroupId());
+                    logger.info("Start adapter for canal-client mq topic: {} succeed", canalAdapter.getInstance() + "-"
+                                                                                       + group.getGroupId());
                 }
             }
         } else if ("rocketMQ".equalsIgnoreCase(canalClientConfig.getMode())) {
@@ -119,15 +119,15 @@ public class CanalAdapterLoader {
                     canalOuterAdapterGroups.add(canalOuterAdapters);
                     CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig,
                         canalClientConfig.getMqServers(),
-                        canalAdapter.getTopic(),
+                        canalAdapter.getInstance(),
                         group.getGroupId(),
                         canalOuterAdapterGroups,
                         canalClientConfig.getFlatMessage());
-                    canalMQWorker.put(canalAdapter.getTopic() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
+                    canalMQWorker.put(canalAdapter.getInstance() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
                     rocketMQWorker.start();
 
-                    logger.info("Start adapter for canal-client mq topic: {} succeed",
-                        canalAdapter.getTopic() + "-" + group.getGroupId());
+                    logger.info("Start adapter for canal-client mq topic: {} succeed", canalAdapter.getInstance() + "-"
+                                                                                       + group.getGroupId());
                 }
             }
         }

+ 2 - 3
client-adapter/launcher/src/main/resources/application.yml

@@ -28,10 +28,9 @@ canal.conf:
 #      username: root
 #      password: 121212
   canalAdapters:
-  - instance: example
-#    topic: example
+  - instance: example # canal instance Name or mq topic name
     groups:
-    - #groupId: g1
+    - groupId: g1
       outerAdapters:
       - name: logger
 #      - name: rdb