瀏覽代碼

调整kafka同步发送和消费,确保消息不丢失

mcy 6 年之前
父節點
當前提交
e4b6385dcc

+ 35 - 30
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java

@@ -96,7 +96,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
         while (!running)
         while (!running)
             ;
             ;
         ExecutorService executor = Executors.newSingleThreadExecutor();
         ExecutorService executor = Executors.newSingleThreadExecutor();
-        final AtomicBoolean executing = new AtomicBoolean(true);
+        // final AtomicBoolean executing = new AtomicBoolean(true);
         while (running) {
         while (running) {
             try {
             try {
                 logger.info("=============> Start to connect topic: {} <=============", this.topic);
                 logger.info("=============> Start to connect topic: {} <=============", this.topic);
@@ -116,36 +116,41 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
                         }
                         }
                         if (messages != null) {
                         if (messages != null) {
                             for (final Object message : messages) {
                             for (final Object message : messages) {
-                                executing.set(true);
-                                if (message != null) {
-                                    executor.submit(new Runnable() {
-
-                                        @Override
-                                        public void run() {
-                                            try {
-                                                if (message instanceof FlatMessage) {
-                                                    writeOut((FlatMessage) message);
-                                                } else {
-                                                    writeOut((Message) message);
-                                                }
-                                            } catch (Exception e) {
-                                                logger.error(e.getMessage(), e);
-                                            } finally {
-                                                executing.compareAndSet(true, false);
-                                            }
-                                        }
-                                    });
-
-                                    // 间隔一段时间ack一次, 防止因超时未响应切换到另外台客户端
-                                    long currentTS = System.currentTimeMillis();
-                                    while (executing.get()) {
-                                        // 大于10秒未消费完ack一次keep alive
-                                        if (System.currentTimeMillis() - currentTS > 10000) {
-                                            connector.ack();
-                                            currentTS = System.currentTimeMillis();
-                                        }
-                                    }
+                                if (message instanceof FlatMessage) {
+                                    writeOut((FlatMessage) message);
+                                } else {
+                                    writeOut((Message) message);
                                 }
                                 }
+                                // executing.set(true);
+                                // if (message != null) {
+                                // executor.submit(new Runnable() {
+                                //
+                                // @Override
+                                // public void run() {
+                                // try {
+                                // if (message instanceof FlatMessage) {
+                                // writeOut((FlatMessage) message);
+                                // } else {
+                                // writeOut((Message) message);
+                                // }
+                                // } catch (Exception e) {
+                                // logger.error(e.getMessage(), e);
+                                // } finally {
+                                // executing.compareAndSet(true, false);
+                                // }
+                                // }
+                                // });
+                                //
+                                // // 间隔一段时间ack一次, 防止因超时未响应切换到另外台客户端
+                                // long currentTS = System.currentTimeMillis();
+                                // while (executing.get()) {
+                                // // 大于10秒未消费完ack一次keep alive
+                                // if (System.currentTimeMillis() - currentTS > 10000) {
+                                // connector.ack();
+                                // currentTS = System.currentTimeMillis();
+                                // }
+                                // }
+                                // }
                             }
                             }
                         }
                         }
                         connector.ack();
                         connector.ack();

+ 1 - 1
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -233,7 +233,7 @@ public class KafkaCanalConnector {
             kafkaConsumer.commitSync();
             kafkaConsumer.commitSync();
         }
         }
         if (kafkaConsumer2 != null) {
         if (kafkaConsumer2 != null) {
-            kafkaConsumer2.commitAsync();
+            kafkaConsumer2.commitSync();
         }
         }
     }
     }
 
 

+ 62 - 41
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,11 +1,14 @@
 package com.alibaba.otter.canal.kafka;
 package com.alibaba.otter.canal.kafka;
 
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 import java.util.Properties;
 import java.util.Properties;
+import java.util.concurrent.Future;
 
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
@@ -14,8 +17,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
-;
+import com.alibaba.otter.canal.spi.CanalMQProducer;;
 
 
 /**
 /**
  * kafka producer 主操作类
  * kafka producer 主操作类
@@ -74,9 +76,10 @@ public class CanalKafkaProducer implements CanalMQProducer {
 
 
     @Override
     @Override
     public void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback) {
     public void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback) {
-        try {
-            // producer.beginTransaction();
-            if (!kafkaProperties.getFlatMessage()) {
+
+        // producer.beginTransaction();
+        if (!kafkaProperties.getFlatMessage()) {
+            try {
                 ProducerRecord<String, Message> record;
                 ProducerRecord<String, Message> record;
                 if (canalDestination.getPartition() != null) {
                 if (canalDestination.getPartition() != null) {
                     record = new ProducerRecord<String, Message>(canalDestination.getTopic(),
                     record = new ProducerRecord<String, Message>(canalDestination.getTopic(),
@@ -87,58 +90,76 @@ public class CanalKafkaProducer implements CanalMQProducer {
                     record = new ProducerRecord<String, Message>(canalDestination.getTopic(), 0, null, message);
                     record = new ProducerRecord<String, Message>(canalDestination.getTopic(), 0, null, message);
                 }
                 }
 
 
-                producer.send(record);
-            } else {
-                // 发送扁平数据json
-                List<FlatMessage> flatMessages = FlatMessage.messageConverter(message);
-                if (flatMessages != null) {
-                    for (FlatMessage flatMessage : flatMessages) {
-                        if (canalDestination.getPartition() != null) {
-                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
-                                canalDestination.getPartition(),
-                                null,
-                                JSON.toJSONString(flatMessage));
+                producer.send(record).get();
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+                // producer.abortTransaction();
+                callback.rollback();
+            }
+        } else {
+            // 发送扁平数据json
+            List<FlatMessage> flatMessages = FlatMessage.messageConverter(message);
+            if (flatMessages != null) {
+                for (FlatMessage flatMessage : flatMessages) {
+                    if (canalDestination.getPartition() != null) {
+                        try {
+                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination
+                                .getTopic(), canalDestination.getPartition(), null, JSON.toJSONString(flatMessage));
                             producer2.send(record);
                             producer2.send(record);
-                        } else {
-                            if (canalDestination.getPartitionHash() != null
-                                && !canalDestination.getPartitionHash().isEmpty()) {
-                                FlatMessage[] partitionFlatMessage = FlatMessage.messagePartition(flatMessage,
-                                    canalDestination.getPartitionsNum(),
-                                    canalDestination.getPartitionHash());
-                                int length = partitionFlatMessage.length;
-                                for (int i = 0; i < length; i++) {
-                                    FlatMessage flatMessagePart = partitionFlatMessage[i];
-                                    if (flatMessagePart != null) {
-                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
+                        } catch (Exception e) {
+                            logger.error(e.getMessage(), e);
+                            // producer.abortTransaction();
+                            callback.rollback();
+                        }
+                    } else {
+                        if (canalDestination.getPartitionHash() != null
+                            && !canalDestination.getPartitionHash().isEmpty()) {
+                            FlatMessage[] partitionFlatMessage = FlatMessage.messagePartition(flatMessage,
+                                canalDestination.getPartitionsNum(),
+                                canalDestination.getPartitionHash());
+                            int length = partitionFlatMessage.length;
+                            for (int i = 0; i < length; i++) {
+                                FlatMessage flatMessagePart = partitionFlatMessage[i];
+                                if (flatMessagePart != null) {
+                                    try {
+                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
+                                            canalDestination.getTopic(),
                                             i,
                                             i,
                                             null,
                                             null,
                                             JSON.toJSONString(flatMessagePart));
                                             JSON.toJSONString(flatMessagePart));
-                                        producer2.send(record);
+                                        producer2.send(record).get();
+                                    } catch (Exception e) {
+                                        logger.error(e.getMessage(), e);
+                                        // producer.abortTransaction();
+                                        callback.rollback();
                                     }
                                     }
                                 }
                                 }
-                            } else {
-                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
+                            }
+                        } else {
+                            try {
+                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(
+                                    canalDestination.getTopic(),
                                     0,
                                     0,
                                     null,
                                     null,
                                     JSON.toJSONString(flatMessage));
                                     JSON.toJSONString(flatMessage));
-                                producer2.send(record);
+                                producer2.send(record).get();
+                            } catch (Exception e) {
+                                logger.error(e.getMessage(), e);
+                                // producer.abortTransaction();
+                                callback.rollback();
                             }
                             }
                         }
                         }
-
                     }
                     }
                 }
                 }
             }
             }
+        }
 
 
-            // producer.commitTransaction();
-            callback.commit();
-            if (logger.isDebugEnabled()) {
-                logger.debug("send message to kafka topic: {}", canalDestination.getTopic());
-            }
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            // producer.abortTransaction();
-            callback.rollback();
+        // producer.commitTransaction();
+        callback.commit();
+        if (logger.isDebugEnabled()) {
+            logger.debug("send message to kafka topic: {}", canalDestination.getTopic());
         }
         }
+
     }
     }
 
 
 }
 }