|
@@ -84,38 +84,7 @@ public class ESAdapter implements OuterAdapter {
|
|
|
for (Map.Entry<String, ESSyncConfig> entry : esSyncConfig.entrySet()) {
|
|
|
String configName = entry.getKey();
|
|
|
ESSyncConfig config = entry.getValue();
|
|
|
- SchemaItem schemaItem = SqlParser.parse(config.getEsMapping().getSql());
|
|
|
- config.getEsMapping().setSchemaItem(schemaItem);
|
|
|
-
|
|
|
- DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
|
|
|
- if (dataSource == null || dataSource.getUrl() == null) {
|
|
|
- throw new RuntimeException("No data source found: " + config.getDataSourceKey());
|
|
|
- }
|
|
|
- Pattern pattern = Pattern.compile(".*:(.*)://.*/(.*)\\?.*$");
|
|
|
- Matcher matcher = pattern.matcher(dataSource.getUrl());
|
|
|
- if (!matcher.find()) {
|
|
|
- throw new RuntimeException("Not found the schema of jdbc-url: " + config.getDataSourceKey());
|
|
|
- }
|
|
|
- String schema = matcher.group(2);
|
|
|
-
|
|
|
- schemaItem.getAliasTableItems().values().forEach(tableItem -> {
|
|
|
- Map<String, ESSyncConfig> esSyncConfigMap;
|
|
|
- if (envProperties != null
|
|
|
- && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
|
|
|
- esSyncConfigMap = dbTableEsSyncConfig
|
|
|
- .computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "-"
|
|
|
- + StringUtils.trimToEmpty(config.getGroupId()) + "_" + schema + "-"
|
|
|
- + tableItem.getTableName(),
|
|
|
- k -> new ConcurrentHashMap<>());
|
|
|
- } else {
|
|
|
- esSyncConfigMap = dbTableEsSyncConfig
|
|
|
- .computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "_" + schema + "-"
|
|
|
- + tableItem.getTableName(),
|
|
|
- k -> new ConcurrentHashMap<>());
|
|
|
- }
|
|
|
-
|
|
|
- esSyncConfigMap.put(configName, config);
|
|
|
- });
|
|
|
+ addSyncConfigToCache(configName, config);
|
|
|
}
|
|
|
|
|
|
Map<String, String> properties = configuration.getProperties();
|
|
@@ -138,6 +107,43 @@ public class ESAdapter implements OuterAdapter {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void addSyncConfigToCache(String configName, ESSyncConfig config) {
|
|
|
+ Properties envProperties = this.envProperties;
|
|
|
+ SchemaItem schemaItem = SqlParser.parse(config.getEsMapping().getSql());
|
|
|
+ config.getEsMapping().setSchemaItem(schemaItem);
|
|
|
+
|
|
|
+ DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
|
|
|
+ if (dataSource == null || dataSource.getUrl() == null) {
|
|
|
+ throw new RuntimeException("No data source found: " + config.getDataSourceKey());
|
|
|
+ }
|
|
|
+ Pattern pattern = Pattern.compile(".*:(.*)://.*/(.*)\\?.*$");
|
|
|
+ Matcher matcher = pattern.matcher(dataSource.getUrl());
|
|
|
+ if (!matcher.find()) {
|
|
|
+ throw new RuntimeException("Not found the schema of jdbc-url: " + config.getDataSourceKey());
|
|
|
+ }
|
|
|
+ String schema = matcher.group(2);
|
|
|
+
|
|
|
+ schemaItem.getAliasTableItems().values().forEach(tableItem -> {
|
|
|
+ Map<String, ESSyncConfig> esSyncConfigMap;
|
|
|
+ if (envProperties != null
|
|
|
+ && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
|
|
|
+ esSyncConfigMap = dbTableEsSyncConfig
|
|
|
+ .computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "-"
|
|
|
+ + StringUtils.trimToEmpty(config.getGroupId()) + "_" + schema + "-"
|
|
|
+ + tableItem.getTableName(),
|
|
|
+ k -> new ConcurrentHashMap<>());
|
|
|
+ } else {
|
|
|
+ esSyncConfigMap = dbTableEsSyncConfig
|
|
|
+ .computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "_" + schema + "-"
|
|
|
+ + tableItem.getTableName(),
|
|
|
+ k -> new ConcurrentHashMap<>());
|
|
|
+ }
|
|
|
+
|
|
|
+ esSyncConfigMap.put(configName, config);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
@Override
|
|
|
public void sync(List<Dml> dmls) {
|
|
|
if (dmls == null || dmls.isEmpty()) {
|