Browse Source

rdb同步适配多数据源

machey 6 years ago
parent
commit
d7ab6e2dc1
17 changed files with 125 additions and 115 deletions
  1. 16 1
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  2. 16 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java
  3. 10 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/OuterAdapterConfig.java
  4. 7 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java
  5. 15 14
      client-adapter/launcher/src/main/resources/application.yml
  6. 24 24
      client-adapter/rdb/pom.xml
  7. 18 17
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java
  8. 0 8
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/MysqlAdapter.java
  9. 0 8
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/OracleAdapter.java
  10. 0 8
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/PostgresqlAdapter.java
  11. 0 8
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/SqlserverAdapter.java
  12. 13 3
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java
  13. 1 4
      client-adapter/rdb/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter
  14. 0 15
      client-adapter/rdb/src/main/resources/postgresql/mytest_user.yml
  15. 1 0
      client-adapter/rdb/src/main/resources/rdb/mytest_user.yml
  16. 3 3
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/Common.java
  17. 1 1
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/OracleSyncTest.java

+ 16 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -1,7 +1,9 @@
 package com.alibaba.otter.canal.client.adapter.support;
 package com.alibaba.otter.canal.client.adapter.support;
 
 
 import java.util.ArrayList;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.List;
+import java.util.Map;
 
 
 /**
 /**
  * 配置信息类
  * 配置信息类
@@ -129,7 +131,9 @@ public class CanalClientConfig {
 
 
     public static class Group {
     public static class Group {
 
 
-        private List<OuterAdapterConfig> outAdapters; // 适配器列表
+        private List<OuterAdapterConfig>        outAdapters;                           // 适配器列表
+
+        private Map<String, OuterAdapterConfig> outAdaptersMap = new LinkedHashMap<>();
 
 
         public List<OuterAdapterConfig> getOutAdapters() {
         public List<OuterAdapterConfig> getOutAdapters() {
             return outAdapters;
             return outAdapters;
@@ -137,6 +141,17 @@ public class CanalClientConfig {
 
 
         public void setOutAdapters(List<OuterAdapterConfig> outAdapters) {
         public void setOutAdapters(List<OuterAdapterConfig> outAdapters) {
             this.outAdapters = outAdapters;
             this.outAdapters = outAdapters;
+            if (outAdapters != null) {
+                outAdapters.forEach(outAdapter -> outAdaptersMap.put(outAdapter.getKey(), outAdapter));
+            }
+        }
+
+        public Map<String, OuterAdapterConfig> getOutAdaptersMap() {
+            return outAdaptersMap;
+        }
+
+        public void setOutAdaptersMap(Map<String, OuterAdapterConfig> outAdaptersMap) {
+            this.outAdaptersMap = outAdaptersMap;
         }
         }
     }
     }
 
 

+ 16 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java

@@ -148,6 +148,22 @@ public class ExtensionLoader<T> {
         }
         }
     }
     }
 
 
+    @SuppressWarnings("unchecked")
+    public T newInstance(String name) {
+        Class<?> clazz = getExtensionClasses().get(name);
+        if (clazz == null) {
+            throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
+                                            + ")  could not be instantiated: class could not be found");
+        }
+        try {
+            return (T) clazz.newInstance();
+        } catch (Throwable t) {
+            throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
+                                            + ")  could not be instantiated: " + t.getMessage(),
+                t);
+        }
+    }
+
     private Map<String, Class<?>> getExtensionClasses() {
     private Map<String, Class<?>> getExtensionClasses() {
         Map<String, Class<?>> classes = cachedClasses.get();
         Map<String, Class<?>> classes = cachedClasses.get();
         if (classes == null) {
         if (classes == null) {

+ 10 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/OuterAdapterConfig.java

@@ -12,6 +12,8 @@ public class OuterAdapterConfig {
 
 
     private String              name;       // 适配器名称, 如: logger, hbase, es
     private String              name;       // 适配器名称, 如: logger, hbase, es
 
 
+    private String              key;        // 适配器唯一键
+
     private String              hosts;      // 适配器内部的地址, 比如对应es该参数可以填写es的server地址
     private String              hosts;      // 适配器内部的地址, 比如对应es该参数可以填写es的server地址
 
 
     private String              zkHosts;    // 适配器内部的ZK地址, 比如对应HBase该参数可以填写HBase对应的ZK地址
     private String              zkHosts;    // 适配器内部的ZK地址, 比如对应HBase该参数可以填写HBase对应的ZK地址
@@ -26,6 +28,14 @@ public class OuterAdapterConfig {
         this.name = name;
         this.name = name;
     }
     }
 
 
+    public String getKey() {
+        return key;
+    }
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+
     public String getHosts() {
     public String getHosts() {
         return hosts;
         return hosts;
     }
     }

+ 7 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -123,7 +123,12 @@ public class CanalAdapterLoader {
 
 
     private void loadConnector(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) {
     private void loadConnector(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) {
         try {
         try {
-            OuterAdapter adapter = loader.getExtension(config.getName());
+            OuterAdapter adapter;
+            if ("rdb".equalsIgnoreCase(config.getName())) {
+                adapter = loader.newInstance(config.getName());
+            } else {
+                adapter = loader.getExtension(config.getName());
+            }
             ClassLoader cl = Thread.currentThread().getContextClassLoader();
             ClassLoader cl = Thread.currentThread().getContextClassLoader();
             // 替换ClassLoader
             // 替换ClassLoader
             Thread.currentThread().setContextClassLoader(adapter.getClass().getClassLoader());
             Thread.currentThread().setContextClassLoader(adapter.getClass().getClassLoader());
@@ -133,6 +138,7 @@ public class CanalAdapterLoader {
             logger.info("Load canal adapter: {} succeed", config.getName());
             logger.info("Load canal adapter: {} succeed", config.getName());
         } catch (Exception e) {
         } catch (Exception e) {
             logger.error("Load canal adapter: {} failed", config.getName(), e);
             logger.error("Load canal adapter: {} failed", config.getName(), e);
+            System.exit(0);
         }
         }
     }
     }
 
 

+ 15 - 14
client-adapter/launcher/src/main/resources/application.yml

@@ -21,6 +21,20 @@ spring:
 #    groups:
 #    groups:
 #    - outAdapters:
 #    - outAdapters:
 #      - name: logger
 #      - name: logger
+#      - name: rdb
+#        key: oracle1
+#        properties:
+#          jdbc.driverClassName: oracle.jdbc.OracleDriver
+#          jdbc.url: jdbc:oracle:thin:@127.0.0.1:1521:orcl
+#          jdbc.username: mytest
+#          jdbc.password: m121212
+#      - name: rdb
+#        key: postgres1
+#        properties:
+#          jdbc.driverClassName: org.postgresql.Driver
+#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
+#          jdbc.username: postgres
+#          jdbc.password: 121212
 #      - name: hbase
 #      - name: hbase
 #        properties:
 #        properties:
 #          hbase.zookeeper.quorum: 127.0.0.1
 #          hbase.zookeeper.quorum: 127.0.0.1
@@ -37,18 +51,6 @@ spring:
 #    - groupId: g2
 #    - groupId: g2
 #      outAdapters:
 #      outAdapters:
 #      - name: logger
 #      - name: logger
-#      - name: oracle
-#        properties:
-#          jdbc.driverClassName: oracle.jdbc.OracleDriver
-#          jdbc.url: jdbc:oracle:thin:@127.0.0.1:49161:XE
-#          jdbc.username: mytest
-#          jdbc.password: m121212
-#      - name: postgresql
-#        properties:
-#          jdbc.driverClassName: org.postgresql.Driver
-#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
-#          jdbc.username: postgres
-#          jdbc.password: 121212
 #  mqTopics:
 #  mqTopics:
 #  - mqMode: rocketmq
 #  - mqMode: rocketmq
 #    topic: example
 #    topic: example
@@ -66,5 +68,4 @@ spring:
 #  adapterConfigs:
 #  adapterConfigs:
 #  - hbase/mytest_person2.yml
 #  - hbase/mytest_person2.yml
 #  - es/mytest_user.yml
 #  - es/mytest_user.yml
-#  - oracle/mytest_user.yml
-#  - postgresql/mytest_user.yml
+#  - rdb/mytest_user.yml

+ 24 - 24
client-adapter/rdb/pom.xml

@@ -27,30 +27,30 @@
             <scope>provided</scope>
             <scope>provided</scope>
         </dependency>
         </dependency>
 
 
-        <dependency>
-            <groupId>mysql</groupId>
-            <artifactId>mysql-connector-java</artifactId>
-            <version>5.1.40</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.postgresql</groupId>
-            <artifactId>postgresql</artifactId>
-            <version>42.1.4</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>com.oracle</groupId>
-            <artifactId>ojdbc6</artifactId>
-            <version>11.2.0.4.0-atlassian-hosted</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>com.microsoft.sqlserver</groupId>
-            <artifactId>mssql-jdbc</artifactId>
-            <version>7.0.0.jre8</version>
-            <scope>test</scope>
-        </dependency>
+        <!--<dependency>-->
+            <!--<groupId>mysql</groupId>-->
+            <!--<artifactId>mysql-connector-java</artifactId>-->
+            <!--<version>5.1.40</version>-->
+            <!--<scope>test</scope>-->
+        <!--</dependency>-->
+        <!--<dependency>-->
+            <!--<groupId>org.postgresql</groupId>-->
+            <!--<artifactId>postgresql</artifactId>-->
+            <!--<version>42.1.4</version>-->
+            <!--<scope>test</scope>-->
+        <!--</dependency>-->
+        <!--<dependency>-->
+            <!--<groupId>com.oracle</groupId>-->
+            <!--<artifactId>ojdbc6</artifactId>-->
+            <!--<version>11.2.0.4.0-atlassian-hosted</version>-->
+            <!--<scope>test</scope>-->
+        <!--</dependency>-->
+        <!--<dependency>-->
+            <!--<groupId>com.microsoft.sqlserver</groupId>-->
+            <!--<artifactId>mssql-jdbc</artifactId>-->
+            <!--<version>7.0.0.jre8</version>-->
+            <!--<scope>test</scope>-->
+        <!--</dependency>-->
         <dependency>
         <dependency>
             <groupId>junit</groupId>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <artifactId>junit</artifactId>

+ 18 - 17
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -19,12 +19,13 @@ import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
 import com.alibaba.otter.canal.client.adapter.support.*;
 import com.alibaba.otter.canal.client.adapter.support.*;
 
 
-public abstract class RdbAdapter implements OuterAdapter {
+@SPI("rdb")
+public class RdbAdapter implements OuterAdapter {
 
 
-    private static Logger                       logger             = LoggerFactory.getLogger(RdbAdapter.class);
+    private static Logger                       logger     = LoggerFactory.getLogger(RdbAdapter.class);
 
 
-    private volatile Map<String, MappingConfig> rdbMapping         = null;                                     // 文件名对应配置
-    private volatile Map<String, MappingConfig> mappingConfigCache = null;                                     // 库名-表名对应配置
+    private volatile Map<String, MappingConfig> rdbMapping = new HashMap<>();                          // 文件名对应配置
+    private Map<String, MappingConfig>          mappingConfigCache;                                    // 库名-表名对应配置
 
 
     private DruidDataSource                     dataSource;
     private DruidDataSource                     dataSource;
 
 
@@ -32,20 +33,20 @@ public abstract class RdbAdapter implements OuterAdapter {
 
 
     @Override
     @Override
     public void init(OuterAdapterConfig configuration) {
     public void init(OuterAdapterConfig configuration) {
-        if (mappingConfigCache == null) {
-            synchronized (MappingConfig.class) {
-                if (mappingConfigCache == null) {
-                    SPI spi = this.getClass().getAnnotation(SPI.class);
-                    rdbMapping = MappingConfigLoader.load(spi.value());
-                    mappingConfigCache = new HashMap<>();
-                    for (MappingConfig mappingConfig : rdbMapping.values()) {
-                        mappingConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
-                                               + mappingConfig.getDbMapping().getDatabase() + "."
-                                               + mappingConfig.getDbMapping().getTable(),
-                            mappingConfig);
-                    }
-                }
+        SPI spi = this.getClass().getAnnotation(SPI.class);
+        Map<String, MappingConfig> rdbMappingTmp = MappingConfigLoader.load(spi.value());
+        // 过滤其他key的配置
+        rdbMappingTmp.forEach((key, mappingConfig) -> {
+            if (mappingConfig.getOuterAdapterKey().equalsIgnoreCase(configuration.getKey())) {
+                rdbMapping.put(key, mappingConfig);
             }
             }
+        });
+        mappingConfigCache = new HashMap<>();
+        for (MappingConfig mappingConfig : rdbMapping.values()) {
+            mappingConfigCache
+                .put(StringUtils.trimToEmpty(mappingConfig.getDestination()) + "."
+                     + mappingConfig.getDbMapping().getDatabase() + "." + mappingConfig.getDbMapping().getTable(),
+                    mappingConfig);
         }
         }
 
 
         Map<String, String> properties = configuration.getProperties();
         Map<String, String> properties = configuration.getProperties();

+ 0 - 8
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/MysqlAdapter.java

@@ -1,8 +0,0 @@
-package com.alibaba.otter.canal.client.adapter.rdb.adapters;
-
-import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
-import com.alibaba.otter.canal.client.adapter.support.SPI;
-
-@SPI("mysql")
-public class MysqlAdapter extends RdbAdapter {
-}

+ 0 - 8
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/OracleAdapter.java

@@ -1,8 +0,0 @@
-package com.alibaba.otter.canal.client.adapter.rdb.adapters;
-
-import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
-import com.alibaba.otter.canal.client.adapter.support.SPI;
-
-@SPI("oracle")
-public class OracleAdapter extends RdbAdapter {
-}

+ 0 - 8
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/PostgresqlAdapter.java

@@ -1,8 +0,0 @@
-package com.alibaba.otter.canal.client.adapter.rdb.adapters;
-
-import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
-import com.alibaba.otter.canal.client.adapter.support.SPI;
-
-@SPI("postgresql")
-public class PostgresqlAdapter extends RdbAdapter {
-}

+ 0 - 8
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/adapters/SqlserverAdapter.java

@@ -1,8 +0,0 @@
-package com.alibaba.otter.canal.client.adapter.rdb.adapters;
-
-import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
-import com.alibaba.otter.canal.client.adapter.support.SPI;
-
-@SPI("sqlserver")
-public class SqlserverAdapter extends RdbAdapter {
-}

+ 13 - 3
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/config/MappingConfig.java

@@ -12,11 +12,13 @@ import java.util.Set;
  */
  */
 public class MappingConfig {
 public class MappingConfig {
 
 
-    private String    dataSourceKey; // 数据源key
+    private String    dataSourceKey;   // 数据源key
 
 
-    private String    destination;   // canal实例或MQ的topic
+    private String    destination;     // canal实例或MQ的topic
 
 
-    private DbMapping dbMapping;     // db映射配置
+    private String    outerAdapterKey; // 对应适配器的key
+
+    private DbMapping dbMapping;       // db映射配置
 
 
     public String getDataSourceKey() {
     public String getDataSourceKey() {
         return dataSourceKey;
         return dataSourceKey;
@@ -26,6 +28,14 @@ public class MappingConfig {
         this.dataSourceKey = dataSourceKey;
         this.dataSourceKey = dataSourceKey;
     }
     }
 
 
+    public String getOuterAdapterKey() {
+        return outerAdapterKey;
+    }
+
+    public void setOuterAdapterKey(String outerAdapterKey) {
+        this.outerAdapterKey = outerAdapterKey;
+    }
+
     public DbMapping getDbMapping() {
     public DbMapping getDbMapping() {
         return dbMapping;
         return dbMapping;
     }
     }

+ 1 - 4
client-adapter/rdb/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter

@@ -1,4 +1 @@
-mysql=com.alibaba.otter.canal.client.adapter.rdb.adapters.MysqlAdapter
-postgresql=com.alibaba.otter.canal.client.adapter.rdb.adapters.PostgresqlAdapter
-oracle=com.alibaba.otter.canal.client.adapter.rdb.adapters.OracleAdapter
-sqlserver=com.alibaba.otter.canal.client.adapter.rdb.adapters.SqlserverAdapter
+rdb=com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter

+ 0 - 15
client-adapter/rdb/src/main/resources/postgresql/mytest_user.yml

@@ -1,15 +0,0 @@
-dataSourceKey: defaultDS
-destination: example
-dbMapping:
-  database: mytest
-  table: user
-  targetTable: public.tb_user
-  targetPk:
-    id: id
-  mapAll: true
-#  targetColumns:
-#    id:
-#    name:
-#    role_id:
-#    c_time:
-#    test1:

+ 1 - 0
client-adapter/rdb/src/main/resources/oracle/mytest_user.yml → client-adapter/rdb/src/main/resources/rdb/mytest_user.yml

@@ -1,5 +1,6 @@
 dataSourceKey: defaultDS
 dataSourceKey: defaultDS
 destination: example
 destination: example
+outerAdapterKey: oracle1
 dbMapping:
 dbMapping:
   database: mytest
   database: mytest
   table: user
   table: user

+ 3 - 3
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/Common.java

@@ -1,7 +1,6 @@
 package com.alibaba.otter.canal.client.adapter.rdb.test.sync;
 package com.alibaba.otter.canal.client.adapter.rdb.test.sync;
 
 
 import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
 import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
-import com.alibaba.otter.canal.client.adapter.rdb.adapters.OracleAdapter;
 import com.alibaba.otter.canal.client.adapter.rdb.test.TestConstant;
 import com.alibaba.otter.canal.client.adapter.rdb.test.TestConstant;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
@@ -14,7 +13,8 @@ public class Common {
         DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
         DatasourceConfig.DATA_SOURCES.put("defaultDS", TestConstant.dataSource);
 
 
         OuterAdapterConfig outerAdapterConfig = new OuterAdapterConfig();
         OuterAdapterConfig outerAdapterConfig = new OuterAdapterConfig();
-        outerAdapterConfig.setName("oracle");
+        outerAdapterConfig.setName("rdb");
+        outerAdapterConfig.setKey("oralce1");
         Map<String, String> properties = new HashMap<>();
         Map<String, String> properties = new HashMap<>();
         properties.put("jdbc.driveClassName", "oracle.jdbc.OracleDriver");
         properties.put("jdbc.driveClassName", "oracle.jdbc.OracleDriver");
         properties.put("jdbc.url", "jdbc:oracle:thin:@127.0.0.1:49161:XE");
         properties.put("jdbc.url", "jdbc:oracle:thin:@127.0.0.1:49161:XE");
@@ -22,7 +22,7 @@ public class Common {
         properties.put("jdbc.password", "m121212");
         properties.put("jdbc.password", "m121212");
         outerAdapterConfig.setProperties(properties);
         outerAdapterConfig.setProperties(properties);
 
 
-        RdbAdapter adapter = new OracleAdapter();
+        RdbAdapter adapter = new RdbAdapter();
         adapter.init(outerAdapterConfig);
         adapter.init(outerAdapterConfig);
         return adapter;
         return adapter;
     }
     }

+ 1 - 1
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/OracleSyncTest.java

@@ -15,7 +15,7 @@ public class OracleSyncTest {
 
 
     @Before
     @Before
     public void init() {
     public void init() {
-        AdapterConfigs.put("oracle", "mytest_user.yml");
+        AdapterConfigs.put("rdb", "mytest_user.yml");
         rdbAdapter = Common.init();
         rdbAdapter = Common.init();
     }
     }