Browse Source

代码调整

machey 6 years ago
parent
commit
4ebf17f57b

+ 25 - 33
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -140,11 +140,12 @@ public class CanalKafkaProducer implements CanalMQProducer {
             if (flatMessages != null) {
                 // 开启事务,需要kafka版本支持
                 if (kafkaProperties.getTransaction()) {
+                    producer.initTransactions();
                     producer.beginTransaction();
                 }
-                for (FlatMessage flatMessage : flatMessages) {
-                    if (StringUtils.isEmpty(canalDestination.getPartitionHash())) {
-                        try {
+                try {
+                    for (FlatMessage flatMessage : flatMessages) {
+                        if (StringUtils.isEmpty(canalDestination.getPartitionHash())) {
                             Integer partition = canalDestination.getPartition();
                             if (partition == null) {
                                 partition = 0;
@@ -158,21 +159,14 @@ public class CanalKafkaProducer implements CanalMQProducer {
                             } else {
                                 producer2.send(record).get();
                             }
-                        } catch (Exception e) {
-                            if (kafkaProperties.getTransaction()) {
-                                producer.abortTransaction();
-                            }
-                            throw e;
-                        }
-                    } else {
-                        FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
-                            canalDestination.getPartitionsNum(),
-                            canalDestination.getPartitionHash());
-                        int length = partitionFlatMessage.length;
-                        for (int i = 0; i < length; i++) {
-                            FlatMessage flatMessagePart = partitionFlatMessage[i];
-                            if (flatMessagePart != null) {
-                                try {
+                        } else {
+                            FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
+                                canalDestination.getPartitionsNum(),
+                                canalDestination.getPartitionHash());
+                            int length = partitionFlatMessage.length;
+                            for (int i = 0; i < length; i++) {
+                                FlatMessage flatMessagePart = partitionFlatMessage[i];
+                                if (flatMessagePart != null) {
                                     ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                                         topicName,
                                         i,
@@ -183,30 +177,28 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                     } else {
                                         producer2.send(record).get();
                                     }
-                                } catch (Exception e) {
-                                    if (kafkaProperties.getTransaction()) {
-                                        producer.abortTransaction();
-                                    }
-                                    throw e;
                                 }
                             }
                         }
-                    }
 
-                    if (kafkaProperties.getTransaction()) {
-                        producer.commitTransaction();
-                    }
+                        if (kafkaProperties.getTransaction()) {
+                            producer.commitTransaction();
+                        }
 
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Send flat message to kafka topic: [{}], packet: {}",
-                            topicName,
-                            JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Send flat message to kafka topic: [{}], packet: {}",
+                                topicName,
+                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
+                        }
                     }
+                } catch (Exception e) {
+                    if (kafkaProperties.getTransaction()) {
+                        producer.abortTransaction();
+                    }
+                    throw e;
                 }
             }
         }
-
-        // producer.commitTransaction();
     }
 
 }