|
@@ -2,10 +2,10 @@ package com.alibaba.otter.canal.client.adapter.rdb;
|
|
|
|
|
|
import java.sql.Connection;
|
|
|
import java.sql.SQLException;
|
|
|
-import java.util.HashMap;
|
|
|
import java.util.LinkedHashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
|
|
@@ -21,22 +21,26 @@ import com.alibaba.otter.canal.client.adapter.rdb.config.ConfigLoader;
|
|
|
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
|
|
|
import com.alibaba.otter.canal.client.adapter.rdb.monitor.RdbConfigMonitor;
|
|
|
import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
|
|
|
+import com.alibaba.otter.canal.client.adapter.rdb.service.RdbMirrorDbSyncService;
|
|
|
import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
|
|
|
+import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
|
|
|
import com.alibaba.otter.canal.client.adapter.support.*;
|
|
|
|
|
|
@SPI("rdb")
|
|
|
public class RdbAdapter implements OuterAdapter {
|
|
|
|
|
|
- private static Logger logger = LoggerFactory.getLogger(RdbAdapter.class);
|
|
|
+ private static Logger logger = LoggerFactory.getLogger(RdbAdapter.class);
|
|
|
|
|
|
- private Map<String, MappingConfig> rdbMapping = new HashMap<>(); // 文件名对应配置
|
|
|
- private Map<String, Map<String, MappingConfig>> mappingConfigCache = new HashMap<>(); // 库名-表名对应配置
|
|
|
+ private Map<String, MappingConfig> rdbMapping = new ConcurrentHashMap<>(); // 文件名对应配置
|
|
|
+ private Map<String, Map<String, MappingConfig>> mappingConfigCache = new ConcurrentHashMap<>(); // 库名-表名对应配置
|
|
|
+ private Map<String, MappingConfig> mirrorDbConfigCache = new ConcurrentHashMap<>(); // 镜像库配置
|
|
|
|
|
|
private DruidDataSource dataSource;
|
|
|
|
|
|
private RdbSyncService rdbSyncService;
|
|
|
+ private RdbMirrorDbSyncService rdbMirrorDbSyncService;
|
|
|
|
|
|
- private ExecutorService executor = Executors.newFixedThreadPool(1);
|
|
|
+ private ExecutorService executor = Executors.newFixedThreadPool(1);
|
|
|
|
|
|
private RdbConfigMonitor rdbConfigMonitor;
|
|
|
|
|
@@ -62,12 +66,18 @@ public class RdbAdapter implements OuterAdapter {
|
|
|
for (Map.Entry<String, MappingConfig> entry : rdbMapping.entrySet()) {
|
|
|
String configName = entry.getKey();
|
|
|
MappingConfig mappingConfig = entry.getValue();
|
|
|
- Map<String, MappingConfig> configMap = mappingConfigCache
|
|
|
- .computeIfAbsent(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
|
|
|
- + mappingConfig.getDbMapping().getDatabase() + "."
|
|
|
- + mappingConfig.getDbMapping().getTable(),
|
|
|
- k1 -> new HashMap<>());
|
|
|
- configMap.put(configName, mappingConfig);
|
|
|
+ if (!mappingConfig.getDbMapping().isMirrorDb()) {
|
|
|
+ Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(
|
|
|
+ StringUtils.trimToEmpty(mappingConfig.getDestination()) + "." + mappingConfig.getDbMapping()
|
|
|
+ .getDatabase() + "." + mappingConfig.getDbMapping().getTable(),
|
|
|
+ k1 -> new ConcurrentHashMap<>());
|
|
|
+ configMap.put(configName, mappingConfig);
|
|
|
+ } else {
|
|
|
+ // mirrorDB
|
|
|
+ mirrorDbConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
|
|
|
+ + mappingConfig.getDbMapping().getDatabase(),
|
|
|
+ mappingConfig);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
Map<String, String> properties = configuration.getProperties();
|
|
@@ -96,6 +106,10 @@ public class RdbAdapter implements OuterAdapter {
|
|
|
dataSource,
|
|
|
threads != null ? Integer.valueOf(threads) : null);
|
|
|
|
|
|
+ rdbMirrorDbSyncService = new RdbMirrorDbSyncService(mirrorDbConfigCache,
|
|
|
+ dataSource,
|
|
|
+ threads != null ? Integer.valueOf(threads) : null);
|
|
|
+
|
|
|
rdbConfigMonitor = new RdbConfigMonitor();
|
|
|
rdbConfigMonitor.init(configuration.getKey(), this);
|
|
|
}
|
|
@@ -103,6 +117,7 @@ public class RdbAdapter implements OuterAdapter {
|
|
|
@Override
|
|
|
public void sync(List<Dml> dmls) {
|
|
|
rdbSyncService.sync(dmls);
|
|
|
+ rdbMirrorDbSyncService.sync(dmls);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -157,7 +172,7 @@ public class RdbAdapter implements OuterAdapter {
|
|
|
public Map<String, Object> count(String task) {
|
|
|
MappingConfig config = rdbMapping.get(task);
|
|
|
MappingConfig.DbMapping dbMapping = config.getDbMapping();
|
|
|
- String sql = "SELECT COUNT(1) AS cnt FROM " + dbMapping.getTargetTable();
|
|
|
+ String sql = "SELECT COUNT(1) AS cnt FROM " + SyncUtil.dbTable(dbMapping);
|
|
|
Connection conn = null;
|
|
|
Map<String, Object> res = new LinkedHashMap<>();
|
|
|
try {
|
|
@@ -183,7 +198,7 @@ public class RdbAdapter implements OuterAdapter {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- res.put("targetTable", dbMapping.getTargetTable());
|
|
|
+ res.put("targetTable", SyncUtil.dbTable(dbMapping));
|
|
|
|
|
|
return res;
|
|
|
}
|