
Merge branch 'master' of https://github.com/rewerma/canal

mcy, 6 years ago
parent
commit 297122f47f
49 changed files with 727 additions and 643 deletions
  1. + 11 - 12   client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  2. + 8 - 2     client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java
  3. + 1 - 0     client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Result.java
  4. + 1 - 0     client-adapter/example/.gitignore
  5. + 0 - 1     client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java
  6. + 4 - 12    client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java
  7. + 9 - 10    client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java
  8. + 22 - 43   client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java
  9. + 0 - 2     client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java
  10. + 8 - 1    client-adapter/launcher/src/main/resources/application.yml
  11. + 24 - 0   client-launcher/src/main/resources/canal-client.yml
  12. + 91 - 0   client/src/main/java/com/alibaba/otter/canal/client/CanalMQConnector.java
  13. + 92 - 45  client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java
  14. + 0 - 39   client/src/main/java/com/alibaba/otter/canal/client/kafka/running/ClientRunningData.java
  15. + 0 - 21   client/src/main/java/com/alibaba/otter/canal/client/kafka/running/ClientRunningListener.java
  16. + 0 - 280  client/src/main/java/com/alibaba/otter/canal/client/kafka/running/ClientRunningMonitor.java
  17. + 5 - 5    client/src/main/java/com/alibaba/otter/canal/client/rocketmq/ConsumerBatchMessage.java
  18. + 125 - 59 client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java
  19. + 6 - 2    client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnectors.java
  20. + 0 - 13   client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalListener.java
  21. + 1 - 2    client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaClientExample.java
  22. + 15 - 10  client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/CanalRocketMQClientExample.java
  23. + 1 - 1    dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java
  24. + 1 - 1    deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java
  25. + 20 - 13  deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java
  26. + 4 - 0    deployer/src/main/resources/canal.properties
  27. + 1 - 1    deployer/src/main/resources/mq.yml
  28. + 2 - 0    deployer/src/main/resources/spring/default-instance.xml
  29. + 2 - 0    deployer/src/main/resources/spring/file-instance.xml
  30. + 2 - 0    deployer/src/main/resources/spring/memory-instance.xml
  31. + 2 - 2    deployer/src/main/resources/spring/tsdb/sql-map/sqlmap_history.xml
  32. + 2 - 2    deployer/src/main/resources/spring/tsdb/sql-map/sqlmap_snapshot.xml
  33. + 6 - 0    instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java
  34. + 18 - 0   instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java
  35. + 18 - 0   parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java
  36. + 2 - 0    parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java
  37. + 2 - 0    parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  38. + 4 - 3    parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  39. + 49 - 15  parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java
  40. + 4 - 7    parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/dao/MetaHistoryDAO.java
  41. + 4 - 7    parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/dao/MetaSnapshotDAO.java
  42. + 17 - 2   parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MetaHistoryDAOTest.java
  43. + 41 - 0   parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MetaSnapshotDAOTest.java
  44. + 2 - 2    parse/src/test/resources/tsdb/sql-map/sqlmap_history.xml
  45. + 2 - 2    parse/src/test/resources/tsdb/sql-map/sqlmap_snapshot.xml
  46. + 0 - 1    protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java
  47. + 2 - 2    server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java
  48. + 91 - 17  server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java
  49. + 5 - 6    server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

+ 11 - 12
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -75,7 +75,7 @@ public class CanalClientConfig {
 
         private String             instance;      // 实例名
 
-        private List<AdapterGroup> adapterGroups; // 适配器分组列表
+        private List<Group> groups;  // 适配器分组列表
 
         public String getInstance() {
             return instance;
@@ -87,16 +87,17 @@ public class CanalClientConfig {
             }
         }
 
-        public List<AdapterGroup> getAdapterGroups() {
-            return adapterGroups;
+        public List<Group> getGroups() {
+            return groups;
         }
 
-        public void setAdapterGroups(List<AdapterGroup> adapterGroups) {
-            this.adapterGroups = adapterGroups;
+        public void setGroups(List<Group> groups) {
+            this.groups = groups;
         }
+
     }
 
-    public static class AdapterGroup {
+    public static class Group {
 
         private List<OuterAdapterConfig> outAdapters; // 适配器列表
 
@@ -115,7 +116,7 @@ public class CanalClientConfig {
 
         private String      topic;                      // topic名
 
-        private List<Group> groups = new ArrayList<>(); // 分组列表
+        private List<MQGroup> groups = new ArrayList<>(); // 分组列表
 
         public String getMqMode() {
             return mqMode;
@@ -133,21 +134,19 @@ public class CanalClientConfig {
             this.topic = topic;
         }
 
-        public List<Group> getGroups() {
+        public List<MQGroup> getGroups() {
             return groups;
         }
 
-        public void setGroups(List<Group> groups) {
+        public void setGroups(List<MQGroup> groups) {
             this.groups = groups;
         }
     }
 
-    public static class Group {
+    public static class MQGroup {
 
         private String                   groupId;     // group id
 
-        // private List<Adaptor> adapters = new ArrayList<>();
-
         private List<OuterAdapterConfig> outAdapters; // 适配器配置列表
 
         public String getGroupId() {

+ 8 - 2
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -1,6 +1,11 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.FlatMessage;
@@ -102,8 +107,9 @@ public class MessageUtil {
                 if (!old.isEmpty()) {
                     dml.setOld(old);
                 }
-                consumer.accept(dml);
             }
+
+            consumer.accept(dml);
         }
     }
 

+ 1 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Result.java

@@ -11,6 +11,7 @@ import java.util.Date;
  */
 public class Result implements Serializable {
 
+    private static final long serialVersionUID = -3276409502352405716L;
     public Integer code = 20000;
     public Object  data;
     public String  message;

+ 1 - 0
client-adapter/example/.gitignore

@@ -0,0 +1 @@
+/bin/

+ 0 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -51,7 +51,6 @@ public abstract class AbstractCanalAdapterWorker {
                     adapters.forEach(adapter -> {
                         long begin = System.currentTimeMillis();
                         MessageUtil.parse4Dml(canalDestination, message, adapter::sync);
-
                         if (logger.isDebugEnabled()) {
                             logger.debug("{} elapsed time: {}",
                                 adapter.getClass().getName(),

+ 4 - 12
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -1,8 +1,6 @@
 package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.consumer.CommitFailedException;
@@ -23,19 +21,16 @@ import com.alibaba.otter.canal.protocol.Message;
 public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
 
     private KafkaCanalConnector connector;
-
     private String              topic;
-
     private boolean             flatMessage;
 
     public CanalAdapterKafkaWorker(String bootstrapServers, String topic, String groupId,
                                    List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
         this.canalOuterAdapters = canalOuterAdapters;
-        this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         this.topic = topic;
         this.canalDestination = topic;
         this.flatMessage = flatMessage;
-        connector = KafkaCanalConnectors.newKafkaConnector(bootstrapServers, topic, null, groupId, flatMessage);
+        this.connector = KafkaCanalConnectors.newKafkaConnector(bootstrapServers, topic, null, groupId, flatMessage);
         // connector.setSessionTimeout(1L, TimeUnit.MINUTES);
     }
 
@@ -48,7 +43,6 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
     protected void process() {
         while (!running)
             ;
-        ExecutorService executor = Executors.newSingleThreadExecutor();
         while (running) {
             try {
                 logger.info("=============> Start to connect topic: {} <=============", this.topic);
@@ -62,9 +56,9 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
 
                         List<?> messages;
                         if (!flatMessage) {
-                            messages = connector.getWithoutAck();
+                            messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS);
                         } else {
-                            messages = connector.getFlatMessageWithoutAck(100L, TimeUnit.MILLISECONDS);
+                            messages = connector.getFlatListWithoutAck(100L, TimeUnit.MILLISECONDS);
                         }
                         if (messages != null) {
                             for (final Object message : messages) {
@@ -78,7 +72,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
                         connector.ack();
                     } catch (CommitFailedException e) {
                         logger.warn(e.getMessage());
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         logger.error(e.getMessage(), e);
                         TimeUnit.SECONDS.sleep(1L);
                     }
@@ -88,8 +82,6 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
             }
         }
 
-        executor.shutdown();
-
         try {
             connector.unsubscribe();
         } catch (WakeupException e) {

+ 9 - 10
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -58,7 +58,7 @@ public class CanalAdapterLoader {
             for (CanalClientConfig.CanalInstance instance : canalClientConfig.getCanalInstances()) {
                 List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
 
-                for (CanalClientConfig.AdapterGroup connectorGroup : instance.getAdapterGroups()) {
+                for (CanalClientConfig.Group connectorGroup : instance.getGroups()) {
                     List<OuterAdapter> canalOutConnectors = new ArrayList<>();
                     for (OuterAdapterConfig c : connectorGroup.getOutAdapters()) {
                         loadConnector(c, canalOutConnectors);
@@ -82,23 +82,23 @@ public class CanalAdapterLoader {
         // 初始化canal-client-mq的适配器
         if (canalClientConfig.getMqTopics() != null) {
             for (CanalClientConfig.MQTopic topic : canalClientConfig.getMqTopics()) {
-                for (CanalClientConfig.Group group : topic.getGroups()) {
+                for (CanalClientConfig.MQGroup group : topic.getGroups()) {
                     List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
-
                     List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
-
                     for (OuterAdapterConfig config : group.getOutAdapters()) {
                         loadConnector(config, canalOuterAdapters);
                     }
                     canalOuterAdapterGroups.add(canalOuterAdapters);
                     if (StringUtils.isBlank(topic.getMqMode()) || "rocketmq".equalsIgnoreCase(topic.getMqMode())) {
-                        CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig
-                            .getBootstrapServers(), topic.getTopic(), group.getGroupId(), canalOuterAdapterGroups);
+                        CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig.getBootstrapServers(),
+                            topic.getTopic(),
+                            group.getGroupId(),
+                            canalOuterAdapterGroups,
+                            canalClientConfig.getFlatMessage());
                         canalMQWorker.put(topic.getTopic() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
                         rocketMQWorker.start();
                     } else if ("kafka".equalsIgnoreCase(topic.getMqMode())) {
-                        CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(
-                            canalClientConfig.getBootstrapServers(),
+                        CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(canalClientConfig.getBootstrapServers(),
                             topic.getTopic(),
                             group.getGroupId(),
                             canalOuterAdapterGroups,
@@ -106,9 +106,8 @@ public class CanalAdapterLoader {
                         canalMQWorker.put(topic.getTopic() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
                         canalKafkaWorker.start();
                     }
-                    logger.info("Start adapter for canal-client rocketmq topic: {} succeed",
+                    logger.info("Start adapter for canal-client mq topic: {} succeed",
                         topic.getTopic() + "-" + group.getGroupId());
-
                 }
             }
         }

+ 22 - 43
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -1,38 +1,35 @@
 package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.common.errors.WakeupException;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
-import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectors;
+import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 
 /**
- * kafka对应的client适配器工作线程
+ * rocketmq对应的client适配器工作线程
  *
- * @author rewerma 2018-8-19 下午11:30:49
  * @version 1.0.0
  */
 public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
 
     private RocketMQCanalConnector connector;
-
     private String                 topic;
+    private boolean                flatMessage;
 
     public CanalAdapterRocketMQWorker(String nameServers, String topic, String groupId,
-                                      List<List<OuterAdapter>> canalOuterAdapters){
+                                      List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
         logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
         this.canalOuterAdapters = canalOuterAdapters;
-        this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         this.topic = topic;
+        this.flatMessage = flatMessage;
         this.canalDestination = topic;
-        connector = RocketMQCanalConnectorProvider.newRocketMQConnector(nameServers, topic, groupId);
+        this.connector = RocketMQCanalConnectors.newRocketMQConnector(nameServers, topic, groupId, flatMessage);
     }
 
     @Override
@@ -44,7 +41,6 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
     protected void process() {
         while (!running)
             ;
-        ExecutorService executor = Executors.newSingleThreadExecutor();
         while (running) {
             try {
                 logger.info("=============> Start to connect topic: {} <=============", this.topic);
@@ -54,38 +50,23 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
                 logger.info("=============> Subscribe topic: {} succeed<=============", this.topic);
                 while (running) {
                     try {
-                        // switcher.get(); //等待开关开启
-
-                        final Message message = connector.getWithoutAck(1);
-                        if (message != null) {
-                            executor.submit(() -> {
-                                try {
-                                    if (logger.isDebugEnabled()) {
-                                        logger.debug("topic: {} batchId: {} batchSize: {} ",
-                                            topic,
-                                            message.getId(),
-                                            message.getEntries().size());
-                                    }
-                                    long begin = System.currentTimeMillis();
-                                    writeOut(message);
-                                    long now = System.currentTimeMillis();
-                                    if ((System.currentTimeMillis() - begin) > 5 * 60 * 1000) {
-                                        logger.error("topic: {} batchId {} elapsed time: {} ms",
-                                            topic,
-                                            message.getId(),
-                                            now - begin);
-                                    }
-                                } catch (Exception e) {
-                                    logger.error(e.getMessage(), e);
-                                }
-                                connector.ack(message.getId());
-                            });
+                        List<?> messages;
+                        if (!flatMessage) {
+                            messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS);
                         } else {
-                            logger.debug("Message is null");
+                            messages = connector.getFlatListWithoutAck(100L, TimeUnit.MILLISECONDS);
                         }
-                    } catch (CommitFailedException e) {
-                        logger.warn(e.getMessage());
-                    } catch (Exception e) {
+                        if (messages != null) {
+                            for (final Object message : messages) {
+                                if (message instanceof FlatMessage) {
+                                    writeOut((FlatMessage) message);
+                                } else {
+                                    writeOut((Message) message);
+                                }
+                            }
+                        }
+                        connector.ack();
+                    } catch (Throwable e) {
                         logger.error(e.getMessage(), e);
                         TimeUnit.SECONDS.sleep(1L);
                     }
@@ -95,8 +76,6 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
             }
         }
 
-        executor.shutdown();
-
         try {
             connector.unsubscribe();
         } catch (WakeupException e) {

+ 0 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

@@ -90,7 +90,6 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
                     long batchId = message.getId();
                     try {
                         int size = message.getEntries().size();
-
                         if (batchId == -1 || size == 0) {
                             Thread.sleep(1000);
                         } else {
@@ -131,7 +130,6 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
                     // ignore
                 }
             }
-
         }
     }
 }

+ 8 - 1
client-adapter/launcher/src/main/resources/application.yml

@@ -21,7 +21,7 @@ canal.conf:
   flatMessage: true
   canalInstances:
   - instance: example
-    adapterGroups:
+    groups:
     - outAdapters:
       - name: logger
 #      - name: hbase
@@ -36,6 +36,13 @@ canal.conf:
 #    - groupId: g2
 #      outAdapters:
 #      - name: logger
+#  mqTopics:
+#  - mqMode: rocketmq
+#    topic: example
+#    groups:
+#    - groupId: g2
+#      outAdapters:
+#      - name: logger
 
 #adapter.conf:
 #  datasourceConfigs:

+ 24 - 0
client-launcher/src/main/resources/canal-client.yml

@@ -0,0 +1,24 @@
+#canalServerHost: 127.0.0.1:11111
+#zookeeperHosts: slave1:2181
+bootstrapServers: slave1:6667,slave2:6667 #or rocketmq nameservers:host1:9876;host2:9876
+flatMessage: false
+
+#canalInstances:
+#- instance: example
+#  adapterGroups:
+#  - outAdapters:
+#    - name: logger
+#    - name: hbase
+#      hosts: slave1:2181
+#      properties: {znodeParent: "/hbase-unsecure"}
+
+mqTopics:
+- mqMode: rocketmq
+  topic: example
+  groups:
+  - groupId: example
+    outAdapters:
+    - name: logger
+#    - name: hbase
+#      hosts: slave1:2181
+#      properties: {znodeParent: "/hbase-unsecure"}

+ 91 - 0
client/src/main/java/com/alibaba/otter/canal/client/CanalMQConnector.java

@@ -0,0 +1,91 @@
+package com.alibaba.otter.canal.client;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+
+/**
+ * canal MQ数据操作客户端
+ * 
+ * <pre>
+ * 1. canal server写入MQ消息,考虑性能会合并多条数据写入为一个MQ消息,一个Message对应一个MQ消息
+ * 2. canal client消费MQ消息,因为client性能会弱于server的写入,MQ数据获取时会拿到堆积的多条MQ消息,会拿到List<Message>
+ * 3. client的ack/rollback,都是和MQ直接交互,不存在对应的batchId概念
+ * </pre>
+ * 
+ * @author agapple 2018年10月28日 下午6:42:27
+ * @since 1.1.1
+ */
+public interface CanalMQConnector extends CanalConnector {
+
+    /**
+     * 获取数据,自动进行确认,设置timeout时间直到拿到数据为止
+     * 
+     * <pre>
+     * 该方法返回的条件:
+     *  a. 如果timeout=0,有多少取多少,不会阻塞等待
+     *  b. 如果timeout不为0,尝试阻塞对应的超时时间,直到拿到数据就返回
+     * </pre>
+     * 
+     * @return
+     * @throws CanalClientException
+     */
+    List<Message> getList(Long timeout, TimeUnit unit) throws CanalClientException;
+
+    /**
+     * 获取数据,设置timeout时间直到拿到数据为止
+     * 
+     * <pre>
+     * 该方法返回的条件:
+     *  a. 如果timeout=0,有多少取多少,不会阻塞等待
+     *  b. 如果timeout不为0,尝试阻塞对应的超时时间,直到拿到数据就返回
+     * </pre>
+     * 
+     * @throws CanalClientException
+     */
+    List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException;
+
+    /**
+     * 获取数据,自动进行确认,设置timeout时间直到拿到数据为止
+     * 
+     * <pre>
+     * 该方法返回的条件:
+     *  a. 如果timeout=0,有多少取多少,不会阻塞等待
+     *  b. 如果timeout不为0,尝试阻塞对应的超时时间,直到拿到数据就返回
+     * </pre>
+     * 
+     * @return
+     * @throws CanalClientException
+     */
+    List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException;
+
+    /**
+     * 获取数据,设置timeout时间直到拿到数据为止
+     * 
+     * <pre>
+     * 该方法返回的条件:
+     *  a. 如果timeout=0,有多少取多少,不会阻塞等待
+     *  b. 如果timeout不为0,尝试阻塞对应的超时时间,直到拿到数据就返回
+     * </pre>
+     * 
+     * @throws CanalClientException
+     */
+    List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException;
+
+    /**
+     * 消费确认。
+     * 
+     * @throws CanalClientException
+     */
+    void ack() throws CanalClientException;
+
+    /**
+     * 回滚到未进行 {@link #ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link #ack} 的地方开始拿
+     * 
+     * @throws CanalClientException
+     */
+    void rollback() throws CanalClientException;
+}
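
Note: the new CanalMQConnector interface above replaces the batchId-oriented get/ack calls with list-based ones that talk to the MQ directly. A minimal consumption loop against this interface could look like the sketch below; only the interface methods come from the diff, while the processing body and the way the connector instance is obtained are illustrative placeholders.

    import java.util.List;
    import java.util.concurrent.TimeUnit;

    import com.alibaba.otter.canal.client.CanalMQConnector;
    import com.alibaba.otter.canal.protocol.Message;

    public class CanalMQConsumeSketch {

        // The connector would come from one of the MQ-specific factories
        // (Kafka or RocketMQ); obtaining it is outside this sketch.
        public static void consume(CanalMQConnector connector) {
            connector.connect();
            connector.subscribe();
            while (true) {
                // fetch whatever has accumulated, waiting at most 100 ms
                List<Message> messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS);
                if (messages == null || messages.isEmpty()) {
                    continue;
                }
                try {
                    for (Message message : messages) {
                        // process the message here (placeholder)
                    }
                    connector.ack();      // confirm directly against the MQ; there is no batchId
                } catch (Exception e) {
                    connector.rollback(); // let the un-acked batch be delivered again
                }
            }
        }
    }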

+ 92 - 45
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -13,16 +13,25 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.client.CanalMQConnector;
+import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.google.common.collect.Lists;
 
 /**
  * canal kafka 数据操作客户端
+ * 
+ * <pre>
+ * 注意点:
+ * 1. 相比于canal {@linkplain SimpleCanalConnector}, 这里get和ack操作不能有并发, 必须是一个线程执行get后,内存里执行完毕ack后再取下一个get
+ * </pre>
  *
  * @author machengyuan @ 2018-6-12
- * @version 1.0.0
+ * @version 1.1.1
  */
-public class KafkaCanalConnector {
+public class KafkaCanalConnector implements CanalMQConnector {
 
     private KafkaConsumer<String, Message> kafkaConsumer;
     private KafkaConsumer<String, String>  kafkaConsumer2;   // 用于扁平message的数据消费
@@ -55,18 +64,6 @@ public class KafkaCanalConnector {
         }
     }
 
-    /**
-     * 重新设置sessionTime
-     *
-     * @param timeout
-     * @param unit
-     */
-    public void setSessionTimeout(Long timeout, TimeUnit unit) {
-        long t = unit.toMillis(timeout);
-        properties.put("request.timeout.ms", String.valueOf(t + 60000));
-        properties.put("session.timeout.ms", String.valueOf(t));
-    }
-
     /**
      * 打开连接
      */
@@ -76,7 +73,6 @@ public class KafkaCanalConnector {
         }
 
         connected = true;
-
         if (kafkaConsumer == null && !flatMessage) {
             kafkaConsumer = new KafkaConsumer<String, Message>(properties);
         }
@@ -151,62 +147,61 @@ public class KafkaCanalConnector {
         }
     }
 
-    /**
-     * 获取数据,自动进行确认
-     *
-     * @return
-     */
-    public List<Message> get() {
-        return get(100L, TimeUnit.MILLISECONDS);
-    }
-
-    public List<Message> get(Long timeout, TimeUnit unit) {
+    @Override
+    public List<Message> getList(Long timeout, TimeUnit unit) throws CanalClientException {
         waitClientRunning();
         if (!running) {
-            return null;
+            return Lists.newArrayList();
         }
 
-        List<Message> messages = getWithoutAck(timeout, unit);
-        this.ack();
+        List<Message> messages = getListWithoutAck(timeout, unit);
+        if (messages != null && !messages.isEmpty()) {
+            this.ack();
+        }
         return messages;
     }
 
-    public List<Message> getWithoutAck() {
-        return getWithoutAck(100L, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * 获取数据,不进行确认,等待处理完成手工确认
-     *
-     * @return
-     */
-    public List<Message> getWithoutAck(Long timeout, TimeUnit unit) {
+    @Override
+    public List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
         waitClientRunning();
         if (!running) {
-            return null;
+            return Lists.newArrayList();
         }
 
         ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout)); // 基于配置,最多只能poll到一条数据
 
         if (!records.isEmpty()) {
-            // return records.iterator().next().value();
             List<Message> messages = new ArrayList<>();
             for (ConsumerRecord<String, Message> record : records) {
                 messages.add(record.value());
             }
             return messages;
         }
-        return null;
+        return Lists.newArrayList();
     }
 
-    public List<FlatMessage> getFlatMessageWithoutAck(Long timeout, TimeUnit unit) {
+    @Override
+    public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
         waitClientRunning();
         if (!running) {
-            return null;
+            return Lists.newArrayList();
         }
 
-        ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));
+        List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);
+        if (messages != null && !messages.isEmpty()) {
+            this.ack();
+        }
+        return messages;
+    }
+
+    @Override
+    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
+        waitClientRunning();
+        if (!running) {
+            return Lists.newArrayList();
+        }
 
+        ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));
         if (!records.isEmpty()) {
             List<FlatMessage> flatMessages = new ArrayList<>();
             for (ConsumerRecord<String, String> record : records) {
@@ -217,7 +212,11 @@ public class KafkaCanalConnector {
 
             return flatMessages;
         }
-        return null;
+        return Lists.newArrayList();
+    }
+
+    @Override
+    public void rollback() throws CanalClientException {
     }
 
     /**
@@ -245,4 +244,52 @@ public class KafkaCanalConnector {
             // }
         }
     }
+
+    @Override
+    public void subscribe(String filter) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public Message get(int batchSize) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public Message getWithoutAck(int batchSize) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public void ack(long batchId) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public void rollback(long batchId) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    /**
+     * 重新设置sessionTime
+     *
+     * @param timeout
+     * @param unit
+     */
+    public void setSessionTimeout(Long timeout, TimeUnit unit) {
+        long t = unit.toMillis(timeout);
+        properties.put("request.timeout.ms", String.valueOf(t + 60000));
+        properties.put("session.timeout.ms", String.valueOf(t));
+    }
+
 }
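
Note: the class-level comment added above states that get and ack must not run concurrently: one thread fetches a batch, processes it in memory, acks, and only then fetches again. A hedged sketch of that pattern for flat messages follows; the factory call newKafkaConnector(bootstrapServers, topic, partition, groupId, flatMessage) is taken from the CanalAdapterKafkaWorker diff, while the broker address, topic, group id, and the assumption that KafkaCanalConnectors sits in the same package as KafkaCanalConnector are illustrative.

    import java.util.List;
    import java.util.concurrent.TimeUnit;

    import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
    import com.alibaba.otter.canal.client.kafka.KafkaCanalConnectors;
    import com.alibaba.otter.canal.protocol.FlatMessage;

    public class KafkaFlatMessageSketch {

        public static void main(String[] args) {
            // partition = null subscribes to the whole topic; flatMessage = true selects the JSON path
            KafkaCanalConnector connector =
                KafkaCanalConnectors.newKafkaConnector("127.0.0.1:9092", "example", null, "g1", true);
            connector.connect();
            connector.subscribe();
            while (true) {
                // single thread: get, process, ack, then get again
                List<FlatMessage> flatMessages = connector.getFlatListWithoutAck(100L, TimeUnit.MILLISECONDS);
                if (flatMessages.isEmpty()) {
                    continue;
                }
                for (FlatMessage flatMessage : flatMessages) {
                    System.out.println(flatMessage);
                }
                connector.ack();
            }
        }
    }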

+ 0 - 39
client/src/main/java/com/alibaba/otter/canal/client/kafka/running/ClientRunningData.java

@@ -1,39 +0,0 @@
-package com.alibaba.otter.canal.client.kafka.running;
-
-/**
- * client running状态信息
- *
- * @author machengyuan 2018-06-20 下午04:10:12
- * @version 1.0.0
- */
-public class ClientRunningData {
-
-    private String  groupId;
-    private String  address;
-    private boolean active = true;
-
-    public String getGroupId() {
-        return groupId;
-    }
-
-    public void setGroupId(String groupId) {
-        this.groupId = groupId;
-    }
-
-    public String getAddress() {
-        return address;
-    }
-
-    public void setAddress(String address) {
-        this.address = address;
-    }
-
-    public boolean isActive() {
-        return active;
-    }
-
-    public void setActive(boolean active) {
-        this.active = active;
-    }
-
-}

+ 0 - 21
client/src/main/java/com/alibaba/otter/canal/client/kafka/running/ClientRunningListener.java

@@ -1,21 +0,0 @@
-package com.alibaba.otter.canal.client.kafka.running;
-
-/**
- * client running状态信息
- *
- * @author machengyuan 2018-06-20 下午04:10:12
- * @version 1.0.0
- */
-public interface ClientRunningListener {
-
-    /**
-     * 触发现在轮到自己做为active
-     */
-    public void processActiveEnter();
-
-    /**
-     * 触发一下当前active模式失败
-     */
-    public void processActiveExit();
-
-}

+ 0 - 280
client/src/main/java/com/alibaba/otter/canal/client/kafka/running/ClientRunningMonitor.java

@@ -1,280 +0,0 @@
-package com.alibaba.otter.canal.client.kafka.running;
-
-import java.text.MessageFormat;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.exception.ZkException;
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.apache.zookeeper.CreateMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
-import com.alibaba.otter.canal.common.utils.AddressUtils;
-import com.alibaba.otter.canal.common.utils.BooleanMutex;
-import com.alibaba.otter.canal.common.utils.JsonUtils;
-import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
-import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
-import com.alibaba.otter.canal.protocol.exception.CanalClientException;
-
-/**
- * kafka client running状态信息
- *
- * @author machengyuan 2018-06-20 下午04:10:12
- * @version 1.0.0
- */
-public class ClientRunningMonitor extends AbstractCanalLifeCycle {
-
-    private static final String TOPIC_ROOT_NODE             = ZookeeperPathUtils.CANAL_ROOT_NODE
-                                                              + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR + "topics";
-
-    private static final String TOPIC_NODE                  = TOPIC_ROOT_NODE + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR
-                                                              + "{0}";
-
-    private static final String TOPIC_CLIENTID_NODE         = TOPIC_NODE + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR
-                                                              + "{1}";
-
-    private static final String TOPIC_CLIENTID_RUNNING_NODE = TOPIC_CLIENTID_NODE
-                                                              + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR
-                                                              + ZookeeperPathUtils.RUNNING_NODE;
-
-    private static String getTopicClientRunning(String topic, String groupId) {
-        return MessageFormat.format(TOPIC_CLIENTID_RUNNING_NODE, topic, groupId);
-    }
-
-    private static String getClientIdNodePath(String topic, String groupId) {
-        return MessageFormat.format(TOPIC_CLIENTID_NODE, topic, groupId);
-    }
-
-    private static final Logger        logger       = LoggerFactory.getLogger(ClientRunningMonitor.class);
-    private ZkClientx                  zkClient;
-    private String                     topic;
-    private ClientRunningData          clientData;
-    private IZkDataListener            dataListener;
-    private BooleanMutex               mutex        = new BooleanMutex(false);
-    private volatile boolean           release      = false;
-    private volatile ClientRunningData activeData;
-    private ScheduledExecutorService   delayExector = Executors.newScheduledThreadPool(1);
-    private ClientRunningListener      listener;
-    private int                        delayTime    = 5;
-
-    private static Integer             virtualPort;
-
-    public ClientRunningMonitor(){
-        if (virtualPort == null) {
-            Random rand = new Random();
-            virtualPort = rand.nextInt(9000) + 1000;
-        }
-
-        dataListener = new IZkDataListener() {
-
-            public void handleDataChange(String dataPath, Object data) throws Exception {
-                MDC.put("kafkaTopic", topic);
-                ClientRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ClientRunningData.class);
-                if (!isMine(runningData.getAddress())) {
-                    mutex.set(false);
-                }
-
-                if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
-                    release = true;
-                    releaseRunning();// 彻底释放mainstem
-                }
-
-                activeData = (ClientRunningData) runningData;
-            }
-
-            public void handleDataDeleted(String dataPath) throws Exception {
-                MDC.put("kafkaTopic", topic);
-
-                mutex.set(false);
-                // 触发一下退出,可能是人为干预的释放操作或者网络闪断引起的session expired timeout
-                processActiveExit();
-                if (!release && activeData != null && isMine(activeData.getAddress())) {
-                    // 如果上一次active的状态就是本机,则即时触发一下active抢占
-                    initRunning();
-                } else {
-                    // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作
-                    delayExector.schedule(new Runnable() {
-
-                        public void run() {
-                            initRunning();
-                        }
-                    }, delayTime, TimeUnit.SECONDS);
-                }
-            }
-
-        };
-
-    }
-
-    public void start() {
-        super.start();
-
-        String path = getTopicClientRunning(this.topic, clientData.getGroupId());
-
-        zkClient.subscribeDataChanges(path, dataListener);
-        initRunning();
-    }
-
-    public void stop() {
-        super.stop();
-        String path = getTopicClientRunning(this.topic, clientData.getGroupId());
-        zkClient.unsubscribeDataChanges(path, dataListener);
-        releaseRunning(); // 尝试一下release
-        // Fix issue #697
-        if (delayExector != null) {
-            delayExector.shutdown();
-        }
-    }
-
-    // 改动记录:
-    // 1,在方法上加synchronized关键字,保证同步顺序执行;
-    // 2,判断Zk上已经存在的activeData是否是本机,是的话把mutex重置为true,否则会导致死锁
-    // 3,增加异常处理,保证出现异常时,running节点能被删除,否则会导致死锁
-    public synchronized void initRunning() {
-        if (!isStart()) {
-            return;
-        }
-
-        String path = getTopicClientRunning(this.topic, clientData.getGroupId());
-        // 序列化
-        byte[] bytes = JsonUtils.marshalToByte(clientData);
-        try {
-            mutex.set(false);
-            zkClient.create(path, bytes, CreateMode.EPHEMERAL);
-            processActiveEnter();// 触发一下事件
-            activeData = clientData;
-            mutex.set(true);
-        } catch (ZkNodeExistsException e) {
-            bytes = zkClient.readData(path, true);
-            if (bytes == null) {// 如果不存在节点,立即尝试一次
-                initRunning();
-            } else {
-                activeData = JsonUtils.unmarshalFromByte(bytes, ClientRunningData.class);
-                // 如果发现已经存在,判断一下是否自己,避免活锁
-                if (activeData.getAddress().contains(":") && isMine(activeData.getAddress())) {
-                    mutex.set(true);
-                }
-            }
-        } catch (ZkNoNodeException e) {
-            zkClient.createPersistent(getClientIdNodePath(this.topic, clientData.getGroupId()), true); // 尝试创建父节点
-            initRunning();
-        } catch (Throwable t) {
-            logger.error(MessageFormat.format("There is an error when execute initRunning method, with destination [{0}].",
-                topic),
-                t);
-            // 出现任何异常尝试release
-            releaseRunning();
-            throw new CanalClientException("something goes wrong in initRunning method. ", t);
-        }
-    }
-
-    /**
-     * 阻塞等待自己成为active,如果自己成为active,立马返回
-     *
-     * @throws InterruptedException
-     */
-    public void waitForActive() throws InterruptedException {
-        initRunning();
-        mutex.get();
-    }
-
-    /**
-     * 检查当前的状态
-     */
-    public boolean check() {
-        String path = getTopicClientRunning(this.topic, clientData.getGroupId());
-        // ZookeeperPathUtils.getDestinationClientRunning(this.destination,
-        // clientData.getClientId());
-        try {
-            byte[] bytes = zkClient.readData(path);
-            ClientRunningData eventData = JsonUtils.unmarshalFromByte(bytes, ClientRunningData.class);
-            activeData = eventData;// 更新下为最新值
-            // 检查下nid是否为自己
-            boolean result = isMine(activeData.getAddress());
-            if (!result) {
-                logger.warn("canal is running in [{}] , but not in [{}]",
-                    activeData.getAddress(),
-                    clientData.getAddress());
-            }
-            return result;
-        } catch (ZkNoNodeException e) {
-            logger.warn("canal is not run any in node");
-            return false;
-        } catch (ZkInterruptedException e) {
-            logger.warn("canal check is interrupt");
-            Thread.interrupted();// 清除interrupt标记
-            return check();
-        } catch (ZkException e) {
-            logger.warn("canal check is failed");
-            return false;
-        }
-    }
-
-    public boolean releaseRunning() {
-        if (check()) {
-            String path = getTopicClientRunning(this.topic, clientData.getGroupId());
-            zkClient.delete(path);
-            mutex.set(false);
-            processActiveExit();
-            return true;
-        }
-
-        return false;
-    }
-
-    // ====================== helper method ======================
-
-    private boolean isMine(String address) {
-        return address.equals(clientData.getAddress());
-    }
-
-    private void processActiveEnter() {
-        if (listener != null) {
-            // 触发回调
-            listener.processActiveEnter();
-            this.clientData.setAddress(/* address */AddressUtils.getHostIp() + ":" + virtualPort);
-
-            String path = getTopicClientRunning(this.topic, clientData.getGroupId());
-            // 序列化
-            byte[] bytes = JsonUtils.marshalToByte(clientData);
-            zkClient.writeData(path, bytes);
-        }
-    }
-
-    private void processActiveExit() {
-        if (listener != null) {
-            listener.processActiveExit();
-        }
-    }
-
-    public void setListener(ClientRunningListener listener) {
-        this.listener = listener;
-    }
-
-    // ===================== setter / getter =======================
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    public void setClientData(ClientRunningData clientData) {
-        this.clientData = clientData;
-    }
-
-    public void setDelayTime(int delayTime) {
-        this.delayTime = delayTime;
-    }
-
-    public void setZkClient(ZkClientx zkClient) {
-        this.zkClient = zkClient;
-    }
-
-}

+ 5 - 5
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/ConsumerBatchMessage.java

@@ -1,18 +1,18 @@
 package com.alibaba.otter.canal.client.rocketmq;
 
-import java.util.concurrent.BlockingQueue;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 public class ConsumerBatchMessage<T> {
 
-    private final BlockingQueue<T> data;
+    private final List<T>  data;
     private CountDownLatch         latch;
     private boolean                hasFailure = false;
 
-    public ConsumerBatchMessage(BlockingQueue<T> data){
+    public ConsumerBatchMessage(List<T> data){
         this.data = data;
-        latch = new CountDownLatch(data.size());
+        latch = new CountDownLatch(1);
     }
 
     public boolean waitFinish(long timeout) throws InterruptedException {
@@ -23,7 +23,7 @@ public class ConsumerBatchMessage<T> {
         return !hasFailure;
     }
 
-    public BlockingQueue<T> getData() {
+    public List<T> getData() {
         return data;
     }
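
Note: after this change a batch is acknowledged as a whole — the latch is created with a count of one instead of one per element, and ack()/fail() (used by RocketMQCanalConnector further down) are what release it. A small hand-off sketch under that reading follows; the internals of ack() and fail() are inferred from the surrounding diffs, and the queue, thread, and sample data are illustrative.

    import java.util.Arrays;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    import com.alibaba.otter.canal.client.rocketmq.ConsumerBatchMessage;

    public class BatchHandOffSketch {

        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<ConsumerBatchMessage<String>> queue = new ArrayBlockingQueue<>(16);

            // consumer side: take one batch, process every element, then ack the batch as a whole
            Thread consumer = new Thread(() -> {
                try {
                    ConsumerBatchMessage<String> taken = queue.take();
                    for (String item : taken.getData()) {
                        System.out.println("processed " + item);
                    }
                    taken.ack(); // assumed to release the single-count latch
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumer.start();

            // producer side (the role the RocketMQ listener thread plays):
            // hand the batch over and block until it has been acked or failed
            ConsumerBatchMessage<String> batch =
                new ConsumerBatchMessage<>(Arrays.asList("row-1", "row-2", "row-3"));
            queue.put(batch);
            boolean success = batch.waitFinish(10_000);
            System.out.println("batch finished without failure: " + success);
        }
    }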
 

+ 125 - 59
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

@@ -1,9 +1,7 @@
 package com.alibaba.otter.canal.client.rocketmq;
 
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -17,33 +15,47 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.client.CanalMQConnector;
 import com.alibaba.otter.canal.client.CanalMessageDeserializer;
+import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
+import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.google.common.collect.Lists;
 
-public class RocketMQCanalConnector implements CanalConnector {
+/**
+ * RocketMQ的连接
+ * 
+ * <pre>
+ * 注意点:
+ * 1. 相比于canal {@linkplain SimpleCanalConnector}, 这里get和ack操作不能有并发, 必须是一个线程执行get后,内存里执行完毕ack后再取下一个get
+ * </pre>
+ * 
+ * @since 1.1.1
+ */
+public class RocketMQCanalConnector implements CanalMQConnector {
 
-    private static final Logger                          logger              = LoggerFactory.getLogger(RocketMQCanalConnector.class);
+    private static final Logger                 logger              = LoggerFactory.getLogger(RocketMQCanalConnector.class);
 
-    private String                                       nameServer;
-    private String                                       topic;
-    private String                                       groupName;
-    private volatile boolean                             connected           = false;
-    private DefaultMQPushConsumer                        rocketMQConsumer;
-    private BlockingQueue<ConsumerBatchMessage<Message>> messageBlockingQueue;
-    Map<Long, ConsumerBatchMessage<Message>>             messageCache;
-    private long                                         batchProcessTimeout = 10000;
+    private String                              nameServer;
+    private String                              topic;
+    private String                              groupName;
+    private volatile boolean                    connected           = false;
+    private DefaultMQPushConsumer               rocketMQConsumer;
+    private BlockingQueue<ConsumerBatchMessage> messageBlockingQueue;
+    private long                                batchProcessTimeout = 60 * 1000;
+    private boolean                             flatMessage;
+    private volatile ConsumerBatchMessage       lastGetBatchMessage = null;
 
-    public RocketMQCanalConnector(String nameServer, String topic, String groupName){
+    public RocketMQCanalConnector(String nameServer, String topic, String groupName, boolean flatMessage){
         this.nameServer = nameServer;
         this.topic = topic;
         this.groupName = groupName;
-        messageBlockingQueue = new LinkedBlockingQueue<>();
-        messageCache = new ConcurrentHashMap<>();
+        this.flatMessage = flatMessage;
+        this.messageBlockingQueue = new LinkedBlockingQueue<>(1024);
     }
 
-    @Override
     public void connect() throws CanalClientException {
         rocketMQConsumer = new DefaultMQPushConsumer(groupName);
         if (!StringUtils.isBlank(nameServer)) {
@@ -51,17 +63,14 @@ public class RocketMQCanalConnector implements CanalConnector {
         }
     }
 
-    @Override
     public void disconnect() throws CanalClientException {
         rocketMQConsumer.shutdown();
     }
 
-    @Override
     public boolean checkValid() throws CanalClientException {
         return connected;
     }
 
-    @Override
     public synchronized void subscribe(String filter) throws CanalClientException {
         if (connected) {
             return;
@@ -70,7 +79,7 @@ public class RocketMQCanalConnector implements CanalConnector {
             if (rocketMQConsumer == null) {
                 this.connect();
             }
-            rocketMQConsumer.subscribe(topic, "*");
+            rocketMQConsumer.subscribe(this.topic, "*");
             rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {
 
                 @Override
@@ -93,17 +102,33 @@ public class RocketMQCanalConnector implements CanalConnector {
     }
 
     private boolean process(List<MessageExt> messageExts) {
-        BlockingQueue<Message> messageList = new LinkedBlockingQueue<>();
+        logger.info("Get Message:{}", messageExts);
+        List messageList = Lists.newArrayList();
         for (MessageExt messageExt : messageExts) {
             byte[] data = messageExt.getBody();
-            Message message = CanalMessageDeserializer.deserializer(data);
-            try {
-                messageList.put(message);
-            } catch (InterruptedException ex) {
-                logger.error("Add message error");
+            if (data != null) {
+                try {
+                    if (!flatMessage) {
+                        Message message = CanalMessageDeserializer.deserializer(data);
+                        messageList.add(message);
+                    } else {
+                        FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);
+                        messageList.add(flatMessage);
+                    }
+                } catch (Exception ex) {
+                    logger.error("Add message error", ex);
+                    throw new CanalClientException(ex);
+                }
+            } else {
+                logger.warn("Received message data is null");
             }
         }
-        ConsumerBatchMessage<Message> batchMessage = new ConsumerBatchMessage<>(messageList);
+        ConsumerBatchMessage batchMessage;
+        if (!flatMessage) {
+            batchMessage = new ConsumerBatchMessage<Message>(messageList);
+        } else {
+            batchMessage = new ConsumerBatchMessage<FlatMessage>(messageList);
+        }
         try {
             messageBlockingQueue.put(batchMessage);
         } catch (InterruptedException e) {
@@ -121,76 +146,117 @@ public class RocketMQCanalConnector implements CanalConnector {
         return isCompleted && isSuccess;
     }
 
-    @Override
     public void subscribe() throws CanalClientException {
         this.subscribe(null);
     }
 
-    @Override
     public void unsubscribe() throws CanalClientException {
         this.rocketMQConsumer.unsubscribe(this.topic);
     }
 
     @Override
-    public Message get(int batchSize) throws CanalClientException {
-        Message message = getWithoutAck(batchSize);
-        ack(message.getId());
-        return message;
+    public List<Message> getList(Long timeout, TimeUnit unit) throws CanalClientException {
+        List<Message> messages = getListWithoutAck(timeout, unit);
+        if (messages != null && !messages.isEmpty()) {
+            ack();
+        }
+        return messages;
     }
 
     @Override
-    public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
-        Message message = getWithoutAck(batchSize, timeout, unit);
-        ack(message.getId());
-        return message;
-    }
+    public List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
+        try {
+            if (this.lastGetBatchMessage != null) {
+                throw new CanalClientException("mq get/ack not support concurrent & async ack");
+            }
 
-    private Message getMessage(ConsumerBatchMessage consumerBatchMessage) {
-        BlockingQueue<Message> messageList = consumerBatchMessage.getData();
-        if (messageList != null & messageList.size() > 0) {
-            Message message = messageList.poll();
-            messageCache.put(message.getId(), consumerBatchMessage);
-            return message;
+            ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
+            if (batchMessage != null) {
+                this.lastGetBatchMessage = batchMessage;
+                return batchMessage.getData();
+            }
+        } catch (InterruptedException ex) {
+            logger.warn("Get message timeout", ex);
+            throw new CanalClientException("Failed to fetch the data after: " + timeout);
         }
-        return null;
+        return Lists.newArrayList();
     }
 
     @Override
-    public Message getWithoutAck(int batchSize) throws CanalClientException {
-        ConsumerBatchMessage batchMessage = messageBlockingQueue.poll();
-        if (batchMessage != null) {
-            return getMessage(batchMessage);
+    public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
+        List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);
+        if (messages != null && !messages.isEmpty()) {
+            ack();
         }
-        return null;
+        return messages;
     }
 
     @Override
-    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
         try {
+            if (this.lastGetBatchMessage != null) {
+                throw new CanalClientException("mq get/ack not support concurrent & async ack");
+            }
+
             ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
-            return getMessage(batchMessage);
+            if (batchMessage != null) {
+                this.lastGetBatchMessage = batchMessage;
+                return batchMessage.getData();
+            }
         } catch (InterruptedException ex) {
             logger.warn("Get message timeout", ex);
             throw new CanalClientException("Failed to fetch the data after: " + timeout);
         }
+        return Lists.newArrayList();
     }
 
     @Override
-    public void ack(long batchId) throws CanalClientException {
-        ConsumerBatchMessage batchMessage = messageCache.get(batchId);
-        if (batchMessage != null) {
-            batchMessage.ack();
+    public void ack() throws CanalClientException {
+        try {
+            this.lastGetBatchMessage.ack();
+        } catch (Throwable e) {
+            this.lastGetBatchMessage.fail();
+        } finally {
+            this.lastGetBatchMessage = null;
         }
     }
 
     @Override
-    public void rollback(long batchId) throws CanalClientException {
+    public void rollback() throws CanalClientException {
+        try {
+            this.lastGetBatchMessage.fail();
+        } finally {
+            this.lastGetBatchMessage = null;
+        }
+    }
 
+    public Message get(int batchSize) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
     }
 
     @Override
-    public void rollback() throws CanalClientException {
+    public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
 
+    @Override
+    public Message getWithoutAck(int batchSize) throws CanalClientException {
+        throw new CanalClientException("mq does not support this method");
+    }
+
+    @Override
+    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+        throw new CanalClientException("mq does not support this method");
+    }
+
+    @Override
+    public void ack(long batchId) throws CanalClientException {
+        throw new CanalClientException("mq does not support this method");
+    }
+
+    @Override
+    public void rollback(long batchId) throws CanalClientException {
+        throw new CanalClientException("mq does not support this method");
     }
 
     @Override
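
A minimal consumption-loop sketch for the reworked get/ack API above: the MQ connector now tracks the last fetched batch internally, so ack()/rollback() take no batchId. The name server, topic and group below are placeholders, not values from this commit.

import java.util.List;
import java.util.concurrent.TimeUnit;

import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

public class MqGetAckSketch {

    public static void main(String[] args) {
        RocketMQCanalConnector connector = RocketMQCanalConnectors.newRocketMQConnector(
            "127.0.0.1:9876", "example-topic", "example-group", false);
        connector.connect();
        connector.subscribe();
        while (true) {
            // returns an empty list when the poll times out
            List<Message> messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS);
            try {
                for (Message message : messages) {
                    System.out.println("batchId=" + message.getId() + ", entries=" + message.getEntries().size());
                }
                if (!messages.isEmpty()) {
                    connector.ack();     // acknowledge the last fetched batch
                }
            } catch (Exception e) {
                connector.rollback();    // mark the last fetched batch as failed so it is redelivered
            }
        }
    }
}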

+ 6 - 2
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnectorProvider.java → client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnectors.java

@@ -3,7 +3,7 @@ package com.alibaba.otter.canal.client.rocketmq;
 /**
  * RocketMQ connector provider.
  */
-public class RocketMQCanalConnectorProvider {
+public class RocketMQCanalConnectors {
 
     /**
      * Create RocketMQ connector
@@ -14,7 +14,11 @@ public class RocketMQCanalConnectorProvider {
      * @return {@link RocketMQCanalConnector}
      */
     public static RocketMQCanalConnector newRocketMQConnector(String nameServers, String topic, String groupId) {
-        return new RocketMQCanalConnector(nameServers, topic, groupId);
+        return new RocketMQCanalConnector(nameServers, topic, groupId, false);
     }
 
+    public static RocketMQCanalConnector newRocketMQConnector(String nameServers, String topic, String groupId,
+                                                              boolean flatMessage) {
+        return new RocketMQCanalConnector(nameServers, topic, groupId, flatMessage);
+    }
 }
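
With the new flatMessage flag, a consumer can read row changes as FlatMessage objects instead of Message batches. A minimal sketch (placeholder addresses; getFlatList(), shown in the connector changes above, acks the batch itself when it is non-empty):

import java.util.List;
import java.util.concurrent.TimeUnit;

import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectors;
import com.alibaba.otter.canal.protocol.FlatMessage;

public class FlatMessageConsumerSketch {

    public static void main(String[] args) {
        RocketMQCanalConnector connector = RocketMQCanalConnectors.newRocketMQConnector(
            "127.0.0.1:9876", "example-topic", "example-group", true); // flatMessage = true
        connector.connect();
        connector.subscribe();
        while (true) {
            // get + automatic ack for non-empty batches
            List<FlatMessage> flatMessages = connector.getFlatList(100L, TimeUnit.MILLISECONDS);
            for (FlatMessage flatMessage : flatMessages) {
                System.out.println(flatMessage.getDatabase() + "." + flatMessage.getTable());
            }
        }
    }
}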

+ 0 - 13
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalListener.java

@@ -1,13 +0,0 @@
-package com.alibaba.otter.canal.client.rocketmq;
-
-import java.util.List;
-
-import org.apache.rocketmq.common.message.MessageExt;
-
-/**
- * RocketMQ message listener
- */
-public interface RocketMQCanalListener {
-
-    boolean onReceive(List<MessageExt> messageExts);
-}

+ 1 - 2
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaClientExample.java

@@ -108,8 +108,7 @@ public class CanalKafkaClientExample {
                 connector.subscribe();
                 while (running) {
                     try {
-                        List<Message> messages = connector.getWithoutAck(1L, TimeUnit.SECONDS); // fetch messages
-
+                        List<Message> messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS); // fetch messages
                         if (messages == null) {
                             continue;
                         }

+ 15 - 10
client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/CanalRocketMQClientExample.java

@@ -1,12 +1,15 @@
 package com.alibaba.otter.canal.client.running.rocketmq;
 
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
 import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
-import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectors;
 import com.alibaba.otter.canal.client.running.kafka.AbstractKafkaTest;
 import com.alibaba.otter.canal.protocol.Message;
 
@@ -34,7 +37,7 @@ public class CanalRocketMQClientExample extends AbstractRocektMQTest {
                                                     };
 
     public CanalRocketMQClientExample(String nameServers, String topic, String groupId){
-        connector = RocketMQCanalConnectorProvider.newRocketMQConnector(nameServers, topic, groupId);
+        connector = RocketMQCanalConnectors.newRocketMQConnector(nameServers, topic, groupId);
     }
 
     public static void main(String[] args) {
@@ -103,21 +106,23 @@ public class CanalRocketMQClientExample extends AbstractRocektMQTest {
                 connector.connect();
                 connector.subscribe();
                 while (running) {
-                    Message message = connector.getWithoutAck(1); // fetch messages
-                    try {
-                        if (message == null) {
-                            continue;
-                        }
+                    List<Message> messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS); // fetch messages
+                    for (Message message : messages) {
                         long batchId = message.getId();
                         int size = message.getEntries().size();
                         if (batchId == -1 || size == 0) {
+                            // try {
+                            // Thread.sleep(1000);
+                            // } catch (InterruptedException e) {
+                            // }
                         } else {
+                            // printSummary(message, batchId, size);
+                            // printEntry(message.getEntries());
                             logger.info(message.toString());
                         }
-                    } catch (Exception e) {
-                        logger.error(e.getMessage(), e);
                     }
-                    connector.ack(message.getId()); // acknowledge
+
+                    connector.ack(); // acknowledge
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);

+ 1 - 1
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java

@@ -367,7 +367,7 @@ public final class RowsLogBuffer {
                     // convert to unsigned long
                     switch (len) {
                         case 1:
-                            value = buffer.getInt8();
+                            value = buffer.getUint8();
                             break;
                         case 2:
                             value = buffer.getBeUint16();
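
The getInt8 → getUint8 switch matters for 1-byte values above 127: read as a signed byte they come back negative, while the surrounding comment says the value should be converted to an unsigned long. A standalone illustration (not canal code; it assumes getUint8() performs the usual 0xFF widening):

public class UnsignedByteSketch {

    public static void main(String[] args) {
        byte raw = (byte) 0xFF;        // the stored byte for the unsigned value 255
        long asSigned = raw;           // -1, what a signed 1-byte read yields
        long asUnsigned = raw & 0xFF;  // 255, what an unsigned 1-byte read yields
        System.out.println(asSigned + " vs " + asUnsigned);
    }
}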

+ 1 - 1
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -387,7 +387,7 @@ public class CanalController {
         return config;
     }
 
-    public String getProperty(Properties properties, String key) {
+    public static String getProperty(Properties properties, String key) {
         key = StringUtils.trim(key);
         String value = System.getProperty(key);
 

+ 20 - 13
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -1,15 +1,17 @@
 package com.alibaba.otter.canal.deployer;
 
-import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
-import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
-import com.alibaba.otter.canal.server.CanalMQStarter;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
 import java.io.FileInputStream;
 import java.util.Properties;
+
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
+import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
+import com.alibaba.otter.canal.server.CanalMQStarter;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
+
 /**
  * Entry class for launching the standalone canal server
  *
@@ -19,7 +21,7 @@ import org.slf4j.LoggerFactory;
 public class CanalLauncher {
 
     private static final String CLASSPATH_URL_PREFIX = "classpath:";
-    private static final Logger logger = LoggerFactory.getLogger(CanalLauncher.class);
+    private static final Logger logger               = LoggerFactory.getLogger(CanalLauncher.class);
 
     public static void main(String[] args) throws Throwable {
         try {
@@ -36,6 +38,19 @@ public class CanalLauncher {
                 properties.load(new FileInputStream(conf));
             }
 
+            CanalMQProducer canalMQProducer = null;
+            String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
+            if (serverMode.equalsIgnoreCase("kafka")) {
+                canalMQProducer = new CanalKafkaProducer();
+            } else if (serverMode.equalsIgnoreCase("rocketmq")) {
+                canalMQProducer = new CanalRocketMQProducer();
+            }
+
+            if (canalMQProducer != null) {
+                // disable netty
+                System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
+            }
+
             logger.info("## start the canal server.");
             final CanalController controller = new CanalController(properties);
             controller.start();
@@ -55,19 +70,11 @@ public class CanalLauncher {
 
             });
 
-            CanalMQProducer canalMQProducer = null;
-            String serverMode = controller.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
-            if (serverMode.equalsIgnoreCase("kafka")) {
-                canalMQProducer = new CanalKafkaProducer();
-            } else if (serverMode.equalsIgnoreCase("rocketmq")) {
-                canalMQProducer = new CanalRocketMQProducer();
-            }
             if (canalMQProducer != null) {
                 CanalMQStarter canalServerStarter = new CanalMQStarter(canalMQProducer);
                 if (canalServerStarter != null) {
                     canalServerStarter.init();
                 }
-
             }
         } catch (Throwable e) {
             logger.error("## Something goes wrong when starting up the canal Server:", e);

+ 4 - 0
deployer/src/main/resources/canal.properties

@@ -69,6 +69,10 @@ canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destinat
 canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
 canal.instance.tsdb.dbUsername=canal
 canal.instance.tsdb.dbPassword=canal
+# dump snapshot interval, default 24 hour
+canal.instance.tsdb.snapshot.interval=24
+# purge snapshot expire , default 360 hour(15 days)
+canal.instance.tsdb.snapshot.expire=360
 
 # rds oss binlog account
 canal.instance.rds.accesskey =
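
The two new tsdb properties drive the snapshot scheduler changed further down in DatabaseTableMeta: a table-meta snapshot every canal.instance.tsdb.snapshot.interval hours, and a purge of snapshots older than canal.instance.tsdb.snapshot.expire hours after each successful snapshot. A small sketch of the unit handling, mirroring the TimeUnit conversion used there:

import java.util.concurrent.TimeUnit;

public class SnapshotSettingsSketch {

    public static void main(String[] args) {
        int snapshotIntervalHours = 24;  // default: one snapshot per day
        int snapshotExpireHours = 360;   // default: keep 15 days of snapshots

        // DatabaseTableMeta hands the expire window to the DAO in seconds
        long expireSeconds = TimeUnit.HOURS.toSeconds(snapshotExpireHours);
        System.out.println("snapshot every " + snapshotIntervalHours + "h, purge entries older than "
                           + expireSeconds + "s (" + snapshotExpireHours / 24 + " days)");
    }
}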

+ 1 - 1
deployer/src/main/resources/mq.yml

@@ -6,7 +6,7 @@ bufferMemory: 33554432
 
 # Canal batch size, default 50K; because of Kafka's maximum message size limit, do not exceed 1M (stay below 900K)
 canalBatchSize: 50
-# Timeout for canal to get data, unit: milliseconds, no timeout limit
+# Timeout for canal to get data, unit: milliseconds, 0 means no timeout limit
 canalGetTimeout: 100
 flatMessage: true
 

+ 2 - 0
deployer/src/main/resources/spring/default-instance.xml

@@ -176,6 +176,8 @@
 		<!-- table structure (tsdb) related -->
 		<property name="enableTsdb" value="${canal.instance.tsdb.enable:true}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
+		<property name="tsdbSnapshotInterval" value="${canal.instance.tsdb.snapshot.interval:24}" />
+		<property name="tsdbSnapshotExpire" value="${canal.instance.tsdb.snapshot.expire:360}" />
 		
 		<!-- whether to enable GTID mode -->
 		<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>

+ 2 - 0
deployer/src/main/resources/spring/file-instance.xml

@@ -161,6 +161,8 @@
 		<!-- table structure (tsdb) related -->
 		<property name="enableTsdb" value="${canal.instance.tsdb.enable:true}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
+		<property name="tsdbSnapshotInterval" value="${canal.instance.tsdb.snapshot.interval:24}" />
+		<property name="tsdbSnapshotExpire" value="${canal.instance.tsdb.snapshot.expire:360}" />
 
 		<!-- whether to enable GTID mode -->
 		<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>

+ 2 - 0
deployer/src/main/resources/spring/memory-instance.xml

@@ -149,6 +149,8 @@
 		<!-- table structure (tsdb) related -->
 		<property name="enableTsdb" value="${canal.instance.tsdb.enable:false}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
+		<property name="tsdbSnapshotInterval" value="${canal.instance.tsdb.snapshot.interval:24}" />
+		<property name="tsdbSnapshotExpire" value="${canal.instance.tsdb.snapshot.expire:360}" />
 		
 		<!-- whether to enable GTID mode -->
 		<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>

+ 2 - 2
deployer/src/main/resources/spring/tsdb/sql-map/sqlmap_history.xml

@@ -36,10 +36,10 @@
     </delete>
 
 
-    <delete id="deleteByGmtModified" parameterClass="java.util.Map">
+    <delete id="deleteByTimestamp" parameterClass="java.util.Map">
         <![CDATA[
 		delete from meta_history
-		where gmt_modified < timestamp(#timestamp#)
+		where destination=#destination# and binlog_timestamp < #timestamp#
         ]]>
     </delete>
 </sqlMap>

+ 2 - 2
deployer/src/main/resources/spring/tsdb/sql-map/sqlmap_snapshot.xml

@@ -42,10 +42,10 @@
         where destination=#destination#
     </delete>
 
-    <delete id="deleteByGmtModified" parameterClass="java.util.Map">
+    <delete id="deleteByTimestamp" parameterClass="java.util.Map">
         <![CDATA[
 		delete from meta_snapshot
-		where gmt_modified < timestamp(#timestamp#)
+		where destination=#destination# and binlog_timestamp < #timestamp# and binlog_timestamp > 0
         ]]>
     </delete>
 </sqlMap>

+ 6 - 0
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -296,6 +296,12 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             mysqlEventParser.setFilterTableError(parameters.getFilterTableError());
             mysqlEventParser.setIsGTIDMode(BooleanUtils.toBoolean(parameters.getGtidEnable()));
             // tsdb
+            if (parameters.getTsdbSnapshotInterval() != null) {
+                mysqlEventParser.setTsdbSnapshotInterval(parameters.getTsdbSnapshotInterval());
+            }
+            if (parameters.getTsdbSnapshotExpire() != null) {
+                mysqlEventParser.setTsdbSnapshotExpire(parameters.getTsdbSnapshotExpire());
+            }
             boolean tsdbEnable = BooleanUtils.toBoolean(parameters.getTsdbEnable());
             if (tsdbEnable) {
                 mysqlEventParser.setEnableTsdb(tsdbEnable);

+ 18 - 0
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java

@@ -98,6 +98,8 @@ public class CanalParameter implements Serializable {
     private String                   tsdbJdbcUrl;
     private String                   tsdbJdbcUserName;
     private String                   tsdbJdbcPassword;
+    private Integer                  tsdbSnapshotInterval               = 24;
+    private Integer                  tsdbSnapshotExpire                 = 360;
     private String                   rdsAccesskey;
     private String                   rdsSecretkey;
     private String                   rdsInstanceId;
@@ -964,6 +966,22 @@ public class CanalParameter implements Serializable {
         this.memoryStorageRawEntry = memoryStorageRawEntry;
     }
 
+    public Integer getTsdbSnapshotInterval() {
+        return tsdbSnapshotInterval;
+    }
+
+    public void setTsdbSnapshotInterval(Integer tsdbSnapshotInterval) {
+        this.tsdbSnapshotInterval = tsdbSnapshotInterval;
+    }
+
+    public Integer getTsdbSnapshotExpire() {
+        return tsdbSnapshotExpire;
+    }
+
+    public void setTsdbSnapshotExpire(Integer tsdbSnapshotExpire) {
+        this.tsdbSnapshotExpire = tsdbSnapshotExpire;
+    }
+
     public String toString() {
         return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
     }

+ 18 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -26,6 +26,8 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
 
     protected TableMetaTSDBFactory tableMetaTSDBFactory      = new DefaultTableMetaTSDBFactory();
     protected boolean              enableTsdb                = false;
+    protected int                  tsdbSnapshotInterval      = 24;
+    protected int                  tsdbSnapshotExpire        = 360;
     protected String               tsdbSpringXml;
     protected TableMetaTSDB        tableMetaTSDB;
 
@@ -210,4 +212,20 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
         return this.receivedBinlogBytes;
     }
 
+    public int getTsdbSnapshotInterval() {
+        return tsdbSnapshotInterval;
+    }
+
+    public void setTsdbSnapshotInterval(int tsdbSnapshotInterval) {
+        this.tsdbSnapshotInterval = tsdbSnapshotInterval;
+    }
+
+    public int getTsdbSnapshotExpire() {
+        return tsdbSnapshotExpire;
+    }
+
+    public void setTsdbSnapshotExpire(int tsdbSnapshotExpire) {
+        this.tsdbSnapshotExpire = tsdbSnapshotExpire;
+    }
+
 }

+ 2 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java

@@ -55,6 +55,8 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
             ((DatabaseTableMeta) tableMetaTSDB).setConnection(metaConnection);
             ((DatabaseTableMeta) tableMetaTSDB).setFilter(eventFilter);
             ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);
+            ((DatabaseTableMeta) tableMetaTSDB).setSnapshotInterval(tsdbSnapshotInterval);
+            ((DatabaseTableMeta) tableMetaTSDB).setSnapshotExpire(tsdbSnapshotExpire);
         }
 
         tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB);

+ 2 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -121,6 +121,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 ((DatabaseTableMeta) tableMetaTSDB).setConnection(metaConnection);
                 ((DatabaseTableMeta) tableMetaTSDB).setFilter(eventFilter);
                 ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);
+                ((DatabaseTableMeta) tableMetaTSDB).setSnapshotInterval(tsdbSnapshotInterval);
+                ((DatabaseTableMeta) tableMetaTSDB).setSnapshotExpire(tsdbSnapshotExpire);
             }
 
             tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB);

+ 4 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -671,9 +671,6 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             }
 
             int javaType = buffer.getJavaType();
-            if (isSingleBit && javaType == Types.TINYINT) {
-                javaType = Types.BIT;
-            }
             if (buffer.isNull()) {
                 columnBuilder.setIsNull(true);
             } else {
@@ -722,6 +719,10 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                             // numeric value, String.valueOf is enough
                             columnBuilder.setValue(String.valueOf(value));
                         }
+
+                        if (isSingleBit && javaType == Types.TINYINT) {
+                            javaType = Types.BIT;
+                        }
                         break;
                     case Types.REAL: // float
                     case Types.DOUBLE: // double

+ 49 - 15
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java

@@ -44,19 +44,21 @@ import com.alibaba.otter.canal.protocol.position.EntryPosition;
  */
 public class DatabaseTableMeta implements TableMetaTSDB {
 
-    public static final EntryPosition INIT_POSITION = new EntryPosition("0", 0L, -2L, -1L);
-    private static Logger             logger        = LoggerFactory.getLogger(DatabaseTableMeta.class);
-    private static Pattern            pattern       = Pattern.compile("Duplicate entry '.*' for key '*'");
-    private static Pattern            h2Pattern     = Pattern.compile("Unique index or primary key violation");
+    public static final EntryPosition INIT_POSITION    = new EntryPosition("0", 0L, -2L, -1L);
+    private static Logger             logger           = LoggerFactory.getLogger(DatabaseTableMeta.class);
+    private static Pattern            pattern          = Pattern.compile("Duplicate entry '.*' for key '*'");
+    private static Pattern            h2Pattern        = Pattern.compile("Unique index or primary key violation");
     private String                    destination;
     private MemoryTableMeta           memoryTableMeta;
-    private MysqlConnection           connection;                                                              // connection used to query meta info
+    private MysqlConnection           connection;                                                                 // connection used to query meta info
     private CanalEventFilter          filter;
     private CanalEventFilter          blackFilter;
     private EntryPosition             lastPosition;
     private ScheduledExecutorService  scheduler;
     private MetaHistoryDAO            metaHistoryDAO;
     private MetaSnapshotDAO           metaSnapshotDAO;
+    private int                       snapshotInterval = 24;
+    private int                       snapshotExpire   = 360;
 
     public DatabaseTableMeta(){
 
@@ -77,18 +79,30 @@ public class DatabaseTableMeta implements TableMetaTSDB {
         });
 
         // generate a snapshot every 24 hours by default
-        scheduler.scheduleWithFixedDelay(new Runnable() {
+        if (snapshotInterval > 0) {
+            scheduler.scheduleWithFixedDelay(new Runnable() {
+
+                @Override
+                public void run() {
+                    boolean applyResult = false;
+                    try {
+                        MDC.put("destination", destination);
+                        applyResult = applySnapshotToDB(lastPosition, false);
+                    } catch (Throwable e) {
+                        logger.error("schedule applySnapshotToDB failed", e);
+                    }
 
-            @Override
-            public void run() {
-                try {
-                    MDC.put("destination", destination);
-                    applySnapshotToDB(lastPosition, false);
-                } catch (Throwable e) {
-                    logger.error("schedule applySnapshotToDB failed", e);
+                    try {
+                        MDC.put("destination", destination);
+                        if (applyResult) {
+                            snapshotExpire((int) TimeUnit.HOURS.toSeconds(snapshotExpire));
+                        }
+                    } catch (Throwable e) {
+                        logger.error("schedule snapshotExpire failed", e);
+                    }
                 }
-            }
-        }, 24, 24, TimeUnit.HOURS);
+            }, snapshotInterval, snapshotInterval, TimeUnit.HOURS);
+        }
         return true;
     }
 
@@ -461,6 +475,10 @@ public class DatabaseTableMeta implements TableMetaTSDB {
         return true;
     }
 
+    private int snapshotExpire(int expireTimestamp) {
+        return metaSnapshotDAO.deleteByTimestamp(destination, expireTimestamp);
+    }
+
     public void setConnection(MysqlConnection connection) {
         this.connection = connection;
     }
@@ -489,6 +507,22 @@ public class DatabaseTableMeta implements TableMetaTSDB {
         this.blackFilter = blackFilter;
     }
 
+    public int getSnapshotInterval() {
+        return snapshotInterval;
+    }
+
+    public void setSnapshotInterval(int snapshotInterval) {
+        this.snapshotInterval = snapshotInterval;
+    }
+
+    public int getSnapshotExpire() {
+        return snapshotExpire;
+    }
+
+    public void setSnapshotExpire(int snapshotExpire) {
+        this.snapshotExpire = snapshotExpire;
+    }
+
     public MysqlConnection getConnection() {
         return connection;
     }
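
For completeness, a sketch of setting the new knobs on DatabaseTableMeta directly, mirroring the setSnapshotInterval/setSnapshotExpire wiring added to the event parsers earlier on this page. The DAO, connection and filter setup is omitted, so this only exercises the setters from this hunk:

import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta;

public class TsdbSnapshotWiringSketch {

    public static void main(String[] args) {
        DatabaseTableMeta tableMetaTSDB = new DatabaseTableMeta();
        tableMetaTSDB.setSnapshotInterval(12);  // snapshot every 12 hours instead of the default 24
        tableMetaTSDB.setSnapshotExpire(168);   // purge snapshots older than 7 days
        // connection, filters and the meta DAOs would be set here before init is called
    }
}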

+ 4 - 7
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/dao/MetaHistoryDAO.java

@@ -1,7 +1,5 @@
 package com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao;
 
-import java.text.SimpleDateFormat;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 
@@ -37,13 +35,12 @@ public class MetaHistoryDAO extends MetaBaseDAO {
     /**
      * Delete data older than interval seconds
      */
-    public Integer deleteByGmtModified(int interval) {
+    public Integer deleteByTimestamp(String destination, int interval) {
         HashMap params = Maps.newHashMapWithExpectedSize(2);
         long timestamp = System.currentTimeMillis() - interval * 1000;
-        Date date = new Date(timestamp);
-        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        params.put("timestamp", format.format(date));
-        return getSqlMapClientTemplate().delete("meta_history.deleteByGmtModified", params);
+        params.put("timestamp", timestamp);
+        params.put("destination", destination);
+        return getSqlMapClientTemplate().delete("meta_history.deleteByTimestamp", params);
     }
 
     protected void initDao() throws Exception {
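
The renamed deleteByTimestamp scopes the purge to one destination and compares binlog_timestamp instead of gmt_modified. A sketch of the cutoff arithmetic as the DAO performs it: the interval arrives in seconds, the column stores milliseconds (1296000 below is simply the default 360-hour window spelled out):

public class PurgeCutoffSketch {

    public static void main(String[] args) {
        int intervalSeconds = 1296000;  // 360 hours, the default expire window
        long cutoffMillis = System.currentTimeMillis() - intervalSeconds * 1000L;
        // rows with the given destination and binlog_timestamp < cutoffMillis are deleted
        System.out.println("purge rows older than " + cutoffMillis);
    }
}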

+ 4 - 7
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/dao/MetaSnapshotDAO.java

@@ -1,7 +1,5 @@
 package com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao;
 
-import java.text.SimpleDateFormat;
-import java.util.Date;
 import java.util.HashMap;
 
 import com.google.common.collect.Maps;
@@ -40,13 +38,12 @@ public class MetaSnapshotDAO extends MetaBaseDAO {
     /**
      * 删除interval秒之前的数据
      */
-    public Integer deleteByGmtModified(int interval) {
+    public Integer deleteByTimestamp(String destination, int interval) {
         HashMap params = Maps.newHashMapWithExpectedSize(2);
         long timestamp = System.currentTimeMillis() - interval * 1000;
-        Date date = new Date(timestamp);
-        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        params.put("timestamp", format.format(date));
-        return getSqlMapClientTemplate().delete("meta_snapshot.deleteByGmtModified", params);
+        params.put("timestamp", timestamp);
+        params.put("destination", destination);
+        return getSqlMapClientTemplate().delete("meta_snapshot.deleteByTimestamp", params);
     }
 
     protected void initDao() throws Exception {

+ 17 - 2
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MetaHistoryDAOTest.java

@@ -16,7 +16,7 @@ import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDO;
  * Created by wanshao Date: 2017/9/20 Time: 5:00 PM
  **/
 @RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(locations = { "/tsdb/mysql-tsdb.xml" })
+@ContextConfiguration(locations = { "/tsdb/h2-tsdb.xml" })
 public class MetaHistoryDAOTest {
 
     @Resource
@@ -24,7 +24,22 @@ public class MetaHistoryDAOTest {
 
     @Test
     public void testSimple() {
-        List<MetaHistoryDO> metaHistoryDOList = metaHistoryDAO.findByTimestamp("test", 0L, 0L);
+        MetaHistoryDO historyDO = new MetaHistoryDO();
+        historyDO.setDestination("test");
+        historyDO.setBinlogFile("000001");
+        historyDO.setBinlogOffest(4L);
+        historyDO.setBinlogMasterId("1");
+        historyDO.setBinlogTimestamp(System.currentTimeMillis() - 7300 * 1000);
+        historyDO.setSqlSchema("test");
+        historyDO.setUseSchema("test");
+        historyDO.setSqlTable("testTable");
+        historyDO.setSqlTable("drop table testTable");
+        historyDO.setSqlText("drop table testTable");
+
+        int count = metaHistoryDAO.deleteByTimestamp("test", 7200);
+        System.out.println(count);
+
+        List<MetaHistoryDO> metaHistoryDOList = metaHistoryDAO.findByTimestamp("test", 0L, System.currentTimeMillis());
         for (MetaHistoryDO metaHistoryDO : metaHistoryDOList) {
             System.out.println(metaHistoryDO.getId());
         }

+ 41 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MetaSnapshotDAOTest.java

@@ -0,0 +1,41 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
+
+import javax.annotation.Resource;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDAO;
+import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDO;
+
+/**
+ * Created by wanshao Date: 2017/9/20 Time: 5:00 PM
+ **/
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(locations = { "/tsdb/h2-tsdb.xml" })
+public class MetaSnapshotDAOTest {
+
+    @Resource
+    MetaSnapshotDAO metaSnapshotDAO;
+
+    @Test
+    public void testSimple() {
+        MetaSnapshotDO metaSnapshotDO = new MetaSnapshotDO();
+        metaSnapshotDO.setDestination("test");
+        metaSnapshotDO.setBinlogFile("000001");
+        metaSnapshotDO.setBinlogOffest(4L);
+        metaSnapshotDO.setBinlogMasterId("1");
+        metaSnapshotDO.setBinlogTimestamp(System.currentTimeMillis() - 7300 * 1000);
+        metaSnapshotDO.setData("test");
+        metaSnapshotDAO.insert(metaSnapshotDO);
+
+        MetaSnapshotDO snapshotDO = metaSnapshotDAO.findByTimestamp("test", System.currentTimeMillis());
+        System.out.println(snapshotDO);
+
+        int count = metaSnapshotDAO.deleteByTimestamp("test", 7200);
+        System.out.println(count);
+    }
+
+}

+ 2 - 2
parse/src/test/resources/tsdb/sql-map/sqlmap_history.xml

@@ -36,10 +36,10 @@
     </delete>
 
 
-    <delete id="deleteByGmtModified" parameterClass="java.util.Map">
+    <delete id="deleteByTimestamp" parameterClass="java.util.Map">
         <![CDATA[
 		delete from meta_history
-		where gmt_modified < timestamp(#timestamp#)
+		where destination=#destination# and binlog_timestamp < #timestamp#
         ]]>
     </delete>
 </sqlMap>

+ 2 - 2
parse/src/test/resources/tsdb/sql-map/sqlmap_snapshot.xml

@@ -54,10 +54,10 @@
         where destination=#destination#
     </delete>
 
-    <delete id="deleteByGmtModified" parameterClass="java.util.Map">
+    <delete id="deleteByTimestamp" parameterClass="java.util.Map">
         <![CDATA[
 		delete from meta_snapshot
-		where gmt_modified < timestamp(#timestamp#)
+		where destination=#destination# and binlog_timestamp < #timestamp# and binlog_timestamp > 0
         ]]>
     </delete>
 </sqlMap>

+ 0 - 1
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -277,7 +277,6 @@ public class FlatMessage implements Serializable {
             partitionsNum = 1;
         }
         FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];
-
         String pk = pkHashConfig.get(flatMessage.getDatabase() + "." + flatMessage.getTable());
         if (pk == null || flatMessage.getIsDdl()) {
             partitionMessages[0] = flatMessage;
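
messagePartition is what the RocketMQ producer further down uses to spread one FlatMessage across queue partitions by primary-key hash; DDL statements and tables without a configured key fall back to slot 0, as the hunk above shows. A sketch of calling the same helper directly (parameter types are inferred from the producer call site; addresses, topic and the test.t1/id mapping are placeholders):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectors;
import com.alibaba.otter.canal.protocol.FlatMessage;

public class FlatMessagePartitionSketch {

    public static void main(String[] args) {
        Map<String, String> pkHashConfig = new HashMap<String, String>();
        pkHashConfig.put("test.t1", "id");  // "database.table" -> primary-key column to hash on

        RocketMQCanalConnector connector = RocketMQCanalConnectors.newRocketMQConnector(
            "127.0.0.1:9876", "example-topic", "example-group", true);
        connector.connect();
        connector.subscribe();

        List<FlatMessage> flatMessages = connector.getFlatList(100L, TimeUnit.MILLISECONDS);
        for (FlatMessage flatMessage : flatMessages) {
            FlatMessage[] parts = FlatMessage.messagePartition(flatMessage, 4, pkHashConfig);
            for (int i = 0; i < parts.length; i++) {
                if (parts[i] != null) {  // partitions with no rows stay null
                    System.out.println("partition " + i + ": " + parts[i].getTable());
                }
            }
        }
    }
}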

+ 2 - 2
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -12,7 +12,7 @@ import java.util.Map;
  */
 public class MQProperties {
 
-    private String                 servers                = "localhost:6667";
+    private String                 servers                = "127.0.0.1:6667";
     private int                    retries                = 0;
     private int                    batchSize              = 16384;
     private int                    lingerMs               = 1;
@@ -20,7 +20,7 @@ public class MQProperties {
     private boolean                filterTransactionEntry = true;
     private String                 producerGroup          = "Canal-Producer";
     private int                    canalBatchSize         = 50;
-    private Long                   canalGetTimeout;
+    private Long                   canalGetTimeout        = 100L;
     private boolean                flatMessage            = true;
 
     private List<CanalDestination> canalDestinations      = new ArrayList<CanalDestination>();
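
The new 100 ms default pairs with the CanalMQStarter change at the bottom of this page: a positive canalGetTimeout selects the timed getWithoutAck, while null or 0 falls back to the untimed variant (the mq.yml comment earlier now documents 0 as "no timeout"). A tiny sketch of that branching only; the method is illustrative, not canal API:

public class GetTimeoutSketch {

    // mirrors the branch in CanalMQStarter: timed get for a positive timeout, plain get otherwise
    static String describeGet(Long getTimeout, int getBatchSize) {
        if (getTimeout != null && getTimeout > 0) {
            return "getWithoutAck(identity, " + getBatchSize + ", " + getTimeout + ", MILLISECONDS)";
        }
        return "getWithoutAck(identity, " + getBatchSize + ")";
    }

    public static void main(String[] args) {
        System.out.println(describeGet(100L, 50));  // the defaults from MQProperties above
        System.out.println(describeGet(0L, 50));    // 0 = no timeout
        System.out.println(describeGet(null, 50));
    }
}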

+ 91 - 17
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -1,10 +1,7 @@
 package com.alibaba.otter.canal.rocketmq;
 
-import com.alibaba.otter.canal.common.CanalMessageSerializer;
-import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.server.exception.CanalServerException;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
 import java.util.List;
+
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -15,14 +12,24 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.common.CanalMessageSerializer;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.server.exception.CanalServerException;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
+
 public class CanalRocketMQProducer implements CanalMQProducer {
 
     private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);
 
     private DefaultMQProducer   defaultMQProducer;
 
+    private MQProperties        mqProperties;
+
     @Override
     public void init(MQProperties rocketMQProperties) {
+        this.mqProperties = rocketMQProperties;
         defaultMQProducer = new DefaultMQProducer();
         defaultMQProducer.setNamesrvAddr(rocketMQProperties.getServers());
         defaultMQProducer.setProducerGroup(rocketMQProperties.getProducerGroup());
@@ -38,24 +45,91 @@ public class CanalRocketMQProducer implements CanalMQProducer {
     @Override
     public void send(final MQProperties.CanalDestination destination, com.alibaba.otter.canal.protocol.Message data,
                      Callback callback) {
-        try {
-            Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data));
-            this.defaultMQProducer.send(message, new MessageQueueSelector() {
+        if (!mqProperties.getFlatMessage()) {
+            try {
+                Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data));
+                logger.debug("send message:{} to destination:{}, partition: {}",
+                    message,
+                    destination.getCanalDestination(),
+                    destination.getPartition());
+                this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
-                @Override
-                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                    int partition = 0;
+                    @Override
+                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                        int partition = 0;
+                        if (destination.getPartition() != null) {
+                            partition = destination.getPartition();
+                        }
+                        return mqs.get(partition);
+                    }
+                }, null);
+                callback.commit();
+            } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
+                logger.error("Send message error!", e);
+                callback.rollback();
+            }
+        } else {
+            List<FlatMessage> flatMessages = FlatMessage.messageConverter(data);
+            if (flatMessages != null) {
+                for (FlatMessage flatMessage : flatMessages) {
                     if (destination.getPartition() != null) {
-                        partition = destination.getPartition();
+                        try {
+                            logger.info("send flat message: {} to topic: {} fixed partition: {}",
+                                JSON.toJSONString(flatMessage),
+                                destination.getTopic(),
+                                destination.getPartition());
+                            Message message = new Message(destination.getTopic(), JSON.toJSONString(flatMessage)
+                                .getBytes());
+                            this.defaultMQProducer.send(message, new MessageQueueSelector() {
+
+                                @Override
+                                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                                    return mqs.get(destination.getPartition());
+                                }
+                            }, null);
+                        } catch (Exception e) {
+                            logger.error("send flat message to fixed partition error", e);
+                            callback.rollback();
+                        }
+                    } else {
+                        if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
+                            FlatMessage[] partitionFlatMessage = FlatMessage.messagePartition(flatMessage,
+                                destination.getPartitionsNum(),
+                                destination.getPartitionHash());
+                            int length = partitionFlatMessage.length;
+                            for (int i = 0; i < length; i++) {
+                                FlatMessage flatMessagePart = partitionFlatMessage[i];
+                                logger.debug("flatMessagePart: {}, partition: {}",
+                                    JSON.toJSONString(flatMessagePart),
+                                    i);
+                                final int index = i;
+                                try {
+                                    Message message = new Message(destination.getTopic(),
+                                        JSON.toJSONString(flatMessagePart).getBytes());
+                                    this.defaultMQProducer.send(message, new MessageQueueSelector() {
+
+                                        @Override
+                                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                                            if (index > mqs.size()) {
+                                            if (index >= mqs.size()) {
+                                                throw new CanalServerException("partition number is invalid, config num:"
+                                                                               + ", mq num: " + mqs.size());
+                                            }
+                                            return mqs.get(index);
+                                        }
+                                    }, null);
+                                } catch (Exception e) {
+                                    logger.error("send flat message to hashed partition error", e);
+                                    callback.rollback();
+                                }
+
+                            }
+                        }
                     }
-                    return mqs.get(partition);
                 }
-            }, null);
-            callback.commit();
-        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
-            logger.error("Send message error!", e);
-            callback.rollback();
+            }
         }
+
     }
 
     @Override

+ 5 - 6
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -121,15 +121,14 @@ public class CanalMQStarter {
                 server.subscribe(clientIdentity);
                 logger.info("## the canal consumer {} is running now ......", destination.getCanalDestination());
 
+                Long getTimeout = properties.getCanalGetTimeout();
+                int getBatchSize = properties.getCanalBatchSize();
                 while (running) {
                     Message message;
-                    if (properties.getCanalGetTimeout() != null) {
-                        message = server.getWithoutAck(clientIdentity,
-                            properties.getCanalBatchSize(),
-                            properties.getCanalGetTimeout(),
-                            TimeUnit.MILLISECONDS);
+                    if (getTimeout != null && getTimeout > 0) {
+                        message = server.getWithoutAck(clientIdentity, getBatchSize, getTimeout, TimeUnit.MILLISECONDS);
                     } else {
-                        message = server.getWithoutAck(clientIdentity, properties.getCanalBatchSize());
+                        message = server.getWithoutAck(clientIdentity, getBatchSize);
                     }
 
                     final long batchId = message.getId();