Browse Source

Merge pull request #900 from rewerma/master

kafka server端代码调整
agapple 6 years ago
parent
commit
d4a70d5eea

+ 3 - 3
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java

@@ -97,9 +97,9 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
             try {
                 logger.info("=============> Start to connect topic: {} <=============", this.topic);
                 connector.connect();
-                logger.info("=============> Start to subscribe topic: {}<=============", this.topic);
+                logger.info("=============> Start to subscribe topic: {} <=============", this.topic);
                 connector.subscribe();
-                logger.info("=============> Subscribe topic: {} succeed<=============", this.topic);
+                logger.info("=============> Subscribe topic: {} succeed <=============", this.topic);
                 while (running) {
                     try {
                         // switcher.get(); //等待开关开启
@@ -146,7 +146,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
                             long currentTS = System.currentTimeMillis();
                             while (executing.get()) {
                                 // 大于1分钟未消费完ack一次keep alive
-                                if (System.currentTimeMillis() - currentTS >  60000) {
+                                if (System.currentTimeMillis() - currentTS > 60000) {
                                     connector.ack();
                                     currentTS = System.currentTimeMillis();
                                 }

+ 8 - 7
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java

@@ -1,12 +1,5 @@
 package com.alibaba.otter.canal.client.adapter.loader;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
-import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
-import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
-import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
@@ -16,6 +9,14 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
+import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
+import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
+
 /**
  * 外部适配器的加载器
  *

+ 4 - 5
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterWorker.java

@@ -1,16 +1,15 @@
 package com.alibaba.otter.canal.client.adapter.loader;
 
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.concurrent.Executors;
+
 import com.alibaba.otter.canal.client.CanalConnector;
 import com.alibaba.otter.canal.client.CanalConnectors;
 import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
-import com.alibaba.otter.canal.client.adapter.loader.AbstractCanalAdapterWorker;
 import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
 import com.alibaba.otter.canal.protocol.Message;
 
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.concurrent.Executors;
-
 /**
  * 原生canal-server对应的client适配器工作线程
  *

+ 1 - 1
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -45,7 +45,7 @@ public class KafkaCanalConnector {
         properties.put("auto.offset.reset", "latest"); // 如果没有offset则从最后的offset开始读
         properties.put("request.timeout.ms", "40000"); // 必须大于session.timeout.ms的设置
         properties.put("session.timeout.ms", "30000"); // 默认为30秒
-        properties.put("max.poll.records", "1"); // 所以一次只取一条数据
+        properties.put("max.poll.records", "1"); // 一次只取一条message数据
         properties.put("key.deserializer", StringDeserializer.class.getName());
         properties.put("value.deserializer", MessageDeserializer.class.getName());
 

+ 5 - 5
deployer/src/main/resources/kafka.yml

@@ -3,15 +3,15 @@ retries: 0
 batchSize: 16384
 lingerMs: 1
 bufferMemory: 33554432
-filterTransactionEntry: true
+
+# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
+canalBatchSize: 50
+# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
+canalGetTimeout: 100
 
 canalDestinations:
   - canalDestination: example
     topic: example
     partition:
-    # 一个destination可以对应多个topic
-    #topics:
-    #  - topic: example
-    #    partition:
 
 

+ 30 - 37
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,11 +1,12 @@
 package com.alibaba.otter.canal.kafka;
 
-import java.io.IOException;
 import java.util.Properties;
+import java.util.concurrent.Future;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +36,7 @@ public class CanalKafkaProducer {
         properties.put("key.serializer", StringSerializer.class.getName());
         properties.put("value.serializer", MessageSerializer.class.getName());
         producer = new KafkaProducer<String, Message>(properties);
+        // producer.initTransactions();
     }
 
     public void stop() {
@@ -48,42 +50,33 @@ public class CanalKafkaProducer {
         }
     }
 
-    public void send(KafkaProperties.Topic topic, Message message) throws IOException {
-        // set canal.instance.filter.transaction.entry = true
-
-        // boolean valid = false;
-        // if (message != null) {
-        // if (message.isRaw() && !message.getRawEntries().isEmpty()) {
-        // for (ByteString byteString : message.getRawEntries()) {
-        // CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
-        // if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN
-        // && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
-        // valid = true;
-        // break;
-        // }
-        // }
-        // } else if (!message.getEntries().isEmpty()){
-        // for (CanalEntry.Entry entry : message.getEntries()) {
-        // if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN
-        // && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
-        // valid = true;
-        // break;
-        // }
-        // }
-        // }
-        // }
-        // if (!valid) {
-        // return;
-        // }
-        ProducerRecord<String, Message> record;
-        if (topic.getPartition() != null) {
-            record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
-        } else {
-            record = new ProducerRecord<String, Message>(topic.getTopic(), message);
-        }
-        producer.send(record);
-        if (logger.isDebugEnabled()) {
-            logger.debug("send message to kafka topic: {} \n {}", topic.getTopic(), message.toString());
+    public void send(KafkaProperties.Topic topic, Message message, Callback callback) {
+        try {
+            // producer.beginTransaction();
+            ProducerRecord<String, Message> record;
+            if (topic.getPartition() != null) {
+                record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
+            } else {
+                record = new ProducerRecord<String, Message>(topic.getTopic(), message);
+            }
+            Future<RecordMetadata> future = producer.send(record);
+            future.get();
+            // producer.commitTransaction();
+            callback.commit();
+            if (logger.isDebugEnabled()) {
+                logger.debug("send message to kafka topic: {}", topic.getTopic());
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            // producer.abortTransaction();
+            callback.rollback();
         }
     }
+
+    public interface Callback {
+
+        void commit();
+
+        void rollback();
+    }
 }

+ 34 - 22
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaStarter.java

@@ -4,6 +4,7 @@ import java.io.FileInputStream;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -33,9 +34,9 @@ public class CanalKafkaStarter implements CanalServerStarter {
 
     private ExecutorService     executorService;
 
-    private CanalKafkaProducer canalKafkaProducer;
+    private CanalKafkaProducer  canalKafkaProducer;
 
-    private KafkaProperties kafkaProperties;
+    private KafkaProperties     kafkaProperties;
 
     public void init() {
         try {
@@ -54,9 +55,9 @@ public class CanalKafkaStarter implements CanalServerStarter {
             canalKafkaProducer = new CanalKafkaProducer();
             canalKafkaProducer.init(kafkaProperties);
             // set filterTransactionEntry
-            if (kafkaProperties.isFilterTransactionEntry()) {
-                System.setProperty("canal.instance.filter.transaction.entry", "true");
-            }
+            // if (kafkaProperties.isFilterTransactionEntry()) {
+            // System.setProperty("canal.instance.filter.transaction.entry", "true");
+            // }
             // 对应每个instance启动一个worker线程
             List<CanalDestination> destinations = kafkaProperties.getCanalDestinations();
 
@@ -101,8 +102,8 @@ public class CanalKafkaStarter implements CanalServerStarter {
         while (!running)
             ;
         logger.info("## start the canal consumer: {}.", destination.getCanalDestination());
-        CanalServerWithEmbedded server = CanalServerWithEmbedded.instance();
-        ClientIdentity clientIdentity = new ClientIdentity(destination.getCanalDestination(), (short) 1001, "");
+        final CanalServerWithEmbedded server = CanalServerWithEmbedded.instance();
+        final ClientIdentity clientIdentity = new ClientIdentity(destination.getCanalDestination(), (short) 1001, "");
         while (running) {
             try {
                 if (!server.getCanalInstances().containsKey(clientIdentity.getDestination())) {
@@ -117,27 +118,38 @@ public class CanalKafkaStarter implements CanalServerStarter {
                 logger.info("## the canal consumer {} is running now ......", destination.getCanalDestination());
 
                 while (running) {
-                    Message message = server.getWithoutAck(clientIdentity, kafkaProperties.getCanalBatchSize()); // 获取指定数量的数据
-                    long batchId = message.getId();
+                    Message message;
+                    if (kafkaProperties.getCanalGetTimeout() != null) {
+                        message = server.getWithoutAck(clientIdentity,
+                            kafkaProperties.getCanalBatchSize(),
+                            kafkaProperties.getCanalGetTimeout(),
+                            TimeUnit.MILLISECONDS);
+                    } else {
+                        message = server.getWithoutAck(clientIdentity, kafkaProperties.getCanalBatchSize());
+                    }
+
+                    final long batchId = message.getId();
                     try {
                         int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
                         if (batchId != -1 && size != 0) {
-                            if (!StringUtils.isEmpty(destination.getTopic())) {
-                                Topic topic = new Topic();
-                                topic.setTopic(destination.getTopic());
-                                topic.setPartition(destination.getPartition());
-                                destination.getTopics().add(topic);
-                            }
-                            for (Topic topic : destination.getTopics()) {
-                                canalKafkaProducer.send(topic, message); // 发送message到所有topic
-                            }
+                            Topic topic = new Topic();
+                            topic.setTopic(destination.getTopic());
+                            topic.setPartition(destination.getPartition());
+                            canalKafkaProducer.send(topic, message, new CanalKafkaProducer.Callback() {
+
+                                @Override
+                                public void commit() {
+                                    server.ack(clientIdentity, batchId); // 提交确认
+                                }
+
+                                @Override
+                                public void rollback() {
+                                    server.rollback(clientIdentity, batchId);
+                                }
+                            }); // 发送message到topic
                         }
 
-                        if (batchId != -1) {
-                            server.ack(clientIdentity, batchId); // 提交确认
-                        }
                     } catch (Exception e) {
-                        server.rollback(clientIdentity);
                         logger.error(e.getMessage(), e);
                     }
                 }

+ 9 - 0
server/src/main/java/com/alibaba/otter/canal/kafka/KafkaProperties.java

@@ -20,6 +20,7 @@ public class KafkaProperties {
     private long                   bufferMemory           = 33554432L;
     private boolean                filterTransactionEntry = true;
     private int                    canalBatchSize         = 50;
+    private Long                   canalGetTimeout;
 
     private List<CanalDestination> canalDestinations      = new ArrayList<CanalDestination>();
 
@@ -151,6 +152,14 @@ public class KafkaProperties {
         this.canalBatchSize = canalBatchSize;
     }
 
+    public Long getCanalGetTimeout() {
+        return canalGetTimeout;
+    }
+
+    public void setCanalGetTimeout(Long canalGetTimeout) {
+        this.canalGetTimeout = canalGetTimeout;
+    }
+
     public List<CanalDestination> getCanalDestinations() {
         return canalDestinations;
     }