|
@@ -112,9 +112,8 @@ public class RdbSyncService {
|
|
|
|
|
|
futures.add(executorThreads[i].submit(() -> {
|
|
|
try {
|
|
|
- dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j],
|
|
|
- syncItem.config,
|
|
|
- syncItem.singleDml));
|
|
|
+ dmlsPartition[j]
|
|
|
+ .forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
|
|
|
dmlsPartition[j].clear();
|
|
|
batchExecutors[j].commit();
|
|
|
return true;
|
|
@@ -152,49 +151,50 @@ public class RdbSyncService {
|
|
|
sync(dmls, dml -> {
|
|
|
if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
|
|
|
// DDL
|
|
|
- columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
|
|
|
- return false;
|
|
|
- } else {
|
|
|
- // DML
|
|
|
- String destination = StringUtils.trimToEmpty(dml.getDestination());
|
|
|
- String groupId = StringUtils.trimToEmpty(dml.getGroupId());
|
|
|
- String database = dml.getDatabase();
|
|
|
- String table = dml.getTable();
|
|
|
- Map<String, MappingConfig> configMap;
|
|
|
- if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
|
|
|
- configMap = mappingConfig.get(destination + "-" + groupId + "_" + database + "-" + table);
|
|
|
+ columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
|
|
|
+ return false;
|
|
|
} else {
|
|
|
- configMap = mappingConfig.get(destination + "_" + database + "-" + table);
|
|
|
- }
|
|
|
+ // DML
|
|
|
+ String destination = StringUtils.trimToEmpty(dml.getDestination());
|
|
|
+ String groupId = StringUtils.trimToEmpty(dml.getGroupId());
|
|
|
+ String database = dml.getDatabase();
|
|
|
+ String table = dml.getTable();
|
|
|
+ Map<String, MappingConfig> configMap;
|
|
|
+ if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
|
|
|
+ configMap = mappingConfig.get(destination + "-" + groupId + "_" + database + "-" + table);
|
|
|
+ } else {
|
|
|
+ configMap = mappingConfig.get(destination + "_" + database + "-" + table);
|
|
|
+ }
|
|
|
|
|
|
- if (configMap == null) {
|
|
|
- return false;
|
|
|
- }
|
|
|
+ if (configMap == null) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
|
|
|
- if (configMap.values().isEmpty()) {
|
|
|
- return false;
|
|
|
- }
|
|
|
+ if (configMap.values().isEmpty()) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
|
|
|
- for (MappingConfig config : configMap.values()) {
|
|
|
- if (config.getConcurrent()) {
|
|
|
- List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
|
|
|
- singleDmls.forEach(singleDml -> {
|
|
|
- int hash = pkHash(config.getDbMapping(), singleDml.getData());
|
|
|
- SyncItem syncItem = new SyncItem(config, singleDml);
|
|
|
- dmlsPartition[hash].add(syncItem);
|
|
|
- });
|
|
|
- } else {
|
|
|
- int hash = 0;
|
|
|
- List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
|
|
|
- singleDmls.forEach(singleDml -> {
|
|
|
- SyncItem syncItem = new SyncItem(config, singleDml);
|
|
|
- dmlsPartition[hash].add(syncItem);
|
|
|
- });
|
|
|
+ for (MappingConfig config : configMap.values()) {
|
|
|
+ boolean caseInsensitive = config.getDbMapping().isCaseInsensitive();
|
|
|
+ if (config.getConcurrent()) {
|
|
|
+ List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
|
|
|
+ singleDmls.forEach(singleDml -> {
|
|
|
+ int hash = pkHash(config.getDbMapping(), singleDml.getData());
|
|
|
+ SyncItem syncItem = new SyncItem(config, singleDml);
|
|
|
+ dmlsPartition[hash].add(syncItem);
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ int hash = 0;
|
|
|
+ List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
|
|
|
+ singleDmls.forEach(singleDml -> {
|
|
|
+ SyncItem syncItem = new SyncItem(config, singleDml);
|
|
|
+ dmlsPartition[hash].add(syncItem);
|
|
|
+ });
|
|
|
+ }
|
|
|
}
|
|
|
+ return true;
|
|
|
}
|
|
|
- return true;
|
|
|
- }
|
|
|
- } );
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
/**
|