|
@@ -1,9 +1,6 @@
|
|
package com.alibaba.otter.canal.client.kafka;
|
|
package com.alibaba.otter.canal.client.kafka;
|
|
|
|
|
|
-import java.util.ArrayList;
|
|
|
|
-import java.util.Collections;
|
|
|
|
-import java.util.List;
|
|
|
|
-import java.util.Properties;
|
|
|
|
|
|
+import java.util.*;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
@@ -22,7 +19,7 @@ import com.google.common.collect.Lists;
|
|
|
|
|
|
/**
|
|
/**
|
|
* canal kafka 数据操作客户端
|
|
* canal kafka 数据操作客户端
|
|
- *
|
|
|
|
|
|
+ *
|
|
* <pre>
|
|
* <pre>
|
|
* 注意点:
|
|
* 注意点:
|
|
* 1. 相比于canal {@linkplain SimpleCanalConnector}, 这里get和ack操作不能有并发, 必须是一个线程执行get后,内存里执行完毕ack后再取下一个get
|
|
* 1. 相比于canal {@linkplain SimpleCanalConnector}, 这里get和ack操作不能有并发, 必须是一个线程执行get后,内存里执行完毕ack后再取下一个get
|
|
@@ -33,14 +30,16 @@ import com.google.common.collect.Lists;
|
|
*/
|
|
*/
|
|
public class KafkaCanalConnector implements CanalMQConnector {
|
|
public class KafkaCanalConnector implements CanalMQConnector {
|
|
|
|
|
|
- private KafkaConsumer<String, Message> kafkaConsumer;
|
|
|
|
- private KafkaConsumer<String, String> kafkaConsumer2; // 用于扁平message的数据消费
|
|
|
|
- private String topic;
|
|
|
|
- private Integer partition;
|
|
|
|
- private Properties properties;
|
|
|
|
- private volatile boolean connected = false;
|
|
|
|
- private volatile boolean running = false;
|
|
|
|
- private boolean flatMessage;
|
|
|
|
|
|
+ protected KafkaConsumer<String, Message> kafkaConsumer;
|
|
|
|
+ protected KafkaConsumer<String, String> kafkaConsumer2; // 用于扁平message的数据消费
|
|
|
|
+ protected String topic;
|
|
|
|
+ protected Integer partition;
|
|
|
|
+ protected Properties properties;
|
|
|
|
+ protected volatile boolean connected = false;
|
|
|
|
+ protected volatile boolean running = false;
|
|
|
|
+ protected boolean flatMessage;
|
|
|
|
+
|
|
|
|
+ private Map<Integer, Long> currentOffsets = new HashMap<>();
|
|
|
|
|
|
public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId, Integer batchSize,
|
|
public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId, Integer batchSize,
|
|
boolean flatMessage){
|
|
boolean flatMessage){
|
|
@@ -71,6 +70,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
|
|
/**
|
|
/**
|
|
* 打开连接
|
|
* 打开连接
|
|
*/
|
|
*/
|
|
|
|
+ @Override
|
|
public void connect() {
|
|
public void connect() {
|
|
if (connected) {
|
|
if (connected) {
|
|
return;
|
|
return;
|
|
@@ -79,6 +79,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
|
|
connected = true;
|
|
connected = true;
|
|
if (kafkaConsumer == null && !flatMessage) {
|
|
if (kafkaConsumer == null && !flatMessage) {
|
|
kafkaConsumer = new KafkaConsumer<String, Message>(properties);
|
|
kafkaConsumer = new KafkaConsumer<String, Message>(properties);
|
|
|
|
+
|
|
}
|
|
}
|
|
if (kafkaConsumer2 == null && flatMessage) {
|
|
if (kafkaConsumer2 == null && flatMessage) {
|
|
kafkaConsumer2 = new KafkaConsumer<String, String>(properties);
|
|
kafkaConsumer2 = new KafkaConsumer<String, String>(properties);
|
|
@@ -88,6 +89,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
|
|
/**
|
|
/**
|
|
* 关闭链接
|
|
* 关闭链接
|
|
*/
|
|
*/
|
|
|
|
+ @Override
|
|
public void disconnect() {
|
|
public void disconnect() {
|
|
if (kafkaConsumer != null) {
|
|
if (kafkaConsumer != null) {
|
|
kafkaConsumer.close();
|
|
kafkaConsumer.close();
|
|
@@ -101,10 +103,11 @@ public class KafkaCanalConnector implements CanalMQConnector {
|
|
connected = false;
|
|
connected = false;
|
|
}
|
|
}
|
|
|
|
|
|
- private void waitClientRunning() {
|
|
|
|
|
|
+ protected void waitClientRunning() {
|
|
running = true;
|
|
running = true;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
public boolean checkValid() {
|
|
public boolean checkValid() {
|
|
return true;// 默认都放过
|
|
return true;// 默认都放过
|
|
}
|
|
}
|
|
@@ -112,6 +115,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
|
|
/**
|
|
/**
|
|
* 订阅topic
|
|
* 订阅topic
|
|
*/
|
|
*/
|
|
|
|
+ @Override
|
|
public void subscribe() {
|
|
public void subscribe() {
|
|
waitClientRunning();
|
|
waitClientRunning();
|
|
if (!running) {
|
|
if (!running) {
|
|
@@ -139,6 +143,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
|
|
/**
|
|
/**
|
|
* 取消订阅
|
|
* 取消订阅
|
|
*/
|
|
*/
|
|
|
|
+ @Override
|
|
public void unsubscribe() {
|
|
public void unsubscribe() {
|
|
waitClientRunning();
|
|
waitClientRunning();
|
|
if (!running) {
|
|
if (!running) {
|
|
@@ -176,6 +181,11 @@ public class KafkaCanalConnector implements CanalMQConnector {
|
|
|
|
|
|
ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout));
|
|
ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout));
|
|
|
|
|
|
|
|
+ currentOffsets.clear();
|
|
|
|
+ for (TopicPartition topicPartition : records.partitions()) {
|
|
|
|
+ currentOffsets.put(topicPartition.partition(), kafkaConsumer.position(topicPartition));
|
|
|
|
+ }
|
|
|
|
+
|
|
if (!records.isEmpty()) {
|
|
if (!records.isEmpty()) {
|
|
List<Message> messages = new ArrayList<>();
|
|
List<Message> messages = new ArrayList<>();
|
|
for (ConsumerRecord<String, Message> record : records) {
|
|
for (ConsumerRecord<String, Message> record : records) {
|
|
@@ -208,6 +218,12 @@ public class KafkaCanalConnector implements CanalMQConnector {
|
|
}
|
|
}
|
|
|
|
|
|
ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));
|
|
ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));
|
|
|
|
+
|
|
|
|
+ currentOffsets.clear();
|
|
|
|
+ for (TopicPartition topicPartition : records.partitions()) {
|
|
|
|
+ currentOffsets.put(topicPartition.partition(), kafkaConsumer2.position(topicPartition));
|
|
|
|
+ }
|
|
|
|
+
|
|
if (!records.isEmpty()) {
|
|
if (!records.isEmpty()) {
|
|
List<FlatMessage> flatMessages = new ArrayList<>();
|
|
List<FlatMessage> flatMessages = new ArrayList<>();
|
|
for (ConsumerRecord<String, String> record : records) {
|
|
for (ConsumerRecord<String, String> record : records) {
|
|
@@ -222,12 +238,28 @@ public class KafkaCanalConnector implements CanalMQConnector {
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public void rollback() throws CanalClientException {
|
|
|
|
|
|
+ public void rollback() {
|
|
|
|
+ waitClientRunning();
|
|
|
|
+ if (!running) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ // 回滚所有分区
|
|
|
|
+ if (kafkaConsumer != null) {
|
|
|
|
+ for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
|
|
|
|
+ kafkaConsumer.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if (kafkaConsumer2 != null) {
|
|
|
|
+ for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
|
|
|
|
+ kafkaConsumer2.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* 提交offset,如果超过 session.timeout.ms 设置的时间没有ack则会抛出异常,ack失败
|
|
* 提交offset,如果超过 session.timeout.ms 设置的时间没有ack则会抛出异常,ack失败
|
|
*/
|
|
*/
|
|
|
|
+ @Override
|
|
public void ack() {
|
|
public void ack() {
|
|
waitClientRunning();
|
|
waitClientRunning();
|
|
if (!running) {
|
|
if (!running) {
|