mcy 6 years ago
parent
commit
5284aa9793

+ 13 - 5
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/config/YmlConfigBinder.java

@@ -67,11 +67,11 @@ public class YmlConfigBinder {
     }
     }
 
 
     /**
     /**
-     * 将当前内容指定前缀部分绑定到指定对象并用环境变量中的属性替换占位符, 例:
-     * 当前内容有属性 zkServers: ${zookeeper.servers}
-     * 在envProperties中有属性 zookeeper.servers: 192.168.0.1:2181,192.168.0.1:2181,192.168.0.1:2181
-     * 则当前内容 zkServers 会被替换为 zkServers: 192.168.0.1:2181,192.168.0.1:2181,192.168.0.1:2181
-     * 注: 假设绑定的类中 zkServers 属性是 List<String> 对象, 则会自动映射成List
+     * 将当前内容指定前缀部分绑定到指定对象并用环境变量中的属性替换占位符, 例: 当前内容有属性 zkServers: ${zookeeper.servers}
+     * 在envProperties中有属性 zookeeper.servers:
+     * 192.168.0.1:2181,192.168.0.1:2181,192.168.0.1:2181 则当前内容 zkServers 会被替换为
+     * zkServers: 192.168.0.1:2181,192.168.0.1:2181,192.168.0.1:2181 注: 假设绑定的类中
+     * zkServers 属性是 List<String> 对象, 则会自动映射成List
      *
      *
      * @param prefix 指定前缀
      * @param prefix 指定前缀
      * @param content yml内容
      * @param content yml内容
@@ -92,6 +92,10 @@ public class YmlConfigBinder {
             Resource configResource = new ByteArrayResource(contentBytes);
             Resource configResource = new ByteArrayResource(contentBytes);
             PropertySource propertySource = propertySourceLoader.load("manualBindConfig", configResource, null);
             PropertySource propertySource = propertySourceLoader.load("manualBindConfig", configResource, null);
 
 
+            if (propertySource == null) {
+                return null;
+            }
+
             Properties properties = new Properties();
             Properties properties = new Properties();
             Map<String, Object> propertiesRes = new LinkedHashMap<>();
             Map<String, Object> propertiesRes = new LinkedHashMap<>();
             if (!StringUtils.isEmpty(prefix) && !prefix.endsWith(".")) {
             if (!StringUtils.isEmpty(prefix) && !prefix.endsWith(".")) {
@@ -126,6 +130,10 @@ public class YmlConfigBinder {
                 propertiesRes.put(key, value);
                 propertiesRes.put(key, value);
             }
             }
 
 
+            if (propertiesRes.isEmpty()) {
+                return null;
+            }
+
             propertySource = new MapPropertySource(propertySource.getName(), propertiesRes);
             propertySource = new MapPropertySource(propertySource.getName(), propertiesRes);
 
 
             T target = clazz.newInstance();
             T target = clazz.newInstance();

+ 1 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -43,6 +43,7 @@ public class MessageUtil {
             dml.setTable(entry.getHeader().getTableName());
             dml.setTable(entry.getHeader().getTableName());
             dml.setType(eventType.toString());
             dml.setType(eventType.toString());
             dml.setEs(entry.getHeader().getExecuteTime());
             dml.setEs(entry.getHeader().getExecuteTime());
+            dml.setIsDdl(rowChange.getIsDdl());
             dml.setTs(System.currentTimeMillis());
             dml.setTs(System.currentTimeMillis());
             dml.setSql(rowChange.getSql());
             dml.setSql(rowChange.getSql());
             dmls.add(dml);
             dmls.add(dml);

+ 32 - 8
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -1,7 +1,10 @@
 package com.alibaba.otter.canal.client.adapter.es;
 package com.alibaba.otter.canal.client.adapter.es;
 
 
 import java.net.InetAddress;
 import java.net.InetAddress;
-import java.util.*;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.regex.Pattern;
@@ -46,6 +49,8 @@ public class ESAdapter implements OuterAdapter {
 
 
     private ESConfigMonitor                        esConfigMonitor;
     private ESConfigMonitor                        esConfigMonitor;
 
 
+    private Properties                             envProperties;
+
     public TransportClient getTransportClient() {
     public TransportClient getTransportClient() {
         return transportClient;
         return transportClient;
     }
     }
@@ -65,6 +70,7 @@ public class ESAdapter implements OuterAdapter {
     @Override
     @Override
     public void init(OuterAdapterConfig configuration, Properties envProperties) {
     public void init(OuterAdapterConfig configuration, Properties envProperties) {
         try {
         try {
+            this.envProperties = envProperties;
             Map<String, ESSyncConfig> esSyncConfigTmp = ESSyncConfigLoader.load(envProperties);
             Map<String, ESSyncConfig> esSyncConfigTmp = ESSyncConfigLoader.load(envProperties);
             // 过滤不匹配的key的配置
             // 过滤不匹配的key的配置
             esSyncConfigTmp.forEach((key, config) -> {
             esSyncConfigTmp.forEach((key, config) -> {
@@ -93,10 +99,21 @@ public class ESAdapter implements OuterAdapter {
                 String schema = matcher.group(2);
                 String schema = matcher.group(2);
 
 
                 schemaItem.getAliasTableItems().values().forEach(tableItem -> {
                 schemaItem.getAliasTableItems().values().forEach(tableItem -> {
-                    Map<String, ESSyncConfig> esSyncConfigMap = dbTableEsSyncConfig.computeIfAbsent(
-                        StringUtils.trimToEmpty(config.getDestination()) + "-" + StringUtils
-                            .trimToEmpty(config.getGroupId()) + "_" + schema + "-" + tableItem.getTableName(),
-                        k -> new ConcurrentHashMap<>());
+                    Map<String, ESSyncConfig> esSyncConfigMap;
+                    if (envProperties != null
+                        && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+                        esSyncConfigMap = dbTableEsSyncConfig
+                            .computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "-"
+                                             + StringUtils.trimToEmpty(config.getGroupId()) + "_" + schema + "-"
+                                             + tableItem.getTableName(),
+                                k -> new ConcurrentHashMap<>());
+                    } else {
+                        esSyncConfigMap = dbTableEsSyncConfig
+                            .computeIfAbsent(StringUtils.trimToEmpty(config.getDestination()) + "_" + schema + "-"
+                                             + tableItem.getTableName(),
+                                k -> new ConcurrentHashMap<>());
+                    }
+
                     esSyncConfigMap.put(configName, config);
                     esSyncConfigMap.put(configName, config);
                 });
                 });
             }
             }
@@ -134,9 +151,16 @@ public class ESAdapter implements OuterAdapter {
     public void sync(Dml dml) {
     public void sync(Dml dml) {
         String database = dml.getDatabase();
         String database = dml.getDatabase();
         String table = dml.getTable();
         String table = dml.getTable();
-        Map<String, ESSyncConfig> configMap = dbTableEsSyncConfig
-            .get(StringUtils.trimToEmpty(dml.getDestination()) + "-" + StringUtils.trimToEmpty(dml.getGroupId()) + "_"
-                 + database + "-" + table);
+        Map<String, ESSyncConfig> configMap;
+        if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+            configMap = dbTableEsSyncConfig
+                .get(StringUtils.trimToEmpty(dml.getDestination()) + "-" + StringUtils.trimToEmpty(dml.getGroupId())
+                     + "_" + database + "-" + table);
+        } else {
+            configMap = dbTableEsSyncConfig
+                .get(StringUtils.trimToEmpty(dml.getDestination()) + "_" + database + "-" + table);
+        }
+
         if (configMap != null && !configMap.values().isEmpty()) {
         if (configMap != null && !configMap.values().isEmpty()) {
             esSyncService.sync(configMap.values(), dml);
             esSyncService.sync(configMap.values(), dml);
         }
         }

+ 3 - 1
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/config/ESSyncConfigLoader.java

@@ -28,7 +28,9 @@ public class ESSyncConfigLoader {
         Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("es");
         Map<String, String> configContentMap = MappingConfigsLoader.loadConfigs("es");
         configContentMap.forEach((fileName, content) -> {
         configContentMap.forEach((fileName, content) -> {
             ESSyncConfig config = YmlConfigBinder.bindYmlToObj(null, content, ESSyncConfig.class, null, envProperties);
             ESSyncConfig config = YmlConfigBinder.bindYmlToObj(null, content, ESSyncConfig.class, null, envProperties);
-
+            if (config == null) {
+                return;
+            }
             try {
             try {
                 config.validate();
                 config.validate();
             } catch (Exception e) {
             } catch (Exception e) {

+ 22 - 7
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -46,6 +46,8 @@ public class HbaseAdapter implements OuterAdapter {
 
 
     private HbaseConfigMonitor                      configMonitor;
     private HbaseConfigMonitor                      configMonitor;
 
 
+    private Properties                              envProperties;
+
     public Map<String, MappingConfig> getHbaseMapping() {
     public Map<String, MappingConfig> getHbaseMapping() {
         return hbaseMapping;
         return hbaseMapping;
     }
     }
@@ -57,6 +59,7 @@ public class HbaseAdapter implements OuterAdapter {
     @Override
     @Override
     public void init(OuterAdapterConfig configuration, Properties envProperties) {
     public void init(OuterAdapterConfig configuration, Properties envProperties) {
         try {
         try {
+            this.envProperties = envProperties;
             Map<String, MappingConfig> hbaseMappingTmp = MappingConfigLoader.load(envProperties);
             Map<String, MappingConfig> hbaseMappingTmp = MappingConfigLoader.load(envProperties);
             // 过滤不匹配的key的配置
             // 过滤不匹配的key的配置
             hbaseMappingTmp.forEach((key, mappingConfig) -> {
             hbaseMappingTmp.forEach((key, mappingConfig) -> {
@@ -69,11 +72,19 @@ public class HbaseAdapter implements OuterAdapter {
             for (Map.Entry<String, MappingConfig> entry : hbaseMapping.entrySet()) {
             for (Map.Entry<String, MappingConfig> entry : hbaseMapping.entrySet()) {
                 String configName = entry.getKey();
                 String configName = entry.getKey();
                 MappingConfig mappingConfig = entry.getValue();
                 MappingConfig mappingConfig = entry.getValue();
-                String k = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
-                           + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
-                           + mappingConfig.getHbaseMapping().getDatabase() + "-"
-                           + mappingConfig.getHbaseMapping().getTable();
-                Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(k, k1 -> new ConcurrentHashMap<>());
+                String k;
+                if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+                    k = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
+                        + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
+                        + mappingConfig.getHbaseMapping().getDatabase() + "-"
+                        + mappingConfig.getHbaseMapping().getTable();
+                } else {
+                    k = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
+                        + mappingConfig.getHbaseMapping().getDatabase() + "-"
+                        + mappingConfig.getHbaseMapping().getTable();
+                }
+                Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(k,
+                    k1 -> new ConcurrentHashMap<>());
                 configMap.put(configName, mappingConfig);
                 configMap.put(configName, mappingConfig);
             }
             }
 
 
@@ -108,8 +119,12 @@ public class HbaseAdapter implements OuterAdapter {
         String groupId = StringUtils.trimToEmpty(dml.getGroupId());
         String groupId = StringUtils.trimToEmpty(dml.getGroupId());
         String database = dml.getDatabase();
         String database = dml.getDatabase();
         String table = dml.getTable();
         String table = dml.getTable();
-        Map<String, MappingConfig> configMap = mappingConfigCache
-            .get(destination + "-" + groupId + "_" + database + "-" + table);
+        Map<String, MappingConfig> configMap;
+        if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+            configMap = mappingConfigCache.get(destination + "-" + groupId + "_" + database + "-" + table);
+        } else {
+            configMap = mappingConfigCache.get(destination + "_" + database + "-" + table);
+        }
         if (configMap != null) {
         if (configMap != null) {
             List<MappingConfig> configs = new ArrayList<>();
             List<MappingConfig> configs = new ArrayList<>();
             configMap.values().forEach(config -> {
             configMap.values().forEach(config -> {

+ 3 - 0
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java

@@ -34,6 +34,9 @@ public class MappingConfigLoader {
         configContentMap.forEach((fileName, content) -> {
         configContentMap.forEach((fileName, content) -> {
             MappingConfig config = YmlConfigBinder
             MappingConfig config = YmlConfigBinder
                 .bindYmlToObj(null, content, MappingConfig.class, null, envProperties);
                 .bindYmlToObj(null, content, MappingConfig.class, null, envProperties);
+            if (config == null) {
+                return;
+            }
             try {
             try {
                 config.validate();
                 config.validate();
             } catch (Exception e) {
             } catch (Exception e) {

+ 13 - 5
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -49,6 +49,8 @@ public class RdbAdapter implements OuterAdapter {
 
 
     private RdbConfigMonitor                        rdbConfigMonitor;
     private RdbConfigMonitor                        rdbConfigMonitor;
 
 
+    private Properties                              envProperties;
+
     public Map<String, MappingConfig> getRdbMapping() {
     public Map<String, MappingConfig> getRdbMapping() {
         return rdbMapping;
         return rdbMapping;
     }
     }
@@ -68,6 +70,7 @@ public class RdbAdapter implements OuterAdapter {
      */
      */
     @Override
     @Override
     public void init(OuterAdapterConfig configuration, Properties envProperties) {
     public void init(OuterAdapterConfig configuration, Properties envProperties) {
+        this.envProperties = envProperties;
         Map<String, MappingConfig> rdbMappingTmp = ConfigLoader.load(envProperties);
         Map<String, MappingConfig> rdbMappingTmp = ConfigLoader.load(envProperties);
         // 过滤不匹配的key的配置
         // 过滤不匹配的key的配置
         rdbMappingTmp.forEach((key, mappingConfig) -> {
         rdbMappingTmp.forEach((key, mappingConfig) -> {
@@ -86,10 +89,15 @@ public class RdbAdapter implements OuterAdapter {
             String configName = entry.getKey();
             String configName = entry.getKey();
             MappingConfig mappingConfig = entry.getValue();
             MappingConfig mappingConfig = entry.getValue();
             if (!mappingConfig.getDbMapping().getMirrorDb()) {
             if (!mappingConfig.getDbMapping().getMirrorDb()) {
-                String key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
-                             + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
-                             + mappingConfig.getDbMapping().getDatabase() + "-"
-                             + mappingConfig.getDbMapping().getTable();
+                String key;
+                if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+                    key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "-"
+                          + StringUtils.trimToEmpty(mappingConfig.getGroupId()) + "_"
+                          + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable();
+                } else {
+                    key = StringUtils.trimToEmpty(mappingConfig.getDestination()) + "_"
+                          + mappingConfig.getDbMapping().getDatabase() + "-" + mappingConfig.getDbMapping().getTable();
+                }
                 Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(key,
                 Map<String, MappingConfig> configMap = mappingConfigCache.computeIfAbsent(key,
                     k1 -> new ConcurrentHashMap<>());
                     k1 -> new ConcurrentHashMap<>());
                 configMap.put(configName, mappingConfig);
                 configMap.put(configName, mappingConfig);
@@ -152,7 +160,7 @@ public class RdbAdapter implements OuterAdapter {
             return;
             return;
         }
         }
         try {
         try {
-            rdbSyncService.sync(mappingConfigCache, dmls);
+            rdbSyncService.sync(mappingConfigCache, dmls, envProperties);
             rdbMirrorDbSyncService.sync(dmls);
             rdbMirrorDbSyncService.sync(dmls);
         } catch (Exception e) {
         } catch (Exception e) {
             throw new RuntimeException(e);
             throw new RuntimeException(e);

+ 3 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/ConfigLoader.java

@@ -36,6 +36,9 @@ public class ConfigLoader {
         configContentMap.forEach((fileName, content) -> {
         configContentMap.forEach((fileName, content) -> {
             MappingConfig config = YmlConfigBinder
             MappingConfig config = YmlConfigBinder
                 .bindYmlToObj(null, content, MappingConfig.class, null, envProperties);
                 .bindYmlToObj(null, content, MappingConfig.class, null, envProperties);
+            if (config == null) {
+                return;
+            }
             try {
             try {
                 config.validate();
                 config.validate();
             } catch (Exception e) {
             } catch (Exception e) {

+ 1 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbMirrorDbSyncService.java

@@ -59,7 +59,7 @@ public class RdbMirrorDbSyncService {
             if (mirrorDbConfig.getMappingConfig() == null) {
             if (mirrorDbConfig.getMappingConfig() == null) {
                 continue;
                 continue;
             }
             }
-            if (StringUtils.isNotEmpty(mirrorDbConfig.getMappingConfig().getGroupId())) {
+            if (dml.getGroupId() != null && StringUtils.isNotEmpty(mirrorDbConfig.getMappingConfig().getGroupId())) {
                 if (!mirrorDbConfig.getMappingConfig().getGroupId().equals(dml.getGroupId())) {
                 if (!mirrorDbConfig.getMappingConfig().getGroupId().equals(dml.getGroupId())) {
                     continue; // 如果groupId不匹配则过滤
                     continue; // 如果groupId不匹配则过滤
                 }
                 }

+ 10 - 19
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -3,10 +3,7 @@ package com.alibaba.otter.canal.client.adapter.rdb.service;
 import java.sql.Connection;
 import java.sql.Connection;
 import java.sql.ResultSetMetaData;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.*;
 import java.util.function.Function;
 import java.util.function.Function;
 
 
@@ -138,7 +135,7 @@ public class RdbSyncService {
      * @param mappingConfig 配置集合
      * @param mappingConfig 配置集合
      * @param dmls 批量 DML
      * @param dmls 批量 DML
      */
      */
-    public void sync(Map<String, Map<String, MappingConfig>> mappingConfig, List<Dml> dmls) {
+    public void sync(Map<String, Map<String, MappingConfig>> mappingConfig, List<Dml> dmls, Properties envProperties) {
         sync(dmls, dml -> {
         sync(dmls, dml -> {
             if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
             if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
                 // DDL
                 // DDL
@@ -150,28 +147,22 @@ public class RdbSyncService {
                 String groupId = StringUtils.trimToEmpty(dml.getGroupId());
                 String groupId = StringUtils.trimToEmpty(dml.getGroupId());
                 String database = dml.getDatabase();
                 String database = dml.getDatabase();
                 String table = dml.getTable();
                 String table = dml.getTable();
-                Map<String, MappingConfig> configMap = mappingConfig
-                    .get(destination + "-" + groupId + "_" + database + "-" + table);
+                Map<String, MappingConfig> configMap;
+                if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
+                    configMap = mappingConfig.get(destination + "-" + groupId + "_" + database + "-" + table);
+                } else {
+                    configMap = mappingConfig.get(destination + "_" + database + "-" + table);
+                }
 
 
                 if (configMap == null) {
                 if (configMap == null) {
                     return false;
                     return false;
                 }
                 }
 
 
-                List<MappingConfig> configs = new ArrayList<>();
-                configMap.values().forEach(config -> {
-                    if (StringUtils.isNotEmpty(config.getGroupId())) {
-                        if (config.getGroupId().equals(dml.getGroupId())) {
-                            configs.add(config);
-                        }
-                    } else {
-                        configs.add(config);
-                    }
-                });
-                if (configs.isEmpty()) {
+                if (configMap.values().isEmpty()) {
                     return false;
                     return false;
                 }
                 }
 
 
-                for (MappingConfig config : configs) {
+                for (MappingConfig config : configMap.values()) {
                     if (config.getConcurrent()) {
                     if (config.getConcurrent()) {
                         List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
                         List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
                         singleDmls.forEach(singleDml -> {
                         singleDmls.forEach(singleDml -> {