machey · 6 years ago
commit 851637611f

+64 -58  server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.kafka;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -56,8 +57,13 @@ public class CanalKafkaProducer implements CanalMQProducer {
             properties.put("value.serializer", StringSerializer.class.getName());
             producer2 = new KafkaProducer<String, String>(properties);
         }
-
-        // producer.initTransactions();
+        if (kafkaProperties.getTransaction()) {
+            if (!kafkaProperties.getFlatMessage()) {
+                producer.initTransactions();
+            } else {
+                producer2.initTransactions();
+            }
+        }
     }
 
     @Override
@@ -79,6 +85,17 @@ public class CanalKafkaProducer implements CanalMQProducer {
 
     @Override
     public void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback) {
+        // Enable transactions; requires a Kafka version that supports the transactional API
+        Producer producerTmp;
+        if (!kafkaProperties.getFlatMessage()) {
+            producerTmp = producer;
+        } else {
+            producerTmp = producer2;
+        }
+
+        if (kafkaProperties.getTransaction()) {
+            producerTmp.beginTransaction();
+        }
         try {
             if (!StringUtils.isEmpty(canalDestination.getDynamicTopic())) {
                 // dynamic topic
@@ -96,9 +113,15 @@ public class CanalKafkaProducer implements CanalMQProducer {
             } else {
                 send(canalDestination, canalDestination.getTopic(), message);
             }
+            if (kafkaProperties.getTransaction()) {
+                producerTmp.commitTransaction();
+            }
             callback.commit();
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
+            if (kafkaProperties.getTransaction()) {
+                producerTmp.abortTransaction();
+            }
             callback.rollback();
         }
     }
@@ -127,8 +150,11 @@ public class CanalKafkaProducer implements CanalMQProducer {
             }
 
             if (record != null) {
-                // synchronously send the raw message
-                producer.send(record).get();
+                if (kafkaProperties.getTransaction()) {
+                    producer.send(record);
+                } else {
+                    producer.send(record).get();
+                }
 
                 if (logger.isDebugEnabled()) {
                     logger.debug("Send  message to kafka topic: [{}], packet: {}", topicName, message.toString());
@@ -138,67 +164,47 @@ public class CanalKafkaProducer implements CanalMQProducer {
             // send the flat-message JSON
             List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
             if (flatMessages != null) {
-                // enable transactions; requires Kafka version support
-                if (kafkaProperties.getTransaction()) {
-                    producer.initTransactions();
-                    producer.beginTransaction();
-                }
-                try {
-                    for (FlatMessage flatMessage : flatMessages) {
-                        if (StringUtils.isEmpty(canalDestination.getPartitionHash())) {
-                            Integer partition = canalDestination.getPartition();
-                            if (partition == null) {
-                                partition = 0;
-                            }
-                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,
-                                partition,
-                                null,
-                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
-                            if (kafkaProperties.getTransaction()) {
-                                producer2.send(record);
-                            } else {
-                                producer2.send(record).get();
-                            }
-                        } else {
-                            FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
-                                canalDestination.getPartitionsNum(),
-                                canalDestination.getPartitionHash());
-                            int length = partitionFlatMessage.length;
-                            for (int i = 0; i < length; i++) {
-                                FlatMessage flatMessagePart = partitionFlatMessage[i];
-                                if (flatMessagePart != null) {
-                                    ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                        topicName,
-                                        i,
-                                        null,
-                                        JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
-                                    if (kafkaProperties.getTransaction()) {
-                                        producer2.send(record);
-                                    } else {
-                                        producer2.send(record).get();
-                                    }
-                                }
-                            }
+                for (FlatMessage flatMessage : flatMessages) {
+                    if (StringUtils.isEmpty(canalDestination.getPartitionHash())) {
+                        Integer partition = canalDestination.getPartition();
+                        if (partition == null) {
+                            partition = 0;
                         }
-
-                        if (kafkaProperties.getTransaction()) {
-                            producer.commitTransaction();
-                        }
-
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Send flat message to kafka topic: [{}], packet: {}",
-                                topicName,
-                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
+                        produce(topicName, partition, flatMessage);
+                    } else {
+                        FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
+                            canalDestination.getPartitionsNum(),
+                            canalDestination.getPartitionHash());
+                        int length = partitionFlatMessage.length;
+                        for (int i = 0; i < length; i++) {
+                            FlatMessage flatMessagePart = partitionFlatMessage[i];
+                            if (flatMessagePart != null) {
+                                produce(topicName, i, flatMessagePart);
+                            }
                         }
                     }
-                } catch (Exception e) {
-                    if (kafkaProperties.getTransaction()) {
-                        producer.abortTransaction();
+
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Send flat message to kafka topic: [{}], packet: {}",
+                            topicName,
+                            JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
                     }
-                    throw e;
                 }
             }
         }
     }
 
+    private void produce(String topicName, int partition, FlatMessage flatMessage) throws ExecutionException,
+                                                                                   InterruptedException {
+        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,
+            partition,
+            null,
+            JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
+        if (kafkaProperties.getTransaction()) {
+            producer2.send(record);
+        } else {
+            producer2.send(record).get();
+        }
+    }
+
 }
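
For reference, the logic this commit hoists into open()/send() follows the standard Kafka producer transaction lifecycle. The sketch below is not canal code; it is a minimal, self-contained example of that lifecycle, and the bootstrap address, transactional.id value, topic name, and key/value strings are all placeholders.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // initTransactions() requires a transactional.id and Kafka >= 0.11
        props.put("transactional.id", "example-tx-id");     // placeholder id

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.initTransactions();                         // once, right after construction

        try {
            producer.beginTransaction();                     // start a batch
            // inside a transaction the send() futures need not be awaited with get();
            // commitTransaction() flushes and fails if any send in the batch failed
            producer.send(new ProducerRecord<String, String>("example-topic", "k", "v"));
            producer.commitTransaction();                    // publish the batch atomically
        } catch (KafkaException e) {
            producer.abortTransaction();                     // discard the uncommitted batch
        } finally {
            producer.close();
        }
    }
}

This mirrors why the refactored send() path drops the .get() when kafkaProperties.getTransaction() is true, and why initTransactions() now runs once in open() instead of on every flat-message batch. Note that initTransactions() throws if the producer properties do not define transactional.id, so enabling the transaction flag presumes that property is configured.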