1
0
agapple 5 жил өмнө
parent
commit
dd7554374f

+ 8 - 6
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -234,11 +234,12 @@ public class MQMessageUtils {
      *
      * @param partitionsNum 分区数
      * @param pkHashConfigs 分区库表主键正则表达式
-     * @param databaseHash  是否取消根据database进行hash
+     * @param databaseHash 是否取消根据database进行hash
      * @return 分区message数组
      */
     @SuppressWarnings("unchecked")
-    public static Message[] messagePartition(EntryRowData[] datas, long id, Integer partitionsNum, String pkHashConfigs,boolean databaseHash) {
+    public static Message[] messagePartition(EntryRowData[] datas, long id, Integer partitionsNum,
+                                             String pkHashConfigs, boolean databaseHash) {
         if (partitionsNum == null) {
             partitionsNum = 1;
         }
@@ -281,7 +282,7 @@ public class MQMessageUtils {
 
                         for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                             int hashCode = 0;
-                            if(databaseHash){
+                            if (databaseHash) {
                                 hashCode = database.hashCode();
                             }
                             CanalEntry.EventType eventType = rowChange.getEventType();
@@ -448,10 +449,11 @@ public class MQMessageUtils {
      * @param flatMessage flatMessage
      * @param partitionsNum 分区数量
      * @param pkHashConfigs hash映射
-     * @param databaseHash  是否取消根据database进行hash
+     * @param databaseHash 是否取消根据database进行hash
      * @return 拆分后的flatMessage数组
      */
-    public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum, String pkHashConfigs,boolean databaseHash) {
+    public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum, String pkHashConfigs,
+                                                 boolean databaseHash) {
         if (partitionsNum == null) {
             partitionsNum = 1;
         }
@@ -482,7 +484,7 @@ public class MQMessageUtils {
                     int idx = 0;
                     for (Map<String, String> row : flatMessage.getData()) {
                         int hashCode = 0;
-                        if(databaseHash){
+                        if (databaseHash) {
                             hashCode = database.hashCode();
                         }
                         if (pkNames != null) {

+ 3 - 5
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -48,10 +48,9 @@ public class MQProperties {
     private String     exchange               = "";
     // 消息发送的并行度
     private int        parallelThreadSize     = 8;
-    //是否取消根据database进行hash
+    // 是否取消根据database进行hash
     private boolean    databaseHash           = true;
 
-
     public static class CanalDestination {
 
         private String  canalDestination;
@@ -349,6 +348,7 @@ public class MQProperties {
     public void setDatabaseHash(boolean databaseHash) {
         this.databaseHash = databaseHash;
     }
+
     @Override
     public String toString() {
         return "MQProperties [servers=" + servers + ", retries=" + retries + ", batchSize=" + batchSize + ", lingerMs="
@@ -362,9 +362,7 @@ public class MQProperties {
                + kerberosEnable + ", kerberosKrb5FilePath=" + kerberosKrb5FilePath + ", kerberosJaasFilePath="
                + kerberosJaasFilePath + ", username=" + username + ", password=" + password + ", vhost=" + vhost
                + ", aliyunUID=" + aliyunUID + ", exchange=" + exchange + ", parallelThreadSize=" + parallelThreadSize
-               + ",databaseHash=" + databaseHash +"]";
+               + ",databaseHash=" + databaseHash + "]";
     }
 
-
-
 }

+ 1 - 1
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -42,7 +42,7 @@ public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQPro
     private Producer<String, byte[]> producer;
     private MQProperties             kafkaProperties;
 
-    private boolean databaseHash;
+    private boolean                  databaseHash;
 
     @Override
     public void init(MQProperties kafkaProperties) {

+ 2 - 7
server/src/main/java/com/alibaba/otter/canal/rabbitmq/AliyunCredentialsProvider.java

@@ -10,10 +10,8 @@ import java.security.NoSuchAlgorithmException;
 import com.alibaba.mq.amqp.utils.UserUtils;
 import com.rabbitmq.client.impl.CredentialsProvider;
 
-
 public class AliyunCredentialsProvider implements CredentialsProvider {
 
-
     /**
      * Access Key ID
      */
@@ -27,16 +25,14 @@ public class AliyunCredentialsProvider implements CredentialsProvider {
     /**
      * 资源主账号ID
      */
-    private final long resourceOwnerId;
-
+    private final long   resourceOwnerId;
 
-    public AliyunCredentialsProvider(final String accessKey, final String accessSecret, final long resourceOwnerId) {
+    public AliyunCredentialsProvider(final String accessKey, final String accessSecret, final long resourceOwnerId){
         this.AliyunAccessKey = accessKey;
         this.AliyunAccessSecret = accessSecret;
         this.resourceOwnerId = resourceOwnerId;
     }
 
-
     @Override
     public String getUsername() {
         return UserUtils.getUserName(AliyunAccessKey, resourceOwnerId);
@@ -51,5 +47,4 @@ public class AliyunCredentialsProvider implements CredentialsProvider {
         return null;
     }
 
-
 }

+ 5 - 3
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -40,7 +40,7 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
     private MQProperties        mqProperties;
     private static final String CLOUD_ACCESS_CHANNEL = "cloud";
 
-    private boolean databaseHash;
+    private boolean             databaseHash;
 
     @Override
     public void init(MQProperties rocketMQProperties) {
@@ -128,7 +128,8 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
                 com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(datas,
                     message.getId(),
                     destination.getPartitionsNum(),
-                    destination.getPartitionHash(),databaseHash);
+                    destination.getPartitionHash(),
+                    databaseHash);
                 int length = messages.length;
 
                 ExecutorTemplate template = new ExecutorTemplate(executor);
@@ -171,7 +172,8 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
                     for (FlatMessage flatMessage : flatMessages) {
                         FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                             destination.getPartitionsNum(),
-                            destination.getPartitionHash(),databaseHash);
+                            destination.getPartitionHash(),
+                            databaseHash);
                         int length = partitionFlatMessage.length;
                         for (int i = 0; i < length; i++) {
                             partitionFlatMessages.get(i).add(partitionFlatMessage[i]);