Bläddra i källkod

Merge pull request #958 from rewerma/master

kafka生产端增加按pk hash到对应partition功能
agapple 6 år sedan
förälder
incheckning
05718a9f20

+ 7 - 3
deployer/src/main/resources/kafka.yml

@@ -11,8 +11,12 @@ canalGetTimeout: 100
 flatMessage: true
 flatMessage: true
 
 
 canalDestinations:
 canalDestinations:
-  - canalDestination: example
-    topic: example
-    partition:
+- canalDestination: example
+  topic: exp3
+#  #对应topic分区数量
+#  partitionsNum: 3
+#  partitionHash:
+#    #库名.表名: 唯一主键
+#    mytest.person: id
 
 
 
 

+ 22 - 11
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -1,14 +1,8 @@
 package com.alibaba.otter.canal.protocol;
 package com.alibaba.otter.canal.protocol;
 
 
 import java.io.Serializable;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.collect.Table;
+import java.util.*;
+
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ByteString;
 
 
 /**
 /**
@@ -31,7 +25,7 @@ public class FlatMessage implements Serializable {
     private List<Map<String, String>> data;
     private List<Map<String, String>> data;
     private List<Map<String, String>> old;
     private List<Map<String, String>> old;
 
 
-    public FlatMessage() {
+    public FlatMessage(){
     }
     }
 
 
     public FlatMessage(long id){
     public FlatMessage(long id){
@@ -126,6 +120,12 @@ public class FlatMessage implements Serializable {
         this.old = old;
         this.old = old;
     }
     }
 
 
+    /**
+     * 将Message转换为FlatMessage
+     * 
+     * @param message 原生message
+     * @return FlatMessage列表
+     */
     public static List<FlatMessage> messageConverter(Message message) {
     public static List<FlatMessage> messageConverter(Message message) {
         try {
         try {
             if (message == null) {
             if (message == null) {
@@ -231,11 +231,22 @@ public class FlatMessage implements Serializable {
         }
         }
     }
     }
 
 
+    /**
+     * 将FlatMessage按指定的字段值hash拆分
+     * 
+     * @param flatMessage flatMessage
+     * @param partitionsNum 分区数量
+     * @param pkHashConfig hash映射
+     * @return 拆分后的flatMessage数组
+     */
     public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum,
     public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum,
-                                                 Table<String, String, String> pkHashConfig) {
+                                                 Map<String, String> pkHashConfig) {
+        if (partitionsNum == null) {
+            partitionsNum = 1;
+        }
         FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];
         FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];
 
 
-        String pk = pkHashConfig.get(flatMessage.getDatabase(), flatMessage.getTable());
+        String pk = pkHashConfig.get(flatMessage.getDatabase() + "." + flatMessage.getTable());
         if (pk == null || flatMessage.getIsDdl()) {
         if (pk == null || flatMessage.getIsDdl()) {
             partitionMessages[0] = flatMessage;
             partitionMessages[0] = flatMessage;
         } else {
         } else {

+ 40 - 8
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -67,15 +67,18 @@ public class CanalKafkaProducer {
         }
         }
     }
     }
 
 
-    public void send(KafkaProperties.Topic topic, Message message, Callback callback) {
+    public void send(KafkaProperties.CanalDestination canalDestination, Message message, Callback callback) {
         try {
         try {
             // producer.beginTransaction();
             // producer.beginTransaction();
             if (!kafkaProperties.getFlatMessage()) {
             if (!kafkaProperties.getFlatMessage()) {
                 ProducerRecord<String, Message> record;
                 ProducerRecord<String, Message> record;
-                if (topic.getPartition() != null) {
-                    record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
+                if (canalDestination.getPartition() != null) {
+                    record = new ProducerRecord<String, Message>(canalDestination.getTopic(),
+                        canalDestination.getPartition(),
+                        null,
+                        message);
                 } else {
                 } else {
-                    record = new ProducerRecord<String, Message>(topic.getTopic(), message);
+                    record = new ProducerRecord<String, Message>(canalDestination.getTopic(), 0, null, message);
                 }
                 }
 
 
                 producer.send(record);
                 producer.send(record);
@@ -84,9 +87,38 @@ public class CanalKafkaProducer {
                 List<FlatMessage> flatMessages = FlatMessage.messageConverter(message);
                 List<FlatMessage> flatMessages = FlatMessage.messageConverter(message);
                 if (flatMessages != null) {
                 if (flatMessages != null) {
                     for (FlatMessage flatMessage : flatMessages) {
                     for (FlatMessage flatMessage : flatMessages) {
-                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic.getTopic(),
-                            JSON.toJSONString(flatMessage));
-                        producer2.send(record);
+                        if (canalDestination.getPartition() != null) {
+                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination
+                                .getTopic(), canalDestination.getPartition(), null, JSON.toJSONString(flatMessage));
+                            producer2.send(record);
+                        } else {
+                            if (canalDestination.getPartitionHash() != null
+                                && !canalDestination.getPartitionHash().isEmpty()) {
+                                FlatMessage[] partitionFlatMessage = FlatMessage.messagePartition(flatMessage,
+                                    canalDestination.getPartitionsNum(),
+                                    canalDestination.getPartitionHash());
+                                int length = partitionFlatMessage.length;
+                                for (int i = 0; i < length; i++) {
+                                    FlatMessage flatMessagePart = partitionFlatMessage[i];
+                                    if (flatMessagePart != null) {
+                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
+                                                canalDestination.getTopic(),
+                                                i,
+                                                null,
+                                                JSON.toJSONString(flatMessagePart));
+                                        producer2.send(record);
+                                    }
+                                }
+                            } else {
+                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(
+                                    canalDestination.getTopic(),
+                                    0,
+                                    null,
+                                    JSON.toJSONString(flatMessage));
+                                producer2.send(record);
+                            }
+                        }
+
                     }
                     }
                 }
                 }
             }
             }
@@ -94,7 +126,7 @@ public class CanalKafkaProducer {
             // producer.commitTransaction();
             // producer.commitTransaction();
             callback.commit();
             callback.commit();
             if (logger.isDebugEnabled()) {
             if (logger.isDebugEnabled()) {
-                logger.debug("send message to kafka topic: {}", topic.getTopic());
+                logger.debug("send message to kafka topic: {}", canalDestination.getTopic());
             }
             }
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             logger.error(e.getMessage(), e);

+ 1 - 5
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaStarter.java

@@ -12,7 +12,6 @@ import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.Yaml;
 
 
 import com.alibaba.otter.canal.kafka.KafkaProperties.CanalDestination;
 import com.alibaba.otter.canal.kafka.KafkaProperties.CanalDestination;
-import com.alibaba.otter.canal.kafka.KafkaProperties.Topic;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.server.CanalServerStarter;
 import com.alibaba.otter.canal.server.CanalServerStarter;
@@ -132,10 +131,7 @@ public class CanalKafkaStarter implements CanalServerStarter {
                     try {
                     try {
                         int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
                         int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
                         if (batchId != -1 && size != 0) {
                         if (batchId != -1 && size != 0) {
-                            Topic topic = new Topic();
-                            topic.setTopic(destination.getTopic());
-                            topic.setPartition(destination.getPartition());
-                            canalKafkaProducer.send(topic, message, new CanalKafkaProducer.Callback() {
+                            canalKafkaProducer.send(destination, message, new CanalKafkaProducer.Callback() {
 
 
                                 @Override
                                 @Override
                                 public void commit() {
                                 public void commit() {

+ 14 - 46
server/src/main/java/com/alibaba/otter/canal/kafka/KafkaProperties.java

@@ -1,9 +1,8 @@
 package com.alibaba.otter.canal.kafka;
 package com.alibaba.otter.canal.kafka;
 
 
 import java.util.ArrayList;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
 
 
 /**
 /**
  * kafka 配置项
  * kafka 配置项
@@ -27,10 +26,11 @@ public class KafkaProperties {
 
 
     public static class CanalDestination {
     public static class CanalDestination {
 
 
-        private String     canalDestination;
-        private String     topic;
-        private Integer    partition;
-        private Set<Topic> topics = new HashSet<Topic>();
+        private String              canalDestination;
+        private String              topic;
+        private Integer             partition;
+        private Integer             partitionsNum;
+        private Map<String, String> partitionHash;
 
 
         public String getCanalDestination() {
         public String getCanalDestination() {
             return canalDestination;
             return canalDestination;
@@ -56,52 +56,20 @@ public class KafkaProperties {
             this.partition = partition;
             this.partition = partition;
         }
         }
 
 
-        public Set<Topic> getTopics() {
-            return topics;
+        public Integer getPartitionsNum() {
+            return partitionsNum;
         }
         }
 
 
-        public void setTopics(Set<Topic> topics) {
-            this.topics = topics;
+        public void setPartitionsNum(Integer partitionsNum) {
+            this.partitionsNum = partitionsNum;
         }
         }
-    }
-
-    public static class Topic {
-
-        private String  topic;
-        private Integer partition;
-
-        public String getTopic() {
-            return topic;
-        }
-
-        public void setTopic(String topic) {
-            this.topic = topic;
-        }
-
-        public Integer getPartition() {
-            return partition;
-        }
-
-        public void setPartition(Integer partition) {
-            this.partition = partition;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            Topic topic1 = (Topic) o;
 
 
-            if (topic != null ? !topic.equals(topic1.topic) : topic1.topic != null) return false;
-            return partition != null ? partition.equals(topic1.partition) : topic1.partition == null;
+        public Map<String, String> getPartitionHash() {
+            return partitionHash;
         }
         }
 
 
-        @Override
-        public int hashCode() {
-            int result = topic != null ? topic.hashCode() : 0;
-            result = 31 * result + (partition != null ? partition.hashCode() : 0);
-            return result;
+        public void setPartitionHash(Map<String, String> partitionHash) {
+            this.partitionHash = partitionHash;
         }
         }
     }
     }