|
@@ -1,5 +1,13 @@
|
|
|
package com.alibaba.otter.canal.connector.tcp.consumer;
|
|
|
|
|
|
+import java.net.InetSocketAddress;
|
|
|
+import java.net.SocketAddress;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Properties;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+import org.apache.commons.lang.StringUtils;
|
|
|
+
|
|
|
import com.alibaba.otter.canal.client.CanalConnector;
|
|
|
import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
|
|
|
import com.alibaba.otter.canal.client.impl.ClusterNodeAccessStrategy;
|
|
@@ -12,14 +20,6 @@ import com.alibaba.otter.canal.connector.core.util.MessageUtil;
|
|
|
import com.alibaba.otter.canal.connector.tcp.config.TCPConstants;
|
|
|
import com.alibaba.otter.canal.protocol.Message;
|
|
|
|
|
|
-import org.apache.commons.lang.StringUtils;
|
|
|
-
|
|
|
-import java.net.InetSocketAddress;
|
|
|
-import java.net.SocketAddress;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Properties;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
-
|
|
|
/**
|
|
|
* TCP 消费者连接器, 一个destination对应一个SPI实例
|
|
|
*
|
|
@@ -99,7 +99,9 @@ public class CanalTCPConsumer implements CanalMsgConsumer {
|
|
|
|
|
|
@Override
|
|
|
public void disconnect() {
|
|
|
- canalConnector.unsubscribe();
|
|
|
+ // tcp模式下,因为是单tcp消费,避免adapter异常断开时直接unsubscribe
|
|
|
+ // unsubscribe发送给canal-server会导致清理cursor位点,如果此时canal-server出现重启,就会丢失binlog数据
|
|
|
+ // canalConnector.unsubscribe();
|
|
|
canalConnector.disconnect();
|
|
|
}
|
|
|
}
|