|
@@ -20,10 +20,10 @@ import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import com.alibaba.druid.pool.DruidDataSource;
|
|
|
+import com.alibaba.otter.canal.adapter.launcher.config.AdapterConfigHolder;
|
|
|
import com.alibaba.otter.canal.common.utils.CommonUtils;
|
|
|
import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
|
|
|
import com.google.common.base.Joiner;
|
|
|
-import com.google.common.collect.MapMaker;
|
|
|
|
|
|
/**
|
|
|
* 基于数据库的远程配置装载器
|
|
@@ -33,17 +33,16 @@ import com.google.common.collect.MapMaker;
|
|
|
*/
|
|
|
public class DbRemoteConfigLoader implements RemoteConfigLoader {
|
|
|
|
|
|
- private static final Logger logger = LoggerFactory.getLogger(DbRemoteConfigLoader.class);
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(DbRemoteConfigLoader.class);
|
|
|
|
|
|
private DruidDataSource dataSource;
|
|
|
|
|
|
- private volatile long currentConfigTimestamp = 0;
|
|
|
- private Map<String, ConfigItem> remoteAdapterConfigs = new MapMaker().makeMap();
|
|
|
+ private AdapterConfigHolder remoteAdapterConfigHolder = AdapterConfigHolder.getInstance();
|
|
|
|
|
|
- private ScheduledExecutorService executor = Executors.newScheduledThreadPool(2,
|
|
|
+ private ScheduledExecutorService executor = Executors.newScheduledThreadPool(2,
|
|
|
new NamedThreadFactory("remote-adapter-config-scan"));
|
|
|
|
|
|
- private RemoteAdapterMonitor remoteAdapterMonitor = new RemoteAdapterMonitorImpl();
|
|
|
+ private RemoteAdapterMonitor remoteAdapterMonitor = new RemoteAdapterMonitorImpl();
|
|
|
|
|
|
public DbRemoteConfigLoader(String driverName, String jdbcUrl, String jdbcUsername, String jdbcPassword){
|
|
|
dataSource = new DruidDataSource();
|
|
@@ -76,8 +75,8 @@ public class DbRemoteConfigLoader implements RemoteConfigLoader {
|
|
|
// 加载远程adapter配置
|
|
|
ConfigItem configItem = getRemoteAdapterConfig();
|
|
|
if (configItem != null) {
|
|
|
- if (configItem.getModifiedTime() != currentConfigTimestamp) {
|
|
|
- currentConfigTimestamp = configItem.getModifiedTime();
|
|
|
+ if (configItem.getModifiedTime() != remoteAdapterConfigHolder.getAdapterConfigTimestamp()) {
|
|
|
+ remoteAdapterConfigHolder.setAdapterConfigTimestamp(configItem.getModifiedTime());
|
|
|
overrideLocalCanalConfig(configItem.getContent());
|
|
|
logger.info("## Loaded remote adapter config: application.yml");
|
|
|
}
|
|
@@ -166,7 +165,7 @@ public class DbRemoteConfigLoader implements RemoteConfigLoader {
|
|
|
List<Long> changedIds = new ArrayList<>();
|
|
|
|
|
|
for (ConfigItem remoteConfigStat : remoteConfigStatus.values()) {
|
|
|
- ConfigItem currentConfig = remoteAdapterConfigs
|
|
|
+ ConfigItem currentConfig = remoteAdapterConfigHolder.getAdapterConfigs()
|
|
|
.get(remoteConfigStat.getCategory() + "/" + remoteConfigStat.getName());
|
|
|
if (currentConfig == null) {
|
|
|
// 新增
|
|
@@ -192,8 +191,8 @@ public class DbRemoteConfigLoader implements RemoteConfigLoader {
|
|
|
configItemNew.setContent(rs.getString("content"));
|
|
|
configItemNew.setModifiedTime(rs.getTimestamp("modified_time").getTime());
|
|
|
|
|
|
- remoteAdapterConfigs.put(configItemNew.getCategory() + "/" + configItemNew.getName(),
|
|
|
- configItemNew);
|
|
|
+ remoteAdapterConfigHolder.getAdapterConfigs()
|
|
|
+ .put(configItemNew.getCategory() + "/" + configItemNew.getName(), configItemNew);
|
|
|
remoteAdapterMonitor.onModify(configItemNew);
|
|
|
}
|
|
|
|
|
@@ -203,10 +202,11 @@ public class DbRemoteConfigLoader implements RemoteConfigLoader {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- for (ConfigItem configItem : remoteAdapterConfigs.values()) {
|
|
|
+ for (ConfigItem configItem : remoteAdapterConfigHolder.getAdapterConfigs().values()) {
|
|
|
if (!remoteConfigStatus.containsKey(configItem.getCategory() + "/" + configItem.getName())) {
|
|
|
// 删除
|
|
|
- remoteAdapterConfigs.remove(configItem.getCategory() + "/" + configItem.getName());
|
|
|
+ remoteAdapterConfigHolder.getAdapterConfigs()
|
|
|
+ .remove(configItem.getCategory() + "/" + configItem.getName());
|
|
|
remoteAdapterMonitor.onDelete(configItem.getCategory() + "/" + configItem.getName());
|
|
|
}
|
|
|
}
|