|
@@ -7,11 +7,7 @@ import org.apache.commons.lang.StringUtils;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
-import com.alibaba.otter.canal.common.MQProperties;
|
|
|
|
-import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
|
|
|
|
-import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
|
|
|
|
-import com.alibaba.otter.canal.server.CanalMQStarter;
|
|
|
|
-import com.alibaba.otter.canal.spi.CanalMQProducer;
|
|
|
|
|
|
+import com.alibaba.otter.canal.deployer.monitor.ManagerDbConfigMonitor;
|
|
|
|
|
|
/**
|
|
/**
|
|
* canal独立版本启动的入口类
|
|
* canal独立版本启动的入口类
|
|
@@ -21,128 +17,67 @@ import com.alibaba.otter.canal.spi.CanalMQProducer;
|
|
*/
|
|
*/
|
|
public class CanalLauncher {
|
|
public class CanalLauncher {
|
|
|
|
|
|
- private static final String CLASSPATH_URL_PREFIX = "classpath:";
|
|
|
|
- private static final Logger logger = LoggerFactory.getLogger(CanalLauncher.class);
|
|
|
|
|
|
+ private static final String CLASSPATH_URL_PREFIX = "classpath:";
|
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(CanalLauncher.class);
|
|
|
|
+ public static volatile boolean running = false;
|
|
|
|
|
|
- public static void main(String[] args) throws Throwable {
|
|
|
|
|
|
+ public static void main(String[] args) {
|
|
try {
|
|
try {
|
|
|
|
+ running = true;
|
|
logger.info("## set default uncaught exception handler");
|
|
logger.info("## set default uncaught exception handler");
|
|
setGlobalUncaughtExceptionHandler();
|
|
setGlobalUncaughtExceptionHandler();
|
|
|
|
|
|
logger.info("## load canal configurations");
|
|
logger.info("## load canal configurations");
|
|
String conf = System.getProperty("canal.conf", "classpath:canal.properties");
|
|
String conf = System.getProperty("canal.conf", "classpath:canal.properties");
|
|
Properties properties = new Properties();
|
|
Properties properties = new Properties();
|
|
|
|
+ ManagerDbConfigMonitor managerDbConfigMonitor = null;
|
|
if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
|
|
if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
|
|
conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
|
|
conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
|
|
properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
|
|
properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
|
|
|
|
+
|
|
|
|
+ String jdbcUrl = properties.getProperty("canal.manager.jdbc.url");
|
|
|
|
+ if (!StringUtils.isEmpty(jdbcUrl)) {
|
|
|
|
+ // load remote config
|
|
|
|
+ String jdbcUsername = properties.getProperty("canal.manager.jdbc.username");
|
|
|
|
+ String jdbcPassword = properties.getProperty("canal.manager.jdbc.password");
|
|
|
|
+ managerDbConfigMonitor = new ManagerDbConfigMonitor(jdbcUrl, jdbcUsername, jdbcPassword);
|
|
|
|
+ Properties remoteConfig = managerDbConfigMonitor.loadRemoteConfig();
|
|
|
|
+ if (remoteConfig != null) {
|
|
|
|
+ properties = remoteConfig;
|
|
|
|
+ } else {
|
|
|
|
+ managerDbConfigMonitor = null;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
} else {
|
|
} else {
|
|
properties.load(new FileInputStream(conf));
|
|
properties.load(new FileInputStream(conf));
|
|
}
|
|
}
|
|
|
|
|
|
- CanalMQProducer canalMQProducer = null;
|
|
|
|
- String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
|
|
|
|
- if (serverMode.equalsIgnoreCase("kafka")) {
|
|
|
|
- canalMQProducer = new CanalKafkaProducer();
|
|
|
|
- } else if (serverMode.equalsIgnoreCase("rocketmq")) {
|
|
|
|
- canalMQProducer = new CanalRocketMQProducer();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (canalMQProducer != null) {
|
|
|
|
- // disable netty
|
|
|
|
- System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
|
|
|
|
- System.setProperty(CanalConstants.CANAL_DESTINATIONS,
|
|
|
|
- properties.getProperty(CanalConstants.CANAL_DESTINATIONS));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- logger.info("## start the canal server.");
|
|
|
|
- final CanalController controller = new CanalController(properties);
|
|
|
|
- controller.start();
|
|
|
|
- logger.info("## the canal server is running now ......");
|
|
|
|
- Runtime.getRuntime().addShutdownHook(new Thread() {
|
|
|
|
-
|
|
|
|
- public void run() {
|
|
|
|
- try {
|
|
|
|
- logger.info("## stop the canal server");
|
|
|
|
- controller.stop();
|
|
|
|
- } catch (Throwable e) {
|
|
|
|
- logger.warn("##something goes wrong when stopping canal Server:", e);
|
|
|
|
- } finally {
|
|
|
|
- logger.info("## canal server is down.");
|
|
|
|
|
|
+ final CanalStater canalStater = new CanalStater();
|
|
|
|
+ canalStater.start(properties);
|
|
|
|
+
|
|
|
|
+ if (managerDbConfigMonitor != null) {
|
|
|
|
+ managerDbConfigMonitor.start(new ManagerDbConfigMonitor.Listener<Properties>() {
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void onChange(Properties properties) {
|
|
|
|
+ try {
|
|
|
|
+ // 远程配置canal.properties修改重新启动
|
|
|
|
+ canalStater.destroy();
|
|
|
|
+ canalStater.start(properties);
|
|
|
|
+ } catch (Throwable throwable) {
|
|
|
|
+ logger.error(throwable.getMessage(), throwable);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- }
|
|
|
|
-
|
|
|
|
- });
|
|
|
|
-
|
|
|
|
- if (canalMQProducer != null) {
|
|
|
|
- CanalMQStarter canalMQStarter = new CanalMQStarter(canalMQProducer);
|
|
|
|
- MQProperties mqProperties = buildMQPosition(properties);
|
|
|
|
- canalMQStarter.start(mqProperties);
|
|
|
|
- controller.setCanalMQStarter(canalMQStarter);
|
|
|
|
|
|
+ });
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ while (running)
|
|
|
|
+ ;
|
|
} catch (Throwable e) {
|
|
} catch (Throwable e) {
|
|
logger.error("## Something goes wrong when starting up the canal Server:", e);
|
|
logger.error("## Something goes wrong when starting up the canal Server:", e);
|
|
- System.exit(0);
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private static MQProperties buildMQPosition(Properties properties) {
|
|
|
|
- MQProperties mqProperties = new MQProperties();
|
|
|
|
- String servers = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_SERVERS);
|
|
|
|
- if (!StringUtils.isEmpty(servers)) {
|
|
|
|
- mqProperties.setServers(servers);
|
|
|
|
- }
|
|
|
|
- String retires = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_RETRIES);
|
|
|
|
- if (!StringUtils.isEmpty(retires)) {
|
|
|
|
- mqProperties.setRetries(Integer.valueOf(retires));
|
|
|
|
- }
|
|
|
|
- String batchSize = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BATCHSIZE);
|
|
|
|
- if (!StringUtils.isEmpty(batchSize)) {
|
|
|
|
- mqProperties.setBatchSize(Integer.valueOf(batchSize));
|
|
|
|
- }
|
|
|
|
- String lingerMs = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_LINGERMS);
|
|
|
|
- if (!StringUtils.isEmpty(lingerMs)) {
|
|
|
|
- mqProperties.setLingerMs(Integer.valueOf(lingerMs));
|
|
|
|
- }
|
|
|
|
- String maxRequestSize = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_MAXREQUESTSIZE);
|
|
|
|
- if (!StringUtils.isEmpty(maxRequestSize)) {
|
|
|
|
- mqProperties.setMaxRequestSize(Integer.valueOf(maxRequestSize));
|
|
|
|
- }
|
|
|
|
- String bufferMemory = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_BUFFERMEMORY);
|
|
|
|
- if (!StringUtils.isEmpty(bufferMemory)) {
|
|
|
|
- mqProperties.setBufferMemory(Long.valueOf(bufferMemory));
|
|
|
|
- }
|
|
|
|
- String canalBatchSize = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CANALBATCHSIZE);
|
|
|
|
- if (!StringUtils.isEmpty(canalBatchSize)) {
|
|
|
|
- mqProperties.setCanalBatchSize(Integer.valueOf(canalBatchSize));
|
|
|
|
- }
|
|
|
|
- String canalGetTimeout = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CANALGETTIMEOUT);
|
|
|
|
- if (!StringUtils.isEmpty(canalGetTimeout)) {
|
|
|
|
- mqProperties.setCanalGetTimeout(Long.valueOf(canalGetTimeout));
|
|
|
|
- }
|
|
|
|
- String flatMessage = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_FLATMESSAGE);
|
|
|
|
- if (!StringUtils.isEmpty(flatMessage)) {
|
|
|
|
- mqProperties.setFlatMessage(Boolean.valueOf(flatMessage));
|
|
|
|
- }
|
|
|
|
- String compressionType = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_COMPRESSION_TYPE);
|
|
|
|
- if (!StringUtils.isEmpty(compressionType)) {
|
|
|
|
- mqProperties.setCompressionType(compressionType);
|
|
|
|
- }
|
|
|
|
- String acks = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_ACKS);
|
|
|
|
- if (!StringUtils.isEmpty(acks)) {
|
|
|
|
- mqProperties.setAcks(acks);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- String aliyunAccessKey = CanalController.getProperty(properties, CanalConstants.CANAL_ALIYUN_ACCESSKEY);
|
|
|
|
- if (!StringUtils.isEmpty(aliyunAccessKey)) {
|
|
|
|
- mqProperties.setAliyunAccessKey(aliyunAccessKey);
|
|
|
|
- }
|
|
|
|
- String aliyunSecretKey = CanalController.getProperty(properties, CanalConstants.CANAL_ALIYUN_SECRETKEY);
|
|
|
|
- if (!StringUtils.isEmpty(aliyunSecretKey)) {
|
|
|
|
- mqProperties.setAliyunSecretKey(aliyunSecretKey);
|
|
|
|
- }
|
|
|
|
- return mqProperties;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
private static void setGlobalUncaughtExceptionHandler() {
|
|
private static void setGlobalUncaughtExceptionHandler() {
|
|
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
|
|
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
|
|
|
|
|
|
@@ -152,4 +87,5 @@ public class CanalLauncher {
|
|
}
|
|
}
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
+
|
|
}
|
|
}
|