瀏覽代碼

修改kafka consumer关闭方式

rewerma 7 年之前
父節點
當前提交
1b6efdf463

+ 18 - 48
kafka-client/src/main/java/com/alibaba/otter/canal/kafka/client/KafkaCanalConnector.java

@@ -4,11 +4,9 @@ import com.alibaba.otter.canal.protocol.Message;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.util.Collections;
-import java.util.ConcurrentModificationException;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -59,35 +57,24 @@ public class KafkaCanalConnector {
     }
 
     /**
-     * 释放链接
+     * 关闭链接
      */
-    public void disconnect() {
-        if (kafkaConsumer != null) {
-            try {
-                kafkaConsumer.close();
-            } catch (ConcurrentModificationException e) {
-                kafkaConsumer.wakeup(); //通过wakeup异常间接关闭consumer
-            }
-        }
+    public void close() {
+        kafkaConsumer.close();
     }
 
     /**
      * 订阅topic
      */
     public void subscribe() {
-        try {
-            if (kafkaConsumer == null) {
-                kafkaConsumer = new KafkaConsumer<String, Message>(properties);
-            }
-            if (partition == null) {
-                kafkaConsumer.subscribe(Collections.singletonList(topic));
-            } else {
-                kafkaConsumer.subscribe(Collections.singletonList(topic));
-                TopicPartition topicPartition = new TopicPartition(topic, partition);
-                kafkaConsumer.assign(Collections.singletonList(topicPartition));
-            }
-        } catch (WakeupException e) {
-            closeByWakeupException(e);
+        if (kafkaConsumer == null) {
+            kafkaConsumer = new KafkaConsumer<String, Message>(properties);
+        }
+        if (partition == null) {
+            kafkaConsumer.subscribe(Collections.singletonList(topic));
+        } else {
+            TopicPartition topicPartition = new TopicPartition(topic, partition);
+            kafkaConsumer.assign(Collections.singletonList(topicPartition));
         }
     }
 
@@ -95,11 +82,7 @@ public class KafkaCanalConnector {
      * 取消订阅
      */
     public void unsubscribe() {
-        try {
-            kafkaConsumer.unsubscribe();
-        } catch (WakeupException e) {
-            closeByWakeupException(e);
-        }
+        kafkaConsumer.unsubscribe();
     }
 
     /**
@@ -127,15 +110,11 @@ public class KafkaCanalConnector {
      * @return
      */
     public Message getWithoutAck(Long timeout, TimeUnit unit) {
-        try {
-            ConsumerRecords<String, Message> records =
-                    kafkaConsumer.poll(unit.toMillis(timeout)); //基于配置,最多只能poll到一条数据
-
-            if (!records.isEmpty()) {
-                return records.iterator().next().value();
-            }
-        } catch (WakeupException e) {
-            closeByWakeupException(e);
+        ConsumerRecords<String, Message> records =
+                kafkaConsumer.poll(unit.toMillis(timeout)); //基于配置,最多只能poll到一条数据
+
+        if (!records.isEmpty()) {
+            return records.iterator().next().value();
         }
         return null;
     }
@@ -144,15 +123,6 @@ public class KafkaCanalConnector {
      * 提交offset,如果超过 session.timeout.ms 设置的时间没有ack则会抛出异常,ack失败
      */
     public void ack() {
-        try {
-            kafkaConsumer.commitSync();
-        } catch (WakeupException e) {
-            closeByWakeupException(e);
-        }
-    }
-
-    private void closeByWakeupException(WakeupException e) {
-        kafkaConsumer.close();
-        throw e;
+        kafkaConsumer.commitSync();
     }
 }

+ 7 - 9
kafka-client/src/test/java/com/alibaba/otter/canal/kafka/client/running/CanalKafkaClientExample.java

@@ -80,7 +80,6 @@ public class CanalKafkaClientExample {
         if (!running) {
             return;
         }
-        connector.disconnect();
         running = false;
         if (thread != null) {
             try {
@@ -114,20 +113,19 @@ public class CanalKafkaClientExample {
                     }
 
                     connector.ack(); // 提交确认
-                } catch (WakeupException e) {
-                    try {
-                        Thread.sleep(500); //延时确保running状态的改变
-                    } catch (InterruptedException ie) {
-                        //ignore
-                    }
                 } catch (Exception e) {
                     logger.error(e.getMessage(), e);
                 }
             }
-        } catch (WakeupException e) {
-            //ignore
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
         }
+
+        try {
+            connector.unsubscribe();
+        } catch (WakeupException e) {
+            // No-op. Continue process
+        }
+        connector.close();
     }
 }