|
@@ -14,7 +14,6 @@ import org.apache.commons.lang.BooleanUtils;
|
|
import org.apache.commons.lang.StringUtils;
|
|
import org.apache.commons.lang.StringUtils;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
-import org.slf4j.MDC;
|
|
|
|
|
|
|
|
import com.alibaba.druid.pool.DruidDataSource;
|
|
import com.alibaba.druid.pool.DruidDataSource;
|
|
import com.alibaba.otter.canal.client.adapter.OuterAdapter;
|
|
import com.alibaba.otter.canal.client.adapter.OuterAdapter;
|
|
@@ -26,12 +25,7 @@ import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
|
|
import com.alibaba.otter.canal.client.adapter.rdb.service.RdbMirrorDbSyncService;
|
|
import com.alibaba.otter.canal.client.adapter.rdb.service.RdbMirrorDbSyncService;
|
|
import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
|
|
import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
|
|
import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
|
|
import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
|
|
-import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
|
|
|
|
-import com.alibaba.otter.canal.client.adapter.support.Dml;
|
|
|
|
-import com.alibaba.otter.canal.client.adapter.support.EtlResult;
|
|
|
|
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
|
|
|
|
-import com.alibaba.otter.canal.client.adapter.support.SPI;
|
|
|
|
-import com.alibaba.otter.canal.client.adapter.support.Util;
|
|
|
|
|
|
+import com.alibaba.otter.canal.client.adapter.support.*;
|
|
|
|
|
|
/**
|
|
/**
|
|
* RDB适配器实现类
|
|
* RDB适配器实现类
|
|
@@ -76,14 +70,13 @@ public class RdbAdapter implements OuterAdapter {
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public void init(OuterAdapterConfig configuration, Properties envProperties) {
|
|
public void init(OuterAdapterConfig configuration, Properties envProperties) {
|
|
- MDC.put("adapter", "rdb");
|
|
|
|
this.envProperties = envProperties;
|
|
this.envProperties = envProperties;
|
|
Map<String, MappingConfig> rdbMappingTmp = ConfigLoader.load(envProperties);
|
|
Map<String, MappingConfig> rdbMappingTmp = ConfigLoader.load(envProperties);
|
|
// 过滤不匹配的key的配置
|
|
// 过滤不匹配的key的配置
|
|
rdbMappingTmp.forEach((key, mappingConfig) -> {
|
|
rdbMappingTmp.forEach((key, mappingConfig) -> {
|
|
if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
|
|
if ((mappingConfig.getOuterAdapterKey() == null && configuration.getKey() == null)
|
|
- || (mappingConfig.getOuterAdapterKey() != null && mappingConfig.getOuterAdapterKey()
|
|
|
|
- .equalsIgnoreCase(configuration.getKey()))) {
|
|
|
|
|
|
+ || (mappingConfig.getOuterAdapterKey() != null
|
|
|
|
+ && mappingConfig.getOuterAdapterKey().equalsIgnoreCase(configuration.getKey()))) {
|
|
rdbMapping.put(key, mappingConfig);
|
|
rdbMapping.put(key, mappingConfig);
|
|
}
|
|
}
|
|
});
|
|
});
|
|
@@ -143,8 +136,8 @@ public class RdbAdapter implements OuterAdapter {
|
|
String threads = properties.get("threads");
|
|
String threads = properties.get("threads");
|
|
// String commitSize = properties.get("commitSize");
|
|
// String commitSize = properties.get("commitSize");
|
|
|
|
|
|
- boolean skipDupException = BooleanUtils.toBoolean(configuration.getProperties()
|
|
|
|
- .getOrDefault("skipDupException", "true"));
|
|
|
|
|
|
+ boolean skipDupException = BooleanUtils
|
|
|
|
+ .toBoolean(configuration.getProperties().getOrDefault("skipDupException", "true"));
|
|
rdbSyncService = new RdbSyncService(dataSource,
|
|
rdbSyncService = new RdbSyncService(dataSource,
|
|
threads != null ? Integer.valueOf(threads) : null,
|
|
threads != null ? Integer.valueOf(threads) : null,
|
|
skipDupException);
|
|
skipDupException);
|
|
@@ -304,6 +297,5 @@ public class RdbAdapter implements OuterAdapter {
|
|
if (dataSource != null) {
|
|
if (dataSource != null) {
|
|
dataSource.close();
|
|
dataSource.close();
|
|
}
|
|
}
|
|
- MDC.remove("adapter");
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|