|
@@ -1,39 +1,41 @@
|
|
|
-package com.alibaba.otter.canal.deployer.monitor;
|
|
|
+package com.alibaba.otter.canal.deployer.monitor.remote;
|
|
|
|
|
|
import java.io.ByteArrayInputStream;
|
|
|
import java.io.File;
|
|
|
import java.io.FileWriter;
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
-import java.sql.*;
|
|
|
+import java.sql.Connection;
|
|
|
+import java.sql.ResultSet;
|
|
|
+import java.sql.SQLException;
|
|
|
+import java.sql.Statement;
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
+import org.apache.commons.lang.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import com.alibaba.druid.pool.DruidDataSource;
|
|
|
import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
|
|
|
import com.alibaba.otter.canal.deployer.CanalConstants;
|
|
|
import com.google.common.base.Joiner;
|
|
|
import com.google.common.collect.MapMaker;
|
|
|
|
|
|
/**
|
|
|
- * 远程配置装载监控
|
|
|
+ * 基于数据库的远程配置装载器
|
|
|
*
|
|
|
- * @author rewerma 2018-12-30 下午05:12:16
|
|
|
- * @version 1.0.1
|
|
|
+ * @author rewerma 2019-01-25 下午05:20:16
|
|
|
+ * @version 1.0.0
|
|
|
*/
|
|
|
-public class ManagerRemoteConfigMonitor {
|
|
|
+public class DbRemoteConfigLoader implements RemoteConfigLoader {
|
|
|
|
|
|
- private static final Logger logger = LoggerFactory.getLogger(ManagerRemoteConfigMonitor.class);
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(DbRemoteConfigLoader.class);
|
|
|
|
|
|
private Map<String, ConfigItem> remoteInstanceConfigs = new MapMaker().makeMap();
|
|
|
|
|
|
- private Connection conn;
|
|
|
- private String jdbcUrl;
|
|
|
- private String jdbcUsername;
|
|
|
- private String jdbcPassword;
|
|
|
+ private DruidDataSource dataSource;
|
|
|
|
|
|
private long currentConfigTimestamp = 0;
|
|
|
|
|
@@ -41,29 +43,36 @@ public class ManagerRemoteConfigMonitor {
|
|
|
private ScheduledExecutorService executor = Executors.newScheduledThreadPool(2,
|
|
|
new NamedThreadFactory("remote-canal-config-scan"));
|
|
|
|
|
|
- public ManagerRemoteConfigMonitor(String jdbcUrl, String jdbcUsername, String jdbcPassword){
|
|
|
- this.jdbcUrl = jdbcUrl;
|
|
|
- this.jdbcUsername = jdbcUsername;
|
|
|
- this.jdbcPassword = jdbcPassword;
|
|
|
- }
|
|
|
-
|
|
|
- public void setScanIntervalInSecond(long scanIntervalInSecond) {
|
|
|
- this.scanIntervalInSecond = scanIntervalInSecond;
|
|
|
- }
|
|
|
+ private RemoteInstanceMonitor remoteInstanceMonitor = new RemoteInstanceMonitorImpl();
|
|
|
|
|
|
- private Connection getConn() throws Exception {
|
|
|
- if (conn == null || conn.isClosed()) {
|
|
|
- Class.forName("com.mysql.jdbc.Driver");
|
|
|
- conn = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword);
|
|
|
+ public DbRemoteConfigLoader(String driverName, String jdbcUrl, String jdbcUsername, String jdbcPassword){
|
|
|
+ dataSource = new DruidDataSource();
|
|
|
+ if (StringUtils.isEmpty(driverName)) {
|
|
|
+ driverName = "com.mysql.jdbc.Driver";
|
|
|
+ }
|
|
|
+ dataSource.setDriverClassName(driverName);
|
|
|
+ dataSource.setUrl(jdbcUrl);
|
|
|
+ dataSource.setUsername(jdbcUsername);
|
|
|
+ dataSource.setPassword(jdbcPassword);
|
|
|
+ dataSource.setInitialSize(1);
|
|
|
+ dataSource.setMinIdle(1);
|
|
|
+ dataSource.setMaxActive(1);
|
|
|
+ dataSource.setMaxWait(60000);
|
|
|
+ dataSource.setTimeBetweenEvictionRunsMillis(60000);
|
|
|
+ dataSource.setMinEvictableIdleTimeMillis(300000);
|
|
|
+ try {
|
|
|
+ dataSource.init();
|
|
|
+ } catch (SQLException e) {
|
|
|
+ throw new RuntimeException(e.getMessage(), e);
|
|
|
}
|
|
|
- return conn;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 加载远程 canal.properties文件 覆盖本地
|
|
|
+ * 加载远程 canal.properties文件
|
|
|
*
|
|
|
* @return 远程配置的properties
|
|
|
*/
|
|
|
+ @Override
|
|
|
public Properties loadRemoteConfig() {
|
|
|
Properties properties = null;
|
|
|
try {
|
|
@@ -87,15 +96,14 @@ public class ManagerRemoteConfigMonitor {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 加载远程的instance配置 覆盖本地
|
|
|
+ * 覆盖本地 canal.properties
|
|
|
+ *
|
|
|
+ * @param content 远程配置内容文本
|
|
|
*/
|
|
|
- public void loadRemoteInstanceConfigs() {
|
|
|
- try {
|
|
|
- // 加载远程instance配置
|
|
|
- Map<String, ConfigItem>[] modifiedConfigs = getModifiedInstanceConfigs();
|
|
|
- if (modifiedConfigs != null) {
|
|
|
- overrideLocalInstanceConfigs(modifiedConfigs);
|
|
|
- }
|
|
|
+ private void overrideLocalCanalConfig(String content) {
|
|
|
+ try (FileWriter writer = new FileWriter(getConfPath() + "canal.properties")) {
|
|
|
+ writer.write(content);
|
|
|
+ writer.flush();
|
|
|
} catch (Exception e) {
|
|
|
logger.error(e.getMessage(), e);
|
|
|
}
|
|
@@ -108,7 +116,9 @@ public class ManagerRemoteConfigMonitor {
|
|
|
*/
|
|
|
private ConfigItem getRemoteCanalConfig() {
|
|
|
String sql = "select name, content, modified_time from canal_config where id=1";
|
|
|
- try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
|
|
|
+ try (Connection conn = dataSource.getConnection();
|
|
|
+ Statement stmt = conn.createStatement();
|
|
|
+ ResultSet rs = stmt.executeQuery(sql)) {
|
|
|
if (rs.next()) {
|
|
|
ConfigItem configItem = new ConfigItem();
|
|
|
configItem.setId(1L);
|
|
@@ -124,30 +134,27 @@ public class ManagerRemoteConfigMonitor {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 覆盖本地 canal.properties
|
|
|
- *
|
|
|
- * @param content 远程配置内容文本
|
|
|
+ * 加载远程的instance配置
|
|
|
*/
|
|
|
- private void overrideLocalCanalConfig(String content) {
|
|
|
- try (FileWriter writer = new FileWriter(getConfPath() + "canal.properties")) {
|
|
|
- writer.write(content);
|
|
|
- writer.flush();
|
|
|
+ @Override
|
|
|
+ public void loadRemoteInstanceConfigs() {
|
|
|
+ try {
|
|
|
+ // 加载远程instance配置
|
|
|
+ loadModifiedInstanceConfigs();
|
|
|
} catch (Exception e) {
|
|
|
logger.error(e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 获取远程instance新增、修改、删除配置
|
|
|
- *
|
|
|
- * @return Map[0]:新增或修改的instance配置; Map[1]:删除的instance配置
|
|
|
+ * 加载远程instance新增、修改、删除配置
|
|
|
*/
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- private Map<String, ConfigItem>[] getModifiedInstanceConfigs() {
|
|
|
- Map<String, ConfigItem>[] res = new Map[2];
|
|
|
+ private void loadModifiedInstanceConfigs() {
|
|
|
Map<String, ConfigItem> remoteConfigStatus = new HashMap<>();
|
|
|
String sql = "select id, name, modified_time from canal_instance_config";
|
|
|
- try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
|
|
|
+ try (Connection conn = dataSource.getConnection();
|
|
|
+ Statement stmt = conn.createStatement();
|
|
|
+ ResultSet rs = stmt.executeQuery(sql)) {
|
|
|
while (rs.next()) {
|
|
|
ConfigItem configItem = new ConfigItem();
|
|
|
configItem.setId(rs.getLong("id"));
|
|
@@ -157,7 +164,6 @@ public class ManagerRemoteConfigMonitor {
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
logger.error(e.getMessage(), e);
|
|
|
- return null;
|
|
|
}
|
|
|
|
|
|
if (!remoteConfigStatus.isEmpty()) {
|
|
@@ -176,10 +182,11 @@ public class ManagerRemoteConfigMonitor {
|
|
|
}
|
|
|
}
|
|
|
if (!changedIds.isEmpty()) {
|
|
|
- Map<String, ConfigItem> changedInstanceConfig = new HashMap<>();
|
|
|
String contentsSql = "select id, name, content, modified_time from canal_instance_config where id in ("
|
|
|
+ Joiner.on(",").join(changedIds) + ")";
|
|
|
- try (Statement stmt = getConn().createStatement(); ResultSet rs = stmt.executeQuery(contentsSql)) {
|
|
|
+ try (Connection conn = dataSource.getConnection();
|
|
|
+ Statement stmt = conn.createStatement();
|
|
|
+ ResultSet rs = stmt.executeQuery(contentsSql)) {
|
|
|
while (rs.next()) {
|
|
|
ConfigItem configItemNew = new ConfigItem();
|
|
|
configItemNew.setId(rs.getLong("id"));
|
|
@@ -188,64 +195,20 @@ public class ManagerRemoteConfigMonitor {
|
|
|
configItemNew.setModifiedTime(rs.getTimestamp("modified_time").getTime());
|
|
|
|
|
|
remoteInstanceConfigs.put(configItemNew.getName(), configItemNew);
|
|
|
- changedInstanceConfig.put(configItemNew.getName(), configItemNew);
|
|
|
+ remoteInstanceMonitor.onModify(configItemNew);
|
|
|
}
|
|
|
|
|
|
- res[0] = changedInstanceConfig.isEmpty() ? null : changedInstanceConfig;
|
|
|
} catch (Exception e) {
|
|
|
logger.error(e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- Map<String, ConfigItem> removedInstanceConfig = new HashMap<>();
|
|
|
for (String name : remoteInstanceConfigs.keySet()) {
|
|
|
if (!remoteConfigStatus.containsKey(name)) {
|
|
|
// 删除
|
|
|
remoteInstanceConfigs.remove(name);
|
|
|
- removedInstanceConfig.put(name, null);
|
|
|
- }
|
|
|
- }
|
|
|
- res[1] = removedInstanceConfig.isEmpty() ? null : removedInstanceConfig;
|
|
|
-
|
|
|
- if (res[0] == null && res[1] == null) {
|
|
|
- return null;
|
|
|
- } else {
|
|
|
- return res;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 覆盖本地instance配置
|
|
|
- *
|
|
|
- * @param modifiedInstanceConfigs 有变更的配置项
|
|
|
- */
|
|
|
- private void overrideLocalInstanceConfigs(Map<String, ConfigItem>[] modifiedInstanceConfigs) {
|
|
|
- Map<String, ConfigItem> changedInstanceConfigs = modifiedInstanceConfigs[0];
|
|
|
- if (changedInstanceConfigs != null) {
|
|
|
- for (ConfigItem configItem : changedInstanceConfigs.values()) {
|
|
|
- File instanceDir = new File(getConfPath() + configItem.getName());
|
|
|
- if (!instanceDir.exists()) {
|
|
|
- instanceDir.mkdirs();
|
|
|
- }
|
|
|
- try (FileWriter writer = new FileWriter(
|
|
|
- getConfPath() + configItem.getName() + "/instance.properties")) {
|
|
|
- writer.write(configItem.getContent());
|
|
|
- writer.flush();
|
|
|
- logger.info("## Loaded remote instance config: {}/instance.properties ", configItem.getName());
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error(e.getMessage(), e);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- Map<String, ConfigItem> removedInstanceConfigs = modifiedInstanceConfigs[1];
|
|
|
- if (removedInstanceConfigs != null) {
|
|
|
- for (String name : removedInstanceConfigs.keySet()) {
|
|
|
- File file = new File(getConfPath() + name + "/");
|
|
|
- if (file.exists()) {
|
|
|
- deleteDir(file);
|
|
|
- logger.info("## Deleted and loaded remote instance config: {} ", name);
|
|
|
- }
|
|
|
+ remoteInstanceMonitor.onDelete(name);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -273,9 +236,9 @@ public class ManagerRemoteConfigMonitor {
|
|
|
/**
|
|
|
* 监听 canal 主配置和 instance 配置变化
|
|
|
*
|
|
|
- * @param listener 监听回调方法
|
|
|
+ * @param remoteCanalConfigMonitor 监听回调方法
|
|
|
*/
|
|
|
- public void start(final Listener<Properties> listener) {
|
|
|
+ public void startMonitor(final RemoteCanalConfigMonitor remoteCanalConfigMonitor) {
|
|
|
// 监听canal.properties变化
|
|
|
executor.scheduleWithFixedDelay(new Runnable() {
|
|
|
|
|
@@ -283,10 +246,10 @@ public class ManagerRemoteConfigMonitor {
|
|
|
try {
|
|
|
Properties properties = loadRemoteConfig();
|
|
|
if (properties != null) {
|
|
|
- listener.onChange(properties);
|
|
|
+ remoteCanalConfigMonitor.onChange(properties);
|
|
|
}
|
|
|
} catch (Throwable e) {
|
|
|
- logger.error("scan failed", e);
|
|
|
+ logger.error("Scan remote canal config failed", e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -299,7 +262,7 @@ public class ManagerRemoteConfigMonitor {
|
|
|
try {
|
|
|
loadRemoteInstanceConfigs();
|
|
|
} catch (Throwable e) {
|
|
|
- logger.error("scan failed", e);
|
|
|
+ logger.error("Scan remote instance config failed", e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -311,12 +274,10 @@ public class ManagerRemoteConfigMonitor {
|
|
|
*/
|
|
|
public void destroy() {
|
|
|
executor.shutdownNow();
|
|
|
- if (conn != null) {
|
|
|
- try {
|
|
|
- conn.close();
|
|
|
- } catch (SQLException e) {
|
|
|
- logger.error(e.getMessage(), e);
|
|
|
- }
|
|
|
+ try {
|
|
|
+ dataSource.close();
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error(e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -336,50 +297,37 @@ public class ManagerRemoteConfigMonitor {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 配置对应对象
|
|
|
+ * 远程xxx/instance.properties配置监听器实现
|
|
|
*/
|
|
|
- public static class ConfigItem {
|
|
|
-
|
|
|
- private Long id;
|
|
|
- private String name;
|
|
|
- private String content;
|
|
|
- private long modifiedTime;
|
|
|
-
|
|
|
- public Long getId() {
|
|
|
- return id;
|
|
|
- }
|
|
|
-
|
|
|
- public void setId(Long id) {
|
|
|
- this.id = id;
|
|
|
- }
|
|
|
-
|
|
|
- public String getName() {
|
|
|
- return name;
|
|
|
- }
|
|
|
-
|
|
|
- public void setName(String name) {
|
|
|
- this.name = name;
|
|
|
- }
|
|
|
-
|
|
|
- public String getContent() {
|
|
|
- return content;
|
|
|
- }
|
|
|
+ private class RemoteInstanceMonitorImpl implements RemoteInstanceMonitor {
|
|
|
|
|
|
- public void setContent(String content) {
|
|
|
- this.content = content;
|
|
|
+ @Override
|
|
|
+ public void onAdd(ConfigItem configItem) {
|
|
|
+ this.onModify(configItem);
|
|
|
}
|
|
|
|
|
|
- public long getModifiedTime() {
|
|
|
- return modifiedTime;
|
|
|
+ @Override
|
|
|
+ public void onModify(ConfigItem configItem) {
|
|
|
+ File instanceDir = new File(getConfPath() + configItem.getName());
|
|
|
+ if (!instanceDir.exists()) {
|
|
|
+ instanceDir.mkdirs();
|
|
|
+ }
|
|
|
+ try (FileWriter writer = new FileWriter(getConfPath() + configItem.getName() + "/instance.properties")) {
|
|
|
+ writer.write(configItem.getContent());
|
|
|
+ writer.flush();
|
|
|
+ logger.info("## Loaded remote instance config: {}/instance.properties ", configItem.getName());
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error(e.getMessage(), e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- public void setModifiedTime(long modifiedTime) {
|
|
|
- this.modifiedTime = modifiedTime;
|
|
|
+ @Override
|
|
|
+ public void onDelete(String instanceName) {
|
|
|
+ File file = new File(getConfPath() + instanceName + "/");
|
|
|
+ if (file.exists()) {
|
|
|
+ deleteDir(file);
|
|
|
+ logger.info("## Deleted and loaded remote instance config: {} ", instanceName);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- public interface Listener<T> {
|
|
|
-
|
|
|
- void onChange(T properties);
|
|
|
- }
|
|
|
}
|