瀏覽代碼

增加kafka producer send结果同步判断

mcy 6 年之前
父節點
當前提交
728550cac3
共有 1 個文件被更改,包括 7 次插入4 次删除
  1. 7 4
      server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

+ 7 - 4
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,11 +1,12 @@
 package com.alibaba.otter.canal.kafka;
 
 import java.util.Properties;
+import java.util.concurrent.Future;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,13 +59,15 @@ public class CanalKafkaProducer {
             } else {
                 record = new ProducerRecord<String, Message>(topic.getTopic(), message);
             }
-            producer.send(record);
+            Future<RecordMetadata> future = producer.send(record);
+            future.get();
             // producer.commitTransaction();
             callback.commit();
             if (logger.isDebugEnabled()) {
-                logger.debug("send message to kafka topic: {} \n {}", topic.getTopic(), message.toString());
+                logger.debug("send message to kafka topic: {}", topic.getTopic());
             }
-        } catch (KafkaException e) {
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
             // producer.abortTransaction();
             callback.rollback();
         }