|
@@ -12,6 +12,12 @@ import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
|
|
|
import com.alibaba.otter.canal.server.CanalMQStarter;
|
|
|
import com.alibaba.otter.canal.spi.CanalMQProducer;
|
|
|
|
|
|
+/**
|
|
|
+ * Canal server 启动类
|
|
|
+ *
|
|
|
+ * @author rewerma 2018-12-30 下午05:12:16
|
|
|
+ * @version 1.0.1
|
|
|
+ */
|
|
|
public class CanalStater {
|
|
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(CanalStater.class);
|
|
@@ -21,6 +27,12 @@ public class CanalStater {
|
|
|
private Thread shutdownThread = null;
|
|
|
private CanalMQStarter canalMQStarter = null;
|
|
|
|
|
|
+ /**
|
|
|
+ * 启动方法
|
|
|
+ *
|
|
|
+ * @param properties canal.properties 配置
|
|
|
+ * @throws Throwable
|
|
|
+ */
|
|
|
synchronized void start(Properties properties) throws Throwable {
|
|
|
String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
|
|
|
if (serverMode.equalsIgnoreCase("kafka")) {
|
|
@@ -59,12 +71,17 @@ public class CanalStater {
|
|
|
|
|
|
if (canalMQProducer != null) {
|
|
|
canalMQStarter = new CanalMQStarter(canalMQProducer);
|
|
|
- MQProperties mqProperties = buildMQPosition(properties);
|
|
|
+ MQProperties mqProperties = buildMQProperties(properties);
|
|
|
canalMQStarter.start(mqProperties);
|
|
|
controller.setCanalMQStarter(canalMQStarter);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 销毁方法,远程配置变更时调用
|
|
|
+ *
|
|
|
+ * @throws Throwable
|
|
|
+ */
|
|
|
synchronized void destroy() throws Throwable {
|
|
|
if (controller != null) {
|
|
|
controller.stop();
|
|
@@ -81,7 +98,13 @@ public class CanalStater {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static MQProperties buildMQPosition(Properties properties) {
|
|
|
+ /**
|
|
|
+ * 构造MQ对应的配置
|
|
|
+ *
|
|
|
+ * @param properties canal.properties 配置
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private static MQProperties buildMQProperties(Properties properties) {
|
|
|
MQProperties mqProperties = new MQProperties();
|
|
|
String servers = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_SERVERS);
|
|
|
if (!StringUtils.isEmpty(servers)) {
|