|
@@ -14,6 +14,7 @@ import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import com.alibaba.otter.canal.client.CanalConnector;
|
|
|
+import com.alibaba.otter.canal.client.CanalNodeAccessStrategy;
|
|
|
import com.alibaba.otter.canal.client.impl.running.ClientRunningData;
|
|
|
import com.alibaba.otter.canal.client.impl.running.ClientRunningListener;
|
|
|
import com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor;
|
|
@@ -69,18 +70,21 @@ public class SimpleCanalConnector implements CanalConnector {
|
|
|
// 读写数据分别使用不同的锁进行控制,减小锁粒度,读也需要排他锁,并发度容易造成数据包混乱,反序列化失败
|
|
|
private Object readDataLock = new Object();
|
|
|
private Object writeDataLock = new Object();
|
|
|
+ private CanalNodeAccessStrategy accessStrategy;
|
|
|
|
|
|
- public SimpleCanalConnector(SocketAddress address, String username, String password, String destination){
|
|
|
- this(address, username, password, destination, 60000);
|
|
|
+ public SimpleCanalConnector(SocketAddress address, String username, String password, String destination,
|
|
|
+ CanalNodeAccessStrategy accessStrategy){
|
|
|
+ this(address, username, password, destination, 60000, accessStrategy);
|
|
|
}
|
|
|
|
|
|
public SimpleCanalConnector(SocketAddress address, String username, String password, String destination,
|
|
|
- int soTimeout){
|
|
|
+ int soTimeout, CanalNodeAccessStrategy accessStrategy){
|
|
|
this.address = address;
|
|
|
this.username = username;
|
|
|
this.password = password;
|
|
|
this.soTimeout = soTimeout;
|
|
|
this.clientIdentity = new ClientIdentity(destination, (short) 1001);
|
|
|
+ this.accessStrategy = accessStrategy;
|
|
|
}
|
|
|
|
|
|
public void connect() throws CanalClientException {
|
|
@@ -123,6 +127,9 @@ public class SimpleCanalConnector implements CanalConnector {
|
|
|
|
|
|
private InetSocketAddress doConnect() throws CanalClientException {
|
|
|
try {
|
|
|
+ if (accessStrategy != null) {
|
|
|
+ address = accessStrategy.nextNode();//实时获取一下最新的running节点,lubiao
|
|
|
+ }
|
|
|
channel = SocketChannel.open();
|
|
|
channel.socket().setSoTimeout(soTimeout);
|
|
|
channel.connect(address);
|