|
@@ -46,7 +46,7 @@ import com.google.common.collect.MigrateMap;
|
|
|
*/
|
|
|
public class CanalController {
|
|
|
|
|
|
- private static final Logger logger = LoggerFactory.getLogger(CanalController.class);
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(CanalController.class);
|
|
|
private Long cid;
|
|
|
private String ip;
|
|
|
private String registerIp;
|
|
@@ -54,11 +54,10 @@ public class CanalController {
|
|
|
private int adminPort;
|
|
|
// 默认使用spring的方式载入
|
|
|
private Map<String, InstanceConfig> instanceConfigs;
|
|
|
- private Map<String, String> runningInstances = new MapMaker().makeMap();
|
|
|
private InstanceConfig globalInstanceConfig;
|
|
|
private Map<String, PlainCanalConfigClient> managerClients;
|
|
|
// 监听instance config的变化
|
|
|
- private boolean autoScan = true;
|
|
|
+ private boolean autoScan = true;
|
|
|
private InstanceAction defaultAction;
|
|
|
private Map<InstanceMode, InstanceConfigMonitor> instanceConfigMonitors;
|
|
|
private CanalServerWithEmbedded embededCanalServer;
|
|
@@ -153,91 +152,88 @@ public class CanalController {
|
|
|
zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
|
|
|
}
|
|
|
|
|
|
- final ServerRunningData serverData = new ServerRunningData(cid, registerIp + ":" + port);
|
|
|
+ final ServerRunningData serverData = new ServerRunningData(registerIp + ":" + port);
|
|
|
ServerRunningMonitors.setServerData(serverData);
|
|
|
- ServerRunningMonitors
|
|
|
- .setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
|
|
|
-
|
|
|
- public ServerRunningMonitor apply(final String destination) {
|
|
|
- ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
|
|
|
- runningMonitor.setDestination(destination);
|
|
|
- runningMonitor.setListener(new ServerRunningListener() {
|
|
|
-
|
|
|
- public void processActiveEnter() {
|
|
|
- try {
|
|
|
- MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
|
|
|
- embededCanalServer.start(destination);
|
|
|
- if (canalMQStarter != null) {
|
|
|
- canalMQStarter.startDestination(destination);
|
|
|
- }
|
|
|
- runningInstances.put(destination, destination);
|
|
|
- } finally {
|
|
|
- MDC.remove(CanalConstants.MDC_DESTINATION);
|
|
|
+ ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
|
|
|
+
|
|
|
+ public ServerRunningMonitor apply(final String destination) {
|
|
|
+ ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
|
|
|
+ runningMonitor.setDestination(destination);
|
|
|
+ runningMonitor.setListener(new ServerRunningListener() {
|
|
|
+
|
|
|
+ public void processActiveEnter() {
|
|
|
+ try {
|
|
|
+ MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
|
|
|
+ embededCanalServer.start(destination);
|
|
|
+ if (canalMQStarter != null) {
|
|
|
+ canalMQStarter.startDestination(destination);
|
|
|
}
|
|
|
+ } finally {
|
|
|
+ MDC.remove(CanalConstants.MDC_DESTINATION);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- public void processActiveExit() {
|
|
|
- try {
|
|
|
- MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
|
|
|
- if (canalMQStarter != null) {
|
|
|
- canalMQStarter.stopDestination(destination);
|
|
|
- }
|
|
|
- embededCanalServer.stop(destination);
|
|
|
- runningInstances.remove(destination);
|
|
|
- } finally {
|
|
|
- MDC.remove(CanalConstants.MDC_DESTINATION);
|
|
|
+ public void processActiveExit() {
|
|
|
+ try {
|
|
|
+ MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
|
|
|
+ if (canalMQStarter != null) {
|
|
|
+ canalMQStarter.stopDestination(destination);
|
|
|
}
|
|
|
+ embededCanalServer.stop(destination);
|
|
|
+ } finally {
|
|
|
+ MDC.remove(CanalConstants.MDC_DESTINATION);
|
|
|
}
|
|
|
+ }
|
|
|
+
|
|
|
+ public void processStart() {
|
|
|
+ try {
|
|
|
+ if (zkclientx != null) {
|
|
|
+ final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
|
|
|
+ registerIp + ":" + port);
|
|
|
+ initCid(path);
|
|
|
+ zkclientx.subscribeStateChanges(new IZkStateListener() {
|
|
|
+
|
|
|
+ public void handleStateChanged(KeeperState state) throws Exception {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ public void handleNewSession() throws Exception {
|
|
|
+ initCid(path);
|
|
|
+ }
|
|
|
|
|
|
- public void processStart() {
|
|
|
- try {
|
|
|
- if (zkclientx != null) {
|
|
|
- final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
|
|
|
- registerIp + ":" + port);
|
|
|
- initCid(path);
|
|
|
- zkclientx.subscribeStateChanges(new IZkStateListener() {
|
|
|
-
|
|
|
- public void handleStateChanged(KeeperState state) throws Exception {
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- public void handleNewSession() throws Exception {
|
|
|
- initCid(path);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void handleSessionEstablishmentError(Throwable error) throws Exception {
|
|
|
- logger.error("failed to connect to zookeeper", error);
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
- } finally {
|
|
|
- MDC.remove(CanalConstants.MDC_DESTINATION);
|
|
|
+ @Override
|
|
|
+ public void handleSessionEstablishmentError(Throwable error) throws Exception {
|
|
|
+ logger.error("failed to connect to zookeeper", error);
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
+ } finally {
|
|
|
+ MDC.remove(CanalConstants.MDC_DESTINATION);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- public void processStop() {
|
|
|
- try {
|
|
|
- MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
|
|
|
- if (zkclientx != null) {
|
|
|
- final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
|
|
|
- registerIp + ":" + port);
|
|
|
- releaseCid(path);
|
|
|
- }
|
|
|
- } finally {
|
|
|
- MDC.remove(CanalConstants.MDC_DESTINATION);
|
|
|
+ public void processStop() {
|
|
|
+ try {
|
|
|
+ MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
|
|
|
+ if (zkclientx != null) {
|
|
|
+ final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
|
|
|
+ registerIp + ":" + port);
|
|
|
+ releaseCid(path);
|
|
|
}
|
|
|
+ } finally {
|
|
|
+ MDC.remove(CanalConstants.MDC_DESTINATION);
|
|
|
}
|
|
|
-
|
|
|
- });
|
|
|
- if (zkclientx != null) {
|
|
|
- runningMonitor.setZkClient(zkclientx);
|
|
|
}
|
|
|
- // 触发创建一下cid节点
|
|
|
- runningMonitor.init();
|
|
|
- return runningMonitor;
|
|
|
+
|
|
|
+ });
|
|
|
+ if (zkclientx != null) {
|
|
|
+ runningMonitor.setZkClient(zkclientx);
|
|
|
}
|
|
|
- }));
|
|
|
+ // 触发创建一下cid节点
|
|
|
+ runningMonitor.init();
|
|
|
+ return runningMonitor;
|
|
|
+ }
|
|
|
+ }));
|
|
|
|
|
|
// 初始化monitor机制
|
|
|
autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
|
|
@@ -284,13 +280,26 @@ public class CanalController {
|
|
|
|
|
|
logger.info("auto notify reload {} successful.", destination);
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void release(String destination) {
|
|
|
+ // 此处的release,代表强制释放,主要针对HA机制释放运行,让给其他机器抢占
|
|
|
+ InstanceConfig config = instanceConfigs.get(destination);
|
|
|
+ if (config != null) {
|
|
|
+ ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
|
|
|
+ if (runningMonitor.isStart()) {
|
|
|
+ runningMonitor.release();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.info("auto notify release {} successful.", destination);
|
|
|
+ }
|
|
|
};
|
|
|
|
|
|
instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {
|
|
|
|
|
|
public InstanceConfigMonitor apply(InstanceMode mode) {
|
|
|
- int scanInterval = Integer
|
|
|
- .valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
|
|
|
+ int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
|
|
|
|
|
|
if (mode.isSpring()) {
|
|
|
SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
|
|
@@ -366,7 +375,7 @@ public class CanalController {
|
|
|
}
|
|
|
|
|
|
if (config.getMode().isManager()) {
|
|
|
- PlainCanalInstanceGenerator instanceGenerator = new PlainCanalInstanceGenerator(properties);
|
|
|
+ PlainCanalInstanceGenerator instanceGenerator = new PlainCanalInstanceGenerator();
|
|
|
instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));
|
|
|
instanceGenerator.setSpringXml(config.getSpringXml());
|
|
|
return instanceGenerator.generate(destination);
|
|
@@ -386,7 +395,7 @@ public class CanalController {
|
|
|
}
|
|
|
|
|
|
private PlainCanalConfigClient getManagerClient(String managerAddress) {
|
|
|
- return new PlainCanalConfigClient(managerAddress, this.adminUser, this.adminPasswd, null, adminPort);
|
|
|
+ return new PlainCanalConfigClient(managerAddress, this.adminUser, this.adminPasswd, this.registerIp, adminPort);
|
|
|
}
|
|
|
|
|
|
private void initInstanceConfig(Properties properties) {
|
|
@@ -398,8 +407,7 @@ public class CanalController {
|
|
|
InstanceConfig oldConfig = instanceConfigs.put(destination, config);
|
|
|
|
|
|
if (oldConfig != null) {
|
|
|
- logger
|
|
|
- .warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);
|
|
|
+ logger.warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -595,7 +603,4 @@ public class CanalController {
|
|
|
return instanceConfigs;
|
|
|
}
|
|
|
|
|
|
- public Map<String, String> getRunningInstances() {
|
|
|
- return runningInstances;
|
|
|
- }
|
|
|
}
|