|
@@ -65,7 +65,7 @@ public class CanalController {
|
|
|
private boolean autoScan = true;
|
|
|
private InstanceAction defaultAction;
|
|
|
private Map<InstanceMode, InstanceConfigMonitor> instanceConfigMonitors;
|
|
|
- private CanalServerWithEmbedded embededCanalServer;
|
|
|
+ private CanalServerWithEmbedded embeddedCanalServer;
|
|
|
private CanalServerWithNetty canalServer;
|
|
|
|
|
|
private CanalInstanceGenerator instanceGenerator;
|
|
@@ -109,15 +109,15 @@ public class CanalController {
|
|
|
registerIp = getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
|
|
|
port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT, "11111"));
|
|
|
adminPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110"));
|
|
|
- embededCanalServer = CanalServerWithEmbedded.instance();
|
|
|
- embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
|
|
|
+ embeddedCanalServer = CanalServerWithEmbedded.instance();
|
|
|
+ embeddedCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
|
|
|
int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112"));
|
|
|
- embededCanalServer.setMetricsPort(metricsPort);
|
|
|
+ embeddedCanalServer.setMetricsPort(metricsPort);
|
|
|
|
|
|
this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
|
|
|
this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
|
|
|
- embededCanalServer.setUser(getProperty(properties, CanalConstants.CANAL_USER));
|
|
|
- embededCanalServer.setPasswd(getProperty(properties, CanalConstants.CANAL_PASSWD));
|
|
|
+ embeddedCanalServer.setUser(getProperty(properties, CanalConstants.CANAL_USER));
|
|
|
+ embeddedCanalServer.setPasswd(getProperty(properties, CanalConstants.CANAL_PASSWD));
|
|
|
|
|
|
String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
|
|
|
if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
|
|
@@ -156,7 +156,7 @@ public class CanalController {
|
|
|
public void processActiveEnter() {
|
|
|
try {
|
|
|
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
|
|
|
- embededCanalServer.start(destination);
|
|
|
+ embeddedCanalServer.start(destination);
|
|
|
if (canalMQStarter != null) {
|
|
|
canalMQStarter.startDestination(destination);
|
|
|
}
|
|
@@ -171,7 +171,7 @@ public class CanalController {
|
|
|
if (canalMQStarter != null) {
|
|
|
canalMQStarter.stopDestination(destination);
|
|
|
}
|
|
|
- embededCanalServer.stop(destination);
|
|
|
+ embeddedCanalServer.stop(destination);
|
|
|
} finally {
|
|
|
MDC.remove(CanalConstants.MDC_DESTINATION);
|
|
|
}
|
|
@@ -239,7 +239,7 @@ public class CanalController {
|
|
|
instanceConfigs.put(destination, config);
|
|
|
}
|
|
|
|
|
|
- if (!embededCanalServer.isStart(destination)) {
|
|
|
+ if (!embeddedCanalServer.isStart(destination)) {
|
|
|
// HA机制启动
|
|
|
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
|
|
|
if (!config.getLazy() && !runningMonitor.isStart()) {
|
|
@@ -254,7 +254,7 @@ public class CanalController {
|
|
|
// 此处的stop,代表强制退出,非HA机制,所以需要退出HA的monitor和配置信息
|
|
|
InstanceConfig config = instanceConfigs.remove(destination);
|
|
|
if (config != null) {
|
|
|
- embededCanalServer.stop(destination);
|
|
|
+ embeddedCanalServer.stop(destination);
|
|
|
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
|
|
|
if (runningMonitor.isStart()) {
|
|
|
runningMonitor.stop();
|
|
@@ -384,7 +384,7 @@ public class CanalController {
|
|
|
instanceGenerator.setSpringXml(config.getSpringXml());
|
|
|
return instanceGenerator.generate(destination);
|
|
|
} else {
|
|
|
- throw new UnsupportedOperationException("unknow mode :" + config.getMode());
|
|
|
+ throw new UnsupportedOperationException("unknown mode :" + config.getMode());
|
|
|
}
|
|
|
|
|
|
};
|
|
@@ -524,14 +524,14 @@ public class CanalController {
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
- // 优先启动embeded服务
|
|
|
- embededCanalServer.start();
|
|
|
+ // 优先启动embedded服务
|
|
|
+ embeddedCanalServer.start();
|
|
|
// 尝试启动一下非lazy状态的通道
|
|
|
for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
|
|
|
final String destination = entry.getKey();
|
|
|
InstanceConfig config = entry.getValue();
|
|
|
// 创建destination的工作节点
|
|
|
- if (!embededCanalServer.isStart(destination)) {
|
|
|
+ if (!embeddedCanalServer.isStart(destination)) {
|
|
|
// HA机制启动
|
|
|
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
|
|
|
if (!config.getLazy() && !runningMonitor.isStart()) {
|
|
@@ -601,8 +601,8 @@ public class CanalController {
|
|
|
ZkClientx.clearClients();
|
|
|
|
|
|
// 需要释放 CanalServerWithEmbedded 否则主线程退出后,进程无法自动完整退出...
|
|
|
- if (embededCanalServer != null) {
|
|
|
- embededCanalServer.stop();
|
|
|
+ if (embeddedCanalServer != null) {
|
|
|
+ embeddedCanalServer.stop();
|
|
|
}
|
|
|
}
|
|
|
|