|
@@ -1,19 +1,20 @@
|
|
|
package com.alibaba.otter.canal.client.kafka;
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
+import java.util.List;
|
|
|
import java.util.Properties;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
|
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
|
import org.apache.kafka.common.TopicPartition;
|
|
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
|
|
|
|
|
-import com.alibaba.otter.canal.client.kafka.running.ClientRunningData;
|
|
|
-import com.alibaba.otter.canal.common.utils.AddressUtils;
|
|
|
-import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.otter.canal.protocol.FlatMessage;
|
|
|
import com.alibaba.otter.canal.protocol.Message;
|
|
|
-import com.alibaba.otter.canal.protocol.exception.CanalClientException;
|
|
|
|
|
|
/**
|
|
|
* canal kafka 数据操作客户端
|
|
@@ -24,18 +25,19 @@ import com.alibaba.otter.canal.protocol.exception.CanalClientException;
|
|
|
public class KafkaCanalConnector {
|
|
|
|
|
|
private KafkaConsumer<String, Message> kafkaConsumer;
|
|
|
+ private KafkaConsumer<String, String> kafkaConsumer2; // 用于扁平message的数据消费
|
|
|
private String topic;
|
|
|
private Integer partition;
|
|
|
private Properties properties;
|
|
|
- // private ClientRunningMonitor runningMonitor; // 运行控制
|
|
|
- // private BooleanMutex mutex = new BooleanMutex(false);
|
|
|
- private ZkClientx zkClientx;
|
|
|
private volatile boolean connected = false;
|
|
|
private volatile boolean running = false;
|
|
|
+ private boolean flatMessage;
|
|
|
|
|
|
- public KafkaCanalConnector(String zkServers, String servers, String topic, Integer partition, String groupId){
|
|
|
+ public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId,
|
|
|
+ boolean flatMessage){
|
|
|
this.topic = topic;
|
|
|
this.partition = partition;
|
|
|
+ this.flatMessage = flatMessage;
|
|
|
|
|
|
properties = new Properties();
|
|
|
properties.put("bootstrap.servers", servers);
|
|
@@ -45,32 +47,13 @@ public class KafkaCanalConnector {
|
|
|
properties.put("auto.offset.reset", "latest"); // 如果没有offset则从最后的offset开始读
|
|
|
properties.put("request.timeout.ms", "40000"); // 必须大于session.timeout.ms的设置
|
|
|
properties.put("session.timeout.ms", "30000"); // 默认为30秒
|
|
|
- properties.put("max.poll.records", "1"); // 一次只取一条message数据
|
|
|
+ properties.put("max.poll.records", "100");
|
|
|
properties.put("key.deserializer", StringDeserializer.class.getName());
|
|
|
- properties.put("value.deserializer", MessageDeserializer.class.getName());
|
|
|
-
|
|
|
- if (zkServers != null) {
|
|
|
- zkClientx = new ZkClientx(zkServers);
|
|
|
-
|
|
|
- ClientRunningData clientData = new ClientRunningData();
|
|
|
- clientData.setGroupId(groupId);
|
|
|
- clientData.setAddress(AddressUtils.getHostIp());
|
|
|
-
|
|
|
- // runningMonitor = new ClientRunningMonitor();
|
|
|
- // runningMonitor.setTopic(topic);
|
|
|
- // runningMonitor.setZkClient(zkClientx);
|
|
|
- // runningMonitor.setClientData(clientData);
|
|
|
- // runningMonitor.setListener(new ClientRunningListener() {
|
|
|
- // public void processActiveEnter() {
|
|
|
- // mutex.set(true);
|
|
|
- // }
|
|
|
- //
|
|
|
- // public void processActiveExit() {
|
|
|
- // mutex.set(false);
|
|
|
- // }
|
|
|
- // });
|
|
|
+ if (!flatMessage) {
|
|
|
+ properties.put("value.deserializer", MessageDeserializer.class.getName());
|
|
|
+ } else {
|
|
|
+ properties.put("value.deserializer", StringDeserializer.class.getName());
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -93,57 +76,36 @@ public class KafkaCanalConnector {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- // if (runningMonitor != null) {
|
|
|
- // if (!runningMonitor.isStart()) {
|
|
|
- // runningMonitor.start();
|
|
|
- // }
|
|
|
- // }
|
|
|
-
|
|
|
connected = true;
|
|
|
|
|
|
- if (kafkaConsumer == null) {
|
|
|
+ if (kafkaConsumer == null && !flatMessage) {
|
|
|
kafkaConsumer = new KafkaConsumer<String, Message>(properties);
|
|
|
}
|
|
|
+ if (kafkaConsumer2 == null && flatMessage) {
|
|
|
+ kafkaConsumer2 = new KafkaConsumer<String, String>(properties);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 关闭链接
|
|
|
*/
|
|
|
- public void disconnnect() {
|
|
|
- kafkaConsumer.close();
|
|
|
+ public void disconnect() {
|
|
|
+ if (kafkaConsumer != null) {
|
|
|
+ kafkaConsumer.close();
|
|
|
+ }
|
|
|
+ if (kafkaConsumer2 != null) {
|
|
|
+ kafkaConsumer2.close();
|
|
|
+ }
|
|
|
|
|
|
connected = false;
|
|
|
- // if (runningMonitor.isStart()) {
|
|
|
- // runningMonitor.stop();
|
|
|
- // }
|
|
|
}
|
|
|
|
|
|
private void waitClientRunning() {
|
|
|
- try {
|
|
|
- if (zkClientx != null) {
|
|
|
- if (!connected) {// 未调用connect
|
|
|
- throw new CanalClientException("should connect first");
|
|
|
- }
|
|
|
-
|
|
|
- running = true;
|
|
|
- // mutex.get();// 阻塞等待
|
|
|
- } else {
|
|
|
- // 单机模式直接设置为running
|
|
|
- running = true;
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- throw new CanalClientException(e);
|
|
|
- }
|
|
|
+ running = true;
|
|
|
}
|
|
|
|
|
|
public boolean checkValid() {
|
|
|
- if (zkClientx != null) {
|
|
|
- // return mutex.state();
|
|
|
- return true;
|
|
|
- } else {
|
|
|
- return true;// 默认都放过
|
|
|
- }
|
|
|
+ return true;// 默认都放过
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -156,10 +118,20 @@ public class KafkaCanalConnector {
|
|
|
}
|
|
|
|
|
|
if (partition == null) {
|
|
|
- kafkaConsumer.subscribe(Collections.singletonList(topic));
|
|
|
+ if (kafkaConsumer != null) {
|
|
|
+ kafkaConsumer.subscribe(Collections.singletonList(topic));
|
|
|
+ }
|
|
|
+ if (kafkaConsumer2 != null) {
|
|
|
+ kafkaConsumer2.subscribe(Collections.singletonList(topic));
|
|
|
+ }
|
|
|
} else {
|
|
|
TopicPartition topicPartition = new TopicPartition(topic, partition);
|
|
|
- kafkaConsumer.assign(Collections.singletonList(topicPartition));
|
|
|
+ if (kafkaConsumer != null) {
|
|
|
+ kafkaConsumer.assign(Collections.singletonList(topicPartition));
|
|
|
+ }
|
|
|
+ if (kafkaConsumer2 != null) {
|
|
|
+ kafkaConsumer2.assign(Collections.singletonList(topicPartition));
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -172,7 +144,12 @@ public class KafkaCanalConnector {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- kafkaConsumer.unsubscribe();
|
|
|
+ if (kafkaConsumer != null) {
|
|
|
+ kafkaConsumer.unsubscribe();
|
|
|
+ }
|
|
|
+ if (kafkaConsumer2 != null) {
|
|
|
+ kafkaConsumer2.unsubscribe();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -180,22 +157,22 @@ public class KafkaCanalConnector {
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
- public Message get() {
|
|
|
+ public List<Message> get() {
|
|
|
return get(100L, TimeUnit.MILLISECONDS);
|
|
|
}
|
|
|
|
|
|
- public Message get(Long timeout, TimeUnit unit) {
|
|
|
+ public List<Message> get(Long timeout, TimeUnit unit) {
|
|
|
waitClientRunning();
|
|
|
if (!running) {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- Message message = getWithoutAck(timeout, unit);
|
|
|
+ List<Message> messages = getWithoutAck(timeout, unit);
|
|
|
this.ack();
|
|
|
- return message;
|
|
|
+ return messages;
|
|
|
}
|
|
|
|
|
|
- public Message getWithoutAck() {
|
|
|
+ public List<Message> getWithoutAck() {
|
|
|
return getWithoutAck(100L, TimeUnit.MILLISECONDS);
|
|
|
}
|
|
|
|
|
@@ -204,7 +181,7 @@ public class KafkaCanalConnector {
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
- public Message getWithoutAck(Long timeout, TimeUnit unit) {
|
|
|
+ public List<Message> getWithoutAck(Long timeout, TimeUnit unit) {
|
|
|
waitClientRunning();
|
|
|
if (!running) {
|
|
|
return null;
|
|
@@ -213,7 +190,33 @@ public class KafkaCanalConnector {
|
|
|
ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout)); // 基于配置,最多只能poll到一条数据
|
|
|
|
|
|
if (!records.isEmpty()) {
|
|
|
- return records.iterator().next().value();
|
|
|
+ // return records.iterator().next().value();
|
|
|
+ List<Message> messages = new ArrayList<>();
|
|
|
+ for (ConsumerRecord<String, Message> record : records) {
|
|
|
+ messages.add(record.value());
|
|
|
+ }
|
|
|
+ return messages;
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ public List<FlatMessage> getFlatMessageWithoutAck(Long timeout, TimeUnit unit) {
|
|
|
+ waitClientRunning();
|
|
|
+ if (!running) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));
|
|
|
+
|
|
|
+ if (!records.isEmpty()) {
|
|
|
+ List<FlatMessage> flatMessages = new ArrayList<>();
|
|
|
+ for (ConsumerRecord<String, String> record : records) {
|
|
|
+ String flatMessageJson = record.value();
|
|
|
+ FlatMessage flatMessage = JSON.parseObject(flatMessageJson, FlatMessage.class);
|
|
|
+ flatMessages.add(flatMessage);
|
|
|
+ }
|
|
|
+
|
|
|
+ return flatMessages;
|
|
|
}
|
|
|
return null;
|
|
|
}
|
|
@@ -227,7 +230,12 @@ public class KafkaCanalConnector {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- kafkaConsumer.commitSync();
|
|
|
+ if (kafkaConsumer != null) {
|
|
|
+ kafkaConsumer.commitSync();
|
|
|
+ }
|
|
|
+ if (kafkaConsumer2 != null) {
|
|
|
+ kafkaConsumer2.commitAsync();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void stopRunning() {
|