Sfoglia il codice sorgente

建议增加一个属性,针对根据某一字段hash时,hash算法可以不根据database算出 #2248 (#2294)

shicongyang 5 anni fa
parent
commit
65b5325ca6

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -75,6 +75,7 @@ public class CanalConstants {
     public static final String CANAL_MQ_VHOST                       = ROOT + "." + "mq.vhost";
     public static final String CANAL_MQ_ALIYUN_UID                  = ROOT + "." + "mq.aliyunuid";
     public static final String CANAL_MQ_EXCHANGE                    = ROOT + "." + "mq.exchange";
+    public static final String CANAL_MQ_DATABASE_HASH               = ROOT + "." + "mq.database.hash";
 
     public static String getInstanceModeKey(String destination) {
         return MessageFormat.format(INSTANCE_MODE_TEMPLATE, destination);

+ 5 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStarter.java

@@ -298,6 +298,11 @@ public class CanalStarter {
             mqProperties.setExchange(exchange);
         }
 
+        String databaseHash = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_DATABASE_HASH);
+        if (!StringUtils.isEmpty(databaseHash)){
+            mqProperties.setDatabaseHash(Boolean.valueOf(databaseHash));
+        }
+
         for (Object key : properties.keySet()) {
             key = StringUtils.trim(key.toString());
             if (((String) key).startsWith(CanalConstants.CANAL_MQ_PROPERTIES)) {

+ 1 - 0
deployer/src/main/resources/canal.properties

@@ -137,6 +137,7 @@ canal.mq.username=
 canal.mq.password=
 canal.mq.aliyunuid=
 
+canal.mq.database.hash = true
 ##################################################
 #########     Kafka Kerberos Info    #############
 ##################################################

+ 12 - 4
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -234,10 +234,11 @@ public class MQMessageUtils {
      *
      * @param partitionsNum 分区数
      * @param pkHashConfigs 分区库表主键正则表达式
+     * @param databaseHash  是否取消根据database进行hash
      * @return 分区message数组
      */
     @SuppressWarnings("unchecked")
-    public static Message[] messagePartition(EntryRowData[] datas, long id, Integer partitionsNum, String pkHashConfigs) {
+    public static Message[] messagePartition(EntryRowData[] datas, long id, Integer partitionsNum, String pkHashConfigs,boolean databaseHash) {
         if (partitionsNum == null) {
             partitionsNum = 1;
         }
@@ -279,7 +280,10 @@ public class MQMessageUtils {
                         RowChange.Builder rowChangeBuilder = RowChange.newBuilder(rowChange);
 
                         for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
-                            int hashCode = database.hashCode();
+                            int hashCode = 0;
+                            if(databaseHash){
+                                hashCode = database.hashCode();
+                            }
                             CanalEntry.EventType eventType = rowChange.getEventType();
                             List<CanalEntry.Column> columns = null;
                             if (eventType == CanalEntry.EventType.DELETE) {
@@ -444,9 +448,10 @@ public class MQMessageUtils {
      * @param flatMessage flatMessage
      * @param partitionsNum 分区数量
      * @param pkHashConfigs hash映射
+     * @param databaseHash  是否取消根据database进行hash
      * @return 拆分后的flatMessage数组
      */
-    public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum, String pkHashConfigs) {
+    public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum, String pkHashConfigs,boolean databaseHash) {
         if (partitionsNum == null) {
             partitionsNum = 1;
         }
@@ -476,7 +481,10 @@ public class MQMessageUtils {
 
                     int idx = 0;
                     for (Map<String, String> row : flatMessage.getData()) {
-                        int hashCode = database.hashCode();
+                        int hashCode = 0;
+                        if(databaseHash){
+                            hashCode = database.hashCode();
+                        }
                         if (pkNames != null) {
                             for (String pkName : pkNames) {
                                 String value = row.get(pkName);

+ 13 - 1
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -48,6 +48,9 @@ public class MQProperties {
     private String     exchange               = "";
     // 消息发送的并行度
     private int        parallelThreadSize     = 8;
+    //是否取消根据database进行hash
+    private boolean    databaseHash           = true;
+
 
     public static class CanalDestination {
 
@@ -339,6 +342,13 @@ public class MQProperties {
         this.parallelThreadSize = parallelThreadSize;
     }
 
+    public boolean getDatabaseHash() {
+        return databaseHash;
+    }
+
+    public void setDatabaseHash(boolean databaseHash) {
+        this.databaseHash = databaseHash;
+    }
     @Override
     public String toString() {
         return "MQProperties [servers=" + servers + ", retries=" + retries + ", batchSize=" + batchSize + ", lingerMs="
@@ -352,7 +362,9 @@ public class MQProperties {
                + kerberosEnable + ", kerberosKrb5FilePath=" + kerberosKrb5FilePath + ", kerberosJaasFilePath="
                + kerberosJaasFilePath + ", username=" + username + ", password=" + password + ", vhost=" + vhost
                + ", aliyunUID=" + aliyunUID + ", exchange=" + exchange + ", parallelThreadSize=" + parallelThreadSize
-               + "]";
+               + ",databaseHash=" + databaseHash +"]";
     }
 
+
+
 }

+ 7 - 2
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -42,11 +42,14 @@ public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQPro
     private Producer<String, byte[]> producer;
     private MQProperties             kafkaProperties;
 
+    private boolean databaseHash;
+
     @Override
     public void init(MQProperties kafkaProperties) {
         super.init(kafkaProperties);
 
         this.kafkaProperties = kafkaProperties;
+        databaseHash = kafkaProperties.getDatabaseHash();
         Properties properties = new Properties();
         properties.put("bootstrap.servers", kafkaProperties.getServers());
         properties.put("acks", kafkaProperties.getAcks());
@@ -172,7 +175,8 @@ public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQPro
                 Message[] messages = MQMessageUtils.messagePartition(datas,
                     message.getId(),
                     canalDestination.getPartitionsNum(),
-                    canalDestination.getPartitionHash());
+                    canalDestination.getPartitionHash(),
+                    databaseHash);
                 int length = messages.length;
                 for (int i = 0; i < length; i++) {
                     Message messagePartition = messages[i];
@@ -201,7 +205,8 @@ public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQPro
                 if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                     FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                         canalDestination.getPartitionsNum(),
-                        canalDestination.getPartitionHash());
+                        canalDestination.getPartitionHash(),
+                        databaseHash);
                     int length = partitionFlatMessage.length;
                     for (int i = 0; i < length; i++) {
                         FlatMessage flatMessagePart = partitionFlatMessage[i];

+ 5 - 2
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -40,10 +40,13 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
     private MQProperties        mqProperties;
     private static final String CLOUD_ACCESS_CHANNEL = "cloud";
 
+    private boolean databaseHash;
+
     @Override
     public void init(MQProperties rocketMQProperties) {
         super.init(rocketMQProperties);
         this.mqProperties = rocketMQProperties;
+        this.databaseHash = rocketMQProperties.getDatabaseHash();
         RPCHook rpcHook = null;
         if (rocketMQProperties.getAliyunAccessKey().length() > 0
             && rocketMQProperties.getAliyunSecretKey().length() > 0) {
@@ -125,7 +128,7 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
                 com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(datas,
                     message.getId(),
                     destination.getPartitionsNum(),
-                    destination.getPartitionHash());
+                    destination.getPartitionHash(),databaseHash);
                 int length = messages.length;
 
                 ExecutorTemplate template = new ExecutorTemplate(executor);
@@ -168,7 +171,7 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
                     for (FlatMessage flatMessage : flatMessages) {
                         FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                             destination.getPartitionsNum(),
-                            destination.getPartitionHash());
+                            destination.getPartitionHash(),databaseHash);
                         int length = partitionFlatMessage.length;
                         for (int i = 0; i < length; i++) {
                             partitionFlatMessages.get(i).add(partitionFlatMessage[i]);