1
0
七锋 6 жил өмнө
parent
commit
9344ed121f
46 өөрчлөгдсөн 472 нэмэгдсэн , 478 устгасан
  1. 0 2
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/CanalOuterAdapter.java
  2. 1 1
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  3. 3 3
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalOuterAdapterConfiguration.java
  4. 2 2
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Dml.java
  5. 56 34
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java
  6. 10 6
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java
  7. 5 1
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/SPI.java
  8. 1 5
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java
  9. 6 2
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java
  10. 18 7
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java
  11. 5 1
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/HbaseTemplate.java
  12. 10 26
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/PhType.java
  13. 14 8
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/PhTypeUtil.java
  14. 3 13
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/Type.java
  15. 2 1
      client-adapter/logger/src/main/java/com/alibaba/otter/canal/client/adapter/logger/LoggerAdapterExample.java
  16. 4 3
      client-launcher/src/main/java/com/alibaba/otter/canal/client/ClientLauncher.java
  17. 10 18
      client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/AbstractCanalAdapterWorker.java
  18. 2 2
      client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java
  19. 26 18
      client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java
  20. 8 7
      client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterRocketMQWorker.java
  21. 25 12
      client/src/main/java/com/alibaba/otter/canal/client/CanalMessageDeserializer.java
  22. 3 6
      client/src/main/java/com/alibaba/otter/canal/client/impl/ClusterCanalConnector.java
  23. 9 34
      client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java
  24. 1 1
      client/src/main/java/com/alibaba/otter/canal/client/impl/running/ClientRunningMonitor.java
  25. 3 3
      client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java
  26. 3 4
      client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnectors.java
  27. 0 33
      client/src/main/java/com/alibaba/otter/canal/client/kafka/MessageDeserializer.java
  28. 4 5
      client/src/main/java/com/alibaba/otter/canal/client/kafka/running/ClientRunningMonitor.java
  29. 4 3
      client/src/main/java/com/alibaba/otter/canal/client/rocketmq/ConsumerBatchMessage.java
  30. 18 15
      client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java
  31. 2 2
      client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnectorProvider.java
  32. 2 0
      client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalListener.java
  33. 114 114
      client/src/test/java/com/alibaba/otter/canal/client/running/ClientRunningTest.java
  34. 1 2
      client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaClientExample.java
  35. 3 2
      client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/AbstractRocektMQTest.java
  36. 17 18
      client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/CanalRocketMQClientExample.java
  37. 1 1
      deployer/src/main/resources/canal.properties
  38. 4 3
      protocol/src/main/java/com/alibaba/otter/canal/protocol/ClientIdentity.java
  39. 10 8
      server/src/main/java/com/alibaba/otter/canal/common/CanalMessageSerializer.java
  40. 16 17
      server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java
  41. 17 14
      server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java
  42. 4 2
      server/src/main/java/com/alibaba/otter/canal/kafka/MessageSerializer.java
  43. 4 2
      server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java
  44. 16 13
      server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java
  45. 1 1
      server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java
  46. 4 3
      server/src/main/java/com/alibaba/otter/canal/spi/CanalMQProducer.java

+ 0 - 2
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/CanalOuterAdapter.java

@@ -3,8 +3,6 @@ package com.alibaba.otter.canal.client.adapter;
 import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.SPI;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
 
 /**
  * 外部适配器接口

+ 1 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -24,7 +24,6 @@ public class CanalClientConfig {
 
     private Boolean             flatMessage = true;
 
-
     private List<CanalInstance> canalInstances;
 
     public String getCanalServerHost() {
@@ -122,6 +121,7 @@ public class CanalClientConfig {
     }
 
     public static class MQTopic {
+
         private String      mqMode;
 
         private String      topic;

+ 3 - 3
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalOuterAdapterConfiguration.java

@@ -10,11 +10,11 @@ import java.util.Properties;
  */
 public class CanalOuterAdapterConfiguration {
 
-    private String     name;       // 适配器名称, 如: logger, hbase, es
+    private String     name;      // 适配器名称, 如: logger, hbase, es
 
-    private String     hosts;      // 适配器内部的地址, 比如对应es该参数可以填写es的server地址
+    private String     hosts;     // 适配器内部的地址, 比如对应es该参数可以填写es的server地址
 
-    private String     zkHosts;    // 适配器内部的ZK地址, 比如对应HBase该参数可以填写HBase对应的ZK地址
+    private String     zkHosts;   // 适配器内部的ZK地址, 比如对应HBase该参数可以填写HBase对应的ZK地址
 
     private Properties properties; // 其余参数, 可填写适配器中的所需的配置信息
 

+ 2 - 2
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Dml.java

@@ -90,7 +90,7 @@ public class Dml implements Serializable {
 
     @Override
     public String toString() {
-        return "Dml{" + "database='" + database + '\'' + ", table='" + table + '\'' + ", type='" + type + '\'' + ", ts="
-               + ts + ", sql='" + sql + '\'' + ", data=" + data + ", old=" + old + '}';
+        return "Dml{" + "database='" + database + '\'' + ", table='" + table + '\'' + ", type='" + type + '\''
+               + ", ts=" + ts + ", sql='" + sql + '\'' + ", data=" + data + ", old=" + old + '}';
     }
 }

+ 56 - 34
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java

@@ -1,11 +1,22 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.nio.file.Paths;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Pattern;
@@ -21,8 +32,7 @@ import org.slf4j.LoggerFactory;
  */
 public class ExtensionLoader<T> {
 
-    private static final Logger                                      logger                     = LoggerFactory
-        .getLogger(ExtensionLoader.class);
+    private static final Logger                                      logger                     = LoggerFactory.getLogger(ExtensionLoader.class);
 
     private static final String                                      SERVICES_DIRECTORY         = "META-INF/services/";
 
@@ -30,8 +40,7 @@ public class ExtensionLoader<T> {
 
     private static final String                                      DEFAULT_CLASSLOADER_POLICY = "internal";
 
-    private static final Pattern                                     NAME_SEPARATOR             = Pattern
-        .compile("\\s*[,]+\\s*");
+    private static final Pattern                                     NAME_SEPARATOR             = Pattern.compile("\\s*[,]+\\s*");
 
     private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS          = new ConcurrentHashMap<>();
 
@@ -262,8 +271,7 @@ public class ExtensionLoader<T> {
             return instance;
         } catch (Throwable t) {
             throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
-                                            + ")  could not be instantiated: " + t.getMessage(),
-                t);
+                                            + ")  could not be instantiated: " + t.getMessage(), t);
         }
     }
 
@@ -271,8 +279,8 @@ public class ExtensionLoader<T> {
         if (type == null) throw new IllegalArgumentException("Extension type == null");
         if (name == null) throw new IllegalArgumentException("Extension name == null");
         Class<?> clazz = getExtensionClasses().get(name);
-        if (clazz == null)
-            throw new IllegalStateException("No such extension \"" + name + "\" for " + type.getName() + "!");
+        if (clazz == null) throw new IllegalStateException("No such extension \"" + name + "\" for " + type.getName()
+                                                           + "!");
         return clazz;
     }
 
@@ -296,10 +304,17 @@ public class ExtensionLoader<T> {
             throw new IllegalStateException("failed to get class loader resource");
         }
         String dirtyPath = url.toString();
-        String jarPath = dirtyPath.replaceAll("^.*file:/", ""); // removes file:/ and everything before it
-        jarPath = jarPath.replaceAll("jar!.*", "jar"); // removes everything after .jar, if .jar exists in dirtyPath
-        jarPath = jarPath.replaceAll("%20", " "); // necessary if path has spaces within
-        if (!jarPath.endsWith(".jar")) { // this is needed if you plan to run the app using Spring Tools Suit play
+        String jarPath = dirtyPath.replaceAll("^.*file:/", ""); // removes
+                                                                // file:/ and
+                                                                // everything
+                                                                // before it
+        jarPath = jarPath.replaceAll("jar!.*", "jar"); // removes everything
+                                                       // after .jar, if .jar
+                                                       // exists in dirtyPath
+        jarPath = jarPath.replaceAll("%20", " "); // necessary if path has
+                                                  // spaces within
+        if (!jarPath.endsWith(".jar")) { // this is needed if you plan to run
+                                         // the app using Spring Tools Suit play
                                          // button.
             jarPath = jarPath.replaceAll("/classes/.*", "/classes/");
         }
@@ -327,8 +342,8 @@ public class ExtensionLoader<T> {
         logger.info("extension classpath dir: " + dir);
         File externalLibDir = new File(dir);
         if (!externalLibDir.exists()) {
-            externalLibDir = new File(
-                File.separator + this.getJarDirectoryPath() + File.separator + "canal_client" + File.separator + "lib");
+            externalLibDir = new File(File.separator + this.getJarDirectoryPath() + File.separator + "canal_client"
+                                      + File.separator + "lib");
         }
         if (externalLibDir.exists()) {
             File[] files = externalLibDir.listFiles(new FilenameFilter() {
@@ -363,7 +378,8 @@ public class ExtensionLoader<T> {
                                 if (name.startsWith("java.") || name.startsWith("org.slf4j.")
                                     || name.startsWith("org.apache.logging")
                                     || name.startsWith("org.apache.commons.logging.")) {
-                                    // || name.startsWith("org.apache.hadoop.")) {
+                                    // || name.startsWith("org.apache.hadoop."))
+                                    // {
                                     c = super.loadClass(name);
                                 }
                                 if (c != null) return c;
@@ -386,7 +402,8 @@ public class ExtensionLoader<T> {
                                 @SuppressWarnings("unchecked")
                                 Enumeration<URL>[] tmp = (Enumeration<URL>[]) new Enumeration<?>[2];
 
-                                tmp[0] = findResources(name); // local class path first
+                                tmp[0] = findResources(name); // local class
+                                                              // path first
                                 // tmp[1] = super.getResources(name);
 
                                 return new CompoundEnumeration<>(tmp);
@@ -474,12 +491,16 @@ public class ExtensionLoader<T> {
                                         }
                                         if (line.length() > 0) {
                                             Class<?> clazz = classLoader.loadClass(line);
-                                            // Class<?> clazz = Class.forName(line, true, classLoader);
+                                            // Class<?> clazz =
+                                            // Class.forName(line, true,
+                                            // classLoader);
                                             if (!type.isAssignableFrom(clazz)) {
-                                                throw new IllegalStateException(
-                                                    "Error when load extension class(interface: " + type
-                                                                                + ", class line: " + clazz.getName()
-                                                                                + "), class " + clazz.getName()
+                                                throw new IllegalStateException("Error when load extension class(interface: "
+                                                                                + type
+                                                                                + ", class line: "
+                                                                                + clazz.getName()
+                                                                                + "), class "
+                                                                                + clazz.getName()
                                                                                 + "is not subtype of interface.");
                                             } else {
                                                 try {
@@ -497,9 +518,9 @@ public class ExtensionLoader<T> {
                                                                 extensionClasses.put(n, clazz);
                                                             } else if (c != clazz) {
                                                                 cachedNames.remove(clazz);
-                                                                throw new IllegalStateException(
-                                                                    "Duplicate extension " + type.getName() + " name "
-                                                                                                + n + " on "
+                                                                throw new IllegalStateException("Duplicate extension "
+                                                                                                + type.getName()
+                                                                                                + " name " + n + " on "
                                                                                                 + c.getName() + " and "
                                                                                                 + clazz.getName());
                                                             }
@@ -509,9 +530,12 @@ public class ExtensionLoader<T> {
                                             }
                                         }
                                     } catch (Throwable t) {
-                                        IllegalStateException e = new IllegalStateException(
-                                            "Failed to load extension class(interface: " + type + ", class line: "
-                                                                                            + line + ") in " + url
+                                        IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: "
+                                                                                            + type
+                                                                                            + ", class line: "
+                                                                                            + line
+                                                                                            + ") in "
+                                                                                            + url
                                                                                             + ", cause: "
                                                                                             + t.getMessage(),
                                             t);
@@ -526,15 +550,13 @@ public class ExtensionLoader<T> {
                         }
                     } catch (Throwable t) {
                         logger.error("Exception when load extension class(interface: " + type + ", class file: " + url
-                                     + ") in " + url,
-                            t);
+                                     + ") in " + url, t);
                     }
                 } // end of while urls
             }
         } catch (Throwable t) {
-            logger.error(
-                "Exception when load extension class(interface: " + type + ", description file: " + fileName + ").",
-                t);
+            logger.error("Exception when load extension class(interface: " + type + ", description file: " + fileName
+                         + ").", t);
         }
     }
 

+ 10 - 6
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -1,6 +1,11 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.FlatMessage;
@@ -81,11 +86,10 @@ public class MessageUtil {
                         Map<String, Object> rowOld = new LinkedHashMap<>();
                         for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                             if (updateSet.contains(column.getName())) {
-                                rowOld.put(column.getName(),
-                                    JdbcTypeUtil.typeConvert(column.getName(),
-                                        column.getValue(),
-                                        column.getSqlType(),
-                                        column.getMysqlType()));
+                                rowOld.put(column.getName(), JdbcTypeUtil.typeConvert(column.getName(),
+                                    column.getValue(),
+                                    column.getSqlType(),
+                                    column.getMysqlType()));
                             }
                         }
                         // update操作将记录修改前的值

+ 5 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/SPI.java

@@ -1,6 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
-import java.lang.annotation.*;
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 
 @Documented
 @Retention(RetentionPolicy.RUNTIME)

+ 1 - 5
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -4,7 +4,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.alibaba.otter.canal.protocol.FlatMessage;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -17,9 +16,7 @@ import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfigLoader;
 import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseSyncService;
 import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
-import com.alibaba.otter.canal.client.adapter.support.MessageUtil;
 import com.alibaba.otter.canal.client.adapter.support.SPI;
-import com.alibaba.otter.canal.protocol.Message;
 
 /**
  * HBase外部适配器
@@ -45,8 +42,7 @@ public class HbaseAdapter implements CanalOuterAdapter {
                         mappingConfigCache = new HashMap<>();
                         for (MappingConfig mappingConfig : hbaseMapping.values()) {
                             mappingConfigCache.put(mappingConfig.getHbaseOrm().getDatabase() + "-"
-                                                   + mappingConfig.getHbaseOrm().getTable(),
-                                mappingConfig);
+                                                   + mappingConfig.getHbaseOrm().getTable(), mappingConfig);
                         }
                     }
                 }

+ 6 - 2
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java

@@ -1,6 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.hbase.config;
 
-import java.util.*;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 
 /**
  * HBase表映射配置
@@ -117,7 +121,7 @@ public class MappingConfig {
     }
 
     public enum Mode {
-                      STRING("STRING"), NATIVE("NATIVE"), PHOENIX("PHOENIX");
+        STRING("STRING"), NATIVE("NATIVE"), PHOENIX("PHOENIX");
 
         private String type;
 

+ 18 - 7
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java

@@ -1,6 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.hbase.service;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -8,7 +12,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
-import com.alibaba.otter.canal.client.adapter.hbase.support.*;
+import com.alibaba.otter.canal.client.adapter.hbase.support.HRow;
+import com.alibaba.otter.canal.client.adapter.hbase.support.HbaseTemplate;
+import com.alibaba.otter.canal.client.adapter.hbase.support.PhType;
+import com.alibaba.otter.canal.client.adapter.hbase.support.PhTypeUtil;
+import com.alibaba.otter.canal.client.adapter.hbase.support.Type;
+import com.alibaba.otter.canal.client.adapter.hbase.support.TypeUtil;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 
 /**
@@ -63,7 +72,8 @@ public class HbaseSyncService {
         MappingConfig.HbaseOrm hbaseOrm = config.getHbaseOrm();
 
         // if (!validHTable(config)) {
-        // logger.error("HBase table '{}' not exists", hbaseOrm.getHbaseTable());
+        // logger.error("HBase table '{}' not exists",
+        // hbaseOrm.getHbaseTable());
         // return;
         // }
         int i = 1;
@@ -156,7 +166,8 @@ public class HbaseSyncService {
         MappingConfig.HbaseOrm hbaseOrm = config.getHbaseOrm();
 
         // if (!validHTable(config)) {
-        // logger.error("HBase table '{}' not exists", hbaseOrm.getHbaseTable());
+        // logger.error("HBase table '{}' not exists",
+        // hbaseOrm.getHbaseTable());
         // return;
         // }
 
@@ -248,7 +259,8 @@ public class HbaseSyncService {
         MappingConfig.HbaseOrm hbaseOrm = config.getHbaseOrm();
 
         // if (!validHTable(config)) {
-        // logger.error("HBase table '{}' not exists", hbaseOrm.getHbaseTable());
+        // logger.error("HBase table '{}' not exists",
+        // hbaseOrm.getHbaseTable());
         // return;
         // }
 
@@ -363,8 +375,7 @@ public class HbaseSyncService {
      * @param value 值
      * @return 复合字段rowKey
      */
-    private static byte[] typeConvert(MappingConfig.ColumnItem columnItem, MappingConfig.HbaseOrm hbaseOrm,
-                                      Object value) {
+    private static byte[] typeConvert(MappingConfig.ColumnItem columnItem, MappingConfig.HbaseOrm hbaseOrm, Object value) {
         if (value == null) {
             return null;
         }

+ 5 - 1
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/HbaseTemplate.java

@@ -8,7 +8,11 @@ import java.util.Set;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+ 10 - 26
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/PhType.java

@@ -13,37 +13,21 @@ import java.util.Date;
  * @version 1.0.0
  */
 public enum PhType {
-    DEFAULT(-1, "VARCHAR"),
-    UNSIGNED_INT(4, "UNSIGNED_INT"),
-    UNSIGNED_LONG(8, "UNSIGNED_LONG"),
-    UNSIGNED_TINYINT(1, "UNSIGNED_TINYINT"),
-    UNSIGNED_SMALLINT(2, "UNSIGNED_SMALLINT"),
-    UNSIGNED_FLOAT(4, "UNSIGNED_FLOAT"),
-    UNSIGNED_DOUBLE(8, "UNSIGNED_DOUBLE"),
-    INTEGER(4, "INTEGER"),
-    BIGINT(8, "BIGINT"),
-    TINYINT(1, "TINYINT"),
-    SMALLINT(2, "SMALLINT"),
-    FLOAT(4, "FLOAT"),
-    DOUBLE(8, "DOUBLE"),
-    DECIMAL(-1, "DECIMAL"),
-    BOOLEAN(1, "BOOLEAN"),
-    UNSIGNED_TIME(8, "UNSIGNED_TIME"),
-    UNSIGNED_DATE(8, "UNSIGNED_DATE"),
-    UNSIGNED_TIMESTAMP(12, "UNSIGNED_TIMESTAMP"),
-    TIME(8, "TIME"),
-    DATE(8, "DATE"),
-    TIMESTAMP(12, "TIMESTAMP"),
-    VARCHAR(-1, "VARCHAR"),
-    VARBINARY(-1, "VARBINARY");
+    DEFAULT(-1, "VARCHAR"), UNSIGNED_INT(4, "UNSIGNED_INT"), UNSIGNED_LONG(8, "UNSIGNED_LONG"),
+    UNSIGNED_TINYINT(1, "UNSIGNED_TINYINT"), UNSIGNED_SMALLINT(2, "UNSIGNED_SMALLINT"),
+    UNSIGNED_FLOAT(4, "UNSIGNED_FLOAT"), UNSIGNED_DOUBLE(8, "UNSIGNED_DOUBLE"), INTEGER(4, "INTEGER"),
+    BIGINT(8, "BIGINT"), TINYINT(1, "TINYINT"), SMALLINT(2, "SMALLINT"), FLOAT(4, "FLOAT"), DOUBLE(8, "DOUBLE"),
+    DECIMAL(-1, "DECIMAL"), BOOLEAN(1, "BOOLEAN"), UNSIGNED_TIME(8, "UNSIGNED_TIME"),
+    UNSIGNED_DATE(8, "UNSIGNED_DATE"), UNSIGNED_TIMESTAMP(12, "UNSIGNED_TIMESTAMP"), TIME(8, "TIME"), DATE(8, "DATE"),
+    TIMESTAMP(12, "TIMESTAMP"), VARCHAR(-1, "VARCHAR"), VARBINARY(-1, "VARBINARY");
 
     /**
      * -1:长度可变
      */
-    private int len;
+    private int    len;
     private String type;
 
-    PhType(int len, String type) {
+    PhType(int len, String type){
         this.len = len;
         this.type = type;
     }
@@ -87,7 +71,7 @@ public enum PhType {
             phType = VARCHAR;
         } else if (BigDecimal.class.isAssignableFrom(javaType)) {
             phType = DECIMAL;
-        }  else if (BigInteger.class.isAssignableFrom(javaType)) {
+        } else if (BigInteger.class.isAssignableFrom(javaType)) {
             phType = UNSIGNED_LONG;
         } else {
             phType = DEFAULT;

+ 14 - 8
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/PhTypeUtil.java

@@ -169,7 +169,8 @@ public class PhTypeUtil {
 
     private static int encodeInt(int v, byte[] b, int o) {
         checkForSufficientLength(b, o, Bytes.SIZEOF_INT);
-        b[o + 0] = (byte) ((v >> 24) ^ 0x80); // Flip sign bit so that INTEGER is binary comparable
+        b[o + 0] = (byte) ((v >> 24) ^ 0x80); // Flip sign bit so that INTEGER
+                                              // is binary comparable
         b[o + 1] = (byte) (v >> 16);
         b[o + 2] = (byte) (v >> 8);
         b[o + 3] = (byte) v;
@@ -209,7 +210,8 @@ public class PhTypeUtil {
 
     private static int encodeLong(long v, byte[] b, int o) {
         checkForSufficientLength(b, o, Bytes.SIZEOF_LONG);
-        b[o + 0] = (byte) ((v >> 56) ^ 0x80); // Flip sign bit so that INTEGER is binary comparable
+        b[o + 0] = (byte) ((v >> 56) ^ 0x80); // Flip sign bit so that INTEGER
+                                              // is binary comparable
         b[o + 1] = (byte) (v >> 48);
         b[o + 2] = (byte) (v >> 40);
         b[o + 3] = (byte) (v >> 32);
@@ -255,7 +257,8 @@ public class PhTypeUtil {
 
     private static int encodeShort(short v, byte[] b, int o) {
         checkForSufficientLength(b, o, Bytes.SIZEOF_SHORT);
-        b[o + 0] = (byte) ((v >> 8) ^ 0x80); // Flip sign bit so that Short is binary comparable
+        b[o + 0] = (byte) ((v >> 8) ^ 0x80); // Flip sign bit so that Short is
+                                             // binary comparable
         b[o + 1] = (byte) v;
         return Bytes.SIZEOF_SHORT;
     }
@@ -287,7 +290,8 @@ public class PhTypeUtil {
 
     private static int encodeByte(byte v, byte[] b, int o) {
         checkForSufficientLength(b, o, Bytes.SIZEOF_BYTE);
-        b[o] = (byte) (v ^ 0x80); // Flip sign bit so that Short is binary comparable
+        b[o] = (byte) (v ^ 0x80); // Flip sign bit so that Short is binary
+                                  // comparable
         return Bytes.SIZEOF_BYTE;
     }
 
@@ -546,7 +550,8 @@ public class PhTypeUtil {
             multiplyBy = 10;
             divideBy = BigInteger.TEN;
         }
-        // Normalize the scale based on what is necessary to end up with a base 100
+        // Normalize the scale based on what is necessary to end up with a base
+        // 100
         // decimal (i.e. 10.123e3)
         int digitOffset;
         BigInteger compareAgainst;
@@ -564,7 +569,8 @@ public class PhTypeUtil {
             if (length <= MAX_BIG_DECIMAL_BYTES) {
                 result[--index] = NEG_TERMINAL_BYTE;
             } else {
-                // Adjust length and offset down because we don't have enough room
+                // Adjust length and offset down because we don't have enough
+                // room
                 length = MAX_BIG_DECIMAL_BYTES;
                 index = offset + length;
             }
@@ -593,8 +599,8 @@ public class PhTypeUtil {
 
     private static void checkForSufficientLength(byte[] b, int offset, int requiredLength) {
         if (b.length < offset + requiredLength) {
-            throw new RuntimeException(
-                "Expected length of at least " + requiredLength + " bytes, but had " + (b.length - offset));
+            throw new RuntimeException("Expected length of at least " + requiredLength + " bytes, but had "
+                                       + (b.length - offset));
         }
     }
 

+ 3 - 13
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/Type.java

@@ -11,22 +11,12 @@ import java.util.Date;
  * @version 1.0.0
  */
 public enum Type {
-    DEFAULT("STRING"),
-    STRING("STRING"),
-    INTEGER("INTEGER"),
-    LONG("LONG"),
-    SHORT("SHORT"),
-    BOOLEAN("BOOLEAN"),
-    FLOAT("FLOAT"),
-    DOUBLE("DOUBLE"),
-    BIGDECIMAL("BIGDECIMAL"),
-    DATE("DATE"),
-    BYTE("BYTE"),
-    BYTES("BYTES");
+    DEFAULT("STRING"), STRING("STRING"), INTEGER("INTEGER"), LONG("LONG"), SHORT("SHORT"), BOOLEAN("BOOLEAN"),
+    FLOAT("FLOAT"), DOUBLE("DOUBLE"), BIGDECIMAL("BIGDECIMAL"), DATE("DATE"), BYTE("BYTE"), BYTES("BYTES");
 
     private String type;
 
-    Type(String type) {
+    Type(String type){
         this.type = type;
     }
 

+ 2 - 1
client-adapter/logger/src/main/java/com/alibaba/otter/canal/client/adapter/logger/LoggerAdapterExample.java

@@ -14,7 +14,8 @@ import com.alibaba.otter.canal.client.adapter.support.SPI;
  * @author machengyuan 2018-8-19 下午11:45:38
  * @version 1.0.0
  */
-@SPI("logger") // logger参数对应CanalOuterAdapterConfiguration配置中的name
+@SPI("logger")
+// logger参数对应CanalOuterAdapterConfiguration配置中的name
 public class LoggerAdapterExample implements CanalOuterAdapter {
 
     private Logger logger = LoggerFactory.getLogger(this.getClass());

+ 4 - 3
client-launcher/src/main/java/com/alibaba/otter/canal/client/ClientLauncher.java

@@ -1,13 +1,14 @@
 package com.alibaba.otter.canal.client;
 
-import com.alibaba.otter.canal.client.adapter.loader.CanalAdapterLoader;
-import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
+import java.io.FileInputStream;
+
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 
-import java.io.FileInputStream;
+import com.alibaba.otter.canal.client.adapter.loader.CanalAdapterLoader;
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 
 public class ClientLauncher {
 

+ 10 - 18
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/AbstractCanalAdapterWorker.java

@@ -26,10 +26,10 @@ public abstract class AbstractCanalAdapterWorker {
 
     protected final Logger                    logger  = LoggerFactory.getLogger(this.getClass());
 
-    protected String                          canalDestination;                                                 // canal实例
-    protected List<List<CanalOuterAdapter>>   canalOuterAdapters;                                               // 外部适配器
-    protected ExecutorService                 groupInnerExecutorService;                                        // 组内工作线程池
-    protected volatile boolean                running = false;                                                  // 是否运行中
+    protected String                          canalDestination;                                  // canal实例
+    protected List<List<CanalOuterAdapter>>   canalOuterAdapters;                                // 外部适配器
+    protected ExecutorService                 groupInnerExecutorService;                         // 组内工作线程池
+    protected volatile boolean                running = false;                                   // 是否运行中
     protected Thread                          thread  = null;
     protected Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
 
@@ -129,31 +129,22 @@ public abstract class AbstractCanalAdapterWorker {
         }
     }
 
-    protected void writeOut(Message message,String topic){
+    protected void writeOut(Message message, String topic) {
         if (logger.isDebugEnabled()) {
-            logger.debug("topic: {} batchId: {} batchSize: {} ",
-                topic,
-                message.getId(),
-                message.getEntries().size());
+            logger.debug("topic: {} batchId: {} batchSize: {} ", topic, message.getId(), message.getEntries().size());
         }
         long begin = System.currentTimeMillis();
         writeOut(message);
         long now = System.currentTimeMillis();
         if ((System.currentTimeMillis() - begin) > 5 * 60 * 1000) {
-            logger.error("topic: {} batchId {} elapsed time: {} ms",
-                topic,
-                message.getId(),
-                now - begin);
+            logger.error("topic: {} batchId {} elapsed time: {} ms", topic, message.getId(), now - begin);
         }
         if (logger.isDebugEnabled()) {
-            logger.debug("topic: {} batchId {} elapsed time: {} ms",
-                topic,
-                message.getId(),
-                now - begin);
+            logger.debug("topic: {} batchId {} elapsed time: {} ms", topic, message.getId(), now - begin);
         }
     }
 
-    protected void stopOutAdapters(){
+    protected void stopOutAdapters() {
         if (thread != null) {
             try {
                 thread.join();
@@ -170,6 +161,7 @@ public abstract class AbstractCanalAdapterWorker {
         }
         logger.info("topic all connectors destroyed!");
     }
+
     public abstract void start();
 
     public abstract void stop();

+ 2 - 2
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java

@@ -30,7 +30,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
     private boolean             flatMessage;
 
     public CanalAdapterKafkaWorker(String bootstrapServers, String topic, String groupId,
-        List<List<CanalOuterAdapter>> canalOuterAdapters, boolean flatMessage){
+                                   List<List<CanalOuterAdapter>> canalOuterAdapters, boolean flatMessage){
         this.canalOuterAdapters = canalOuterAdapters;
         this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         this.topic = topic;
@@ -171,4 +171,4 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
         connector.disconnect();
         logger.info("=============> Disconnect topic: {} <=============", this.topic);
     }
-}
+}

+ 26 - 18
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java

@@ -1,9 +1,5 @@
 package com.alibaba.otter.canal.client.adapter.loader;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
-import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
-import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
-import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
@@ -12,10 +8,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
+import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
+import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
+
 /**
  * MQ外部适配器的加载器
  *
@@ -23,17 +25,17 @@ import org.slf4j.LoggerFactory;
  */
 public class CanalAdapterLoader {
 
-    private static final Logger logger = LoggerFactory.getLogger(CanalAdapterLoader.class);
+    private static final Logger                     logger        = LoggerFactory.getLogger(CanalAdapterLoader.class);
 
-    private CanalClientConfig canalClientConfig;
+    private CanalClientConfig                       canalClientConfig;
 
-    private Map<String, CanalAdapterWorker> canalWorkers = new HashMap<>();
+    private Map<String, CanalAdapterWorker>         canalWorkers  = new HashMap<>();
 
     private Map<String, AbstractCanalAdapterWorker> canalMQWorker = new HashMap<>();
 
-    private ExtensionLoader<CanalOuterAdapter> loader;
+    private ExtensionLoader<CanalOuterAdapter>      loader;
 
-    public CanalAdapterLoader(CanalClientConfig canalClientConfig) {
+    public CanalAdapterLoader(CanalClientConfig canalClientConfig){
         this.canalClientConfig = canalClientConfig;
     }
 
@@ -46,8 +48,14 @@ public class CanalAdapterLoader {
             throw new RuntimeException("Blank config property: canalInstances or canalMQTopics");
         }
 
-        loader = ExtensionLoader.getExtensionLoader(CanalOuterAdapter.class,
-            "" /* TODO canalClientConfig.getClassloaderPolicy() */);
+        loader = ExtensionLoader.getExtensionLoader(CanalOuterAdapter.class, "" /*
+                                                                                 * TODO
+                                                                                 * canalClientConfig
+                                                                                 * .
+                                                                                 * getClassloaderPolicy
+                                                                                 * (
+                                                                                 * )
+                                                                                 */);
 
         String canalServerHost = this.canalClientConfig.getCanalServerHost();
         SocketAddress sa = null;
@@ -94,24 +102,23 @@ public class CanalAdapterLoader {
                     }
                     canalOuterAdapterGroups.add(canalOuterAdapters);
                     if (StringUtils.isBlank(topic.getMqMode()) || "rocketmq".equalsIgnoreCase(topic.getMqMode())) {
-                        CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(
-                            canalClientConfig.getBootstrapServers(),
+                        CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig.getBootstrapServers(),
                             topic.getTopic(),
                             group.getGroupId(),
                             canalOuterAdapterGroups);
                         canalMQWorker.put(topic.getTopic() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
                         rocketMQWorker.start();
                     } else if ("kafka".equalsIgnoreCase(topic.getMqMode())) {
-                        CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(
-                            canalClientConfig.getBootstrapServers(),
+                        CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(canalClientConfig.getBootstrapServers(),
                             topic.getTopic(),
                             group.getGroupId(),
-                            canalOuterAdapterGroups,canalClientConfig.getFlatMessage());
+                            canalOuterAdapterGroups,
+                            canalClientConfig.getFlatMessage());
                         canalMQWorker.put(topic.getTopic() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
                         canalKafkaWorker.start();
                     }
-                    logger.info("Start adapter for canal-client rocketmq topic: {} succeed",
-                        topic.getTopic() + "-" + group.getGroupId());
+                    logger.info("Start adapter for canal-client rocketmq topic: {} succeed", topic.getTopic() + "-"
+                                                                                             + group.getGroupId());
 
                 }
             }
@@ -156,6 +163,7 @@ public class CanalAdapterLoader {
             for (AbstractCanalAdapterWorker tmp : canalMQWorker.values()) {
                 final AbstractCanalAdapterWorker worker = tmp;
                 stopMQWokerService.submit(new Runnable() {
+
                     @Override
                     public void run() {
                         worker.stop();

+ 8 - 7
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterRocketMQWorker.java

@@ -1,17 +1,18 @@
 package com.alibaba.otter.canal.client.adapter.loader;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
-import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
-import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
-import com.alibaba.otter.canal.protocol.Message;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.common.errors.WakeupException;
 
+import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
+import com.alibaba.otter.canal.protocol.Message;
+
 /**
  * kafka对应的client适配器工作线程
  *
@@ -22,10 +23,10 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
 
     private RocketMQCanalConnector connector;
 
-    private String topic;
+    private String                 topic;
 
     public CanalAdapterRocketMQWorker(String nameServers, String topic, String groupId,
-        List<List<CanalOuterAdapter>> canalOuterAdapters) {
+                                      List<List<CanalOuterAdapter>> canalOuterAdapters){
         logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
         this.canalOuterAdapters = canalOuterAdapters;
         this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());

+ 25 - 12
client/src/main/java/com/alibaba/otter/canal/client/CanalMessageDeserializer.java

@@ -2,16 +2,19 @@ package com.alibaba.otter.canal.client;
 
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.CanalPacket;
+import com.alibaba.otter.canal.protocol.CanalPacket.Ack;
+import com.alibaba.otter.canal.protocol.CanalPacket.Compression;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 import com.google.protobuf.ByteString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class CanalMessageDeserializer {
-    private static Logger logger = LoggerFactory.getLogger(CanalMessageDeserializer.class);
 
     public static Message deserializer(byte[] data) {
+        return deserializer(data, false);
+    }
+
+    public static Message deserializer(byte[] data, boolean lazyParseEntry) {
         try {
             if (data == null) {
                 return null;
@@ -19,24 +22,34 @@ public class CanalMessageDeserializer {
                 CanalPacket.Packet p = CanalPacket.Packet.parseFrom(data);
                 switch (p.getType()) {
                     case MESSAGES: {
-//                        if (!p.getCompression().equals(CanalPacket.Compression.NONE)) {
-//                            throw new CanalClientException("compression is not supported in this connector");
-//                        }
+                        if (!p.getCompression().equals(Compression.NONE)
+                            && !p.getCompression().equals(Compression.COMPRESSIONCOMPATIBLEPROTO2)) {
+                            throw new CanalClientException("compression is not supported in this connector");
+                        }
 
                         CanalPacket.Messages messages = CanalPacket.Messages.parseFrom(p.getBody());
                         Message result = new Message(messages.getBatchId());
-                        for (ByteString byteString : messages.getMessagesList()) {
-                            result.addEntry(CanalEntry.Entry.parseFrom(byteString));
+                        if (lazyParseEntry) {
+                            // byteString
+                            result.setRawEntries(messages.getMessagesList());
+                        } else {
+                            for (ByteString byteString : messages.getMessagesList()) {
+                                result.addEntry(CanalEntry.Entry.parseFrom(byteString));
+                            }
                         }
                         return result;
                     }
-                    default:
-                        break;
+                    case ACK: {
+                        Ack ack = Ack.parseFrom(p.getBody());
+                        throw new CanalClientException("something goes wrong with reason: " + ack.getErrorMessage());
+                    }
+                    default: {
+                        throw new CanalClientException("unexpected packet type: " + p.getType());
+                    }
                 }
             }
         } catch (Exception e) {
-            logger.error("Error when deserializing byte[] to message ", e);
+            throw new CanalClientException("deserializer failed", e);
         }
-        return null;
     }
 }

+ 3 - 6
client/src/main/java/com/alibaba/otter/canal/client/impl/ClusterCanalConnector.java

@@ -218,8 +218,7 @@ public class ClusterCanalConnector implements CanalConnector {
                 return;
             } catch (Throwable t) {
                 logger.warn(String.format("something goes wrong when rollbacking data from server:%s",
-                    currentConnector != null ? currentConnector.getAddress() : "null"),
-                    t);
+                    currentConnector != null ? currentConnector.getAddress() : "null"), t);
                 times++;
                 restart();
                 logger.info("restart the connector for next round retry.");
@@ -236,8 +235,7 @@ public class ClusterCanalConnector implements CanalConnector {
                 return;
             } catch (Throwable t) {
                 logger.warn(String.format("something goes wrong when rollbacking data from server:%s",
-                    currentConnector != null ? currentConnector.getAddress() : "null"),
-                    t);
+                    currentConnector != null ? currentConnector.getAddress() : "null"), t);
                 times++;
                 restart();
                 logger.info("restart the connector for next round retry.");
@@ -255,8 +253,7 @@ public class ClusterCanalConnector implements CanalConnector {
                 return;
             } catch (Throwable t) {
                 logger.warn(String.format("something goes wrong when acking data from server:%s",
-                    currentConnector != null ? currentConnector.getAddress() : "null"),
-                    t);
+                    currentConnector != null ? currentConnector.getAddress() : "null"), t);
                 times++;
                 restart();
                 logger.info("restart the connector for next round retry.");

+ 9 - 34
client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

@@ -18,13 +18,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.client.CanalMessageDeserializer;
 import com.alibaba.otter.canal.client.impl.running.ClientRunningData;
 import com.alibaba.otter.canal.client.impl.running.ClientRunningListener;
 import com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor;
 import com.alibaba.otter.canal.common.utils.AddressUtils;
 import com.alibaba.otter.canal.common.utils.BooleanMutex;
 import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
-import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.CanalPacket.Ack;
 import com.alibaba.otter.canal.protocol.CanalPacket.ClientAck;
 import com.alibaba.otter.canal.protocol.CanalPacket.ClientAuth;
@@ -32,7 +32,6 @@ import com.alibaba.otter.canal.protocol.CanalPacket.ClientRollback;
 import com.alibaba.otter.canal.protocol.CanalPacket.Compression;
 import com.alibaba.otter.canal.protocol.CanalPacket.Get;
 import com.alibaba.otter.canal.protocol.CanalPacket.Handshake;
-import com.alibaba.otter.canal.protocol.CanalPacket.Messages;
 import com.alibaba.otter.canal.protocol.CanalPacket.Packet;
 import com.alibaba.otter.canal.protocol.CanalPacket.PacketType;
 import com.alibaba.otter.canal.protocol.CanalPacket.Sub;
@@ -180,8 +179,8 @@ public class SimpleCanalConnector implements CanalConnector {
 
             Ack ackBody = Ack.parseFrom(ack.getBody());
             if (ackBody.getErrorCode() > 0) {
-                throw new CanalClientException(
-                    "something goes wrong when doing authentication: " + ackBody.getErrorMessage());
+                throw new CanalClientException("something goes wrong when doing authentication: "
+                                               + ackBody.getErrorMessage());
             }
 
             connected = true;
@@ -320,34 +319,7 @@ public class SimpleCanalConnector implements CanalConnector {
 
     private Message receiveMessages() throws IOException {
         byte[] data = readNextPacket();
-        Packet p = Packet.parseFrom(data);
-        switch (p.getType()) {
-            case MESSAGES: {
-                if (!p.getCompression().equals(Compression.NONE)
-                    && !p.getCompression().equals(Compression.COMPRESSIONCOMPATIBLEPROTO2)) {
-                    throw new CanalClientException("compression is not supported in this connector");
-                }
-
-                Messages messages = Messages.parseFrom(p.getBody());
-                Message result = new Message(messages.getBatchId());
-                if (lazyParseEntry) {
-                    // byteString
-                    result.setRawEntries(messages.getMessagesList());
-                } else {
-                    for (ByteString byteString : messages.getMessagesList()) {
-                        result.addEntry(Entry.parseFrom(byteString));
-                    }
-                }
-                return result;
-            }
-            case ACK: {
-                Ack ack = Ack.parseFrom(p.getBody());
-                throw new CanalClientException("something goes wrong with reason: " + ack.getErrorMessage());
-            }
-            default: {
-                throw new CanalClientException("unexpected packet type: " + p.getType());
-            }
-        }
+        return CanalMessageDeserializer.deserializer(data, lazyParseEntry);
     }
 
     public void ack(long batchId) throws CanalClientException {
@@ -361,8 +333,11 @@ public class SimpleCanalConnector implements CanalConnector {
             .setBatchId(batchId)
             .build();
         try {
-            writeWithHeader(
-                Packet.newBuilder().setType(PacketType.CLIENTACK).setBody(ca.toByteString()).build().toByteArray());
+            writeWithHeader(Packet.newBuilder()
+                .setType(PacketType.CLIENTACK)
+                .setBody(ca.toByteString())
+                .build()
+                .toByteArray());
         } catch (IOException e) {
             throw new CanalClientException(e);
         }

+ 1 - 1
client/src/main/java/com/alibaba/otter/canal/client/impl/running/ClientRunningMonitor.java

@@ -98,7 +98,7 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
         String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
         zkClient.unsubscribeDataChanges(path, dataListener);
         releaseRunning(); // 尝试一下release
-        //Fix issue #697
+        // Fix issue #697
         if (delayExector != null) {
             delayExector.shutdown();
         }

+ 3 - 3
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -13,6 +13,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.client.CanalMessageDeserializer;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 
@@ -33,8 +34,7 @@ public class KafkaCanalConnector {
     private volatile boolean               running   = false;
     private boolean                        flatMessage;
 
-    public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId,
-                               boolean flatMessage){
+    public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId, boolean flatMessage){
         this.topic = topic;
         this.partition = partition;
         this.flatMessage = flatMessage;
@@ -50,7 +50,7 @@ public class KafkaCanalConnector {
         properties.put("max.poll.records", "100");
         properties.put("key.deserializer", StringDeserializer.class.getName());
         if (!flatMessage) {
-            properties.put("value.deserializer", MessageDeserializer.class.getName());
+            properties.put("value.deserializer", CanalMessageDeserializer.class.getName());
         } else {
             properties.put("value.deserializer", StringDeserializer.class.getName());
         }

+ 3 - 4
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnectors.java

@@ -17,8 +17,7 @@ public class KafkaCanalConnectors {
      * @param groupId
      * @return
      */
-    public static KafkaCanalConnector newKafkaConnector(String servers, String topic, Integer partition,
-                                                        String groupId) {
+    public static KafkaCanalConnector newKafkaConnector(String servers, String topic, Integer partition, String groupId) {
         return new KafkaCanalConnector(servers, topic, partition, groupId, false);
     }
 
@@ -44,8 +43,8 @@ public class KafkaCanalConnectors {
      * @param flatMessage
      * @return
      */
-    public static KafkaCanalConnector newKafkaConnector( String servers, String topic,
-                                                        Integer partition, String groupId,boolean flatMessage) {
+    public static KafkaCanalConnector newKafkaConnector(String servers, String topic, Integer partition,
+                                                        String groupId, boolean flatMessage) {
         return new KafkaCanalConnector(servers, topic, partition, groupId, flatMessage);
     }
 }

+ 0 - 33
client/src/main/java/com/alibaba/otter/canal/client/kafka/MessageDeserializer.java

@@ -1,33 +0,0 @@
-package com.alibaba.otter.canal.client.kafka;
-
-import com.alibaba.otter.canal.client.CanalMessageDeserializer;
-import com.alibaba.otter.canal.protocol.Message;
-import java.util.Map;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Kafka Message类的反序列化
- *
- * @author machengyuan @ 2018-6-12
- * @version 1.0.0
- */
-public class MessageDeserializer implements Deserializer<Message> {
-
-    private static Logger logger = LoggerFactory.getLogger(MessageDeserializer.class);
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-    }
-
-    @Override
-    public Message deserialize(String topic1, byte[] data) {
-        return CanalMessageDeserializer.deserializer(data);
-    }
-
-    @Override
-    public void close() {
-        // nothing to do
-    }
-}

+ 4 - 5
client/src/main/java/com/alibaba/otter/canal/client/kafka/running/ClientRunningMonitor.java

@@ -24,7 +24,6 @@ import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
 import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 
-
 /**
  * kafka client running状态信息
  *
@@ -55,15 +54,15 @@ public class ClientRunningMonitor extends AbstractCanalLifeCycle {
     }
 
     private static final Logger        logger       = LoggerFactory.getLogger(ClientRunningMonitor.class);
-    private ZkClientx zkClient;
+    private ZkClientx                  zkClient;
     private String                     topic;
-    private ClientRunningData clientData;
+    private ClientRunningData          clientData;
     private IZkDataListener            dataListener;
-    private BooleanMutex mutex        = new BooleanMutex(false);
+    private BooleanMutex               mutex        = new BooleanMutex(false);
     private volatile boolean           release      = false;
     private volatile ClientRunningData activeData;
     private ScheduledExecutorService   delayExector = Executors.newScheduledThreadPool(1);
-    private ClientRunningListener listener;
+    private ClientRunningListener      listener;
     private int                        delayTime    = 5;
 
     private static Integer             virtualPort;

+ 4 - 3
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/ConsumerBatchMessage.java

@@ -5,11 +5,12 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 public class ConsumerBatchMessage<T> {
+
     private final BlockingQueue<T> data;
-    private CountDownLatch latch;
-    private boolean hasFailure = false;
+    private CountDownLatch         latch;
+    private boolean                hasFailure = false;
 
-    public ConsumerBatchMessage(BlockingQueue<T> data) {
+    public ConsumerBatchMessage(BlockingQueue<T> data){
         this.data = data;
         latch = new CountDownLatch(data.size());
     }

+ 18 - 15
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

@@ -1,15 +1,12 @@
 package com.alibaba.otter.canal.client.rocketmq;
 
-import com.alibaba.otter.canal.client.CanalConnector;
-import com.alibaba.otter.canal.client.CanalMessageDeserializer;
-import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
@@ -20,19 +17,25 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.client.CanalMessageDeserializer;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+
 public class RocketMQCanalConnector implements CanalConnector {
-    private static final Logger logger = LoggerFactory.getLogger(RocketMQCanalConnector.class);
 
-    private String nameServer;
-    private String topic;
-    private String groupName;
-    private volatile boolean connected = false;
-    private DefaultMQPushConsumer rocketMQConsumer;
+    private static final Logger                          logger              = LoggerFactory.getLogger(RocketMQCanalConnector.class);
+
+    private String                                       nameServer;
+    private String                                       topic;
+    private String                                       groupName;
+    private volatile boolean                             connected           = false;
+    private DefaultMQPushConsumer                        rocketMQConsumer;
     private BlockingQueue<ConsumerBatchMessage<Message>> messageBlockingQueue;
-    Map<Long, ConsumerBatchMessage<Message>> messageCache;
-    private long batchProcessTimeout = 10000;
+    Map<Long, ConsumerBatchMessage<Message>>             messageCache;
+    private long                                         batchProcessTimeout = 10000;
 
-    public RocketMQCanalConnector(String nameServer, String topic, String groupName) {
+    public RocketMQCanalConnector(String nameServer, String topic, String groupName){
         this.nameServer = nameServer;
         this.topic = topic;
         this.groupName = groupName;
@@ -69,9 +72,9 @@ public class RocketMQCanalConnector implements CanalConnector {
             }
             rocketMQConsumer.subscribe(topic, "*");
             rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {
+
                 @Override
-                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts,
-                    ConsumeOrderlyContext context) {
+                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {
                     context.setAutoCommit(true);
                     boolean isSuccess = process(messageExts);
                     if (isSuccess) {

+ 2 - 2
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnectorProvider.java

@@ -4,6 +4,7 @@ package com.alibaba.otter.canal.client.rocketmq;
  * RocketMQ connector provider.
  */
 public class RocketMQCanalConnectorProvider {
+
     /**
      * Create RocketMQ connector
      *
@@ -12,8 +13,7 @@ public class RocketMQCanalConnectorProvider {
      * @param groupId
      * @return {@link RocketMQCanalConnector}
      */
-    public static RocketMQCanalConnector newRocketMQConnector(String nameServers, String topic,
-        String groupId) {
+    public static RocketMQCanalConnector newRocketMQConnector(String nameServers, String topic, String groupId) {
         return new RocketMQCanalConnector(nameServers, topic, groupId);
     }
 

+ 2 - 0
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalListener.java

@@ -1,11 +1,13 @@
 package com.alibaba.otter.canal.client.rocketmq;
 
 import java.util.List;
+
 import org.apache.rocketmq.common.message.MessageExt;
 
 /**
  * RocketMQ message listener
  */
 public interface RocketMQCanalListener {
+
     boolean onReceive(List<MessageExt> messageExts);
 }

+ 114 - 114
client/src/test/java/com/alibaba/otter/canal/client/running/ClientRunningTest.java

@@ -20,117 +20,117 @@ import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
 
 public class ClientRunningTest extends AbstractZkTest {
 
-	private ZkClientx zkclientx = new ZkClientx(cluster1 + ";" + cluster2);
-	private short     clientId  = 1001;
-
-	@Before
-	public void setUp() {
-		String path = ZookeeperPathUtils.getDestinationPath(destination);
-		zkclientx.deleteRecursive(path);
-
-		zkclientx.createPersistent(ZookeeperPathUtils.getClientIdNodePath(this.destination, clientId), true);
-	}
-
-	@After
-	public void tearDown() {
-		String path = ZookeeperPathUtils.getDestinationPath(destination);
-		zkclientx.deleteRecursive(path);
-	}
-
-	@Test
-	public void testOneServer() {
-		final CountDownLatch countLatch = new CountDownLatch(2);
-		ClientRunningMonitor runningMonitor = buildClientRunning(countLatch, clientId, 2088);
-		runningMonitor.start();
-		sleep(2000L);
-		runningMonitor.stop();
-		sleep(2000L);
-
-		if (countLatch.getCount() != 0) {
-			Assert.fail();
-		}
-	}
-
-	@Test
-	public void testMultiServer() {
-		final CountDownLatch countLatch = new CountDownLatch(30);
-		final ClientRunningMonitor runningMonitor1 = buildClientRunning(countLatch, clientId, 2088);
-		final ClientRunningMonitor runningMonitor2 = buildClientRunning(countLatch, clientId, 2089);
-		final ClientRunningMonitor runningMonitor3 = buildClientRunning(countLatch, clientId, 2090);
-		final ExecutorService executor = Executors.newFixedThreadPool(3);
-		executor.submit(new Runnable() {
-
-			public void run() {
-				for (int i = 0; i < 10; i++) {
-					if (!runningMonitor1.isStart()) {
-						runningMonitor1.start();
-					}
-					sleep(2000L + RandomUtils.nextInt(500));
-					runningMonitor1.stop();
-					sleep(2000L + RandomUtils.nextInt(500));
-				}
-			}
-
-		});
-
-		executor.submit(new Runnable() {
-
-			public void run() {
-				for (int i = 0; i < 10; i++) {
-					if (!runningMonitor2.isStart()) {
-						runningMonitor2.start();
-					}
-					sleep(2000L + RandomUtils.nextInt(500));
-					runningMonitor2.stop();
-					sleep(2000L + RandomUtils.nextInt(500));
-				}
-			}
-
-		});
-
-		executor.submit(new Runnable() {
-
-			public void run() {
-				for (int i = 0; i < 10; i++) {
-					if (!runningMonitor3.isStart()) {
-						runningMonitor3.start();
-					}
-					sleep(2000L + RandomUtils.nextInt(500));
-					runningMonitor3.stop();
-					sleep(2000L + RandomUtils.nextInt(500));
-				}
-			}
-
-		});
-
-		sleep(30000L);
-	}
-
-	private ClientRunningMonitor buildClientRunning(final CountDownLatch countLatch, final short clientId,
-																									final int port) {
-		ClientRunningData clientData = new ClientRunningData();
-		clientData.setClientId(clientId);
-		clientData.setAddress(AddressUtils.getHostIp());
-
-		ClientRunningMonitor runningMonitor = new ClientRunningMonitor();
-		runningMonitor.setDestination(destination);
-		runningMonitor.setZkClient(zkclientx);
-		runningMonitor.setClientData(clientData);
-		runningMonitor.setListener(new ClientRunningListener() {
-
-			public InetSocketAddress processActiveEnter() {
-				System.out.println(String.format("clientId:%s port:%s has start", clientId, port));
-				countLatch.countDown();
-				return new InetSocketAddress(AddressUtils.getHostIp(), port);
-			}
-
-			public void processActiveExit() {
-				countLatch.countDown();
-				System.out.println(String.format("clientId:%s port:%s has stop", clientId, port));
-			}
-
-		});
-		runningMonitor.setDelayTime(1);
-		return runningMonitor;
-	}
-}
+    private ZkClientx zkclientx = new ZkClientx(cluster1 + ";" + cluster2);
+    private short     clientId  = 1001;
+
+    @Before
+    public void setUp() {
+        String path = ZookeeperPathUtils.getDestinationPath(destination);
+        zkclientx.deleteRecursive(path);
+
+        zkclientx.createPersistent(ZookeeperPathUtils.getClientIdNodePath(this.destination, clientId), true);
+    }
+
+    @After
+    public void tearDown() {
+        String path = ZookeeperPathUtils.getDestinationPath(destination);
+        zkclientx.deleteRecursive(path);
+    }
+
+    @Test
+    public void testOneServer() {
+        final CountDownLatch countLatch = new CountDownLatch(2);
+        ClientRunningMonitor runningMonitor = buildClientRunning(countLatch, clientId, 2088);
+        runningMonitor.start();
+        sleep(2000L);
+        runningMonitor.stop();
+        sleep(2000L);
+
+        if (countLatch.getCount() != 0) {
+            Assert.fail();
+        }
+    }
+
+    @Test
+    public void testMultiServer() {
+        final CountDownLatch countLatch = new CountDownLatch(30);
+        final ClientRunningMonitor runningMonitor1 = buildClientRunning(countLatch, clientId, 2088);
+        final ClientRunningMonitor runningMonitor2 = buildClientRunning(countLatch, clientId, 2089);
+        final ClientRunningMonitor runningMonitor3 = buildClientRunning(countLatch, clientId, 2090);
+        final ExecutorService executor = Executors.newFixedThreadPool(3);
+        executor.submit(new Runnable() {
+
+            public void run() {
+                for (int i = 0; i < 10; i++) {
+                    if (!runningMonitor1.isStart()) {
+                        runningMonitor1.start();
+                    }
+                    sleep(2000L + RandomUtils.nextInt(500));
+                    runningMonitor1.stop();
+                    sleep(2000L + RandomUtils.nextInt(500));
+                }
+            }
+
+        });
+
+        executor.submit(new Runnable() {
+
+            public void run() {
+                for (int i = 0; i < 10; i++) {
+                    if (!runningMonitor2.isStart()) {
+                        runningMonitor2.start();
+                    }
+                    sleep(2000L + RandomUtils.nextInt(500));
+                    runningMonitor2.stop();
+                    sleep(2000L + RandomUtils.nextInt(500));
+                }
+            }
+
+        });
+
+        executor.submit(new Runnable() {
+
+            public void run() {
+                for (int i = 0; i < 10; i++) {
+                    if (!runningMonitor3.isStart()) {
+                        runningMonitor3.start();
+                    }
+                    sleep(2000L + RandomUtils.nextInt(500));
+                    runningMonitor3.stop();
+                    sleep(2000L + RandomUtils.nextInt(500));
+                }
+            }
+
+        });
+
+        sleep(30000L);
+    }
+
+    private ClientRunningMonitor buildClientRunning(final CountDownLatch countLatch, final short clientId,
+                                                    final int port) {
+        ClientRunningData clientData = new ClientRunningData();
+        clientData.setClientId(clientId);
+        clientData.setAddress(AddressUtils.getHostIp());
+
+        ClientRunningMonitor runningMonitor = new ClientRunningMonitor();
+        runningMonitor.setDestination(destination);
+        runningMonitor.setZkClient(zkclientx);
+        runningMonitor.setClientData(clientData);
+        runningMonitor.setListener(new ClientRunningListener() {
+
+            public InetSocketAddress processActiveEnter() {
+                System.out.println(String.format("clientId:%s port:%s has start", clientId, port));
+                countLatch.countDown();
+                return new InetSocketAddress(AddressUtils.getHostIp(), port);
+            }
+
+            public void processActiveExit() {
+                countLatch.countDown();
+                System.out.println(String.format("clientId:%s port:%s has stop", clientId, port));
+            }
+
+        });
+        runningMonitor.setDelayTime(1);
+        return runningMonitor;
+    }
+}

+ 1 - 2
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaClientExample.java

@@ -41,8 +41,7 @@ public class CanalKafkaClientExample {
 
     public static void main(String[] args) {
         try {
-            final CanalKafkaClientExample kafkaCanalClientExample = new CanalKafkaClientExample(
-                AbstractKafkaTest.zkServers,
+            final CanalKafkaClientExample kafkaCanalClientExample = new CanalKafkaClientExample(AbstractKafkaTest.zkServers,
                 AbstractKafkaTest.servers,
                 AbstractKafkaTest.topic,
                 AbstractKafkaTest.partition,

+ 3 - 2
client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/AbstractRocektMQTest.java

@@ -1,8 +1,9 @@
 package com.alibaba.otter.canal.client.running.rocketmq;
 
 public class AbstractRocektMQTest {
-    public static String topic = "example";
-    public static String groupId = "group";
+
+    public static String topic       = "example";
+    public static String groupId     = "group";
     public static String nameServers = "localhost:9876";
 
 }

+ 17 - 18
client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/CanalRocketMQClientExample.java

@@ -1,14 +1,15 @@
 package com.alibaba.otter.canal.client.running.rocketmq;
 
-import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
-import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
-import com.alibaba.otter.canal.client.running.kafka.AbstractKafkaTest;
-import com.alibaba.otter.canal.protocol.Message;
 import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
+import com.alibaba.otter.canal.client.running.kafka.AbstractKafkaTest;
+import com.alibaba.otter.canal.protocol.Message;
+
 /**
  * Kafka client example
  *
@@ -17,32 +18,30 @@ import org.springframework.util.Assert;
  */
 public class CanalRocketMQClientExample extends AbstractRocektMQTest {
 
-    protected final static Logger logger = LoggerFactory.getLogger(CanalRocketMQClientExample.class);
+    protected final static Logger           logger  = LoggerFactory.getLogger(CanalRocketMQClientExample.class);
 
-    private RocketMQCanalConnector connector;
+    private RocketMQCanalConnector          connector;
 
-    private static volatile boolean running = false;
+    private static volatile boolean         running = false;
 
-    private Thread thread = null;
+    private Thread                          thread  = null;
 
     private Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
 
-        public void uncaughtException(Thread t, Throwable e) {
-            logger.error("parse events has an error", e);
-        }
-    };
+                                                        public void uncaughtException(Thread t, Throwable e) {
+                                                            logger.error("parse events has an error", e);
+                                                        }
+                                                    };
 
-    public CanalRocketMQClientExample(String nameServers, String topic,
-        String groupId) {
-        connector = RocketMQCanalConnectorProvider.newRocketMQConnector(nameServers,
-            topic,
-            groupId);
+    public CanalRocketMQClientExample(String nameServers, String topic, String groupId){
+        connector = RocketMQCanalConnectorProvider.newRocketMQConnector(nameServers, topic, groupId);
     }
 
     public static void main(String[] args) {
         try {
             final CanalRocketMQClientExample rocketMQClientExample = new CanalRocketMQClientExample(nameServers,
-                topic, groupId);
+                topic,
+                groupId);
             logger.info("## Start the rocketmq consumer: {}-{}", AbstractKafkaTest.topic, AbstractKafkaTest.groupId);
             rocketMQClientExample.start();
             logger.info("## The canal rocketmq consumer is running now ......");

+ 1 - 1
deployer/src/main/resources/canal.properties

@@ -10,7 +10,7 @@ canal.zkServers=
 canal.zookeeper.flush.period = 1000
 canal.withoutNetty = false
 # tcp, kafka, RocketMQ
-canal.serverMode = rocketmq
+canal.serverMode = tcp
 # flush meta cursor/parse position to file
 canal.file.data.dir = ${canal.conf.dir}
 canal.file.flush.period = 1000

+ 4 - 3
protocol/src/main/java/com/alibaba/otter/canal/protocol/ClientIdentity.java

@@ -13,9 +13,10 @@ import com.alibaba.otter.canal.common.utils.CanalToStringStyle;
  */
 public class ClientIdentity implements Serializable {
 
-    private String destination;
-    private short  clientId;
-    private String filter;
+    private static final long serialVersionUID = -8262100681930834834L;
+    private String            destination;
+    private short             clientId;
+    private String            filter;
 
     public ClientIdentity(){
 

+ 10 - 8
server/src/main/java/com/alibaba/otter/canal/common/CanalMessageSerializer.java

@@ -1,5 +1,10 @@
 package com.alibaba.otter.canal.common;
 
+import java.util.List;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.springframework.util.CollectionUtils;
+
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.CanalPacket;
 import com.alibaba.otter.canal.protocol.CanalPacket.PacketType;
@@ -7,12 +12,11 @@ import com.alibaba.otter.canal.protocol.Message;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.WireFormat;
-import java.util.List;
-import org.apache.kafka.common.errors.SerializationException;
-import org.springframework.util.CollectionUtils;
 
 public class CanalMessageSerializer {
-    public static byte[] serializer(Message data){
+
+    @SuppressWarnings("deprecation")
+    public static byte[] serializer(Message data) {
         try {
             if (data != null) {
                 if (data.getId() != -1) {
@@ -31,11 +35,9 @@ public class CanalMessageSerializer {
                         messageSize += 1 * rowEntries.size();
                         // packet size
                         int size = 0;
-                        size += CodedOutputStream.computeEnumSize(3,
-                            PacketType.MESSAGES.getNumber());
+                        size += CodedOutputStream.computeEnumSize(3, PacketType.MESSAGES.getNumber());
                         size += CodedOutputStream.computeTagSize(5)
-                            + CodedOutputStream.computeRawVarint32Size(messageSize)
-                            + messageSize;
+                                + CodedOutputStream.computeRawVarint32Size(messageSize) + messageSize;
                         // build data
                         byte[] body = new byte[size];
                         CodedOutputStream output = CodedOutputStream.newInstance(body);

+ 16 - 17
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -12,25 +12,25 @@ import java.util.Map;
  */
 public class MQProperties {
 
-    private String servers = "localhost:6667";
-    private int retries = 0;
-    private int batchSize = 16384;
-    private int lingerMs = 1;
-    private long bufferMemory = 33554432L;
-    private boolean filterTransactionEntry = true;
-    private String producerGroup = "Canal-Producer";
-    private int canalBatchSize = 50;
-    private Long canalGetTimeout;
-    private boolean flatMessage = true;
-
-    private List<CanalDestination> canalDestinations = new ArrayList<CanalDestination>();
+    private String                 servers                = "localhost:6667";
+    private int                    retries                = 0;
+    private int                    batchSize              = 16384;
+    private int                    lingerMs               = 1;
+    private long                   bufferMemory           = 33554432L;
+    private boolean                filterTransactionEntry = true;
+    private String                 producerGroup          = "Canal-Producer";
+    private int                    canalBatchSize         = 50;
+    private Long                   canalGetTimeout;
+    private boolean                flatMessage            = true;
+
+    private List<CanalDestination> canalDestinations      = new ArrayList<CanalDestination>();
 
     public static class CanalDestination {
 
-        private String canalDestination;
-        private String topic;
-        private Integer partition;
-        private Integer partitionsNum;
+        private String              canalDestination;
+        private String              topic;
+        private Integer             partition;
+        private Integer             partitionsNum;
         private Map<String, String> partitionHash;
 
         public String getCanalDestination() {
@@ -74,7 +74,6 @@ public class MQProperties {
         }
     }
 
-
     public String getServers() {
         return servers;
     }

+ 17 - 14
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,12 +1,8 @@
 package com.alibaba.otter.canal.kafka;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
 import java.util.List;
 import java.util.Properties;
+
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -14,6 +10,13 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
+;
+
 /**
  * kafka producer 主操作类
  *
@@ -22,13 +25,13 @@ import org.slf4j.LoggerFactory;
  */
 public class CanalKafkaProducer implements CanalMQProducer {
 
-    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
+    private static final Logger       logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
 
     private Producer<String, Message> producer;
 
-    private Producer<String, String> producer2;                                                 // 用于扁平message的数据投递
+    private Producer<String, String>  producer2;                                                 // 用于扁平message的数据投递
 
-    private MQProperties kafkaProperties;
+    private MQProperties              kafkaProperties;
 
     @Override
     public void init(MQProperties kafkaProperties) {
@@ -91,8 +94,10 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 if (flatMessages != null) {
                     for (FlatMessage flatMessage : flatMessages) {
                         if (canalDestination.getPartition() != null) {
-                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination
-                                .getTopic(), canalDestination.getPartition(), null, JSON.toJSONString(flatMessage));
+                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
+                                canalDestination.getPartition(),
+                                null,
+                                JSON.toJSONString(flatMessage));
                             producer2.send(record);
                         } else {
                             if (canalDestination.getPartitionHash() != null
@@ -104,8 +109,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                 for (int i = 0; i < length; i++) {
                                     FlatMessage flatMessagePart = partitionFlatMessage[i];
                                     if (flatMessagePart != null) {
-                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                            canalDestination.getTopic(),
+                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                             i,
                                             null,
                                             JSON.toJSONString(flatMessagePart));
@@ -113,8 +117,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                     }
                                 }
                             } else {
-                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                    canalDestination.getTopic(),
+                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
                                     0,
                                     null,
                                     JSON.toJSONString(flatMessage));

+ 4 - 2
server/src/main/java/com/alibaba/otter/canal/kafka/MessageSerializer.java

@@ -1,10 +1,12 @@
 package com.alibaba.otter.canal.kafka;
 
-import com.alibaba.otter.canal.common.CanalMessageSerializer;
-import com.alibaba.otter.canal.protocol.Message;
 import java.util.Map;
+
 import org.apache.kafka.common.serialization.Serializer;
 
+import com.alibaba.otter.canal.common.CanalMessageSerializer;
+import com.alibaba.otter.canal.protocol.Message;
+
 /**
  * Kafka Message类的序列化
  *

+ 4 - 2
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -16,9 +16,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CanalRocketMQProducer implements CanalMQProducer {
+
     private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);
 
-    private DefaultMQProducer defaultMQProducer;
+    private DefaultMQProducer   defaultMQProducer;
 
     @Override
     public void init(MQProperties rocketMQProperties) {
@@ -36,10 +37,11 @@ public class CanalRocketMQProducer implements CanalMQProducer {
 
     @Override
     public void send(final MQProperties.CanalDestination destination, com.alibaba.otter.canal.protocol.Message data,
-        Callback callback) {
+                     Callback callback) {
         try {
             Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data));
             this.defaultMQProducer.send(message, new MessageQueueSelector() {
+
                 @Override
                 public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                     int partition = 0;

+ 16 - 13
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -1,35 +1,38 @@
 package com.alibaba.otter.canal.server;
 
-import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
-import com.alibaba.otter.canal.protocol.ClientIdentity;
-import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
 import java.io.FileInputStream;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
+import com.alibaba.otter.canal.protocol.ClientIdentity;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
+
 public class CanalMQStarter {
-    private static final Logger logger = LoggerFactory.getLogger(CanalMQStarter.class);
+
+    private static final Logger logger               = LoggerFactory.getLogger(CanalMQStarter.class);
 
     private static final String CLASSPATH_URL_PREFIX = "classpath:";
 
-    private volatile boolean running = false;
+    private volatile boolean    running              = false;
 
-    private ExecutorService executorService;
+    private ExecutorService     executorService;
 
-    private CanalMQProducer canalMQProducer;
+    private CanalMQProducer     canalMQProducer;
 
-    private MQProperties properties;
+    private MQProperties        properties;
 
-    public CanalMQStarter(CanalMQProducer canalMQProducer) {
+    public CanalMQStarter(CanalMQProducer canalMQProducer){
         this.canalMQProducer = canalMQProducer;
     }
 
@@ -47,7 +50,7 @@ public class CanalMQStarter {
             }
 
             // 初始化 kafka producer
-//            canalMQProducer = new CanalKafkaProducer();
+            // canalMQProducer = new CanalKafkaProducer();
             canalMQProducer.init(properties);
             // set filterTransactionEntry
             if (properties.isFilterTransactionEntry()) {

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java

@@ -50,7 +50,7 @@ public class SessionHandler extends SimpleChannelHandler {
         this.embeddedServer = embeddedServer;
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings({ "deprecation", "deprecation" })
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
         logger.info("message receives in session handler...");
         long start = System.nanoTime();

+ 4 - 3
server/src/main/java/com/alibaba/otter/canal/spi/CanalMQProducer.java

@@ -1,11 +1,12 @@
 package com.alibaba.otter.canal.spi;
 
+import java.io.IOException;
+
 import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
 import com.alibaba.otter.canal.protocol.Message;
-import java.io.IOException;
 
 public interface CanalMQProducer {
+
     /**
      * Init producer.
      *
@@ -33,4 +34,4 @@ public interface CanalMQProducer {
 
         void rollback();
     }
-}
+}