Browse Source

Revert "Revert "修复kafka消费回滚的bug (#2521)" (#2526)"

This reverts commit b6333f018f265859796d3c2871ec62e95b374864.
rewerma 5 years ago
parent
commit
f742bd73cb

+ 5 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -174,6 +174,11 @@ public abstract class AbstractCanalAdapterWorker {
             if (i == retry - 1) {
                 connector.ack();
                 logger.error(e.getMessage() + " Error sync but ACK!");
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException e1) {
+                    // restore the interrupt status instead of silently swallowing it
+                    Thread.currentThread().interrupt();
+                }
                 return true;
             } else {
                 connector.rollback();

+ 15 - 15
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -61,11 +61,11 @@ public class KafkaCanalConnector implements CanalMQConnector {
             batchSize = 100;
         }
         properties.put("max.poll.records", batchSize.toString());
-        properties.put("key.deserializer", StringDeserializer.class.getName());
+        properties.put("key.deserializer", StringDeserializer.class);
         if (!flatMessage) {
-            properties.put("value.deserializer", MessageDeserializer.class.getName());
+            properties.put("value.deserializer", MessageDeserializer.class);
         } else {
-            properties.put("value.deserializer", StringDeserializer.class.getName());
+            properties.put("value.deserializer", StringDeserializer.class);
         }
     }
 
@@ -183,14 +183,13 @@ public class KafkaCanalConnector implements CanalMQConnector {
 
         ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout));
 
-        currentOffsets.clear();
-        for (TopicPartition topicPartition : records.partitions()) {
-            currentOffsets.put(topicPartition.partition(), kafkaConsumer.position(topicPartition));
-        }
-
         if (!records.isEmpty()) {
+            currentOffsets.clear();
             List<Message> messages = new ArrayList<>();
             for (ConsumerRecord<String, Message> record : records) {
+                if (currentOffsets.get(record.partition()) == null) {
+                    currentOffsets.put(record.partition(), record.offset());
+                }
                 messages.add(record.value());
             }
             return messages;
@@ -221,14 +220,13 @@ public class KafkaCanalConnector implements CanalMQConnector {
 
         ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));
 
-        currentOffsets.clear();
-        for (TopicPartition topicPartition : records.partitions()) {
-            currentOffsets.put(topicPartition.partition(), kafkaConsumer2.position(topicPartition));
-        }
-
         if (!records.isEmpty()) {
+            currentOffsets.clear();
             List<FlatMessage> flatMessages = new ArrayList<>();
             for (ConsumerRecord<String, String> record : records) {
+                if (currentOffsets.get(record.partition()) == null) {
+                    currentOffsets.put(record.partition(), record.offset());
+                }
                 String flatMessageJson = record.value();
                 FlatMessage flatMessage = JSON.parseObject(flatMessageJson, FlatMessage.class);
                 flatMessages.add(flatMessage);
@@ -248,12 +246,14 @@ public class KafkaCanalConnector implements CanalMQConnector {
         // 回滚所有分区
         if (kafkaConsumer != null) {
             for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
-                kafkaConsumer.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
+                kafkaConsumer.seek(new TopicPartition(topic, entry.getKey()), entry.getValue());
+                kafkaConsumer.commitSync();
             }
         }
         if (kafkaConsumer2 != null) {
             for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
-                kafkaConsumer2.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
+                kafkaConsumer2.seek(new TopicPartition(topic, entry.getKey()), entry.getValue());
+                kafkaConsumer2.commitSync();
             }
         }
     }