Ver código fonte

调整Hbase适配

mcy 6 anos atrás
pai
commit
15e79c1400
27 arquivos alterados com 367 adições e 187 exclusões
  1. 10 0
      client-adapter/common/pom.xml
  2. 15 3
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/OuterAdapter.java
  3. 24 4
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/AdapterConfigs.java
  4. 75 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/DatasourceConfig.java
  5. 15 15
      client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/AbstractCanalAdapterWorker.java
  6. 4 5
      client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java
  7. 9 9
      client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java
  8. 2 2
      client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterRocketMQWorker.java
  9. 5 5
      client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterWorker.java
  10. 3 3
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java
  11. 9 0
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java
  12. 1 1
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java
  13. 4 0
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java
  14. 0 0
      client-adapter/hbase/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter
  15. 2 1
      client-adapter/hbase/src/main/resources/hbase/mytest_person2.yml
  16. 1 0
      client-adapter/launcher/pom.xml
  17. 0 44
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/CanalAdapterApplication.java
  18. 1 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterCanalConfig.java
  19. 76 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterConfig.java
  20. 0 35
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterConfigPath.java
  21. 21 34
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java
  22. 11 16
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java
  23. 65 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java
  24. 5 5
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java
  25. 6 1
      client-adapter/launcher/src/main/resources/application.yml
  26. 3 3
      client-adapter/logger/src/main/java/com/alibaba/otter/canal/client/adapter/logger/LoggerAdapterExample.java
  27. 0 0
      client-adapter/logger/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter

+ 10 - 0
client-adapter/common/pom.xml

@@ -28,6 +28,16 @@
             <artifactId>joda-time</artifactId>
             <version>2.9.4</version>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.1.9</version>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.40</version>
+        </dependency>
     </dependencies>
 
 </project>

+ 15 - 3
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/CanalOuterAdapter.java → client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/OuterAdapter.java

@@ -1,5 +1,7 @@
 package com.alibaba.otter.canal.client.adapter;
 
+import java.util.Map;
+
 import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.SPI;
@@ -11,7 +13,7 @@ import com.alibaba.otter.canal.client.adapter.support.SPI;
  * @version 1.0.0
  */
 @SPI("logger")
-public interface CanalOuterAdapter {
+public interface OuterAdapter {
 
     /**
      * 外部适配器初始化接口
@@ -21,11 +23,11 @@ public interface CanalOuterAdapter {
     void init(CanalOuterAdapterConfiguration configuration);
 
     /**
-     * 往适配器中写入数据
+     * 往适配器中同步数据
      *
      * @param dml 数据包
      */
-    void writeOut(Dml dml);
+    void sync(Dml dml);
 
     // void writeOut(FlatMessage flatMessage);
 
@@ -33,4 +35,14 @@ public interface CanalOuterAdapter {
      * 外部适配器销毁接口
      */
     void destroy();
+
+    /**
+     * Etl操作
+     * 
+     * @param criteriaSql 条件拼接sql
+     */
+    default void etl(String criteriaSql) {
+        throw new UnsupportedOperationException();
+    }
+
 }

+ 24 - 4
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/AdapterConfigs.java

@@ -1,8 +1,28 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
-public interface AdapterConfigs {
-    Multimap<String, String> configs = ArrayListMultimap.create();
+public class AdapterConfigs {
+
+    private static Map<String, Set<String>> configs = new ConcurrentHashMap<>();
+
+    public static void put(String key, String value) {
+        Set<String> values = configs.get(key);
+        if (values == null) {
+            values = new LinkedHashSet<>();
+        }
+        values.add(value);
+        configs.put(key, values);
+    }
+
+    public static Set<String> get(String key) {
+        return configs.get(key);
+    }
+
+    public static void clear() {
+        configs.clear();
+    }
 }

+ 75 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/DatasourceConfig.java

@@ -0,0 +1,75 @@
+package com.alibaba.otter.canal.client.adapter.support;
+
+import com.alibaba.druid.pool.DruidDataSource;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DatasourceConfig {
+
+    public final static Map<String, DruidDataSource> DATA_SOURCES = new ConcurrentHashMap<>();
+
+    private String                                   driver       = "com.mysql.jdbc.Driver";
+    private String                                   url;
+    private String                                   database;
+    private String                                   type         = "mysql";
+    private String                                   username;
+    private String                                   password;
+    private Integer                                  maxActive    = 3;
+
+    public String getDriver() {
+        return driver;
+    }
+
+    public void setDriver(String driver) {
+        this.driver = driver;
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public Integer getMaxActive() {
+        return maxActive;
+    }
+
+    public void setMaxActive(Integer maxActive) {
+        this.maxActive = maxActive;
+    }
+}

+ 15 - 15
client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/AbstractCanalAdapterWorker.java

@@ -10,7 +10,7 @@ import java.util.concurrent.Future;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.MessageUtil;
 import com.alibaba.otter.canal.protocol.FlatMessage;
@@ -26,10 +26,10 @@ public abstract class AbstractCanalAdapterWorker {
 
     protected final Logger                    logger  = LoggerFactory.getLogger(this.getClass());
 
-    protected String                          canalDestination;                                  // canal实例
-    protected List<List<CanalOuterAdapter>>   canalOuterAdapters;                                // 外部适配器
-    protected ExecutorService                 groupInnerExecutorService;                         // 组内工作线程池
-    protected volatile boolean                running = false;                                   // 是否运行中
+    protected String                          canalDestination;                                                 // canal实例
+    protected List<List<OuterAdapter>>        canalOuterAdapters;                                               // 外部适配器
+    protected ExecutorService                 groupInnerExecutorService;                                        // 组内工作线程池
+    protected volatile boolean                running = false;                                                  // 是否运行中
     protected Thread                          thread  = null;
     protected Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
 
@@ -42,21 +42,21 @@ public abstract class AbstractCanalAdapterWorker {
     protected void writeOut(final Message message) {
         List<Future<Boolean>> futures = new ArrayList<>();
         // 组间适配器并行运行
-        for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-            final List<CanalOuterAdapter> adapters = outerAdapters;
+        for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+            final List<OuterAdapter> adapters = outerAdapters;
             futures.add(groupInnerExecutorService.submit(new Callable<Boolean>() {
 
                 @Override
                 public Boolean call() {
                     try {
                         // 组内适配器穿行运行,尽量不要配置组内适配器
-                        for (final CanalOuterAdapter c : adapters) {
+                        for (final OuterAdapter c : adapters) {
                             long begin = System.currentTimeMillis();
                             MessageUtil.parse4Dml(message, new MessageUtil.Consumer<Dml>() {
 
                                 @Override
                                 public void accept(Dml dml) {
-                                    c.writeOut(dml);
+                                    c.sync(dml);
                                 }
                             });
 
@@ -90,18 +90,18 @@ public abstract class AbstractCanalAdapterWorker {
     protected void writeOut(final FlatMessage flatMessage) {
         List<Future<Boolean>> futures = new ArrayList<>();
         // 组间适配器并行运行
-        for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-            final List<CanalOuterAdapter> adapters = outerAdapters;
+        for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+            final List<OuterAdapter> adapters = outerAdapters;
             futures.add(groupInnerExecutorService.submit(new Callable<Boolean>() {
 
                 @Override
                 public Boolean call() {
                     try {
                         // 组内适配器穿行运行,尽量不要配置组内适配器
-                        for (CanalOuterAdapter c : adapters) {
+                        for (OuterAdapter c : adapters) {
                             long begin = System.currentTimeMillis();
                             Dml dml = MessageUtil.flatMessage2Dml(flatMessage);
-                            c.writeOut(dml);
+                            c.sync(dml);
                             if (logger.isDebugEnabled()) {
                                 logger.debug("{} elapsed time: {}",
                                     c.getClass().getName(),
@@ -154,8 +154,8 @@ public abstract class AbstractCanalAdapterWorker {
         }
         groupInnerExecutorService.shutdown();
         logger.info("topic connectors' worker thread dead!");
-        for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-            for (CanalOuterAdapter adapter : outerAdapters) {
+        for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+            for (OuterAdapter adapter : outerAdapters) {
                 adapter.destroy();
             }
         }

+ 4 - 5
client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java

@@ -4,12 +4,11 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.common.errors.WakeupException;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
 import com.alibaba.otter.canal.client.kafka.KafkaCanalConnectors;
 import com.alibaba.otter.canal.protocol.FlatMessage;
@@ -30,7 +29,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
     private boolean             flatMessage;
 
     public CanalAdapterKafkaWorker(String bootstrapServers, String topic, String groupId,
-                                   List<List<CanalOuterAdapter>> canalOuterAdapters, boolean flatMessage){
+                                   List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
         this.canalOuterAdapters = canalOuterAdapters;
         this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         this.topic = topic;
@@ -81,8 +80,8 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
             }
             groupInnerExecutorService.shutdown();
             logger.info("topic {} connectors' worker thread dead!", this.topic);
-            for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-                for (CanalOuterAdapter adapter : outerAdapters) {
+            for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+                for (OuterAdapter adapter : outerAdapters) {
                     adapter.destroy();
                 }
             }

+ 9 - 9
client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java

@@ -13,7 +13,7 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
 import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
@@ -33,7 +33,7 @@ public class CanalAdapterLoader {
 
     private Map<String, AbstractCanalAdapterWorker> canalMQWorker = new HashMap<>();
 
-    private ExtensionLoader<CanalOuterAdapter>      loader;
+    private ExtensionLoader<OuterAdapter>      loader;
 
     public CanalAdapterLoader(CanalClientConfig canalClientConfig){
         this.canalClientConfig = canalClientConfig;
@@ -48,7 +48,7 @@ public class CanalAdapterLoader {
             throw new RuntimeException("Blank config property: canalInstances or canalMQTopics");
         }
 
-        loader = ExtensionLoader.getExtensionLoader(CanalOuterAdapter.class, "" /*
+        loader = ExtensionLoader.getExtensionLoader(OuterAdapter.class, "" /*
                                                                                  * TODO
                                                                                  * canalClientConfig
                                                                                  * .
@@ -68,10 +68,10 @@ public class CanalAdapterLoader {
         // 初始化canal-client的适配器
         if (canalClientConfig.getCanalInstances() != null) {
             for (CanalClientConfig.CanalInstance instance : canalClientConfig.getCanalInstances()) {
-                List<List<CanalOuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
+                List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
 
                 for (CanalClientConfig.AdapterGroup connectorGroup : instance.getAdapterGroups()) {
-                    List<CanalOuterAdapter> canalOutConnectors = new ArrayList<>();
+                    List<OuterAdapter> canalOutConnectors = new ArrayList<>();
                     for (CanalOuterAdapterConfiguration c : connectorGroup.getOutAdapters()) {
                         loadConnector(c, canalOutConnectors);
                     }
@@ -93,9 +93,9 @@ public class CanalAdapterLoader {
         if (canalClientConfig.getMqTopics() != null) {
             for (CanalClientConfig.MQTopic topic : canalClientConfig.getMqTopics()) {
                 for (CanalClientConfig.Group group : topic.getGroups()) {
-                    List<List<CanalOuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
+                    List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
 
-                    List<CanalOuterAdapter> canalOuterAdapters = new ArrayList<>();
+                    List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
 
                     for (CanalOuterAdapterConfiguration config : group.getOutAdapters()) {
                         loadConnector(config, canalOuterAdapters);
@@ -125,9 +125,9 @@ public class CanalAdapterLoader {
         }
     }
 
-    private void loadConnector(CanalOuterAdapterConfiguration config, List<CanalOuterAdapter> canalOutConnectors) {
+    private void loadConnector(CanalOuterAdapterConfiguration config, List<OuterAdapter> canalOutConnectors) {
         try {
-            CanalOuterAdapter adapter = loader.getExtension(config.getName());
+            OuterAdapter adapter = loader.getExtension(config.getName());
             ClassLoader cl = Thread.currentThread().getContextClassLoader();
             // 替换ClassLoader
             Thread.currentThread().setContextClassLoader(adapter.getClass().getClassLoader());

+ 2 - 2
client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterRocketMQWorker.java

@@ -8,7 +8,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.common.errors.WakeupException;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
 import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
 import com.alibaba.otter.canal.protocol.Message;
@@ -26,7 +26,7 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
     private String                 topic;
 
     public CanalAdapterRocketMQWorker(String nameServers, String topic, String groupId,
-                                      List<List<CanalOuterAdapter>> canalOuterAdapters){
+                                      List<List<OuterAdapter>> canalOuterAdapters){
         logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
         this.canalOuterAdapters = canalOuterAdapters;
         this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());

+ 5 - 5
client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterWorker.java

@@ -6,7 +6,7 @@ import java.util.concurrent.Executors;
 
 import com.alibaba.otter.canal.client.CanalConnector;
 import com.alibaba.otter.canal.client.CanalConnectors;
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
 import com.alibaba.otter.canal.protocol.Message;
 
@@ -31,7 +31,7 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
      * @param canalOuterAdapters 外部适配器组
      */
     public CanalAdapterWorker(String canalDestination, SocketAddress address,
-                              List<List<CanalOuterAdapter>> canalOuterAdapters){
+                              List<List<OuterAdapter>> canalOuterAdapters){
         this.canalOuterAdapters = canalOuterAdapters;
         this.canalDestination = canalDestination;
         groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
@@ -46,7 +46,7 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
      * @param canalOuterAdapters 外部适配器组
      */
     public CanalAdapterWorker(String canalDestination, String zookeeperHosts,
-                              List<List<CanalOuterAdapter>> canalOuterAdapters){
+                              List<List<OuterAdapter>> canalOuterAdapters){
         this.canalOuterAdapters = canalOuterAdapters;
         this.canalDestination = canalDestination;
         groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
@@ -96,8 +96,8 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
             }
             groupInnerExecutorService.shutdown();
             logger.info("destination {} adapters' worker thread dead!", canalDestination);
-            for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-                for (CanalOuterAdapter adapter : outerAdapters) {
+            for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+                for (OuterAdapter adapter : outerAdapters) {
                     adapter.destroy();
                 }
             }

+ 3 - 3
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -10,7 +10,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfigLoader;
 import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseSyncService;
@@ -25,7 +25,7 @@ import com.alibaba.otter.canal.client.adapter.support.SPI;
  * @version 1.0.0
  */
 @SPI("hbase")
-public class HbaseAdapter implements CanalOuterAdapter {
+public class HbaseAdapter implements OuterAdapter {
 
     private static volatile Map<String, MappingConfig> mappingConfigCache = null;
 
@@ -83,7 +83,7 @@ public class HbaseAdapter implements CanalOuterAdapter {
     }
 
     @Override
-    public void writeOut(Dml dml) {
+    public void sync(Dml dml) {
         if (dml == null) {
             return;
         }

+ 9 - 0
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java

@@ -13,9 +13,18 @@ import java.util.Set;
  * @version 1.0.0
  */
 public class MappingConfig {
+    private String dataSourceKey;
 
     private HbaseOrm hbaseOrm;
 
+    public String getDataSourceKey() {
+        return dataSourceKey;
+    }
+
+    public void setDataSourceKey(String dataSourceKey) {
+        this.dataSourceKey = dataSourceKey;
+    }
+
     public HbaseOrm getHbaseOrm() {
         return hbaseOrm;
     }

+ 1 - 1
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java

@@ -37,7 +37,7 @@ public class MappingConfigLoader {
 
         Map<String, MappingConfig> result = new LinkedHashMap<>();
 
-        Collection<String> configs = AdapterConfigs.configs.get("hbase");
+        Collection<String> configs = AdapterConfigs.get("hbase");
         for (String c : configs) {
             if (c == null) {
                 continue;

+ 4 - 0
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java

@@ -0,0 +1,4 @@
+package com.alibaba.otter.canal.client.adapter.hbase.service;
+
+public class HbaseEtlService {
+}

+ 0 - 0
client-adapter/hbase/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.CanalOuterAdapter → client-adapter/hbase/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter


+ 2 - 1
client-adapter/hbase/src/main/resources/hbase/mytest_person2.yml

@@ -1,8 +1,9 @@
+dataSourceKey: defaultDS
 hbaseOrm:
   mode: PHOENIX  #NATIVE   #STRING
   database: mytest  # 数据库名
   table: person2     # 数据库表名
-  hbaseTable: MYTEST.PERSON2   # HBase表名
+  hbaseTable: MYTEST_PERSON2   # HBase表名
   family: CF  # 默认统一Family名称
   uppercaseQualifier: true  # 字段名转大写, 默认为true
   commitBatch: 3000 # 批量提交的大小

+ 1 - 0
client-adapter/launcher/pom.xml

@@ -81,6 +81,7 @@
     </dependencies>
 
     <build>
+        <finalName>canal-client-launcher</finalName>
         <plugins>
             <plugin>
                 <groupId>org.springframework.boot</groupId>

+ 0 - 44
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/CanalAdapterApplication.java

@@ -1,56 +1,12 @@
 package com.alibaba.otter.canal.adapter.launcher;
 
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import javax.annotation.Resource;
-
-import com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader;
-import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.builder.SpringApplicationBuilder;
 
-
 @SpringBootApplication
 public class CanalAdapterApplication {
-    private static final Logger logger = LoggerFactory.getLogger(CanalAdapterApplication.class);
-
-    private static CanalAdapterLoader adapterLoader;
 
     public static void main(String[] args) {
         new SpringApplicationBuilder(CanalAdapterApplication.class).run(args);
     }
-
-    @Resource
-    private CanalClientConfig canalClientConf;
-
-    @PostConstruct
-    public void init() {
-        if (adapterLoader == null) {
-            try {
-                logger.info("## start the canal client adapters.");
-                adapterLoader = new CanalAdapterLoader(canalClientConf);
-                adapterLoader.init();
-                logger.info("## the canal client adapters are running now ......");
-            } catch (Throwable e) {
-                logger.error("## something goes wrong when starting up the canal client adapters:", e);
-                System.exit(0);
-            }
-        }
-    }
-
-    @PreDestroy
-    public void destroy() {
-        try {
-            logger.info("## stop the canal client adapters");
-            if (adapterLoader != null) {
-                adapterLoader.destroy();
-            }
-        } catch (Throwable e) {
-            logger.warn("## something goes wrong when stopping canal client adapters:", e);
-        } finally {
-            logger.info("## canal client adapters are down.");
-        }
-    }
 }

+ 1 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterCanalConfig.java

@@ -4,7 +4,7 @@ import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
 
-@ConfigurationProperties(prefix = "canal.conf")
 @Component
+@ConfigurationProperties(prefix = "canal.conf")
 public class AdapterCanalConfig extends CanalClientConfig {
 }

+ 76 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterConfig.java

@@ -0,0 +1,76 @@
+package com.alibaba.otter.canal.adapter.launcher.config;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+
+@Component
+@ConfigurationProperties(prefix = "adapter.conf")
+public class AdapterConfig {
+
+    private Map<String, DatasourceConfig> datasourceConfigs;
+
+    private List<String>                  adapterConfigs;
+
+    public List<String> getAdapterConfigs() {
+        return adapterConfigs;
+    }
+
+    public Map<String, DatasourceConfig> getDatasourceConfigs() {
+        return datasourceConfigs;
+    }
+
+    public void setDatasourceConfigs(Map<String, DatasourceConfig> datasourceConfigs) {
+        this.datasourceConfigs = datasourceConfigs;
+
+        if (datasourceConfigs != null) {
+            for (Map.Entry<String, DatasourceConfig> entry : datasourceConfigs.entrySet()) {
+                DatasourceConfig datasourceConfig = entry.getValue();
+                // 加载数据源连接池
+                DruidDataSource ds = new DruidDataSource();
+                ds.setDriverClassName(datasourceConfig.getDriver());
+                ds.setUrl(datasourceConfig.getUrl());
+                ds.setUsername(datasourceConfig.getUsername());
+                ds.setPassword(datasourceConfig.getPassword());
+                ds.setInitialSize(1);
+                ds.setMinIdle(1);
+                ds.setMaxActive(datasourceConfig.getMaxActive());
+                ds.setMaxWait(60000);
+                ds.setTimeBetweenEvictionRunsMillis(60000);
+                ds.setMinEvictableIdleTimeMillis(300000);
+                ds.setPoolPreparedStatements(false);
+                ds.setMaxPoolPreparedStatementPerConnectionSize(20);
+                ds.setValidationQuery("select 1");
+                try {
+                    ds.init();
+                } catch (SQLException e) {
+                    throw new RuntimeException(e);
+                }
+                DatasourceConfig.DATA_SOURCES.put(entry.getKey(), ds);
+            }
+        }
+    }
+
+    public void setAdapterConfigs(List<String> adapterConfigs) {
+        this.adapterConfigs = adapterConfigs;
+
+        if (adapterConfigs != null) {
+            AdapterConfigs.clear();
+            for (String adapterConfig : adapterConfigs) {
+                int idx = adapterConfig.indexOf("/");
+                if (idx > -1) {
+                    String type = adapterConfig.substring(0, idx);
+                    String ymlFile = adapterConfig.substring(idx + 1);
+                    AdapterConfigs.put(type, ymlFile);
+                }
+            }
+        }
+    }
+}

+ 0 - 35
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterConfigPath.java

@@ -1,35 +0,0 @@
-package com.alibaba.otter.canal.adapter.launcher.config;
-
-import java.util.List;
-
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.stereotype.Component;
-
-import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
-
-@Component
-@ConfigurationProperties(prefix = "adapter.conf.path")
-public class AdapterConfigPath {
-
-    private List<String> adapterConfigs;
-
-    public List<String> getAdapterConfigs() {
-        return adapterConfigs;
-    }
-
-    public void setAdapterConfigs(List<String> adapterConfigs) {
-        this.adapterConfigs = adapterConfigs;
-
-        if (adapterConfigs != null) {
-            AdapterConfigs.configs.clear();
-            for (String adapterConfig : adapterConfigs) {
-                int idx = adapterConfig.indexOf("/");
-                if (idx > -1) {
-                    String type = adapterConfig.substring(0, idx);
-                    String ymlFile = adapterConfig.substring(idx + 1);
-                    AdapterConfigs.configs.put(type, ymlFile);
-                }
-            }
-        }
-    }
-}

+ 21 - 34
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -10,7 +10,7 @@ import java.util.concurrent.Future;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.support.MessageUtil;
 import com.alibaba.otter.canal.protocol.Message;
 
@@ -25,43 +25,33 @@ public abstract class AbstractCanalAdapterWorker {
     protected final Logger                    logger  = LoggerFactory.getLogger(this.getClass());
 
     protected String                          canalDestination;                                                 // canal实例
-    protected List<List<CanalOuterAdapter>>   canalOuterAdapters;                                               // 外部适配器
+    protected List<List<OuterAdapter>>        canalOuterAdapters;                                               // 外部适配器
     protected ExecutorService                 groupInnerExecutorService;                                        // 组内工作线程池
     protected volatile boolean                running = false;                                                  // 是否运行中
     protected Thread                          thread  = null;
-    protected Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
-
-                                                          @Override
-                                                          public void uncaughtException(Thread t, Throwable e) {
-                                                              logger.error("parse events has an error", e);
-                                                          }
-                                                      };
+    protected Thread.UncaughtExceptionHandler handler = (t, e) -> logger.error("parse events has an error", e);
 
     protected void writeOut(final Message message) {
         List<Future<Boolean>> futures = new ArrayList<>();
         // 组间适配器并行运行
-        for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-            final List<CanalOuterAdapter> adapters = outerAdapters;
-            futures.add(groupInnerExecutorService.submit(new Callable<Boolean>() {
-
-                @Override
-                public Boolean call() {
-                    try {
-                        // 组内适配器穿行运行,尽量不要配置组内适配器
-                        for (final CanalOuterAdapter c : adapters) {
-                            long begin = System.currentTimeMillis();
-                            MessageUtil.parse4Dml(message, c::writeOut);
-
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("{} elapsed time: {}",
-                                    c.getClass().getName(),
-                                    (System.currentTimeMillis() - begin));
-                            }
+        for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+            final List<OuterAdapter> adapters = outerAdapters;
+            futures.add(groupInnerExecutorService.submit(() -> {
+                try {
+                    // 组内适配器穿行运行,尽量不要配置组内适配器
+                    for (final OuterAdapter c : adapters) {
+                        long begin = System.currentTimeMillis();
+                        MessageUtil.parse4Dml(message, c::sync);
+
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("{} elapsed time: {}",
+                                c.getClass().getName(),
+                                (System.currentTimeMillis() - begin));
                         }
-                        return true;
-                    } catch (Exception e) {
-                        return false;
                     }
+                    return true;
+                } catch (Exception e) {
+                    return false;
                 }
             }));
 
@@ -131,9 +121,6 @@ public abstract class AbstractCanalAdapterWorker {
         if ((System.currentTimeMillis() - begin) > 5 * 60 * 1000) {
             logger.error("topic: {} batchId {} elapsed time: {} ms", topic, message.getId(), now - begin);
         }
-        if (logger.isDebugEnabled()) {
-            logger.debug("topic: {} batchId {} elapsed time: {} ms", topic, message.getId(), now - begin);
-        }
     }
 
     protected void stopOutAdapters() {
@@ -146,8 +133,8 @@ public abstract class AbstractCanalAdapterWorker {
         }
         groupInnerExecutorService.shutdown();
         logger.info("topic connectors' worker thread dead!");
-        for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-            for (CanalOuterAdapter adapter : outerAdapters) {
+        for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+            for (OuterAdapter adapter : outerAdapters) {
                 adapter.destroy();
             }
         }

+ 11 - 16
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -9,7 +9,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
 import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
 import org.slf4j.Logger;
@@ -24,15 +24,15 @@ import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
  */
 public class CanalAdapterLoader {
 
-    private static final Logger logger = LoggerFactory.getLogger(CanalAdapterLoader.class);
+    private static final Logger                logger       = LoggerFactory.getLogger(CanalAdapterLoader.class);
 
-    private CanalClientConfig canalClientConfig;
+    private CanalClientConfig                  canalClientConfig;
 
-    private Map<String, CanalAdapterWorker> canalWorkers = new HashMap<>();
+    private Map<String, CanalAdapterWorker>    canalWorkers = new HashMap<>();
 
-    private ExtensionLoader<CanalOuterAdapter> loader;
+    private ExtensionLoader<OuterAdapter> loader;
 
-    public CanalAdapterLoader(CanalClientConfig canalClientConfig) {
+    public CanalAdapterLoader(CanalClientConfig canalClientConfig){
         this.canalClientConfig = canalClientConfig;
     }
 
@@ -40,12 +40,7 @@ public class CanalAdapterLoader {
      * 初始化canal-client
      */
     public void init() {
-        // canal instances 和 mq topics 配置不能同时为空
-        // if (canalClientConfig.getCanalInstances() == null && canalClientConfig.getMqTopics() == null) {
-        //    throw new RuntimeException("Blank config property: canalInstances or canalMQTopics");
-        // }
-
-        loader = ExtensionLoader.getExtensionLoader(CanalOuterAdapter.class);
+        loader = ExtensionLoader.getExtensionLoader(OuterAdapter.class);
 
         String canalServerHost = this.canalClientConfig.getCanalServerHost();
         SocketAddress sa = null;
@@ -58,10 +53,10 @@ public class CanalAdapterLoader {
         // 初始化canal-client的适配器
         if (canalClientConfig.getCanalInstances() != null) {
             for (CanalClientConfig.CanalInstance instance : canalClientConfig.getCanalInstances()) {
-                List<List<CanalOuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
+                List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
 
                 for (CanalClientConfig.AdapterGroup connectorGroup : instance.getAdapterGroups()) {
-                    List<CanalOuterAdapter> canalOutConnectors = new ArrayList<>();
+                    List<OuterAdapter> canalOutConnectors = new ArrayList<>();
                     for (CanalOuterAdapterConfiguration c : connectorGroup.getOutAdapters()) {
                         loadConnector(c, canalOutConnectors);
                     }
@@ -80,9 +75,9 @@ public class CanalAdapterLoader {
         }
     }
 
-    private void loadConnector(CanalOuterAdapterConfiguration config, List<CanalOuterAdapter> canalOutConnectors) {
+    private void loadConnector(CanalOuterAdapterConfiguration config, List<OuterAdapter> canalOutConnectors) {
         try {
-            CanalOuterAdapter adapter = loader.getExtension(config.getName());
+            OuterAdapter adapter = loader.getExtension(config.getName());
             ClassLoader cl = Thread.currentThread().getContextClassLoader();
             // 替换ClassLoader
             Thread.currentThread().setContextClassLoader(adapter.getClass().getClassLoader());

+ 65 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java

@@ -0,0 +1,65 @@
+package com.alibaba.otter.canal.adapter.launcher.loader;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.annotation.Resource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.adapter.launcher.config.AdapterCanalConfig;
+import com.alibaba.otter.canal.adapter.launcher.config.AdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+
+@Component
+public class CanalAdapterService {
+
+    private static final Logger       logger = LoggerFactory.getLogger(CanalAdapterService.class);
+
+    private static CanalAdapterLoader adapterLoader;
+
+    @Resource
+    private AdapterCanalConfig        adapterCanalConfig;
+
+    // 注入配置保证配置优先先注册
+    @Resource
+    private AdapterConfig             adapterConfig;
+
+    @PostConstruct
+    public void init() {
+        if (adapterLoader == null) {
+            try {
+                logger.info("## start the canal client adapters.");
+                adapterLoader = new CanalAdapterLoader(adapterCanalConfig);
+                adapterLoader.init();
+                logger.info("## the canal client adapters are running now ......");
+            } catch (Throwable e) {
+                logger.error("## something goes wrong when starting up the canal client adapters:", e);
+                System.exit(0);
+            }
+        }
+    }
+
+    @PreDestroy
+    public void destroy() {
+        try {
+            logger.info("## stop the canal client adapters");
+            if (adapterLoader != null) {
+                adapterLoader.destroy();
+            }
+            for (DruidDataSource druidDataSource : DatasourceConfig.DATA_SOURCES.values()) {
+                try {
+                    druidDataSource.close();
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        } catch (Throwable e) {
+            logger.warn("## something goes wrong when stopping canal client adapters:", e);
+        } finally {
+            logger.info("## canal client adapters are down.");
+        }
+    }
+}

+ 5 - 5
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

@@ -6,7 +6,7 @@ import java.util.concurrent.Executors;
 
 import com.alibaba.otter.canal.client.CanalConnector;
 import com.alibaba.otter.canal.client.CanalConnectors;
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
 import com.alibaba.otter.canal.protocol.Message;
 
@@ -31,7 +31,7 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
      * @param canalOuterAdapters 外部适配器组
      */
     public CanalAdapterWorker(String canalDestination, SocketAddress address,
-                              List<List<CanalOuterAdapter>> canalOuterAdapters){
+                              List<List<OuterAdapter>> canalOuterAdapters){
         this.canalOuterAdapters = canalOuterAdapters;
         this.canalDestination = canalDestination;
         groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
@@ -46,7 +46,7 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
      * @param canalOuterAdapters 外部适配器组
      */
     public CanalAdapterWorker(String canalDestination, String zookeeperHosts,
-                              List<List<CanalOuterAdapter>> canalOuterAdapters){
+                              List<List<OuterAdapter>> canalOuterAdapters){
         this.canalOuterAdapters = canalOuterAdapters;
         this.canalDestination = canalDestination;
         groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
@@ -96,8 +96,8 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
             }
             groupInnerExecutorService.shutdown();
             logger.info("destination {} adapters' worker thread dead!", canalDestination);
-            for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-                for (CanalOuterAdapter adapter : outerAdapters) {
+            for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+                for (OuterAdapter adapter : outerAdapters) {
                     adapter.destroy();
                 }
             }

+ 6 - 1
client-adapter/launcher/src/main/resources/application.yml

@@ -16,6 +16,11 @@ canal.conf:
 #        hosts: slave1:2181
 #        properties: {znodeParent: "/hbase-unsecure"}
 
-adapter.conf.path:
+adapter.conf:
+  datasourceConfigs:
+    defaultDS:
+      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
+      username: root
+      password: 121212
   adapterConfigs:
   - hbase/mytest_person2.yml

+ 3 - 3
client-adapter/logger/src/main/java/com/alibaba/otter/canal/client/adapter/logger/LoggerAdapterExample.java

@@ -3,7 +3,7 @@ package com.alibaba.otter.canal.client.adapter.logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.SPI;
@@ -16,7 +16,7 @@ import com.alibaba.otter.canal.client.adapter.support.SPI;
  */
 @SPI("logger")
 // logger参数对应CanalOuterAdapterConfiguration配置中的name
-public class LoggerAdapterExample implements CanalOuterAdapter {
+public class LoggerAdapterExample implements OuterAdapter {
 
     private Logger logger = LoggerFactory.getLogger(this.getClass());
 
@@ -26,7 +26,7 @@ public class LoggerAdapterExample implements CanalOuterAdapter {
     }
 
     @Override
-    public void writeOut(Dml dml) {
+    public void sync(Dml dml) {
         logger.info(dml.toString());
     }
 

+ 0 - 0
client-adapter/logger/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.CanalOuterAdapter → client-adapter/logger/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter