|
@@ -47,63 +47,58 @@ public class RdbMirrorDbSyncService {
|
|
|
* @param dmls 批量 DML
|
|
|
*/
|
|
|
public void sync(List<Dml> dmls) {
|
|
|
- try {
|
|
|
- List<Dml> dmlList = new ArrayList<>();
|
|
|
- for (Dml dml : dmls) {
|
|
|
- String destination = StringUtils.trimToEmpty(dml.getDestination());
|
|
|
- String database = dml.getDatabase();
|
|
|
- MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(destination + "." + database);
|
|
|
- if (mirrorDbConfig == null) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
|
|
|
- // DDL
|
|
|
- if (logger.isDebugEnabled()) {
|
|
|
- logger.debug("DDL: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
|
|
|
- }
|
|
|
- executeDdl(mirrorDbConfig, dml);
|
|
|
- rdbSyncService.getColumnsTypeCache().remove(destination + "." + database + "." + dml.getTable());
|
|
|
- mirrorDbConfig.getTableConfig().remove(dml.getTable()); // 删除对应库表配置
|
|
|
- } else {
|
|
|
- // DML
|
|
|
- initMappingConfig(dml.getTable(), mirrorDbConfig.getMappingConfig(), mirrorDbConfig, dml);
|
|
|
- dmlList.add(dml);
|
|
|
+ List<Dml> dmlList = new ArrayList<>();
|
|
|
+ for (Dml dml : dmls) {
|
|
|
+ String destination = StringUtils.trimToEmpty(dml.getDestination());
|
|
|
+ String database = dml.getDatabase();
|
|
|
+ MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(destination + "." + database);
|
|
|
+ if (mirrorDbConfig == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
|
|
|
+ // DDL
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ logger.debug("DDL: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
|
|
|
}
|
|
|
+ executeDdl(mirrorDbConfig, dml);
|
|
|
+ rdbSyncService.getColumnsTypeCache().remove(destination + "." + database + "." + dml.getTable());
|
|
|
+ mirrorDbConfig.getTableConfig().remove(dml.getTable()); // 删除对应库表配置
|
|
|
+ } else {
|
|
|
+ // DML
|
|
|
+ initMappingConfig(dml.getTable(), mirrorDbConfig.getMappingConfig(), mirrorDbConfig, dml);
|
|
|
+ dmlList.add(dml);
|
|
|
}
|
|
|
- if (!dmlList.isEmpty()) {
|
|
|
- rdbSyncService.sync(dmlList, dml -> {
|
|
|
- MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache
|
|
|
- .get(dml.getDestination() + "." + dml.getDatabase());
|
|
|
- if (mirrorDbConfig == null) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- String table = dml.getTable();
|
|
|
- MappingConfig config = mirrorDbConfig.getTableConfig().get(table);
|
|
|
+ }
|
|
|
+ if (!dmlList.isEmpty()) {
|
|
|
+ rdbSyncService.sync(dmlList, dml -> {
|
|
|
+ MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(dml.getDestination() + "." + dml.getDatabase());
|
|
|
+ if (mirrorDbConfig == null) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ String table = dml.getTable();
|
|
|
+ MappingConfig config = mirrorDbConfig.getTableConfig().get(table);
|
|
|
|
|
|
- if (config == null) {
|
|
|
- return false;
|
|
|
- }
|
|
|
+ if (config == null) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
|
|
|
- if (config.getConcurrent()) {
|
|
|
- List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
|
|
|
- singleDmls.forEach(singleDml -> {
|
|
|
- int hash = rdbSyncService.pkHash(config.getDbMapping(), singleDml.getData());
|
|
|
- RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
|
|
|
- rdbSyncService.getDmlsPartition()[hash].add(syncItem);
|
|
|
- });
|
|
|
- } else {
|
|
|
- int hash = 0;
|
|
|
- List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
|
|
|
- singleDmls.forEach(singleDml -> {
|
|
|
- RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
|
|
|
- rdbSyncService.getDmlsPartition()[hash].add(syncItem);
|
|
|
- });
|
|
|
- }
|
|
|
- return true;
|
|
|
- });
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error(e.getMessage(), e);
|
|
|
+ if (config.getConcurrent()) {
|
|
|
+ List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
|
|
|
+ singleDmls.forEach(singleDml -> {
|
|
|
+ int hash = rdbSyncService.pkHash(config.getDbMapping(), singleDml.getData());
|
|
|
+ RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
|
|
|
+ rdbSyncService.getDmlsPartition()[hash].add(syncItem);
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ int hash = 0;
|
|
|
+ List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
|
|
|
+ singleDmls.forEach(singleDml -> {
|
|
|
+ RdbSyncService.SyncItem syncItem = new RdbSyncService.SyncItem(config, singleDml);
|
|
|
+ rdbSyncService.getDmlsPartition()[hash].add(syncItem);
|
|
|
+ });
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ });
|
|
|
}
|
|
|
}
|
|
|
|