Ver código fonte

fixed format

七锋 6 anos atrás
pai
commit
a4255a6377
38 arquivos alterados com 874 adições e 893 exclusões
  1. 1 0
      client/src/main/java/com/alibaba/otter/canal/client/CanalConnector.java
  2. 0 1
      common/src/main/java/com/alibaba/otter/canal/common/zookeeper/running/ServerRunningMonitor.java
  3. 2 2
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java
  4. 1 0
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java
  5. 1 1
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java
  6. 2 2
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/GTIDSet.java
  7. 25 60
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/MysqlGTIDSet.java
  8. 22 30
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/UUIDSet.java
  9. 18 20
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/BinlogDumpGTIDCommandPacket.java
  10. 2 2
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/NettySocketChannelPool.java
  11. 2 2
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/ByteHelper.java
  12. 4 4
      driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/PacketManager.java
  13. 35 45
      driver/src/test/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlGTIDSetTest.java
  14. 5 5
      driver/src/test/java/com/alibaba/otter/canal/parse/driver/mysql/UUIDSetTest.java
  15. 0 1
      example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java
  16. 15 15
      kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnector.java
  17. 2 1
      kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnectors.java
  18. 9 8
      kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/MessageDeserializer.java
  19. 3 3
      kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/AbstractKafkaTest.java
  20. 25 18
      kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/CanalKafkaClientExample.java
  21. 11 8
      kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/KafkaClientRunningTest.java
  22. 2 4
      kafka/src/main/java/com/alibaba/otter/canal/kafka/CanalLauncher.java
  23. 9 7
      kafka/src/main/java/com/alibaba/otter/canal/kafka/CanalServerStarter.java
  24. 9 7
      kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaProducer.java
  25. 28 24
      kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaStarter.java
  26. 13 10
      kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/KafkaProperties.java
  27. 7 6
      kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/MessageSerializer.java
  28. 5 4
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java
  29. 2 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ErosaConnection.java
  30. 1 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java
  31. 2 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  32. 10 10
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java
  33. 0 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/dao/MetaHistoryDO.java
  34. 1 1
      pom.xml
  35. 578 565
      protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalEntry.java
  36. 0 1
      protocol/src/main/java/com/alibaba/otter/canal/protocol/position/EntryPosition.java
  37. 17 15
      server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java
  38. 5 5
      sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java

+ 1 - 0
client/src/main/java/com/alibaba/otter/canal/client/CanalConnector.java

@@ -152,6 +152,7 @@ public interface CanalConnector {
 
     /**
      * 中断的阻塞,用于优雅停止client
+     * 
      * @throws CanalClientException
      */
     void stopRunning() throws CanalClientException;

+ 0 - 1
common/src/main/java/com/alibaba/otter/canal/common/zookeeper/running/ServerRunningMonitor.java

@@ -4,7 +4,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import com.alibaba.otter.canal.common.CanalException;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.exception.ZkException;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;

+ 2 - 2
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -434,13 +434,13 @@ public class CanalController {
         }
 
         // 启动网络接口
-        if(canalServer!=null) {
+        if (canalServer != null) {
             canalServer.start();
         }
     }
 
     public void stop() throws Throwable {
-        if(canalServer!=null) {
+        if (canalServer != null) {
             canalServer.stop();
         }
 

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -59,6 +59,7 @@ public class CanalLauncher {
 
     private static void setGlobalUncaughtExceptionHandler() {
         Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+
             @Override
             public void uncaughtException(Thread t, Throwable e) {
                 logger.error("UnCaughtException", e);

+ 1 - 1
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java

@@ -53,7 +53,7 @@ public class MysqlConnector {
         String addr = address.getHostString();
         int port = address.getPort();
         this.address = new InetSocketAddress(addr, port);
-        
+
         this.username = username;
         this.password = password;
     }

+ 2 - 2
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/GTIDSet.java

@@ -3,8 +3,7 @@ package com.alibaba.otter.canal.parse.driver.mysql.packets;
 import java.io.IOException;
 
 /**
- * Created by hiwjd on 2018/4/23.
- * hiwjd0@gmail.com
+ * Created by hiwjd on 2018/4/23. hiwjd0@gmail.com
  */
 public interface GTIDSet {
 
@@ -17,6 +16,7 @@ public interface GTIDSet {
 
     /**
      * 更新当前实例
+     * 
      * @param str
      * @throws Exception
      */

+ 25 - 60
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/MysqlGTIDSet.java

@@ -1,14 +1,14 @@
 package com.alibaba.otter.canal.parse.driver.mysql.packets;
 
-import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
 
 /**
- * Created by hiwjd on 2018/4/23.
- * hiwjd0@gmail.com
+ * Created by hiwjd on 2018/4/23. hiwjd0@gmail.com
  */
 public class MysqlGTIDSet implements GTIDSet {
 
@@ -56,61 +56,26 @@ public class MysqlGTIDSet implements GTIDSet {
     }
 
     /**
-     * 解析如下格式的字符串为MysqlGTIDSet:
-     *
-     * 726757ad-4455-11e8-ae04-0242ac110002:1 =>
-     *   MysqlGTIDSet{
-     *     sets: {
-     *       726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{
-     *         SID: 726757ad-4455-11e8-ae04-0242ac110002,
-     *         intervals: [{start:1, stop:2}]
-     *       }
-     *     }
-     *   }
-     *
-     * 726757ad-4455-11e8-ae04-0242ac110002:1-3 =>
-     *   MysqlGTIDSet{
-     *     sets: {
-     *       726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{
-     *         SID: 726757ad-4455-11e8-ae04-0242ac110002,
-     *         intervals: [{start:1, stop:4}]
-     *       }
-     *     }
-     *   }
-     *
-     * 726757ad-4455-11e8-ae04-0242ac110002:1-3:4 =>
-     *   MysqlGTIDSet{
-     *     sets: {
-     *       726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{
-     *         SID: 726757ad-4455-11e8-ae04-0242ac110002,
-     *         intervals: [{start:1, stop:5}]
-     *       }
-     *     }
-     *   }
-     *
-     * 726757ad-4455-11e8-ae04-0242ac110002:1-3:7-9 =>
-     *   MysqlGTIDSet{
-     *     sets: {
-     *       726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{
-     *         SID: 726757ad-4455-11e8-ae04-0242ac110002,
-     *         intervals: [{start:1, stop:4}, {start:7, stop: 10}]
-     *       }
-     *     }
-     *   }
-     *
-     * 726757ad-4455-11e8-ae04-0242ac110002:1-3,726757ad-4455-11e8-ae04-0242ac110003:4 =>
-     *   MysqlGTIDSet{
-     *     sets: {
-     *       726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{
-     *         SID: 726757ad-4455-11e8-ae04-0242ac110002,
-     *         intervals: [{start:1, stop:4}]
-     *       },
-     *       726757ad-4455-11e8-ae04-0242ac110003: UUIDSet{
-     *         SID: 726757ad-4455-11e8-ae04-0242ac110002,
-     *         intervals: [{start:4, stop:5}]
-     *       }
-     *     }
-     *   }
+     * 解析如下格式的字符串为MysqlGTIDSet: 726757ad-4455-11e8-ae04-0242ac110002:1 =>
+     * MysqlGTIDSet{ sets: { 726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{ SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:2}] } }
+     * } 726757ad-4455-11e8-ae04-0242ac110002:1-3 => MysqlGTIDSet{ sets: {
+     * 726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{ SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:4}] } }
+     * } 726757ad-4455-11e8-ae04-0242ac110002:1-3:4 => MysqlGTIDSet{ sets: {
+     * 726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{ SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:5}] } }
+     * } 726757ad-4455-11e8-ae04-0242ac110002:1-3:7-9 => MysqlGTIDSet{ sets: {
+     * 726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{ SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:4},
+     * {start:7, stop: 10}] } } }
+     * 726757ad-4455-11e8-ae04-0242ac110002:1-3,726757
+     * ad-4455-11e8-ae04-0242ac110003:4 => MysqlGTIDSet{ sets: {
+     * 726757ad-4455-11e8-ae04-0242ac110002: UUIDSet{ SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:4}] },
+     * 726757ad-4455-11e8-ae04-0242ac110003: UUIDSet{ SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:4, stop:5}] } }
+     * }
      *
      * @param gtidData
      * @return

+ 22 - 30
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/UUIDSet.java

@@ -1,7 +1,5 @@
 package com.alibaba.otter.canal.parse.driver.mysql.packets;
 
-import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -10,13 +8,14 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
+import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
+
 /**
- * Created by hiwjd on 2018/4/23.
- * hiwjd0@gmail.com
+ * Created by hiwjd on 2018/4/23. hiwjd0@gmail.com
  */
 public class UUIDSet {
 
-    public UUID SID;
+    public UUID           SID;
     public List<Interval> intervals;
 
     public byte[] encode() throws IOException {
@@ -54,6 +53,7 @@ public class UUIDSet {
     }
 
     public static class Interval implements Comparable<Interval> {
+
         public long start;
         public long stop;
 
@@ -85,19 +85,15 @@ public class UUIDSet {
     }
 
     /**
-     * 解析如下格式字符串为UUIDSet:
-     *
-     * 726757ad-4455-11e8-ae04-0242ac110002:1 =>
-     *   UUIDSet{SID: 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:2}]}
-     *
-     * 726757ad-4455-11e8-ae04-0242ac110002:1-3 =>
-     *   UUIDSet{SID: 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:4}]}
-     *
-     * 726757ad-4455-11e8-ae04-0242ac110002:1-3:4
-     *   UUIDSet{SID: 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:5}]}
-     *
-     * 726757ad-4455-11e8-ae04-0242ac110002:1-3:7-9
-     *   UUIDSet{SID: 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:4}, {start:7, stop:10}]}
+     * 解析如下格式字符串为UUIDSet: 726757ad-4455-11e8-ae04-0242ac110002:1 => UUIDSet{SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:2}]}
+     * 726757ad-4455-11e8-ae04-0242ac110002:1-3 => UUIDSet{SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:4}]}
+     * 726757ad-4455-11e8-ae04-0242ac110002:1-3:4 UUIDSet{SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:5}]}
+     * 726757ad-4455-11e8-ae04-0242ac110002:1-3:7-9 UUIDSet{SID:
+     * 726757ad-4455-11e8-ae04-0242ac110002, intervals: [{start:1, stop:4},
+     * {start:7, stop:10}]}
      *
      * @param str
      * @return
@@ -110,7 +106,7 @@ public class UUIDSet {
         }
 
         List<Interval> intervals = new ArrayList<Interval>();
-        for (int i=1; i<ss.length; i++) {
+        for (int i = 1; i < ss.length; i++) {
             intervals.add(parseInterval(ss[i]));
         }
 
@@ -134,7 +130,7 @@ public class UUIDSet {
                 sb.append(":");
                 sb.append(interval.start);
                 sb.append("-");
-                sb.append(interval.stop-1);
+                sb.append(interval.stop - 1);
             }
         }
 
@@ -142,12 +138,8 @@ public class UUIDSet {
     }
 
     /**
-     * 解析如下格式字符串为Interval:
-     *
-     * 1 => Interval{start:1, stop:2}
-     * 1-3 => Interval{start:1, stop:4}
-     *
-     * 注意!字符串格式表达时[n,m]是两侧都包含的,Interval表达时[n,m)右侧开
+     * 解析如下格式字符串为Interval: 1 => Interval{start:1, stop:2} 1-3 =>
+     * Interval{start:1, stop:4} 注意!字符串格式表达时[n,m]是两侧都包含的,Interval表达时[n,m)右侧开
      *
      * @param str
      * @return
@@ -173,8 +165,8 @@ public class UUIDSet {
     }
 
     /**
-     * 把{start,stop}连续的合并掉:
-     * [{start:1, stop:4},{start:4, stop:5}] => [{start:1, stop:5}]
+     * 把{start,stop}连续的合并掉: [{start:1, stop:4},{start:4, stop:5}] => [{start:1,
+     * stop:5}]
      *
      * @param intervals
      * @return
@@ -183,11 +175,11 @@ public class UUIDSet {
         List<Interval> combined = new ArrayList<Interval>();
         Collections.sort(intervals);
         int len = intervals.size();
-        for (int i=0; i<len; i++) {
+        for (int i = 0; i < len; i++) {
             combined.add(intervals.get(i));
 
             int j;
-            for (j=i+1; j<len; j++) {
+            for (j = i + 1; j < len; j++) {
                 if (intervals.get(i).stop >= intervals.get(j).start) {
                     intervals.get(i).stop = intervals.get(j).stop;
                 } else {

+ 18 - 20
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/packets/client/BinlogDumpGTIDCommandPacket.java

@@ -1,28 +1,26 @@
 package com.alibaba.otter.canal.parse.driver.mysql.packets.client;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
 import com.alibaba.otter.canal.parse.driver.mysql.packets.CommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.alibaba.otter.canal.parse.driver.mysql.utils.ByteHelper;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
 /**
- * Created by hiwjd on 2018/4/24.
- * hiwjd0@gmail.com
- *
+ * Created by hiwjd on 2018/4/24. hiwjd0@gmail.com
  * https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
  */
 public class BinlogDumpGTIDCommandPacket extends CommandPacket {
 
-    public static final int BINLOG_DUMP_NON_BLOCK = 0x01;
+    public static final int BINLOG_DUMP_NON_BLOCK   = 0x01;
     public static final int BINLOG_THROUGH_POSITION = 0x02;
-    public static final int BINLOG_THROUGH_GTID = 0x04;
+    public static final int BINLOG_THROUGH_GTID     = 0x04;
 
-    public long    slaveServerId;
-    public GTIDSet gtidSet;
+    public long             slaveServerId;
+    public GTIDSet          gtidSet;
 
-    public BinlogDumpGTIDCommandPacket() {
+    public BinlogDumpGTIDCommandPacket(){
         setCommand((byte) 0x1e);
     }
 
@@ -51,15 +49,15 @@ public class BinlogDumpGTIDCommandPacket extends CommandPacket {
         // 6. [4] data-size
         ByteHelper.writeUnsignedIntLittleEndian(bs.length, out);
         // 7, [] data
-        //       [8] n_sids // 文档写的是4个字节,其实是8个字节
-        //       for n_sids {
-        //          [16] SID
-        //          [8] n_intervals
-        //          for n_intervals {
-        //             [8] start (signed)
-        //             [8] end (signed)
-        //          }
-        //       }
+        // [8] n_sids // 文档写的是4个字节,其实是8个字节
+        // for n_sids {
+        // [16] SID
+        // [8] n_intervals
+        // for n_intervals {
+        // [8] start (signed)
+        // [8] end (signed)
+        // }
+        // }
         out.write(bs);
         // }
 

+ 2 - 2
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/NettySocketChannelPool.java

@@ -29,8 +29,8 @@ import org.slf4j.LoggerFactory;
 @SuppressWarnings({ "rawtypes", "deprecation" })
 public abstract class NettySocketChannelPool {
 
-    private static EventLoopGroup              group     = new NioEventLoopGroup();                         // 非阻塞IO线程组
-    private static Bootstrap                   boot      = new Bootstrap();                                 // 主
+    private static EventLoopGroup              group     = new NioEventLoopGroup();                              // 非阻塞IO线程组
+    private static Bootstrap                   boot      = new Bootstrap();                                      // 主
     private static Map<Channel, SocketChannel> chManager = new ConcurrentHashMap<Channel, SocketChannel>();
     private static final Logger                logger    = LoggerFactory.getLogger(NettySocketChannelPool.class);
 

+ 2 - 2
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/ByteHelper.java

@@ -108,7 +108,7 @@ public abstract class ByteHelper {
 
         return out.toByteArray();
     }
-    
+
     public static void write8ByteUnsignedIntLittleEndian(long data, ByteArrayOutputStream out) {
         out.write((byte) (data & 0xFF));
         out.write((byte) (data >>> 8));
@@ -119,7 +119,7 @@ public abstract class ByteHelper {
         out.write((byte) (data >>> 48));
         out.write((byte) (data >>> 56));
     }
-    
+
     public static void writeUnsignedIntLittleEndian(long data, ByteArrayOutputStream out) {
         out.write((byte) (data & 0xFF));
         out.write((byte) (data >>> 8));

+ 4 - 4
driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/utils/PacketManager.java

@@ -14,15 +14,15 @@ public abstract class PacketManager {
     }
 
     public static HeaderPacket readHeader(SocketChannel ch, int len, int timeout) throws IOException {
-    	HeaderPacket header = new HeaderPacket();
-    	header.fromBytes(ch.read(len, timeout));
-    	return header;
+        HeaderPacket header = new HeaderPacket();
+        header.fromBytes(ch.read(len, timeout));
+        return header;
     }
 
     public static byte[] readBytes(SocketChannel ch, int len) throws IOException {
         return ch.read(len);
     }
-    
+
     public static byte[] readBytes(SocketChannel ch, int len, int timeout) throws IOException {
         return ch.read(len, timeout);
     }

+ 35 - 45
driver/src/test/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlGTIDSetTest.java

@@ -1,17 +1,23 @@
 package com.alibaba.otter.canal.parse.driver.mysql;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.Test;
+
 import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.UUIDSet;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.util.*;
 
 /**
- * Created by hiwjd on 2018/4/25.
- * hiwjd0@gmail.com
+ * Created by hiwjd on 2018/4/25. hiwjd0@gmail.com
  */
 public class MysqlGTIDSetTest {
 
@@ -20,14 +26,12 @@ public class MysqlGTIDSetTest {
         GTIDSet gtidSet = MysqlGTIDSet.parse("726757ad-4455-11e8-ae04-0242ac110002:1");
         byte[] bytes = gtidSet.encode();
 
-        byte[] expected = {
-                0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x72, 0x67, 0x57, (byte)0xad,
-                0x44, 0x55, 0x11, (byte)0xe8, (byte)0xae, 0x04, 0x02, 0x42, (byte)0xac, 0x11, 0x00, 0x02,
-                0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
-                0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
-        };
+        byte[] expected = { 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x72, 0x67, 0x57, (byte) 0xad, 0x44, 0x55,
+                0x11, (byte) 0xe8, (byte) 0xae, 0x04, 0x02, 0x42, (byte) 0xac, 0x11, 0x00, 0x02, 0x01, 0x00, 0x00,
+                0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
+                0x00, 0x00, 0x00, 0x00 };
 
-        for (int i=0; i<bytes.length; i++) {
+        for (int i = 0; i < bytes.length; i++) {
             assertEquals(expected[i], bytes[i]);
         }
     }
@@ -35,31 +39,17 @@ public class MysqlGTIDSetTest {
     @Test
     public void testParse() {
         Map<String, MysqlGTIDSet> cases = new HashMap<String, MysqlGTIDSet>(5);
-        cases.put(
-                "726757ad-4455-11e8-ae04-0242ac110002:1",
-                buildForTest(new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 2))
-        );
-        cases.put(
-                "726757ad-4455-11e8-ae04-0242ac110002:1-3",
-                buildForTest(new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 4))
-        );
-        cases.put(
-                "726757ad-4455-11e8-ae04-0242ac110002:1-3:4",
-                buildForTest(new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 5))
-        );
-        cases.put(
-                "726757ad-4455-11e8-ae04-0242ac110002:1-3:7-9",
-                buildForTest(new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 4, 7, 10))
-        );
-        cases.put(
-                "726757ad-4455-11e8-ae04-0242ac110002:1-3,726757ad-4455-11e8-ae04-0242ac110003:4",
-                buildForTest(
-                        Arrays.asList(
-                            new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 4),
-                                new Material("726757ad-4455-11e8-ae04-0242ac110003", 4, 5)
-                        )
-                )
-        );
+        cases.put("726757ad-4455-11e8-ae04-0242ac110002:1",
+            buildForTest(new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 2)));
+        cases.put("726757ad-4455-11e8-ae04-0242ac110002:1-3",
+            buildForTest(new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 4)));
+        cases.put("726757ad-4455-11e8-ae04-0242ac110002:1-3:4",
+            buildForTest(new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 5)));
+        cases.put("726757ad-4455-11e8-ae04-0242ac110002:1-3:7-9",
+            buildForTest(new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 4, 7, 10)));
+        cases.put("726757ad-4455-11e8-ae04-0242ac110002:1-3,726757ad-4455-11e8-ae04-0242ac110003:4",
+            buildForTest(Arrays.asList(new Material("726757ad-4455-11e8-ae04-0242ac110002", 1, 4),
+                new Material("726757ad-4455-11e8-ae04-0242ac110003", 4, 5))));
 
         for (Map.Entry<String, MysqlGTIDSet> entry : cases.entrySet()) {
             MysqlGTIDSet expected = entry.getValue();
@@ -71,7 +61,7 @@ public class MysqlGTIDSetTest {
 
     private static class Material {
 
-        public Material(String uuid, long start, long stop) {
+        public Material(String uuid, long start, long stop){
             this.uuid = uuid;
             this.start = start;
             this.stop = stop;
@@ -79,7 +69,7 @@ public class MysqlGTIDSetTest {
             this.stop1 = 0;
         }
 
-        public Material(String uuid, long start, long stop, long start1, long stop1) {
+        public Material(String uuid, long start, long stop, long start1, long stop1){
             this.uuid = uuid;
             this.start = start;
             this.stop = stop;
@@ -88,10 +78,10 @@ public class MysqlGTIDSetTest {
         }
 
         public String uuid;
-        public long start;
-        public long stop;
-        public long start1;
-        public long stop1;
+        public long   start;
+        public long   stop;
+        public long   start1;
+        public long   stop1;
     }
 
     private MysqlGTIDSet buildForTest(Material material) {

+ 5 - 5
driver/src/test/java/com/alibaba/otter/canal/parse/driver/mysql/UUIDSetTest.java

@@ -1,16 +1,16 @@
 package com.alibaba.otter.canal.parse.driver.mysql;
 
-import com.alibaba.otter.canal.parse.driver.mysql.packets.UUIDSet;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.junit.Assert.*;
+import org.junit.Test;
+
+import com.alibaba.otter.canal.parse.driver.mysql.packets.UUIDSet;
 
 /**
- * Created by hiwjd on 2018/4/26.
- * hiwjd0@gmail.com
+ * Created by hiwjd on 2018/4/26. hiwjd0@gmail.com
  */
 public class UUIDSetTest {
 

+ 0 - 1
example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java

@@ -13,7 +13,6 @@ import org.springframework.util.Assert;
 import org.springframework.util.CollectionUtils;
 
 import com.alibaba.otter.canal.client.CanalConnector;
-import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
 import com.alibaba.otter.canal.protocol.CanalEntry.Column;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;

+ 15 - 15
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnector.java

@@ -1,14 +1,15 @@
 package com.alibaba.otter.canal.kafka.client;
 
-import com.alibaba.otter.canal.protocol.Message;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
-import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
+import com.alibaba.otter.canal.protocol.Message;
 
 /**
  * canal kafka 数据操作客户端
@@ -20,14 +21,13 @@ public class KafkaCanalConnector {
 
     private KafkaConsumer<String, Message> kafkaConsumer;
 
-    private String topic;
-
-    private Integer partition;
+    private String                         topic;
 
+    private Integer                        partition;
 
-    private Properties properties;
+    private Properties                     properties;
 
-    public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId) {
+    public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId){
         this.topic = topic;
         this.partition = partition;
 
@@ -36,10 +36,11 @@ public class KafkaCanalConnector {
         properties.put("group.id", groupId);
         properties.put("enable.auto.commit", false);
         properties.put("auto.commit.interval.ms", "1000");
-        properties.put("auto.offset.reset", "latest"); //earliest //如果没有offset则从最后的offset开始读
-        properties.put("request.timeout.ms", "40000"); //必须大于session.timeout.ms的设置
-        properties.put("session.timeout.ms", "30000"); //默认为30秒
-        properties.put("max.poll.records", "1"); //所以一次只取一条数据
+        properties.put("auto.offset.reset", "latest"); // earliest
+                                                       // //如果没有offset则从最后的offset开始读
+        properties.put("request.timeout.ms", "40000"); // 必须大于session.timeout.ms的设置
+        properties.put("session.timeout.ms", "30000"); // 默认为30秒
+        properties.put("max.poll.records", "1"); // 所以一次只取一条数据
         properties.put("key.deserializer", StringDeserializer.class.getName());
         properties.put("value.deserializer", MessageDeserializer.class.getName());
     }
@@ -110,8 +111,7 @@ public class KafkaCanalConnector {
      * @return
      */
     public Message getWithoutAck(Long timeout, TimeUnit unit) {
-        ConsumerRecords<String, Message> records =
-                kafkaConsumer.poll(unit.toMillis(timeout)); //基于配置,最多只能poll到一条数据
+        ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout)); // 基于配置,最多只能poll到一条数据
 
         if (!records.isEmpty()) {
             return records.iterator().next().value();

+ 2 - 1
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnectors.java

@@ -7,6 +7,7 @@ package com.alibaba.otter.canal.kafka.client;
  * @version 1.0.0
  */
 public class KafkaCanalConnectors {
+
     /**
      * 创建kafka客户端链接
      *
@@ -28,7 +29,7 @@ public class KafkaCanalConnectors {
      * @param groupId
      * @return
      */
-    public static KafkaCanalConnector newKafkaConnector(String servers, String topic,  String groupId) {
+    public static KafkaCanalConnector newKafkaConnector(String servers, String topic, String groupId) {
         return new KafkaCanalConnector(servers, topic, null, groupId);
     }
 }

+ 9 - 8
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/MessageDeserializer.java

@@ -1,15 +1,16 @@
 package com.alibaba.otter.canal.kafka.client;
 
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.CanalPacket;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 import com.google.protobuf.ByteString;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
 
 /**
  * Kafka Message类的反序列化
@@ -18,6 +19,7 @@ import java.util.Map;
  * @version 1.0.0
  */
 public class MessageDeserializer implements Deserializer<Message> {
+
     private static Logger logger = LoggerFactory.getLogger(MessageDeserializer.class);
 
     @Override
@@ -27,8 +29,7 @@ public class MessageDeserializer implements Deserializer<Message> {
     @Override
     public Message deserialize(String topic, byte[] data) {
         try {
-            if (data == null)
-                return null;
+            if (data == null) return null;
             else {
                 CanalPacket.Packet p = CanalPacket.Packet.parseFrom(data);
                 switch (p.getType()) {
@@ -58,4 +59,4 @@ public class MessageDeserializer implements Deserializer<Message> {
     public void close() {
         // nothing to do
     }
-}
+}

+ 3 - 3
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/AbstractKafkaTest.java

@@ -10,10 +10,10 @@ import org.junit.Assert;
  */
 public abstract class AbstractKafkaTest {
 
-    public static String topic = "example";
+    public static String  topic     = "example";
     public static Integer partition = null;
-    public static String groupId    = "g1";
-    public static String servers    = "slave1:6667,slave2:6667,slave3:6667";
+    public static String  groupId   = "g1";
+    public static String  servers   = "slave1:6667,slave2:6667,slave3:6667";
 
     public void sleep(long time) {
         try {

+ 25 - 18
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/CanalKafkaClientExample.java

@@ -1,14 +1,15 @@
 package com.alibaba.otter.canal.kafka.client.running;
 
-import com.alibaba.otter.canal.kafka.client.KafkaCanalConnector;
-import com.alibaba.otter.canal.kafka.client.KafkaCanalConnectors;
-import com.alibaba.otter.canal.protocol.Message;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import java.util.concurrent.TimeUnit;
+import com.alibaba.otter.canal.kafka.client.KafkaCanalConnector;
+import com.alibaba.otter.canal.kafka.client.KafkaCanalConnectors;
+import com.alibaba.otter.canal.protocol.Message;
 
 /**
  * Kafka client example
@@ -17,28 +18,32 @@ import java.util.concurrent.TimeUnit;
  * @version 1.0.0
  */
 public class CanalKafkaClientExample {
-    protected final static Logger logger = LoggerFactory.getLogger(CanalKafkaClientExample.class);
 
-    private KafkaCanalConnector connector;
+    protected final static Logger           logger  = LoggerFactory.getLogger(CanalKafkaClientExample.class);
 
-    private static volatile boolean running = false;
+    private KafkaCanalConnector             connector;
 
-    private Thread thread = null;
+    private static volatile boolean         running = false;
+
+    private Thread                          thread  = null;
 
     private Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
-        public void uncaughtException(Thread t, Throwable e) {
-            logger.error("parse events has an error", e);
-        }
-    };
 
-    public CanalKafkaClientExample(String servers, String topic, Integer partition, String groupId) {
+                                                        public void uncaughtException(Thread t, Throwable e) {
+                                                            logger.error("parse events has an error", e);
+                                                        }
+                                                    };
+
+    public CanalKafkaClientExample(String servers, String topic, Integer partition, String groupId){
         connector = KafkaCanalConnectors.newKafkaConnector(servers, topic, partition, groupId);
     }
 
     public static void main(String[] args) {
         try {
             final CanalKafkaClientExample kafkaCanalClientExample = new CanalKafkaClientExample(AbstractKafkaTest.servers,
-                    AbstractKafkaTest.topic, AbstractKafkaTest.partition, AbstractKafkaTest.groupId);
+                AbstractKafkaTest.topic,
+                AbstractKafkaTest.partition,
+                AbstractKafkaTest.groupId);
             logger.info("## start the kafka consumer: {}-{}", AbstractKafkaTest.topic, AbstractKafkaTest.groupId);
             kafkaCanalClientExample.start();
             logger.info("## the canal kafka consumer is running now ......");
@@ -56,7 +61,8 @@ public class CanalKafkaClientExample {
                 }
 
             });
-            while (running) ;
+            while (running)
+                ;
         } catch (Throwable e) {
             logger.error("## Something goes wrong when starting up the kafka consumer:", e);
             System.exit(0);
@@ -91,12 +97,13 @@ public class CanalKafkaClientExample {
     }
 
     private void process() {
-        while (!running) ;
+        while (!running)
+            ;
         try {
             connector.subscribe();
             while (running) {
                 try {
-                    Message message = connector.getWithoutAck(1L, TimeUnit.SECONDS); //获取message
+                    Message message = connector.getWithoutAck(1L, TimeUnit.SECONDS); // 获取message
                     if (message == null) {
                         continue;
                     }
@@ -129,4 +136,4 @@ public class CanalKafkaClientExample {
         }
         connector.close();
     }
-}
+}

+ 11 - 8
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/KafkaClientRunningTest.java

@@ -1,16 +1,17 @@
 package com.alibaba.otter.canal.kafka.client.running;
 
-import com.alibaba.otter.canal.kafka.client.KafkaCanalConnector;
-import com.alibaba.otter.canal.kafka.client.KafkaCanalConnectors;
-import com.alibaba.otter.canal.protocol.Message;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.kafka.common.errors.WakeupException;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import com.alibaba.otter.canal.kafka.client.KafkaCanalConnector;
+import com.alibaba.otter.canal.kafka.client.KafkaCanalConnectors;
+import com.alibaba.otter.canal.protocol.Message;
 
 /**
  * Kafka consumer获取Message的测试例子
@@ -19,7 +20,8 @@ import java.util.concurrent.TimeUnit;
  * @version 1.0.0
  */
 public class KafkaClientRunningTest extends AbstractKafkaTest {
-    private Logger logger = LoggerFactory.getLogger(KafkaClientRunningTest.class);
+
+    private Logger  logger  = LoggerFactory.getLogger(KafkaClientRunningTest.class);
 
     private boolean running = true;
 
@@ -30,6 +32,7 @@ public class KafkaClientRunningTest extends AbstractKafkaTest {
         final KafkaCanalConnector connector = KafkaCanalConnectors.newKafkaConnector(servers, topic, partition, groupId);
 
         executor.submit(new Runnable() {
+
             @Override
             public void run() {
                 connector.subscribe();
@@ -41,7 +44,7 @@ public class KafkaClientRunningTest extends AbstractKafkaTest {
                         }
                         connector.ack();
                     } catch (WakeupException e) {
-                        //ignore
+                        // ignore
                     }
                 }
                 connector.unsubscribe();

+ 2 - 4
kafka/src/main/java/com/alibaba/otter/canal/kafka/CanalLauncher.java

@@ -1,6 +1,5 @@
 package com.alibaba.otter.canal.kafka;
 
-
 import com.alibaba.otter.canal.kafka.producer.CanalKafkaStarter;
 
 /**
@@ -11,9 +10,8 @@ import com.alibaba.otter.canal.kafka.producer.CanalKafkaStarter;
  */
 public class CanalLauncher {
 
-    public static void main(String[] args)  {
+    public static void main(String[] args) {
         CanalServerStarter.init();
-
         CanalKafkaStarter.init();
     }
-}
+}

+ 9 - 7
kafka/src/main/java/com/alibaba/otter/canal/kafka/CanalServerStarter.java

@@ -1,12 +1,13 @@
 package com.alibaba.otter.canal.kafka;
 
-import com.alibaba.otter.canal.deployer.CanalController;
+import java.io.FileInputStream;
+import java.util.Properties;
+
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.FileInputStream;
-import java.util.Properties;
+import com.alibaba.otter.canal.deployer.CanalController;
 
 /**
  * canal server 启动类
@@ -15,10 +16,10 @@ import java.util.Properties;
  * @version 1.0.0
  */
 public class CanalServerStarter {
-    private static final String CLASSPATH_URL_PREFIX = "classpath:";
-    private static final Logger logger = LoggerFactory.getLogger(CanalServerStarter.class);
 
-    private static boolean running = false;
+    private static final String CLASSPATH_URL_PREFIX = "classpath:";
+    private static final Logger logger               = LoggerFactory.getLogger(CanalServerStarter.class);
+    private static boolean      running              = false;
 
     public static void init() {
         try {
@@ -61,12 +62,13 @@ public class CanalServerStarter {
         }
     }
 
-    public static boolean isRunning(){
+    public static boolean isRunning() {
         return running;
     }
 
     private static void setGlobalUncaughtExceptionHandler() {
         Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+
             @Override
             public void uncaughtException(Thread t, Throwable e) {
                 logger.error("UnCaughtException", e);

+ 9 - 7
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaProducer.java

@@ -1,8 +1,7 @@
 package com.alibaba.otter.canal.kafka.producer;
 
-import com.alibaba.otter.canal.kafka.producer.KafkaProperties.Topic;
-import com.alibaba.otter.canal.protocol.CanalEntry;
-import com.alibaba.otter.canal.protocol.Message;
+import java.util.Properties;
+
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -10,7 +9,9 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Properties;
+import com.alibaba.otter.canal.kafka.producer.KafkaProperties.Topic;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.Message;
 
 /**
  * kafka producer 主操作类
@@ -19,7 +20,8 @@ import java.util.Properties;
  * @version 1.0.0
  */
 public class CanalKafkaProducer {
-    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
+
+    private static final Logger       logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
 
     private Producer<String, Message> producer;
 
@@ -51,7 +53,8 @@ public class CanalKafkaProducer {
         boolean valid = false;
         if (message != null && !message.getEntries().isEmpty()) {
             for (CanalEntry.Entry entry : message.getEntries()) {
-                if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
+                if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN
+                    && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
                     valid = true;
                     break;
                 }
@@ -61,7 +64,6 @@ public class CanalKafkaProducer {
             return;
         }
 
-
         ProducerRecord<String, Message> record;
         if (topic.getPartition() != null) {
             record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);

+ 28 - 24
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaStarter.java

@@ -1,20 +1,21 @@
 package com.alibaba.otter.canal.kafka.producer;
 
+import java.io.FileInputStream;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
 import com.alibaba.otter.canal.kafka.CanalServerStarter;
 import com.alibaba.otter.canal.kafka.producer.KafkaProperties.CanalDestination;
 import com.alibaba.otter.canal.kafka.producer.KafkaProperties.Topic;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
-
-import java.io.FileInputStream;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 /**
  * kafka 启动类
@@ -23,36 +24,36 @@ import java.util.concurrent.Executors;
  * @version 1.0.0
  */
 public class CanalKafkaStarter {
-    private static final String CLASSPATH_URL_PREFIX = "classpath:";
-    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaStarter.class);
 
-    private volatile static boolean running = false;
+    private static final String       CLASSPATH_URL_PREFIX = "classpath:";
+    private static final Logger       logger               = LoggerFactory.getLogger(CanalKafkaStarter.class);
 
-    private static ExecutorService executorService;
+    private volatile static boolean   running              = false;
+
+    private static ExecutorService    executorService;
 
     private static CanalKafkaProducer canalKafkaProducer;
 
-    private static KafkaProperties kafkaProperties;
+    private static KafkaProperties    kafkaProperties;
 
     public static void init() {
         try {
-
             logger.info("## load kafka configurations");
             String conf = System.getProperty("kafka.conf", "classpath:kafka.yml");
 
-
             if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
                 conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
-                kafkaProperties = new Yaml().loadAs(CanalKafkaStarter.class.getClassLoader().getResourceAsStream(conf), KafkaProperties.class);
+                kafkaProperties = new Yaml().loadAs(CanalKafkaStarter.class.getClassLoader().getResourceAsStream(conf),
+                    KafkaProperties.class);
             } else {
                 kafkaProperties = new Yaml().loadAs(new FileInputStream(conf), KafkaProperties.class);
             }
 
-            //初始化 kafka producer
+            // 初始化 kafka producer
             canalKafkaProducer = new CanalKafkaProducer();
             canalKafkaProducer.init(kafkaProperties);
 
-            //对应每个instance启动一个worker线程
+            // 对应每个instance启动一个worker线程
             List<CanalDestination> destinations = kafkaProperties.getCanalDestinations();
 
             executorService = Executors.newFixedThreadPool(destinations.size());
@@ -60,6 +61,7 @@ public class CanalKafkaStarter {
             logger.info("## start the kafka workers.");
             for (final CanalDestination destination : destinations) {
                 executorService.execute(new Runnable() {
+
                     @Override
                     public void run() {
                         worker(destination);
@@ -69,6 +71,7 @@ public class CanalKafkaStarter {
             running = true;
             logger.info("## the kafka workers is running now ......");
             Runtime.getRuntime().addShutdownHook(new Thread() {
+
                 public void run() {
                     try {
                         logger.info("## stop the kafka workers");
@@ -90,10 +93,11 @@ public class CanalKafkaStarter {
         }
     }
 
-
     private static void worker(CanalDestination destination) {
-        while (!running) ;
-        while (!CanalServerStarter.isRunning()) ; //等待server启动完成
+        while (!running)
+            ;
+        while (!CanalServerStarter.isRunning())
+            ; // 等待server启动完成
         logger.info("## start the canal consumer: {}.", destination.getCanalDestination());
         CanalServerWithEmbedded server = CanalServerWithEmbedded.instance();
         ClientIdentity clientIdentity = new ClientIdentity(destination.getCanalDestination(), (short) 1001, "");
@@ -103,7 +107,7 @@ public class CanalKafkaStarter {
                     try {
                         Thread.sleep(3000);
                     } catch (InterruptedException e) {
-                        //ignore
+                        // ignore
                     }
                     continue;
                 }
@@ -123,7 +127,7 @@ public class CanalKafkaStarter {
                                 destination.getTopics().add(topic);
                             }
                             for (Topic topic : destination.getTopics()) {
-                                canalKafkaProducer.send(topic, message); //发送message到所有topic
+                                canalKafkaProducer.send(topic, message); // 发送message到所有topic
                             }
                         }
 

+ 13 - 10
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/KafkaProperties.java

@@ -12,20 +12,22 @@ import java.util.Set;
  * @version 1.0.0
  */
 public class KafkaProperties {
-    private String servers = "localhost:6667";
-    private int retries = 0;
-    private int batchSize = 16384;
-    private int lingerMs = 1;
-    private long bufferMemory = 33554432L;
 
-    private int canalBatchSize = 5;
+    private String                 servers           = "localhost:6667";
+    private int                    retries           = 0;
+    private int                    batchSize         = 16384;
+    private int                    lingerMs          = 1;
+    private long                   bufferMemory      = 33554432L;
+
+    private int                    canalBatchSize    = 5;
 
     private List<CanalDestination> canalDestinations = new ArrayList<CanalDestination>();
 
     public static class CanalDestination {
-        private String canalDestination;
-        private String topic;
-        private Integer partition;
+
+        private String     canalDestination;
+        private String     topic;
+        private Integer    partition;
         private Set<Topic> topics = new HashSet<Topic>();
 
         public String getCanalDestination() {
@@ -62,7 +64,8 @@ public class KafkaProperties {
     }
 
     public static class Topic {
-        private String topic;
+
+        private String  topic;
         private Integer partition;
 
         public String getTopic() {

+ 7 - 6
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/MessageSerializer.java

@@ -1,13 +1,14 @@
 package com.alibaba.otter.canal.kafka.producer;
 
-import com.alibaba.otter.canal.protocol.CanalEntry;
-import com.alibaba.otter.canal.protocol.CanalPacket;
-import com.alibaba.otter.canal.protocol.Message;
+import java.util.Map;
+
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.Serializer;
 import org.springframework.util.CollectionUtils;
 
-import java.util.Map;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.CanalPacket;
+import com.alibaba.otter.canal.protocol.Message;
 
 /**
  * Kafka Message类的序列化
@@ -16,6 +17,7 @@ import java.util.Map;
  * @version 1.0.0
  */
 public class MessageSerializer implements Serializer<Message> {
+
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
     }
@@ -23,8 +25,7 @@ public class MessageSerializer implements Serializer<Message> {
     @Override
     public byte[] serialize(String topic, Message data) {
         try {
-            if (data == null)
-                return null;
+            if (data == null) return null;
             else {
                 CanalPacket.Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
                 if (data.getId() != -1 && !CollectionUtils.isEmpty(data.getEntries())) {

+ 5 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -88,7 +88,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
     protected TimerTask                              heartBeatTimerTask;
     protected Throwable                              exception                  = null;
 
-    protected boolean                                isGTIDMode                 = false; // 是否是GTID模式
+    protected boolean                                isGTIDMode                 = false;                                   // 是否是GTID模式
 
     protected abstract BinlogParser buildParser();
 
@@ -221,12 +221,13 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                         if (isGTIDMode()) {
                             erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), sinkHandler);
                         } else {
-                            if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) {
+                            if (StringUtils.isEmpty(startPosition.getJournalName())
+                                && startPosition.getTimestamp() != null) {
                                 erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
                             } else {
                                 erosaConnection.dump(startPosition.getJournalName(),
-                                        startPosition.getPosition(),
-                                        sinkHandler);
+                                    startPosition.getPosition(),
+                                    sinkHandler);
                             }
                         }
 

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ErosaConnection.java

@@ -1,9 +1,9 @@
 package com.alibaba.otter.canal.parse.inbound;
 
-import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
-
 import java.io.IOException;
 
+import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
+
 /**
  * 通用的Erosa的链接接口, 用于一般化处理mysql/oracle的解析过程
  * 

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

@@ -4,12 +4,12 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
-import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
 import com.alibaba.otter.canal.parse.inbound.SinkFunction;
 import com.alibaba.otter.canal.parse.inbound.mysql.local.BinLogFileQueue;

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -8,8 +8,6 @@ import java.nio.charset.Charset;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
-import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpGTIDCommandPacket;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -17,8 +15,10 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector;
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlQueryExecutor;
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpCommandPacket;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpGTIDCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.RegisterSlaveCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.client.SemiAckCommandPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ErrorPacket;

+ 10 - 10
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java

@@ -19,30 +19,30 @@ import com.taobao.tddl.dbsync.binlog.LogFetcher;
  */
 public class DirectLogFetcher extends LogFetcher {
 
-    protected static final Logger logger            = LoggerFactory.getLogger(DirectLogFetcher.class);
+    protected static final Logger logger                          = LoggerFactory.getLogger(DirectLogFetcher.class);
 
     // Master heartbeat interval
-    public static final int MASTER_HEARTBEAT_PERIOD_SECONDS = 15;
+    public static final int       MASTER_HEARTBEAT_PERIOD_SECONDS = 15;
     // +10s 确保 timeout > heartbeat interval
-    private static final int READ_TIMEOUT_MILLISECONDS = (MASTER_HEARTBEAT_PERIOD_SECONDS + 10) * 1000;
+    private static final int      READ_TIMEOUT_MILLISECONDS       = (MASTER_HEARTBEAT_PERIOD_SECONDS + 10) * 1000;
 
     /** Command to dump binlog */
-    public static final byte      COM_BINLOG_DUMP   = 18;
+    public static final byte      COM_BINLOG_DUMP                 = 18;
 
     /** Packet header sizes */
-    public static final int       NET_HEADER_SIZE   = 4;
-    public static final int       SQLSTATE_LENGTH   = 5;
+    public static final int       NET_HEADER_SIZE                 = 4;
+    public static final int       SQLSTATE_LENGTH                 = 5;
 
     /** Packet offsets */
-    public static final int       PACKET_LEN_OFFSET = 0;
-    public static final int       PACKET_SEQ_OFFSET = 3;
+    public static final int       PACKET_LEN_OFFSET               = 0;
+    public static final int       PACKET_SEQ_OFFSET               = 3;
 
     /** Maximum packet length */
-    public static final int       MAX_PACKET_LENGTH = (256 * 256 * 256 - 1);
+    public static final int       MAX_PACKET_LENGTH               = (256 * 256 * 256 - 1);
 
     private SocketChannel         channel;
 
-    private boolean               issemi            = false;
+    private boolean               issemi                          = false;
 
     // private BufferedInputStream input;
 

+ 0 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/dao/MetaHistoryDO.java

@@ -87,7 +87,6 @@ public class MetaHistoryDO {
         this.useSchema = useSchema;
     }
 
-
     public String getExtra() {
         return extra;
     }

+ 1 - 1
pom.xml

@@ -256,7 +256,7 @@
             <dependency>
                 <groupId>com.alibaba.fastsql</groupId>
                 <artifactId>fastsql</artifactId>
-                <version>2.0.0_preview_186</version>
+                <version>2.0.0_preview_371</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>

Diferenças do arquivo suprimidas por serem muito extensas
+ 578 - 565
protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalEntry.java


+ 0 - 1
protocol/src/main/java/com/alibaba/otter/canal/protocol/position/EntryPosition.java

@@ -1,6 +1,5 @@
 package com.alibaba.otter.canal.protocol.position;
 
-
 /**
  * 数据库对象的唯一标示
  * 

+ 17 - 15
server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

@@ -221,7 +221,8 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
 
             if (CollectionUtils.isEmpty(events.getEvents())) {
                 logger.debug("get successfully, clientId:{} batchSize:{} but result is null",
-                        clientIdentity.getClientId(), batchSize);
+                    clientIdentity.getClientId(),
+                    batchSize);
                 return new Message(-1, new ArrayList<Entry>()); // 返回空包,避免生成batchId,浪费性能
             } else {
                 // 记录到流式信息
@@ -234,11 +235,11 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
                 });
                 if (logger.isInfoEnabled()) {
                     logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
-                            clientIdentity.getClientId(),
-                            batchSize,
-                            entrys.size(),
-                            batchId,
-                            events.getPositionRange());
+                        clientIdentity.getClientId(),
+                        batchSize,
+                        entrys.size(),
+                        batchId,
+                        events.getPositionRange());
                 }
                 // 直接提交ack
                 ack(clientIdentity, batchId);
@@ -299,7 +300,8 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
 
             if (CollectionUtils.isEmpty(events.getEvents())) {
                 logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null",
-                        clientIdentity.getClientId(), batchSize);
+                    clientIdentity.getClientId(),
+                    batchSize);
                 return new Message(-1, new ArrayList<Entry>()); // 返回空包,避免生成batchId,浪费性能
             } else {
                 // 记录到流式信息
@@ -312,11 +314,11 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
                 });
                 if (logger.isInfoEnabled()) {
                     logger.info("getWithoutAck successfully, clientId:{} batchSize:{}  real size is {} and result is [batchId:{} , position:{}]",
-                            clientIdentity.getClientId(),
-                            batchSize,
-                            entrys.size(),
-                            batchId,
-                            events.getPositionRange());
+                        clientIdentity.getClientId(),
+                        batchSize,
+                        entrys.size(),
+                        batchId,
+                        events.getPositionRange());
                 }
                 return new Message(batchId, entrys);
             }
@@ -381,9 +383,9 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
             canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
             if (logger.isInfoEnabled()) {
                 logger.info("ack successfully, clientId:{} batchId:{} position:{}",
-                        clientIdentity.getClientId(),
-                        batchId,
-                        positionRanges);
+                    clientIdentity.getClientId(),
+                    batchId,
+                    positionRanges);
             }
         }
 

+ 5 - 5
sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java

@@ -60,11 +60,11 @@ public class TimelineTransactionBarrier extends TimelineBarrier {
     }
 
     public void clear(Event event) {
-       super.clear(event);
+        super.clear(event);
 
-       //应该先判断2,再判断是否是事务尾,因为事务尾也可以导致txState的状态为2
-       //如果先判断事务尾,那么2的状态可能永远没机会被修改了,系统出现死锁
-       //CanalSinkException被注释的代码是不是可以放开??我们内部使用的时候已经放开了,从代码逻辑的分析上以及实践效果来看,应该抛异常
+        // 应该先判断2,再判断是否是事务尾,因为事务尾也可以导致txState的状态为2
+        // 如果先判断事务尾,那么2的状态可能永远没机会被修改了,系统出现死锁
+        // CanalSinkException被注释的代码是不是可以放开??我们内部使用的时候已经放开了,从代码逻辑的分析上以及实践效果来看,应该抛异常
         if (txState.intValue() == 2) {// 非事务中
             boolean result = txState.compareAndSet(2, 0);
             if (result == false) {
@@ -92,7 +92,7 @@ public class TimelineTransactionBarrier extends TimelineBarrier {
                         return true; // 事务允许通过
                     }
                 } else if (txState.compareAndSet(0, 2)) { // 非事务保护中
-                    //当基于zk-cursor启动的时候,拿到的第一个Event是TransactionEnd
+                    // 当基于zk-cursor启动的时候,拿到的第一个Event是TransactionEnd
                     return true; // DDL/DCL/TransactionEnd允许通过
                 }
             }

Alguns arquivos não foram mostrados porque muitos arquivos mudaram nesse diff