|
@@ -18,11 +18,17 @@ import com.alibaba.otter.canal.deployer.CanalConstants;
|
|
import com.google.common.base.Joiner;
|
|
import com.google.common.base.Joiner;
|
|
import com.google.common.collect.MapMaker;
|
|
import com.google.common.collect.MapMaker;
|
|
|
|
|
|
-public class ManagerDbConfigMonitor {
|
|
|
|
|
|
+/**
|
|
|
|
+ * 远程配置装载监控
|
|
|
|
+ *
|
|
|
|
+ * @author rewerma 2018-12-30 下午05:12:16
|
|
|
|
+ * @version 1.0.1
|
|
|
|
+ */
|
|
|
|
+public class ManagerRemoteConfigMonitor {
|
|
|
|
|
|
- private static final Logger logger = LoggerFactory.getLogger(ManagerDbConfigMonitor.class);
|
|
|
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(ManagerRemoteConfigMonitor.class);
|
|
|
|
|
|
- private Map<String, ConfigItem> remoteInstanceConfigs = new MapMaker().makeMap();
|
|
|
|
|
|
+ private Map<String, ConfigItem> remoteInstanceConfigs = new MapMaker().makeMap();
|
|
|
|
|
|
private Connection conn;
|
|
private Connection conn;
|
|
private String jdbcUrl;
|
|
private String jdbcUrl;
|
|
@@ -35,7 +41,7 @@ public class ManagerDbConfigMonitor {
|
|
private ScheduledExecutorService executor = Executors.newScheduledThreadPool(2,
|
|
private ScheduledExecutorService executor = Executors.newScheduledThreadPool(2,
|
|
new NamedThreadFactory("remote-canal-config-scan"));
|
|
new NamedThreadFactory("remote-canal-config-scan"));
|
|
|
|
|
|
- public ManagerDbConfigMonitor(String jdbcUrl, String jdbcUsername, String jdbcPassword){
|
|
|
|
|
|
+ public ManagerRemoteConfigMonitor(String jdbcUrl, String jdbcUsername, String jdbcPassword){
|
|
this.jdbcUrl = jdbcUrl;
|
|
this.jdbcUrl = jdbcUrl;
|
|
this.jdbcUsername = jdbcUsername;
|
|
this.jdbcUsername = jdbcUsername;
|
|
this.jdbcPassword = jdbcPassword;
|
|
this.jdbcPassword = jdbcPassword;
|
|
@@ -53,6 +59,11 @@ public class ManagerDbConfigMonitor {
|
|
return conn;
|
|
return conn;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 加载远程 canal.properties文件 覆盖本地
|
|
|
|
+ *
|
|
|
|
+ * @return 远程配置的properties
|
|
|
|
+ */
|
|
public Properties loadRemoteConfig() {
|
|
public Properties loadRemoteConfig() {
|
|
Properties properties = null;
|
|
Properties properties = null;
|
|
try {
|
|
try {
|
|
@@ -74,6 +85,9 @@ public class ManagerDbConfigMonitor {
|
|
return properties;
|
|
return properties;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 加载远程的instance配置 覆盖本地
|
|
|
|
+ */
|
|
public void loadRemoteInstanceConfigs() {
|
|
public void loadRemoteInstanceConfigs() {
|
|
try {
|
|
try {
|
|
// 加载远程instance配置
|
|
// 加载远程instance配置
|
|
@@ -86,6 +100,11 @@ public class ManagerDbConfigMonitor {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 获取远程canal.properties配置内容
|
|
|
|
+ *
|
|
|
|
+ * @return 内容对象
|
|
|
|
+ */
|
|
private ConfigItem getRemoteCanalConfig() {
|
|
private ConfigItem getRemoteCanalConfig() {
|
|
String sql = "select name, content, modified_time from canal_config where id=1";
|
|
String sql = "select name, content, modified_time from canal_config where id=1";
|
|
try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
|
|
try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
|
|
@@ -103,6 +122,11 @@ public class ManagerDbConfigMonitor {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 覆盖本地 canal.properties
|
|
|
|
+ *
|
|
|
|
+ * @param content 远程配置内容文本
|
|
|
|
+ */
|
|
private void overrideLocalCanalConfig(String content) {
|
|
private void overrideLocalCanalConfig(String content) {
|
|
try (FileWriter writer = new FileWriter(getConfPath() + "canal.properties")) {
|
|
try (FileWriter writer = new FileWriter(getConfPath() + "canal.properties")) {
|
|
writer.write(content);
|
|
writer.write(content);
|
|
@@ -112,38 +136,11 @@ public class ManagerDbConfigMonitor {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public void start(final Listener<Properties> listener) {
|
|
|
|
- // 监听canal.properties变化
|
|
|
|
- executor.scheduleWithFixedDelay(new Runnable() {
|
|
|
|
-
|
|
|
|
- public void run() {
|
|
|
|
- try {
|
|
|
|
- Properties properties = loadRemoteConfig();
|
|
|
|
- if (properties != null) {
|
|
|
|
- // 重启整个进程
|
|
|
|
- listener.onChange(properties);
|
|
|
|
- }
|
|
|
|
- } catch (Throwable e) {
|
|
|
|
- logger.error("scan failed", e);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- }, 10, scanIntervalInSecond, TimeUnit.SECONDS);
|
|
|
|
-
|
|
|
|
- // 监听instance变化
|
|
|
|
- executor.scheduleWithFixedDelay(new Runnable() {
|
|
|
|
-
|
|
|
|
- public void run() {
|
|
|
|
- try {
|
|
|
|
- loadRemoteInstanceConfigs();
|
|
|
|
- } catch (Throwable e) {
|
|
|
|
- logger.error("scan failed", e);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- }, 10, 3, TimeUnit.SECONDS);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 获取远程instance新增、修改、删除配置
|
|
|
|
+ *
|
|
|
|
+ * @return Map[0]:新增或修改的instance配置; Map[1]:删除的instance配置
|
|
|
|
+ */
|
|
@SuppressWarnings("unchecked")
|
|
@SuppressWarnings("unchecked")
|
|
private Map<String, ConfigItem>[] getModifiedInstanceConfigs() {
|
|
private Map<String, ConfigItem>[] getModifiedInstanceConfigs() {
|
|
Map<String, ConfigItem>[] res = new Map[2];
|
|
Map<String, ConfigItem>[] res = new Map[2];
|
|
@@ -216,12 +213,16 @@ public class ManagerDbConfigMonitor {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 覆盖本地instance配置
|
|
|
|
+ *
|
|
|
|
+ * @param modifiedInstanceConfigs 有变更的配置项
|
|
|
|
+ */
|
|
private void overrideLocalInstanceConfigs(Map<String, ConfigItem>[] modifiedInstanceConfigs) {
|
|
private void overrideLocalInstanceConfigs(Map<String, ConfigItem>[] modifiedInstanceConfigs) {
|
|
Map<String, ConfigItem> changedInstanceConfigs = modifiedInstanceConfigs[0];
|
|
Map<String, ConfigItem> changedInstanceConfigs = modifiedInstanceConfigs[0];
|
|
if (changedInstanceConfigs != null) {
|
|
if (changedInstanceConfigs != null) {
|
|
for (ConfigItem configItem : changedInstanceConfigs.values()) {
|
|
for (ConfigItem configItem : changedInstanceConfigs.values()) {
|
|
- try (FileWriter writer = new FileWriter(
|
|
|
|
- getConfPath() + configItem.getName() + "/instance.properties")) {
|
|
|
|
|
|
+ try (FileWriter writer = new FileWriter(getConfPath() + configItem.getName() + "/instance.properties")) {
|
|
writer.write(configItem.getContent());
|
|
writer.write(configItem.getContent());
|
|
writer.flush();
|
|
writer.flush();
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
@@ -240,6 +241,45 @@ public class ManagerDbConfigMonitor {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 监听 canal 主配置和 instance 配置变化
|
|
|
|
+ *
|
|
|
|
+ * @param listener 监听回调方法
|
|
|
|
+ */
|
|
|
|
+ public void start(final Listener<Properties> listener) {
|
|
|
|
+ // 监听canal.properties变化
|
|
|
|
+ executor.scheduleWithFixedDelay(new Runnable() {
|
|
|
|
+
|
|
|
|
+ public void run() {
|
|
|
|
+ try {
|
|
|
|
+ Properties properties = loadRemoteConfig();
|
|
|
|
+ if (properties != null) {
|
|
|
|
+ listener.onChange(properties);
|
|
|
|
+ }
|
|
|
|
+ } catch (Throwable e) {
|
|
|
|
+ logger.error("scan failed", e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }, 10, scanIntervalInSecond, TimeUnit.SECONDS);
|
|
|
|
+
|
|
|
|
+ // 监听instance变化
|
|
|
|
+ executor.scheduleWithFixedDelay(new Runnable() {
|
|
|
|
+
|
|
|
|
+ public void run() {
|
|
|
|
+ try {
|
|
|
|
+ loadRemoteInstanceConfigs();
|
|
|
|
+ } catch (Throwable e) {
|
|
|
|
+ logger.error("scan failed", e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }, 10, 3, TimeUnit.SECONDS);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 销毁
|
|
|
|
+ */
|
|
public void destroy() {
|
|
public void destroy() {
|
|
executor.shutdownNow();
|
|
executor.shutdownNow();
|
|
if (conn != null) {
|
|
if (conn != null) {
|
|
@@ -251,6 +291,11 @@ public class ManagerDbConfigMonitor {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 获取conf文件夹所在路径
|
|
|
|
+ *
|
|
|
|
+ * @return 路径地址
|
|
|
|
+ */
|
|
private String getConfPath() {
|
|
private String getConfPath() {
|
|
String classpath = this.getClass().getResource("/").getPath();
|
|
String classpath = this.getClass().getResource("/").getPath();
|
|
String confPath = classpath + ".." + File.separator + "conf" + File.separator;
|
|
String confPath = classpath + ".." + File.separator + "conf" + File.separator;
|
|
@@ -261,6 +306,9 @@ public class ManagerDbConfigMonitor {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 配置对应对象
|
|
|
|
+ */
|
|
public static class ConfigItem {
|
|
public static class ConfigItem {
|
|
|
|
|
|
private Long id;
|
|
private Long id;
|