Browse Source

adapter 配置修改

mcy 6 years ago
parent
commit
67cc73b8e2

+ 41 - 84
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -1,6 +1,5 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
-import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -13,25 +12,25 @@ import java.util.Map;
  */
 public class CanalClientConfig {
 
-    private String              canalServerHost;      // 单机模式下canal server的 ip:port
+    private String             canalServerHost;       // 单机模式下canal server的 ip:port
 
-    private String              zookeeperHosts;       // 集群模式下的zk地址, 如果配置了单机地址则以单机为准!!
+    private String             zookeeperHosts;        // 集群模式下的zk地址, 如果配置了单机地址则以单机为准!!
 
-    private String              mqServers;            // kafka or rocket mq 地址
+    private String             mqServers;             // kafka or rocket mq 地址
 
-    private Boolean             flatMessage   = true; // 是否已flatMessage模式传输, 只适用于mq模式
+    private Boolean            flatMessage   = true;  // 是否已flatMessage模式传输, 只适用于mq模式
 
-    private Integer             batchSize;            // 批大小
+    private Integer            batchSize;             // 批大小
 
-    private Integer             syncBatchSize = 1000; // 同步分批提交大小
+    private Integer            syncBatchSize = 1000;  // 同步分批提交大小
 
-    private Integer             retries;              // 重试次数
+    private Integer            retries;               // 重试次数
 
-    private Long                timeout;              // 消费超时时间
+    private Long               timeout;               // 消费超时时间
 
-    private List<MQTopic>       mqTopics;             // mq topic 列表
+    private String             mode          = "tcp"; // 模式 tcp kafka rocketMQ
 
-    private List<CanalInstance> canalInstances;       // tcp 模式下 canal 实例列表, 与mq模式不能共存!!
+    private List<CanalAdapter> canalAdapters;         // canal adapters 配置
 
     public String getCanalServerHost() {
         return canalServerHost;
@@ -57,14 +56,6 @@ public class CanalClientConfig {
         this.mqServers = mqServers;
     }
 
-    public List<MQTopic> getMqTopics() {
-        return mqTopics;
-    }
-
-    public void setMqTopics(List<MQTopic> mqTopics) {
-        this.mqTopics = mqTopics;
-    }
-
     public Boolean getFlatMessage() {
         return flatMessage;
     }
@@ -105,18 +96,28 @@ public class CanalClientConfig {
         this.timeout = timeout;
     }
 
-    public List<CanalInstance> getCanalInstances() {
-        return canalInstances;
+    public String getMode() {
+        return mode;
+    }
+
+    public void setMode(String mode) {
+        this.mode = mode;
     }
 
-    public void setCanalInstances(List<CanalInstance> canalInstances) {
-        this.canalInstances = canalInstances;
+    public List<CanalAdapter> getCanalAdapters() {
+        return canalAdapters;
     }
 
-    public static class CanalInstance {
+    public void setCanalAdapters(List<CanalAdapter> canalAdapters) {
+        this.canalAdapters = canalAdapters;
+    }
+
+    public static class CanalAdapter {
 
         private String      instance; // 实例名
 
+        private String      topic;    // mq topic
+
         private List<Group> groups;   // 适配器分组列表
 
         public String getInstance() {
@@ -137,50 +138,6 @@ public class CanalClientConfig {
             this.groups = groups;
         }
 
-    }
-
-    public static class Group {
-
-        private List<OuterAdapterConfig>        outAdapters;                           // 适配器列表
-
-        private Map<String, OuterAdapterConfig> outAdaptersMap = new LinkedHashMap<>();
-
-        public List<OuterAdapterConfig> getOutAdapters() {
-            return outAdapters;
-        }
-
-        public void setOutAdapters(List<OuterAdapterConfig> outAdapters) {
-            this.outAdapters = outAdapters;
-            if (outAdapters != null) {
-                outAdapters.forEach(outAdapter -> outAdaptersMap.put(outAdapter.getKey(), outAdapter));
-            }
-        }
-
-        public Map<String, OuterAdapterConfig> getOutAdaptersMap() {
-            return outAdaptersMap;
-        }
-
-        public void setOutAdaptersMap(Map<String, OuterAdapterConfig> outAdaptersMap) {
-            this.outAdaptersMap = outAdaptersMap;
-        }
-    }
-
-    public static class MQTopic {
-
-        private String        mqMode;                     // mq模式 kafka or rocketMQ
-
-        private String        topic;                      // topic名
-
-        private List<MQGroup> groups = new ArrayList<>(); // 分组列表
-
-        public String getMqMode() {
-            return mqMode;
-        }
-
-        public void setMqMode(String mqMode) {
-            this.mqMode = mqMode;
-        }
-
         public String getTopic() {
             return topic;
         }
@@ -188,21 +145,15 @@ public class CanalClientConfig {
         public void setTopic(String topic) {
             this.topic = topic;
         }
-
-        public List<MQGroup> getGroups() {
-            return groups;
-        }
-
-        public void setGroups(List<MQGroup> groups) {
-            this.groups = groups;
-        }
     }
 
-    public static class MQGroup {
+    public static class Group {
 
-        private String                   groupId;     // group id
+        private String                          groupId;                               // group id
 
-        private List<OuterAdapterConfig> outAdapters; // 适配器配置列表
+        private List<OuterAdapterConfig>        outerAdapters;                           // 适配器列表
+
+        private Map<String, OuterAdapterConfig> outerAdaptersMap = new LinkedHashMap<>();
 
         public String getGroupId() {
             return groupId;
@@ -212,14 +163,20 @@ public class CanalClientConfig {
             this.groupId = groupId;
         }
 
-        public List<OuterAdapterConfig> getOutAdapters() {
-            return outAdapters;
+        public List<OuterAdapterConfig> getOuterAdapters() {
+            return outerAdapters;
         }
 
-        public void setOutAdapters(List<OuterAdapterConfig> outAdapters) {
-            this.outAdapters = outAdapters;
+        public void setOuterAdapters(List<OuterAdapterConfig> outerAdapters) {
+            this.outerAdapters = outerAdapters;
         }
 
-    }
+        public Map<String, OuterAdapterConfig> getOuterAdaptersMap() {
+            return outerAdaptersMap;
+        }
 
+        public void setOuterAdaptersMap(Map<String, OuterAdapterConfig> outerAdaptersMap) {
+            this.outerAdaptersMap = outerAdaptersMap;
+        }
+    }
 }

+ 11 - 21
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterCanalConfig.java

@@ -6,12 +6,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.alibaba.druid.pool.DruidDataSource;
-import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
 
+import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 
 /**
  * canal 的相关配置类
@@ -28,28 +28,18 @@ public class AdapterCanalConfig extends CanalClientConfig {
     private Map<String, DatasourceConfig> srcDataSources;
 
     @Override
-    public void setCanalInstances(List<CanalInstance> canalInstances) {
-        super.setCanalInstances(canalInstances);
-
-        if (canalInstances != null) {
-            synchronized (DESTINATIONS) {
-                DESTINATIONS.clear();
-                for (CanalInstance canalInstance : canalInstances) {
-                    DESTINATIONS.add(canalInstance.getInstance());
-                }
-            }
-        }
-    }
-
-    @Override
-    public void setMqTopics(List<MQTopic> mqTopics) {
-        super.setMqTopics(mqTopics);
+    public void setCanalAdapters(List<CanalAdapter> canalAdapters) {
+        super.setCanalAdapters(canalAdapters);
 
-        if (mqTopics != null) {
+        if (canalAdapters != null) {
             synchronized (DESTINATIONS) {
                 DESTINATIONS.clear();
-                for (MQTopic mqTopic : mqTopics) {
-                    DESTINATIONS.add(mqTopic.getTopic());
+                for (CanalAdapter canalAdapter : canalAdapters) {
+                    if (canalAdapter.getInstance() != null) {
+                        DESTINATIONS.add(canalAdapter.getInstance());
+                    } else if (canalAdapter.getTopic() != null) {
+                        DESTINATIONS.add(canalAdapter.getInstance());
+                    }
                 }
             }
         }

+ 46 - 37
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -6,7 +6,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -55,14 +54,14 @@ public class CanalAdapterLoader {
         }
         String zkHosts = this.canalClientConfig.getZookeeperHosts();
 
-        if (canalClientConfig.getCanalInstances() != null) {
+        if ("tcp".equalsIgnoreCase(canalClientConfig.getMode())) {
             // 初始化canal-client的适配器
-            for (CanalClientConfig.CanalInstance instance : canalClientConfig.getCanalInstances()) {
+            for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
                 List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
 
-                for (CanalClientConfig.Group connectorGroup : instance.getGroups()) {
+                for (CanalClientConfig.Group connectorGroup : canalAdapter.getGroups()) {
                     List<OuterAdapter> canalOutConnectors = new ArrayList<>();
-                    for (OuterAdapterConfig c : connectorGroup.getOutAdapters()) {
+                    for (OuterAdapterConfig c : connectorGroup.getOuterAdapters()) {
                         loadConnector(c, canalOutConnectors);
                     }
                     canalOuterAdapterGroups.add(canalOutConnectors);
@@ -70,52 +69,65 @@ public class CanalAdapterLoader {
                 CanalAdapterWorker worker;
                 if (sa != null) {
                     worker = new CanalAdapterWorker(canalClientConfig,
-                        instance.getInstance(),
+                        canalAdapter.getInstance(),
                         sa,
                         canalOuterAdapterGroups);
                 } else if (zkHosts != null) {
                     worker = new CanalAdapterWorker(canalClientConfig,
-                        instance.getInstance(),
+                        canalAdapter.getInstance(),
                         zkHosts,
                         canalOuterAdapterGroups);
                 } else {
                     throw new RuntimeException("No canal server connector found");
                 }
-                canalWorkers.put(instance.getInstance(), worker);
+                canalWorkers.put(canalAdapter.getInstance(), worker);
                 worker.start();
-                logger.info("Start adapter for canal instance: {} succeed", instance.getInstance());
+                logger.info("Start adapter for canal instance: {} succeed", canalAdapter.getInstance());
             }
-        } else if (canalClientConfig.getMqTopics() != null) {
-            // 初始化canal-client-mq的适配器
-            for (CanalClientConfig.MQTopic topic : canalClientConfig.getMqTopics()) {
-                for (CanalClientConfig.MQGroup group : topic.getGroups()) {
+        } else if ("kafka".equalsIgnoreCase(canalClientConfig.getMode())) {
+            // 初始化canal-client-kafka的适配器
+            for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
+                for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
                     List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
                     List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
-                    for (OuterAdapterConfig config : group.getOutAdapters()) {
+                    for (OuterAdapterConfig config : group.getOuterAdapters()) {
                         loadConnector(config, canalOuterAdapters);
                     }
                     canalOuterAdapterGroups.add(canalOuterAdapters);
-                    if (StringUtils.isBlank(topic.getMqMode()) || "rocketmq".equalsIgnoreCase(topic.getMqMode())) {
-                        CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig,
-                            canalClientConfig.getMqServers(),
-                            topic.getTopic(),
-                            group.getGroupId(),
-                            canalOuterAdapterGroups,
-                            canalClientConfig.getFlatMessage());
-                        canalMQWorker.put(topic.getTopic() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
-                        rocketMQWorker.start();
-                    } else if ("kafka".equalsIgnoreCase(topic.getMqMode())) {
-                        CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(canalClientConfig,
-                            canalClientConfig.getMqServers(),
-                            topic.getTopic(),
-                            group.getGroupId(),
-                            canalOuterAdapterGroups,
-                            canalClientConfig.getFlatMessage());
-                        canalMQWorker.put(topic.getTopic() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
-                        canalKafkaWorker.start();
+
+                    CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(canalClientConfig,
+                        canalClientConfig.getMqServers(),
+                        canalAdapter.getTopic(),
+                        group.getGroupId(),
+                        canalOuterAdapterGroups,
+                        canalClientConfig.getFlatMessage());
+                    canalMQWorker.put(canalAdapter.getTopic() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
+                    canalKafkaWorker.start();
+                    logger.info("Start adapter for canal-client mq topic: {} succeed",
+                        canalAdapter.getTopic() + "-" + group.getGroupId());
+                }
+            }
+        } else if ("rocketMQ".equalsIgnoreCase(canalClientConfig.getMode())) {
+            // 初始化canal-client-rocketMQ的适配器
+            for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
+                for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
+                    List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
+                    List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
+                    for (OuterAdapterConfig config : group.getOuterAdapters()) {
+                        loadConnector(config, canalOuterAdapters);
                     }
+                    canalOuterAdapterGroups.add(canalOuterAdapters);
+                    CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig,
+                        canalClientConfig.getMqServers(),
+                        canalAdapter.getTopic(),
+                        group.getGroupId(),
+                        canalOuterAdapterGroups,
+                        canalClientConfig.getFlatMessage());
+                    canalMQWorker.put(canalAdapter.getTopic() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
+                    rocketMQWorker.start();
+
                     logger.info("Start adapter for canal-client mq topic: {} succeed",
-                        topic.getTopic() + "-" + group.getGroupId());
+                        canalAdapter.getTopic() + "-" + group.getGroupId());
                 }
             }
         }
@@ -124,11 +136,8 @@ public class CanalAdapterLoader {
     private void loadConnector(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) {
         try {
             OuterAdapter adapter;
-            // if ("rdb".equalsIgnoreCase(config.getName())) {
             adapter = loader.getExtension(config.getName(), StringUtils.trimToEmpty(config.getKey()));
-            // } else {
-            // adapter = loader.getExtension(config.getName());
-            // }
+
             ClassLoader cl = Thread.currentThread().getContextClassLoader();
             // 替换ClassLoader
             Thread.currentThread().setContextClassLoader(adapter.getClass().getClassLoader());

+ 8 - 12
client-adapter/launcher/src/main/resources/application.yml

@@ -17,17 +17,22 @@ canal.conf:
 #  zookeeperHosts: slave1:2181
 #  mqServers: slave1:6667 #or rocketmq
 #  flatMessage: true
+  batchSize: 500
+  syncBatchSize: 1000
   retries: 0
   timeout:
+  mode: tcp # kafka rocketMQ
 #  srcDataSources:
 #    defaultDS:
 #      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
 #      username: root
 #      password: 121212
-  canalInstances:
+  canalAdapters:
   - instance: example
+#    topic: example
     groups:
-    - outAdapters:
+    - #groupId: g1
+      outerAdapters:
       - name: logger
 #      - name: rdb
 #        key: oracle1
@@ -36,8 +41,6 @@ canal.conf:
 #          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
 #          jdbc.username: mytest
 #          jdbc.password: m121212
-#          threads: 5
-#          commitSize: 5000
 #      - name: rdb
 #        key: postgres1
 #        properties:
@@ -55,11 +58,4 @@ canal.conf:
 #      - name: es
 #        hosts: 127.0.0.1:9300
 #        properties:
-#          cluster.name: elasticsearch
-#  mqTopics:
-#  - mqMode: kafka # or rocketmq
-#    topic: example
-#    groups:
-#    - groupId: g2
-#      outAdapters:
-#      - name: logger
+#          cluster.name: elasticsearch

+ 1 - 1
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/DBTest.java

@@ -42,7 +42,7 @@ public class DBTest {
             .prepareStatement("insert into user (id,name,role_id,c_time,test1,test2) values (?,?,?,?,?,?)");
 
         java.util.Date now = new java.util.Date();
-        for (int i = 1; i <= 10000; i++) {
+        for (int i = 1; i <= 100000; i++) {
             pstmt.clearParameters();
             pstmt.setLong(1, (long) i);
             pstmt.setString(2, "test_" + i);