瀏覽代碼

Modify canal-client config and polish consumer implementation

duhengforever 6 年之前
父節點
當前提交
fa8588f766

+ 1 - 0
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterRocketMQWorker.java

@@ -26,6 +26,7 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
 
     public CanalAdapterRocketMQWorker(String nameServers, String topic, String groupId,
         List<List<CanalOuterAdapter>> canalOuterAdapters) {
+        logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
         this.canalOuterAdapters = canalOuterAdapters;
         this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         this.topic = topic;

+ 3 - 3
client-launcher/src/main/resources/canal-client.yml

@@ -1,6 +1,6 @@
 #canalServerHost: 127.0.0.1:11111
 #zookeeperHosts: slave1:2181
-bootstrapServers: slave1:6667,slave2:6667 #or rocketmq nameservers
+bootstrapServers: slave1:6667,slave2:6667 #or rocketmq nameservers:host1:9876;host2:9876
 flatMessage: true
 
 #canalInstances:
@@ -14,9 +14,9 @@ flatMessage: true
 
 mqTopics:
 - mqMode: rocketmq
-- topic: example
+  topic: example
   groups:
-  - groupId: example_g1
+  - groupId: example
     outAdapters:
     - name: logger
 #    - name: hbase

+ 6 - 5
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

@@ -43,6 +43,9 @@ public class RocketMQCanalConnector implements CanalConnector {
     @Override
     public void connect() throws CanalClientException {
         rocketMQConsumer = new DefaultMQPushConsumer(groupName);
+        if (!StringUtils.isBlank(nameServer)) {
+            rocketMQConsumer.setNamesrvAddr(nameServer);
+        }
     }
 
     @Override
@@ -61,18 +64,16 @@ public class RocketMQCanalConnector implements CanalConnector {
             return;
         }
         try {
-            rocketMQConsumer.subscribe(topic, "*");
             if (rocketMQConsumer == null) {
-                rocketMQConsumer = new DefaultMQPushConsumer(groupName);
-                if (!StringUtils.isBlank(nameServer)) {
-                    rocketMQConsumer.setNamesrvAddr(nameServer);
-                }
+                this.connect();
             }
+            rocketMQConsumer.subscribe(topic, "*");
             rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {
                 @Override
                 public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts,
                     ConsumeOrderlyContext context) {
                     context.setAutoCommit(true);
+                    logger.info("xxxx");
                     boolean isSuccess = process(messageExts);
                     if (isSuccess) {
                         logger.info("Dispatch success!");