七锋 6 anni fa
parent
commit
a0b38a9cac
19 ha cambiato i file con 375 aggiunte e 644 eliminazioni
  1. 11 12
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  2. 1 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Result.java
  3. 1 0
      client-adapter/example/.gitignore
  4. 4 12
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java
  5. 6 9
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java
  6. 18 59
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java
  7. 0 2
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java
  8. 7 0
      client-adapter/launcher/src/main/resources/application.yml
  9. 91 0
      client/src/main/java/com/alibaba/otter/canal/client/CanalMQConnector.java
  10. 92 45
      client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java
  11. 0 39
      client/src/main/java/com/alibaba/otter/canal/client/kafka/running/ClientRunningData.java
  12. 0 21
      client/src/main/java/com/alibaba/otter/canal/client/kafka/running/ClientRunningListener.java
  13. 0 280
      client/src/main/java/com/alibaba/otter/canal/client/kafka/running/ClientRunningMonitor.java
  14. 5 5
      client/src/main/java/com/alibaba/otter/canal/client/rocketmq/ConsumerBatchMessage.java
  15. 97 121
      client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java
  16. 2 2
      client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnectors.java
  17. 0 13
      client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalListener.java
  18. 7 6
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java
  19. 33 18
      server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

+ 11 - 12
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -75,7 +75,7 @@ public class CanalClientConfig {
 
         private String             instance;      // 实例名
 
-        private List<AdapterGroup> adapterGroups; // 适配器分组列表
+        private List<Group> groups;  // 适配器分组列表
 
         public String getInstance() {
             return instance;
@@ -87,16 +87,17 @@ public class CanalClientConfig {
             }
         }
 
-        public List<AdapterGroup> getAdapterGroups() {
-            return adapterGroups;
+        public List<Group> getGroups() {
+            return groups;
         }
 
-        public void setAdapterGroups(List<AdapterGroup> adapterGroups) {
-            this.adapterGroups = adapterGroups;
+        public void setGroups(List<Group> groups) {
+            this.groups = groups;
         }
+
     }
 
-    public static class AdapterGroup {
+    public static class Group {
 
         private List<OuterAdapterConfig> outAdapters; // 适配器列表
 
@@ -115,7 +116,7 @@ public class CanalClientConfig {
 
         private String      topic;                      // topic名
 
-        private List<Group> groups = new ArrayList<>(); // 分组列表
+        private List<MQGroup> groups = new ArrayList<>(); // 分组列表
 
         public String getMqMode() {
             return mqMode;
@@ -133,21 +134,19 @@ public class CanalClientConfig {
             this.topic = topic;
         }
 
-        public List<Group> getGroups() {
+        public List<MQGroup> getGroups() {
             return groups;
         }
 
-        public void setGroups(List<Group> groups) {
+        public void setGroups(List<MQGroup> groups) {
             this.groups = groups;
         }
     }
 
-    public static class Group {
+    public static class MQGroup {
 
         private String                   groupId;     // group id
 
-        // private List<Adaptor> adapters = new ArrayList<>();
-
         private List<OuterAdapterConfig> outAdapters; // 适配器配置列表
 
         public String getGroupId() {

+ 1 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Result.java

@@ -11,6 +11,7 @@ import java.util.Date;
  */
 public class Result implements Serializable {
 
+    private static final long serialVersionUID = -3276409502352405716L;
     public Integer code = 20000;
     public Object  data;
     public String  message;

+ 1 - 0
client-adapter/example/.gitignore

@@ -0,0 +1 @@
+/bin/

+ 4 - 12
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -1,8 +1,6 @@
 package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.consumer.CommitFailedException;
@@ -23,19 +21,16 @@ import com.alibaba.otter.canal.protocol.Message;
 public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
 
     private KafkaCanalConnector connector;
-
     private String              topic;
-
     private boolean             flatMessage;
 
     public CanalAdapterKafkaWorker(String bootstrapServers, String topic, String groupId,
                                    List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
         this.canalOuterAdapters = canalOuterAdapters;
-        this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         this.topic = topic;
         this.canalDestination = topic;
         this.flatMessage = flatMessage;
-        connector = KafkaCanalConnectors.newKafkaConnector(bootstrapServers, topic, null, groupId, flatMessage);
+        this.connector = KafkaCanalConnectors.newKafkaConnector(bootstrapServers, topic, null, groupId, flatMessage);
         // connector.setSessionTimeout(1L, TimeUnit.MINUTES);
     }
 
@@ -48,7 +43,6 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
     protected void process() {
         while (!running)
             ;
-        ExecutorService executor = Executors.newSingleThreadExecutor();
         while (running) {
             try {
                 logger.info("=============> Start to connect topic: {} <=============", this.topic);
@@ -62,9 +56,9 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
 
                         List<?> messages;
                         if (!flatMessage) {
-                            messages = connector.getWithoutAck();
+                            messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS);
                         } else {
-                            messages = connector.getFlatMessageWithoutAck(100L, TimeUnit.MILLISECONDS);
+                            messages = connector.getFlatListWithoutAck(100L, TimeUnit.MILLISECONDS);
                         }
                         if (messages != null) {
                             for (final Object message : messages) {
@@ -78,7 +72,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
                         connector.ack();
                     } catch (CommitFailedException e) {
                         logger.warn(e.getMessage());
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         logger.error(e.getMessage(), e);
                         TimeUnit.SECONDS.sleep(1L);
                     }
@@ -88,8 +82,6 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
             }
         }
 
-        executor.shutdown();
-
         try {
             connector.unsubscribe();
         } catch (WakeupException e) {

+ 6 - 9
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -58,7 +58,7 @@ public class CanalAdapterLoader {
             for (CanalClientConfig.CanalInstance instance : canalClientConfig.getCanalInstances()) {
                 List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
 
-                for (CanalClientConfig.AdapterGroup connectorGroup : instance.getAdapterGroups()) {
+                for (CanalClientConfig.Group connectorGroup : instance.getGroups()) {
                     List<OuterAdapter> canalOutConnectors = new ArrayList<>();
                     for (OuterAdapterConfig c : connectorGroup.getOutAdapters()) {
                         loadConnector(c, canalOutConnectors);
@@ -82,11 +82,9 @@ public class CanalAdapterLoader {
         // 初始化canal-client-mq的适配器
         if (canalClientConfig.getMqTopics() != null) {
             for (CanalClientConfig.MQTopic topic : canalClientConfig.getMqTopics()) {
-                for (CanalClientConfig.Group group : topic.getGroups()) {
+                for (CanalClientConfig.MQGroup group : topic.getGroups()) {
                     List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
-
                     List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
-
                     for (OuterAdapterConfig config : group.getOutAdapters()) {
                         loadConnector(config, canalOuterAdapters);
                     }
@@ -95,12 +93,12 @@ public class CanalAdapterLoader {
                         CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig.getBootstrapServers(),
                             topic.getTopic(),
                             group.getGroupId(),
-                            canalOuterAdapterGroups, canalClientConfig.getFlatMessage());
+                            canalOuterAdapterGroups,
+                            canalClientConfig.getFlatMessage());
                         canalMQWorker.put(topic.getTopic() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
                         rocketMQWorker.start();
                     } else if ("kafka".equalsIgnoreCase(topic.getMqMode())) {
-                        CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(
-                            canalClientConfig.getBootstrapServers(),
+                        CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(canalClientConfig.getBootstrapServers(),
                             topic.getTopic(),
                             group.getGroupId(),
                             canalOuterAdapterGroups,
@@ -108,9 +106,8 @@ public class CanalAdapterLoader {
                         canalMQWorker.put(topic.getTopic() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
                         canalKafkaWorker.start();
                     }
-                    logger.info("Start adapter for canal-client rocketmq topic: {} succeed",
+                    logger.info("Start adapter for canal-client mq topic: {} succeed",
                         topic.getTopic() + "-" + group.getGroupId());
-
                 }
             }
         }

+ 18 - 59
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -1,17 +1,14 @@
 package com.alibaba.otter.canal.adapter.launcher.loader;
 
-import com.alibaba.otter.canal.protocol.FlatMessage;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.common.errors.WakeupException;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
-import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectors;
+import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 
 /**
@@ -22,20 +19,17 @@ import com.alibaba.otter.canal.protocol.Message;
 public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
 
     private RocketMQCanalConnector connector;
-
-    private String topic;
-
-    private boolean flatMessage;
+    private String                 topic;
+    private boolean                flatMessage;
 
     public CanalAdapterRocketMQWorker(String nameServers, String topic, String groupId,
-        List<List<CanalOuterAdapter>> canalOuterAdapters, boolean flatMessage) {
+                                      List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
         logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
         this.canalOuterAdapters = canalOuterAdapters;
-        this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         this.topic = topic;
         this.flatMessage = flatMessage;
         this.canalDestination = topic;
-        connector = RocketMQCanalConnectorProvider.newRocketMQConnector(nameServers, topic, groupId, flatMessage);
+        this.connector = RocketMQCanalConnectors.newRocketMQConnector(nameServers, topic, groupId, flatMessage);
     }
 
     @Override
@@ -47,7 +41,6 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
     protected void process() {
         while (!running)
             ;
-        ExecutorService executor = Executors.newSingleThreadExecutor();
         while (running) {
             try {
                 logger.info("=============> Start to connect topic: {} <=============", this.topic);
@@ -57,55 +50,23 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
                 logger.info("=============> Subscribe topic: {} succeed<=============", this.topic);
                 while (running) {
                     try {
-                        Object message = null;
+                        List<?> messages;
                         if (!flatMessage) {
-                            message = connector.getWithoutAck(1);
+                            messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS);
                         } else {
-                            message = connector.getFlatMessageWithoutAck();
+                            messages = connector.getFlatListWithoutAck(100L, TimeUnit.MILLISECONDS);
                         }
-                        if (message != null) {
-                            final Object msg = message;
-                            executor.submit(new Runnable() {
-                                @Override
-                                public void run() {
-                                    try {
-                                        if (logger.isDebugEnabled()) {
-                                            logger.debug("topic: {} batchId: {} batchSize: {} ",
-                                                topic,
-                                                message.getId(),
-                                                message.getEntries().size());
-                                        }
-                                        if (msg != null) {
-                                            long begin = System.currentTimeMillis();
-                                            if (msg instanceof Message) {
-                                                Message receive = (Message) msg;
-                                                writeOut(receive, topic);
-                                                connector.ack(receive.getId());
-                                            } else {
-                                                FlatMessage receive = (FlatMessage) msg;
-                                                writeOut(receive);
-                                                connector.ack(receive.getId());
-                                            }
-                                            long now = System.currentTimeMillis();
-                                            if ((now - begin) > 5 * 60 * 1000) {
-                                                logger.error("topic: {} batchId {} elapsed time: {} ms",
-                                                    topic,
-                                                    message.getId(),
-                                                    now - begin);
-                                            }
-                                        }
-                                    } catch (Exception e) {
-                                        logger.error(e.getMessage(), e);
-                                    }
+                        if (messages != null) {
+                            for (final Object message : messages) {
+                                if (message instanceof FlatMessage) {
+                                    writeOut((FlatMessage) message);
+                                } else {
+                                    writeOut((Message) message);
                                 }
-                                connector.ack(message.getId());
-                            });
-                        } else {
-                            logger.debug("Message is null");
+                            }
                         }
-                    } catch (CommitFailedException e) {
-                        logger.warn(e.getMessage());
-                    } catch (Exception e) {
+                        connector.ack();
+                    } catch (Throwable e) {
                         logger.error(e.getMessage(), e);
                         TimeUnit.SECONDS.sleep(1L);
                     }
@@ -115,8 +76,6 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
             }
         }
 
-        executor.shutdown();
-
         try {
             connector.unsubscribe();
         } catch (WakeupException e) {

+ 0 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

@@ -90,7 +90,6 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
                     long batchId = message.getId();
                     try {
                         int size = message.getEntries().size();
-
                         if (batchId == -1 || size == 0) {
                             Thread.sleep(1000);
                         } else {
@@ -131,7 +130,6 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
                     // ignore
                 }
             }
-
         }
     }
 }

+ 7 - 0
client-adapter/launcher/src/main/resources/application.yml

@@ -36,6 +36,13 @@ canal.conf:
 #    - groupId: g2
 #      outAdapters:
 #      - name: logger
+#  mqTopics:
+#  - mqMode: rocketmq
+#    topic: example
+#    groups:
+#    - groupId: g2
+#      outAdapters:
+#      - name: logger
 
 #adapter.conf:
 #  datasourceConfigs:

+ 91 - 0
client/src/main/java/com/alibaba/otter/canal/client/CanalMQConnector.java

@@ -0,0 +1,91 @@
+package com.alibaba.otter.canal.client;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+
+/**
+ * canal MQ数据操作客户端
+ * 
+ * <pre>
+ * 1. canal server写入MQ消息,考虑性能会合并多条数据写入为一个MQ消息,一个Message对应一个MQ消息
+ * 2. canal client消费MQ消息,因为client性能会弱于server的写入,MQ数据获取时会拿到堆积的多条MQ消息,会拿到List<Message>
+ * 3. client的ack/rollback,都是和MQ直接交互,不存在对应的batchId概念
+ * </pre>
+ * 
+ * @author agapple 2018年10月28日 下午6:42:27
+ * @since 1.1.1
+ */
+public interface CanalMQConnector extends CanalConnector {
+
+    /**
+     * 获取数据,自动进行确认,设置timeout时间直到拿到数据为止
+     * 
+     * <pre>
+     * 该方法返回的条件:
+     *  a. 如果timeout=0,有多少取多少,不会阻塞等待
+     *  b. 如果timeout不为0,尝试阻塞对应的超时时间,直到拿到数据就返回
+     * </pre>
+     * 
+     * @return
+     * @throws CanalClientException
+     */
+    List<Message> getList(Long timeout, TimeUnit unit) throws CanalClientException;
+
+    /**
+     * 获取数据,设置timeout时间直到拿到数据为止
+     * 
+     * <pre>
+     * 该方法返回的条件:
+     *  a. 如果timeout=0,有多少取多少,不会阻塞等待
+     *  b. 如果timeout不为0,尝试阻塞对应的超时时间,直到拿到数据就返回
+     * </pre>
+     * 
+     * @throws CanalClientException
+     */
+    List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException;
+
+    /**
+     * 获取数据,自动进行确认,设置timeout时间直到拿到数据为止
+     * 
+     * <pre>
+     * 该方法返回的条件:
+     *  a. 如果timeout=0,有多少取多少,不会阻塞等待
+     *  b. 如果timeout不为0,尝试阻塞对应的超时时间,直到拿到数据就返回
+     * </pre>
+     * 
+     * @return
+     * @throws CanalClientException
+     */
+    List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException;
+
+    /**
+     * 获取数据,设置timeout时间直到拿到数据为止
+     * 
+     * <pre>
+     * 该方法返回的条件:
+     *  a. 如果timeout=0,有多少取多少,不会阻塞等待
+     *  b. 如果timeout不为0,尝试阻塞对应的超时时间,直到拿到数据就返回
+     * </pre>
+     * 
+     * @throws CanalClientException
+     */
+    List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException;
+
+    /**
+     * 消费确认。
+     * 
+     * @throws CanalClientException
+     */
+    void ack() throws CanalClientException;
+
+    /**
+     * 回滚到未进行 {@link #ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link #ack} 的地方开始拿
+     * 
+     * @throws CanalClientException
+     */
+    void rollback() throws CanalClientException;
+}

+ 92 - 45
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -13,16 +13,25 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.client.CanalMQConnector;
+import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.google.common.collect.Lists;
 
 /**
  * canal kafka 数据操作客户端
+ * 
+ * <pre>
+ * 注意点:
+ * 1. 相比于canal {@linkplain SimpleCanalConnector}, 这里get和ack操作不能有并发, 必须是一个线程执行get后,内存里执行完毕ack后再取下一个get
+ * </pre>
  *
  * @author machengyuan @ 2018-6-12
- * @version 1.0.0
+ * @version 1.1.1
  */
-public class KafkaCanalConnector {
+public class KafkaCanalConnector implements CanalMQConnector {
 
     private KafkaConsumer<String, Message> kafkaConsumer;
     private KafkaConsumer<String, String>  kafkaConsumer2;   // 用于扁平message的数据消费
@@ -55,18 +64,6 @@ public class KafkaCanalConnector {
         }
     }
 
-    /**
-     * 重新设置sessionTime
-     *
-     * @param timeout
-     * @param unit
-     */
-    public void setSessionTimeout(Long timeout, TimeUnit unit) {
-        long t = unit.toMillis(timeout);
-        properties.put("request.timeout.ms", String.valueOf(t + 60000));
-        properties.put("session.timeout.ms", String.valueOf(t));
-    }
-
     /**
      * 打开连接
      */
@@ -76,7 +73,6 @@ public class KafkaCanalConnector {
         }
 
         connected = true;
-
         if (kafkaConsumer == null && !flatMessage) {
             kafkaConsumer = new KafkaConsumer<String, Message>(properties);
         }
@@ -151,62 +147,61 @@ public class KafkaCanalConnector {
         }
     }
 
-    /**
-     * 获取数据,自动进行确认
-     *
-     * @return
-     */
-    public List<Message> get() {
-        return get(100L, TimeUnit.MILLISECONDS);
-    }
-
-    public List<Message> get(Long timeout, TimeUnit unit) {
+    @Override
+    public List<Message> getList(Long timeout, TimeUnit unit) throws CanalClientException {
         waitClientRunning();
         if (!running) {
-            return null;
+            return Lists.newArrayList();
         }
 
-        List<Message> messages = getWithoutAck(timeout, unit);
-        this.ack();
+        List<Message> messages = getListWithoutAck(timeout, unit);
+        if (messages != null && !messages.isEmpty()) {
+            this.ack();
+        }
         return messages;
     }
 
-    public List<Message> getWithoutAck() {
-        return getWithoutAck(100L, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * 获取数据,不进行确认,等待处理完成手工确认
-     *
-     * @return
-     */
-    public List<Message> getWithoutAck(Long timeout, TimeUnit unit) {
+    @Override
+    public List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
         waitClientRunning();
         if (!running) {
-            return null;
+            return Lists.newArrayList();
         }
 
         ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout)); // 基于配置,最多只能poll到一条数据
 
         if (!records.isEmpty()) {
-            // return records.iterator().next().value();
             List<Message> messages = new ArrayList<>();
             for (ConsumerRecord<String, Message> record : records) {
                 messages.add(record.value());
             }
             return messages;
         }
-        return null;
+        return Lists.newArrayList();
     }
 
-    public List<FlatMessage> getFlatMessageWithoutAck(Long timeout, TimeUnit unit) {
+    @Override
+    public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
         waitClientRunning();
         if (!running) {
-            return null;
+            return Lists.newArrayList();
         }
 
-        ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));
+        List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);
+        if (messages != null && !messages.isEmpty()) {
+            this.ack();
+        }
+        return messages;
+    }
+
+    @Override
+    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
+        waitClientRunning();
+        if (!running) {
+            return Lists.newArrayList();
+        }
 
+        ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));
         if (!records.isEmpty()) {
             List<FlatMessage> flatMessages = new ArrayList<>();
             for (ConsumerRecord<String, String> record : records) {
@@ -217,7 +212,11 @@ public class KafkaCanalConnector {
 
             return flatMessages;
         }
-        return null;
+        return Lists.newArrayList();
+    }
+
+    @Override
+    public void rollback() throws CanalClientException {
     }
 
     /**
@@ -245,4 +244,52 @@ public class KafkaCanalConnector {
             // }
         }
     }
+
+    @Override
+    public void subscribe(String filter) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public Message get(int batchSize) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public Message getWithoutAck(int batchSize) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public void ack(long batchId) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public void rollback(long batchId) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    /**
+     * 重新设置sessionTime
+     *
+     * @param timeout
+     * @param unit
+     */
+    public void setSessionTimeout(Long timeout, TimeUnit unit) {
+        long t = unit.toMillis(timeout);
+        properties.put("request.timeout.ms", String.valueOf(t + 60000));
+        properties.put("session.timeout.ms", String.valueOf(t));
+    }
+
 }

+ 0 - 39
client/src/main/java/com/alibaba/otter/canal/client/kafka/running/ClientRunningData.java

@@ -1,39 +0,0 @@
-package com.alibaba.otter.canal.client.kafka.running;
-
-/**
- * client running状态信息
- *
- * @author machengyuan 2018-06-20 下午04:10:12
- * @version 1.0.0
- */
-public class ClientRunningData {
-
-    private String  groupId;
-    private String  address;
-    private boolean active = true;
-
-    public String getGroupId() {
-        return groupId;
-    }
-
-    public void setGroupId(String groupId) {
-        this.groupId = groupId;
-    }
-
-    public String getAddress() {
-        return address;
-    }
-
-    public void setAddress(String address) {
-        this.address = address;
-    }
-
-    public boolean isActive() {
-        return active;
-    }
-
-    public void setActive(boolean active) {
-        this.active = active;
-    }
-
-}

+ 0 - 21
client/src/main/java/com/alibaba/otter/canal/client/kafka/running/ClientRunningListener.java

@@ -1,21 +0,0 @@
-package com.alibaba.otter.canal.client.kafka.running;
-
-/**
- * client running状态信息
- *
- * @author machengyuan 2018-06-20 下午04:10:12
- * @version 1.0.0
- */
-public interface ClientRunningListener {
-
-    /**
-     * 触发现在轮到自己做为active
-     */
-    public void processActiveEnter();
-
-    /**
-     * 触发一下当前active模式失败
-     */
-    public void processActiveExit();
-
-}

+ 0 - 280
client/src/main/java/com/alibaba/otter/canal/client/kafka/running/ClientRunningMonitor.java

@@ -1,280 +0,0 @@
-package com.alibaba.otter.canal.client.kafka.running;
-
-import java.text.MessageFormat;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.exception.ZkException;
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.apache.zookeeper.CreateMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
-import com.alibaba.otter.canal.common.utils.AddressUtils;
-import com.alibaba.otter.canal.common.utils.BooleanMutex;
-import com.alibaba.otter.canal.common.utils.JsonUtils;
-import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
-import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
-import com.alibaba.otter.canal.protocol.exception.CanalClientException;
-
-/**
- * kafka client running状态信息
- *
- * @author machengyuan 2018-06-20 下午04:10:12
- * @version 1.0.0
- */
-public class ClientRunningMonitor extends AbstractCanalLifeCycle {
-
-    private static final String TOPIC_ROOT_NODE             = ZookeeperPathUtils.CANAL_ROOT_NODE
-                                                              + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR + "topics";
-
-    private static final String TOPIC_NODE                  = TOPIC_ROOT_NODE + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR
-                                                              + "{0}";
-
-    private static final String TOPIC_CLIENTID_NODE         = TOPIC_NODE + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR
-                                                              + "{1}";
-
-    private static final String TOPIC_CLIENTID_RUNNING_NODE = TOPIC_CLIENTID_NODE
-                                                              + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR
-                                                              + ZookeeperPathUtils.RUNNING_NODE;
-
-    private static String getTopicClientRunning(String topic, String groupId) {
-        return MessageFormat.format(TOPIC_CLIENTID_RUNNING_NODE, topic, groupId);
-    }
-
-    private static String getClientIdNodePath(String topic, String groupId) {
-        return MessageFormat.format(TOPIC_CLIENTID_NODE, topic, groupId);
-    }
-
-    private static final Logger        logger       = LoggerFactory.getLogger(ClientRunningMonitor.class);
-    private ZkClientx                  zkClient;
-    private String                     topic;
-    private ClientRunningData          clientData;
-    private IZkDataListener            dataListener;
-    private BooleanMutex               mutex        = new BooleanMutex(false);
-    private volatile boolean           release      = false;
-    private volatile ClientRunningData activeData;
-    private ScheduledExecutorService   delayExector = Executors.newScheduledThreadPool(1);
-    private ClientRunningListener      listener;
-    private int                        delayTime    = 5;
-
-    private static Integer             virtualPort;
-
-    public ClientRunningMonitor(){
-        if (virtualPort == null) {
-            Random rand = new Random();
-            virtualPort = rand.nextInt(9000) + 1000;
-        }
-
-        dataListener = new IZkDataListener() {
-
-            public void handleDataChange(String dataPath, Object data) throws Exception {
-                MDC.put("kafkaTopic", topic);
-                ClientRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ClientRunningData.class);
-                if (!isMine(runningData.getAddress())) {
-                    mutex.set(false);
-                }
-
-                if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
-                    release = true;
-                    releaseRunning();// 彻底释放mainstem
-                }
-
-                activeData = (ClientRunningData) runningData;
-            }
-
-            public void handleDataDeleted(String dataPath) throws Exception {
-                MDC.put("kafkaTopic", topic);
-
-                mutex.set(false);
-                // 触发一下退出,可能是人为干预的释放操作或者网络闪断引起的session expired timeout
-                processActiveExit();
-                if (!release && activeData != null && isMine(activeData.getAddress())) {
-                    // 如果上一次active的状态就是本机,则即时触发一下active抢占
-                    initRunning();
-                } else {
-                    // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作
-                    delayExector.schedule(new Runnable() {
-
-                        public void run() {
-                            initRunning();
-                        }
-                    }, delayTime, TimeUnit.SECONDS);
-                }
-            }
-
-        };
-
-    }
-
-    public void start() {
-        super.start();
-
-        String path = getTopicClientRunning(this.topic, clientData.getGroupId());
-
-        zkClient.subscribeDataChanges(path, dataListener);
-        initRunning();
-    }
-
-    public void stop() {
-        super.stop();
-        String path = getTopicClientRunning(this.topic, clientData.getGroupId());
-        zkClient.unsubscribeDataChanges(path, dataListener);
-        releaseRunning(); // 尝试一下release
-        // Fix issue #697
-        if (delayExector != null) {
-            delayExector.shutdown();
-        }
-    }
-
-    // 改动记录:
-    // 1,在方法上加synchronized关键字,保证同步顺序执行;
-    // 2,判断Zk上已经存在的activeData是否是本机,是的话把mutex重置为true,否则会导致死锁
-    // 3,增加异常处理,保证出现异常时,running节点能被删除,否则会导致死锁
-    public synchronized void initRunning() {
-        if (!isStart()) {
-            return;
-        }
-
-        String path = getTopicClientRunning(this.topic, clientData.getGroupId());
-        // 序列化
-        byte[] bytes = JsonUtils.marshalToByte(clientData);
-        try {
-            mutex.set(false);
-            zkClient.create(path, bytes, CreateMode.EPHEMERAL);
-            processActiveEnter();// 触发一下事件
-            activeData = clientData;
-            mutex.set(true);
-        } catch (ZkNodeExistsException e) {
-            bytes = zkClient.readData(path, true);
-            if (bytes == null) {// 如果不存在节点,立即尝试一次
-                initRunning();
-            } else {
-                activeData = JsonUtils.unmarshalFromByte(bytes, ClientRunningData.class);
-                // 如果发现已经存在,判断一下是否自己,避免活锁
-                if (activeData.getAddress().contains(":") && isMine(activeData.getAddress())) {
-                    mutex.set(true);
-                }
-            }
-        } catch (ZkNoNodeException e) {
-            zkClient.createPersistent(getClientIdNodePath(this.topic, clientData.getGroupId()), true); // 尝试创建父节点
-            initRunning();
-        } catch (Throwable t) {
-            logger.error(MessageFormat.format("There is an error when execute initRunning method, with destination [{0}].",
-                topic),
-                t);
-            // 出现任何异常尝试release
-            releaseRunning();
-            throw new CanalClientException("something goes wrong in initRunning method. ", t);
-        }
-    }
-
-    /**
-     * 阻塞等待自己成为active,如果自己成为active,立马返回
-     *
-     * @throws InterruptedException
-     */
-    public void waitForActive() throws InterruptedException {
-        initRunning();
-        mutex.get();
-    }
-
-    /**
-     * 检查当前的状态
-     */
-    public boolean check() {
-        String path = getTopicClientRunning(this.topic, clientData.getGroupId());
-        // ZookeeperPathUtils.getDestinationClientRunning(this.destination,
-        // clientData.getClientId());
-        try {
-            byte[] bytes = zkClient.readData(path);
-            ClientRunningData eventData = JsonUtils.unmarshalFromByte(bytes, ClientRunningData.class);
-            activeData = eventData;// 更新下为最新值
-            // 检查下nid是否为自己
-            boolean result = isMine(activeData.getAddress());
-            if (!result) {
-                logger.warn("canal is running in [{}] , but not in [{}]",
-                    activeData.getAddress(),
-                    clientData.getAddress());
-            }
-            return result;
-        } catch (ZkNoNodeException e) {
-            logger.warn("canal is not run any in node");
-            return false;
-        } catch (ZkInterruptedException e) {
-            logger.warn("canal check is interrupt");
-            Thread.interrupted();// 清除interrupt标记
-            return check();
-        } catch (ZkException e) {
-            logger.warn("canal check is failed");
-            return false;
-        }
-    }
-
-    public boolean releaseRunning() {
-        if (check()) {
-            String path = getTopicClientRunning(this.topic, clientData.getGroupId());
-            zkClient.delete(path);
-            mutex.set(false);
-            processActiveExit();
-            return true;
-        }
-
-        return false;
-    }
-
-    // ====================== helper method ======================
-
-    private boolean isMine(String address) {
-        return address.equals(clientData.getAddress());
-    }
-
-    private void processActiveEnter() {
-        if (listener != null) {
-            // 触发回调
-            listener.processActiveEnter();
-            this.clientData.setAddress(/* address */AddressUtils.getHostIp() + ":" + virtualPort);
-
-            String path = getTopicClientRunning(this.topic, clientData.getGroupId());
-            // 序列化
-            byte[] bytes = JsonUtils.marshalToByte(clientData);
-            zkClient.writeData(path, bytes);
-        }
-    }
-
-    private void processActiveExit() {
-        if (listener != null) {
-            listener.processActiveExit();
-        }
-    }
-
-    public void setListener(ClientRunningListener listener) {
-        this.listener = listener;
-    }
-
-    // ===================== setter / getter =======================
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    public void setClientData(ClientRunningData clientData) {
-        this.clientData = clientData;
-    }
-
-    public void setDelayTime(int delayTime) {
-        this.delayTime = delayTime;
-    }
-
-    public void setZkClient(ZkClientx zkClient) {
-        this.zkClient = zkClient;
-    }
-
-}

+ 5 - 5
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/ConsumerBatchMessage.java

@@ -1,18 +1,18 @@
 package com.alibaba.otter.canal.client.rocketmq;
 
-import java.util.concurrent.BlockingQueue;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 public class ConsumerBatchMessage<T> {
 
-    private final BlockingQueue<T> data;
+    private final List<T>  data;
     private CountDownLatch         latch;
     private boolean                hasFailure = false;
 
-    public ConsumerBatchMessage(BlockingQueue<T> data){
+    public ConsumerBatchMessage(List<T> data){
         this.data = data;
-        latch = new CountDownLatch(data.size());
+        latch = new CountDownLatch(1);
     }
 
     public boolean waitFinish(long timeout) throws InterruptedException {
@@ -23,7 +23,7 @@ public class ConsumerBatchMessage<T> {
         return !hasFailure;
     }
 
-    public BlockingQueue<T> getData() {
+    public List<T> getData() {
         return data;
     }
 

+ 97 - 121
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

@@ -1,11 +1,7 @@
 package com.alibaba.otter.canal.client.rocketmq;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.otter.canal.protocol.FlatMessage;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -19,35 +15,47 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.client.CanalMQConnector;
 import com.alibaba.otter.canal.client.CanalMessageDeserializer;
+import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
+import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
-
-public class RocketMQCanalConnector implements CanalConnector {
-
-    private static final Logger logger = LoggerFactory.getLogger(RocketMQCanalConnector.class);
-
-    private String nameServer;
-    private String topic;
-    private String groupName;
-    private volatile boolean connected = false;
-    private DefaultMQPushConsumer rocketMQConsumer;
+import com.google.common.collect.Lists;
+
+/**
+ * RocketMQ的连接
+ * 
+ * <pre>
+ * 注意点:
+ * 1. 相比于canal {@linkplain SimpleCanalConnector}, 这里get和ack操作不能有并发, 必须是一个线程执行get后,内存里执行完毕ack后再取下一个get
+ * </pre>
+ * 
+ * @since 1.1.1
+ */
+public class RocketMQCanalConnector implements CanalMQConnector {
+
+    private static final Logger                 logger              = LoggerFactory.getLogger(RocketMQCanalConnector.class);
+
+    private String                              nameServer;
+    private String                              topic;
+    private String                              groupName;
+    private volatile boolean                    connected           = false;
+    private DefaultMQPushConsumer               rocketMQConsumer;
     private BlockingQueue<ConsumerBatchMessage> messageBlockingQueue;
-    Map<Long, ConsumerBatchMessage> messageCache;
-    private long batchProcessTimeout = 3000;
-    private boolean flatMessage;
+    private long                                batchProcessTimeout = 60 * 1000;
+    private boolean                             flatMessage;
+    private volatile ConsumerBatchMessage       lastGetBatchMessage = null;
 
-    public RocketMQCanalConnector(String nameServer, String topic, String groupName, boolean flatMessage) {
+    public RocketMQCanalConnector(String nameServer, String topic, String groupName, boolean flatMessage){
         this.nameServer = nameServer;
         this.topic = topic;
         this.groupName = groupName;
         this.flatMessage = flatMessage;
-        messageBlockingQueue = new LinkedBlockingQueue<>();
-        messageCache = new ConcurrentHashMap<>();
+        this.messageBlockingQueue = new LinkedBlockingQueue<>(1024);
     }
 
-    @Override
     public void connect() throws CanalClientException {
         rocketMQConsumer = new DefaultMQPushConsumer(groupName);
         if (!StringUtils.isBlank(nameServer)) {
@@ -55,17 +63,14 @@ public class RocketMQCanalConnector implements CanalConnector {
         }
     }
 
-    @Override
     public void disconnect() throws CanalClientException {
         rocketMQConsumer.shutdown();
     }
 
-    @Override
     public boolean checkValid() throws CanalClientException {
         return connected;
     }
 
-    @Override
     public synchronized void subscribe(String filter) throws CanalClientException {
         if (connected) {
             return;
@@ -78,8 +83,7 @@ public class RocketMQCanalConnector implements CanalConnector {
             rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {
 
                 @Override
-                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts,
-                    ConsumeOrderlyContext context) {
+                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {
                     context.setAutoCommit(true);
                     boolean isSuccess = process(messageExts);
                     if (isSuccess) {
@@ -99,23 +103,23 @@ public class RocketMQCanalConnector implements CanalConnector {
 
     private boolean process(List<MessageExt> messageExts) {
         logger.info("Get Message:{}", messageExts);
-        BlockingQueue messageList = new LinkedBlockingQueue<>();
+        List messageList = Lists.newArrayList();
         for (MessageExt messageExt : messageExts) {
             byte[] data = messageExt.getBody();
-            if (data != null){
+            if (data != null) {
                 try {
                     if (!flatMessage) {
                         Message message = CanalMessageDeserializer.deserializer(data);
-                        messageList.put(message);
+                        messageList.add(message);
                     } else {
                         FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);
-                        messageList.put(flatMessage);
+                        messageList.add(flatMessage);
                     }
                 } catch (Exception ex) {
                     logger.error("Add message error", ex);
                     throw new CanalClientException(ex);
                 }
-            }else{
+            } else {
                 logger.warn("Received message data is null");
             }
         }
@@ -142,145 +146,117 @@ public class RocketMQCanalConnector implements CanalConnector {
         return isCompleted && isSuccess;
     }
 
-    @Override
     public void subscribe() throws CanalClientException {
         this.subscribe(null);
     }
 
-    @Override
     public void unsubscribe() throws CanalClientException {
         this.rocketMQConsumer.unsubscribe(this.topic);
     }
 
-    /**
-     * 暂时不支持batchSize 参数
-     *
-     * @param batchSize 暂时不支持
-     * @return
-     * @throws CanalClientException
-     */
     @Override
-    public Message get(int batchSize) throws CanalClientException {
-        Message message = getWithoutAck(batchSize);
-        if (message != null) {
-            ack(message.getId());
+    public List<Message> getList(Long timeout, TimeUnit unit) throws CanalClientException {
+        List<Message> messages = getListWithoutAck(timeout, unit);
+        if (messages != null && !messages.isEmpty()) {
+            ack();
         }
-        return message;
+        return messages;
     }
 
     @Override
-    public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
-        Message message = getWithoutAck(batchSize, timeout, unit);
-        if (message != null) {
-            ack(message.getId());
-        }
-        return message;
-    }
-
-    private Message getMessage(ConsumerBatchMessage consumerBatchMessage) {
-        BlockingQueue<Message> messageList = consumerBatchMessage.getData();
-        if (messageList != null & messageList.size() > 0) {
-            Message message = messageList.poll();
-            messageCache.put(message.getId(), consumerBatchMessage);
-            return message;
-        }
-        return null;
-    }
+    public List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
+        try {
+            if (this.lastGetBatchMessage != null) {
+                throw new CanalClientException("mq get/ack not support concurrent & async ack");
+            }
 
-    private FlatMessage getFlatMessage(ConsumerBatchMessage consumerBatchMessage) {
-        BlockingQueue<FlatMessage> messageList = consumerBatchMessage.getData();
-        if (messageList != null & messageList.size() > 0) {
-            FlatMessage message = messageList.poll();
-            messageCache.put(message.getId(), consumerBatchMessage);
-            return message;
+            ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
+            if (batchMessage != null) {
+                this.lastGetBatchMessage = batchMessage;
+                return batchMessage.getData();
+            }
+        } catch (InterruptedException ex) {
+            logger.warn("Get message timeout", ex);
+            throw new CanalClientException("Failed to fetch the data after: " + timeout);
         }
-        return null;
+        return Lists.newArrayList();
     }
 
-    /**
-     * 暂时不支持该参数设置
-     *
-     * @param batchSize
-     * @return
-     * @throws CanalClientException
-     */
     @Override
-    public Message getWithoutAck(int batchSize) throws CanalClientException {
-        ConsumerBatchMessage batchMessage = messageBlockingQueue.poll();
-        if (batchMessage != null) {
-            return getMessage(batchMessage);
+    public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
+        List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);
+        if (messages != null && !messages.isEmpty()) {
+            ack();
         }
-        return null;
+        return messages;
     }
 
     @Override
-    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
         try {
+            if (this.lastGetBatchMessage != null) {
+                throw new CanalClientException("mq get/ack not support concurrent & async ack");
+            }
+
             ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
             if (batchMessage != null) {
-                return getMessage(batchMessage);
+                this.lastGetBatchMessage = batchMessage;
+                return batchMessage.getData();
             }
         } catch (InterruptedException ex) {
             logger.warn("Get message timeout", ex);
             throw new CanalClientException("Failed to fetch the data after: " + timeout);
         }
-        return null;
+        return Lists.newArrayList();
     }
 
-    public FlatMessage getFlatMessageWithoutAck() {
-        return getFlatMessageWithoutAck(null, null);
-    }
-
-    public FlatMessage getFlatMessageWithoutAck(Long timeout,
-        TimeUnit unit) throws CanalClientException {
+    @Override
+    public void ack() throws CanalClientException {
         try {
-            ConsumerBatchMessage batchMessage = null;
-            if (timeout == null || timeout == 0) {
-                batchMessage = messageBlockingQueue.poll();
-            } else {
-                batchMessage = messageBlockingQueue.poll(timeout, unit);
-            }
-            if (batchMessage != null) {
-                return getFlatMessage(batchMessage);
-            }
-        } catch (InterruptedException ex) {
-            logger.warn("Get flat message timeout", ex);
-            throw new CanalClientException("Failed to fetch the flat message data after: " + timeout);
+            this.lastGetBatchMessage.ack();
+        } catch (Throwable e) {
+            this.lastGetBatchMessage.fail();
+        } finally {
+            this.lastGetBatchMessage = null;
         }
-        return null;
     }
 
-    public FlatMessage getFlatMessage() throws CanalClientException {
-        FlatMessage message = getFlatMessageWithoutAck(null, null);
-        if (message != null) {
-            ack(message.getId());
+    @Override
+    public void rollback() throws CanalClientException {
+        try {
+            this.lastGetBatchMessage.fail();
+        } finally {
+            this.lastGetBatchMessage = null;
         }
-        return message;
     }
 
-    public FlatMessage getFlatMessage(Long timeout, TimeUnit unit) throws CanalClientException {
-        FlatMessage message = getFlatMessageWithoutAck(timeout, unit);
-        ack(message.getId());
-        return message;
+    public Message get(int batchSize) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
     }
 
     @Override
-    public void ack(long batchId) throws CanalClientException {
-        ConsumerBatchMessage batchMessage = messageCache.get(batchId);
-        if (batchMessage != null) {
-            batchMessage.ack();
-            messageCache.remove(batchId);
-        }
+    public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
     }
 
     @Override
-    public void rollback(long batchId) throws CanalClientException {
+    public Message getWithoutAck(int batchSize) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
 
+    @Override
+    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
     }
 
     @Override
-    public void rollback() throws CanalClientException {
+    public void ack(long batchId) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
 
+    @Override
+    public void rollback(long batchId) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
     }
 
     @Override

+ 2 - 2
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnectorProvider.java → client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnectors.java

@@ -3,7 +3,7 @@ package com.alibaba.otter.canal.client.rocketmq;
 /**
  * RocketMQ connector provider.
  */
-public class RocketMQCanalConnectorProvider {
+public class RocketMQCanalConnectors {
 
     /**
      * Create RocketMQ connector
@@ -18,7 +18,7 @@ public class RocketMQCanalConnectorProvider {
     }
 
     public static RocketMQCanalConnector newRocketMQConnector(String nameServers, String topic, String groupId,
-        boolean flatMessage) {
+                                                              boolean flatMessage) {
         return new RocketMQCanalConnector(nameServers, topic, groupId, flatMessage);
     }
 }

+ 0 - 13
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalListener.java

@@ -1,13 +0,0 @@
-package com.alibaba.otter.canal.client.rocketmq;
-
-import java.util.List;
-
-import org.apache.rocketmq.common.message.MessageExt;
-
-/**
- * RocketMQ message listener
- */
-public interface RocketMQCanalListener {
-
-    boolean onReceive(List<MessageExt> messageExts);
-}

+ 7 - 6
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -1,15 +1,17 @@
 package com.alibaba.otter.canal.deployer;
 
-import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
-import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
-import com.alibaba.otter.canal.server.CanalMQStarter;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
 import java.io.FileInputStream;
 import java.util.Properties;
+
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
+import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
+import com.alibaba.otter.canal.server.CanalMQStarter;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
+
 /**
  * canal独立版本启动的入口类
  *
@@ -19,7 +21,7 @@ import org.slf4j.LoggerFactory;
 public class CanalLauncher {
 
     private static final String CLASSPATH_URL_PREFIX = "classpath:";
-    private static final Logger logger = LoggerFactory.getLogger(CanalLauncher.class);
+    private static final Logger logger               = LoggerFactory.getLogger(CanalLauncher.class);
 
     public static void main(String[] args) throws Throwable {
         try {
@@ -57,7 +59,6 @@ public class CanalLauncher {
 
             CanalMQProducer canalMQProducer = null;
             String serverMode = controller.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
-            serverMode = "rocketmq";
             if (serverMode.equalsIgnoreCase("kafka")) {
                 canalMQProducer = new CanalKafkaProducer();
             } else if (serverMode.equalsIgnoreCase("rocketmq")) {

+ 33 - 18
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -1,13 +1,7 @@
 package com.alibaba.otter.canal.rocketmq;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.otter.canal.common.CanalMessageSerializer;
-import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.server.exception.CanalServerException;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
 import java.util.List;
-import org.apache.kafka.clients.producer.ProducerRecord;
+
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -18,13 +12,20 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.common.CanalMessageSerializer;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.server.exception.CanalServerException;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
+
 public class CanalRocketMQProducer implements CanalMQProducer {
 
     private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);
 
-    private DefaultMQProducer defaultMQProducer;
+    private DefaultMQProducer   defaultMQProducer;
 
-    private MQProperties mqProperties;
+    private MQProperties        mqProperties;
 
     @Override
     public void init(MQProperties rocketMQProperties) {
@@ -43,12 +44,16 @@ public class CanalRocketMQProducer implements CanalMQProducer {
 
     @Override
     public void send(final MQProperties.CanalDestination destination, com.alibaba.otter.canal.protocol.Message data,
-        Callback callback) {
+                     Callback callback) {
         if (!mqProperties.getFlatMessage()) {
             try {
                 Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data));
-                logger.debug("send message:{} to destination:{}, partition: {}", message, destination.getCanalDestination(), destination.getPartition());
+                logger.debug("send message:{} to destination:{}, partition: {}",
+                    message,
+                    destination.getCanalDestination(),
+                    destination.getPartition());
                 this.defaultMQProducer.send(message, new MessageQueueSelector() {
+
                     @Override
                     public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                         int partition = 0;
@@ -69,9 +74,14 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                 for (FlatMessage flatMessage : flatMessages) {
                     if (destination.getPartition() != null) {
                         try {
-                            logger.info("send flat message: {} to topic: {} fixed partition: {}", JSON.toJSONString(flatMessage),destination.getTopic(), destination.getPartition());
-                            Message message = new Message(destination.getTopic(), JSON.toJSONString(flatMessage).getBytes());
+                            logger.info("send flat message: {} to topic: {} fixed partition: {}",
+                                JSON.toJSONString(flatMessage),
+                                destination.getTopic(),
+                                destination.getPartition());
+                            Message message = new Message(destination.getTopic(), JSON.toJSONString(flatMessage)
+                                .getBytes());
                             this.defaultMQProducer.send(message, new MessageQueueSelector() {
+
                                 @Override
                                 public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                                     return mqs.get(destination.getPartition());
@@ -82,23 +92,28 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                             callback.rollback();
                         }
                     } else {
-                        if (destination.getPartitionHash() != null
-                            && !destination.getPartitionHash().isEmpty()) {
+                        if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
                             FlatMessage[] partitionFlatMessage = FlatMessage.messagePartition(flatMessage,
                                 destination.getPartitionsNum(),
                                 destination.getPartitionHash());
                             int length = partitionFlatMessage.length;
                             for (int i = 0; i < length; i++) {
                                 FlatMessage flatMessagePart = partitionFlatMessage[i];
-                                logger.debug("flatMessagePart: {}, partition: {}", JSON.toJSONString(flatMessagePart), i);
+                                logger.debug("flatMessagePart: {}, partition: {}",
+                                    JSON.toJSONString(flatMessagePart),
+                                    i);
                                 final int index = i;
                                 try {
-                                    Message message = new Message(destination.getTopic(), JSON.toJSONString(flatMessagePart).getBytes());
+                                    Message message = new Message(destination.getTopic(),
+                                        JSON.toJSONString(flatMessagePart).getBytes());
                                     this.defaultMQProducer.send(message, new MessageQueueSelector() {
+
                                         @Override
                                         public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                                             if (index > mqs.size()) {
-                                                throw new CanalServerException("partition number is error,config num:" + destination.getPartitionsNum() + ", mq num: " + mqs.size());
+                                                throw new CanalServerException("partition number is error,config num:"
+                                                                               + destination.getPartitionsNum()
+                                                                               + ", mq num: " + mqs.size());
                                             }
                                             return mqs.get(index);
                                         }