agapple 5 jaren geleden
bovenliggende
commit
523e8b85e6
43 gewijzigde bestanden met toevoegingen van 300 en 255 verwijderingen
  1. 5 5
      admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/config/EbeanConfig.java
  2. 3 2
      admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/connector/SimpleAdminConnectors.java
  3. 9 2
      admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/controller/CanalClusterController.java
  4. 1 1
      admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/controller/CanalConfigController.java
  5. 2 3
      admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/controller/CanalInstanceController.java
  6. 21 13
      admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/controller/UserController.java
  7. 3 2
      admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/handler/CustomExceptionHandler.java
  8. 2 1
      admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/model/CanalCluster.java
  9. 12 6
      admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/model/CanalInstanceConfig.java
  10. 3 3
      admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/model/Model.java
  11. 7 1
      admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/model/NodeServer.java
  12. 2 1
      admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/model/User.java
  13. 2 2
      admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/service/NodeServerService.java
  14. 2 2
      admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/service/impl/CanalInstanceServiceImpl.java
  15. 2 2
      admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/service/impl/NodeServerServiceImpl.java
  16. 22 25
      connector/core/src/main/java/com/alibaba/otter/canal/connector/core/consumer/CommonMessage.java
  17. 13 7
      connector/core/src/main/java/com/alibaba/otter/canal/connector/core/filter/AviaterRegexFilter.java
  18. 12 9
      connector/core/src/main/java/com/alibaba/otter/canal/connector/core/filter/PatternUtils.java
  19. 1 1
      connector/core/src/main/java/com/alibaba/otter/canal/connector/core/producer/AbstractMQProducer.java
  20. 58 68
      connector/core/src/main/java/com/alibaba/otter/canal/connector/core/producer/MQMessageUtils.java
  21. 1 1
      connector/core/src/main/java/com/alibaba/otter/canal/connector/core/spi/CanalMQProducer.java
  22. 5 1
      connector/core/src/main/java/com/alibaba/otter/canal/connector/core/spi/SPI.java
  23. 4 4
      connector/core/src/main/java/com/alibaba/otter/canal/connector/core/spi/URLClassExtensionLoader.java
  24. 1 0
      connector/core/src/main/java/com/alibaba/otter/canal/connector/core/util/Callback.java
  25. 4 5
      connector/core/src/main/java/com/alibaba/otter/canal/connector/core/util/CanalMessageSerializerUtil.java
  26. 3 2
      connector/core/src/main/java/com/alibaba/otter/canal/connector/core/util/DateUtil.java
  27. 8 3
      connector/core/src/main/java/com/alibaba/otter/canal/connector/core/util/JdbcTypeUtil.java
  28. 19 15
      connector/core/src/main/java/com/alibaba/otter/canal/connector/core/util/MessageUtil.java
  29. 2 2
      connector/core/src/main/java/com/alibaba/otter/canal/connector/core/util/TimeZone.java
  30. 1 1
      connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/config/KafkaProducerConfig.java
  31. 11 8
      connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/consumer/CanalKafkaConsumer.java
  32. 1 1
      connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/consumer/KafkaMessageDeserializer.java
  33. 13 13
      connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java
  34. 2 2
      connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/KafkaMessageSerializer.java
  35. 3 2
      connector/kafka-connector/src/test/java/com/alibaba/otter/canal/connector/kafka/test/CanalKafkaProducerTest.java
  36. 10 5
      connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/consumer/CanalRabbitMQConsumer.java
  37. 2 2
      connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/consumer/ConsumerBatchMessage.java
  38. 1 2
      connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/AliyunCredentialsProvider.java
  39. 4 3
      connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java
  40. 1 2
      connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/consumer/CanalRocketMQConsumer.java
  41. 2 2
      connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/consumer/ConsumerBatchMessage.java
  42. 14 16
      connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java
  43. 6 7
      connector/tcp-connector/src/main/java/com/alibaba/otter/canal/connector/tcp/consumer/CanalTCPConsumer.java

+ 5 - 5
admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/config/EbeanConfig.java

@@ -1,5 +1,10 @@
 package com.alibaba.otter.canal.admin.config;
 
+import io.ebean.EbeanServer;
+import io.ebean.EbeanServerFactory;
+import io.ebean.config.ServerConfig;
+import io.ebean.config.UnderscoreNamingConvention;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -8,11 +13,6 @@ import javax.sql.DataSource;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
-import io.ebean.EbeanServer;
-import io.ebean.EbeanServerFactory;
-import io.ebean.config.ServerConfig;
-import io.ebean.config.UnderscoreNamingConvention;
-
 /**
  * Ebean 配置
  *

+ 3 - 2
admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/connector/SimpleAdminConnectors.java

@@ -2,11 +2,12 @@ package com.alibaba.otter.canal.admin.connector;
 
 import java.util.function.Function;
 
-import com.alibaba.otter.canal.admin.config.SpringContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.core.env.Environment;
 
+import com.alibaba.otter.canal.admin.config.SpringContext;
+
 public class SimpleAdminConnectors {
 
     private static final Logger logger = LoggerFactory.getLogger(SimpleAdminConnectors.class);
@@ -25,7 +26,7 @@ public class SimpleAdminConnectors {
             connector.connect();
             return function.apply(connector);
         } catch (Exception e) {
-            logger.error("connect to ip:{},port:{},user:{},password:{}, failed", ip, port, user,passwd);
+            logger.error("connect to ip:{},port:{},user:{},password:{}, failed", ip, port, user, passwd);
             logger.error(e.getMessage());
         } finally {
             connector.disconnect();

+ 9 - 2
admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/controller/CanalClusterController.java

@@ -4,7 +4,14 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.*;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
 
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
@@ -22,7 +29,7 @@ public class CanalClusterController {
     CanalClusterService canalClusterServic;
 
     @Autowired
-    NodeServerService  nodeServerService;
+    NodeServerService   nodeServerService;
 
     @GetMapping(value = "/clusters")
     public BaseModel<List<CanalCluster>> clusters(CanalCluster canalCluster, @PathVariable String env) {

+ 1 - 1
admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/controller/CanalConfigController.java

@@ -1,6 +1,5 @@
 package com.alibaba.otter.canal.admin.controller;
 
-import com.alibaba.otter.canal.admin.common.TemplateConfigLoader;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
@@ -9,6 +8,7 @@ import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
+import com.alibaba.otter.canal.admin.common.TemplateConfigLoader;
 import com.alibaba.otter.canal.admin.model.BaseModel;
 import com.alibaba.otter.canal.admin.model.CanalConfig;
 import com.alibaba.otter.canal.admin.service.CanalConfigService;

+ 2 - 3
admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/controller/CanalInstanceController.java

@@ -3,7 +3,6 @@ package com.alibaba.otter.canal.admin.controller;
 import java.util.List;
 import java.util.Map;
 
-import com.alibaba.otter.canal.admin.common.TemplateConfigLoader;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.DeleteMapping;
 import org.springframework.web.bind.annotation.GetMapping;
@@ -15,6 +14,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
+import com.alibaba.otter.canal.admin.common.TemplateConfigLoader;
 import com.alibaba.otter.canal.admin.model.BaseModel;
 import com.alibaba.otter.canal.admin.model.CanalInstanceConfig;
 import com.alibaba.otter.canal.admin.model.Pager;
@@ -143,8 +143,7 @@ public class CanalInstanceController {
      * @return 是否成功
      */
     @PutMapping(value = "/instance/status/{id}")
-    public BaseModel<Boolean> instanceStart(@PathVariable Long id, @RequestParam String option,
-                                            @PathVariable String env) {
+    public BaseModel<Boolean> instanceStart(@PathVariable Long id, @RequestParam String option, @PathVariable String env) {
         return BaseModel.getInstance(canalInstanceConfigService.instanceOperation(id, option));
     }
 

+ 21 - 13
admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/controller/UserController.java

@@ -1,20 +1,28 @@
 package com.alibaba.otter.canal.admin.controller;
 
-import com.alibaba.otter.canal.admin.service.UserService;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.*;
-
-import com.alibaba.otter.canal.admin.model.BaseModel;
-import com.alibaba.otter.canal.admin.model.User;
-
-import javax.servlet.http.HttpServletRequest;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import javax.servlet.http.HttpServletRequest;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.alibaba.otter.canal.admin.model.BaseModel;
+import com.alibaba.otter.canal.admin.model.User;
+import com.alibaba.otter.canal.admin.service.UserService;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+
 /**
  * 用户管理控制层
  *
@@ -26,9 +34,9 @@ import java.util.concurrent.TimeUnit;
 public class UserController {
 
     public static final LoadingCache<String, User> loginUsers = Caffeine.newBuilder()
-        .maximumSize(10_000)
-        .expireAfterAccess(30, TimeUnit.MINUTES)
-        .build(key -> null);                                                         // 用户登录信息缓存
+                                                                  .maximumSize(10_000)
+                                                                  .expireAfterAccess(30, TimeUnit.MINUTES)
+                                                                  .build(key -> null); // 用户登录信息缓存
 
     @Autowired
     UserService                                    userService;

+ 3 - 2
admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/handler/CustomExceptionHandler.java

@@ -1,7 +1,5 @@
 package com.alibaba.otter.canal.admin.handler;
 
-import com.alibaba.otter.canal.admin.common.exception.ServiceException;
-import com.alibaba.otter.canal.admin.model.BaseModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.HttpStatus;
@@ -10,6 +8,9 @@ import org.springframework.web.bind.annotation.ExceptionHandler;
 import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.ResponseStatus;
 
+import com.alibaba.otter.canal.admin.common.exception.ServiceException;
+import com.alibaba.otter.canal.admin.model.BaseModel;
+
 /**
  * MVC异常拦截器
  *

+ 2 - 1
admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/model/CanalCluster.java

@@ -3,10 +3,11 @@ package com.alibaba.otter.canal.admin.model;
 import io.ebean.Finder;
 import io.ebean.annotation.WhenModified;
 
+import java.util.Date;
+
 import javax.persistence.Entity;
 import javax.persistence.Id;
 import javax.persistence.Table;
-import java.util.Date;
 
 /**
  * Canal集群信息实体类

+ 12 - 6
admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/model/CanalInstanceConfig.java

@@ -1,12 +1,18 @@
 package com.alibaba.otter.canal.admin.model;
 
-import java.util.Date;
-
-import javax.persistence.*;
-
 import io.ebean.Finder;
 import io.ebean.annotation.WhenModified;
 
+import java.util.Date;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.ManyToOne;
+import javax.persistence.Transient;
+
 /**
  * Canal实例配置信息实体类
  *
@@ -44,14 +50,14 @@ public class CanalInstanceConfig extends Model {
     private String       name;
     private String       content;
     private String       contentMd5;
-    private String       status;         // 1: 正常 0: 停止
+    private String       status;             // 1: 正常 0: 停止
     @WhenModified
     private Date         modifiedTime;
 
     @Transient
     private String       clusterServerId;
     @Transient
-    private String       runningStatus = "0";  // 1: 运行中 0: 停止
+    private String       runningStatus = "0"; // 1: 运行中 0: 停止
 
     public void init() {
         status = "1";

+ 3 - 3
admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/model/Model.java

@@ -1,13 +1,13 @@
 package com.alibaba.otter.canal.admin.model;
 
+import io.ebean.Ebean;
+import io.ebean.EbeanServer;
+
 import javax.persistence.MappedSuperclass;
 import javax.persistence.OptimisticLockException;
 
 import org.apache.commons.beanutils.PropertyUtils;
 
-import io.ebean.Ebean;
-import io.ebean.EbeanServer;
-
 /**
  * EBean Model扩展类
  *

+ 7 - 1
admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/model/NodeServer.java

@@ -5,7 +5,13 @@ import io.ebean.annotation.WhenModified;
 
 import java.util.Date;
 
-import javax.persistence.*;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.ManyToOne;
+import javax.persistence.Table;
 
 /**
  * 节点信息实体类

+ 2 - 1
admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/model/User.java

@@ -3,11 +3,12 @@ package com.alibaba.otter.canal.admin.model;
 import io.ebean.Finder;
 import io.ebean.annotation.WhenCreated;
 
+import java.util.Date;
+
 import javax.persistence.Entity;
 import javax.persistence.Id;
 import javax.persistence.Table;
 import javax.persistence.Transient;
-import java.util.Date;
 
 /**
  * 用户信息实体类

+ 2 - 2
admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/service/NodeServerService.java

@@ -1,10 +1,10 @@
 package com.alibaba.otter.canal.admin.service;
 
+import java.util.List;
+
 import com.alibaba.otter.canal.admin.model.NodeServer;
 import com.alibaba.otter.canal.admin.model.Pager;
 
-import java.util.List;
-
 public interface NodeServerService {
 
     void save(NodeServer nodeServer);

+ 2 - 2
admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/service/impl/CanalInstanceServiceImpl.java

@@ -109,12 +109,12 @@ public class CanalInstanceServiceImpl implements CanalInstanceService {
             }));
         }
 
-        for (Future<Void> f :futures){
+        for (Future<Void> f : futures) {
             try {
                 f.get(3, TimeUnit.SECONDS);
             } catch (InterruptedException | ExecutionException e) {
                 // ignore
-            } catch (TimeoutException e){
+            } catch (TimeoutException e) {
                 break;
             }
         }

+ 2 - 2
admin/admin-web/src/main/java/com/alibaba/otter/canal/admin/service/impl/NodeServerServiceImpl.java

@@ -154,12 +154,12 @@ public class NodeServerServiceImpl implements NodeServerService {
                 return !status;
             }));
         }
-        for (Future<Boolean> f :futures){
+        for (Future<Boolean> f : futures) {
             try {
                 f.get(3, TimeUnit.SECONDS);
             } catch (InterruptedException | ExecutionException e) {
                 // ignore
-            } catch (TimeoutException e){
+            } catch (TimeoutException e) {
                 break;
             }
         }

+ 22 - 25
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/consumer/CommonMessage.java

@@ -6,21 +6,27 @@ import java.util.Map;
 
 public class CommonMessage implements Serializable {
 
-    private static final long serialVersionUID = 2611556444074013268L;
-
-    private String database;                               // 数据库或schema
-    private String table;                                  // 表名
-    private List<String> pkNames;
-    private Boolean isDdl;
-    private String type;                                   // 类型: INSERT UPDATE DELETE
+    private static final long         serialVersionUID = 2611556444074013268L;
+
+    private String                    database;                               // 数据库或schema
+    private String                    table;                                  // 表名
+    private List<String>              pkNames;
+    private Boolean                   isDdl;
+    private String                    type;                                   // 类型:
+                                                                               // INSERT
+                                                                               // UPDATE
+                                                                               // DELETE
     // binlog executeTime
-    private Long es;                                       // 执行耗时
+    private Long                      es;                                     // 执行耗时
     // dml build timeStamp
-    private Long ts;                                       // 同步时间
-    private String sql;                                    // 执行的sql, dml sql为空
-    private List<Map<String, Object>> data;                // 数据列表
-    private List<Map<String, Object>> old;                 // 旧数据列表, 用于update, size和data的size一一对应
-
+    private Long                      ts;                                     // 同步时间
+    private String                    sql;                                    // 执行的sql,
+                                                                               // dml
+                                                                               // sql为空
+    private List<Map<String, Object>> data;                                   // 数据列表
+    private List<Map<String, Object>> old;                                    // 旧数据列表,
+                                                                               // 用于update,
+                                                                               // size和data的size一一对应
 
     public String getDatabase() {
         return database;
@@ -115,17 +121,8 @@ public class CommonMessage implements Serializable {
 
     @Override
     public String toString() {
-        return "CommonMessage{" +
-                "database='" + database + '\'' +
-                ", table='" + table + '\'' +
-                ", pkNames=" + pkNames +
-                ", isDdl=" + isDdl +
-                ", type='" + type + '\'' +
-                ", es=" + es +
-                ", ts=" + ts +
-                ", sql='" + sql + '\'' +
-                ", data=" + data +
-                ", old=" + old +
-                '}';
+        return "CommonMessage{" + "database='" + database + '\'' + ", table='" + table + '\'' + ", pkNames=" + pkNames
+               + ", isDdl=" + isDdl + ", type='" + type + '\'' + ", es=" + es + ", ts=" + ts + ", sql='" + sql + '\''
+               + ", data=" + data + ", old=" + old + '}';
     }
 }

+ 13 - 7
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/filter/AviaterRegexFilter.java

@@ -1,6 +1,12 @@
 package com.alibaba.otter.canal.connector.core.filter;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -14,16 +20,16 @@ import com.googlecode.aviator.Expression;
  */
 public class AviaterRegexFilter {
 
-    private static final String        SPLIT             = ",";
-    private static final String        PATTERN_SPLIT     = "|";
-    private static final String        FILTER_EXPRESSION = "regex(pattern,target)";
-    private static final RegexFunction regexFunction     = new RegexFunction();
-    private final Expression           exp               = AviatorEvaluator.compile(FILTER_EXPRESSION, true);
+    private static final String             SPLIT             = ",";
+    private static final String             PATTERN_SPLIT     = "|";
+    private static final String             FILTER_EXPRESSION = "regex(pattern,target)";
+    private static final RegexFunction      regexFunction     = new RegexFunction();
+    private final Expression                exp               = AviatorEvaluator.compile(FILTER_EXPRESSION, true);
     static {
         AviatorEvaluator.addFunction(regexFunction);
     }
 
-    private static final Comparator<String> COMPARATOR = new StringComparator();
+    private static final Comparator<String> COMPARATOR        = new StringComparator();
 
     final private String                    pattern;
     final private boolean                   defaultEmptyValue;

+ 12 - 9
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/filter/PatternUtils.java

@@ -13,15 +13,18 @@ import com.google.common.collect.MigrateMap;
 public class PatternUtils {
 
     @SuppressWarnings("deprecation")
-    private static Map<String, Pattern> patterns = MigrateMap.makeComputingMap(new MapMaker().softValues(), pattern -> {
-        try {
-            PatternCompiler pc = new Perl5Compiler();
-            return pc.compile(pattern,
-                Perl5Compiler.CASE_INSENSITIVE_MASK | Perl5Compiler.READ_ONLY_MASK | Perl5Compiler.SINGLELINE_MASK);
-        } catch (MalformedPatternException e) {
-            throw new RuntimeException(e);
-        }
-    });
+    private static Map<String, Pattern> patterns = MigrateMap.makeComputingMap(new MapMaker().softValues(),
+                                                     pattern -> {
+                                                         try {
+                                                             PatternCompiler pc = new Perl5Compiler();
+                                                             return pc.compile(pattern,
+                                                                 Perl5Compiler.CASE_INSENSITIVE_MASK
+                                                                         | Perl5Compiler.READ_ONLY_MASK
+                                                                         | Perl5Compiler.SINGLELINE_MASK);
+                                                         } catch (MalformedPatternException e) {
+                                                             throw new RuntimeException(e);
+                                                         }
+                                                     });
 
     public static Pattern getPattern(String pattern) {
         return patterns.get(pattern);

+ 1 - 1
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/producer/AbstractMQProducer.java

@@ -5,10 +5,10 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import com.alibaba.otter.canal.connector.core.config.CanalConstants;
 import org.apache.commons.lang.StringUtils;
 
 import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
+import com.alibaba.otter.canal.connector.core.config.CanalConstants;
 import com.alibaba.otter.canal.connector.core.config.MQProperties;
 import com.alibaba.otter.canal.connector.core.spi.CanalMQProducer;
 

+ 58 - 68
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/producer/MQMessageUtils.java

@@ -1,9 +1,15 @@
 package com.alibaba.otter.canal.connector.core.producer;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ThreadPoolExecutor;
 
-import com.google.common.base.Function;
 import org.apache.commons.lang.StringUtils;
 
 import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
@@ -13,6 +19,7 @@ import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
+import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.MigrateMap;
@@ -28,82 +35,65 @@ import com.google.protobuf.InvalidProtocolBufferException;
 public class MQMessageUtils {
 
     @SuppressWarnings("deprecation")
-    private static Map<String, List<PartitionData>>    partitionDatas    = MigrateMap
-        .makeComputingMap(new MapMaker().softValues(), pkHashConfigs -> {
-            List<PartitionData> datas = Lists
-                .newArrayList();
-
-            String[] pkHashConfigArray = StringUtils
-                .split(StringUtils.replace(
-                    pkHashConfigs,
-                    ",",
-                    ";"), ";");
-            // schema.table:id^name
-            for (String pkHashConfig : pkHashConfigArray) {
-                PartitionData data = new PartitionData();
-                int i = pkHashConfig
-                    .lastIndexOf(":");
-                if (i > 0) {
-                    String pkStr = pkHashConfig
-                        .substring(i + 1);
-                    if (pkStr.equalsIgnoreCase(
-                        "$pk$")) {
-                        data.hashMode.autoPkHash = true;
-                    } else {
-                        data.hashMode.pkNames = Lists
-                            .newArrayList(
-                                StringUtils
-                                    .split(
-                                        pkStr,
-                                        '^'));
-                    }
+    private static Map<String, List<PartitionData>>    partitionDatas    = MigrateMap.makeComputingMap(new MapMaker().softValues(),
+                                                                             pkHashConfigs -> {
+                                                                                 List<PartitionData> datas = Lists.newArrayList();
+
+                                                                                 String[] pkHashConfigArray = StringUtils.split(StringUtils.replace(pkHashConfigs,
+                                                                                     ",",
+                                                                                     ";"),
+                                                                                     ";");
+                                                                                 // schema.table:id^name
+                                                                                 for (String pkHashConfig : pkHashConfigArray) {
+                                                                                     PartitionData data = new PartitionData();
+                                                                                     int i = pkHashConfig.lastIndexOf(":");
+                                                                                     if (i > 0) {
+                                                                                         String pkStr = pkHashConfig.substring(i + 1);
+                                                                                         if (pkStr.equalsIgnoreCase("$pk$")) {
+                                                                                             data.hashMode.autoPkHash = true;
+                                                                                         } else {
+                                                                                             data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr,
+                                                                                                 '^'));
+                                                                                         }
 
-                    pkHashConfig = pkHashConfig
-                        .substring(0, i);
-                } else {
-                    data.hashMode.tableHash = true;
-                }
+                                                                                         pkHashConfig = pkHashConfig.substring(0,
+                                                                                             i);
+                                                                                     } else {
+                                                                                         data.hashMode.tableHash = true;
+                                                                                     }
 
-                if (!isWildCard(
-                    pkHashConfig)) {
-                    data.simpleName = pkHashConfig;
-                } else {
-                    data.regexFilter = new AviaterRegexFilter(
-                        pkHashConfig);
-                }
-                datas.add(data);
-            }
+                                                                                     if (!isWildCard(pkHashConfig)) {
+                                                                                         data.simpleName = pkHashConfig;
+                                                                                     } else {
+                                                                                         data.regexFilter = new AviaterRegexFilter(pkHashConfig);
+                                                                                     }
+                                                                                     datas.add(data);
+                                                                                 }
 
-            return datas;
-        });
+                                                                                 return datas;
+                                                                             });
 
     @SuppressWarnings("deprecation")
-    private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap
-        .makeComputingMap(new MapMaker().softValues(), new Function<String, List<DynamicTopicData>>() {
+    private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap.makeComputingMap(new MapMaker().softValues(),
+                                                                             new Function<String, List<DynamicTopicData>>() {
 
                                                                                  public List<DynamicTopicData> apply(String pkHashConfigs) {
-                                                                                     List<DynamicTopicData> datas = Lists
-                                                                                         .newArrayList();
-                                                                                     String[] dynamicTopicArray = StringUtils
-                                                                                         .split(StringUtils.replace(
-                                                                                             pkHashConfigs,
-                                                                                             ",",
-                                                                                             ";"), ";");
+                                                                                     List<DynamicTopicData> datas = Lists.newArrayList();
+                                                                                     String[] dynamicTopicArray = StringUtils.split(StringUtils.replace(pkHashConfigs,
+                                                                                         ",",
+                                                                                         ";"),
+                                                                                         ";");
                                                                                      // schema.table
                                                                                      for (String dynamicTopic : dynamicTopicArray) {
                                                                                          DynamicTopicData data = new DynamicTopicData();
 
-                                                                                         if (!isWildCard(
-                                                                                             dynamicTopic)) {
+                                                                                         if (!isWildCard(dynamicTopic)) {
                                                                                              data.simpleName = dynamicTopic;
                                                                                          } else {
-                                                                                             if (dynamicTopic
-                                                                                                 .contains("\\.")) {
-                                                                                                 data.tableRegexFilter = new AviaterRegexFilter(
-                                                                                                     dynamicTopic);
+                                                                                             if (dynamicTopic.contains("\\.")) {
+                                                                                                 data.tableRegexFilter = new AviaterRegexFilter(dynamicTopic);
                                                                                              } else {
-                                                                                                 data.schemaRegexFilter = new AviaterRegexFilter(
-                                                                                                     dynamicTopic);
+                                                                                                 data.schemaRegexFilter = new AviaterRegexFilter(dynamicTopic);
                                                                                              }
                                                                                          }
                                                                                          datas.add(data);
@@ -242,8 +232,8 @@ public class MQMessageUtils {
      * @return 分区message数组
      */
     @SuppressWarnings("unchecked")
-    public static Message[] messagePartition(EntryRowData[] datas, long id, Integer partitionsNum, String pkHashConfigs,
-                                             boolean databaseHash) {
+    public static Message[] messagePartition(EntryRowData[] datas, long id, Integer partitionsNum,
+                                             String pkHashConfigs, boolean databaseHash) {
         if (partitionsNum == null) {
             partitionsNum = 1;
         }
@@ -631,8 +621,8 @@ public class MQMessageUtils {
 
     private static boolean isWildCard(String value) {
         // not contaiins '.' ?
-        return StringUtils.containsAny(value,
-            new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$', '^' });
+        return StringUtils.containsAny(value, new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$',
+                '^' });
     }
 
     private static void put2MapMessage(Map<String, Message> messageMap, Long messageId, String topicName,

+ 1 - 1
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/spi/CanalMQProducer.java

@@ -3,8 +3,8 @@ package com.alibaba.otter.canal.connector.core.spi;
 import java.util.Properties;
 
 import com.alibaba.otter.canal.connector.core.config.MQProperties;
-import com.alibaba.otter.canal.connector.core.util.Callback;
 import com.alibaba.otter.canal.connector.core.producer.MQDestination;
+import com.alibaba.otter.canal.connector.core.util.Callback;
 import com.alibaba.otter.canal.protocol.Message;
 
 /**

+ 5 - 1
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/spi/SPI.java

@@ -1,6 +1,10 @@
 package com.alibaba.otter.canal.connector.core.spi;
 
-import java.lang.annotation.*;
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 
 /**
  * SPI装载器注解

+ 4 - 4
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/spi/URLClassExtensionLoader.java

@@ -7,7 +7,8 @@ import java.util.Enumeration;
 import java.util.NoSuchElementException;
 
 public class URLClassExtensionLoader extends URLClassLoader {
-    public URLClassExtensionLoader(URL[] urls) {
+
+    public URLClassExtensionLoader(URL[] urls){
         super(urls);
     }
 
@@ -18,9 +19,8 @@ public class URLClassExtensionLoader extends URLClassLoader {
             return c;
         }
 
-        if (name.startsWith("java.") || name.startsWith("org.slf4j.")
-                || name.startsWith("org.apache.logging")
-                || name.startsWith("org.apache.commons.logging.")) {
+        if (name.startsWith("java.") || name.startsWith("org.slf4j.") || name.startsWith("org.apache.logging")
+            || name.startsWith("org.apache.commons.logging.")) {
             // || name.startsWith("org.apache.hadoop."))
             // {
             c = super.loadClass(name);

+ 1 - 0
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/util/Callback.java

@@ -7,6 +7,7 @@ package com.alibaba.otter.canal.connector.core.util;
  * @version 1.0.0
  */
 public interface Callback {
+
     void commit();
 
     void rollback();

+ 4 - 5
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/util/CanalMessageSerializerUtil.java

@@ -2,13 +2,13 @@ package com.alibaba.otter.canal.connector.core.util;
 
 import java.util.List;
 
-import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 import org.springframework.util.CollectionUtils;
 
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.CanalPacket;
 import com.alibaba.otter.canal.protocol.CanalPacket.PacketType;
 import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.WireFormat;
@@ -59,12 +59,11 @@ public class CanalMessageSerializerUtil {
                         output.checkNoSpaceLeft();
                         return body;
                     } else if (!CollectionUtils.isEmpty(data.getEntries())) {
-                        // mq模式只会走到非rowEntry模式
+                        // mq模式只会走到非rawEntry模式
                         CanalPacket.Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
                         for (CanalEntry.Entry entry : data.getEntries()) {
                             if (filterTransactionEntry
-                                && (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
-                                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND)) {
+                                && (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND)) {
                                 continue;
                             }
 
@@ -98,7 +97,7 @@ public class CanalMessageSerializerUtil {
                 switch (p.getType()) {
                     case MESSAGES: {
                         if (!p.getCompression().equals(CanalPacket.Compression.NONE)
-                                && !p.getCompression().equals(CanalPacket.Compression.COMPRESSIONCOMPATIBLEPROTO2)) {
+                            && !p.getCompression().equals(CanalPacket.Compression.COMPRESSIONCOMPATIBLEPROTO2)) {
                             throw new CanalClientException("compression is not supported in this connector");
                         }
 

+ 3 - 2
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/util/DateUtil.java

@@ -1,12 +1,13 @@
 package com.alibaba.otter.canal.connector.core.util;
 
+import java.util.Date;
+
 import org.apache.commons.lang.StringUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
-import java.util.Date;
-
 public class DateUtil {
+
     private static DateTimeZone dateTimeZone;
 
     static {

+ 8 - 3
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/util/JdbcTypeUtil.java

@@ -2,7 +2,12 @@ package com.alibaba.otter.canal.connector.core.util;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.sql.*;
+import java.sql.Date;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,8 +77,8 @@ public class JdbcTypeUtil {
     }
 
     public static Object typeConvert(String tableName, String columnName, String value, int sqlType, String mysqlType) {
-        if (value == null || (value.equals("") && !(isText(mysqlType) || sqlType == Types.CHAR
-                                                    || sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR))) {
+        if (value == null
+            || (value.equals("") && !(isText(mysqlType) || sqlType == Types.CHAR || sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR))) {
             return null;
         }
 

+ 19 - 15
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/util/MessageUtil.java

@@ -1,6 +1,11 @@
 package com.alibaba.otter.canal.connector.core.util;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import com.alibaba.otter.canal.connector.core.consumer.CommonMessage;
 import com.alibaba.otter.canal.protocol.CanalEntry;
@@ -22,7 +27,7 @@ public class MessageUtil {
         List<CommonMessage> msgs = new ArrayList<>(entries.size());
         for (CanalEntry.Entry entry : entries) {
             if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
-                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
+                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                 continue;
             }
 
@@ -31,7 +36,7 @@ public class MessageUtil {
                 rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
             } catch (Exception e) {
                 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
-                        e);
+                    e);
             }
 
             CanalEntry.EventType eventType = rowChange.getEventType();
@@ -55,7 +60,7 @@ public class MessageUtil {
                 int i = 0;
                 for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                     if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
-                            && eventType != CanalEntry.EventType.DELETE) {
+                        && eventType != CanalEntry.EventType.DELETE) {
                         continue;
                     }
 
@@ -78,11 +83,11 @@ public class MessageUtil {
                             row.put(column.getName(), null);
                         } else {
                             row.put(column.getName(),
-                                    JdbcTypeUtil.typeConvert(msg.getTable(),
-                                            column.getName(),
-                                            column.getValue(),
-                                            column.getSqlType(),
-                                            column.getMysqlType()));
+                                JdbcTypeUtil.typeConvert(msg.getTable(),
+                                    column.getName(),
+                                    column.getValue(),
+                                    column.getSqlType(),
+                                    column.getMysqlType()));
                         }
                         // 获取update为true的字段
                         if (column.getUpdated()) {
@@ -100,12 +105,11 @@ public class MessageUtil {
                                 if (column.getIsNull()) {
                                     rowOld.put(column.getName(), null);
                                 } else {
-                                    rowOld.put(column.getName(),
-                                            JdbcTypeUtil.typeConvert(msg.getTable(),
-                                                    column.getName(),
-                                                    column.getValue(),
-                                                    column.getSqlType(),
-                                                    column.getMysqlType()));
+                                    rowOld.put(column.getName(), JdbcTypeUtil.typeConvert(msg.getTable(),
+                                        column.getName(),
+                                        column.getValue(),
+                                        column.getSqlType(),
+                                        column.getMysqlType()));
                                 }
                             }
                         }

+ 2 - 2
connector/core/src/main/java/com/alibaba/otter/canal/connector/core/util/TimeZone.java

@@ -1,8 +1,8 @@
 package com.alibaba.otter.canal.connector.core.util;
 
-
 public class TimeZone {
-    public final static String LOCATION_TIME_ZONE;    // 当前时区
+
+    public final static String LOCATION_TIME_ZONE; // 当前时区
 
     static {
         java.util.TimeZone localTimeZone = java.util.TimeZone.getDefault();

+ 1 - 1
connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/config/KafkaProducerConfig.java

@@ -15,7 +15,7 @@ public class KafkaProducerConfig extends MQProperties {
 
     private Map<String, Object> kafkaProperties = new LinkedHashMap<>();
 
-    private boolean             kerberosEnabled         = false;
+    private boolean             kerberosEnabled = false;
     private String              krb5File;
     private String              jaasFile;
 

+ 11 - 8
connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/consumer/CanalKafkaConsumer.java

@@ -1,20 +1,25 @@
 package com.alibaba.otter.canal.connector.kafka.consumer;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.otter.canal.connector.core.config.CanalConstants;
-import com.alibaba.otter.canal.connector.core.spi.SPI;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.connector.core.config.CanalConstants;
 import com.alibaba.otter.canal.connector.core.consumer.CommonMessage;
 import com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer;
+import com.alibaba.otter.canal.connector.core.spi.SPI;
 import com.alibaba.otter.canal.connector.core.util.MessageUtil;
 import com.alibaba.otter.canal.protocol.Message;
 
@@ -72,8 +77,7 @@ public class CanalKafkaConsumer implements CanalMsgConsumer {
     @Override
     public List<CommonMessage> getMessage(Long timeout, TimeUnit unit) {
         if (!flatMessage) {
-            ConsumerRecords<String, Message> records = (ConsumerRecords<String, Message>) kafkaConsumer
-                .poll(unit.toMillis(timeout));
+            ConsumerRecords<String, Message> records = (ConsumerRecords<String, Message>) kafkaConsumer.poll(unit.toMillis(timeout));
             if (!records.isEmpty()) {
                 currentOffsets.clear();
                 List<CommonMessage> messages = new ArrayList<>();
@@ -86,8 +90,7 @@ public class CanalKafkaConsumer implements CanalMsgConsumer {
                 return messages;
             }
         } else {
-            ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) kafkaConsumer
-                .poll(unit.toMillis(timeout));
+            ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) kafkaConsumer.poll(unit.toMillis(timeout));
 
             if (!records.isEmpty()) {
                 List<CommonMessage> messages = new ArrayList<>();

+ 1 - 1
connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/consumer/KafkaMessageDeserializer.java

@@ -2,9 +2,9 @@ package com.alibaba.otter.canal.connector.kafka.consumer;
 
 import java.util.Map;
 
-import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
 import org.apache.kafka.common.serialization.Deserializer;
 
+import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
 import com.alibaba.otter.canal.protocol.Message;
 
 /**

+ 13 - 13
connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java

@@ -1,12 +1,14 @@
 package com.alibaba.otter.canal.connector.kafka.producer;
 
 import java.io.File;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-import com.alibaba.otter.canal.connector.kafka.config.KafkaConstants;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -19,13 +21,14 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
 import com.alibaba.otter.canal.connector.core.producer.AbstractMQProducer;
-import com.alibaba.otter.canal.connector.core.util.Callback;
 import com.alibaba.otter.canal.connector.core.producer.MQDestination;
 import com.alibaba.otter.canal.connector.core.producer.MQMessageUtils;
 import com.alibaba.otter.canal.connector.core.producer.MQMessageUtils.EntryRowData;
 import com.alibaba.otter.canal.connector.core.spi.CanalMQProducer;
 import com.alibaba.otter.canal.connector.core.spi.SPI;
+import com.alibaba.otter.canal.connector.core.util.Callback;
 import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
+import com.alibaba.otter.canal.connector.kafka.config.KafkaConstants;
 import com.alibaba.otter.canal.connector.kafka.config.KafkaProducerConfig;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
@@ -126,8 +129,9 @@ public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQPro
             List result;
             if (!StringUtils.isEmpty(mqDestination.getDynamicTopic())) {
                 // 动态topic路由计算,只是基于schema/table,不涉及proto数据反序列化
-                Map<String, Message> messageMap = MQMessageUtils
-                    .messageTopics(message, mqDestination.getTopic(), mqDestination.getDynamicTopic());
+                Map<String, Message> messageMap = MQMessageUtils.messageTopics(message,
+                    mqDestination.getTopic(),
+                    mqDestination.getDynamicTopic());
 
                 // 针对不同的topic,引入多线程提升效率
                 for (Map.Entry<String, Message> entry : messageMap.entrySet()) {
@@ -222,18 +226,14 @@ public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQPro
                     for (int i = 0; i < length; i++) {
                         FlatMessage flatMessagePart = partitionFlatMessage[i];
                         if (flatMessagePart != null) {
-                            records.add(new ProducerRecord<>(topicName,
-                                i,
-                                null,
-                                JSON.toJSONBytes(flatMessagePart, SerializerFeature.WriteMapNullValue)));
+                            records.add(new ProducerRecord<>(topicName, i, null, JSON.toJSONBytes(flatMessagePart,
+                                SerializerFeature.WriteMapNullValue)));
                         }
                     }
                 } else {
                     final int partition = mqDestination.getPartition() != null ? mqDestination.getPartition() : 0;
-                    records.add(new ProducerRecord<>(topicName,
-                        partition,
-                        null,
-                        JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)));
+                    records.add(new ProducerRecord<>(topicName, partition, null, JSON.toJSONBytes(flatMessage,
+                        SerializerFeature.WriteMapNullValue)));
                 }
             }
         }

+ 2 - 2
connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/KafkaMessageSerializer.java

@@ -1,9 +1,9 @@
 package com.alibaba.otter.canal.connector.kafka.producer;
 
-import org.apache.kafka.common.serialization.Serializer;
-
 import java.util.Map;
 
+import org.apache.kafka.common.serialization.Serializer;
+
 /**
  * Kafka Message类的序列化
  *

+ 3 - 2
connector/kafka-connector/src/test/java/com/alibaba/otter/canal/connector/kafka/test/CanalKafkaProducerTest.java

@@ -20,8 +20,9 @@ public class CanalKafkaProducerTest {
         pro.load(in);
 
         ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
-        CanalMQProducer canalMQProducer = loader
-            .getExtension("kafka", "/../../deployer/target/canal/plugin", "/../../deployer/target/canal/plugin");
+        CanalMQProducer canalMQProducer = loader.getExtension("kafka",
+            "/../../deployer/target/canal/plugin",
+            "/../../deployer/target/canal/plugin");
         canalMQProducer.init(pro);
 
         in.close();

+ 10 - 5
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/consumer/CanalRabbitMQConsumer.java

@@ -8,7 +8,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import com.alibaba.otter.canal.connector.core.spi.SPI;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -16,6 +15,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.otter.canal.connector.core.config.CanalConstants;
 import com.alibaba.otter.canal.connector.core.consumer.CommonMessage;
 import com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer;
+import com.alibaba.otter.canal.connector.core.spi.SPI;
 import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
 import com.alibaba.otter.canal.connector.core.util.MessageUtil;
 import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQConstants;
@@ -23,7 +23,13 @@ import com.alibaba.otter.canal.connector.rabbitmq.producer.AliyunCredentialsProv
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 import com.google.common.collect.Lists;
-import com.rabbitmq.client.*;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
 
 /**
  * RabbitMQ consumer SPI 实现
@@ -34,8 +40,7 @@ import com.rabbitmq.client.*;
 @SPI("rabbitmq")
 public class CanalRabbitMQConsumer implements CanalMsgConsumer {
 
-    private static final Logger                                logger              = LoggerFactory
-        .getLogger(CanalRabbitMQConsumer.class);
+    private static final Logger                                logger              = LoggerFactory.getLogger(CanalRabbitMQConsumer.class);
 
     // 链接地址
     private String                                             nameServer;
@@ -131,7 +136,7 @@ public class CanalRabbitMQConsumer implements CanalMsgConsumer {
             CommonMessage commonMessage = JSON.parseObject(messageData, CommonMessage.class);
             messageList.add(commonMessage);
         }
-        ConsumerBatchMessage<CommonMessage>  batchMessage = new ConsumerBatchMessage<>(messageList);
+        ConsumerBatchMessage<CommonMessage> batchMessage = new ConsumerBatchMessage<>(messageList);
         try {
             messageBlockingQueue.put(batchMessage);
         } catch (InterruptedException e) {

+ 2 - 2
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/consumer/ConsumerBatchMessage.java

@@ -7,8 +7,8 @@ import java.util.concurrent.TimeUnit;
 public class ConsumerBatchMessage<T> {
 
     private final List<T>  data;
-    private CountDownLatch         latch;
-    private boolean                hasFailure = false;
+    private CountDownLatch latch;
+    private boolean        hasFailure = false;
 
     public ConsumerBatchMessage(List<T> data){
         this.data = data;

+ 1 - 2
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/AliyunCredentialsProvider.java

@@ -7,8 +7,7 @@ import com.alibaba.mq.amqp.utils.UserUtils;
 import com.rabbitmq.client.impl.CredentialsProvider;
 
 /**
- * aliyun amqp协议 账号类
- * 暂不支持STS授权情况
+ * aliyun amqp协议 账号类 暂不支持STS授权情况
  *
  * @version 1.0.0
  */

+ 4 - 3
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java

@@ -15,11 +15,11 @@ import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.common.CanalException;
 import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
 import com.alibaba.otter.canal.connector.core.producer.AbstractMQProducer;
-import com.alibaba.otter.canal.connector.core.util.Callback;
 import com.alibaba.otter.canal.connector.core.producer.MQDestination;
 import com.alibaba.otter.canal.connector.core.producer.MQMessageUtils;
 import com.alibaba.otter.canal.connector.core.spi.CanalMQProducer;
 import com.alibaba.otter.canal.connector.core.spi.SPI;
+import com.alibaba.otter.canal.connector.core.util.Callback;
 import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
 import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQConstants;
 import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQProducerConfig;
@@ -102,8 +102,9 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
         try {
             if (!StringUtils.isEmpty(destination.getDynamicTopic())) {
                 // 动态topic
-                Map<String, Message> messageMap = MQMessageUtils
-                    .messageTopics(message, destination.getTopic(), destination.getDynamicTopic());
+                Map<String, Message> messageMap = MQMessageUtils.messageTopics(message,
+                    destination.getTopic(),
+                    destination.getDynamicTopic());
 
                 for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
                     final String topicName = entry.getKey().replace('.', '_');

+ 1 - 2
connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/consumer/CanalRocketMQConsumer.java

@@ -41,8 +41,7 @@ import com.google.common.collect.Lists;
 @SPI("rocketmq")
 public class CanalRocketMQConsumer implements CanalMsgConsumer {
 
-    private static final Logger                                logger               = LoggerFactory
-        .getLogger(CanalRocketMQConsumer.class);
+    private static final Logger                                logger               = LoggerFactory.getLogger(CanalRocketMQConsumer.class);
     private static final String                                CLOUD_ACCESS_CHANNEL = "cloud";
 
     private String                                             nameServer;

+ 2 - 2
connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/consumer/ConsumerBatchMessage.java

@@ -7,8 +7,8 @@ import java.util.concurrent.TimeUnit;
 public class ConsumerBatchMessage<T> {
 
     private final List<T>  data;
-    private CountDownLatch         latch;
-    private boolean                hasFailure = false;
+    private CountDownLatch latch;
+    private boolean        hasFailure = false;
 
     public ConsumerBatchMessage(List<T> data){
         this.data = data;

+ 14 - 16
connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java

@@ -26,11 +26,11 @@ import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.common.CanalException;
 import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
 import com.alibaba.otter.canal.connector.core.producer.AbstractMQProducer;
-import com.alibaba.otter.canal.connector.core.util.Callback;
 import com.alibaba.otter.canal.connector.core.producer.MQDestination;
 import com.alibaba.otter.canal.connector.core.producer.MQMessageUtils;
 import com.alibaba.otter.canal.connector.core.spi.CanalMQProducer;
 import com.alibaba.otter.canal.connector.core.spi.SPI;
+import com.alibaba.otter.canal.connector.core.util.Callback;
 import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
 import com.alibaba.otter.canal.connector.rocketmq.config.RocketMQConstants;
 import com.alibaba.otter.canal.connector.rocketmq.config.RocketMQProducerConfig;
@@ -58,8 +58,7 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
         loadRocketMQProperties(properties);
 
         RPCHook rpcHook = null;
-        if (mqProperties.getAliyunAccessKey().length() > 0
-            && mqProperties.getAliyunSecretKey().length() > 0) {
+        if (mqProperties.getAliyunAccessKey().length() > 0 && mqProperties.getAliyunSecretKey().length() > 0) {
             SessionCredentials sessionCredentials = new SessionCredentials();
             sessionCredentials.setAccessKey(mqProperties.getAliyunAccessKey());
             sessionCredentials.setSecretKey(mqProperties.getAliyunSecretKey());
@@ -126,8 +125,9 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
         try {
             if (!StringUtils.isEmpty(destination.getDynamicTopic())) {
                 // 动态topic
-                Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils
-                    .messageTopics(message, destination.getTopic(), destination.getDynamicTopic());
+                Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils.messageTopics(message,
+                    destination.getTopic(),
+                    destination.getDynamicTopic());
 
                 for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
                     String topicName = entry.getKey().replace('.', '_');
@@ -155,8 +155,7 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
         }
     }
 
-    public void send(final MQDestination destination, String topicName,
-                     com.alibaba.otter.canal.protocol.Message message) {
+    public void send(final MQDestination destination, String topicName, com.alibaba.otter.canal.protocol.Message message) {
         if (!mqProperties.isFlatMessage()) {
             if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
                 // 并发构造
@@ -175,9 +174,8 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
                     if (dataPartition != null) {
                         final int index = i;
                         template.submit(() -> {
-                            Message data = new Message(topicName,
-                                CanalMessageSerializerUtil.serializer(dataPartition,
-                                    mqProperties.isFilterTransactionEntry()));
+                            Message data = new Message(topicName, CanalMessageSerializerUtil.serializer(dataPartition,
+                                mqProperties.isFilterTransactionEntry()));
                             sendMessage(data, index);
                         });
                     }
@@ -186,8 +184,8 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
                 template.waitForResult();
             } else {
                 final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
-                Message data = new Message(topicName,
-                    CanalMessageSerializerUtil.serializer(message, mqProperties.isFilterTransactionEntry()));
+                Message data = new Message(topicName, CanalMessageSerializerUtil.serializer(message,
+                    mqProperties.isFilterTransactionEntry()));
                 sendMessage(data, partition);
             }
         } else {
@@ -220,8 +218,8 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
                         final int index = i;
                         template.submit(() -> {
                             List<Message> messages = flatMessagePart.stream()
-                                .map(flatMessage -> new Message(topicName,
-                                    JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
+                                .map(flatMessage -> new Message(topicName, JSON.toJSONBytes(flatMessage,
+                                    SerializerFeature.WriteMapNullValue)))
                                 .collect(Collectors.toList());
                             // 批量发送
                             sendMessage(messages, index);
@@ -234,8 +232,8 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
             } else {
                 final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
                 List<Message> messages = flatMessages.stream()
-                    .map(flatMessage -> new Message(topicName,
-                        JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
+                    .map(flatMessage -> new Message(topicName, JSON.toJSONBytes(flatMessage,
+                        SerializerFeature.WriteMapNullValue)))
                     .collect(Collectors.toList());
                 // 批量发送
                 sendMessage(messages, partition);

+ 6 - 7
connector/tcp-connector/src/main/java/com/alibaba/otter/canal/connector/tcp/consumer/CanalTCPConsumer.java

@@ -1,5 +1,11 @@
 package com.alibaba.otter.canal.connector.tcp.consumer;
 
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
 import com.alibaba.otter.canal.client.CanalConnector;
 import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
 import com.alibaba.otter.canal.client.impl.ClusterNodeAccessStrategy;
@@ -12,13 +18,6 @@ import com.alibaba.otter.canal.connector.core.util.MessageUtil;
 import com.alibaba.otter.canal.connector.tcp.config.TCPConstants;
 import com.alibaba.otter.canal.protocol.Message;
 
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
 /**
  * TCP 消费者连接器, 一个destination对应一个SPI实例
  *