
Use a Kafka transaction for batched asynchronous delivery of flatMessage
fixed #1454

machey 6 years ago
commit 2f214ce3a6

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -48,6 +48,7 @@ public class CanalConstants {
     public static final String CANAL_MQ_FLATMESSAGE              = ROOT + "." + "mq.flatMessage";
     public static final String CANAL_MQ_COMPRESSION_TYPE         = ROOT + "." + "mq.compressionType";
     public static final String CANAL_MQ_ACKS                     = ROOT + "." + "mq.acks";
+    public static final String CANAL_MQ_TRANSACTION              = ROOT + "." + "mq.transaction";
     public static final String CANAL_ALIYUN_ACCESSKEY            = ROOT + "." + "aliyun.accessKey";
     public static final String CANAL_ALIYUN_SECRETKEY            = ROOT + "." + "aliyun.secretKey";
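Note: the keys in this class are built as ROOT + "." + suffix, and the keys in canal.properties below are all prefixed with "canal", so the new constant resolves to the canal.mq.transaction property that this change also adds to the default configuration.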
 

+ 4 - 1
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java

@@ -150,7 +150,6 @@ public class CanalStater {
         if (!StringUtils.isEmpty(acks)) {
             mqProperties.setAcks(acks);
         }
-
         String aliyunAccessKey = CanalController.getProperty(properties, CanalConstants.CANAL_ALIYUN_ACCESSKEY);
         if (!StringUtils.isEmpty(aliyunAccessKey)) {
             mqProperties.setAliyunAccessKey(aliyunAccessKey);
@@ -159,6 +158,10 @@ public class CanalStater {
         if (!StringUtils.isEmpty(aliyunSecretKey)) {
             mqProperties.setAliyunSecretKey(aliyunSecretKey);
         }
+        String transaction = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_TRANSACTION);
+        if (!StringUtils.isEmpty(transaction)) {
+            mqProperties.setTransaction(Boolean.valueOf(transaction));
+        }
         return mqProperties;
     }
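
A note on the parsing above: Boolean.valueOf only returns true for the literal string "true" (case-insensitive), so any other value in canal.mq.transaction, such as "1" or "yes", silently leaves transactions disabled. A minimal standalone sketch (illustrative only, not canal code):

    // Illustrative only: shows the Boolean.valueOf semantics behind
    // mqProperties.setTransaction(Boolean.valueOf(transaction))
    public class TransactionFlagParsingDemo {
        public static void main(String[] args) {
            System.out.println(Boolean.valueOf("true"));  // true
            System.out.println(Boolean.valueOf("TRUE"));  // true  (case-insensitive)
            System.out.println(Boolean.valueOf("false")); // false
            System.out.println(Boolean.valueOf("yes"));   // false (unrecognised values disable the feature)
            System.out.println(Boolean.valueOf("1"));     // false
        }
    }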
 

+ 3 - 1
deployer/src/main/resources/canal.properties

@@ -114,4 +114,6 @@ canal.mq.canalBatchSize = 50
 canal.mq.canalGetTimeout = 100
 canal.mq.flatMessage = true
 canal.mq.compressionType = none
-canal.mq.acks = all
+canal.mq.acks = all
+# use a kafka transaction for flatMessage batch producing
+canal.mq.transaction = false
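
Enabling this flag assumes a Kafka client and broker that support transactions (Kafka 0.11 or later) and a transactional.id configured on the producer (the producer setup is not shown in the hunks here). With the default false, each flat message is still sent with a blocking send (producer2.send(record).get()), as the producer changes below show.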

+ 9 - 0
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -23,6 +23,7 @@ public class MQProperties {
     private String  acks                   = "all";
     private String  aliyunAccessKey        = "";
     private String  aliyunSecretKey        = "";
+    private boolean transaction            = false;           // whether to enable transactions
 
     public static class CanalDestination {
 
@@ -201,4 +202,12 @@ public class MQProperties {
     public void setMaxRequestSize(int maxRequestSize) {
         this.maxRequestSize = maxRequestSize;
     }
+
+    public boolean getTransaction() {
+        return transaction;
+    }
+
+    public void setTransaction(boolean transaction) {
+        this.transaction = transaction;
+    }
 }

+ 39 - 37
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -98,56 +98,51 @@ public class CanalKafkaProducer implements CanalMQProducer {
             }
             callback.commit();
         } catch (Exception e) {
+            logger.error(e.getMessage(), e);
             callback.rollback();
         }
     }
 
     private void send(MQProperties.CanalDestination canalDestination, String topicName,
                       Message message) throws Exception {
-        // producer.beginTransaction();
         if (!kafkaProperties.getFlatMessage()) {
-            try {
-                ProducerRecord<String, Message> record = null;
-                if (canalDestination.getPartition() != null) {
-                    record = new ProducerRecord<>(topicName, canalDestination.getPartition(), null, message);
-                } else {
-                    if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
-                        Message[] messages = MQMessageUtils.messagePartition(message,
-                            canalDestination.getPartitionsNum(),
-                            canalDestination.getPartitionHash());
-                        int length = messages.length;
-                        for (int i = 0; i < length; i++) {
-                            Message messagePartition = messages[i];
-                            if (messagePartition != null) {
-                                record = new ProducerRecord<>(topicName, i, null, messagePartition);
-                            }
+            ProducerRecord<String, Message> record = null;
+            if (canalDestination.getPartition() != null) {
+                record = new ProducerRecord<>(topicName, canalDestination.getPartition(), null, message);
+            } else {
+                if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
+                    Message[] messages = MQMessageUtils.messagePartition(message,
+                        canalDestination.getPartitionsNum(),
+                        canalDestination.getPartitionHash());
+                    int length = messages.length;
+                    for (int i = 0; i < length; i++) {
+                        Message messagePartition = messages[i];
+                        if (messagePartition != null) {
+                            record = new ProducerRecord<>(topicName, i, null, messagePartition);
                         }
-                    } else {
-                        record = new ProducerRecord<>(topicName, 0, null, message);
                     }
+                } else {
+                    record = new ProducerRecord<>(topicName, 0, null, message);
                 }
+            }
 
-                if (record != null) {
-                    // send the raw Message synchronously
-                    producer.send(record).get();
+            if (record != null) {
+                // send the raw Message synchronously
+                producer.send(record).get();
 
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Send  message to kafka topic: [{}], packet: {}", topicName, message.toString());
-                    }
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Send  message to kafka topic: [{}], packet: {}", topicName, message.toString());
                 }
-            } catch (Exception e) {
-                logger.error(e.getMessage(), e);
-                // producer.abortTransaction();
-                throw e;
             }
         } else {
             // send flattened data as JSON
             List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
             if (flatMessages != null) {
-                int idx = 0;
-                int size = flatMessages.size();
+                // begin a transaction; requires a Kafka version that supports transactions
+                if (kafkaProperties.getTransaction()) {
+                    producer.beginTransaction();
+                }
                 for (FlatMessage flatMessage : flatMessages) {
-                    idx++;
                     if (StringUtils.isEmpty(canalDestination.getPartitionHash())) {
                         try {
                             Integer partition = canalDestination.getPartition();
@@ -158,14 +153,15 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                 partition,
                                 null,
                                 JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
-                            if (idx != size) {
+                            if (kafkaProperties.getTransaction()) {
                                 producer2.send(record);
                             } else {
                                 producer2.send(record).get();
                             }
                         } catch (Exception e) {
-                            logger.error(e.getMessage(), e);
-                            // producer.abortTransaction();
+                            if (kafkaProperties.getTransaction()) {
+                                producer.abortTransaction();
+                            }
                             throw e;
                         }
                     } else {
@@ -182,19 +178,25 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                         i,
                                         null,
                                         JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
-                                    if (idx != size) {
+                                    if (kafkaProperties.getTransaction()) {
                                         producer2.send(record);
                                     } else {
                                         producer2.send(record).get();
                                     }
                                 } catch (Exception e) {
-                                    logger.error(e.getMessage(), e);
-                                    // producer.abortTransaction();
+                                    if (kafkaProperties.getTransaction()) {
+                                        producer.abortTransaction();
+                                    }
                                     throw e;
                                 }
                             }
                         }
                     }
+
+                    if (kafkaProperties.getTransaction()) {
+                        producer.commitTransaction();
+                    }
+
                     if (logger.isDebugEnabled()) {
                         logger.debug("Send flat message to kafka topic: [{}], packet: {}",
                             topicName,
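
For reference, the general transactional batch pattern with the plain Kafka client looks roughly like the sketch below: begin a transaction, send the whole batch asynchronously, then commit once, aborting on any failure. This is a minimal illustration with placeholder broker address, topic, transactional.id and payloads, not canal code, and it assumes a Kafka setup (0.11+) where transactions are available.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TransactionalBatchSketch {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "127.0.0.1:9092");   // placeholder broker
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // a transactional.id is mandatory before any transactional call; value is a placeholder
            props.put("transactional.id", "demo-transactional-id");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.initTransactions(); // once per producer instance, before the first transaction

            List<String> flatMessageJson = Arrays.asList("{\"id\":1}", "{\"id\":2}"); // placeholder batch
            try {
                producer.beginTransaction();
                for (String json : flatMessageJson) {
                    // asynchronous send inside the transaction; no per-record .get() needed
                    producer.send(new ProducerRecord<>("example_topic", json));
                }
                // flushes outstanding sends and commits the whole batch atomically
                producer.commitTransaction();
            } catch (Exception e) {
                // discard everything sent since beginTransaction() and surface the error
                producer.abortTransaction();
                throw e;
            } finally {
                producer.close();
            }
        }
    }

In the diff above the same idea is applied per flat-message batch: with canal.mq.transaction enabled the records are sent without the blocking .get(), any send failure triggers producer.abortTransaction() and rethrows, and the caller's catch block logs the error and invokes callback.rollback().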