Browse Source

0.11的环境未测过,先去除kafka的事务

mcy 6 years ago
parent
commit
17f345bfe5

+ 4 - 4
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -35,6 +35,7 @@ public class CanalKafkaProducer {
         properties.put("key.serializer", StringSerializer.class.getName());
         properties.put("value.serializer", MessageSerializer.class.getName());
         producer = new KafkaProducer<String, Message>(properties);
+        // producer.initTransactions();
     }
 
     public void stop() {
@@ -49,9 +50,8 @@ public class CanalKafkaProducer {
     }
 
     public void send(KafkaProperties.Topic topic, Message message, Callback callback) {
-        producer.initTransactions();
         try {
-            producer.beginTransaction();
+            // producer.beginTransaction();
             ProducerRecord<String, Message> record;
             if (topic.getPartition() != null) {
                 record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
@@ -59,13 +59,13 @@ public class CanalKafkaProducer {
                 record = new ProducerRecord<String, Message>(topic.getTopic(), message);
             }
             producer.send(record);
-            producer.commitTransaction();
+            // producer.commitTransaction();
             callback.commit();
             if (logger.isDebugEnabled()) {
                 logger.debug("send message to kafka topic: {} \n {}", topic.getTopic(), message.toString());
             }
         } catch (KafkaException e) {
-            producer.abortTransaction();
+            // producer.abortTransaction();
             callback.rollback();
         }
     }