Browse Source

mq模式下adapter配置增加groupId属性对应

mcy 6 years ago
parent
commit
10276e254a

+ 9 - 17
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -93,8 +93,10 @@ public class ESAdapter implements OuterAdapter {
                 String schema = matcher.group(2);
 
                 schemaItem.getAliasTableItems().values().forEach(tableItem -> {
-                    Map<String, ESSyncConfig> esSyncConfigMap = dbTableEsSyncConfig
-                        .computeIfAbsent(schema + "-" + tableItem.getTableName(), k -> new HashMap<>());
+                    Map<String, ESSyncConfig> esSyncConfigMap = dbTableEsSyncConfig.computeIfAbsent(
+                        StringUtils.trimToEmpty(config.getDestination()) + "-" + StringUtils
+                            .trimToEmpty(config.getGroupId()) + "_" + schema + "-" + tableItem.getTableName(),
+                        k -> new ConcurrentHashMap<>());
                     esSyncConfigMap.put(configName, config);
                 });
             }
@@ -132,21 +134,11 @@ public class ESAdapter implements OuterAdapter {
     public void sync(Dml dml) {
         String database = dml.getDatabase();
         String table = dml.getTable();
-        Map<String, ESSyncConfig> configMap = dbTableEsSyncConfig.get(database + "-" + table);
-        if (configMap != null) {
-            List<ESSyncConfig> configs = new ArrayList<>();
-            configMap.values().forEach(esConfig -> {
-                if (StringUtils.isNotEmpty(esConfig.getGroupId())) {
-                    if (esConfig.getGroupId().equals(dml.getGroupId())) {
-                        configs.add(esConfig);
-                    }
-                } else {
-                    configs.add(esConfig);
-                }
-            });
-            if (!configs.isEmpty()) {
-                esSyncService.sync(configs, dml);
-            }
+        Map<String, ESSyncConfig> configMap = dbTableEsSyncConfig
+            .get(StringUtils.trimToEmpty(dml.getDestination()) + "-" + StringUtils.trimToEmpty(dml.getGroupId()) + "_"
+                 + database + "-" + table);
+        if (configMap != null && !configMap.values().isEmpty()) {
+            esSyncService.sync(configMap.values(), dml);
         }
     }
 

+ 3 - 4
client-adapter/elasticsearch/src/main/resources/es/mytest_user.yml

@@ -1,15 +1,14 @@
 dataSourceKey: defaultDS
 destination: example
+groupId: g1
 esMapping:
   _index: mytest_user
   _type: _doc
   _id: _id
 #  pk: id
   sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
-        a.c_time as _c_time, c.labels as _labels from user a
-        left join role b on b.id=a.role_id
-        left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
-        group by user_id) c on c.user_id=a.id"
+        a.c_time as _c_time from user a
+        left join role b on b.id=a.role_id"
 #  objFields:
 #    _labels: array:;
   etlCondition: "where a.c_time>='{0}'"

+ 7 - 4
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -69,10 +69,11 @@ public class HbaseAdapter implements OuterAdapter {
             for (Map.Entry<String, MappingConfig> entry : hbaseMapping.entrySet()) {
                 String configName = entry.getKey();
                 MappingConfig mappingConfig = entry.getValue();
-                String k = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
-                           + mappingConfig.getHbaseMapping().getDatabase() + "."
+                String k = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
+                           + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
+                           + mappingConfig.getHbaseMapping().getDatabase() + "-"
                            + mappingConfig.getHbaseMapping().getTable();
-                Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(k, k1 -> new HashMap<>());
+                Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(k, k1 -> new ConcurrentHashMap<>());
                 configMap.put(configName, mappingConfig);
             }
 
@@ -104,9 +105,11 @@ public class HbaseAdapter implements OuterAdapter {
             return;
         }
         String destination = StringUtils.trimToEmpty(dml.getDestination());
+        String groupId = StringUtils.trimToEmpty(dml.getGroupId());
         String database = dml.getDatabase();
         String table = dml.getTable();
-        Map<String, MappingConfig> configMap = mappingConfigCache.get(destination + "." + database + "." + table);
+        Map<String, MappingConfig> configMap = mappingConfigCache
+            .get(destination + "-" + groupId + "_" + database + "-" + table);
         if (configMap != null) {
             List<MappingConfig> configs = new ArrayList<>();
             configMap.values().forEach(config -> {

+ 2 - 1
client-adapter/hbase/src/main/resources/hbase/mytest_person2.yml

@@ -1,5 +1,6 @@
 dataSourceKey: defaultDS
 destination: example
+groupId: g1
 hbaseMapping:
   mode: STRING  #NATIVE   #PHOENIX
   database: mytest  # 数据库名
@@ -57,4 +58,4 @@ hbaseMapping:
 # $UNSIGNED_TIMESTAMP       对应PHOENIX里的UNSIGNED_TIMESTAMP     12字节
 # $VARCHAR                  对应PHOENIX里的VARCHAR                动态长度
 # $VARBINARY                对应PHOENIX里的VARBINARY              动态长度
-# $DECIMAL                  对应PHOENIX里的DECIMAL                动态长度
+# $DECIMAL                  对应PHOENIX里的DECIMAL                动态长度

+ 3 - 2
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -86,8 +86,9 @@ public class RdbAdapter implements OuterAdapter {
             String configName = entry.getKey();
             MappingConfig mappingConfig = entry.getValue();
             if (!mappingConfig.getDbMapping().getMirrorDb()) {
-                String key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
-                             + mappingConfig.getDbMapping().getDatabase() + "."
+                String key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
+                             + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
+                             + mappingConfig.getDbMapping().getDatabase() + "-"
                              + mappingConfig.getDbMapping().getTable();
                 Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(key,
                     k1 -> new ConcurrentHashMap<>());

+ 4 - 4
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -147,9 +147,11 @@ public class RdbSyncService {
             } else {
                 // DML
                 String destination = StringUtils.trimToEmpty(dml.getDestination());
+                String groupId = StringUtils.trimToEmpty(dml.getGroupId());
                 String database = dml.getDatabase();
                 String table = dml.getTable();
-                Map<String, MappingConfig> configMap = mappingConfig.get(destination + "." + database + "." + table);
+                Map<String, MappingConfig> configMap = mappingConfig
+                    .get(destination + "-" + groupId + "_" + database + "-" + table);
 
                 if (configMap == null) {
                     return false;
@@ -169,7 +171,6 @@ public class RdbSyncService {
                     return false;
                 }
 
-                boolean executed = false;
                 for (MappingConfig config : configs) {
                     if (config.getConcurrent()) {
                         List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
@@ -186,9 +187,8 @@ public class RdbSyncService {
                             dmlsPartition[hash].add(syncItem);
                         });
                     }
-                    executed = true;
                 }
-                return executed;
+                return true;
             }
         });
     }

+ 5 - 3
client-adapter/rdb/src/main/resources/rdb/mytest_user.yml

@@ -1,11 +1,12 @@
 dataSourceKey: defaultDS
 destination: example
-outerAdapterKey: oracle1
+groupId: g1
+outerAdapterKey: mysql1
 concurrent: true
 dbMapping:
   database: mytest
   table: user
-  targetTable: mytest.tb_user
+  targetTable: mytest2.user
   targetPk:
     id: id
 #  mapAll: true
@@ -17,9 +18,10 @@ dbMapping:
     test1:
 
 
-# Mirror schema synchronize config
+## Mirror schema synchronize config
 #dataSourceKey: defaultDS
 #destination: example
+#groupId: g1
 #outerAdapterKey: mysql1
 #concurrent: true
 #dbMapping: