|
@@ -44,7 +44,7 @@ import com.google.protobuf.ByteString;
|
|
|
|
|
|
/**
|
|
|
* 基于{@linkplain CanalServerWithNetty}定义的网络协议接口,对于canal数据进行get/rollback/ack等操作
|
|
|
- *
|
|
|
+ *
|
|
|
* @author jianghang 2012-10-24 下午05:37:20
|
|
|
* @version 1.0.0
|
|
|
*/
|
|
@@ -75,6 +75,8 @@ public class SimpleCanalConnector implements CanalConnector {
|
|
|
private Object readDataLock = new Object();
|
|
|
private Object writeDataLock = new Object();
|
|
|
|
|
|
+ private boolean running = false;
|
|
|
+
|
|
|
public SimpleCanalConnector(SocketAddress address, String username, String password, String destination){
|
|
|
this(address, username, password, destination, 60000);
|
|
|
}
|
|
@@ -99,6 +101,9 @@ public class SimpleCanalConnector implements CanalConnector {
|
|
|
}
|
|
|
} else {
|
|
|
waitClientRunning();
|
|
|
+ if (!running) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
doConnect();
|
|
|
if (filter != null) { // 如果存在条件,说明是自动切换,基于上一次的条件订阅一次
|
|
|
subscribe(filter);
|
|
@@ -208,6 +213,9 @@ public class SimpleCanalConnector implements CanalConnector {
|
|
|
|
|
|
public void subscribe(String filter) throws CanalClientException {
|
|
|
waitClientRunning();
|
|
|
+ if (!running) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
try {
|
|
|
writeWithHeader(Packet.newBuilder()
|
|
|
.setType(PacketType.SUBSCRIPTION)
|
|
@@ -234,6 +242,9 @@ public class SimpleCanalConnector implements CanalConnector {
|
|
|
|
|
|
public void unsubscribe() throws CanalClientException {
|
|
|
waitClientRunning();
|
|
|
+ if (!running) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
try {
|
|
|
writeWithHeader(Packet.newBuilder()
|
|
|
.setType(PacketType.UNSUBSCRIPTION)
|
|
@@ -445,6 +456,8 @@ public class SimpleCanalConnector implements CanalConnector {
|
|
|
throw new CanalClientException("should connect first");
|
|
|
}
|
|
|
|
|
|
+ running = true;
|
|
|
+
|
|
|
mutex.get();// 阻塞等待
|
|
|
}
|
|
|
} catch (InterruptedException e) {
|
|
@@ -502,4 +515,13 @@ public class SimpleCanalConnector implements CanalConnector {
|
|
|
this.filter = filter;
|
|
|
}
|
|
|
|
|
|
+ public void stopRunning() {
|
|
|
+ if (running) {
|
|
|
+ running = false; //设置为非running状态
|
|
|
+ if (!mutex.state()) {
|
|
|
+ mutex.set(true); //中断阻塞
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|