|
@@ -23,6 +23,12 @@ import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
|
|
|
import com.google.common.base.Joiner;
|
|
|
import com.google.common.collect.MapMaker;
|
|
|
|
|
|
+/**
|
|
|
+ * 远程配置装载、监控类
|
|
|
+ *
|
|
|
+ * @author rewerma @ 2019-01-05
|
|
|
+ * @version 1.0.0
|
|
|
+ */
|
|
|
public class AdapterRemoteConfigMonitor {
|
|
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(AdapterRemoteConfigMonitor.class);
|
|
@@ -44,26 +50,6 @@ public class AdapterRemoteConfigMonitor {
|
|
|
this.jdbcPassword = jdbcPassword;
|
|
|
}
|
|
|
|
|
|
- public AdapterRemoteConfigMonitor(){
|
|
|
- try {
|
|
|
- File configFile = new File(".." + File.separator + Constant.CONF_DIR + File.separator + "bootstrap.yml");
|
|
|
- if (!configFile.exists()) {
|
|
|
- URL url = MappingConfigsLoader.class.getClassLoader().getResource("");
|
|
|
- if (url != null) {
|
|
|
- configFile = new File(url.getPath() + "bootstrap.yml");
|
|
|
- }
|
|
|
- }
|
|
|
- if (configFile.exists()) {
|
|
|
- try (FileInputStream fis = new FileInputStream(configFile)) {
|
|
|
- Map a = new Yaml().load(fis);
|
|
|
- a = a;
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error(e.getMessage(), e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
private Connection getConn() throws Exception {
|
|
|
if (conn == null || conn.isClosed()) {
|
|
|
Class.forName("com.mysql.jdbc.Driver");
|
|
@@ -72,6 +58,9 @@ public class AdapterRemoteConfigMonitor {
|
|
|
return conn;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 加载远程application.yml配置到本地
|
|
|
+ */
|
|
|
public void loadRemoteConfig() {
|
|
|
try {
|
|
|
// 加载远程adapter配置
|
|
@@ -87,6 +76,11 @@ public class AdapterRemoteConfigMonitor {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 获取远程application.yml配置
|
|
|
+ *
|
|
|
+ * @return 配置对象
|
|
|
+ */
|
|
|
private ConfigItem getRemoteAdapterConfig() {
|
|
|
String sql = "select name, content, modified_time from canal_config where id=2";
|
|
|
try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
|
|
@@ -104,6 +98,11 @@ public class AdapterRemoteConfigMonitor {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 覆盖本地application.yml文件
|
|
|
+ *
|
|
|
+ * @param content 文件内容
|
|
|
+ */
|
|
|
private void overrideLocalCanalConfig(String content) {
|
|
|
try (FileWriter writer = new FileWriter(getConfPath() + "application.yml")) {
|
|
|
writer.write(content);
|
|
@@ -113,6 +112,9 @@ public class AdapterRemoteConfigMonitor {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 启动监听数据库变化
|
|
|
+ */
|
|
|
public void start() {
|
|
|
// 监听application.yml变化
|
|
|
executor.scheduleWithFixedDelay(() -> {
|
|
@@ -133,6 +135,9 @@ public class AdapterRemoteConfigMonitor {
|
|
|
}, 10, 3, TimeUnit.SECONDS);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 加载adapter配置到本地
|
|
|
+ */
|
|
|
public void loadRemoteAdapterConfigs() {
|
|
|
try {
|
|
|
// 加载远程instance配置
|
|
@@ -145,6 +150,11 @@ public class AdapterRemoteConfigMonitor {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 获取有变动的adapter配置
|
|
|
+ *
|
|
|
+ * @return Map[0]: 新增修改的配置, Map[1]: 删除的配置
|
|
|
+ */
|
|
|
@SuppressWarnings("unchecked")
|
|
|
private Map<String, ConfigItem>[] getModifiedAdapterConfigs() {
|
|
|
Map<String, ConfigItem>[] res = new Map[2];
|
|
@@ -223,6 +233,11 @@ public class AdapterRemoteConfigMonitor {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 覆盖adapter配置到本地
|
|
|
+ *
|
|
|
+ * @param modifiedInstanceConfigs 变动的配置集合
|
|
|
+ */
|
|
|
private void overrideLocalAdapterConfigs(Map<String, ConfigItem>[] modifiedInstanceConfigs) {
|
|
|
Map<String, ConfigItem> changedInstanceConfigs = modifiedInstanceConfigs[0];
|
|
|
if (changedInstanceConfigs != null) {
|