Przeglądaj źródła

修复client中的kafka consumer消费回滚的bug (#4532)

rewerma 2 lat temu
rodzic
commit
ea8fb5e5b8

+ 18 - 17
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.client.kafka;
 
+import com.alibaba.fastjson.JSON;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -10,7 +11,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
-import com.alibaba.fastjson2.JSON;
 import com.alibaba.otter.canal.client.CanalMQConnector;
 import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
 import com.alibaba.otter.canal.protocol.FlatMessage;
@@ -43,7 +43,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
     private Map<Integer, Long>               currentOffsets = new ConcurrentHashMap<>();
 
     public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId, Integer batchSize,
-                               boolean flatMessage){
+        boolean flatMessage){
         this.topic = topic;
         this.partition = partition;
         this.flatMessage = flatMessage;
@@ -61,11 +61,11 @@ public class KafkaCanalConnector implements CanalMQConnector {
             batchSize = 100;
         }
         properties.put("max.poll.records", batchSize.toString());
-        properties.put("key.deserializer", StringDeserializer.class.getName());
+        properties.put("key.deserializer", StringDeserializer.class);
         if (!flatMessage) {
-            properties.put("value.deserializer", MessageDeserializer.class.getName());
+            properties.put("value.deserializer", MessageDeserializer.class);
         } else {
-            properties.put("value.deserializer", StringDeserializer.class.getName());
+            properties.put("value.deserializer", StringDeserializer.class);
         }
     }
 
@@ -183,14 +183,13 @@ public class KafkaCanalConnector implements CanalMQConnector {
 
         ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout));
 
-        currentOffsets.clear();
-        for (TopicPartition topicPartition : records.partitions()) {
-            currentOffsets.put(topicPartition.partition(), kafkaConsumer.position(topicPartition));
-        }
-
         if (!records.isEmpty()) {
+            currentOffsets.clear();
             List<Message> messages = new ArrayList<>();
             for (ConsumerRecord<String, Message> record : records) {
+                if (currentOffsets.get(record.partition()) == null) {
+                    currentOffsets.put(record.partition(), record.offset());
+                }
                 messages.add(record.value());
             }
             return messages;
@@ -221,14 +220,13 @@ public class KafkaCanalConnector implements CanalMQConnector {
 
         ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));
 
-        currentOffsets.clear();
-        for (TopicPartition topicPartition : records.partitions()) {
-            currentOffsets.put(topicPartition.partition(), kafkaConsumer2.position(topicPartition));
-        }
-
         if (!records.isEmpty()) {
+            currentOffsets.clear();
             List<FlatMessage> flatMessages = new ArrayList<>();
             for (ConsumerRecord<String, String> record : records) {
+                if (currentOffsets.get(record.partition()) == null) {
+                    currentOffsets.put(record.partition(), record.offset());
+                }
                 String flatMessageJson = record.value();
                 FlatMessage flatMessage = JSON.parseObject(flatMessageJson, FlatMessage.class);
                 flatMessages.add(flatMessage);
@@ -248,12 +246,14 @@ public class KafkaCanalConnector implements CanalMQConnector {
         // 回滚所有分区
         if (kafkaConsumer != null) {
             for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
-                kafkaConsumer.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
+                kafkaConsumer.seek(new TopicPartition(topic, entry.getKey()), currentOffsets.get(entry.getKey()));
+                kafkaConsumer.commitSync();
             }
         }
         if (kafkaConsumer2 != null) {
             for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
-                kafkaConsumer2.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
+                kafkaConsumer2.seek(new TopicPartition(topic, entry.getKey()), currentOffsets.get(entry.getKey()));
+                kafkaConsumer.commitSync();
             }
         }
     }
@@ -324,3 +324,4 @@ public class KafkaCanalConnector implements CanalMQConnector {
     }
 
 }
+