
Merge pull request #2 from alibaba/master

merge master
Neal Hu committed 7 years ago
commit 4f4b016da3
62 changed files with 2577 additions and 811 deletions
  1. +1 -0  client/src/main/java/com/alibaba/otter/canal/client/CanalConnector.java
  2. +6 -3  client/src/main/java/com/alibaba/otter/canal/client/CanalConnectors.java
  3. +19 -8  client/src/main/java/com/alibaba/otter/canal/client/impl/ClusterCanalConnector.java
  4. +20 -5  client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java
  5. +0 -1  common/src/main/java/com/alibaba/otter/canal/common/zookeeper/running/ServerRunningMonitor.java
  6. +1 -0  deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java
  7. +14 -7  deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java
  8. +1 -0  deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java
  9. +1 -0  deployer/src/main/resources/canal.properties
  10. +5 -1  deployer/src/main/resources/logback.xml
  11. +1 -1  deployer/src/main/resources/spring/default-instance.xml
  12. +1 -1  deployer/src/main/resources/spring/file-instance.xml
  13. +1 -1  deployer/src/main/resources/spring/memory-instance.xml
  14. +1 -1  driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java
  15. +2 -2  driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/GTIDSet.java
  16. +25 -60  driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/MysqlGTIDSet.java
  17. +22 -30  driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/UUIDSet.java
  18. +18 -20  driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/BinlogDumpGTIDCommandPacket.java
  19. +2 -2  driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/NettySocketChannelPool.java
  20. +2 -2  driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/ByteHelper.java
  21. +4 -4  driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/PacketManager.java
  22. +35 -45  driver/src/test/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlGTIDSetTest.java
  23. +5 -5  driver/src/test/java/com/alibaba/otter/canal/parse/driver/mysql/UUIDSetTest.java
  24. +0 -3  example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java
  25. +111 -0  kafka-client/pom.xml
  26. +128 -0  kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnector.java
  27. +35 -0  kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnectors.java
  28. +62 -0  kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/MessageDeserializer.java
  29. +25 -0  kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/AbstractKafkaTest.java
  30. +139 -0  kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/CanalKafkaClientExample.java
  31. +61 -0  kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/KafkaClientRunningTest.java
  32. +19 -0  kafka-client/src/test/resources/logback.xml
  33. +140 -0  kafka/pom.xml
  34. +64 -0  kafka/src/main/assembly/dev.xml
  35. +64 -0  kafka/src/main/assembly/release.xml
  36. +25 -0  kafka/src/main/bin/startup.bat
  37. +104 -0  kafka/src/main/bin/startup.sh
  38. +65 -0  kafka/src/main/bin/stop.sh
  39. +17 -0  kafka/src/main/java/com/alibaba/otter/canal/kafka/CanalLauncher.java
  40. +78 -0  kafka/src/main/java/com/alibaba/otter/canal/kafka/CanalServerStarter.java
  41. +76 -0  kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaProducer.java
  42. +147 -0  kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaStarter.java
  43. +161 -0  kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/KafkaProperties.java
  44. +50 -0  kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/MessageSerializer.java
  45. +18 -0  kafka/src/main/resources/kafka.yml
  46. +85 -0  kafka/src/main/resources/logback.xml
  47. +5 -4  parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java
  48. +2 -2  parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ErosaConnection.java
  49. +1 -1  parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java
  50. +2 -2  parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  51. +10 -10  parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java
  52. +1 -1  parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  53. +0 -1  parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/dao/MetaHistoryDO.java
  54. +30 -0  parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/FastsqlSchemaTest.java
  55. +40 -0  parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta_DDL_Test.java
  56. +21 -0  parse/src/test/resources/ddl/ddl_test1.sql
  57. +3 -1  pom.xml
  58. +578 -565  protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalEntry.java
  59. +0 -1  protocol/src/main/java/com/alibaba/otter/canal/protocol/position/EntryPosition.java
  60. +17 -15  server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java
  61. +1 -1  server/src/main/java/com/alibaba/otter/canal/server/netty/handler/ClientAuthenticationHandler.java
  62. +5 -5  sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java

+ 1 - 0
client/src/main/java/com/alibaba/otter/canal/client/CanalConnector.java

@@ -152,6 +152,7 @@ public interface CanalConnector {
 
     /**
      * Break out of the blocking wait, used to stop the client gracefully
+     * 
      * @throws CanalClientException
      */
     void stopRunning() throws CanalClientException;

+ 6 - 3
client/src/main/java/com/alibaba/otter/canal/client/CanalConnectors.java

@@ -29,7 +29,8 @@ public class CanalConnectors {
     public static CanalConnector newSingleConnector(SocketAddress address, String destination, String username,
                                                     String password) {
         SimpleCanalConnector canalConnector = new SimpleCanalConnector(address, username, password, destination);
-        canalConnector.setSoTimeout(30 * 1000);
+        canalConnector.setSoTimeout(60 * 1000);
+        canalConnector.setIdleTimeout(60 * 60 * 1000);
         return canalConnector;
     }
 
@@ -48,7 +49,8 @@ public class CanalConnectors {
             password,
             destination,
             new SimpleNodeAccessStrategy(addresses));
-        canalConnector.setSoTimeout(30 * 1000);
+        canalConnector.setSoTimeout(60 * 1000);
+        canalConnector.setIdleTimeout(60 * 60 * 1000);
         return canalConnector;
     }
 
@@ -67,7 +69,8 @@ public class CanalConnectors {
             password,
             destination,
             new ClusterNodeAccessStrategy(destination, ZkClientx.getZkClient(zkServers)));
-        canalConnector.setSoTimeout(30 * 1000);
+        canalConnector.setSoTimeout(60 * 1000);
+        canalConnector.setIdleTimeout(60 * 60 * 1000);
         return canalConnector;
     }
 }
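
For context, a minimal usage sketch against the updated factory defaults (soTimeout now 60s, idleTimeout 1h); the address, destination, and credentials are placeholders:

import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;

public class IdleTimeoutSketch {

    public static void main(String[] args) {
        // the factory now applies soTimeout = 60 * 1000 and idleTimeout = 60 * 60 * 1000
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        // to override the idle timeout, go through the concrete type
        ((SimpleCanalConnector) connector).setIdleTimeout(30 * 60 * 1000); // 30 minutes
        connector.connect();
        connector.subscribe();
        // ... getWithoutAck / ack loop ...
        connector.disconnect();
    }
}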

+ 19 - 8
client/src/main/java/com/alibaba/otter/canal/client/impl/ClusterCanalConnector.java

@@ -22,7 +22,8 @@ public class ClusterCanalConnector implements CanalConnector {
     private final Logger            logger        = LoggerFactory.getLogger(this.getClass());
     private String                  username;
     private String                  password;
-    private int                     soTimeout     = 10000;
+    private int                     soTimeout     = 60000;
+    private int                     idleTimeout   = 60 * 60 * 1000;
     private int                     retryTimes    = 3;                                       // when set to -1, a blocking subscribe can shut down gracefully
     private int                     retryInterval = 5000;                                    // retry interval, 5 seconds by default
     private CanalNodeAccessStrategy accessStrategy;
@@ -52,6 +53,7 @@ public class ClusterCanalConnector implements CanalConnector {
 
                     };
                     currentConnector.setSoTimeout(soTimeout);
+                    currentConnector.setIdleTimeout(idleTimeout);
                     if (filter != null) {
                         currentConnector.setFilter(filter);
                     }
@@ -110,10 +112,8 @@ public class ClusterCanalConnector implements CanalConnector {
                     logger.info("block waiting interrupted by other thread.");
                     return;
                 } else {
-                    logger.warn(String.format(
-                            "something goes wrong when subscribing from server: %s",
-                            currentConnector != null ? currentConnector.getAddress() : "null"),
-                            t);
+                    logger.warn(String.format("something goes wrong when subscribing from server: %s",
+                        currentConnector != null ? currentConnector.getAddress() : "null"), t);
                     times++;
                     restart();
                     logger.info("restart the connector for next round retry.");
@@ -218,7 +218,8 @@ public class ClusterCanalConnector implements CanalConnector {
                 return;
             } catch (Throwable t) {
                 logger.warn(String.format("something goes wrong when rollbacking data from server:%s",
-                    currentConnector.getAddress()), t);
+                    currentConnector.getAddress()),
+                    t);
                 times++;
                 restart();
                 logger.info("restart the connector for next round retry.");
@@ -235,7 +236,8 @@ public class ClusterCanalConnector implements CanalConnector {
                 return;
             } catch (Throwable t) {
                 logger.warn(String.format("something goes wrong when rollbacking data from server:%s",
-                    currentConnector.getAddress()), t);
+                    currentConnector.getAddress()),
+                    t);
                 times++;
                 restart();
                 logger.info("restart the connector for next round retry.");
@@ -253,7 +255,8 @@ public class ClusterCanalConnector implements CanalConnector {
                 return;
             } catch (Throwable t) {
                 logger.warn(String.format("something goes wrong when acking data from server:%s",
-                    currentConnector.getAddress()), t);
+                    currentConnector.getAddress()),
+                    t);
                 times++;
                 restart();
                 logger.info("restart the connector for next round retry.");
@@ -300,6 +303,14 @@ public class ClusterCanalConnector implements CanalConnector {
         this.soTimeout = soTimeout;
     }
 
+    public int getIdleTimeout() {
+        return idleTimeout;
+    }
+
+    public void setIdleTimeout(int idleTimeout) {
+        this.idleTimeout = idleTimeout;
+    }
+
     public int getRetryTimes() {
         return retryTimes;
     }

+ 20 - 5
client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

@@ -55,6 +55,7 @@ public class SimpleCanalConnector implements CanalConnector {
     private String               username;
     private String               password;
     private int                  soTimeout             = 60000;                                              // milliseconds
+    private int                  idleTimeout           = 60 * 60 * 1000;                                     // idle-connection timeout between client and server, 1 hour by default
     private String               filter;                                                                     // last committed filter value, kept so automatic retries can resubmit it
 
     private final ByteBuffer     readHeader            = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
@@ -78,15 +79,21 @@ public class SimpleCanalConnector implements CanalConnector {
     private boolean              running               = false;
 
     public SimpleCanalConnector(SocketAddress address, String username, String password, String destination){
-        this(address, username, password, destination, 60000);
+        this(address, username, password, destination, 60000, 60 * 60 * 1000);
     }
 
     public SimpleCanalConnector(SocketAddress address, String username, String password, String destination,
                                 int soTimeout){
+        this(address, username, password, destination, soTimeout, 60 * 60 * 1000);
+    }
+
+    public SimpleCanalConnector(SocketAddress address, String username, String password, String destination,
+                                int soTimeout, int idleTimeout){
         this.address = address;
         this.username = username;
         this.password = password;
         this.soTimeout = soTimeout;
+        this.idleTimeout = idleTimeout;
         this.clientIdentity = new ClientIdentity(destination, (short) 1001);
     }
 
@@ -157,8 +164,8 @@ public class SimpleCanalConnector implements CanalConnector {
             ClientAuth ca = ClientAuth.newBuilder()
                 .setUsername(username != null ? username : "")
                 .setPassword(ByteString.copyFromUtf8(password != null ? password : ""))
-                .setNetReadTimeout(soTimeout)
-                .setNetWriteTimeout(soTimeout)
+                .setNetReadTimeout(idleTimeout)
+                .setNetWriteTimeout(idleTimeout)
                 .build();
             writeWithHeader(Packet.newBuilder()
                 .setType(PacketType.CLIENTAUTHENTICATION)
@@ -500,6 +507,14 @@ public class SimpleCanalConnector implements CanalConnector {
         this.soTimeout = soTimeout;
     }
 
+    public int getIdleTimeout() {
+        return idleTimeout;
+    }
+
+    public void setIdleTimeout(int idleTimeout) {
+        this.idleTimeout = idleTimeout;
+    }
+
     public void setZkClientx(ZkClientx zkClientx) {
         this.zkClientx = zkClientx;
         initClientRunningMonitor(this.clientIdentity);
@@ -519,9 +534,9 @@ public class SimpleCanalConnector implements CanalConnector {
 
     public void stopRunning() {
         if (running) {
-            running = false;  // set to the non-running state
+            running = false; // set to the non-running state
             if (!mutex.state()) {
-                mutex.set(true);  // break out of the blocking wait
+                mutex.set(true); // break out of the blocking wait
             }
         }
     }

+ 0 - 1
common/src/main/java/com/alibaba/otter/canal/common/zookeeper/running/ServerRunningMonitor.java

@@ -4,7 +4,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import com.alibaba.otter.canal.common.CanalException;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.exception.ZkException;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -16,6 +16,7 @@ public class CanalConstants {
     public static final String CANAL_IP                          = ROOT + "." + "ip";
     public static final String CANAL_PORT                        = ROOT + "." + "port";
     public static final String CANAL_ZKSERVERS                   = ROOT + "." + "zkServers";
+    public static final String CANAL_WITHOUT_NETTY               = ROOT + "." + "withoutNetty";
 
     public static final String CANAL_DESTINATIONS                = ROOT + "." + "destinations";
     public static final String CANAL_AUTO_SCAN                   = ROOT + "." + "auto.scan";

+ 14 - 7
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -43,7 +43,7 @@ import com.google.common.collect.MigrateMap;
 
 /**
 * canal scheduling controller
- * 
+ *
  * @author jianghang 2012-11-8 下午12:03:11
  * @version 1.0.0
  */
@@ -97,9 +97,12 @@ public class CanalController {
         port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
         embededCanalServer = CanalServerWithEmbedded.instance();
         embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// set the custom instanceGenerator
-        canalServer = CanalServerWithNetty.instance();
-        canalServer.setIp(ip);
-        canalServer.setPort(port);
+        String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
+        if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
+            canalServer = CanalServerWithNetty.instance();
+            canalServer.setIp(ip);
+            canalServer.setPort(port);
+        }
 
         // if ip is empty, fall back to hostIp when registering in zk
         if (StringUtils.isEmpty(ip)) {
@@ -431,11 +434,15 @@ public class CanalController {
         }
 
         // start the network interface
-        canalServer.start();
+        if (canalServer != null) {
+            canalServer.start();
+        }
     }
 
     public void stop() throws Throwable {
-        canalServer.stop();
+        if (canalServer != null) {
+            canalServer.stop();
+        }
 
         if (autoScan) {
             for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
@@ -454,7 +461,7 @@ public class CanalController {
         // release canal's worker node
         releaseCid(ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port));
         logger.info("## stop the canal server[{}:{}]", ip, port);
-        
+
         if (zkclientx != null) {
             zkclientx.close();
         }

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -59,6 +59,7 @@ public class CanalLauncher {
 
     private static void setGlobalUncaughtExceptionHandler() {
         Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+
             @Override
             public void uncaughtException(Thread t, Throwable e) {
                 logger.error("UnCaughtException", e);

+ 1 - 0
deployer/src/main/resources/canal.properties

@@ -7,6 +7,7 @@ canal.port= 11111
 canal.zkServers=
 # flush data to zk
 canal.zookeeper.flush.period = 1000
+canal.withoutNetty = false
 # flush meta cursor/parse position to file
 canal.file.data.dir = ${canal.conf.dir}
 canal.file.flush.period = 1000
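
A sketch of the new switch in use: when canal.withoutNetty is true, CanalController (above) skips CanalServerWithNetty entirely and only the embedded server runs, presumably for deployments like the new kafka module that deliver messages through a broker instead of the TCP port. Values here are illustrative:

# illustrative fragment, not the full canal.properties
canal.ip =
canal.port = 11111
# true skips starting the Netty TCP endpoint; the embedded server still runs
canal.withoutNetty = true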

+ 5 - 1
deployer/src/main/resources/logback.xml

@@ -70,9 +70,13 @@
         <appender-ref ref="CANAL-ROOT" />
     </logger>
     <logger name="com.alibaba.otter.canal.meta.FileMixedMetaManager" additivity="false">  
-        <level value="INFO" />  
+        <level value="INFO" />
         <appender-ref ref="CANAL-META" />
     </logger>
+	<logger name="com.alibaba.otter.canal.kafka" additivity="false">
+		<level value="INFO" />
+		<appender-ref ref="CANAL-ROOT" />
+	</logger>
     
 	<root level="WARN">
 		<appender-ref ref="STDOUT"/>

+ 1 - 1
deployer/src/main/resources/spring/default-instance.xml

@@ -191,6 +191,6 @@
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
 		
 		<!-- whether to enable GTID mode -->
-		<property name="isGTIDMode" value="${canal.instance.gtidon}"/>
+		<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>
 	</bean>
 </beans>

+ 1 - 1
deployer/src/main/resources/spring/file-instance.xml

@@ -176,6 +176,6 @@
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
 
 		<!-- whether to enable GTID mode -->
-		<property name="isGTIDMode" value="${canal.instance.gtidon}"/>
+		<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>
 	</bean>
 </beans>

+ 1 - 1
deployer/src/main/resources/spring/memory-instance.xml

@@ -164,6 +164,6 @@
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
 		
 		<!-- whether to enable GTID mode -->
-		<property name="isGTIDMode" value="${canal.instance.gtidon}"/>
+		<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>
 	</bean>
 </beans>

+ 1 - 1
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -53,7 +53,7 @@ public class MysqlConnector {
         String addr = address.getHostString();
         int port = address.getPort();
         this.address = new InetSocketAddress(addr, port);
-        
+
         this.username = username;
         this.password = password;
     }

+ 2 - 2
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/GTIDSet.java

@@ -3,8 +3,7 @@ package com.alibaba.otter.canal.parse.driver.mysql.packets;
 import java.io.IOException;
 
 /**
- * Created by hiwjd on 2018/4/23.
- * hiwjd0@gmail.com
+ * Created by hiwjd on 2018/4/23. hiwjd0@gmail.com
  */
 public interface GTIDSet {
 
@@ -17,6 +16,7 @@ public interface GTIDSet {
 
     /**
      * Update the current instance
+     * 
      * @param str
      * @throws Exception
      */

+ 25 - 60
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/MysqlGTIDSet.java

@@ -1,14 +1,14 @@
 package com.alibaba.otter.canal.parse.driver.mysql.packets;
 
-import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
 
 /**
- * Created by hiwjd on 2018/4/23.
- * hiwjd0@gmail.com
+ * Created by hiwjd on 2018/4/23. hiwjd0@gmail.com
  */
 public class MysqlGTIDSet implements GTIDSet {
 
@@ -56,61 +56,26 @@ public class MysqlGTIDSet implements GTIDSet {
     }
 
     /**
-     * Parses strings of the following formats into a MysqlGTIDSet:
-     *
-     * 726757ad-4455-11e8-ae04-0242ac110002:1 =>
-     *   MysqlGTIDSet{
-     *     sets: {
-     *       726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{
-     *         SID: 726757ad-4455-11e8-ae04-0242ac110002,
-     *         intervals: [{start:1, stop:2}]
-     *       }
-     *     }
-     *   }
-     *
-     * 726757ad-4455-11e8-ae04-0242ac110002:1-3 =>
-     *   MysqlGTIDSet{
-     *     sets: {
-     *       726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{
-     *         SID: 726757ad-4455-11e8-ae04-0242ac110002,
-     *         intervals: [{start:1, stop:4}]
-     *       }
-     *     }
-     *   }
-     *
-     * 726757ad-4455-11e8-ae04-0242ac110002:1-3:4 =>
-     *   MysqlGTIDSet{
-     *     sets: {
-     *       726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{
-     *         SID: 726757ad-4455-11e8-ae04-0242ac110002,
-     *         intervals: [{start:1, stop:5}]
-     *       }
-     *     }
-     *   }
-     *
-     * 726757ad-4455-11e8-ae04-0242ac110002:1-3:7-9 =>
-     *   MysqlGTIDSet{
-     *     sets: {
-     *       726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{
-     *         SID: 726757ad-4455-11e8-ae04-0242ac110002,
-     *         intervals: [{start:1, stop:4}, {start:7, stop: 10}]
-     *       }
-     *     }
-     *   }
-     *
-     * 726757ad-4455-11e8-ae04-0242ac110002:1-3,726757ad-4455-11e8-ae04-0242ac110003:4 =>
-     *   MysqlGTIDSet{
-     *     sets: {
-     *       726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{
-     *         SID: 726757ad-4455-11e8-ae04-0242ac110002,
-     *         intervals: [{start:1, stop:4}]
-     *       },
-     *       726757ad-4455-11e8-ae04-0242ac110003: UUIDSet{
-     *         SID: 726757ad-4455-11e8-ae04-0242ac110002,
-     *         intervals: [{start:4, stop:5}]
-     *       }
-     *     }
-     *   }
+     * Parses strings of the following formats into a MysqlGTIDSet: 726757ad-4455-11e8-ae04-0242ac110002:1 =>
+     * MysqlGTIDSet{ sets: { 726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{ SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:2}] } }
+     * } 726757ad-4455-11e8-ae04-0242ac110002:1-3 => MysqlGTIDSet{ sets: {
+     * 726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{ SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:4}] } }
+     * } 726757ad-4455-11e8-ae04-0242ac110002:1-3:4 => MysqlGTIDSet{ sets: {
+     * 726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{ SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:5}] } }
+     * } 726757ad-4455-11e8-ae04-0242ac110002:1-3:7-9 => MysqlGTIDSet{ sets: {
+     * 726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{ SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:4},
+     * {start:7, stop: 10}] } } }
+     * 726757ad-4455-11e8-ae04-0242ac110002:1-3,726757
+     * ad-4455-11e8-ae04-0242ac110003:4 => MysqlGTIDSet{ sets: {
+     * 726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{ SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:4}] },
+     * 726757ad-4455-11e8-ae04-0242ac110003: UUIDSet{ SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:4, stop:5}] } }
+     * }
      *
      * @param gtidData
      * @return
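
The reflowed javadoc above is dense, so here is a short sketch of the parse/encode round trip it describes (both methods are exercised in MysqlGTIDSetTest below):

import java.io.IOException;

import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;

public class GtidParseSketch {

    public static void main(String[] args) throws IOException {
        // "1-3" covers transactions 1..3; internally that is the right-open interval {start:1, stop:4},
        // and ":7-9" adds a second interval {start:7, stop:10}
        GTIDSet gtidSet = MysqlGTIDSet.parse("726757ad-4455-11e8-ae04-0242ac110002:1-3:7-9");
        // binary form later carried in the COM_BINLOG_DUMP_GTID payload
        byte[] payload = gtidSet.encode();
        System.out.println(payload.length + " bytes");
    }
}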

+ 22 - 30
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/UUIDSet.java

@@ -1,7 +1,5 @@
 package com.alibaba.otter.canal.parse.driver.mysql.packets;
 
-import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -10,13 +8,14 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
+import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
+
 /**
- * Created by hiwjd on 2018/4/23.
- * hiwjd0@gmail.com
+ * Created by hiwjd on 2018/4/23. hiwjd0@gmail.com
  */
 public class UUIDSet {
 
-    public UUID SID;
+    public UUID           SID;
     public List<Interval> intervals;
 
     public byte[] encode() throws IOException {
@@ -54,6 +53,7 @@ public class UUIDSet {
     }
 
     public static class Interval implements Comparable<Interval> {
+
         public long start;
         public long stop;
 
@@ -85,19 +85,15 @@ public class UUIDSet {
     }
 
     /**
-     * Parses strings of the following formats into a UUIDSet:
-     *
-     * 726757ad-4455-11e8-ae04-0242ac110002:1 =>
-     *   UUIDSet{SID: 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:2}]}
-     *
-     * 726757ad-4455-11e8-ae04-0242ac110002:1-3 =>
-     *   UUIDSet{SID: 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:4}]}
-     *
-     * 726757ad-4455-11e8-ae04-0242ac110002:1-3:4
-     *   UUIDSet{SID: 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:5}]}
-     *
-     * 726757ad-4455-11e8-ae04-0242ac110002:1-3:7-9
-     *   UUIDSet{SID: 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:4}, {start:7, stop:10}]}
+     * Parses strings of the following formats into a UUIDSet: 726757ad-4455-11e8-ae04-0242ac110002:1 => UUIDSet{SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:2}]}
+     * 726757ad-4455-11e8-ae04-0242ac110002:1-3 => UUIDSet{SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:4}]}
+     * 726757ad-4455-11e8-ae04-0242ac110002:1-3:4 UUIDSet{SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:5}]}
+     * 726757ad-4455-11e8-ae04-0242ac110002:1-3:7-9 UUIDSet{SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:4},
+     * {start:7, stop:10}]}
      *
      * @param str
      * @return
@@ -110,7 +106,7 @@ public class UUIDSet {
         }
 
         List<Interval> intervals = new ArrayList<Interval>();
-        for (int i=1; i<ss.length; i++) {
+        for (int i = 1; i < ss.length; i++) {
             intervals.add(parseInterval(ss[i]));
         }
 
@@ -134,7 +130,7 @@ public class UUIDSet {
                 sb.append(":");
                 sb.append(interval.start);
                 sb.append("-");
-                sb.append(interval.stop-1);
+                sb.append(interval.stop - 1);
             }
         }
 
@@ -142,12 +138,8 @@ public class UUIDSet {
     }
 
     /**
-     * Parses strings of the following formats into an Interval:
-     *
-     * 1 => Interval{start:1, stop:2}
-     * 1-3 => Interval{start:1, stop:4}
-     *
-     * Note: the string form [n,m] is inclusive on both sides, while Interval is [n,m), open on the right
+     * Parses strings of the following formats into an Interval: 1 => Interval{start:1, stop:2} 1-3 =>
+     * Interval{start:1, stop:4} Note: the string form [n,m] is inclusive on both sides, while Interval is [n,m), open on the right
      *
      * @param str
      * @return
@@ -173,8 +165,8 @@ public class UUIDSet {
     }
 
     /**
-     * Merges contiguous {start,stop} intervals:
-     * [{start:1, stop:4},{start:4, stop:5}] => [{start:1, stop:5}]
+     * Merges contiguous {start,stop} intervals: [{start:1, stop:4},{start:4, stop:5}] => [{start:1,
+     * stop:5}]
      *
      * @param intervals
      * @return
@@ -183,11 +175,11 @@ public class UUIDSet {
         List<Interval> combined = new ArrayList<Interval>();
         Collections.sort(intervals);
         int len = intervals.size();
-        for (int i=0; i<len; i++) {
+        for (int i = 0; i < len; i++) {
             combined.add(intervals.get(i));
 
             int j;
-            for (j=i+1; j<len; j++) {
+            for (j = i + 1; j < len; j++) {
                 if (intervals.get(i).stop >= intervals.get(j).start) {
                     intervals.get(i).stop = intervals.get(j).stop;
                 } else {

+ 18 - 20
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/BinlogDumpGTIDCommandPacket.java

@@ -1,28 +1,26 @@
 package com.alibaba.otter.canal.parse.driver.mysql.packets.client;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
 import com.alibaba.otter.canal.parse.driver.mysql.packets.CommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
 /**
- * Created by hiwjd on 2018/4/24.
- * hiwjd0@gmail.com
- *
+ * Created by hiwjd on 2018/4/24. hiwjd0@gmail.com
  * https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
  */
 public class BinlogDumpGTIDCommandPacket extends CommandPacket {
 
-    public static final int BINLOG_DUMP_NON_BLOCK = 0x01;
+    public static final int BINLOG_DUMP_NON_BLOCK   = 0x01;
     public static final int BINLOG_THROUGH_POSITION = 0x02;
-    public static final int BINLOG_THROUGH_GTID = 0x04;
+    public static final int BINLOG_THROUGH_GTID     = 0x04;
 
-    public long    slaveServerId;
-    public GTIDSet gtidSet;
+    public long             slaveServerId;
+    public GTIDSet          gtidSet;
 
-    public BinlogDumpGTIDCommandPacket() {
+    public BinlogDumpGTIDCommandPacket(){
         setCommand((byte) 0x1e);
     }
 
@@ -51,15 +49,15 @@ public class BinlogDumpGTIDCommandPacket extends CommandPacket {
         // 6. [4] data-size
         ByteHelper.writeUnsignedIntLittleEndian(bs.length, out);
         // 7, [] data
-        //       [8] n_sids // the docs say 4 bytes, but it is actually 8
-        //       for n_sids {
-        //          [16] SID
-        //          [8] n_intervals
-        //          for n_intervals {
-        //             [8] start (signed)
-        //             [8] end (signed)
-        //          }
-        //       }
+        // [8] n_sids // the docs say 4 bytes, but it is actually 8
+        // for n_sids {
+        // [16] SID
+        // [8] n_intervals
+        // for n_intervals {
+        // [8] start (signed)
+        // [8] end (signed)
+        // }
+        // }
         out.write(bs);
         // }
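
A brief sketch of assembling this packet; slaveServerId and gtidSet are the public fields from the diff above, toBytes() is assumed from the usual CommandPacket contract, and the server id is a placeholder:

import java.io.IOException;

import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpGTIDCommandPacket;

public class DumpGtidSketch {

    public static void main(String[] args) throws IOException {
        BinlogDumpGTIDCommandPacket cmd = new BinlogDumpGTIDCommandPacket(); // constructor sets command byte 0x1e
        cmd.slaveServerId = 1234L; // placeholder slave server id
        cmd.gtidSet = MysqlGTIDSet.parse("726757ad-4455-11e8-ae04-0242ac110002:1-3");
        byte[] body = cmd.toBytes(); // flags + server id + encoded GTID set, per the layout commented above
        System.out.println(body.length + " bytes");
    }
}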
 

+ 2 - 2
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/NettySocketChannelPool.java

@@ -29,8 +29,8 @@ import org.slf4j.LoggerFactory;
 @SuppressWarnings({ "rawtypes", "deprecation" })
 public abstract class NettySocketChannelPool {
 
-    private static EventLoopGroup              group     = new NioEventLoopGroup();                         // non-blocking IO event loop group
-    private static Bootstrap                   boot      = new Bootstrap();                                 // main bootstrap
+    private static EventLoopGroup              group     = new NioEventLoopGroup();                              // non-blocking IO event loop group
+    private static Bootstrap                   boot      = new Bootstrap();                                      // main bootstrap
     private static Map<Channel, SocketChannel> chManager = new ConcurrentHashMap<Channel, SocketChannel>();
     private static final Logger                logger    = LoggerFactory.getLogger(NettySocketChannelPool.class);
 

+ 2 - 2
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/ByteHelper.java

@@ -108,7 +108,7 @@ public abstract class ByteHelper {
 
         return out.toByteArray();
     }
-    
+
     public static void write8ByteUnsignedIntLittleEndian(long data, ByteArrayOutputStream out) {
         out.write((byte) (data & 0xFF));
         out.write((byte) (data >>> 8));
@@ -119,7 +119,7 @@ public abstract class ByteHelper {
         out.write((byte) (data >>> 48));
         out.write((byte) (data >>> 56));
     }
-    
+
     public static void writeUnsignedIntLittleEndian(long data, ByteArrayOutputStream out) {
         out.write((byte) (data & 0xFF));
         out.write((byte) (data >>> 8));

+ 4 - 4
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/PacketManager.java

@@ -14,15 +14,15 @@ public abstract class PacketManager {
     }
 
     public static HeaderPacket readHeader(SocketChannel ch, int len, int timeout) throws IOException {
-    	HeaderPacket header = new HeaderPacket();
-    	header.fromBytes(ch.read(len, timeout));
-    	return header;
+        HeaderPacket header = new HeaderPacket();
+        header.fromBytes(ch.read(len, timeout));
+        return header;
     }
 
     public static byte[] readBytes(SocketChannel ch, int len) throws IOException {
         return ch.read(len);
     }
-    
+
     public static byte[] readBytes(SocketChannel ch, int len, int timeout) throws IOException {
         return ch.read(len, timeout);
     }

+ 35 - 45
driver/src/test/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlGTIDSetTest.java

@@ -1,17 +1,23 @@
 package com.alibaba.otter.canal.parse.driver.mysql;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.Test;
+
 import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.UUIDSet;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.util.*;
 
 /**
- * Created by hiwjd on 2018/4/25.
- * hiwjd0@gmail.com
+ * Created by hiwjd on 2018/4/25. hiwjd0@gmail.com
  */
 public class MysqlGTIDSetTest {
 
@@ -20,14 +26,12 @@ public class MysqlGTIDSetTest {
         GTIDSet gtidSet = MysqlGTIDSet.parse("726757ad-4455-11e8-ae04-0242ac110002:1");
         byte[] bytes = gtidSet.encode();
 
-        byte[] expected = {
-                0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x72, 0x67, 0x57, (byte)0xad,
-                0x44, 0x55, 0x11, (byte)0xe8, (byte)0xae, 0x04, 0x02, 0x42, (byte)0xac, 0x11, 0x00, 0x02,
-                0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
-                0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
-        };
+        byte[] expected = { 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x72, 0x67, 0x57, (byte) 0xad, 0x44, 0x55,
+                0x11, (byte) 0xe8, (byte) 0xae, 0x04, 0x02, 0x42, (byte) 0xac, 0x11, 0x00, 0x02, 0x01, 0x00, 0x00,
+                0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
+                0x00, 0x00, 0x00, 0x00 };
 
-        for (int i=0; i<bytes.length; i++) {
+        for (int i = 0; i < bytes.length; i++) {
             assertEquals(expected[i], bytes[i]);
         }
     }
@@ -35,31 +39,17 @@ public class MysqlGTIDSetTest {
     @Test
     public void testParse() {
         Map<String, MysqlGTIDSet> cases = new HashMap<String, MysqlGTIDSet>(5);
-        cases.put(
-                "726757ad-4455-11e8-ae04-0242ac110002:1",
-                buildForTest(new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 2))
-        );
-        cases.put(
-                "726757ad-4455-11e8-ae04-0242ac110002:1-3",
-                buildForTest(new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 4))
-        );
-        cases.put(
-                "726757ad-4455-11e8-ae04-0242ac110002:1-3:4",
-                buildForTest(new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 5))
-        );
-        cases.put(
-                "726757ad-4455-11e8-ae04-0242ac110002:1-3:7-9",
-                buildForTest(new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 4, 7, 10))
-        );
-        cases.put(
-                "726757ad-4455-11e8-ae04-0242ac110002:1-3,726757ad-4455-11e8-ae04-0242ac110003:4",
-                buildForTest(
-                        Arrays.asList(
-                            new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 4),
-                                new Material("726757ad-4455-11e8-ae04-0242ac110003", 4, 5)
-                        )
-                )
-        );
+        cases.put("726757ad-4455-11e8-ae04-0242ac110002:1",
+            buildForTest(new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 2)));
+        cases.put("726757ad-4455-11e8-ae04-0242ac110002:1-3",
+            buildForTest(new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 4)));
+        cases.put("726757ad-4455-11e8-ae04-0242ac110002:1-3:4",
+            buildForTest(new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 5)));
+        cases.put("726757ad-4455-11e8-ae04-0242ac110002:1-3:7-9",
+            buildForTest(new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 4, 7, 10)));
+        cases.put("726757ad-4455-11e8-ae04-0242ac110002:1-3,726757ad-4455-11e8-ae04-0242ac110003:4",
+            buildForTest(Arrays.asList(new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 4),
+                new Material("726757ad-4455-11e8-ae04-0242ac110003", 4, 5))));
 
         for (Map.Entry<String, MysqlGTIDSet> entry : cases.entrySet()) {
             MysqlGTIDSet expected = entry.getValue();
@@ -71,7 +61,7 @@ public class MysqlGTIDSetTest {
 
     private static class Material {
 
-        public Material(String uuid, long start, long stop) {
+        public Material(String uuid, long start, long stop){
             this.uuid = uuid;
             this.start = start;
             this.stop = stop;
@@ -79,7 +69,7 @@ public class MysqlGTIDSetTest {
             this.stop1 = 0;
         }
 
-        public Material(String uuid, long start, long stop, long start1, long stop1) {
+        public Material(String uuid, long start, long stop, long start1, long stop1){
             this.uuid = uuid;
             this.start = start;
             this.stop = stop;
@@ -88,10 +78,10 @@ public class MysqlGTIDSetTest {
         }
 
         public String uuid;
-        public long start;
-        public long stop;
-        public long start1;
-        public long stop1;
+        public long   start;
+        public long   stop;
+        public long   start1;
+        public long   stop1;
     }
 
     private MysqlGTIDSet buildForTest(Material material) {

+ 5 - 5
driver/src/test/java/com/alibaba/otter/canal/parse/driver/mysql/UUIDSetTest.java

@@ -1,16 +1,16 @@
 package com.alibaba.otter.canal.parse.driver.mysql;
 
-import com.alibaba.otter.canal.parse.driver.mysql.packets.UUIDSet;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.junit.Assert.*;
+import org.junit.Test;
+
+import com.alibaba.otter.canal.parse.driver.mysql.packets.UUIDSet;
 
 /**
- * Created by hiwjd on 2018/4/26.
- * hiwjd0@gmail.com
+ * Created by hiwjd on 2018/4/26. hiwjd0@gmail.com
  */
 public class UUIDSetTest {
 

+ 0 - 3
example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java

@@ -13,7 +13,6 @@ import org.springframework.util.Assert;
 import org.springframework.util.CollectionUtils;
 
 import com.alibaba.otter.canal.client.CanalConnector;
-import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
 import com.alibaba.otter.canal.protocol.CanalEntry.Column;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
@@ -38,7 +37,6 @@ public class AbstractCanalClientTest {
     protected static final String             SEP                = SystemUtils.LINE_SEPARATOR;
     protected static final String             DATE_FORMAT        = "yyyy-MM-dd HH:mm:ss";
     protected volatile boolean                running            = false;
-    private volatile boolean                  waiting            = true;
     protected Thread.UncaughtExceptionHandler handler            = new Thread.UncaughtExceptionHandler() {
 
                                                                      public void uncaughtException(Thread t, Throwable e) {
@@ -116,7 +114,6 @@ public class AbstractCanalClientTest {
                 MDC.put("destination", destination);
                 connector.connect();
                 connector.subscribe();
-                waiting = false;
                 while (running) {
                     Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                     long batchId = message.getId();

+ 111 - 0
kafka-client/pom.xml

@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>canal</artifactId>
+        <groupId>com.alibaba.otter</groupId>
+        <version>1.0.26-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <groupId>com.alibaba.otter</groupId>
+    <artifactId>canal.kafka.client</artifactId>
+    <packaging>jar</packaging>
+    <name>canal kafka client module for otter ${project.version}</name>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.protocol</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.10.0.1</version>
+        </dependency>
+
+        <!-- junit -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
+
+    <profiles>
+        <profile>
+            <id>dev</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+                <property>
+                    <name>env</name>
+                    <value>!javadoc</value>
+                </property>
+            </activation>
+        </profile>
+
+        <profile>
+            <id>javadoc</id>
+            <activation>
+                <property>
+                    <name>env</name>
+                    <value>javadoc</value>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-javadoc-plugin</artifactId>
+                        <version>2.9.1</version>
+                        <executions>
+                            <execution>
+                                <id>attach-javadocs</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>jar</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                        <configuration>
+                            <aggregate>true</aggregate>
+                            <show>public</show>
+                            <nohelp>true</nohelp>
+                            <header>${project.artifactId}-${project.version}</header>
+                            <footer>${project.artifactId}-${project.version}</footer>
+                            <doctitle>${project.artifactId}-${project.version}</doctitle>
+                            <links>
+                                <link>https://github.com/alibaba/canal</link>
+                            </links>
+                            <outputDirectory>${project.build.directory}/apidocs/apidocs/${project.version}</outputDirectory>
+                        </configuration>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-scm-publish-plugin</artifactId>
+                        <version>1.0-beta-2</version>
+                        <executions>
+                            <execution>
+                                <id>attach-javadocs</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>publish-scm</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                        <configuration>
+                            <checkoutDirectory>${project.build.directory}/scmpublish</checkoutDirectory>
+                            <checkinComment>Publishing javadoc for ${project.artifactId}:${project.version}</checkinComment>
+                            <content>${project.build.directory}/apidocs</content>
+                            <skipDeletedFiles>true</skipDeletedFiles>
+                            <pubScmUrl>scm:git:git@github.com:alibaba/canal.git</pubScmUrl>
+                            <scmBranch>gh-pages</scmBranch>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>

+ 128 - 0
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnector.java

@@ -0,0 +1,128 @@
+package com.alibaba.otter.canal.kafka.client;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import com.alibaba.otter.canal.protocol.Message;
+
+/**
+ * canal kafka data-access client
+ *
+ * @author machengyuan @ 2018-6-12
+ * @version 1.0.0
+ */
+public class KafkaCanalConnector {
+
+    private KafkaConsumer<String, Message> kafkaConsumer;
+
+    private String                         topic;
+
+    private Integer                        partition;
+
+    private Properties                     properties;
+
+    public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId){
+        this.topic = topic;
+        this.partition = partition;
+
+        properties = new Properties();
+        properties.put("bootstrap.servers", servers);
+        properties.put("group.id", groupId);
+        properties.put("enable.auto.commit", false);
+        properties.put("auto.commit.interval.ms", "1000");
+        properties.put("auto.offset.reset", "latest"); // earliest
+                                                       // //如果没有offset则从最后的offset开始读
+        properties.put("request.timeout.ms", "40000"); // 必须大于session.timeout.ms的设置
+        properties.put("session.timeout.ms", "30000"); // 默认为30秒
+        properties.put("max.poll.records", "1"); // 所以一次只取一条数据
+        properties.put("key.deserializer", StringDeserializer.class.getName());
+        properties.put("value.deserializer", MessageDeserializer.class.getName());
+    }
+
+    /**
+     * Reset the session timeout
+     *
+     * @param timeout
+     * @param unit
+     */
+    public void setSessionTimeout(Long timeout, TimeUnit unit) {
+        long t = unit.toMillis(timeout);
+        properties.put("request.timeout.ms", String.valueOf(t + 60000));
+        properties.put("session.timeout.ms", String.valueOf(t));
+    }
+
+    /**
+     * Close the connection
+     */
+    public void close() {
+        kafkaConsumer.close();
+    }
+
+    /**
+     * Subscribe to the topic
+     */
+    public void subscribe() {
+        if (kafkaConsumer == null) {
+            kafkaConsumer = new KafkaConsumer<String, Message>(properties);
+        }
+        if (partition == null) {
+            kafkaConsumer.subscribe(Collections.singletonList(topic));
+        } else {
+            TopicPartition topicPartition = new TopicPartition(topic, partition);
+            kafkaConsumer.assign(Collections.singletonList(topicPartition));
+        }
+    }
+
+    /**
+     * Unsubscribe
+     */
+    public void unsubscribe() {
+        kafkaConsumer.unsubscribe();
+    }
+
+    /**
+     * Fetch data and acknowledge automatically
+     *
+     * @return
+     */
+    public Message get() {
+        return get(100L, TimeUnit.MILLISECONDS);
+    }
+
+    public Message get(Long timeout, TimeUnit unit) {
+        Message message = getWithoutAck(timeout, unit);
+        this.ack();
+        return message;
+    }
+
+    public Message getWithoutAck() {
+        return getWithoutAck(100L, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Fetch data without acknowledging; ack manually once processing is done
+     *
+     * @return
+     */
+    public Message getWithoutAck(Long timeout, TimeUnit unit) {
+        ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout)); // per the configuration, at most one record is polled
+
+        if (!records.isEmpty()) {
+            return records.iterator().next().value();
+        }
+        return null;
+    }
+
+    /**
+     * Commit the offset; if no ack happens within session.timeout.ms an exception is thrown and the ack fails
+     */
+    public void ack() {
+        kafkaConsumer.commitSync();
+    }
+}

+ 35 - 0
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnectors.java

@@ -0,0 +1,35 @@
+package com.alibaba.otter.canal.kafka.client;
+
+/**
+ * Factory class for creating canal kafka connectors
+ *
+ * @author machengyuan @ 2018-6-12
+ * @version 1.0.0
+ */
+public class KafkaCanalConnectors {
+
+    /**
+     * Create a kafka client connection
+     *
+     * @param servers
+     * @param topic
+     * @param partition
+     * @param groupId
+     * @return
+     */
+    public static KafkaCanalConnector newKafkaConnector(String servers, String topic, Integer partition, String groupId) {
+        return new KafkaCanalConnector(servers, topic, partition, groupId);
+    }
+
+    /**
+     * Create a kafka client connection
+     *
+     * @param servers
+     * @param topic
+     * @param groupId
+     * @return
+     */
+    public static KafkaCanalConnector newKafkaConnector(String servers, String topic, String groupId) {
+        return new KafkaCanalConnector(servers, topic, null, groupId);
+    }
+}
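
Putting the connector and its factory together, a minimal consume loop (broker address, topic, and group id are placeholders; the full variant is CanalKafkaClientExample below):

import java.util.concurrent.TimeUnit;

import com.alibaba.otter.canal.kafka.client.KafkaCanalConnector;
import com.alibaba.otter.canal.kafka.client.KafkaCanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

public class KafkaQuickStart {

    public static void main(String[] args) {
        KafkaCanalConnector connector = KafkaCanalConnectors.newKafkaConnector("broker1:9092", "example", "g1");
        connector.subscribe();
        for (int i = 0; i < 10; i++) {
            Message message = connector.getWithoutAck(1L, TimeUnit.SECONDS); // at most one record per poll
            if (message == null) {
                continue;
            }
            System.out.println(message.getId() + ": " + message.getEntries().size() + " entries");
            connector.ack(); // commit the offset once processing succeeded
        }
        connector.unsubscribe();
        connector.close();
    }
}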

+ 62 - 0
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/MessageDeserializer.java

@@ -0,0 +1,62 @@
+package com.alibaba.otter.canal.kafka.client;
+
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalPacket;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.google.protobuf.ByteString;
+
+/**
+ * Deserializer for the Kafka Message class
+ *
+ * @author machengyuan @ 2018-6-12
+ * @version 1.0.0
+ */
+public class MessageDeserializer implements Deserializer<Message> {
+
+    private static Logger logger = LoggerFactory.getLogger(MessageDeserializer.class);
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+    }
+
+    @Override
+    public Message deserialize(String topic, byte[] data) {
+        try {
+            if (data == null) return null;
+            else {
+                CanalPacket.Packet p = CanalPacket.Packet.parseFrom(data);
+                switch (p.getType()) {
+                    case MESSAGES: {
+                        if (!p.getCompression().equals(CanalPacket.Compression.NONE)) {
+                            throw new CanalClientException("compression is not supported in this connector");
+                        }
+
+                        CanalPacket.Messages messages = CanalPacket.Messages.parseFrom(p.getBody());
+                        Message result = new Message(messages.getBatchId());
+                        for (ByteString byteString : messages.getMessagesList()) {
+                            result.addEntry(CanalEntry.Entry.parseFrom(byteString));
+                        }
+                        return result;
+                    }
+                    default:
+                        break;
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Error when deserializing byte[] to message ");
+        }
+        return null;
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}

+ 25 - 0
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/AbstractKafkaTest.java

@@ -0,0 +1,25 @@
+package com.alibaba.otter.canal.kafka.client.running;
+
+import org.junit.Assert;
+
+/**
+ * Base class for Kafka tests
+ *
+ * @author machengyuan @ 2018-6-12
+ * @version 1.0.0
+ */
+public abstract class AbstractKafkaTest {
+
+    public static String  topic     = "example";
+    public static Integer partition = null;
+    public static String  groupId   = "g1";
+    public static String  servers   = "slave1:6667,slave2:6667,slave3:6667";
+
+    public void sleep(long time) {
+        try {
+            Thread.sleep(time);
+        } catch (InterruptedException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+}

+ 139 - 0
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/CanalKafkaClientExample.java

@@ -0,0 +1,139 @@
+package com.alibaba.otter.canal.kafka.client.running;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.common.errors.WakeupException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+import com.alibaba.otter.canal.kafka.client.KafkaCanalConnector;
+import com.alibaba.otter.canal.kafka.client.KafkaCanalConnectors;
+import com.alibaba.otter.canal.protocol.Message;
+
+/**
+ * Kafka client example
+ *
+ * @author machengyuan @ 2018-6-12
+ * @version 1.0.0
+ */
+public class CanalKafkaClientExample {
+
+    protected final static Logger           logger  = LoggerFactory.getLogger(CanalKafkaClientExample.class);
+
+    private KafkaCanalConnector             connector;
+
+    private static volatile boolean         running = false;
+
+    private Thread                          thread  = null;
+
+    private Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+
+                                                        public void uncaughtException(Thread t, Throwable e) {
+                                                            logger.error("parse events has an error", e);
+                                                        }
+                                                    };
+
+    public CanalKafkaClientExample(String servers, String topic, Integer partition, String groupId){
+        connector = KafkaCanalConnectors.newKafkaConnector(servers, topic, partition, groupId);
+    }
+
+    public static void main(String[] args) {
+        try {
+            final CanalKafkaClientExample kafkaCanalClientExample = new CanalKafkaClientExample(AbstractKafkaTest.servers,
+                AbstractKafkaTest.topic,
+                AbstractKafkaTest.partition,
+                AbstractKafkaTest.groupId);
+            logger.info("## start the kafka consumer: {}-{}", AbstractKafkaTest.topic, AbstractKafkaTest.groupId);
+            kafkaCanalClientExample.start();
+            logger.info("## the canal kafka consumer is running now ......");
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+
+                public void run() {
+                    try {
+                        logger.info("## stop the kafka consumer");
+                        kafkaCanalClientExample.stop();
+                    } catch (Throwable e) {
+                        logger.warn("##something goes wrong when stopping kafka consumer:", e);
+                    } finally {
+                        logger.info("## kafka consumer is down.");
+                    }
+                }
+
+            });
+            while (running)
+                ;
+        } catch (Throwable e) {
+            logger.error("## Something goes wrong when starting up the kafka consumer:", e);
+            System.exit(0);
+        }
+    }
+
+    public void start() {
+        Assert.notNull(connector, "connector is null");
+        thread = new Thread(new Runnable() {
+
+            public void run() {
+                process();
+            }
+        });
+        thread.setUncaughtExceptionHandler(handler);
+        thread.start();
+        running = true;
+    }
+
+    public void stop() {
+        if (!running) {
+            return;
+        }
+        running = false;
+        if (thread != null) {
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+
+    private void process() {
+        while (!running)
+            ;
+        try {
+            connector.subscribe();
+            while (running) {
+                try {
+                    Message message = connector.getWithoutAck(1L, TimeUnit.SECONDS); // fetch a message
+                    if (message == null) {
+                        continue;
+                    }
+                    long batchId = message.getId();
+                    int size = message.getEntries().size();
+                    if (batchId == -1 || size == 0) {
+                        // try {
+                        // Thread.sleep(1000);
+                        // } catch (InterruptedException e) {
+                        // }
+                    } else {
+                        // printSummary(message, batchId, size);
+                        // printEntry(message.getEntries());
+                        logger.info(message.toString());
+                    }
+
+                    connector.ack(); // commit the ack
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+
+        try {
+            connector.unsubscribe();
+        } catch (WakeupException e) {
+            // No-op. Continue process
+        }
+        connector.close();
+    }
+}

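Note on the keep-alive above: main() parks on an empty `while (running) ;` spin, which busy-waits a full CPU core until shutdown. A minimal alternative sketch (not part of this patch; class name hypothetical) blocks on a CountDownLatch that the shutdown hook releases:

    import java.util.concurrent.CountDownLatch;

    public class LatchKeepAliveSketch {

        public static void main(String[] args) throws InterruptedException {
            final CountDownLatch shutdownLatch = new CountDownLatch(1);
            Runtime.getRuntime().addShutdownHook(new Thread() {

                public void run() {
                    // stop the consumer here, then release the main thread
                    shutdownLatch.countDown();
                }
            });
            shutdownLatch.await(); // parks without spinning until shutdown
        }
    }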
+ 61 - 0
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/KafkaClientRunningTest.java

@@ -0,0 +1,61 @@
+package com.alibaba.otter.canal.kafka.client.running;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.common.errors.WakeupException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.kafka.client.KafkaCanalConnector;
+import com.alibaba.otter.canal.kafka.client.KafkaCanalConnectors;
+import com.alibaba.otter.canal.protocol.Message;
+
+/**
+ * Test example of a Kafka consumer fetching Messages
+ *
+ * @author machengyuan @ 2018-6-12
+ * @version 1.0.0
+ */
+public class KafkaClientRunningTest extends AbstractKafkaTest {
+
+    private Logger  logger  = LoggerFactory.getLogger(KafkaClientRunningTest.class);
+
+    private boolean running = true;
+
+    @Test
+    public void testKafkaConsumer() {
+        final ExecutorService executor = Executors.newFixedThreadPool(1);
+
+        final KafkaCanalConnector connector = KafkaCanalConnectors.newKafkaConnector(servers, topic, partition, groupId);
+
+        executor.submit(new Runnable() {
+
+            @Override
+            public void run() {
+                connector.subscribe();
+                while (running) {
+                    try {
+                        Message message = connector.getWithoutAck(3L, TimeUnit.SECONDS);
+                        if (message != null) {
+                            System.out.println(message);
+                        }
+                        connector.ack();
+                    } catch (WakeupException e) {
+                        // ignore
+                    }
+                }
+                connector.unsubscribe();
+                connector.close();
+            }
+        });
+
+        sleep(60000);
+        running = false;
+        executor.shutdown();
+        logger.info("shutdown completed");
+    }
+
+}

+ 19 - 0
kafka-client/src/test/resources/logback.xml

@@ -0,0 +1,19 @@
+<configuration scan="true" scanPeriod=" 5 seconds">
+
+	<jmxConfigurator />
+	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+		<encoder>
+			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
+			</pattern>
+		</encoder>
+	</appender>
+
+	<logger name="org.apache.kafka" additivity="false">
+		<level value="ERROR" />
+		<appender-ref ref="STDOUT" />
+	</logger>
+
+	<root level="INFO">
+		<appender-ref ref="STDOUT"/>
+	</root>
+</configuration>

+ 140 - 0
kafka/pom.xml

@@ -0,0 +1,140 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>canal</artifactId>
+        <groupId>com.alibaba.otter</groupId>
+        <version>1.0.26-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <groupId>com.alibaba.otter</groupId>
+    <artifactId>canal.kafka</artifactId>
+    <packaging>jar</packaging>
+    <name>canal kafka module for otter ${project.version}</name>
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.deployer</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>1.17</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>0.10.0.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- The deploy module's packaging is usually jar; if the project has no java source code or resource files, this configuration lets the build pass -->
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <addMavenDescriptor>true</addMavenDescriptor>
+                    </archive>
+                    <excludes>
+                        <exclude>**/logback.xml</exclude>
+                        <exclude>**/canal.properties</exclude>
+                        <exclude>**/spring/**</exclude>
+                        <exclude>**/example/**</exclude>
+                        <exclude>**/kafka.yml</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <!-- this is the latest version and the recommended one -->
+                <version>2.2.1</version>
+                <executions>
+                    <execution>
+                        <id>assemble</id>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <phase>package</phase>
+                    </execution>
+                </executions>
+                <configuration>
+                    <appendAssemblyId>false</appendAssemblyId>
+                    <attach>false</attach>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>dev</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+                <property>
+                    <name>env</name>
+                    <value>!release</value>
+                </property>
+            </activation>
+
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <configuration>
+                            <!-- the maven assembly plugin needs a descriptor file that tells it the package layout and where the packaged files come from -->
+                            <descriptors>
+                                <descriptor>${basedir}/src/main/assembly/dev.xml</descriptor>
+                            </descriptors>
+                            <finalName>canal</finalName>
+                            <outputDirectory>${project.build.directory}</outputDirectory>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+
+        </profile>
+
+        <profile>
+            <id>release</id>
+            <activation>
+                <property>
+                    <name>env</name>
+                    <value>release</value>
+                </property>
+            </activation>
+
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <configuration>
+                            <!-- maven assembly plugin descriptor used in release mode -->
+                            <descriptors>
+                                <descriptor>${basedir}/src/main/assembly/release.xml</descriptor>
+                            </descriptors>
+                            <!-- if an application contains multiple deploy modules with the same package name, copying them into one directory may fail, so the artifactId is appended to the package name to tell them apart -->
+                            <finalName>${project.artifactId}-${project.version}</finalName>
+                            <!-- scm requires that packages built in release mode go into the target subdirectory of the top-level directory -->
+                            <outputDirectory>${project.parent.build.directory}</outputDirectory>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>

+ 64 - 0
kafka/src/main/assembly/dev.xml

@@ -0,0 +1,64 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+	<id>dist</id>
+	<formats>
+		<format>dir</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>.</directory>
+			<outputDirectory>/</outputDirectory>
+			<includes>
+				<include>README*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/bin</directory>
+			<outputDirectory>bin</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+			<fileMode>0755</fileMode>
+		</fileSet>
+		<fileSet>
+			<directory>../deployer/src/main/conf</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>../deployer/src/main/resources</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+			<excludes>
+				<exclude>logback.xml</exclude>
+			</excludes>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/resources</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>target</directory>
+			<outputDirectory>logs</outputDirectory>
+			<excludes>
+				<exclude>**/*</exclude>
+			</excludes>
+		</fileSet>
+	</fileSets>
+	<dependencySets>
+		<dependencySet>
+			<outputDirectory>lib</outputDirectory>
+			<excludes>
+				<exclude>junit:junit</exclude>
+			</excludes>
+		</dependencySet>
+	</dependencySets>
+</assembly>

+ 64 - 0
kafka/src/main/assembly/release.xml

@@ -0,0 +1,64 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+	<id>dist</id>
+	<formats>
+		<format>tar.gz</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>.</directory>
+			<outputDirectory>/</outputDirectory>
+			<includes>
+				<include>README*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/bin</directory>
+			<outputDirectory>bin</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+			<fileMode>0755</fileMode>
+		</fileSet>
+		<fileSet>
+			<directory>../deployer/src/main/conf</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>../deployer/src/main/resources</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+			<excludes>
+				<exclude>logback.xml</exclude>
+			</excludes>
+		</fileSet>
+		<fileSet>
+			<directory>./src/main/resources</directory>
+			<outputDirectory>/conf</outputDirectory>
+			<includes>
+				<include>**/*</include>
+			</includes>
+		</fileSet>
+		<fileSet>
+			<directory>target</directory>
+			<outputDirectory>logs</outputDirectory>
+			<excludes>
+				<exclude>**/*</exclude>
+			</excludes>
+		</fileSet>
+	</fileSets>
+	<dependencySets>
+		<dependencySet>
+			<outputDirectory>lib</outputDirectory>
+			<excludes>
+				<exclude>junit:junit</exclude>
+			</excludes>
+		</dependencySet>
+	</dependencySets>
+</assembly>

+ 25 - 0
kafka/src/main/bin/startup.bat

@@ -0,0 +1,25 @@
+@echo off
+@if not "%ECHO%" == ""  echo %ECHO%
+@if "%OS%" == "Windows_NT"  setlocal
+
+set ENV_PATH=.\
+if "%OS%" == "Windows_NT" set ENV_PATH=%~dp0%
+
+set conf_dir=%ENV_PATH%\..\conf
+set canal_conf=%conf_dir%\canal.properties
+set logback_configurationFile=%conf_dir%\logback.xml
+
+set CLASSPATH=%conf_dir%
+set CLASSPATH=%conf_dir%\..\lib\*;%CLASSPATH%
+
+set JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:PermSize=128m
+set JAVA_OPTS_EXT= -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dapplication.codeset=UTF-8 -Dfile.encoding=UTF-8
+set JAVA_DEBUG_OPT= -server -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=9099,server=y,suspend=n
+set CANAL_OPTS= -DappName=otter-canal -Dlogback.configurationFile="%logback_configurationFile%" -Dcanal.conf="%canal_conf%"
+
+set JAVA_OPTS= %JAVA_MEM_OPTS% %JAVA_OPTS_EXT% %JAVA_DEBUG_OPT% %CANAL_OPTS%
+
+set CMD_STR= java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.kafka.CanalLauncher
+echo start cmd : %CMD_STR%
+
+java %JAVA_OPTS% -classpath "%CLASSPATH%" com.alibaba.otter.canal.kafka.CanalLauncher

+ 104 - 0
kafka/src/main/bin/startup.sh

@@ -0,0 +1,104 @@
+#!/bin/bash 
+
+current_path=`pwd`
+case "`uname`" in
+    Linux)
+		bin_abs_path=$(readlink -f $(dirname $0))
+		;;
+	*)
+		bin_abs_path=`cd $(dirname $0); pwd`
+		;;
+esac
+base=${bin_abs_path}/..
+canal_conf=$base/conf/canal.properties
+logback_configurationFile=$base/conf/logback.xml
+export LANG=en_US.UTF-8
+export BASE=$base
+
+if [ -f $base/bin/canal.pid ] ; then
+	echo "found canal.pid, please run stop.sh first, then startup.sh" >&2
+    exit 1
+fi
+
+if [ ! -d $base/logs/canal ] ; then 
+	mkdir -p $base/logs/canal
+fi
+
+## set java path
+if [ -z "$JAVA" ] ; then
+  JAVA=$(which java)
+fi
+
+ALIBABA_JAVA="/usr/alibaba/java/bin/java"
+TAOBAO_JAVA="/opt/taobao/java/bin/java"
+if [ -z "$JAVA" ]; then
+  if [ -f $ALIBABA_JAVA ] ; then
+  	JAVA=$ALIBABA_JAVA
+  elif [ -f $TAOBAO_JAVA ] ; then
+  	JAVA=$TAOBAO_JAVA
+  else
+  	echo "Cannot find a Java JDK. Please either set JAVA or put java (>=1.5) in your PATH." >&2
+    exit 1
+  fi
+fi
+
+case "$#" 
+in
+0 ) 
+	;;
+1 )	
+	var=$*
+	if [ -f $var ] ; then 
+		canal_conf=$var
+	else
+		echo "THE PARAMETER IS NOT CORRECT. PLEASE CHECK AGAIN."
+        exit
+	fi;;
+2 )	
+	var=$1
+	if [ -f $var ] ; then
+		canal_conf=$var
+	else 
+		if [ "$1" = "debug" ]; then
+			DEBUG_PORT=$2
+			DEBUG_SUSPEND="n"
+			JAVA_DEBUG_OPT="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=$DEBUG_PORT,server=y,suspend=$DEBUG_SUSPEND"
+		fi
+     fi;;
+* )
+	echo "THE PARAMETERS MUST BE TWO OR LESS. PLEASE CHECK AGAIN."
+	exit;;
+esac
+
+str=`file -L $JAVA | grep 64-bit`
+if [ -n "$str" ]; then
+	JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
+else
+	JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "
+fi
+
+JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
+CANAL_OPTS="-DappName=otter-canal -Dlogback.configurationFile=$logback_configurationFile -Dcanal.conf=$canal_conf"
+
+if [ -e $canal_conf -a -e $logback_configurationFile ]
+then 
+	
+	for i in $base/lib/*;
+		do CLASSPATH=$i:"$CLASSPATH";
+	done
+ 	CLASSPATH="$base/conf:$CLASSPATH";
+ 	
+ 	echo "cd to $bin_abs_path to work around relative paths"
+  	cd $bin_abs_path
+ 	
+	echo LOG CONFIGURATION : $logback_configurationFile
+	echo canal conf : $canal_conf 
+	echo CLASSPATH :$CLASSPATH
+	$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.kafka.CanalLauncher 1>>$base/logs/canal/canal.log 2>&1 &
+	echo $! > $base/bin/canal.pid 
+	
+	echo "cd to $current_path to continue"
+  	cd $current_path
+else 
+	echo "canal conf("$canal_conf") OR log configuration file($logback_configurationFile) does not exist, please create them first!"
+fi

+ 65 - 0
kafka/src/main/bin/stop.sh

@@ -0,0 +1,65 @@
+#!/bin/bash
+
+cygwin=false;
+linux=false;
+case "`uname`" in
+    CYGWIN*)
+        cygwin=true
+        ;;
+    Linux*)
+    	linux=true
+    	;;
+esac
+
+get_pid() {	
+	STR=$1
+	PID=$2
+    if $cygwin; then
+        JAVA_CMD="$JAVA_HOME\bin\java"
+        JAVA_CMD=`cygpath --path --unix $JAVA_CMD`
+        JAVA_PID=`ps |grep $JAVA_CMD |awk '{print $1}'`
+    else
+    	if $linux; then
+	        if [ ! -z "$PID" ]; then
+	        	JAVA_PID=`ps -C java -f --width 1000|grep "$STR"|grep "$PID"|grep -v grep|awk '{print $2}'`
+		    else 
+		        JAVA_PID=`ps -C java -f --width 1000|grep "$STR"|grep -v grep|awk '{print $2}'`
+	        fi
+	    else
+	    	if [ ! -z "$PID" ]; then
+	        	JAVA_PID=`ps aux |grep "$STR"|grep "$PID"|grep -v grep|awk '{print $2}'`
+		    else 
+		        JAVA_PID=`ps aux |grep "$STR"|grep -v grep|awk '{print $2}'`
+	        fi
+	    fi
+    fi
+    echo $JAVA_PID;
+}
+
+base=`dirname $0`/..
+pidfile=$base/bin/canal.pid
+if [ ! -f "$pidfile" ];then
+	echo "canal is not running. exiting"
+	exit
+fi
+
+pid=`cat $pidfile`
+if [ "$pid" == "" ] ; then
+	pid=`get_pid "appName=otter-canal"`
+fi
+
+echo -e "`hostname`: stopping canal $pid ... "
+kill $pid
+
+LOOPS=0
+while (true); 
+do 
+	gpid=`get_pid "appName=otter-canal" "$pid"`
+    if [ "$gpid" == "" ] ; then
+    	echo "Oook! cost:$LOOPS"
+    	`rm $pidfile`
+    	break;
+    fi
+    let LOOPS=LOOPS+1
+    sleep 1
+done

+ 17 - 0
kafka/src/main/java/com/alibaba/otter/canal/kafka/CanalLauncher.java

@@ -0,0 +1,17 @@
+package com.alibaba.otter.canal.kafka;
+
+import com.alibaba.otter.canal.kafka.producer.CanalKafkaStarter;
+
+/**
+ * Entry class for launching the standalone canal-kafka version
+ *
+ * @author machengyuan 2018-6-11 05:30:49 PM
+ * @version 1.0.0
+ */
+public class CanalLauncher {
+
+    public static void main(String[] args) {
+        CanalServerStarter.init();
+        CanalKafkaStarter.init();
+    }
+}

+ 78 - 0
kafka/src/main/java/com/alibaba/otter/canal/kafka/CanalServerStarter.java

@@ -0,0 +1,78 @@
+package com.alibaba.otter.canal.kafka;
+
+import java.io.FileInputStream;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.deployer.CanalController;
+
+/**
+ * canal server starter class
+ *
+ * @author machengyuan 2018-6-11 05:30:49 PM
+ * @version 1.0.0
+ */
+public class CanalServerStarter {
+
+    private static final String CLASSPATH_URL_PREFIX = "classpath:";
+    private static final Logger logger               = LoggerFactory.getLogger(CanalServerStarter.class);
+    private static boolean      running              = false;
+
+    public static void init() {
+        try {
+            logger.info("## set default uncaught exception handler");
+            setGlobalUncaughtExceptionHandler();
+
+            logger.info("## load canal configurations");
+            String conf = System.getProperty("canal.conf", "classpath:canal.properties");
+            Properties properties = new Properties();
+            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
+                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
+                properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
+            } else {
+                properties.load(new FileInputStream(conf));
+            }
+
+            logger.info("## start the canal server.");
+            final CanalController controller = new CanalController(properties);
+            controller.start();
+            running = true;
+            logger.info("## the canal server is running now ......");
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+
+                public void run() {
+                    try {
+                        logger.info("## stop the canal server");
+                        running = false;
+                        controller.stop();
+                    } catch (Throwable e) {
+                        logger.warn("##something goes wrong when stopping canal Server:", e);
+                    } finally {
+                        logger.info("## canal server is down.");
+                    }
+                }
+
+            });
+        } catch (Throwable e) {
+            logger.error("## Something goes wrong when starting up the canal Server:", e);
+            System.exit(0);
+        }
+    }
+
+    public static boolean isRunning() {
+        return running;
+    }
+
+    private static void setGlobalUncaughtExceptionHandler() {
+        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                logger.error("UnCaughtException", e);
+            }
+        });
+    }
+}

+ 76 - 0
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaProducer.java

@@ -0,0 +1,76 @@
+package com.alibaba.otter.canal.kafka.producer;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.kafka.producer.KafkaProperties.Topic;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.Message;
+
+/**
+ * Main operation class for the kafka producer
+ *
+ * @author machengyuan 2018-6-11 05:30:49 PM
+ * @version 1.0.0
+ */
+public class CanalKafkaProducer {
+
+    private static final Logger       logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
+
+    private Producer<String, Message> producer;
+
+    public void init(KafkaProperties kafkaProperties) {
+        Properties properties = new Properties();
+        properties.put("bootstrap.servers", kafkaProperties.getServers());
+        properties.put("acks", "all");
+        properties.put("retries", kafkaProperties.getRetries());
+        properties.put("batch.size", kafkaProperties.getBatchSize());
+        properties.put("linger.ms", kafkaProperties.getLingerMs());
+        properties.put("buffer.memory", kafkaProperties.getBufferMemory());
+        properties.put("key.serializer", StringSerializer.class.getName());
+        properties.put("value.serializer", MessageSerializer.class.getName());
+        producer = new KafkaProducer<String, Message>(properties);
+    }
+
+    public void stop() {
+        try {
+            logger.info("## stop the kafka producer");
+            producer.close();
+        } catch (Throwable e) {
+            logger.warn("##something goes wrong when stopping kafka producer:", e);
+        } finally {
+            logger.info("## kafka producer is down.");
+        }
+    }
+
+    public void send(Topic topic, Message message) {
+        boolean valid = false;
+        if (message != null && !message.getEntries().isEmpty()) {
+            for (CanalEntry.Entry entry : message.getEntries()) {
+                if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN
+                    && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
+                    valid = true;
+                    break;
+                }
+            }
+        }
+        if (!valid) {
+            return;
+        }
+
+        ProducerRecord<String, Message> record;
+        if (topic.getPartition() != null) {
+            record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
+        } else {
+            record = new ProducerRecord<String, Message>(topic.getTopic(), message);
+        }
+        producer.send(record);
+        logger.debug("send message to kafka topic: {} \n {}", topic.getTopic(), message.toString());
+    }
+}

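A minimal usage sketch for CanalKafkaProducer (the broker address and topic name are assumed placeholders, not values from this patch); note that send() silently drops messages carrying no row-change entries, as the valid check above shows:

    import java.util.ArrayList;

    import com.alibaba.otter.canal.kafka.producer.CanalKafkaProducer;
    import com.alibaba.otter.canal.kafka.producer.KafkaProperties;
    import com.alibaba.otter.canal.kafka.producer.KafkaProperties.Topic;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;

    public class ProducerSketch {

        public static void main(String[] args) {
            KafkaProperties props = new KafkaProperties();
            props.setServers("localhost:9092"); // assumed broker address

            CanalKafkaProducer producer = new CanalKafkaProducer();
            producer.init(props);

            Topic topic = new Topic();
            topic.setTopic("example"); // assumed topic name; partition stays null, so Kafka picks one

            // a message without row-change entries is filtered out by send()
            Message message = new Message(1L, new ArrayList<CanalEntry.Entry>());
            producer.send(topic, message);

            producer.stop();
        }
    }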
+ 147 - 0
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaStarter.java

@@ -0,0 +1,147 @@
+package com.alibaba.otter.canal.kafka.producer;
+
+import java.io.FileInputStream;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import com.alibaba.otter.canal.kafka.CanalServerStarter;
+import com.alibaba.otter.canal.kafka.producer.KafkaProperties.CanalDestination;
+import com.alibaba.otter.canal.kafka.producer.KafkaProperties.Topic;
+import com.alibaba.otter.canal.protocol.ClientIdentity;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
+
+/**
+ * kafka starter class
+ *
+ * @author machengyuan 2018-6-11 05:30:49 PM
+ * @version 1.0.0
+ */
+public class CanalKafkaStarter {
+
+    private static final String       CLASSPATH_URL_PREFIX = "classpath:";
+    private static final Logger       logger               = LoggerFactory.getLogger(CanalKafkaStarter.class);
+
+    private volatile static boolean   running              = false;
+
+    private static ExecutorService    executorService;
+
+    private static CanalKafkaProducer canalKafkaProducer;
+
+    private static KafkaProperties    kafkaProperties;
+
+    public static void init() {
+        try {
+            logger.info("## load kafka configurations");
+            String conf = System.getProperty("kafka.conf", "classpath:kafka.yml");
+
+            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
+                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
+                kafkaProperties = new Yaml().loadAs(CanalKafkaStarter.class.getClassLoader().getResourceAsStream(conf),
+                    KafkaProperties.class);
+            } else {
+                kafkaProperties = new Yaml().loadAs(new FileInputStream(conf), KafkaProperties.class);
+            }
+
+            // initialize the kafka producer
+            canalKafkaProducer = new CanalKafkaProducer();
+            canalKafkaProducer.init(kafkaProperties);
+
+            // start one worker thread per canal instance
+            List<CanalDestination> destinations = kafkaProperties.getCanalDestinations();
+
+            executorService = Executors.newFixedThreadPool(destinations.size());
+
+            logger.info("## start the kafka workers.");
+            for (final CanalDestination destination : destinations) {
+                executorService.execute(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        worker(destination);
+                    }
+                });
+            }
+            running = true;
+            logger.info("## the kafka workers are running now ......");
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+
+                public void run() {
+                    try {
+                        logger.info("## stop the kafka workers");
+                        running = false;
+                        executorService.shutdown();
+                        canalKafkaProducer.stop();
+                    } catch (Throwable e) {
+                        logger.warn("##something goes wrong when stopping kafka workers:", e);
+                    } finally {
+                        logger.info("## canal kafka is down.");
+                    }
+                }
+
+            });
+
+        } catch (Throwable e) {
+            logger.error("## Something goes wrong when starting up the canal kafka workers:", e);
+            System.exit(0);
+        }
+    }
+
+    private static void worker(CanalDestination destination) {
+        while (!running)
+            ;
+        while (!CanalServerStarter.isRunning())
+            ; // wait until the canal server has finished starting
+        logger.info("## start the canal consumer: {}.", destination.getCanalDestination());
+        CanalServerWithEmbedded server = CanalServerWithEmbedded.instance();
+        ClientIdentity clientIdentity = new ClientIdentity(destination.getCanalDestination(), (short) 1001, "");
+        while (running) {
+            try {
+                if (!server.getCanalInstances().containsKey(clientIdentity.getDestination())) {
+                    try {
+                        Thread.sleep(3000);
+                    } catch (InterruptedException e) {
+                        // ignore
+                    }
+                    continue;
+                }
+                server.subscribe(clientIdentity);
+                logger.info("## the canal consumer {} is running now ......", destination.getCanalDestination());
+
+                while (running) {
+                    Message message = server.getWithoutAck(clientIdentity, kafkaProperties.getCanalBatchSize()); // fetch the specified amount of data
+                    long batchId = message.getId();
+                    try {
+                        int size = message.getEntries().size();
+                        if (batchId != -1 && size != 0) {
+                            if (!StringUtils.isEmpty(destination.getTopic())) {
+                                Topic topic = new Topic();
+                                topic.setTopic(destination.getTopic());
+                                topic.setPartition(destination.getPartition());
+                                destination.getTopics().add(topic);
+                            }
+                            for (Topic topic : destination.getTopics()) {
+                                canalKafkaProducer.send(topic, message); // send the message to every topic
+                            }
+                        }
+
+                        if (batchId != -1) {
+                            server.ack(clientIdentity, batchId); // commit the ack
+                        }
+                    } catch (Exception e) {
+                        server.rollback(clientIdentity);
+                        logger.error(e.getMessage(), e);
+                    }
+                }
+            } catch (Exception e) {
+                logger.error("process error!", e);
+            }
+        }
+    }
+}

+ 161 - 0
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/KafkaProperties.java

@@ -0,0 +1,161 @@
+package com.alibaba.otter.canal.kafka.producer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * kafka configuration properties
+ *
+ * @author machengyuan 2018-6-11 05:30:49 PM
+ * @version 1.0.0
+ */
+public class KafkaProperties {
+
+    private String                 servers           = "localhost:6667";
+    private int                    retries           = 0;
+    private int                    batchSize         = 16384;
+    private int                    lingerMs          = 1;
+    private long                   bufferMemory      = 33554432L;
+
+    private int                    canalBatchSize    = 5;
+
+    private List<CanalDestination> canalDestinations = new ArrayList<CanalDestination>();
+
+    public static class CanalDestination {
+
+        private String     canalDestination;
+        private String     topic;
+        private Integer    partition;
+        private Set<Topic> topics = new HashSet<Topic>();
+
+        public String getCanalDestination() {
+            return canalDestination;
+        }
+
+        public void setCanalDestination(String canalDestination) {
+            this.canalDestination = canalDestination;
+        }
+
+        public String getTopic() {
+            return topic;
+        }
+
+        public void setTopic(String topic) {
+            this.topic = topic;
+        }
+
+        public Integer getPartition() {
+            return partition;
+        }
+
+        public void setPartition(Integer partition) {
+            this.partition = partition;
+        }
+
+        public Set<Topic> getTopics() {
+            return topics;
+        }
+
+        public void setTopics(Set<Topic> topics) {
+            this.topics = topics;
+        }
+    }
+
+    public static class Topic {
+
+        private String  topic;
+        private Integer partition;
+
+        public String getTopic() {
+            return topic;
+        }
+
+        public void setTopic(String topic) {
+            this.topic = topic;
+        }
+
+        public Integer getPartition() {
+            return partition;
+        }
+
+        public void setPartition(Integer partition) {
+            this.partition = partition;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            Topic topic1 = (Topic) o;
+
+            if (topic != null ? !topic.equals(topic1.topic) : topic1.topic != null) return false;
+            return partition != null ? partition.equals(topic1.partition) : topic1.partition == null;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = topic != null ? topic.hashCode() : 0;
+            result = 31 * result + (partition != null ? partition.hashCode() : 0);
+            return result;
+        }
+    }
+
+    public String getServers() {
+        return servers;
+    }
+
+    public void setServers(String servers) {
+        this.servers = servers;
+    }
+
+    public int getRetries() {
+        return retries;
+    }
+
+    public void setRetries(int retries) {
+        this.retries = retries;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public int getLingerMs() {
+        return lingerMs;
+    }
+
+    public void setLingerMs(int lingerMs) {
+        this.lingerMs = lingerMs;
+    }
+
+    public long getBufferMemory() {
+        return bufferMemory;
+    }
+
+    public void setBufferMemory(long bufferMemory) {
+        this.bufferMemory = bufferMemory;
+    }
+
+    public int getCanalBatchSize() {
+        return canalBatchSize;
+    }
+
+    public void setCanalBatchSize(int canalBatchSize) {
+        this.canalBatchSize = canalBatchSize;
+    }
+
+    public List<CanalDestination> getCanalDestinations() {
+        return canalDestinations;
+    }
+
+    public void setCanalDestinations(List<CanalDestination> canalDestinations) {
+        this.canalDestinations = canalDestinations;
+    }
+}

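CanalKafkaStarter binds kafka.yml onto this bean via SnakeYAML's loadAs, so every top-level YAML key maps to the matching setter above. A minimal loading sketch under that assumption (the file path is illustrative):

    import java.io.FileInputStream;
    import java.io.FileNotFoundException;

    import org.yaml.snakeyaml.Yaml;

    import com.alibaba.otter.canal.kafka.producer.KafkaProperties;

    public class KafkaPropertiesLoadSketch {

        public static void main(String[] args) throws FileNotFoundException {
            // servers, retries, canalDestinations, ... are bound by setter name
            KafkaProperties props = new Yaml().loadAs(new FileInputStream("conf/kafka.yml"),
                KafkaProperties.class);
            System.out.println(props.getServers());
            for (KafkaProperties.CanalDestination d : props.getCanalDestinations()) {
                System.out.println(d.getCanalDestination() + " -> " + d.getTopic());
            }
        }
    }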
+ 50 - 0
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/MessageSerializer.java

@@ -0,0 +1,50 @@
+package com.alibaba.otter.canal.kafka.producer;
+
+import java.util.Map;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+import org.springframework.util.CollectionUtils;
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalPacket;
+import com.alibaba.otter.canal.protocol.Message;
+
+/**
+ * Serializer for the Kafka Message class
+ *
+ * @author machengyuan 2018-6-11 05:30:49 PM
+ * @version 1.0.0
+ */
+public class MessageSerializer implements Serializer<Message> {
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+    }
+
+    @Override
+    public byte[] serialize(String topic, Message data) {
+        try {
+            if (data == null) return null;
+            else {
+                CanalPacket.Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
+                if (data.getId() != -1 && !CollectionUtils.isEmpty(data.getEntries())) {
+                    for (CanalEntry.Entry entry : data.getEntries()) {
+                        messageBuilder.addMessages(entry.toByteString());
+                    }
+                }
+                CanalPacket.Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
+                packetBuilder.setType(CanalPacket.PacketType.MESSAGES);
+                packetBuilder.setBody(messageBuilder.build().toByteString());
+                return packetBuilder.build().toByteArray();
+            }
+        } catch (Exception e) {
+            throw new SerializationException("Error when serializing message to byte[] ");
+        }
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}

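On the wire this serializer produces a CanalPacket.Packet of type MESSAGES whose body is a Messages list of serialized entries; consumers must invert that (the kafka-client module ships a MessageDeserializer for the purpose). The decoding sketch below is for illustration only; note the serializer above never sets a batch id in the Messages builder, so it decodes as 0:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.common.serialization.Deserializer;

    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.CanalPacket;
    import com.alibaba.otter.canal.protocol.Message;
    import com.google.protobuf.ByteString;

    public class MessageDecodeSketch implements Deserializer<Message> {

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
        }

        @Override
        public Message deserialize(String topic, byte[] data) {
            try {
                if (data == null) {
                    return null;
                }
                // unwrap the Packet, then the Messages body written by MessageSerializer
                CanalPacket.Packet packet = CanalPacket.Packet.parseFrom(data);
                CanalPacket.Messages messages = CanalPacket.Messages.parseFrom(packet.getBody());
                List<CanalEntry.Entry> entries = new ArrayList<CanalEntry.Entry>();
                for (ByteString byteString : messages.getMessagesList()) {
                    entries.add(CanalEntry.Entry.parseFrom(byteString));
                }
                return new Message(messages.getBatchId(), entries);
            } catch (Exception e) {
                throw new RuntimeException("error when deserializing message", e);
            }
        }

        @Override
        public void close() {
            // nothing to do
        }
    }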
+ 18 - 0
kafka/src/main/resources/kafka.yml

@@ -0,0 +1,18 @@
+servers: slave1:6667,slave2:6667,slave3:6667
+retries: 0
+batchSize: 16384
+lingerMs: 1
+bufferMemory: 33554432
+# canal batch size, unit: k
+canalBatchSize: 50
+
+canalDestinations:
+  - canalDestination: example
+    topic: example
+    partition:
+    # one destination may correspond to multiple topics
+#    topics:
+#      - topic: example
+#        partition:
+
+

+ 85 - 0
kafka/src/main/resources/logback.xml

@@ -0,0 +1,85 @@
+<configuration scan="true" scanPeriod=" 5 seconds">
+	<jmxConfigurator />
+	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+		<encoder>
+			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
+			</pattern>
+		</encoder>
+	</appender>
+	
+	<appender name="CANAL-ROOT" class="ch.qos.logback.classic.sift.SiftingAppender">
+		<discriminator>
+			<Key>destination</Key>
+			<DefaultValue>canal</DefaultValue>
+		</discriminator>
+		<sift>
+			<appender name="FILE-${destination}" class="ch.qos.logback.core.rolling.RollingFileAppender">
+				<File>../logs/${destination}/${destination}.log</File>
+				<rollingPolicy
+					class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+					<!-- rollover daily -->
+					<fileNamePattern>../logs/${destination}/%d{yyyy-MM-dd}/${destination}-%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
+					<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+						<!-- or whenever the file size reaches 512MB -->
+						<maxFileSize>512MB</maxFileSize>
+					</timeBasedFileNamingAndTriggeringPolicy>
+					<maxHistory>60</maxHistory>
+				</rollingPolicy>
+				<encoder>
+					<pattern>
+						%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{56} - %msg%n
+					</pattern>
+				</encoder>
+			</appender>
+		</sift>
+	</appender>
+	
+	<appender name="CANAL-META" class="ch.qos.logback.classic.sift.SiftingAppender">
+		<discriminator>
+			<Key>destination</Key>
+			<DefaultValue>canal</DefaultValue>
+		</discriminator>
+		<sift>
+			<appender name="META-FILE-${destination}" class="ch.qos.logback.core.rolling.RollingFileAppender">
+				<File>../logs/${destination}/meta.log</File>
+				<rollingPolicy
+					class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+					<!-- rollover daily -->
+					<fileNamePattern>../logs/${destination}/%d{yyyy-MM-dd}/meta-%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern>
+					<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+						<!-- or whenever the file size reaches 32MB -->
+						<maxFileSize>32MB</maxFileSize>
+					</timeBasedFileNamingAndTriggeringPolicy>
+					<maxHistory>60</maxHistory>
+				</rollingPolicy>
+				<encoder>
+					<pattern>
+						%d{yyyy-MM-dd HH:mm:ss.SSS} - %msg%n
+					</pattern>
+				</encoder>
+			</appender>
+		</sift>
+	</appender>
+	
+    <logger name="com.alibaba.otter.canal.instance" additivity="false">  
+        <level value="INFO" />  
+        <appender-ref ref="CANAL-ROOT" />
+    </logger>
+    <logger name="com.alibaba.otter.canal.deployer" additivity="false">  
+        <level value="INFO" />  
+        <appender-ref ref="CANAL-ROOT" />
+    </logger>
+    <logger name="com.alibaba.otter.canal.meta.FileMixedMetaManager" additivity="false">  
+        <level value="INFO" />
+        <appender-ref ref="CANAL-META" />
+    </logger>
+	<logger name="com.alibaba.otter.canal.kafka" additivity="false">
+		<level value="INFO" />
+		<appender-ref ref="CANAL-ROOT" />
+	</logger>
+    
+	<root level="WARN">
+		<appender-ref ref="STDOUT"/>
+		<appender-ref ref="CANAL-ROOT" />
+	</root>
+</configuration>

+ 5 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -88,7 +88,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
     protected TimerTask                              heartBeatTimerTask;
     protected Throwable                              exception                  = null;
 
-    protected boolean                                isGTIDMode                 = false; // 是否是GTID模式
+    protected boolean                                isGTIDMode                 = false;                                   // whether GTID mode is enabled
 
     protected abstract BinlogParser buildParser();
 
@@ -221,12 +221,13 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                         if (isGTIDMode()) {
                             erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), sinkHandler);
                         } else {
-                            if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) {
+                            if (StringUtils.isEmpty(startPosition.getJournalName())
+                                && startPosition.getTimestamp() != null) {
                                 erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
                             } else {
                                 erosaConnection.dump(startPosition.getJournalName(),
-                                        startPosition.getPosition(),
-                                        sinkHandler);
+                                    startPosition.getPosition(),
+                                    sinkHandler);
                             }
                         }
 

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ErosaConnection.java

@@ -1,9 +1,9 @@
 package com.alibaba.otter.canal.parse.inbound;
 
-import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
-
 import java.io.IOException;
 
+import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
+
 /**
  * Generic Erosa connection interface, used to generalize the mysql/oracle parsing process
  * 

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

@@ -4,12 +4,12 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
-import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
 import com.alibaba.otter.canal.parse.inbound.SinkFunction;
 import com.alibaba.otter.canal.parse.inbound.mysql.local.BinLogFileQueue;

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -8,8 +8,6 @@ import java.nio.charset.Charset;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
-import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpGTIDCommandPacket;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -17,8 +15,10 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector;
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlQueryExecutor;
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpCommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpGTIDCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.RegisterSlaveCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.SemiAckCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;

+ 10 - 10
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java

@@ -19,30 +19,30 @@ import com.taobao.tddl.dbsync.binlog.LogFetcher;
  */
 public class DirectLogFetcher extends LogFetcher {
 
-    protected static final Logger logger            = LoggerFactory.getLogger(DirectLogFetcher.class);
+    protected static final Logger logger                          = LoggerFactory.getLogger(DirectLogFetcher.class);
 
     // Master heartbeat interval
-    public static final int MASTER_HEARTBEAT_PERIOD_SECONDS = 15;
+    public static final int       MASTER_HEARTBEAT_PERIOD_SECONDS = 15;
     // +10s to ensure timeout > heartbeat interval
-    private static final int READ_TIMEOUT_MILLISECONDS = (MASTER_HEARTBEAT_PERIOD_SECONDS + 10) * 1000;
+    private static final int      READ_TIMEOUT_MILLISECONDS       = (MASTER_HEARTBEAT_PERIOD_SECONDS + 10) * 1000;
 
     /** Command to dump binlog */
-    public static final byte      COM_BINLOG_DUMP   = 18;
+    public static final byte      COM_BINLOG_DUMP                 = 18;
 
     /** Packet header sizes */
-    public static final int       NET_HEADER_SIZE   = 4;
-    public static final int       SQLSTATE_LENGTH   = 5;
+    public static final int       NET_HEADER_SIZE                 = 4;
+    public static final int       SQLSTATE_LENGTH                 = 5;
 
     /** Packet offsets */
-    public static final int       PACKET_LEN_OFFSET = 0;
-    public static final int       PACKET_SEQ_OFFSET = 3;
+    public static final int       PACKET_LEN_OFFSET               = 0;
+    public static final int       PACKET_SEQ_OFFSET               = 3;
 
     /** Maximum packet length */
-    public static final int       MAX_PACKET_LENGTH = (256 * 256 * 256 - 1);
+    public static final int       MAX_PACKET_LENGTH               = (256 * 256 * 256 - 1);
 
     private SocketChannel         channel;
 
-    private boolean               issemi            = false;
+    private boolean               issemi                          = false;
 
     // private BufferedInputStream input;
 

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -467,7 +467,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 // handle test.heartbeat heartbeat data in alisql mode
                 // the heartbeat table usually has no privileges, so a tableMeta needs to be mocked
                 FieldMeta idMeta = new FieldMeta("id", "smallint(6)", false, true, null);
-                FieldMeta typeMeta = new FieldMeta("type", "int(11)", true, false, null);
+                FieldMeta typeMeta = new FieldMeta("ts", "int(11)", true, false, null);
                 tableMeta = new TableMeta(table.getDbName(), table.getTableName(), Arrays.asList(idMeta, typeMeta));
             }
 

+ 0 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/dao/MetaHistoryDO.java

@@ -87,7 +87,6 @@ public class MetaHistoryDO {
         this.useSchema = useSchema;
     }
 
-
     public String getExtra() {
         return extra;
     }

+ 30 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/FastsqlSchemaTest.java

@@ -0,0 +1,30 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import com.alibaba.fastsql.sql.repository.SchemaObject;
+import com.alibaba.fastsql.sql.repository.SchemaRepository;
+import com.alibaba.fastsql.util.JdbcConstants;
+
+/**
+ * @author agapple 2018年6月7日 下午5:36:13
+ * @since 3.1.9
+ */
+public class FastsqlSchemaTest {
+
+    @Test
+    public void testSimple() throws FileNotFoundException, IOException {
+        SchemaRepository repository = new SchemaRepository(JdbcConstants.MYSQL);
+        String sql = "create table yushitai_test.card_record ( id bigint auto_increment) auto_increment=256 "
+                     + "; alter table yushitai_test.card_record add column customization_id bigint unsigned NOT NULL COMMENT 'TEST' ;"
+                     + "; rename table yushitai_test.card_record to yushitai_test._card_record_del;";
+        repository.console(sql);
+
+        repository.setDefaultSchema("yushitai_test");
+        SchemaObject table = repository.findTable("_card_record_del");
+        System.out.println(table.getStatement().toString());
+    }
+}

+ 40 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta_DDL_Test.java

@@ -0,0 +1,40 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.net.URL;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import com.alibaba.otter.canal.parse.inbound.TableMeta;
+
+/**
+ * @author agapple 2017年8月1日 下午7:15:54
+ */
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(locations = { "/tsdb/mysql-tsdb.xml" })
+public class MemoryTableMeta_DDL_Test {
+
+    @Test
+    public void test1() throws Throwable {
+        MemoryTableMeta memoryTableMeta = new MemoryTableMeta();
+        URL url = Thread.currentThread().getContextClassLoader().getResource("dummy.txt");
+        File dummyFile = new File(url.getFile());
+        File create = new File(dummyFile.getParent() + "/ddl", "ddl_test1.sql");
+        String sql = StringUtils.join(IOUtils.readLines(new FileInputStream(create)), "\n");
+        memoryTableMeta.apply(null, "test", sql, null);
+
+        TableMeta meta = memoryTableMeta.find("yushitai_test", "card_record");
+        System.out.println(meta);
+        Assert.assertNotNull(meta.getFieldMetaByName("customization_id"));
+
+        meta = memoryTableMeta.find("yushitai_test", "_card_record_gho");
+        Assert.assertNull(meta);
+    }
+}

+ 21 - 0
parse/src/test/resources/ddl/ddl_test1.sql

@@ -0,0 +1,21 @@
+create table yushitai_test.card_record (
+id bigint auto_increment,
+last_update timestamp not null DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+hint varchar(64) charset ascii not null,
+value varchar(255) charset ascii not null,
+primary key(id),
+unique key hint_uidx(hint)
+) auto_increment=256 ;
+
+DROP TABLE IF EXISTS _card_record_gho /* generated by server */ ;
+DROP TABLE IF EXISTS _card_record_del /* generated by server */ ;
+
+create /* gh-ost */ table yushitai_test._card_record_gho like yushitai_test.card_record ;
+alter /* gh-ost */ table yushitai_test._card_record_gho add column customization_id bigint unsigned NOT NULL COMMENT 'TEST' ;
+
+create /* gh-ost */ table yushitai_test._card_record_del (
+id int auto_increment primary key
+) engine=InnoDB comment='ghost-cut-over-sentry' ;
+
+DROP TABLE IF EXISTS _card_record_del /* generated by server */ ;
+rename /* gh-ost */ table yushitai_test.card_record to yushitai_test._card_record_del, yushitai_test._card_record_gho to yushitai_test.card_record;

+ 3 - 1
pom.xml

@@ -127,6 +127,8 @@
         <module>client</module>
         <module>deployer</module>
         <module>example</module>
+        <module>kafka</module>
+        <module>kafka-client</module>
     </modules>
 
     <dependencyManagement>
@@ -254,7 +256,7 @@
             <dependency>
                 <groupId>com.alibaba.fastsql</groupId>
                 <artifactId>fastsql</artifactId>
-                <version>2.0.0_preview_366</version>
+                <version>2.0.0_preview_371</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>

File diff suppressed because it is too large
+ 578 - 565
protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalEntry.java


+ 0 - 1
protocol/src/main/java/com/alibaba/otter/canal/protocol/position/EntryPosition.java

@@ -1,6 +1,5 @@
 package com.alibaba.otter.canal.protocol.position;
 
-
 /**
  * Unique identifier of a database object
  * 

+ 17 - 15
server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

@@ -221,7 +221,8 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
 
             if (CollectionUtils.isEmpty(events.getEvents())) {
                 logger.debug("get successfully, clientId:{} batchSize:{} but result is null",
-                        clientIdentity.getClientId(), batchSize);
+                    clientIdentity.getClientId(),
+                    batchSize);
                 return new Message(-1, new ArrayList<Entry>()); // return an empty package to avoid generating a batchId and wasting performance
             } else {
                 // 记录到流式信息
@@ -234,11 +235,11 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
                 });
                 if (logger.isInfoEnabled()) {
                     logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
-                            clientIdentity.getClientId(),
-                            batchSize,
-                            entrys.size(),
-                            batchId,
-                            events.getPositionRange());
+                        clientIdentity.getClientId(),
+                        batchSize,
+                        entrys.size(),
+                        batchId,
+                        events.getPositionRange());
                 }
                 // 直接提交ack
                 ack(clientIdentity, batchId);
@@ -299,7 +300,8 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
 
             if (CollectionUtils.isEmpty(events.getEvents())) {
                 logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null",
-                        clientIdentity.getClientId(), batchSize);
+                    clientIdentity.getClientId(),
+                    batchSize);
                 return new Message(-1, new ArrayList<Entry>()); // return an empty package to avoid generating a batchId and wasting performance
             } else {
                 // 记录到流式信息
@@ -312,11 +314,11 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
                 });
                 if (logger.isInfoEnabled()) {
                     logger.info("getWithoutAck successfully, clientId:{} batchSize:{}  real size is {} and result is [batchId:{} , position:{}]",
-                            clientIdentity.getClientId(),
-                            batchSize,
-                            entrys.size(),
-                            batchId,
-                            events.getPositionRange());
+                        clientIdentity.getClientId(),
+                        batchSize,
+                        entrys.size(),
+                        batchId,
+                        events.getPositionRange());
                 }
                 return new Message(batchId, entrys);
             }
@@ -381,9 +383,9 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
             canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
             if (logger.isInfoEnabled()) {
                 logger.info("ack successfully, clientId:{} batchId:{} position:{}",
-                        clientIdentity.getClientId(),
-                        batchId,
-                        positionRanges);
+                    clientIdentity.getClientId(),
+                    batchId,
+                    positionRanges);
             }
         }
 

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/ClientAuthenticationHandler.java

@@ -34,7 +34,7 @@ public class ClientAuthenticationHandler extends SimpleChannelHandler {
 
     private static final Logger     logger                                  = LoggerFactory.getLogger(ClientAuthenticationHandler.class);
     private final int               SUPPORTED_VERSION                       = 3;
-    private final int               defaultSubscriptorDisconnectIdleTimeout = 5 * 60 * 1000;
+    private final int               defaultSubscriptorDisconnectIdleTimeout = 60 * 60 * 1000;
     private CanalServerWithEmbedded embeddedServer;
 
     public ClientAuthenticationHandler(){

+ 5 - 5
sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java

@@ -60,11 +60,11 @@ public class TimelineTransactionBarrier extends TimelineBarrier {
     }
 
     public void clear(Event event) {
-       super.clear(event);
+        super.clear(event);
 
-       //应该先判断2,再判断是否是事务尾,因为事务尾也可以导致txState的状态为2
-       //如果先判断事务尾,那么2的状态可能永远没机会被修改了,系统出现死锁
-       //CanalSinkException被注释的代码是不是可以放开??我们内部使用的时候已经放开了,从代码逻辑的分析上以及实践效果来看,应该抛异常
+        // Check state 2 first, then whether this is a transaction end, because a transaction end can also set txState to 2.
+        // If the transaction end were checked first, state 2 might never get a chance to be updated and the system would deadlock.
+        // Could the commented-out CanalSinkException code be re-enabled? We have already enabled it internally; judging from the code logic and practical results, the exception should be thrown.
         if (txState.intValue() == 2) {// not inside a transaction
             boolean result = txState.compareAndSet(2, 0);
             if (result == false) {
@@ -92,7 +92,7 @@ public class TimelineTransactionBarrier extends TimelineBarrier {
                         return true; // the transaction is allowed through
                     }
                 } else if (txState.compareAndSet(0, 2)) { // not under transaction protection
-                    //当基于zk-cursor启动的时候,拿到的第一个Event是TransactionEnd
+                    // when starting from a zk-cursor, the first Event fetched is a TransactionEnd
                     return true; // DDL/DCL/TransactionEnd is allowed through
                 }
             }

Some files were not shown because too many files changed in this diff