Ver código fonte

Merge pull request #1456 from rewerma/master

新增kafka消息投递事务
agapple 6 anos atrás
pai
commit
5f8eb4d261

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -48,6 +48,7 @@ public class CanalConstants {
     public static final String CANAL_MQ_FLATMESSAGE              = ROOT + "." + "mq.flatMessage";
     public static final String CANAL_MQ_COMPRESSION_TYPE         = ROOT + "." + "mq.compressionType";
     public static final String CANAL_MQ_ACKS                     = ROOT + "." + "mq.acks";
+    public static final String CANAL_MQ_TRANSACTION              = ROOT + "." + "mq.transaction";
     public static final String CANAL_ALIYUN_ACCESSKEY            = ROOT + "." + "aliyun.accessKey";
     public static final String CANAL_ALIYUN_SECRETKEY            = ROOT + "." + "aliyun.secretKey";
 

+ 4 - 1
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java

@@ -150,7 +150,6 @@ public class CanalStater {
         if (!StringUtils.isEmpty(acks)) {
             mqProperties.setAcks(acks);
         }
-
         String aliyunAccessKey = CanalController.getProperty(properties, CanalConstants.CANAL_ALIYUN_ACCESSKEY);
         if (!StringUtils.isEmpty(aliyunAccessKey)) {
             mqProperties.setAliyunAccessKey(aliyunAccessKey);
@@ -159,6 +158,10 @@ public class CanalStater {
         if (!StringUtils.isEmpty(aliyunSecretKey)) {
             mqProperties.setAliyunSecretKey(aliyunSecretKey);
         }
+        String transaction = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_TRANSACTION);
+        if (!StringUtils.isEmpty(transaction)) {
+            mqProperties.setTransaction(Boolean.valueOf(transaction));
+        }
         return mqProperties;
     }
 

+ 3 - 1
deployer/src/main/resources/canal.properties

@@ -114,4 +114,6 @@ canal.mq.canalBatchSize = 50
 canal.mq.canalGetTimeout = 100
 canal.mq.flatMessage = true
 canal.mq.compressionType = none
-canal.mq.acks = all
+canal.mq.acks = all
+# use transactions for kafka flatMessage batch producing
+canal.mq.transaction = false

+ 9 - 0
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -23,6 +23,7 @@ public class MQProperties {
     private String  acks                   = "all";
     private String  aliyunAccessKey        = "";
     private String  aliyunSecretKey        = "";
+    private boolean transaction            = false;           // 是否开启事务
 
     public static class CanalDestination {
 
@@ -201,4 +202,12 @@ public class MQProperties {
     public void setMaxRequestSize(int maxRequestSize) {
         this.maxRequestSize = maxRequestSize;
     }
+
+    public boolean getTransaction() {
+        return transaction;
+    }
+
+    public void setTransaction(boolean transaction) {
+        this.transaction = transaction;
+    }
 }

+ 72 - 68
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.kafka;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -43,12 +44,16 @@ public class CanalKafkaProducer implements CanalMQProducer {
         properties.put("bootstrap.servers", kafkaProperties.getServers());
         properties.put("acks", kafkaProperties.getAcks());
         properties.put("compression.type", kafkaProperties.getCompressionType());
-        properties.put("retries", kafkaProperties.getRetries());
         properties.put("batch.size", kafkaProperties.getBatchSize());
         properties.put("linger.ms", kafkaProperties.getLingerMs());
         properties.put("max.request.size", kafkaProperties.getMaxRequestSize());
         properties.put("buffer.memory", kafkaProperties.getBufferMemory());
         properties.put("key.serializer", StringSerializer.class.getName());
+        if(kafkaProperties.getTransaction()){
+            properties.put("transactional.id", "canal-transactional-id");
+        } else {
+            properties.put("retries", kafkaProperties.getRetries());
+        }
         if (!kafkaProperties.getFlatMessage()) {
             properties.put("value.serializer", MessageSerializer.class.getName());
             producer = new KafkaProducer<String, Message>(properties);
@@ -56,8 +61,13 @@ public class CanalKafkaProducer implements CanalMQProducer {
             properties.put("value.serializer", StringSerializer.class.getName());
             producer2 = new KafkaProducer<String, String>(properties);
         }
-
-        // producer.initTransactions();
+        if (kafkaProperties.getTransaction()) {
+            if (!kafkaProperties.getFlatMessage()) {
+                producer.initTransactions();
+            } else {
+                producer2.initTransactions();
+            }
+        }
     }
 
     @Override
@@ -79,6 +89,17 @@ public class CanalKafkaProducer implements CanalMQProducer {
 
     @Override
     public void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback) {
+        // 开启事务,需要kafka版本支持
+        Producer producerTmp;
+        if (!kafkaProperties.getFlatMessage()) {
+            producerTmp = producer;
+        } else {
+            producerTmp = producer2;
+        }
+
+        if (kafkaProperties.getTransaction()) {
+            producerTmp.beginTransaction();
+        }
         try {
             if (!StringUtils.isEmpty(canalDestination.getDynamicTopic())) {
                 // 动态topic
@@ -96,78 +117,64 @@ public class CanalKafkaProducer implements CanalMQProducer {
             } else {
                 send(canalDestination, canalDestination.getTopic(), message);
             }
+            if (kafkaProperties.getTransaction()) {
+                producerTmp.commitTransaction();
+            }
             callback.commit();
         } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            if (kafkaProperties.getTransaction()) {
+                producerTmp.abortTransaction();
+            }
             callback.rollback();
         }
     }
 
     private void send(MQProperties.CanalDestination canalDestination, String topicName,
                       Message message) throws Exception {
-        // producer.beginTransaction();
         if (!kafkaProperties.getFlatMessage()) {
-            try {
-                ProducerRecord<String, Message> record = null;
-                if (canalDestination.getPartition() != null) {
-                    record = new ProducerRecord<>(topicName, canalDestination.getPartition(), null, message);
-                } else {
-                    if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
-                        Message[] messages = MQMessageUtils.messagePartition(message,
-                            canalDestination.getPartitionsNum(),
-                            canalDestination.getPartitionHash());
-                        int length = messages.length;
-                        for (int i = 0; i < length; i++) {
-                            Message messagePartition = messages[i];
-                            if (messagePartition != null) {
-                                record = new ProducerRecord<>(topicName, i, null, messagePartition);
-                            }
+            ProducerRecord<String, Message> record = null;
+            if (canalDestination.getPartition() != null) {
+                record = new ProducerRecord<>(topicName, canalDestination.getPartition(), null, message);
+            } else {
+                if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
+                    Message[] messages = MQMessageUtils.messagePartition(message,
+                        canalDestination.getPartitionsNum(),
+                        canalDestination.getPartitionHash());
+                    int length = messages.length;
+                    for (int i = 0; i < length; i++) {
+                        Message messagePartition = messages[i];
+                        if (messagePartition != null) {
+                            record = new ProducerRecord<>(topicName, i, null, messagePartition);
                         }
-                    } else {
-                        record = new ProducerRecord<>(topicName, 0, null, message);
                     }
+                } else {
+                    record = new ProducerRecord<>(topicName, 0, null, message);
                 }
+            }
 
-                if (record != null) {
-                    // 同步发送原生message
+            if (record != null) {
+                if (kafkaProperties.getTransaction()) {
+                    producer.send(record);
+                } else {
                     producer.send(record).get();
+                }
 
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Send  message to kafka topic: [{}], packet: {}", topicName, message.toString());
-                    }
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Send  message to kafka topic: [{}], packet: {}", topicName, message.toString());
                 }
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
-                // producer.abortTransaction();
-                throw e;
             }
         } else {
             // 发送扁平数据json
             List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
             if (flatMessages != null) {
-                int idx = 0;
-                int size = flatMessages.size();
                 for (FlatMessage flatMessage : flatMessages) {
-                    idx++;
                     if (StringUtils.isEmpty(canalDestination.getPartitionHash())) {
-                        try {
-                            Integer partition = canalDestination.getPartition();
-                            if (partition == null) {
-                                partition = 0;
-                            }
-                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,
-                                partition,
-                                null,
-                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
-                            if (idx != size) {
-                                producer2.send(record);
-                            } else {
-                                producer2.send(record).get();
-                            }
-                        } catch (Exception e) {
-                            logger.error(e.getMessage(), e);
-                            // producer.abortTransaction();
-                            throw e;
+                        Integer partition = canalDestination.getPartition();
+                        if (partition == null) {
+                            partition = 0;
                         }
+                        produce(topicName, partition, flatMessage);
                     } else {
                         FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                             canalDestination.getPartitionsNum(),
@@ -176,25 +183,11 @@ public class CanalKafkaProducer implements CanalMQProducer {
                         for (int i = 0; i < length; i++) {
                             FlatMessage flatMessagePart = partitionFlatMessage[i];
                             if (flatMessagePart != null) {
-                                try {
-                                    ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                        topicName,
-                                        i,
-                                        null,
-                                        JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
-                                    if (idx != size) {
-                                        producer2.send(record);
-                                    } else {
-                                        producer2.send(record).get();
-                                    }
-                                } catch (Exception e) {
-                                    logger.error(e.getMessage(), e);
-                                    // producer.abortTransaction();
-                                    throw e;
-                                }
+                                produce(topicName, i, flatMessagePart);
                             }
                         }
                     }
+
                     if (logger.isDebugEnabled()) {
                         logger.debug("Send flat message to kafka topic: [{}], packet: {}",
                             topicName,
@@ -203,8 +196,19 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 }
             }
         }
+    }
 
-        // producer.commitTransaction();
+    private void produce(String topicName, int partition, FlatMessage flatMessage) throws ExecutionException,
+                                                                                   InterruptedException {
+        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,
+            partition,
+            null,
+            JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
+        if (kafkaProperties.getTransaction()) {
+            producer2.send(record);
+        } else {
+            producer2.send(record).get();
+        }
     }
 
 }