
Add a callback to the kafka producer sender so that canal and kafka commit and roll back in sync
Remove the mechanism that mapped one canal instance to multiple kafka topics; an instance now maps to exactly one topic
In kafka mode the server-side batch size must not exceed 1M; the maximum is 900K
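
Taken together, the change ties canal's batch lifecycle to the outcome of the Kafka send: a batch fetched with getWithoutAck is acked only after the Kafka transaction commits, and rolled back otherwise. A condensed sketch of that flow, using the names introduced in the hunks below (the surrounding variables are illustrative stand-ins for the fields in CanalKafkaStarter):

    // Condensed sketch of the new commit/rollback coupling; server, clientIdentity,
    // topic and kafkaProperties are illustrative locals.
    Message message = server.getWithoutAck(clientIdentity, kafkaProperties.getCanalBatchSize());
    final long batchId = message.getId();
    canalKafkaProducer.send(topic, message, new CanalKafkaProducer.Callback() {

        @Override
        public void commit() {
            server.ack(clientIdentity, batchId);      // Kafka transaction committed -> ack the canal batch
        }

        @Override
        public void rollback() {
            server.rollback(clientIdentity, batchId); // Kafka send failed -> return the batch to canal
        }
    });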

mcy · 6 years ago · parent commit ebf9139762

+ 3 - 3
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java

@@ -97,9 +97,9 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
             try {
                 logger.info("=============> Start to connect topic: {} <=============", this.topic);
                 connector.connect();
-                logger.info("=============> Start to subscribe topic: {}<=============", this.topic);
+                logger.info("=============> Start to subscribe topic: {} <=============", this.topic);
                 connector.subscribe();
-                logger.info("=============> Subscribe topic: {} succeed<=============", this.topic);
+                logger.info("=============> Subscribe topic: {} succeed <=============", this.topic);
                 while (running) {
                     try {
                        // switcher.get(); // wait for the switch to open
@@ -146,7 +146,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
                             long currentTS = System.currentTimeMillis();
                             while (executing.get()) {
                                // if consumption takes more than 1 minute, ack once to keep alive
-                                if (System.currentTimeMillis() - currentTS >  60000) {
+                                if (System.currentTimeMillis() - currentTS > 60000) {
                                     connector.ack();
                                     currentTS = System.currentTimeMillis();
                                 }

+ 8 - 7
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java

@@ -1,12 +1,5 @@
 package com.alibaba.otter.canal.client.adapter.loader;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
-import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
-import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
-import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
@@ -16,6 +9,14 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
+import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
+import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
+
 /**
  * Loader for external adapters
  *

+ 4 - 5
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterWorker.java

@@ -1,16 +1,15 @@
 package com.alibaba.otter.canal.client.adapter.loader;
 
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.concurrent.Executors;
+
 import com.alibaba.otter.canal.client.CanalConnector;
 import com.alibaba.otter.canal.client.CanalConnectors;
 import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
-import com.alibaba.otter.canal.client.adapter.loader.AbstractCanalAdapterWorker;
 import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
 import com.alibaba.otter.canal.protocol.Message;
 
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.concurrent.Executors;
-
 /**
  * Client adapter worker thread for the native canal-server
  *

+ 1 - 5
deployer/src/main/resources/kafka.yml

@@ -3,15 +3,11 @@ retries: 0
 batchSize: 16384
 lingerMs: 1
 bufferMemory: 33554432
-filterTransactionEntry: true
+canalBatchSize: 50 # canal batch size, default 50K; do not exceed 1M (keep it below 900K) because of kafka's maximum message size limit
 
 canalDestinations:
   - canalDestination: example
     topic: example
     partition:
-    # one destination can map to multiple topics
-    #topics:
-    #  - topic: example
-    #    partition:
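
The 900K guidance follows from Kafka's default size limits: each canal batch is serialized into a single Kafka record, and both the producer's max.request.size and the broker's message.max.bytes default to roughly 1MB, so a batch near or above that would be rejected. If larger batches were ever needed, the cap could be raised on both sides; a minimal producer-side sketch, assuming standard Kafka client properties (not part of this commit; the serializer here is a stand-in for canal's own Message serializer):

    // Hypothetical: raising the producer-side record size cap; the broker's
    // message.max.bytes (default ~1MB) would have to be raised to match.
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");        // illustrative address
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // stand-in serializer
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5 * 1024 * 1024);          // 5MB instead of the ~1MB default
    Producer<String, String> producer = new KafkaProducer<String, String>(props);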
 
 

+ 27 - 11
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,11 +1,11 @@
 package com.alibaba.otter.canal.kafka;
 
-import java.io.IOException;
 import java.util.Properties;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,16 +48,32 @@ public class CanalKafkaProducer {
         }
     }
 
-    public void send(KafkaProperties.Topic topic, Message message) {
-        ProducerRecord<String, Message> record;
-        if (topic.getPartition() != null) {
-            record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
-        } else {
-            record = new ProducerRecord<String, Message>(topic.getTopic(), message);
-        }
-        producer.send(record);
-        if (logger.isDebugEnabled()) {
-            logger.debug("send message to kafka topic: {} \n {}", topic.getTopic(), message.toString());
+    public void send(KafkaProperties.Topic topic, Message message, Callback callback) {
+        producer.initTransactions();
+        try {
+            producer.beginTransaction();
+            ProducerRecord<String, Message> record;
+            if (topic.getPartition() != null) {
+                record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
+            } else {
+                record = new ProducerRecord<String, Message>(topic.getTopic(), message);
+            }
+            producer.send(record);
+            producer.commitTransaction();
+            callback.commit();
+            if (logger.isDebugEnabled()) {
+                logger.debug("send message to kafka topic: {} \n {}", topic.getTopic(), message.toString());
+            }
+        } catch (KafkaException e) {
+            producer.abortTransaction();
+            callback.rollback();
         }
     }
+
+    public interface Callback {
+
+        void commit();
+
+        void rollback();
+    }
 }
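
Note that the new send() wraps the record in a Kafka transaction (initTransactions/beginTransaction/commitTransaction). That only works when the producer is configured with a transactional.id, and initTransactions() is normally a one-time call at startup rather than per send. A minimal sketch of such a configuration, assuming standard Kafka client properties (the commit's changes to init() are not shown in this diff; the id and serializers are illustrative):

    // Hypothetical transactional producer setup; the transactional.id value is illustrative.
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "canal-kafka-producer-1"); // required for transactions
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);                   // implied by transactions
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // stand-in for canal's Message serializer
    Producer<String, String> producer = new KafkaProducer<String, String>(props);
    producer.initTransactions(); // one-time setup, before the first beginTransaction()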

+ 21 - 19
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaStarter.java

@@ -33,9 +33,9 @@ public class CanalKafkaStarter implements CanalServerStarter {
 
     private ExecutorService     executorService;
 
-    private CanalKafkaProducer canalKafkaProducer;
+    private CanalKafkaProducer  canalKafkaProducer;
 
-    private KafkaProperties kafkaProperties;
+    private KafkaProperties     kafkaProperties;
 
     public void init() {
         try {
@@ -55,7 +55,7 @@ public class CanalKafkaStarter implements CanalServerStarter {
             canalKafkaProducer.init(kafkaProperties);
             // set filterTransactionEntry
             // if (kafkaProperties.isFilterTransactionEntry()) {
-            //     System.setProperty("canal.instance.filter.transaction.entry", "true");
+            // System.setProperty("canal.instance.filter.transaction.entry", "true");
             // }
             // start one worker thread per instance
             List<CanalDestination> destinations = kafkaProperties.getCanalDestinations();
@@ -101,8 +101,8 @@ public class CanalKafkaStarter implements CanalServerStarter {
         while (!running)
             ;
         logger.info("## start the canal consumer: {}.", destination.getCanalDestination());
-        CanalServerWithEmbedded server = CanalServerWithEmbedded.instance();
-        ClientIdentity clientIdentity = new ClientIdentity(destination.getCanalDestination(), (short) 1001, "");
+        final CanalServerWithEmbedded server = CanalServerWithEmbedded.instance();
+        final ClientIdentity clientIdentity = new ClientIdentity(destination.getCanalDestination(), (short) 1001, "");
         while (running) {
             try {
                 if (!server.getCanalInstances().containsKey(clientIdentity.getDestination())) {
@@ -118,26 +118,28 @@ public class CanalKafkaStarter implements CanalServerStarter {
 
                 while (running) {
                    Message message = server.getWithoutAck(clientIdentity, kafkaProperties.getCanalBatchSize()); // fetch the specified amount of data
-                    long batchId = message.getId();
+                    final long batchId = message.getId();
                     try {
                         int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
                         if (batchId != -1 && size != 0) {
-                            if (!StringUtils.isEmpty(destination.getTopic())) {
-                                Topic topic = new Topic();
-                                topic.setTopic(destination.getTopic());
-                                topic.setPartition(destination.getPartition());
-                                destination.getTopics().add(topic);
-                            }
-                            for (Topic topic : destination.getTopics()) {
-                                canalKafkaProducer.send(topic, message); // send the message to all topics
-                            }
+                            Topic topic = new Topic();
+                            topic.setTopic(destination.getTopic());
+                            topic.setPartition(destination.getPartition());
+                            canalKafkaProducer.send(topic, message, new CanalKafkaProducer.Callback() {
+
+                                @Override
+                                public void commit() {
+                                    server.ack(clientIdentity, batchId); // ack the batch
+                                }
+
+                                @Override
+                                public void rollback() {
+                                    server.rollback(clientIdentity, batchId);
+                                }
+                            }); // send the message to the topic
                         }
 
-                        if (batchId != -1) {
-                            server.ack(clientIdentity, batchId); // ack the batch
-                        }
                     } catch (Exception e) {
-                        server.rollback(clientIdentity);
                         logger.error(e.getMessage(), e);
                     }
                 }