|
@@ -74,12 +74,7 @@ public class CanalController {
|
|
}
|
|
}
|
|
|
|
|
|
public CanalController(final Properties properties){
|
|
public CanalController(final Properties properties){
|
|
- managerClients = MigrateMap.makeComputingMap(new Function<String, PlainCanalConfigClient>() {
|
|
|
|
-
|
|
|
|
- public PlainCanalConfigClient apply(String managerAddress) {
|
|
|
|
- return getManagerClient(managerAddress);
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
|
|
+ managerClients = MigrateMap.makeComputingMap(this::getManagerClient);
|
|
|
|
|
|
// 初始化全局参数设置
|
|
// 初始化全局参数设置
|
|
globalInstanceConfig = initGlobalConfig(properties);
|
|
globalInstanceConfig = initGlobalConfig(properties);
|
|
@@ -147,85 +142,82 @@ public class CanalController {
|
|
|
|
|
|
final ServerRunningData serverData = new ServerRunningData(registerIp + ":" + port);
|
|
final ServerRunningData serverData = new ServerRunningData(registerIp + ":" + port);
|
|
ServerRunningMonitors.setServerData(serverData);
|
|
ServerRunningMonitors.setServerData(serverData);
|
|
- ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
|
|
|
|
-
|
|
|
|
- public ServerRunningMonitor apply(final String destination) {
|
|
|
|
- ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
|
|
|
|
- runningMonitor.setDestination(destination);
|
|
|
|
- runningMonitor.setListener(new ServerRunningListener() {
|
|
|
|
-
|
|
|
|
- public void processActiveEnter() {
|
|
|
|
- try {
|
|
|
|
- MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
|
|
|
|
- embededCanalServer.start(destination);
|
|
|
|
- if (canalMQStarter != null) {
|
|
|
|
- canalMQStarter.startDestination(destination);
|
|
|
|
- }
|
|
|
|
- } finally {
|
|
|
|
- MDC.remove(CanalConstants.MDC_DESTINATION);
|
|
|
|
|
|
+ ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap((Function<String, ServerRunningMonitor>) destination -> {
|
|
|
|
+ ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
|
|
|
|
+ runningMonitor.setDestination(destination);
|
|
|
|
+ runningMonitor.setListener(new ServerRunningListener() {
|
|
|
|
+
|
|
|
|
+ public void processActiveEnter() {
|
|
|
|
+ try {
|
|
|
|
+ MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
|
|
|
|
+ embededCanalServer.start(destination);
|
|
|
|
+ if (canalMQStarter != null) {
|
|
|
|
+ canalMQStarter.startDestination(destination);
|
|
}
|
|
}
|
|
|
|
+ } finally {
|
|
|
|
+ MDC.remove(CanalConstants.MDC_DESTINATION);
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- public void processActiveExit() {
|
|
|
|
- try {
|
|
|
|
- MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
|
|
|
|
- if (canalMQStarter != null) {
|
|
|
|
- canalMQStarter.stopDestination(destination);
|
|
|
|
- }
|
|
|
|
- embededCanalServer.stop(destination);
|
|
|
|
- } finally {
|
|
|
|
- MDC.remove(CanalConstants.MDC_DESTINATION);
|
|
|
|
|
|
+ public void processActiveExit() {
|
|
|
|
+ try {
|
|
|
|
+ MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
|
|
|
|
+ if (canalMQStarter != null) {
|
|
|
|
+ canalMQStarter.stopDestination(destination);
|
|
}
|
|
}
|
|
|
|
+ embededCanalServer.stop(destination);
|
|
|
|
+ } finally {
|
|
|
|
+ MDC.remove(CanalConstants.MDC_DESTINATION);
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- public void processStart() {
|
|
|
|
- try {
|
|
|
|
- if (zkclientx != null) {
|
|
|
|
- final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
|
|
|
|
- registerIp + ":" + port);
|
|
|
|
- initCid(path);
|
|
|
|
- zkclientx.subscribeStateChanges(new IZkStateListener() {
|
|
|
|
|
|
+ public void processStart() {
|
|
|
|
+ try {
|
|
|
|
+ if (zkclientx != null) {
|
|
|
|
+ final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
|
|
|
|
+ registerIp + ":" + port);
|
|
|
|
+ initCid(path);
|
|
|
|
+ zkclientx.subscribeStateChanges(new IZkStateListener() {
|
|
|
|
|
|
- public void handleStateChanged(KeeperState state) throws Exception {
|
|
|
|
|
|
+ public void handleStateChanged(KeeperState state) throws Exception {
|
|
|
|
|
|
- }
|
|
|
|
|
|
+ }
|
|
|
|
|
|
- public void handleNewSession() throws Exception {
|
|
|
|
- initCid(path);
|
|
|
|
- }
|
|
|
|
|
|
+ public void handleNewSession() throws Exception {
|
|
|
|
+ initCid(path);
|
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
|
- public void handleSessionEstablishmentError(Throwable error) throws Exception {
|
|
|
|
- logger.error("failed to connect to zookeeper", error);
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- }
|
|
|
|
- } finally {
|
|
|
|
- MDC.remove(CanalConstants.MDC_DESTINATION);
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public void handleSessionEstablishmentError(Throwable error) throws Exception {
|
|
|
|
+ logger.error("failed to connect to zookeeper", error);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
}
|
|
}
|
|
|
|
+ } finally {
|
|
|
|
+ MDC.remove(CanalConstants.MDC_DESTINATION);
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- public void processStop() {
|
|
|
|
- try {
|
|
|
|
- MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
|
|
|
|
- if (zkclientx != null) {
|
|
|
|
- final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
|
|
|
|
- registerIp + ":" + port);
|
|
|
|
- releaseCid(path);
|
|
|
|
- }
|
|
|
|
- } finally {
|
|
|
|
- MDC.remove(CanalConstants.MDC_DESTINATION);
|
|
|
|
|
|
+ public void processStop() {
|
|
|
|
+ try {
|
|
|
|
+ MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
|
|
|
|
+ if (zkclientx != null) {
|
|
|
|
+ final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
|
|
|
|
+ registerIp + ":" + port);
|
|
|
|
+ releaseCid(path);
|
|
}
|
|
}
|
|
|
|
+ } finally {
|
|
|
|
+ MDC.remove(CanalConstants.MDC_DESTINATION);
|
|
}
|
|
}
|
|
-
|
|
|
|
- });
|
|
|
|
- if (zkclientx != null) {
|
|
|
|
- runningMonitor.setZkClient(zkclientx);
|
|
|
|
}
|
|
}
|
|
- // 触发创建一下cid节点
|
|
|
|
- runningMonitor.init();
|
|
|
|
- return runningMonitor;
|
|
|
|
|
|
+
|
|
|
|
+ });
|
|
|
|
+ if (zkclientx != null) {
|
|
|
|
+ runningMonitor.setZkClient(zkclientx);
|
|
}
|
|
}
|
|
|
|
+ // 触发创建一下cid节点
|
|
|
|
+ runningMonitor.init();
|
|
|
|
+ return runningMonitor;
|
|
}));
|
|
}));
|
|
|
|
|
|
// 初始化monitor机制
|
|
// 初始化monitor机制
|
|
@@ -303,40 +295,37 @@ public class CanalController {
|
|
}
|
|
}
|
|
};
|
|
};
|
|
|
|
|
|
- instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {
|
|
|
|
-
|
|
|
|
- public InstanceConfigMonitor apply(InstanceMode mode) {
|
|
|
|
- int scanInterval = Integer.valueOf(getProperty(properties,
|
|
|
|
- CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
|
|
|
|
- "5"));
|
|
|
|
-
|
|
|
|
- if (mode.isSpring()) {
|
|
|
|
- SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
|
|
|
|
- monitor.setScanIntervalInSecond(scanInterval);
|
|
|
|
- monitor.setDefaultAction(defaultAction);
|
|
|
|
- // 设置conf目录,默认是user.dir + conf目录组成
|
|
|
|
- String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR);
|
|
|
|
- if (StringUtils.isEmpty(rootDir)) {
|
|
|
|
- rootDir = "../conf";
|
|
|
|
- }
|
|
|
|
|
|
+ instanceConfigMonitors = MigrateMap.makeComputingMap(mode -> {
|
|
|
|
+ int scanInterval = Integer.valueOf(getProperty(properties,
|
|
|
|
+ CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
|
|
|
|
+ "5"));
|
|
|
|
+
|
|
|
|
+ if (mode.isSpring()) {
|
|
|
|
+ SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
|
|
|
|
+ monitor.setScanIntervalInSecond(scanInterval);
|
|
|
|
+ monitor.setDefaultAction(defaultAction);
|
|
|
|
+ // 设置conf目录,默认是user.dir + conf目录组成
|
|
|
|
+ String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR);
|
|
|
|
+ if (StringUtils.isEmpty(rootDir)) {
|
|
|
|
+ rootDir = "../conf";
|
|
|
|
+ }
|
|
|
|
|
|
- if (StringUtils.equals("otter-canal", System.getProperty("appName"))) {
|
|
|
|
- monitor.setRootConf(rootDir);
|
|
|
|
- } else {
|
|
|
|
- // eclipse debug模式
|
|
|
|
- monitor.setRootConf("src/main/resources/");
|
|
|
|
- }
|
|
|
|
- return monitor;
|
|
|
|
- } else if (mode.isManager()) {
|
|
|
|
- ManagerInstanceConfigMonitor monitor = new ManagerInstanceConfigMonitor();
|
|
|
|
- monitor.setScanIntervalInSecond(scanInterval);
|
|
|
|
- monitor.setDefaultAction(defaultAction);
|
|
|
|
- String managerAddress = getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
|
|
|
|
- monitor.setConfigClient(getManagerClient(managerAddress));
|
|
|
|
- return monitor;
|
|
|
|
|
|
+ if (StringUtils.equals("otter-canal", System.getProperty("appName"))) {
|
|
|
|
+ monitor.setRootConf(rootDir);
|
|
} else {
|
|
} else {
|
|
- throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor");
|
|
|
|
|
|
+ // eclipse debug模式
|
|
|
|
+ monitor.setRootConf("src/main/resources/");
|
|
}
|
|
}
|
|
|
|
+ return monitor;
|
|
|
|
+ } else if (mode.isManager()) {
|
|
|
|
+ ManagerInstanceConfigMonitor monitor = new ManagerInstanceConfigMonitor();
|
|
|
|
+ monitor.setScanIntervalInSecond(scanInterval);
|
|
|
|
+ monitor.setDefaultAction(defaultAction);
|
|
|
|
+ String managerAddress = getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
|
|
|
|
+ monitor.setConfigClient(getManagerClient(managerAddress));
|
|
|
|
+ return monitor;
|
|
|
|
+ } else {
|
|
|
|
+ throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor");
|
|
}
|
|
}
|
|
});
|
|
});
|
|
}
|
|
}
|
|
@@ -373,27 +362,23 @@ public class CanalController {
|
|
globalConfig.setSpringXml(springXml);
|
|
globalConfig.setSpringXml(springXml);
|
|
}
|
|
}
|
|
|
|
|
|
- instanceGenerator = new CanalInstanceGenerator() {
|
|
|
|
-
|
|
|
|
- public CanalInstance generate(String destination) {
|
|
|
|
- InstanceConfig config = instanceConfigs.get(destination);
|
|
|
|
- if (config == null) {
|
|
|
|
- throw new CanalServerException("can't find destination:" + destination);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (config.getMode().isManager()) {
|
|
|
|
- PlainCanalInstanceGenerator instanceGenerator = new PlainCanalInstanceGenerator(properties);
|
|
|
|
- instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));
|
|
|
|
- instanceGenerator.setSpringXml(config.getSpringXml());
|
|
|
|
- return instanceGenerator.generate(destination);
|
|
|
|
- } else if (config.getMode().isSpring()) {
|
|
|
|
- SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();
|
|
|
|
- instanceGenerator.setSpringXml(config.getSpringXml());
|
|
|
|
- return instanceGenerator.generate(destination);
|
|
|
|
- } else {
|
|
|
|
- throw new UnsupportedOperationException("unknow mode :" + config.getMode());
|
|
|
|
- }
|
|
|
|
|
|
+ instanceGenerator = destination -> {
|
|
|
|
+ InstanceConfig config = instanceConfigs.get(destination);
|
|
|
|
+ if (config == null) {
|
|
|
|
+ throw new CanalServerException("can't find destination:" + destination);
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ if (config.getMode().isManager()) {
|
|
|
|
+ PlainCanalInstanceGenerator instanceGenerator = new PlainCanalInstanceGenerator(properties);
|
|
|
|
+ instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));
|
|
|
|
+ instanceGenerator.setSpringXml(config.getSpringXml());
|
|
|
|
+ return instanceGenerator.generate(destination);
|
|
|
|
+ } else if (config.getMode().isSpring()) {
|
|
|
|
+ SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();
|
|
|
|
+ instanceGenerator.setSpringXml(config.getSpringXml());
|
|
|
|
+ return instanceGenerator.generate(destination);
|
|
|
|
+ } else {
|
|
|
|
+ throw new UnsupportedOperationException("unknow mode :" + config.getMode());
|
|
}
|
|
}
|
|
|
|
|
|
};
|
|
};
|