|
@@ -12,11 +12,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.slf4j.MDC;
|
|
|
-import org.springframework.beans.factory.BeanFactory;
|
|
|
-import org.springframework.context.ApplicationContext;
|
|
|
-import org.springframework.context.support.ClassPathXmlApplicationContext;
|
|
|
|
|
|
-import com.alibaba.otter.canal.common.CanalException;
|
|
|
import com.alibaba.otter.canal.common.utils.AddressUtils;
|
|
|
import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
|
|
|
import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
|
|
@@ -31,10 +27,9 @@ import com.alibaba.otter.canal.deployer.monitor.ManagerInstanceConfigMonitor;
|
|
|
import com.alibaba.otter.canal.deployer.monitor.SpringInstanceConfigMonitor;
|
|
|
import com.alibaba.otter.canal.instance.core.CanalInstance;
|
|
|
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
|
|
|
-import com.alibaba.otter.canal.instance.manager.CanalConfigClient;
|
|
|
-import com.alibaba.otter.canal.instance.manager.ManagerCanalInstanceGenerator;
|
|
|
+import com.alibaba.otter.canal.instance.manager.PlainCanalInstanceGenerator;
|
|
|
+import com.alibaba.otter.canal.instance.manager.plain.PlainCanalConfigClient;
|
|
|
import com.alibaba.otter.canal.instance.spring.SpringCanalInstanceGenerator;
|
|
|
-import com.alibaba.otter.canal.parse.CanalEventParser;
|
|
|
import com.alibaba.otter.canal.server.CanalMQStarter;
|
|
|
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
|
|
|
import com.alibaba.otter.canal.server.exception.CanalServerException;
|
|
@@ -56,10 +51,11 @@ public class CanalController {
|
|
|
private String ip;
|
|
|
private String registerIp;
|
|
|
private int port;
|
|
|
+ private int adminPort;
|
|
|
// 默认使用spring的方式载入
|
|
|
private Map<String, InstanceConfig> instanceConfigs;
|
|
|
private InstanceConfig globalInstanceConfig;
|
|
|
- private Map<String, CanalConfigClient> managerClients;
|
|
|
+ private Map<String, PlainCanalConfigClient> managerClients;
|
|
|
// 监听instance config的变化
|
|
|
private boolean autoScan = true;
|
|
|
private InstanceAction defaultAction;
|
|
@@ -71,15 +67,17 @@ public class CanalController {
|
|
|
private ZkClientx zkclientx;
|
|
|
|
|
|
private CanalMQStarter canalMQStarter;
|
|
|
+ private String adminUser;
|
|
|
+ private String adminPasswd;
|
|
|
|
|
|
public CanalController(){
|
|
|
this(System.getProperties());
|
|
|
}
|
|
|
|
|
|
public CanalController(final Properties properties){
|
|
|
- managerClients = MigrateMap.makeComputingMap(new Function<String, CanalConfigClient>() {
|
|
|
+ managerClients = MigrateMap.makeComputingMap(new Function<String, PlainCanalConfigClient>() {
|
|
|
|
|
|
- public CanalConfigClient apply(String managerAddress) {
|
|
|
+ public PlainCanalConfigClient apply(String managerAddress) {
|
|
|
return getManagerClient(managerAddress);
|
|
|
}
|
|
|
});
|
|
@@ -111,6 +109,7 @@ public class CanalController {
|
|
|
ip = getProperty(properties, CanalConstants.CANAL_IP);
|
|
|
registerIp = getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
|
|
|
port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
|
|
|
+ adminPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_ADMIN_PORT));
|
|
|
embededCanalServer = CanalServerWithEmbedded.instance();
|
|
|
embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
|
|
|
try {
|
|
@@ -121,6 +120,8 @@ public class CanalController {
|
|
|
embededCanalServer.setMetricsPort(11112);
|
|
|
}
|
|
|
|
|
|
+ this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
|
|
|
+ this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
|
|
|
embededCanalServer.setUser(getProperty(properties, CanalConstants.CANAL_USER));
|
|
|
embededCanalServer.setPasswd(getProperty(properties, CanalConstants.CANAL_PASSWD));
|
|
|
|
|
@@ -254,6 +255,8 @@ public class CanalController {
|
|
|
runningMonitor.start();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ logger.info("auto notify start {} successful.", destination);
|
|
|
}
|
|
|
|
|
|
public void stop(String destination) {
|
|
@@ -266,12 +269,16 @@ public class CanalController {
|
|
|
runningMonitor.stop();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ logger.info("auto notify stop {} successful.", destination);
|
|
|
}
|
|
|
|
|
|
public void reload(String destination) {
|
|
|
// 目前任何配置变化,直接重启,简单处理
|
|
|
stop(destination);
|
|
|
start(destination);
|
|
|
+
|
|
|
+ logger.info("auto notify reload {} successful.", destination);
|
|
|
}
|
|
|
};
|
|
|
|
|
@@ -298,7 +305,14 @@ public class CanalController {
|
|
|
}
|
|
|
return monitor;
|
|
|
} else if (mode.isManager()) {
|
|
|
- return new ManagerInstanceConfigMonitor();
|
|
|
+ ManagerInstanceConfigMonitor monitor = new ManagerInstanceConfigMonitor();
|
|
|
+ monitor.setScanIntervalInSecond(scanInterval);
|
|
|
+ monitor.setDefaultAction(defaultAction);
|
|
|
+ String managerAddress = getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
|
|
|
+ monitor.setConfigClient(getManagerClient(managerAddress));
|
|
|
+ monitor.setIp(registerIp);
|
|
|
+ monitor.setPort(adminPort);
|
|
|
+ return monitor;
|
|
|
} else {
|
|
|
throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor");
|
|
|
}
|
|
@@ -308,9 +322,13 @@ public class CanalController {
|
|
|
}
|
|
|
|
|
|
private InstanceConfig initGlobalConfig(Properties properties) {
|
|
|
+ String adminManagerAddress = getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
|
|
|
InstanceConfig globalConfig = new InstanceConfig();
|
|
|
String modeStr = getProperty(properties, CanalConstants.getInstanceModeKey(CanalConstants.GLOBAL_NAME));
|
|
|
- if (StringUtils.isNotEmpty(modeStr)) {
|
|
|
+ if (StringUtils.isNotEmpty(adminManagerAddress)) {
|
|
|
+ // 如果指定了manager地址,则强制适用manager
|
|
|
+ globalConfig.setMode(InstanceMode.MANAGER);
|
|
|
+ } else if (StringUtils.isNotEmpty(modeStr)) {
|
|
|
globalConfig.setMode(InstanceMode.valueOf(StringUtils.upperCase(modeStr)));
|
|
|
}
|
|
|
|
|
@@ -322,6 +340,10 @@ public class CanalController {
|
|
|
String managerAddress = getProperty(properties,
|
|
|
CanalConstants.getInstanceManagerAddressKey(CanalConstants.GLOBAL_NAME));
|
|
|
if (StringUtils.isNotEmpty(managerAddress)) {
|
|
|
+ if (StringUtils.equals(managerAddress, "${canal.admin.manager}")) {
|
|
|
+ managerAddress = adminManagerAddress;
|
|
|
+ }
|
|
|
+
|
|
|
globalConfig.setManagerAddress(managerAddress);
|
|
|
}
|
|
|
|
|
@@ -335,28 +357,18 @@ public class CanalController {
|
|
|
public CanalInstance generate(String destination) {
|
|
|
InstanceConfig config = instanceConfigs.get(destination);
|
|
|
if (config == null) {
|
|
|
- throw new CanalServerException("can't find destination:{}");
|
|
|
+ throw new CanalServerException("can't find destination:" + destination);
|
|
|
}
|
|
|
|
|
|
if (config.getMode().isManager()) {
|
|
|
- ManagerCanalInstanceGenerator instanceGenerator = new ManagerCanalInstanceGenerator();
|
|
|
+ PlainCanalInstanceGenerator instanceGenerator = new PlainCanalInstanceGenerator();
|
|
|
instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));
|
|
|
+ instanceGenerator.setSpringXml(config.getSpringXml());
|
|
|
return instanceGenerator.generate(destination);
|
|
|
} else if (config.getMode().isSpring()) {
|
|
|
SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();
|
|
|
- synchronized (CanalEventParser.class) {
|
|
|
- try {
|
|
|
- // 设置当前正在加载的通道,加载spring查找文件时会用到该变量
|
|
|
- System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, destination);
|
|
|
- instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));
|
|
|
- return instanceGenerator.generate(destination);
|
|
|
- } catch (Throwable e) {
|
|
|
- logger.error("generator instance failed.", e);
|
|
|
- throw new CanalException(e);
|
|
|
- } finally {
|
|
|
- System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, "");
|
|
|
- }
|
|
|
- }
|
|
|
+ instanceGenerator.setSpringXml(config.getSpringXml());
|
|
|
+ return instanceGenerator.generate(destination);
|
|
|
} else {
|
|
|
throw new UnsupportedOperationException("unknow mode :" + config.getMode());
|
|
|
}
|
|
@@ -368,13 +380,8 @@ public class CanalController {
|
|
|
return globalConfig;
|
|
|
}
|
|
|
|
|
|
- private CanalConfigClient getManagerClient(String managerAddress) {
|
|
|
- return new CanalConfigClient();
|
|
|
- }
|
|
|
-
|
|
|
- private BeanFactory getBeanFactory(String springXml) {
|
|
|
- ApplicationContext applicationContext = new ClassPathXmlApplicationContext(springXml);
|
|
|
- return applicationContext;
|
|
|
+ private PlainCanalConfigClient getManagerClient(String managerAddress) {
|
|
|
+ return new PlainCanalConfigClient(managerAddress, this.adminUser, this.adminPasswd);
|
|
|
}
|
|
|
|
|
|
private void initInstanceConfig(Properties properties) {
|
|
@@ -392,9 +399,13 @@ public class CanalController {
|
|
|
}
|
|
|
|
|
|
private InstanceConfig parseInstanceConfig(Properties properties, String destination) {
|
|
|
+ String adminManagerAddress = getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
|
|
|
InstanceConfig config = new InstanceConfig(globalInstanceConfig);
|
|
|
String modeStr = getProperty(properties, CanalConstants.getInstanceModeKey(destination));
|
|
|
- if (!StringUtils.isEmpty(modeStr)) {
|
|
|
+ if (StringUtils.isNotEmpty(adminManagerAddress)) {
|
|
|
+ // 如果指定了manager地址,则强制适用manager
|
|
|
+ config.setMode(InstanceMode.MANAGER);
|
|
|
+ } else if (StringUtils.isNotEmpty(modeStr)) {
|
|
|
config.setMode(InstanceMode.valueOf(StringUtils.upperCase(modeStr)));
|
|
|
}
|
|
|
|
|
@@ -406,6 +417,9 @@ public class CanalController {
|
|
|
if (config.getMode().isManager()) {
|
|
|
String managerAddress = getProperty(properties, CanalConstants.getInstanceManagerAddressKey(destination));
|
|
|
if (StringUtils.isNotEmpty(managerAddress)) {
|
|
|
+ if (StringUtils.equals(managerAddress, "${canal.admin.manager}")) {
|
|
|
+ managerAddress = adminManagerAddress;
|
|
|
+ }
|
|
|
config.setManagerAddress(managerAddress);
|
|
|
}
|
|
|
} else if (config.getMode().isSpring()) {
|