浏览代码

kafka producer 修改配置,一个destination可以对应多个kafka topic

rewerma 7 年之前
父节点
当前提交
efb7b32694

+ 22 - 17
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaStarter.java

@@ -1,6 +1,7 @@
 package com.alibaba.otter.canal.kafka.producer;
 
 import com.alibaba.otter.canal.kafka.CanalServerStarter;
+import com.alibaba.otter.canal.kafka.producer.KafkaProperties.CanalDestination;
 import com.alibaba.otter.canal.kafka.producer.KafkaProperties.Topic;
 import com.alibaba.otter.canal.protocol.ClientIdentity;
 import com.alibaba.otter.canal.protocol.Message;
@@ -31,13 +32,15 @@ public class CanalKafkaStarter {
 
     private static CanalKafkaProducer canalKafkaProducer;
 
+    private static KafkaProperties kafkaProperties;
+
     public static void init() {
         try {
 
             logger.info("## load kafka configurations");
             String conf = System.getProperty("kafka.conf", "classpath:kafka.yml");
 
-            KafkaProperties kafkaProperties;
+
             if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
                 conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
                 kafkaProperties = new Yaml().loadAs(CanalKafkaStarter.class.getClassLoader().getResourceAsStream(conf), KafkaProperties.class);
@@ -50,16 +53,16 @@ public class CanalKafkaStarter {
             canalKafkaProducer.init(kafkaProperties);
 
             //对应每个instance启动一个worker线程
-            List<Topic> topics = kafkaProperties.getTopics();
+            List<CanalDestination> destinations = kafkaProperties.getCanalDestinations();
 
-            executorService = Executors.newFixedThreadPool(topics.size());
+            executorService = Executors.newFixedThreadPool(destinations.size());
 
             logger.info("## start the kafka workers.");
-            for (final Topic topic : topics) {
+            for (final CanalDestination destination : destinations) {
                 executorService.execute(new Runnable() {
                     @Override
                     public void run() {
-                        worker(topic);
+                        worker(destination);
                     }
                 });
             }
@@ -88,12 +91,12 @@ public class CanalKafkaStarter {
     }
 
 
-    private static void worker(Topic topic) {
+    private static void worker(CanalDestination destination) {
         while (!running) ;
         while (!CanalServerStarter.isRunning()) ; //等待server启动完成
-        logger.info("## start the canal consumer: {}.", topic.getCanalDestination());
+        logger.info("## start the canal consumer: {}.", destination.getCanalDestination());
         CanalServerWithEmbedded server = CanalServerWithEmbedded.instance();
-        ClientIdentity clientIdentity = new ClientIdentity(topic.getCanalDestination(), (short) 1001, "");
+        ClientIdentity clientIdentity = new ClientIdentity(destination.getCanalDestination(), (short) 1001, "");
         while (running) {
             try {
                 if (!server.getCanalInstances().containsKey(clientIdentity.getDestination())) {
@@ -105,21 +108,23 @@ public class CanalKafkaStarter {
                     continue;
                 }
                 server.subscribe(clientIdentity);
-                logger.info("## the canal consumer {} is running now ......", topic.getCanalDestination());
+                logger.info("## the canal consumer {} is running now ......", destination.getCanalDestination());
 
                 while (running) {
-                    Message message = server.getWithoutAck(clientIdentity, 5 * 1024); // 获取指定数量的数据
+                    Message message = server.getWithoutAck(clientIdentity, kafkaProperties.getCanalBatchSize()); // 获取指定数量的数据
                     long batchId = message.getId();
                     try {
                         int size = message.getEntries().size();
-                        if (batchId == -1 || size == 0) {
-                            try {
-                                Thread.sleep(1000);
-                            } catch (InterruptedException e) {
-                                //ignore
+                        if (batchId != -1 && size != 0) {
+                            if (!StringUtils.isEmpty(destination.getTopic())) {
+                                Topic topic = new Topic();
+                                topic.setTopic(destination.getTopic());
+                                topic.setPartition(destination.getPartition());
+                                destination.getTopics().add(topic);
+                            }
+                            for (Topic topic : destination.getTopics()) {
+                                canalKafkaProducer.send(topic, message); //发送message到所有topic
                             }
-                        } else {
-                            canalKafkaProducer.send(topic, message);
                         }
 
                         if (batchId != -1) {

+ 70 - 10
kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/KafkaProperties.java

@@ -1,7 +1,9 @@
 package com.alibaba.otter.canal.kafka.producer;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * kafka 配置项
@@ -16,12 +18,52 @@ public class KafkaProperties {
     private int lingerMs = 1;
     private long bufferMemory = 33554432L;
 
-    private List<Topic> topics = new ArrayList<Topic>();
+    private int canalBatchSize = 5;
+
+    private List<CanalDestination> canalDestinations = new ArrayList<CanalDestination>();
+
+    public static class CanalDestination {
+        private String canalDestination;
+        private String topic;
+        private Integer partition;
+        private Set<Topic> topics = new HashSet<Topic>();
+
+        public String getCanalDestination() {
+            return canalDestination;
+        }
+
+        public void setCanalDestination(String canalDestination) {
+            this.canalDestination = canalDestination;
+        }
+
+        public String getTopic() {
+            return topic;
+        }
+
+        public void setTopic(String topic) {
+            this.topic = topic;
+        }
+
+        public Integer getPartition() {
+            return partition;
+        }
+
+        public void setPartition(Integer partition) {
+            this.partition = partition;
+        }
+
+        public Set<Topic> getTopics() {
+            return topics;
+        }
+
+        public void setTopics(Set<Topic> topics) {
+            this.topics = topics;
+        }
+    }
 
     public static class Topic {
         private String topic;
         private Integer partition;
-        private String canalDestination;
 
         public String getTopic() {
             return topic;
@@ -39,12 +81,22 @@ public class KafkaProperties {
             this.partition = partition;
         }
 
-        public String getCanalDestination() {
-            return canalDestination;
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            Topic topic1 = (Topic) o;
+
+            if (topic != null ? !topic.equals(topic1.topic) : topic1.topic != null) return false;
+            return partition != null ? partition.equals(topic1.partition) : topic1.partition == null;
         }
 
-        public void setCanalDestination(String canalDestination) {
-            this.canalDestination = canalDestination;
+        @Override
+        public int hashCode() {
+            int result = topic != null ? topic.hashCode() : 0;
+            result = 31 * result + (partition != null ? partition.hashCode() : 0);
+            return result;
         }
     }
 
@@ -88,11 +140,19 @@ public class KafkaProperties {
         this.bufferMemory = bufferMemory;
     }
 
-    public List<Topic> getTopics() {
-        return topics;
+    public int getCanalBatchSize() {
+        return canalBatchSize;
+    }
+
+    public void setCanalBatchSize(int canalBatchSize) {
+        this.canalBatchSize = canalBatchSize;
+    }
+
+    public List<CanalDestination> getCanalDestinations() {
+        return canalDestinations;
     }
 
-    public void setTopics(List<Topic> topics) {
-        this.topics = topics;
+    public void setCanalDestinations(List<CanalDestination> canalDestinations) {
+        this.canalDestinations = canalDestinations;
     }
 }

+ 9 - 6
kafka/src/main/resources/kafka.yml

@@ -3,13 +3,16 @@ retries: 0
 batchSize: 16384
 lingerMs: 1
 bufferMemory: 33554432
+# canal的批次大小,单位 k
+canalBatchSize: 5
 
-topics:
-  - topic: example
+canalDestinations:
+  - canalDestination: example
+    topic: example
     partition:
-    canalDestination: example
-#  - topic: example2
-#    partition:
-#    canalDestination: example
+    # 一个destination可以对应多个topic
+#    topics:
+#      - topics: example
+#        partition: