Browse Source

mirror schema 完成测试

mcy 6 years ago
parent
commit
3acdffb297

+ 5 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.es.config;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import com.alibaba.fastjson.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
@@ -19,6 +20,7 @@ public class ESSyncConfigLoader {
 
     private static Logger logger = LoggerFactory.getLogger(ESSyncConfigLoader.class);
 
+    @SuppressWarnings("unchecked")
     public static synchronized Map<String, ESSyncConfig> load() {
         logger.info("## Start loading es mapping config ... ");
 
@@ -26,7 +28,9 @@ public class ESSyncConfigLoader {
 
         Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("es");
         configContentMap.forEach((fileName, content) -> {
-            ESSyncConfig config = new Yaml().loadAs(content, ESSyncConfig.class);
+            Map configMap = new Yaml().loadAs(content, Map.class); // yml自带的对象反射不是很稳定
+            JSONObject configJson = new JSONObject(configMap);
+            ESSyncConfig config = configJson.toJavaObject(ESSyncConfig.class);
             try {
                 config.validate();
             } catch (Exception e) {

+ 6 - 2
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.hbase.config;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import com.alibaba.fastjson.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
@@ -21,9 +22,10 @@ public class MappingConfigLoader {
 
     /**
      * 加载HBase表映射配置
-     * 
+     *
      * @return 配置名/配置文件名--对象
      */
+    @SuppressWarnings("unchecked")
     public static Map<String, MappingConfig> load() {
         logger.info("## Start loading hbase mapping config ... ");
 
@@ -31,7 +33,9 @@ public class MappingConfigLoader {
 
         Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("hbase");
         configContentMap.forEach((fileName, content) -> {
-            MappingConfig config = new Yaml().loadAs(content, MappingConfig.class);
+            Map configMap = new Yaml().loadAs(content, Map.class); // yml自带的对象反射不是很稳定
+            JSONObject configJson = new JSONObject(configMap);
+            MappingConfig config = configJson.toJavaObject(MappingConfig.class);
             try {
                 config.validate();
             } catch (Exception e) {

+ 17 - 8
client-adapter/launcher/src/main/resources/application.yml

@@ -34,7 +34,14 @@ canal.conf:
     groups:
     - groupId: g1
       outerAdapters:
-#      - name: logger
+      - name: logger
+#      - name: rdb
+#        key: mysql1
+#        properties:
+#          jdbc.driverClassName: com.mysql.jdbc.Driver
+#          jdbc.url: jdbc:mysql://192.168.0.36/mytest?useUnicode=true
+#          jdbc.username: root
+#          jdbc.password: 121212
 #      - name: rdb
 #        key: oracle1
 #        properties:
@@ -42,13 +49,15 @@ canal.conf:
 #          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
 #          jdbc.username: mytest
 #          jdbc.password: m121212
-      - name: rdb
-        key: mysql1
-        properties:
-          jdbc.driverClassName: com.mysql.jdbc.Driver
-          jdbc.url: jdbc:mysql://192.168.100.36/mytest?useUnicode=true
-          jdbc.username: root
-          jdbc.password: Ambari-123
+#      - name: rdb
+#        key: postgres1
+#        properties:
+#          jdbc.driverClassName: org.postgresql.Driver
+#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
+#          jdbc.username: postgres
+#          jdbc.password: 121212
+#          threads: 1
+#          commitSize: 3000
 #      - name: hbase
 #        properties:
 #          hbase.zookeeper.quorum: 127.0.0.1

+ 9 - 3
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -17,6 +17,7 @@ import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.rdb.config.ConfigLoader;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.monitor.RdbConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbMirrorDbSyncService;
@@ -37,7 +38,7 @@ public class RdbAdapter implements OuterAdapter {
 
     private Map<String, MappingConfig>              rdbMapping          = new ConcurrentHashMap<>();                // 文件名对应配置
     private Map<String, Map<String, MappingConfig>> mappingConfigCache  = new ConcurrentHashMap<>();                // 库名-表名对应配置
-    private Map<String, MappingConfig>              mirrorDbConfigCache = new ConcurrentHashMap<>();                // 镜像库配置
+    private Map<String, MirrorDbConfig>             mirrorDbConfigCache = new ConcurrentHashMap<>();                // 镜像库配置
 
     private DruidDataSource                         dataSource;
 
@@ -54,6 +55,10 @@ public class RdbAdapter implements OuterAdapter {
         return mappingConfigCache;
     }
 
+    public Map<String, MirrorDbConfig> getMirrorDbConfigCache() {
+        return mirrorDbConfigCache;
+    }
+
     /**
      * 初始化方法
      *
@@ -73,7 +78,7 @@ public class RdbAdapter implements OuterAdapter {
         for (Map.Entry<String, MappingConfig> entry : rdbMapping.entrySet()) {
             String configName = entry.getKey();
             MappingConfig mappingConfig = entry.getValue();
-            if (!mappingConfig.getDbMapping().isMirrorDb()) {
+            if (!mappingConfig.getDbMapping().getMirrorDb()) {
                 Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(
                     StringUtils.trimToEmpty(mappingConfig.getDestination()) + "." + mappingConfig.getDbMapping()
                         .getDatabase() + "." + mappingConfig.getDbMapping().getTable(),
@@ -81,9 +86,10 @@ public class RdbAdapter implements OuterAdapter {
                 configMap.put(configName, mappingConfig);
             } else {
                 // mirrorDB
+
                 mirrorDbConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
                                         + mappingConfig.getDbMapping().getDatabase(),
-                    mappingConfig);
+                        MirrorDbConfig.create(configName, mappingConfig));
             }
         }
 

+ 6 - 2
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/ConfigLoader.java

@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.rdb.config;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import com.alibaba.fastjson.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
@@ -21,9 +22,10 @@ public class ConfigLoader {
 
     /**
      * 加载HBase表映射配置
-     * 
+     *
      * @return 配置名/配置文件名--对象
      */
+    @SuppressWarnings("unchecked")
     public static Map<String, MappingConfig> load() {
         logger.info("## Start loading rdb mapping config ... ");
 
@@ -31,7 +33,9 @@ public class ConfigLoader {
 
         Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("rdb");
         configContentMap.forEach((fileName, content) -> {
-            MappingConfig config = new Yaml().loadAs(content, MappingConfig.class);
+            Map configMap = new Yaml().loadAs(content, Map.class); // yml自带的对象反射不是很稳定
+            JSONObject configJson = new JSONObject(configMap);
+            MappingConfig config = configJson.toJavaObject(MappingConfig.class);
             try {
                 config.validate();
             } catch (Exception e) {

+ 6 - 6
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java

@@ -65,10 +65,10 @@ public class MappingConfig {
         if (dbMapping.database == null || dbMapping.database.isEmpty()) {
             throw new NullPointerException("dbMapping.database");
         }
-        if (!dbMapping.isMirrorDb() && (dbMapping.table == null || dbMapping.table.isEmpty())) {
+        if (!dbMapping.getMirrorDb() && (dbMapping.table == null || dbMapping.table.isEmpty())) {
             throw new NullPointerException("dbMapping.table");
         }
-        if (!dbMapping.isMirrorDb() && (dbMapping.targetTable == null || dbMapping.targetTable.isEmpty())) {
+        if (!dbMapping.getMirrorDb() && (dbMapping.targetTable == null || dbMapping.targetTable.isEmpty())) {
             throw new NullPointerException("dbMapping.targetTable");
         }
     }
@@ -91,8 +91,8 @@ public class MappingConfig {
 
         private Map<String, String> allMapColumns;
 
-        public boolean isMirrorDb() {
-            return mirrorDb == null ? false : mirrorDb;
+        public Boolean getMirrorDb() {
+            return mirrorDb;
         }
 
         public void setMirrorDb(Boolean mirrorDb) {
@@ -123,8 +123,8 @@ public class MappingConfig {
             this.targetPk = targetPk;
         }
 
-        public boolean isMapAll() {
-            return mapAll == null ? false : mapAll;
+        public Boolean getMapAll() {
+            return mapAll;
         }
 
         public void setMapAll(Boolean mapAll) {

+ 44 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MirrorDbConfig.java

@@ -0,0 +1,44 @@
+package com.alibaba.otter.canal.client.adapter.rdb.config;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class MirrorDbConfig {
+
+    private String             fileName;
+    private MappingConfig      mappingConfig;
+    Map<String, MappingConfig> tableConfig = new ConcurrentHashMap<>();
+
+    public static MirrorDbConfig create(String fileName, MappingConfig mappingConfig) {
+        return new MirrorDbConfig(fileName, mappingConfig);
+    }
+
+    public MirrorDbConfig(String fileName, MappingConfig mappingConfig){
+        this.fileName = fileName;
+        this.mappingConfig = mappingConfig;
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public void setFileName(String fileName) {
+        this.fileName = fileName;
+    }
+
+    public MappingConfig getMappingConfig() {
+        return mappingConfig;
+    }
+
+    public void setMappingConfig(MappingConfig mappingConfig) {
+        this.mappingConfig = mappingConfig;
+    }
+
+    public Map<String, MappingConfig> getTableConfig() {
+        return tableConfig;
+    }
+
+    public void setTableConfig(Map<String, MappingConfig> tableConfig) {
+        this.tableConfig = tableConfig;
+    }
+}

+ 21 - 7
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/monitor/RdbConfigMonitor.java

@@ -4,6 +4,7 @@ import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
 import org.apache.commons.io.filefilter.FileFilterUtils;
 import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
 import org.apache.commons.io.monitor.FileAlterationMonitor;
@@ -20,7 +21,7 @@ import com.alibaba.otter.canal.client.adapter.support.Util;
 
 public class RdbConfigMonitor {
 
-    private static final Logger   logger = LoggerFactory.getLogger(RdbConfigMonitor.class);
+    private static final Logger   logger      = LoggerFactory.getLogger(RdbConfigMonitor.class);
 
     private static final String   adapterName = "rdb";
 
@@ -83,7 +84,8 @@ public class RdbConfigMonitor {
             try {
                 if (rdbAdapter.getRdbMapping().containsKey(file.getName())) {
                     // 加载配置文件
-                    String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
+                    String configContent = MappingConfigsLoader
+                        .loadConfig(adapterName + File.separator + file.getName());
                     MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
                     config.validate();
                     if ((key == null && config.getOuterAdapterKey() == null)
@@ -118,13 +120,19 @@ public class RdbConfigMonitor {
             }
         }
 
-        private void addConfigToCache(File file, MappingConfig config) {
-            rdbAdapter.getRdbMapping().put(file.getName(), config);
+        private void addConfigToCache(File file, MappingConfig mappingConfig) {
+            rdbAdapter.getRdbMapping().put(file.getName(), mappingConfig);
             Map<String, MappingConfig> configMap = rdbAdapter.getMappingConfigCache()
-                .computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "."
-                                 + config.getDbMapping().getDatabase() + "." + config.getDbMapping().getTable(),
+                .computeIfAbsent(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                 + mappingConfig.getDbMapping().getDatabase() + "."
+                                 + mappingConfig.getDbMapping().getTable(),
                     k1 -> new HashMap<>());
-            configMap.put(file.getName(), config);
+            configMap.put(file.getName(), mappingConfig);
+
+            Map<String, MirrorDbConfig> mirrorDbConfigCache = rdbAdapter.getMirrorDbConfigCache();
+            mirrorDbConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                                    + mappingConfig.getDbMapping().getDatabase(),
+                MirrorDbConfig.create(file.getName(), mappingConfig));
         }
 
         private void deleteConfigFromCache(File file) {
@@ -136,6 +144,12 @@ public class RdbConfigMonitor {
                 }
             }
 
+            rdbAdapter.getMirrorDbConfigCache().forEach((key, mirrorDbConfig) -> {
+                if (mirrorDbConfig.getFileName().equals(file.getName())) {
+                    rdbAdapter.getMirrorDbConfigCache().remove(key);
+                }
+            });
+
         }
     }
 }

+ 26 - 21
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java

@@ -6,7 +6,6 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 import javax.sql.DataSource;
 
@@ -14,7 +13,10 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.support.SingleDml;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 
@@ -26,15 +28,13 @@ import com.alibaba.otter.canal.client.adapter.support.Dml;
  */
 public class RdbMirrorDbSyncService {
 
-    private static final Logger              logger             = LoggerFactory.getLogger(RdbMirrorDbSyncService.class);
+    private static final Logger         logger = LoggerFactory.getLogger(RdbMirrorDbSyncService.class);
 
-    private Map<String, MappingConfig>       mirrorDbConfigCache;                                                       // 镜像库配置
-    private DataSource                       dataSource;
-    private RdbSyncService                   rdbSyncService;                                                            // rdbSyncService代理
+    private Map<String, MirrorDbConfig> mirrorDbConfigCache;                                           // 镜像库配置
+    private DataSource                  dataSource;
+    private RdbSyncService              rdbSyncService;                                                // rdbSyncService代理
 
-    private final Map<String, MappingConfig> tableDbConfigCache = new ConcurrentHashMap<>();                            // 自动生成的库表配置缓存
-
-    public RdbMirrorDbSyncService(Map<String, MappingConfig> mirrorDbConfigCache, DataSource dataSource,
+    public RdbMirrorDbSyncService(Map<String, MirrorDbConfig> mirrorDbConfigCache, DataSource dataSource,
                                   Integer threads, Map<String, Map<String, Integer>> columnsTypeCache){
         this.mirrorDbConfigCache = mirrorDbConfigCache;
         this.dataSource = dataSource;
@@ -52,27 +52,32 @@ public class RdbMirrorDbSyncService {
             for (Dml dml : dmls) {
                 String destination = StringUtils.trimToEmpty(dml.getDestination());
                 String database = dml.getDatabase();
-                MappingConfig configMap = mirrorDbConfigCache.get(destination + "." + database);
-                if (configMap == null) {
+                MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache.get(destination + "." + database);
+                if (mirrorDbConfig == null) {
                     continue;
                 }
-                if (dml.getSql() != null) {
+                if (StringUtils.isNotEmpty(dml.getSql())) {
                     // DDL
-                    executeDdl(dml);
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("DDL: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+                    }
+                    executeDdl(mirrorDbConfig, dml);
                     rdbSyncService.getColumnsTypeCache().remove(destination + "." + database + "." + dml.getTable());
-                    tableDbConfigCache.remove(destination + "." + database + "." + dml.getTable()); // 删除对应库表配置
+                    mirrorDbConfig.getTableConfig().remove(dml.getTable()); // 删除对应库表配置
                 } else {
                     // DML
-                    initMappingConfig(destination + "." + database + "." + dml.getTable(), configMap, dml);
+                    initMappingConfig(dml.getTable(), mirrorDbConfig.getMappingConfig(), mirrorDbConfig, dml);
                     dmlList.add(dml);
                 }
             }
             if (!dmlList.isEmpty()) {
                 rdbSyncService.sync(dmlList, dml -> {
+                    MirrorDbConfig mirrorDbConfig = mirrorDbConfigCache
+                        .get(dml.getDestination() + "." + dml.getDatabase());
                     String destination = StringUtils.trimToEmpty(dml.getDestination());
                     String database = dml.getDatabase();
                     String table = dml.getTable();
-                    MappingConfig config = tableDbConfigCache.get(destination + "." + database + "." + table);
+                    MappingConfig config = mirrorDbConfig.getTableConfig().get(table);
 
                     if (config == null) {
                         return false;
@@ -108,10 +113,10 @@ public class RdbMirrorDbSyncService {
      * @param baseConfigMap db sync config
      * @param dml DML
      */
-    private void initMappingConfig(String key, MappingConfig baseConfigMap, Dml dml) {
-        MappingConfig mappingConfig = tableDbConfigCache.get(key);
+    private void initMappingConfig(String key, MappingConfig baseConfigMap, MirrorDbConfig mirrorDbConfig, Dml dml) {
+        MappingConfig mappingConfig = mirrorDbConfig.getTableConfig().get(key);
         if (mappingConfig == null) {
-            // 构造一个配置
+            // 构造配置
             mappingConfig = new MappingConfig();
             mappingConfig.setDataSourceKey(baseConfigMap.getDataSourceKey());
             mappingConfig.setDestination(baseConfigMap.getDestination());
@@ -129,7 +134,7 @@ public class RdbMirrorDbSyncService {
             pkNames.forEach(pkName -> pkMapping.put(pkName, pkName));
             dbMapping.setTargetPk(pkMapping);
 
-            tableDbConfigCache.put(key, mappingConfig);
+            mirrorDbConfig.getTableConfig().put(key, mappingConfig);
         }
     }
 
@@ -138,11 +143,11 @@ public class RdbMirrorDbSyncService {
      *
      * @param ddl DDL
      */
-    private void executeDdl(Dml ddl) {
+    private void executeDdl(MirrorDbConfig mirrorDbConfig, Dml ddl) {
         try (Connection conn = dataSource.getConnection(); Statement statement = conn.createStatement()) {
             statement.execute(ddl.getSql());
             // 移除对应配置
-            tableDbConfigCache.remove(ddl.getDatabase() + "." + ddl.getDatabase() + "." + ddl.getTable());
+            mirrorDbConfig.getTableConfig().remove(ddl.getTable());
             if (logger.isTraceEnabled()) {
                 logger.trace("Execute DDL sql: {} for database: {}", ddl.getSql(), ddl.getDatabase());
             }

+ 1 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -135,7 +135,7 @@ public class RdbSyncService {
     public void sync(Map<String, Map<String, MappingConfig>> mappingConfig, List<Dml> dmls) {
         try {
             sync(dmls, dml -> {
-                if (dml.getSql() != null) {
+                if (StringUtils.isNotEmpty(dml.getSql())) {
                     // DDL
                     columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
                     return false;

+ 1 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java

@@ -22,7 +22,7 @@ public class SyncUtil {
 
     public static Map<String, String> getColumnsMap(MappingConfig.DbMapping dbMapping, Collection<String> columns) {
         Map<String, String> columnsMap;
-        if (dbMapping.isMapAll()) {
+        if (dbMapping.getMapAll()) {
             if (dbMapping.getAllMapColumns() != null) {
                 return dbMapping.getAllMapColumns();
             }

+ 16 - 7
client-adapter/rdb/src/main/resources/rdb/mytest_user.yml

@@ -1,18 +1,27 @@
 dataSourceKey: defaultDS
 destination: example
-outerAdapterKey: mysql1
+outerAdapterKey: oracle1
 concurrent: true
 dbMapping:
-  mirrorDb: true
   database: mytest
-#  table: user
-#  targetTable: mytest.tb_user
-#  targetPk:
-#    id: id
-#  mapAll: true
+  table: user
+  targetTable: mytest.tb_user
+  targetPk:
+    id: id
+  mapAll: true
 #  targetColumns:
 #    id:
 #    name:
 #    role_id:
 #    c_time:
 #    test1:
+
+
+# Mirror schema synchronize config
+#dataSourceKey: defaultDS
+#destination: example
+#outerAdapterKey: mysql1
+#concurrent: true
+#dbMapping:
+#  mirrorDb: true
+#  database: mytest