Sfoglia il codice sorgente

Add mq adapter

Signed-off-by: duhengforever <duhengforever@gmail.com>
duhengforever 6 anni fa
parent
commit
821f3eced5

+ 9 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -111,11 +111,20 @@ public class CanalClientConfig {
     }
 
     public static class MQTopic {
+        private String      mqMode;
 
         private String      topic;
 
         private List<Group> groups = new ArrayList<>();
 
+        public String getMqMode() {
+            return mqMode;
+        }
+
+        public void setMqMode(String mqMode) {
+            this.mqMode = mqMode;
+        }
+
         public String getTopic() {
             return topic;
         }

+ 26 - 24
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java

@@ -4,6 +4,7 @@ import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
 import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
 import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -17,7 +18,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 /**
- * RocktMQ外部适配器的加载器
+ * MQ外部适配器的加载器
  *
  * @version 1.0.0
  */
@@ -29,8 +30,7 @@ public class CanalAdapterLoader {
 
     private Map<String, CanalAdapterWorker> canalWorkers = new HashMap<>();
 
-    private Map<String, CanalAdapterKafkaWorker> canalKafkaWorkers = new HashMap<>();
-    private Map<String, CanalAdapterRocketMQWorker> canalMQWorker = new HashMap<>();
+    private Map<String, AbstractCanalAdapterWorker> canalMQWorker = new HashMap<>();
 
     private ExtensionLoader<CanalOuterAdapter> loader;
 
@@ -42,7 +42,7 @@ public class CanalAdapterLoader {
      * 初始化canal-client、 canal-client-rocketmq的适配器
      */
     public void init() {
-        // canal instances 和 rocketmq topics 配置不能同时为空
+        // canal instances 和 mq topics 配置不能同时为空
         if (canalClientConfig.getCanalInstances() == null && canalClientConfig.getMqTopics() == null) {
             throw new RuntimeException("Blank config property: canalInstances or canalMQTopics");
         }
@@ -58,11 +58,6 @@ public class CanalAdapterLoader {
         }
         String zkHosts = this.canalClientConfig.getZookeeperHosts();
 
-        // if (zkHosts == null && sa == null) {
-        // throw new RuntimeException("Blank config property: canalServerHost or
-        // zookeeperHosts");
-        // }
-
         // 初始化canal-client的适配器
         if (canalClientConfig.getCanalInstances() != null) {
             for (CanalClientConfig.CanalInstance instance : canalClientConfig.getCanalInstances()) {
@@ -96,19 +91,27 @@ public class CanalAdapterLoader {
                     List<CanalOuterAdapter> canalOuterAdapters = new ArrayList<>();
 
                     for (CanalOuterAdapterConfiguration config : group.getOutAdapters()) {
-                        // for (CanalOuterAdapterConfiguration config : adaptor.getOutAdapters()) {
                         loadConnector(config, canalOuterAdapters);
-                        // }
                     }
                     canalOuterAdapterGroups.add(canalOuterAdapters);
 
-                    CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(
-                        canalClientConfig.getBootstrapServers(),
-                        topic.getTopic(),
-                        group.getGroupId(),
-                        canalOuterAdapterGroups);
-                    canalMQWorker.put(topic.getTopic() + "-" + group.getGroupId(), rocketMQWorker);
-                    rocketMQWorker.start();
+                    if (StringUtils.isBlank(topic.getMqMode()) || "rocketmq".equalsIgnoreCase(topic.getMqMode())) {
+                        CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(
+                            canalClientConfig.getBootstrapServers(),
+                            topic.getTopic(),
+                            group.getGroupId(),
+                            canalOuterAdapterGroups);
+                        canalMQWorker.put(topic.getTopic() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
+                        rocketMQWorker.start();
+                    } else if ("kafka".equalsIgnoreCase(topic.getMqMode())) {
+                        CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(zkHosts,
+                            canalClientConfig.getBootstrapServers(),
+                            topic.getTopic(),
+                            group.getGroupId(),
+                            canalOuterAdapterGroups);
+                        canalMQWorker.put(topic.getTopic() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
+                        canalKafkaWorker.start();
+                    }
                     logger.info("Start adapter for canal-client rocketmq topic: {} succeed",
                         topic.getTopic() + "-" + group.getGroupId());
                 }
@@ -149,15 +152,14 @@ public class CanalAdapterLoader {
             }
             stopExecutorService.shutdown();
         }
-        if (canalKafkaWorkers.size() > 0) {
-            ExecutorService stopKafkaExecutorService = Executors.newFixedThreadPool(canalKafkaWorkers.size());
-            for (CanalAdapterKafkaWorker v : canalKafkaWorkers.values()) {
-                final CanalAdapterKafkaWorker cakw = v;
+        if (canalMQWorker.size() > 0) {
+            ExecutorService stopKafkaExecutorService = Executors.newFixedThreadPool(canalMQWorker.size());
+            for (AbstractCanalAdapterWorker tmp : canalMQWorker.values()) {
+                final AbstractCanalAdapterWorker worker = tmp;
                 stopKafkaExecutorService.submit(new Runnable() {
-
                     @Override
                     public void run() {
-                        cakw.stop();
+                        worker.stop();
                     }
                 });
             }

+ 2 - 0
client-launcher/src/main/resources/canal-client.yml

@@ -10,7 +10,9 @@ canalServerHost: 127.0.0.1:11111
 #    - name: hbase
 #      hosts: slave1:2181
 #      properties: {znodeParent: "/hbase-unsecure"}
+
 mqTopics:
+- mqMode: rocketmq
 - topic: example
   groups:
   - groupId: example_g1

+ 1 - 1
deployer/src/main/resources/mq.yml

@@ -10,7 +10,7 @@ filterTransactionEntry: true
 canalDestinations:
   - canalDestination: example
     topic: example
-    partition:
+    partition: 0
     # 一个destination可以对应多个topic
     #topics:
     #  - topic: example

+ 2 - 3
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -35,14 +35,13 @@ public class CanalRocketMQProducer implements CanalMQProducer {
     }
 
     @Override
-    public void send(MQProperties.Topic topic, com.alibaba.otter.canal.protocol.Message data) {
+    public void send(final MQProperties.Topic topic, com.alibaba.otter.canal.protocol.Message data) {
         try {
             Message message = new Message(topic.getTopic(), CanalMessageSerializer.serializer(data));
             this.defaultMQProducer.send(message, new MessageQueueSelector() {
                 @Override
                 public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                 //   int index = (arg.hashCode() % mqs.size());
-                    return mqs.get(1);
+                    return mqs.get(topic.getPartition());
                 }
             }, null);
         } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {