Browse Source

fix:未捕获kafka事务异常,造成canal ack异常 (#1814)

nieyuan1991 6 years ago
parent
commit
f9989e0856

+ 8 - 4
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -102,10 +102,10 @@ public class CanalKafkaProducer implements CanalMQProducer {
             producerTmp = producer2;
         }
 
-        if (kafkaProperties.getTransaction()) {
-            producerTmp.beginTransaction();
-        }
         try {
+            if (kafkaProperties.getTransaction()) {
+                producerTmp.beginTransaction();
+            }
             if (!StringUtils.isEmpty(canalDestination.getDynamicTopic())) {
                 // 动态topic
                 Map<String, Message> messageMap = MQMessageUtils.messageTopics(message,
@@ -130,7 +130,11 @@ public class CanalKafkaProducer implements CanalMQProducer {
         } catch (Throwable e) {
             logger.error(e.getMessage(), e);
             if (kafkaProperties.getTransaction()) {
-                producerTmp.abortTransaction();
+                try {
+                    producerTmp.abortTransaction();
+                } catch (Exception e1) {
+                    logger.error(e1.getMessage(), e1);
+                }
             }
             callback.rollback();
         }