Parcourir la source

Revert "修复kafka消费回滚的bug (#2521)"

This reverts commit 31acfc01019883c8066538d846e7d47de09bad8c.
rewerma il y a 5 ans
Parent
commit
21e9b325b1

+ 0 - 5
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -174,11 +174,6 @@ public abstract class AbstractCanalAdapterWorker {
             if (i == retry - 1) {
                 connector.ack();
                 logger.error(e.getMessage() + " Error sync but ACK!");
-                try {
-                    Thread.sleep(500);
-                } catch (InterruptedException e1) {
-                    // ignore
-                }
                 return true;
             } else {
                 connector.rollback();

+ 15 - 15
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -61,11 +61,11 @@ public class KafkaCanalConnector implements CanalMQConnector {
             batchSize = 100;
         }
         properties.put("max.poll.records", batchSize.toString());
-        properties.put("key.deserializer", StringDeserializer.class);
+        properties.put("key.deserializer", StringDeserializer.class.getName());
         if (!flatMessage) {
-            properties.put("value.deserializer", MessageDeserializer.class);
+            properties.put("value.deserializer", MessageDeserializer.class.getName());
         } else {
-            properties.put("value.deserializer", StringDeserializer.class);
+            properties.put("value.deserializer", StringDeserializer.class.getName());
         }
     }
 
@@ -183,13 +183,14 @@ public class KafkaCanalConnector implements CanalMQConnector {
 
         ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout));
 
+        currentOffsets.clear();
+        for (TopicPartition topicPartition : records.partitions()) {
+            currentOffsets.put(topicPartition.partition(), kafkaConsumer.position(topicPartition));
+        }
+
         if (!records.isEmpty()) {
-            currentOffsets.clear();
             List<Message> messages = new ArrayList<>();
             for (ConsumerRecord<String, Message> record : records) {
-                if (currentOffsets.get(record.partition()) == null) {
-                    currentOffsets.put(record.partition(), record.offset());
-                }
                 messages.add(record.value());
             }
             return messages;
@@ -220,13 +221,14 @@ public class KafkaCanalConnector implements CanalMQConnector {
 
         ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));
 
+        currentOffsets.clear();
+        for (TopicPartition topicPartition : records.partitions()) {
+            currentOffsets.put(topicPartition.partition(), kafkaConsumer2.position(topicPartition));
+        }
+
         if (!records.isEmpty()) {
-            currentOffsets.clear();
             List<FlatMessage> flatMessages = new ArrayList<>();
             for (ConsumerRecord<String, String> record : records) {
-                if (currentOffsets.get(record.partition()) == null) {
-                    currentOffsets.put(record.partition(), record.offset());
-                }
                 String flatMessageJson = record.value();
                 FlatMessage flatMessage = JSON.parseObject(flatMessageJson, FlatMessage.class);
                 flatMessages.add(flatMessage);
@@ -246,14 +248,12 @@ public class KafkaCanalConnector implements CanalMQConnector {
         // 回滚所有分区
         if (kafkaConsumer != null) {
             for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
-                kafkaConsumer.seek(new TopicPartition(topic, entry.getKey()), currentOffsets.get(entry.getKey()));
-                kafkaConsumer.commitSync();
+                kafkaConsumer.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
             }
         }
         if (kafkaConsumer2 != null) {
             for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
-                kafkaConsumer2.seek(new TopicPartition(topic, entry.getKey()), currentOffsets.get(entry.getKey()));
-                kafkaConsumer.commitSync();
+                kafkaConsumer2.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
             }
         }
     }