|
@@ -1,21 +1,19 @@
|
|
|
-package com.alibaba.otter.canal.kafka.client;
|
|
|
+package com.alibaba.otter.canal.client.kafka;
|
|
|
+
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.Properties;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
-import com.alibaba.otter.canal.common.utils.AddressUtils;
|
|
|
-import com.alibaba.otter.canal.common.utils.BooleanMutex;
|
|
|
-import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
|
|
|
-import com.alibaba.otter.canal.kafka.client.running.ClientRunningData;
|
|
|
-import com.alibaba.otter.canal.kafka.client.running.ClientRunningListener;
|
|
|
-import com.alibaba.otter.canal.kafka.client.running.ClientRunningMonitor;
|
|
|
-import com.alibaba.otter.canal.protocol.Message;
|
|
|
-import com.alibaba.otter.canal.protocol.exception.CanalClientException;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
|
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
|
import org.apache.kafka.common.TopicPartition;
|
|
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
|
|
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.Properties;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
+import com.alibaba.otter.canal.client.kafka.running.ClientRunningData;
|
|
|
+import com.alibaba.otter.canal.common.utils.AddressUtils;
|
|
|
+import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
|
|
|
+import com.alibaba.otter.canal.protocol.Message;
|
|
|
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
|
|
|
|
|
|
/**
|
|
|
* canal kafka 数据操作客户端
|
|
@@ -26,16 +24,16 @@ import java.util.concurrent.TimeUnit;
|
|
|
public class KafkaCanalConnector {
|
|
|
|
|
|
private KafkaConsumer<String, Message> kafkaConsumer;
|
|
|
- private String topic;
|
|
|
- private Integer partition;
|
|
|
- private Properties properties;
|
|
|
- private ClientRunningMonitor runningMonitor; // 运行控制
|
|
|
- private ZkClientx zkClientx;
|
|
|
- private BooleanMutex mutex = new BooleanMutex(false);
|
|
|
- private volatile boolean connected = false;
|
|
|
- private volatile boolean running = false;
|
|
|
-
|
|
|
- public KafkaCanalConnector(String zkServers, String servers, String topic, Integer partition, String groupId) {
|
|
|
+ private String topic;
|
|
|
+ private Integer partition;
|
|
|
+ private Properties properties;
|
|
|
+ // private ClientRunningMonitor runningMonitor; // 运行控制
|
|
|
+ // private BooleanMutex mutex = new BooleanMutex(false);
|
|
|
+ private ZkClientx zkClientx;
|
|
|
+ private volatile boolean connected = false;
|
|
|
+ private volatile boolean running = false;
|
|
|
+
|
|
|
+ public KafkaCanalConnector(String zkServers, String servers, String topic, Integer partition, String groupId){
|
|
|
this.topic = topic;
|
|
|
this.partition = partition;
|
|
|
|
|
@@ -44,7 +42,7 @@ public class KafkaCanalConnector {
|
|
|
properties.put("group.id", groupId);
|
|
|
properties.put("enable.auto.commit", false);
|
|
|
properties.put("auto.commit.interval.ms", "1000");
|
|
|
- properties.put("auto.offset.reset", "latest"); //如果没有offset则从最后的offset开始读
|
|
|
+ properties.put("auto.offset.reset", "latest"); // 如果没有offset则从最后的offset开始读
|
|
|
properties.put("request.timeout.ms", "40000"); // 必须大于session.timeout.ms的设置
|
|
|
properties.put("session.timeout.ms", "30000"); // 默认为30秒
|
|
|
properties.put("max.poll.records", "1"); // 所以一次只取一条数据
|
|
@@ -58,19 +56,19 @@ public class KafkaCanalConnector {
|
|
|
clientData.setGroupId(groupId);
|
|
|
clientData.setAddress(AddressUtils.getHostIp());
|
|
|
|
|
|
- runningMonitor = new ClientRunningMonitor();
|
|
|
- runningMonitor.setTopic(topic);
|
|
|
- runningMonitor.setZkClient(zkClientx);
|
|
|
- runningMonitor.setClientData(clientData);
|
|
|
- runningMonitor.setListener(new ClientRunningListener() {
|
|
|
- public void processActiveEnter() {
|
|
|
- mutex.set(true);
|
|
|
- }
|
|
|
-
|
|
|
- public void processActiveExit() {
|
|
|
- mutex.set(false);
|
|
|
- }
|
|
|
- });
|
|
|
+ // runningMonitor = new ClientRunningMonitor();
|
|
|
+ // runningMonitor.setTopic(topic);
|
|
|
+ // runningMonitor.setZkClient(zkClientx);
|
|
|
+ // runningMonitor.setClientData(clientData);
|
|
|
+ // runningMonitor.setListener(new ClientRunningListener() {
|
|
|
+ // public void processActiveEnter() {
|
|
|
+ // mutex.set(true);
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // public void processActiveExit() {
|
|
|
+ // mutex.set(false);
|
|
|
+ // }
|
|
|
+ // });
|
|
|
}
|
|
|
|
|
|
}
|
|
@@ -95,11 +93,11 @@ public class KafkaCanalConnector {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- if (runningMonitor != null) {
|
|
|
- if (!runningMonitor.isStart()) {
|
|
|
- runningMonitor.start();
|
|
|
- }
|
|
|
- }
|
|
|
+ // if (runningMonitor != null) {
|
|
|
+ // if (!runningMonitor.isStart()) {
|
|
|
+ // runningMonitor.start();
|
|
|
+ // }
|
|
|
+ // }
|
|
|
|
|
|
connected = true;
|
|
|
|
|
@@ -115,9 +113,9 @@ public class KafkaCanalConnector {
|
|
|
kafkaConsumer.close();
|
|
|
|
|
|
connected = false;
|
|
|
- if (runningMonitor.isStart()) {
|
|
|
- runningMonitor.stop();
|
|
|
- }
|
|
|
+ // if (runningMonitor.isStart()) {
|
|
|
+ // runningMonitor.stop();
|
|
|
+ // }
|
|
|
}
|
|
|
|
|
|
private void waitClientRunning() {
|
|
@@ -128,12 +126,12 @@ public class KafkaCanalConnector {
|
|
|
}
|
|
|
|
|
|
running = true;
|
|
|
- mutex.get();// 阻塞等待
|
|
|
+ // mutex.get();// 阻塞等待
|
|
|
} else {
|
|
|
// 单机模式直接设置为running
|
|
|
running = true;
|
|
|
}
|
|
|
- } catch (InterruptedException e) {
|
|
|
+ } catch (Exception e) {
|
|
|
Thread.currentThread().interrupt();
|
|
|
throw new CanalClientException(e);
|
|
|
}
|
|
@@ -141,7 +139,8 @@ public class KafkaCanalConnector {
|
|
|
|
|
|
public boolean checkValid() {
|
|
|
if (zkClientx != null) {
|
|
|
- return mutex.state();
|
|
|
+ // return mutex.state();
|
|
|
+ return true;
|
|
|
} else {
|
|
|
return true;// 默认都放过
|
|
|
}
|
|
@@ -234,9 +233,9 @@ public class KafkaCanalConnector {
|
|
|
public void stopRunning() {
|
|
|
if (running) {
|
|
|
running = false; // 设置为非running状态
|
|
|
- if (!mutex.state()) {
|
|
|
- mutex.set(true); // 中断阻塞
|
|
|
- }
|
|
|
+ // if (!mutex.state()) {
|
|
|
+ // mutex.set(true); // 中断阻塞
|
|
|
+ // }
|
|
|
}
|
|
|
}
|
|
|
}
|