Explorar o código

Merge pull request #1037 from rewerma/master

完善adapter-launcher及hbase-adaper相关功能
agapple %!s(int64=7) %!d(string=hai) anos
pai
achega
b5f5d3f0f0
Modificáronse 48 ficheiros con 2167 adicións e 421 borrados
  1. 10 5
      client-adapter/common/pom.xml
  2. 0 36
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/CanalOuterAdapter.java
  3. 68 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/OuterAdapter.java
  4. 24 4
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/AdapterConfigs.java
  5. 11 11
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  6. 75 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/DatasourceConfig.java
  7. 12 2
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Dml.java
  8. 37 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/EtlResult.java
  9. 50 4
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/JdbcTypeUtil.java
  10. 10 12
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java
  11. 8 8
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/OuterAdapterConfig.java
  12. 50 0
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Result.java
  13. 1 0
      client-adapter/example/pom.xml
  14. 17 17
      client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/AbstractCanalAdapterWorker.java
  15. 4 5
      client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java
  16. 12 12
      client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java
  17. 2 2
      client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterRocketMQWorker.java
  18. 5 5
      client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterWorker.java
  19. 0 19
      client-adapter/hbase/pom.xml
  20. 111 40
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java
  21. 18 0
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java
  22. 4 2
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java
  23. 353 0
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java
  24. 6 15
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java
  25. 2 2
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/TypeUtil.java
  26. 0 0
      client-adapter/hbase/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter
  27. 2 0
      client-adapter/hbase/src/main/resources/hbase/mytest_person2.yml
  28. 34 0
      client-adapter/launcher/assembly.xml
  29. 36 0
      client-adapter/launcher/pom.xml
  30. 0 44
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/CanalAdapterApplication.java
  31. 114 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/EtlLock.java
  32. 6 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/Mode.java
  33. 208 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/SyncSwitch.java
  34. 37 2
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterCanalConfig.java
  35. 80 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterConfig.java
  36. 0 35
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterConfigPath.java
  37. 36 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/CuratorClient.java
  38. 28 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/SpringContext.java
  39. 60 75
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java
  40. 142 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java
  41. 65 22
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java
  42. 132 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java
  43. 71 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java
  44. 20 28
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java
  45. 166 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java
  46. 35 9
      client-adapter/launcher/src/main/resources/application.yml
  47. 5 5
      client-adapter/logger/src/main/java/com/alibaba/otter/canal/client/adapter/logger/LoggerAdapterExample.java
  48. 0 0
      client-adapter/logger/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter

+ 10 - 5
client-adapter/common/pom.xml

@@ -18,16 +18,21 @@
             <artifactId>canal.protocol</artifactId>
             <version>${canal_version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-            <version>1.7.12</version>
-        </dependency>
         <dependency>
             <groupId>joda-time</groupId>
             <artifactId>joda-time</artifactId>
             <version>2.9.4</version>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.1.9</version>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.40</version>
+        </dependency>
     </dependencies>
 
 </project>

+ 0 - 36
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/CanalOuterAdapter.java

@@ -1,36 +0,0 @@
-package com.alibaba.otter.canal.client.adapter;
-
-import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-import com.alibaba.otter.canal.client.adapter.support.SPI;
-
-/**
- * 外部适配器接口
- *
- * @author machengyuan 2018-8-18 下午10:14:02
- * @version 1.0.0
- */
-@SPI("logger")
-public interface CanalOuterAdapter {
-
-    /**
-     * 外部适配器初始化接口
-     *
-     * @param configuration 外部适配器配置信息
-     */
-    void init(CanalOuterAdapterConfiguration configuration);
-
-    /**
-     * 往适配器中写入数据
-     *
-     * @param dml 数据包
-     */
-    void writeOut(Dml dml);
-
-    // void writeOut(FlatMessage flatMessage);
-
-    /**
-     * 外部适配器销毁接口
-     */
-    void destroy();
-}

+ 68 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/OuterAdapter.java

@@ -0,0 +1,68 @@
+package com.alibaba.otter.canal.client.adapter;
+
+import java.util.List;
+import java.util.Map;
+
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.SPI;
+
+/**
+ * 外部适配器接口
+ *
+ * @author machengyuan 2018-8-18 下午10:14:02
+ * @version 1.0.0
+ */
+@SPI("logger")
+public interface OuterAdapter {
+
+    /**
+     * 外部适配器初始化接口
+     *
+     * @param configuration 外部适配器配置信息
+     */
+    void init(OuterAdapterConfig configuration);
+
+    /**
+     * 往适配器中同步数据
+     *
+     * @param dml 数据包
+     */
+    void sync(Dml dml);
+
+    /**
+     * 外部适配器销毁接口
+     */
+    void destroy();
+
+    /**
+     * Etl操作
+     * 
+     * @param task 任务名, 对应配置名
+     * @param params etl筛选条件
+     */
+    default EtlResult etl(String task, List<String> params) {
+        throw new UnsupportedOperationException("unsupported operation");
+    }
+
+    /**
+     * 计算总数
+     * 
+     * @param task 任务名, 对应配置名
+     * @return 总数
+     */
+    default Map<String, Object> count(String task) {
+        throw new UnsupportedOperationException("unsupported operation");
+    }
+
+    /**
+     * 通过task获取对应的destination
+     * 
+     * @param task 任务名, 对应配置名
+     * @return destination
+     */
+    default String getDestination(String task) {
+        return null;
+    }
+}

+ 24 - 4
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/AdapterConfigs.java

@@ -1,8 +1,28 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
-public interface AdapterConfigs {
-    Multimap<String, String> configs = ArrayListMultimap.create();
+public class AdapterConfigs {
+
+    private static Map<String, Set<String>> configs = new ConcurrentHashMap<>();
+
+    public static void put(String key, String value) {
+        Set<String> values = configs.get(key);
+        if (values == null) {
+            values = new LinkedHashSet<>();
+        }
+        values.add(value);
+        configs.put(key, values);
+    }
+
+    public static Set<String> get(String key) {
+        return configs.get(key);
+    }
+
+    public static void clear() {
+        configs.clear();
+    }
 }

+ 11 - 11
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -62,6 +62,10 @@ public class CanalClientConfig {
         return mqTopics;
     }
 
+    public void setMqTopics(List<MQTopic> mqTopics) {
+        this.mqTopics = mqTopics;
+    }
+
     public Boolean getFlatMessage() {
         return flatMessage;
     }
@@ -70,10 +74,6 @@ public class CanalClientConfig {
         this.flatMessage = flatMessage;
     }
 
-    public void setMqTopics(List<MQTopic> mqTopics) {
-        this.mqTopics = mqTopics;
-    }
-
     public List<CanalInstance> getCanalInstances() {
         return canalInstances;
     }
@@ -109,13 +109,13 @@ public class CanalClientConfig {
 
     public static class AdapterGroup {
 
-        private List<CanalOuterAdapterConfiguration> outAdapters;
+        private List<OuterAdapterConfig> outAdapters;
 
-        public List<CanalOuterAdapterConfiguration> getOutAdapters() {
+        public List<OuterAdapterConfig> getOutAdapters() {
             return outAdapters;
         }
 
-        public void setOutAdapters(List<CanalOuterAdapterConfiguration> outAdapters) {
+        public void setOutAdapters(List<OuterAdapterConfig> outAdapters) {
             this.outAdapters = outAdapters;
         }
     }
@@ -155,11 +155,11 @@ public class CanalClientConfig {
 
     public static class Group {
 
-        private String                               groupId;
+        private String                   groupId;
 
         // private List<Adaptor> adapters = new ArrayList<>();
 
-        private List<CanalOuterAdapterConfiguration> outAdapters;
+        private List<OuterAdapterConfig> outAdapters;
 
         public String getGroupId() {
             return groupId;
@@ -169,11 +169,11 @@ public class CanalClientConfig {
             this.groupId = groupId;
         }
 
-        public List<CanalOuterAdapterConfiguration> getOutAdapters() {
+        public List<OuterAdapterConfig> getOutAdapters() {
             return outAdapters;
         }
 
-        public void setOutAdapters(List<CanalOuterAdapterConfiguration> outAdapters) {
+        public void setOutAdapters(List<OuterAdapterConfig> outAdapters) {
             this.outAdapters = outAdapters;
         }
 

+ 75 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/DatasourceConfig.java

@@ -0,0 +1,75 @@
+package com.alibaba.otter.canal.client.adapter.support;
+
+import com.alibaba.druid.pool.DruidDataSource;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DatasourceConfig {
+
+    public final static Map<String, DruidDataSource> DATA_SOURCES = new ConcurrentHashMap<>();
+
+    private String                                   driver       = "com.mysql.jdbc.Driver";
+    private String                                   url;
+    private String                                   database;
+    private String                                   type         = "mysql";
+    private String                                   username;
+    private String                                   password;
+    private Integer                                  maxActive    = 3;
+
+    public String getDriver() {
+        return driver;
+    }
+
+    public void setDriver(String driver) {
+        this.driver = driver;
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public Integer getMaxActive() {
+        return maxActive;
+    }
+
+    public void setMaxActive(Integer maxActive) {
+        this.maxActive = maxActive;
+    }
+}

+ 12 - 2
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Dml.java

@@ -14,6 +14,7 @@ public class Dml implements Serializable {
 
     private static final long         serialVersionUID = 2611556444074013268L;
 
+    private String                    destination;
     private String                    database;
     private String                    table;
     private String                    type;
@@ -25,6 +26,14 @@ public class Dml implements Serializable {
     private List<Map<String, Object>> data;
     private List<Map<String, Object>> old;
 
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
     public String getDatabase() {
         return database;
     }
@@ -102,7 +111,8 @@ public class Dml implements Serializable {
 
     @Override
     public String toString() {
-        return "Dml [database=" + database + ", table=" + table + ", type=" + type + ", es=" + es + ", ts=" + ts
-               + ", sql=" + sql + ", data=" + data + ", old=" + old + "]";
+        return "Dml{" + "destination='" + destination + '\'' + ", database='" + database + '\'' + ", table='" + table
+               + '\'' + ", type='" + type + '\'' + ", es=" + es + ", ts=" + ts + ", sql='" + sql + '\'' + ", data="
+               + data + ", old=" + old + '}';
     }
 }

+ 37 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/EtlResult.java

@@ -0,0 +1,37 @@
+package com.alibaba.otter.canal.client.adapter.support;
+
+import java.io.Serializable;
+
+public class EtlResult implements Serializable {
+    private static final long serialVersionUID = 4250522736289866505L;
+
+    private boolean succeeded = false;
+
+    private String resultMessage;
+
+    private String errorMessage;
+
+    public boolean getSucceeded() {
+        return succeeded;
+    }
+
+    public void setSucceeded(boolean succeeded) {
+        this.succeeded = succeeded;
+    }
+
+    public String getResultMessage() {
+        return resultMessage;
+    }
+
+    public void setResultMessage(String resultMessage) {
+        this.resultMessage = resultMessage;
+    }
+
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    public void setErrorMessage(String errorMessage) {
+        this.errorMessage = errorMessage;
+    }
+}

+ 50 - 4
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/JdbcTypeUtil.java

@@ -2,10 +2,7 @@ package com.alibaba.otter.canal.client.adapter.support;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.sql.Types;
+import java.sql.*;
 
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -21,6 +18,55 @@ public class JdbcTypeUtil {
 
     private static Logger logger = LoggerFactory.getLogger(JdbcTypeUtil.class);
 
+    public static Object getRSData(ResultSet rs, String columnName, int jdbcType) throws SQLException {
+        if (jdbcType == Types.BIT || jdbcType == Types.BOOLEAN) {
+            return rs.getByte(columnName);
+        } else {
+            return rs.getObject(columnName);
+        }
+    }
+
+    public static Class<?> jdbcType2javaType(int jdbcType) {
+        switch (jdbcType) {
+            case Types.BIT:
+            case Types.BOOLEAN:
+//                return Boolean.class;
+            case Types.TINYINT:
+                return Byte.TYPE;
+            case Types.SMALLINT:
+                return Short.class;
+            case Types.INTEGER:
+                return Integer.class;
+            case Types.BIGINT:
+                return Long.class;
+            case Types.DECIMAL:
+            case Types.NUMERIC:
+                return BigDecimal.class;
+            case Types.REAL:
+                return Float.class;
+            case Types.FLOAT:
+            case Types.DOUBLE:
+                return Double.class;
+            case Types.CHAR:
+            case Types.VARCHAR:
+            case Types.LONGVARCHAR:
+                return String.class;
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+            case Types.BLOB:
+                return byte[].class;
+            case Types.DATE:
+                return java.sql.Date.class;
+            case Types.TIME:
+                return Time.class;
+            case Types.TIMESTAMP:
+                return Timestamp.class;
+            default:
+                return String.class;
+        }
+    }
+
     public static Object typeConvert(String columnName, String value, int sqlType, String mysqlType) {
         if (value == null || value.equals("")) {
             return null;

+ 10 - 12
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -1,11 +1,6 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.FlatMessage;
@@ -19,7 +14,7 @@ import com.alibaba.otter.canal.protocol.Message;
  */
 public class MessageUtil {
 
-    public static void parse4Dml(Message message, Consumer<Dml> consumer) {
+    public static void parse4Dml(String destination, Message message, Consumer<Dml> consumer) {
         if (message == null) {
             return;
         }
@@ -42,6 +37,7 @@ public class MessageUtil {
             CanalEntry.EventType eventType = rowChange.getEventType();
 
             final Dml dml = new Dml();
+            dml.setDestination(destination);
             dml.setDatabase(entry.getHeader().getSchemaName());
             dml.setTable(entry.getHeader().getTableName());
             dml.setType(eventType.toString());
@@ -87,10 +83,11 @@ public class MessageUtil {
                         Map<String, Object> rowOld = new LinkedHashMap<>();
                         for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                             if (updateSet.contains(column.getName())) {
-                                rowOld.put(column.getName(), JdbcTypeUtil.typeConvert(column.getName(),
-                                    column.getValue(),
-                                    column.getSqlType(),
-                                    column.getMysqlType()));
+                                rowOld.put(column.getName(),
+                                    JdbcTypeUtil.typeConvert(column.getName(),
+                                        column.getValue(),
+                                        column.getSqlType(),
+                                        column.getMysqlType()));
                             }
                         }
                         // update操作将记录修改前的值
@@ -110,11 +107,12 @@ public class MessageUtil {
         }
     }
 
-    public static Dml flatMessage2Dml(FlatMessage flatMessage) {
+    public static Dml flatMessage2Dml(String destination, FlatMessage flatMessage) {
         if (flatMessage == null) {
             return null;
         }
         Dml dml = new Dml();
+        dml.setDestination(destination);
         dml.setDatabase(flatMessage.getDatabase());
         dml.setTable(flatMessage.getTable());
         dml.setType(flatMessage.getType());

+ 8 - 8
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalOuterAdapterConfiguration.java → client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/OuterAdapterConfig.java

@@ -1,6 +1,6 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
-import java.util.Properties;
+import java.util.Map;
 
 /**
  * 外部适配器配置信息类
@@ -8,15 +8,15 @@ import java.util.Properties;
  * @author machengyuan 2018-8-18 下午10:15:12
  * @version 1.0.0
  */
-public class CanalOuterAdapterConfiguration {
+public class OuterAdapterConfig {
 
-    private String     name;      // 适配器名称, 如: logger, hbase, es
+    private String              name;       // 适配器名称, 如: logger, hbase, es
 
-    private String     hosts;     // 适配器内部的地址, 比如对应es该参数可以填写es的server地址
+    private String              hosts;      // 适配器内部的地址, 比如对应es该参数可以填写es的server地址
 
-    private String     zkHosts;   // 适配器内部的ZK地址, 比如对应HBase该参数可以填写HBase对应的ZK地址
+    private String              zkHosts;    // 适配器内部的ZK地址, 比如对应HBase该参数可以填写HBase对应的ZK地址
 
-    private Properties properties; // 其余参数, 可填写适配器中的所需的配置信息
+    private Map<String, String> properties; // 其余参数, 可填写适配器中的所需的配置信息
 
     public String getName() {
         return name;
@@ -34,11 +34,11 @@ public class CanalOuterAdapterConfiguration {
         this.hosts = hosts;
     }
 
-    public Properties getProperties() {
+    public Map<String, String> getProperties() {
         return properties;
     }
 
-    public void setProperties(Properties properties) {
+    public void setProperties(Map<String, String> properties) {
         this.properties = properties;
     }
 

+ 50 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Result.java

@@ -0,0 +1,50 @@
+package com.alibaba.otter.canal.client.adapter.support;
+
+import java.io.Serializable;
+import java.util.Date;
+
+public class Result implements Serializable {
+
+    public Integer code    = 20000;
+    public Object  data;
+    public String  message;
+    public Date    sysTime;
+
+    public static Result createSuccess(String message) {
+        Result result = new Result();
+        result.setMessage(message);
+        return result;
+    }
+
+    public Integer getCode() {
+        return code;
+    }
+
+    public void setCode(Integer code) {
+        this.code = code;
+    }
+
+    public Object getData() {
+        return data;
+    }
+
+    public void setData(Object data) {
+        this.data = data;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public Date getSysTime() {
+        return sysTime;
+    }
+
+    public void setSysTime(Date sysTime) {
+        this.sysTime = sysTime;
+    }
+}

+ 1 - 0
client-adapter/example/pom.xml

@@ -55,6 +55,7 @@
             <!-- deploy模块的packaging通常是jar,如果项目中没有java 源代码或资源文件,加上这一段配置使项目能通过构建 -->
             <plugin>
                 <artifactId>maven-jar-plugin</artifactId>
+                <version>3.0.2</version>
                 <configuration>
                     <archive>
                         <addMavenDescriptor>true</addMavenDescriptor>

+ 17 - 17
client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/AbstractCanalAdapterWorker.java

@@ -10,7 +10,7 @@ import java.util.concurrent.Future;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.MessageUtil;
 import com.alibaba.otter.canal.protocol.FlatMessage;
@@ -26,10 +26,10 @@ public abstract class AbstractCanalAdapterWorker {
 
     protected final Logger                    logger  = LoggerFactory.getLogger(this.getClass());
 
-    protected String                          canalDestination;                                  // canal实例
-    protected List<List<CanalOuterAdapter>>   canalOuterAdapters;                                // 外部适配器
-    protected ExecutorService                 groupInnerExecutorService;                         // 组内工作线程池
-    protected volatile boolean                running = false;                                   // 是否运行中
+    protected String                          canalDestination;                                                 // canal实例
+    protected List<List<OuterAdapter>>        canalOuterAdapters;                                               // 外部适配器
+    protected ExecutorService                 groupInnerExecutorService;                                        // 组内工作线程池
+    protected volatile boolean                running = false;                                                  // 是否运行中
     protected Thread                          thread  = null;
     protected Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
 
@@ -42,21 +42,21 @@ public abstract class AbstractCanalAdapterWorker {
     protected void writeOut(final Message message) {
         List<Future<Boolean>> futures = new ArrayList<>();
         // 组间适配器并行运行
-        for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-            final List<CanalOuterAdapter> adapters = outerAdapters;
+        for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+            final List<OuterAdapter> adapters = outerAdapters;
             futures.add(groupInnerExecutorService.submit(new Callable<Boolean>() {
 
                 @Override
                 public Boolean call() {
                     try {
                         // 组内适配器穿行运行,尽量不要配置组内适配器
-                        for (final CanalOuterAdapter c : adapters) {
+                        for (final OuterAdapter c : adapters) {
                             long begin = System.currentTimeMillis();
-                            MessageUtil.parse4Dml(message, new MessageUtil.Consumer<Dml>() {
+                            MessageUtil.parse4Dml(canalDestination, message, new MessageUtil.Consumer<Dml>() {
 
                                 @Override
                                 public void accept(Dml dml) {
-                                    c.writeOut(dml);
+                                    c.sync(dml);
                                 }
                             });
 
@@ -90,18 +90,18 @@ public abstract class AbstractCanalAdapterWorker {
     protected void writeOut(final FlatMessage flatMessage) {
         List<Future<Boolean>> futures = new ArrayList<>();
         // 组间适配器并行运行
-        for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-            final List<CanalOuterAdapter> adapters = outerAdapters;
+        for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+            final List<OuterAdapter> adapters = outerAdapters;
             futures.add(groupInnerExecutorService.submit(new Callable<Boolean>() {
 
                 @Override
                 public Boolean call() {
                     try {
                         // 组内适配器穿行运行,尽量不要配置组内适配器
-                        for (CanalOuterAdapter c : adapters) {
+                        for (OuterAdapter c : adapters) {
                             long begin = System.currentTimeMillis();
-                            Dml dml = MessageUtil.flatMessage2Dml(flatMessage);
-                            c.writeOut(dml);
+                            Dml dml = MessageUtil.flatMessage2Dml(canalDestination, flatMessage);
+                            c.sync(dml);
                             if (logger.isDebugEnabled()) {
                                 logger.debug("{} elapsed time: {}",
                                     c.getClass().getName(),
@@ -154,8 +154,8 @@ public abstract class AbstractCanalAdapterWorker {
         }
         groupInnerExecutorService.shutdown();
         logger.info("topic connectors' worker thread dead!");
-        for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-            for (CanalOuterAdapter adapter : outerAdapters) {
+        for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+            for (OuterAdapter adapter : outerAdapters) {
                 adapter.destroy();
             }
         }

+ 4 - 5
client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java

@@ -4,12 +4,11 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.common.errors.WakeupException;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
 import com.alibaba.otter.canal.client.kafka.KafkaCanalConnectors;
 import com.alibaba.otter.canal.protocol.FlatMessage;
@@ -30,7 +29,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
     private boolean             flatMessage;
 
     public CanalAdapterKafkaWorker(String bootstrapServers, String topic, String groupId,
-                                   List<List<CanalOuterAdapter>> canalOuterAdapters, boolean flatMessage){
+                                   List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
         this.canalOuterAdapters = canalOuterAdapters;
         this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         this.topic = topic;
@@ -81,8 +80,8 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
             }
             groupInnerExecutorService.shutdown();
             logger.info("topic {} connectors' worker thread dead!", this.topic);
-            for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-                for (CanalOuterAdapter adapter : outerAdapters) {
+            for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+                for (OuterAdapter adapter : outerAdapters) {
                     adapter.destroy();
                 }
             }

+ 12 - 12
client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java

@@ -13,9 +13,9 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
-import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
 
 /**
@@ -33,7 +33,7 @@ public class CanalAdapterLoader {
 
     private Map<String, AbstractCanalAdapterWorker> canalMQWorker = new HashMap<>();
 
-    private ExtensionLoader<CanalOuterAdapter>      loader;
+    private ExtensionLoader<OuterAdapter>      loader;
 
     public CanalAdapterLoader(CanalClientConfig canalClientConfig){
         this.canalClientConfig = canalClientConfig;
@@ -48,7 +48,7 @@ public class CanalAdapterLoader {
             throw new RuntimeException("Blank config property: canalInstances or canalMQTopics");
         }
 
-        loader = ExtensionLoader.getExtensionLoader(CanalOuterAdapter.class, "" /*
+        loader = ExtensionLoader.getExtensionLoader(OuterAdapter.class, "" /*
                                                                                  * TODO
                                                                                  * canalClientConfig
                                                                                  * .
@@ -68,11 +68,11 @@ public class CanalAdapterLoader {
         // 初始化canal-client的适配器
         if (canalClientConfig.getCanalInstances() != null) {
             for (CanalClientConfig.CanalInstance instance : canalClientConfig.getCanalInstances()) {
-                List<List<CanalOuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
+                List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
 
                 for (CanalClientConfig.AdapterGroup connectorGroup : instance.getAdapterGroups()) {
-                    List<CanalOuterAdapter> canalOutConnectors = new ArrayList<>();
-                    for (CanalOuterAdapterConfiguration c : connectorGroup.getOutAdapters()) {
+                    List<OuterAdapter> canalOutConnectors = new ArrayList<>();
+                    for (OuterAdapterConfig c : connectorGroup.getOutAdapters()) {
                         loadConnector(c, canalOutConnectors);
                     }
                     canalOuterAdapterGroups.add(canalOutConnectors);
@@ -93,11 +93,11 @@ public class CanalAdapterLoader {
         if (canalClientConfig.getMqTopics() != null) {
             for (CanalClientConfig.MQTopic topic : canalClientConfig.getMqTopics()) {
                 for (CanalClientConfig.Group group : topic.getGroups()) {
-                    List<List<CanalOuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
+                    List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
 
-                    List<CanalOuterAdapter> canalOuterAdapters = new ArrayList<>();
+                    List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
 
-                    for (CanalOuterAdapterConfiguration config : group.getOutAdapters()) {
+                    for (OuterAdapterConfig config : group.getOutAdapters()) {
                         loadConnector(config, canalOuterAdapters);
                     }
                     canalOuterAdapterGroups.add(canalOuterAdapters);
@@ -125,9 +125,9 @@ public class CanalAdapterLoader {
         }
     }
 
-    private void loadConnector(CanalOuterAdapterConfiguration config, List<CanalOuterAdapter> canalOutConnectors) {
+    private void loadConnector(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) {
         try {
-            CanalOuterAdapter adapter = loader.getExtension(config.getName());
+            OuterAdapter adapter = loader.getExtension(config.getName());
             ClassLoader cl = Thread.currentThread().getContextClassLoader();
             // 替换ClassLoader
             Thread.currentThread().setContextClassLoader(adapter.getClass().getClassLoader());

+ 2 - 2
client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterRocketMQWorker.java

@@ -8,7 +8,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.common.errors.WakeupException;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
 import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
 import com.alibaba.otter.canal.protocol.Message;
@@ -26,7 +26,7 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
     private String                 topic;
 
     public CanalAdapterRocketMQWorker(String nameServers, String topic, String groupId,
-                                      List<List<CanalOuterAdapter>> canalOuterAdapters){
+                                      List<List<OuterAdapter>> canalOuterAdapters){
         logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
         this.canalOuterAdapters = canalOuterAdapters;
         this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());

+ 5 - 5
client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterWorker.java

@@ -6,7 +6,7 @@ import java.util.concurrent.Executors;
 
 import com.alibaba.otter.canal.client.CanalConnector;
 import com.alibaba.otter.canal.client.CanalConnectors;
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
 import com.alibaba.otter.canal.protocol.Message;
 
@@ -31,7 +31,7 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
      * @param canalOuterAdapters 外部适配器组
      */
     public CanalAdapterWorker(String canalDestination, SocketAddress address,
-                              List<List<CanalOuterAdapter>> canalOuterAdapters){
+                              List<List<OuterAdapter>> canalOuterAdapters){
         this.canalOuterAdapters = canalOuterAdapters;
         this.canalDestination = canalDestination;
         groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
@@ -46,7 +46,7 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
      * @param canalOuterAdapters 外部适配器组
      */
     public CanalAdapterWorker(String canalDestination, String zookeeperHosts,
-                              List<List<CanalOuterAdapter>> canalOuterAdapters){
+                              List<List<OuterAdapter>> canalOuterAdapters){
         this.canalOuterAdapters = canalOuterAdapters;
         this.canalDestination = canalDestination;
         groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
@@ -96,8 +96,8 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
             }
             groupInnerExecutorService.shutdown();
             logger.info("destination {} adapters' worker thread dead!", canalDestination);
-            for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-                for (CanalOuterAdapter adapter : outerAdapters) {
+            for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+                for (OuterAdapter adapter : outerAdapters) {
                     adapter.destroy();
                 }
             }

+ 0 - 19
client-adapter/hbase/pom.xml

@@ -84,25 +84,6 @@
                     </execution>
                 </executions>
             </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <version>2.4</version>
-                <configuration>
-                    <descriptorRefs>
-                        <descriptorRef>jar-with-dependencies</descriptorRef>
-                    </descriptorRefs>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>make-assembly</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
         </plugins>
     </build>
 

+ 111 - 40
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -2,21 +2,29 @@ package com.alibaba.otter.canal.client.adapter.hbase;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
+import javax.sql.DataSource;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfigLoader;
+import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseEtlService;
 import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseSyncService;
-import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-import com.alibaba.otter.canal.client.adapter.support.SPI;
+import com.alibaba.otter.canal.client.adapter.hbase.support.HbaseTemplate;
+import com.alibaba.otter.canal.client.adapter.support.*;
 
 /**
  * HBase外部适配器
@@ -25,74 +33,128 @@ import com.alibaba.otter.canal.client.adapter.support.SPI;
  * @version 1.0.0
  */
 @SPI("hbase")
-public class HbaseAdapter implements CanalOuterAdapter {
+public class HbaseAdapter implements OuterAdapter {
+
+    private static Logger                              logger             = LoggerFactory.getLogger(HbaseAdapter.class);
 
-    private static volatile Map<String, MappingConfig> mappingConfigCache = null;
+    private static volatile Map<String, MappingConfig> hbaseMapping       = null;                                       // 文件名对应配置
+    private static volatile Map<String, MappingConfig> mappingConfigCache = null;                                       // 库名-表名对应配置
 
     private Connection                                 conn;
     private HbaseSyncService                           hbaseSyncService;
+    private HbaseTemplate                              hbaseTemplate;
 
     @Override
-    public void init(CanalOuterAdapterConfiguration configuration) {
+    public void init(OuterAdapterConfig configuration) {
         try {
             if (mappingConfigCache == null) {
                 synchronized (MappingConfig.class) {
                     if (mappingConfigCache == null) {
-                        Map<String, MappingConfig> hbaseMapping = MappingConfigLoader.load();
+                        hbaseMapping = MappingConfigLoader.load();
                         mappingConfigCache = new HashMap<>();
                         for (MappingConfig mappingConfig : hbaseMapping.values()) {
-                            mappingConfigCache.put(mappingConfig.getHbaseOrm().getDatabase() + "-"
-                                                   + mappingConfig.getHbaseOrm().getTable(), mappingConfig);
+                            mappingConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getHbaseOrm().getDestination())
+                                                   + "." + mappingConfig.getHbaseOrm().getDatabase() + "."
+                                                   + mappingConfig.getHbaseOrm().getTable(),
+                                mappingConfig);
                         }
                     }
                 }
             }
 
-            String hosts = configuration.getZkHosts();
-            if (StringUtils.isEmpty(hosts)) {
-                hosts = configuration.getHosts();
-            }
-            if (StringUtils.isEmpty(hosts)) {
-                throw new RuntimeException("Empty zookeeper hosts");
-            }
-            String[] zkHosts = StringUtils.split(hosts, ",");
-            int zkPort = 0;
-            StringBuilder hostsWithoutPort = new StringBuilder();
-            for (String host : zkHosts) {
-                int i = host.indexOf(":");
-                hostsWithoutPort.append(host, 0, i);
-                hostsWithoutPort.append(",");
-                if (zkPort == 0) zkPort = Integer.parseInt(host.substring(i + 1));
-            }
-            hostsWithoutPort.deleteCharAt(hostsWithoutPort.length() - 1);
-
-            String znode = configuration.getProperties().getProperty("znodeParent");
-            if (StringUtils.isEmpty(znode)) {
-                znode = "/hbase";
-            }
+            Map<String, String> propertites = configuration.getProperties();
 
             Configuration hbaseConfig = HBaseConfiguration.create();
-            hbaseConfig.set("hbase.zookeeper.quorum", hostsWithoutPort.toString());
-            hbaseConfig.set("hbase.zookeeper.property.clientPort", Integer.toString(zkPort));
-            hbaseConfig.set("zookeeper.znode.parent", znode);
+            propertites.forEach(hbaseConfig::set);
             conn = ConnectionFactory.createConnection(hbaseConfig);
-            hbaseSyncService = new HbaseSyncService(conn);
+            hbaseTemplate = new HbaseTemplate(conn);
+            hbaseSyncService = new HbaseSyncService(hbaseTemplate);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
     @Override
-    public void writeOut(Dml dml) {
+    public void sync(Dml dml) {
         if (dml == null) {
             return;
         }
+        String destination = StringUtils.trimToEmpty(dml.getDestination());
         String database = dml.getDatabase();
         String table = dml.getTable();
-        MappingConfig config = mappingConfigCache.get(database + "-" + table);
+        MappingConfig config = mappingConfigCache.get(destination + "." + database + "." + table);
         hbaseSyncService.sync(config, dml);
     }
 
+    @Override
+    public EtlResult etl(String task, List<String> params) {
+        EtlResult etlResult = new EtlResult();
+        MappingConfig config = hbaseMapping.get(task);
+        if (config != null) {
+            DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+            if (dataSource != null) {
+                return HbaseEtlService.importData(dataSource, hbaseTemplate, config, params);
+            } else {
+                etlResult.setSucceeded(false);
+                etlResult.setErrorMessage("DataSource not found");
+                return etlResult;
+            }
+        } else {
+            DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(task);
+            if (dataSource != null) {
+                StringBuilder resultMsg = new StringBuilder();
+                boolean resSucc = true;
+                // ds不为空说明传入的是datasourceKey
+                for (MappingConfig configTmp : hbaseMapping.values()) {
+                    // 取所有的datasourceKey为task的配置
+                    if (configTmp.getDataSourceKey().equals(task)) {
+                        EtlResult etlRes = HbaseEtlService.importData(dataSource, hbaseTemplate, configTmp, params);
+                        if (!etlRes.getSucceeded()) {
+                            resSucc = false;
+                            resultMsg.append(etlRes.getErrorMessage()).append("\n");
+                        } else {
+                            resultMsg.append(etlRes.getResultMessage()).append("\n");
+                        }
+                    }
+                }
+                if (resultMsg.length() > 0) {
+                    etlResult.setSucceeded(resSucc);
+                    if (resSucc) {
+                        etlResult.setResultMessage(resultMsg.toString());
+                    } else {
+                        etlResult.setErrorMessage(resultMsg.toString());
+                    }
+                    return etlResult;
+                }
+            }
+        }
+        etlResult.setSucceeded(false);
+        etlResult.setErrorMessage("Task not found");
+        return etlResult;
+    }
+
+    @Override
+    public Map<String, Object> count(String task) {
+        MappingConfig config = hbaseMapping.get(task);
+        String hbaseTable = config.getHbaseOrm().getHbaseTable();
+        long rowCount = 0L;
+        try {
+            HTable table = (HTable) conn.getTable(TableName.valueOf(hbaseTable));
+            Scan scan = new Scan();
+            scan.setFilter(new FirstKeyOnlyFilter());
+            ResultScanner resultScanner = table.getScanner(scan);
+            for (Result result : resultScanner) {
+                rowCount += result.size();
+            }
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        }
+        Map<String, Object> res = new LinkedHashMap<>();
+        res.put("hbaseTable", hbaseTable);
+        res.put("count", rowCount);
+        return res;
+    }
+
     @Override
     public void destroy() {
         if (conn != null) {
@@ -103,4 +165,13 @@ public class HbaseAdapter implements CanalOuterAdapter {
             }
         }
     }
+
+    @Override
+    public String getDestination(String task) {
+        MappingConfig config = hbaseMapping.get(task);
+        if (config != null && config.getHbaseOrm() != null) {
+            return config.getHbaseOrm().getDestination();
+        }
+        return null;
+    }
 }

+ 18 - 0
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java

@@ -13,9 +13,18 @@ import java.util.Set;
  * @version 1.0.0
  */
 public class MappingConfig {
+    private String dataSourceKey;
 
     private HbaseOrm hbaseOrm;
 
+    public String getDataSourceKey() {
+        return dataSourceKey;
+    }
+
+    public void setDataSourceKey(String dataSourceKey) {
+        this.dataSourceKey = dataSourceKey;
+    }
+
     public HbaseOrm getHbaseOrm() {
         return hbaseOrm;
     }
@@ -137,6 +146,7 @@ public class MappingConfig {
     public static class HbaseOrm {
 
         private Mode                    mode               = Mode.STRING;
+        private String                  destination;
         private String                  database;
         private String                  table;
         private String                  hbaseTable;
@@ -161,6 +171,14 @@ public class MappingConfig {
             this.mode = mode;
         }
 
+        public String getDestination() {
+            return destination;
+        }
+
+        public void setDestination(String destination) {
+            this.destination = destination;
+        }
+
         public String getDatabase() {
             return database;
         }

+ 4 - 2
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java

@@ -37,7 +37,7 @@ public class MappingConfigLoader {
 
         Map<String, MappingConfig> result = new LinkedHashMap<>();
 
-        Collection<String> configs = AdapterConfigs.configs.get("hbase");
+        Collection<String> configs = AdapterConfigs.get("hbase");
         for (String c : configs) {
             if (c == null) {
                 continue;
@@ -72,6 +72,7 @@ public class MappingConfigLoader {
                 String[] dbTable;
                 if (dsKey == null) {
                     dbTable = srcMeta.split("\\.");
+
                 } else {
                     dbTable = srcMeta.split("@")[0].split("\\.");
                 }
@@ -84,7 +85,7 @@ public class MappingConfigLoader {
                     hbaseOrm.setAutoCreateTable(true);
                     hbaseOrm.setDatabase(dbTable[0]);
                     hbaseOrm.setTable(dbTable[1]);
-                    hbaseOrm.setMode(MappingConfig.Mode.STRING);
+                    hbaseOrm.setMode(MappingConfig.Mode.PHOENIX);
                     hbaseOrm.setRowKey(rowKey);
                     // 有定义rowKey
                     if (rowKey != null) {
@@ -94,6 +95,7 @@ public class MappingConfigLoader {
                         hbaseOrm.setRowKeyColumn(columnItem);
                     }
                     config.setHbaseOrm(hbaseOrm);
+                    config.setDataSourceKey(dsKey);
 
                 } else {
                     throw new RuntimeException(String.format("配置项[%s]内容为空, 或格式不符合database.table", c));

+ 353 - 0
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java

@@ -0,0 +1,353 @@
+package com.alibaba.otter.canal.client.adapter.hbase.service;
+
+import java.sql.*;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+import javax.sql.DataSource;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.hbase.support.*;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.JdbcTypeUtil;
+import com.google.common.base.Joiner;
+
+public class HbaseEtlService {
+
+    private static Logger logger = LoggerFactory.getLogger(HbaseEtlService.class);
+
+    public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) throws SQLException {
+        Connection conn = null;
+        Statement stmt = null;
+        ResultSet rs = null;
+        try {
+            conn = ds.getConnection();
+            stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            stmt.setFetchSize(Integer.MIN_VALUE);
+            rs = stmt.executeQuery(sql);
+            return fun.apply(rs);
+        } finally {
+            if (rs != null) {
+                try {
+                    rs.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+            if (stmt != null) {
+                try {
+                    stmt.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+            if (conn != null) {
+                try {
+                    conn.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+            rs = null;
+            stmt = null;
+            conn = null;
+        }
+    }
+
+    public static void createTable(HbaseTemplate hbaseTemplate, MappingConfig config) {
+        try {
+            // 判断hbase表是否存在,不存在则建表
+            MappingConfig.HbaseOrm hbaseOrm = config.getHbaseOrm();
+            if (!hbaseTemplate.tableExists(hbaseOrm.getHbaseTable())) {
+                hbaseTemplate.createTable(hbaseOrm.getHbaseTable(), hbaseOrm.getFamily());
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static EtlResult importData(DataSource ds, HbaseTemplate hbaseTemplate, MappingConfig config,
+                                       List<String> params) {
+        EtlResult etlResult = new EtlResult();
+        AtomicLong successCount = new AtomicLong();
+        List<String> errMsg = new ArrayList<>();
+        String hbaseTable = "";
+        try {
+            if (config == null) {
+                logger.error("Config is null!");
+                etlResult.setSucceeded(false);
+                etlResult.setErrorMessage("Config is null!");
+                return etlResult;
+            }
+            MappingConfig.HbaseOrm hbaseOrm = config.getHbaseOrm();
+            hbaseTable = hbaseOrm.getHbaseTable();
+
+            long start = System.currentTimeMillis();
+
+            if (params != null && params.size() == 1 && "rebuild".equalsIgnoreCase(params.get(0))) {
+                logger.info(hbaseOrm.getHbaseTable() + " rebuild is starting!");
+                // 如果表存在则删除
+                if (hbaseTemplate.tableExists(hbaseOrm.getHbaseTable())) {
+                    hbaseTemplate.disableTable(hbaseOrm.getHbaseTable());
+                    hbaseTemplate.deleteTable(hbaseOrm.getHbaseTable());
+                }
+                params = null;
+            } else {
+                logger.info(hbaseOrm.getHbaseTable() + " etl is starting!");
+            }
+            createTable(hbaseTemplate, config);
+
+            // 拼接sql
+            String sql = "SELECT * FROM " + config.getHbaseOrm().getDatabase() + "." + hbaseOrm.getTable();
+
+            // 拼接条件
+            if (params != null && params.size() == 1 && hbaseOrm.getEtlCondition() == null) {
+                AtomicBoolean stExists = new AtomicBoolean(false);
+                // 验证是否有SYS_TIME字段
+                sqlRS(ds, sql, rs -> {
+                    try {
+                        ResultSetMetaData rsmd = rs.getMetaData();
+                        int cnt = rsmd.getColumnCount();
+                        for (int i = 1; i <= cnt; i++) {
+                            String columnName = rsmd.getColumnName(i);
+                            if ("SYS_TIME".equalsIgnoreCase(columnName)) {
+                                stExists.set(true);
+                                break;
+                            }
+                        }
+                    } catch (Exception e) {
+                        // ignore
+                    }
+                    return null;
+                });
+                if (stExists.get()) {
+                    sql += " WHERE SYS_TIME >= '" + params.get(0) + "' ";
+                }
+            } else if (hbaseOrm.getEtlCondition() != null && params != null) {
+                String etlCondition = hbaseOrm.getEtlCondition();
+                int size = params.size();
+                for (int i = 0; i < size; i++) {
+                    etlCondition = etlCondition.replace("{" + i + "}", params.get(i));
+                }
+
+                sql += " " + etlCondition;
+            }
+
+            // 获取总数
+            String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
+            long cnt = (Long) sqlRS(ds, countSql, rs -> {
+                Long count = null;
+                try {
+                    if (rs.next()) {
+                        count = ((Number) rs.getObject(1)).longValue();
+                    }
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+                return count == null ? 0 : count;
+            });
+
+            // 当大于1万条记录时开启多线程
+            if (cnt >= 10000) {
+                int threadCount = 3;
+                long perThreadCnt = cnt / threadCount;
+                ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+                List<Future<Boolean>> futures = new ArrayList<>(threadCount);
+                for (int i = 0; i < threadCount; i++) {
+                    long offset = i * perThreadCnt;
+                    Long size = null;
+                    if (i != threadCount - 1) {
+                        size = perThreadCnt;
+                    }
+                    String sqlFinal;
+                    if (size != null) {
+                        sqlFinal = sql + " LIMIT " + offset + "," + size;
+                    } else {
+                        sqlFinal = sql + " LIMIT " + offset + "," + cnt;
+                    }
+                    Future<Boolean> future = executor
+                        .submit(() -> executeSqlImport(ds, sqlFinal, hbaseOrm, hbaseTemplate, successCount, errMsg));
+                    futures.add(future);
+                }
+
+                for (Future<Boolean> future : futures) {
+                    future.get();
+                }
+
+                executor.shutdown();
+            } else {
+                executeSqlImport(ds, sql, hbaseOrm, hbaseTemplate, successCount, errMsg);
+            }
+
+            logger.info(
+                hbaseOrm.getHbaseTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000 + "s!");
+
+            etlResult.setResultMessage("导入HBase表 " + hbaseOrm.getHbaseTable() + " 数据:" + successCount.get() + " 条");
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            errMsg.add(hbaseTable + " etl failed! ==>" + e.getMessage());
+        }
+
+        if (errMsg.isEmpty()) {
+            etlResult.setSucceeded(true);
+        } else {
+            etlResult.setErrorMessage(Joiner.on("\n").join(errMsg));
+        }
+        return etlResult;
+    }
+
+    private static boolean executeSqlImport(DataSource ds, String sql, MappingConfig.HbaseOrm hbaseOrm,
+                                            HbaseTemplate hbaseTemplate, AtomicLong successCount, List<String> errMsg) {
+        try {
+            sqlRS(ds, sql, rs -> {
+                int i = 1;
+
+                try {
+                    boolean complete = false;
+                    List<HRow> rows = new ArrayList<>();
+                    String[] rowKeyColumns = null;
+                    if (hbaseOrm.getRowKey() != null) {
+                        rowKeyColumns = hbaseOrm.getRowKey().trim().split(",");
+                    }
+                    while (rs.next()) {
+                        int cc = rs.getMetaData().getColumnCount();
+                        int[] jdbcTypes = new int[cc];
+                        Class<?>[] classes = new Class[cc];
+                        for (int j = 1; j <= cc; j++) {
+                            int jdbcType = rs.getMetaData().getColumnType(j);
+                            jdbcTypes[j - 1] = jdbcType;
+                            classes[j - 1] = JdbcTypeUtil.jdbcType2javaType(jdbcType);
+                        }
+                        HRow row = new HRow();
+
+                        if (rowKeyColumns != null) {
+                            // 取rowKey字段拼接
+                            StringBuilder rowKeyVale = new StringBuilder();
+                            for (String rowKeyColumnName : rowKeyColumns) {
+                                Object obj = rs.getObject(rowKeyColumnName);
+                                if (obj != null) {
+                                    rowKeyVale.append(obj.toString());
+                                }
+                                rowKeyVale.append("|");
+                            }
+                            int len = rowKeyVale.length();
+                            if (len > 0) {
+                                rowKeyVale.delete(len - 1, len);
+                            }
+                            row.setRowKey(Bytes.toBytes(rowKeyVale.toString()));
+                        }
+
+                        for (int j = 1; j <= cc; j++) {
+                            String columnName = rs.getMetaData().getColumnName(j);
+
+                            Object val = JdbcTypeUtil.getRSData(rs, columnName, jdbcTypes[j - 1]);
+                            if (val == null) {
+                                continue;
+                            }
+
+                            MappingConfig.ColumnItem columnItem = hbaseOrm.getColumnItems().get(columnName);
+                            // 没有配置映射
+                            if (columnItem == null) {
+                                String family = hbaseOrm.getFamily();
+                                String qualifile = columnName;
+                                if (hbaseOrm.isUppercaseQualifier()) {
+                                    qualifile = qualifile.toUpperCase();
+                                }
+                                if (MappingConfig.Mode.STRING == hbaseOrm.getMode()) {
+                                    if (hbaseOrm.getRowKey() == null && j == 1) {
+                                        row.setRowKey(Bytes.toBytes(val.toString()));
+                                    } else {
+                                        row.addCell(family, qualifile, Bytes.toBytes(val.toString()));
+                                    }
+                                } else if (MappingConfig.Mode.NATIVE == hbaseOrm.getMode()) {
+                                    Type type = Type.getType(classes[j - 1]);
+                                    if (hbaseOrm.getRowKey() == null && j == 1) {
+                                        row.setRowKey(TypeUtil.toBytes(val, type));
+                                    } else {
+                                        row.addCell(family, qualifile, TypeUtil.toBytes(val, type));
+                                    }
+                                } else if (MappingConfig.Mode.PHOENIX == hbaseOrm.getMode()) {
+                                    PhType phType = PhType.getType(classes[j - 1]);
+                                    if (hbaseOrm.getRowKey() == null && j == 1) {
+                                        row.setRowKey(PhTypeUtil.toBytes(val, phType));
+                                    } else {
+                                        row.addCell(family, qualifile, PhTypeUtil.toBytes(val, phType));
+                                    }
+                                }
+                            } else {
+                                // 如果不需要类型转换
+                                if (columnItem.getType() == null || "".equals(columnItem.getType())) {
+                                    if (val instanceof java.sql.Date) {
+                                        SimpleDateFormat dateFmt = new SimpleDateFormat("yyyy-MM-dd");
+                                        val = dateFmt.format((Date) val);
+                                    } else if (val instanceof Timestamp) {
+                                        SimpleDateFormat datetimeFmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+                                        val = datetimeFmt.format((Date) val);
+                                    }
+
+                                    byte[] valBytes = Bytes.toBytes(val.toString());
+                                    if (columnItem.isRowKey()) {
+                                        row.setRowKey(valBytes);
+                                    } else {
+                                        row.addCell(columnItem.getFamily(), columnItem.getQualifier(), valBytes);
+                                    }
+                                } else {
+                                    PhType phType = PhType.getType(columnItem.getType());
+                                    if (columnItem.isRowKey()) {
+                                        row.setRowKey(PhTypeUtil.toBytes(val, phType));
+                                    } else {
+                                        row.addCell(columnItem.getFamily(),
+                                            columnItem.getQualifier(),
+                                            PhTypeUtil.toBytes(val, phType));
+                                    }
+                                }
+                            }
+                        }
+
+                        if (row.getRowKey() == null) throw new RuntimeException("RowKey 值为空");
+
+                        rows.add(row);
+                        complete = false;
+                        if (i % hbaseOrm.getCommitBatch() == 0 && !rows.isEmpty()) {
+                            hbaseTemplate.puts(hbaseOrm.getHbaseTable(), rows);
+                            rows.clear();
+                            complete = true;
+                        }
+                        i++;
+                        successCount.incrementAndGet();
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("successful import count:" + successCount.get());
+                        }
+                    }
+
+                    if (!complete && !rows.isEmpty()) {
+                        hbaseTemplate.puts(hbaseOrm.getHbaseTable(), rows);
+                    }
+
+                } catch (Exception e) {
+                    logger.error(hbaseOrm.getHbaseTable() + " etl failed! ==>" + e.getMessage(), e);
+                    errMsg.add(hbaseOrm.getHbaseTable() + " etl failed! ==>" + e.getMessage());
+                    // throw new RuntimeException(e);
+                }
+                return i;
+            });
+            return true;
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return false;
+        }
+    }
+}

+ 6 - 15
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java

@@ -1,23 +1,13 @@
 package com.alibaba.otter.canal.client.adapter.hbase.service;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
-import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
-import com.alibaba.otter.canal.client.adapter.hbase.support.HRow;
-import com.alibaba.otter.canal.client.adapter.hbase.support.HbaseTemplate;
-import com.alibaba.otter.canal.client.adapter.hbase.support.PhType;
-import com.alibaba.otter.canal.client.adapter.hbase.support.PhTypeUtil;
-import com.alibaba.otter.canal.client.adapter.hbase.support.Type;
-import com.alibaba.otter.canal.client.adapter.hbase.support.TypeUtil;
+import com.alibaba.otter.canal.client.adapter.hbase.support.*;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 
 /**
@@ -32,8 +22,8 @@ public class HbaseSyncService {
 
     private HbaseTemplate hbaseTemplate;                                    // HBase操作模板
 
-    public HbaseSyncService(Connection conn){
-        hbaseTemplate = new HbaseTemplate(conn);
+    public HbaseSyncService(HbaseTemplate hbaseTemplate){
+        this.hbaseTemplate = hbaseTemplate;
     }
 
     public void sync(MappingConfig config, Dml dml) {
@@ -375,7 +365,8 @@ public class HbaseSyncService {
      * @param value 值
      * @return 复合字段rowKey
      */
-    private static byte[] typeConvert(MappingConfig.ColumnItem columnItem, MappingConfig.HbaseOrm hbaseOrm, Object value) {
+    private static byte[] typeConvert(MappingConfig.ColumnItem columnItem, MappingConfig.HbaseOrm hbaseOrm,
+                                      Object value) {
         if (value == null) {
             return null;
         }

+ 2 - 2
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/TypeUtil.java

@@ -92,6 +92,7 @@ public class TypeUtil {
         return b;
     }
 
+    @SuppressWarnings("unchecked")
     public static <T> T toObject(byte[] bytes, Class<T> clazz) {
         if (bytes == null) {
             return null;
@@ -132,10 +133,10 @@ public class TypeUtil {
         } else {
             throw new IllegalArgumentException("mismatch class type");
         }
-        // noinspection unchecked
         return (T) res;
     }
 
+    @SuppressWarnings("unchecked")
     public static <T> T toObject(byte[] bytes, Type type) {
         if (bytes == null) {
             return null;
@@ -182,7 +183,6 @@ public class TypeUtil {
         } else {
             throw new IllegalArgumentException("mismatch class type");
         }
-        // noinspection unchecked
         return (T) res;
     }
 }

+ 0 - 0
client-adapter/hbase/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.CanalOuterAdapter → client-adapter/hbase/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter


+ 2 - 0
client-adapter/hbase/src/main/resources/hbase/mytest_person2.yml

@@ -1,5 +1,7 @@
+dataSourceKey: defaultDS
 hbaseOrm:
   mode: PHOENIX  #NATIVE   #STRING
+  destination: example
   database: mytest  # 数据库名
   table: person2     # 数据库表名
   hbaseTable: MYTEST.PERSON2   # HBase表名

+ 34 - 0
client-adapter/launcher/assembly.xml

@@ -0,0 +1,34 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+    <id>canal-adapter</id>
+    <formats>
+        <format>tar.gz</format>
+    </formats>
+    <fileSets>
+        <fileSet>
+            <directory>${project.build.directory}</directory>
+            <outputDirectory>/</outputDirectory>
+            <includes>
+                <include>*.jar</include>
+            </includes>
+        </fileSet>
+
+        <fileSet>
+            <directory>${project.build.directory}/lib</directory>
+            <outputDirectory>/lib/</outputDirectory>
+            <includes>
+                <include>*.jar</include>
+            </includes>
+        </fileSet>
+
+        <fileSet>
+            <directory>${project.build.directory}/config</directory>
+            <outputDirectory>/config/</outputDirectory>
+            <includes>
+                <include>**</include>
+            </includes>
+        </fileSet>
+    </fileSets>
+    <baseDirectory>/</baseDirectory>
+</assembly>

+ 36 - 0
client-adapter/launcher/pom.xml

@@ -50,6 +50,23 @@
             <artifactId>spring-boot-configuration-processor</artifactId>
             <optional>true</optional>
         </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <version>2.10.0</version>
+        </dependency>
+        <!-- 单独引入rocketmq依赖 -->
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>4.3.0</version>
+        </dependency>
+        <!-- 单独引入kafka依赖 -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>1.1.1</version>
+        </dependency>
 
         <!-- outer adapter jar with dependencies-->
         <dependency>
@@ -81,6 +98,7 @@
     </dependencies>
 
     <build>
+        <finalName>canal-adapter-launcher</finalName>
         <plugins>
             <plugin>
                 <groupId>org.springframework.boot</groupId>
@@ -132,6 +150,24 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <appendAssemblyId>false</appendAssemblyId>
+                    <descriptors>
+                        <descriptor>assembly.xml</descriptor>
+                    </descriptors>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 </project>

+ 0 - 44
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/CanalAdapterApplication.java

@@ -1,56 +1,12 @@
 package com.alibaba.otter.canal.adapter.launcher;
 
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import javax.annotation.Resource;
-
-import com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader;
-import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.builder.SpringApplicationBuilder;
 
-
 @SpringBootApplication
 public class CanalAdapterApplication {
-    private static final Logger logger = LoggerFactory.getLogger(CanalAdapterApplication.class);
-
-    private static CanalAdapterLoader adapterLoader;
 
     public static void main(String[] args) {
         new SpringApplicationBuilder(CanalAdapterApplication.class).run(args);
     }
-
-    @Resource
-    private CanalClientConfig canalClientConf;
-
-    @PostConstruct
-    public void init() {
-        if (adapterLoader == null) {
-            try {
-                logger.info("## start the canal client adapters.");
-                adapterLoader = new CanalAdapterLoader(canalClientConf);
-                adapterLoader.init();
-                logger.info("## the canal client adapters are running now ......");
-            } catch (Throwable e) {
-                logger.error("## something goes wrong when starting up the canal client adapters:", e);
-                System.exit(0);
-            }
-        }
-    }
-
-    @PreDestroy
-    public void destroy() {
-        try {
-            logger.info("## stop the canal client adapters");
-            if (adapterLoader != null) {
-                adapterLoader.destroy();
-            }
-        } catch (Throwable e) {
-            logger.warn("## something goes wrong when stopping canal client adapters:", e);
-        } finally {
-            logger.info("## canal client adapters are down.");
-        }
-    }
 }

+ 114 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/EtlLock.java

@@ -0,0 +1,114 @@
+package com.alibaba.otter.canal.adapter.launcher.common;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.springframework.stereotype.Component;
+
+import com.alibaba.otter.canal.adapter.launcher.config.CuratorClient;
+
+@Component
+public class EtlLock {
+
+    private static final Map<String, ReentrantLock>     LOCAL_LOCK       = new ConcurrentHashMap<>();
+
+    private static final Map<String, InterProcessMutex> DISTRIBUTED_LOCK = new ConcurrentHashMap<>();
+
+    private static Mode                                 mode             = Mode.LOCAL;
+
+    @Resource
+    private CuratorClient                               curatorClient;
+
+    @PostConstruct
+    public void init() {
+        CuratorFramework curator = curatorClient.getCurator();
+        if (curator != null) {
+            mode = Mode.DISTRIBUTED;
+        } else {
+            mode = Mode.LOCAL;
+        }
+    }
+
+    private ReentrantLock getLock(String key) {
+        ReentrantLock lock = LOCAL_LOCK.get(key);
+        if (lock == null) {
+            synchronized (EtlLock.class) {
+                lock = LOCAL_LOCK.get(key);
+                if (lock == null) {
+                    lock = new ReentrantLock();
+                    LOCAL_LOCK.put(key, lock);
+                }
+            }
+        }
+        return lock;
+    }
+
+    private InterProcessMutex getRemoteLock(String key) {
+        InterProcessMutex lock = DISTRIBUTED_LOCK.get(key);
+        if (lock == null) {
+            synchronized (EtlLock.class) {
+                lock = DISTRIBUTED_LOCK.get(key);
+                if (lock == null) {
+                    lock = new InterProcessMutex(curatorClient.getCurator(), key);
+                    DISTRIBUTED_LOCK.put(key, lock);
+                }
+            }
+        }
+        return lock;
+    }
+
+    public void lock(String key) throws Exception {
+        if (mode == Mode.LOCAL) {
+            getLock(key).lock();
+        } else {
+            InterProcessMutex lock = getRemoteLock(key);
+            lock.acquire();
+        }
+    }
+
+    public boolean tryLock(String key, long timeout, TimeUnit unit) {
+        try {
+            if (mode == Mode.LOCAL) {
+                return getLock(key).tryLock(timeout, unit);
+            } else {
+                InterProcessMutex lock = getRemoteLock(key);
+                return lock.acquire(timeout, unit);
+            }
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    public boolean tryLock(String key) {
+        try {
+            if (mode == Mode.LOCAL) {
+                return getLock(key).tryLock();
+            } else {
+                InterProcessMutex lock = getRemoteLock(key);
+                return lock.acquire(500, TimeUnit.MILLISECONDS);
+            }
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    public void unlock(String key) {
+        if (mode == Mode.LOCAL) {
+            getLock(key).unlock();
+        } else {
+            InterProcessMutex lock = getRemoteLock(key);
+            try {
+                lock.release();
+            } catch (Exception e) {
+                // ignore
+            }
+        }
+    }
+}

+ 6 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/Mode.java

@@ -0,0 +1,6 @@
+package com.alibaba.otter.canal.adapter.launcher.common;
+
+public enum Mode {
+                  LOCAL, // 本地模式
+                  DISTRIBUTED // 分布式
+}

+ 208 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/SyncSwitch.java

@@ -0,0 +1,208 @@
+package com.alibaba.otter.canal.adapter.launcher.common;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+import org.springframework.stereotype.Component;
+
+import com.alibaba.otter.canal.adapter.launcher.config.AdapterCanalConfig;
+import com.alibaba.otter.canal.adapter.launcher.config.CuratorClient;
+import com.alibaba.otter.canal.common.utils.BooleanMutex;
+
+@Component
+public class SyncSwitch {
+
+    private static final String                    SYN_SWITCH_ZK_NODE = "/sync-switch/";
+
+    private static final Map<String, BooleanMutex> LOCAL_LOCK        = new ConcurrentHashMap<>();
+
+    private static final Map<String, BooleanMutex> DISTRIBUTED_LOCK  = new ConcurrentHashMap<>();
+
+    private static Mode                            mode              = Mode.LOCAL;
+
+    @Resource
+    private AdapterCanalConfig                     adapterCanalConfig;
+    @Resource
+    private CuratorClient                          curatorClient;
+
+    @PostConstruct
+    public void init() {
+        CuratorFramework curator = curatorClient.getCurator();
+        if (curator != null) {
+            mode = Mode.DISTRIBUTED;
+            DISTRIBUTED_LOCK.clear();
+            for (String destination : adapterCanalConfig.DESTINATIONS) {
+                // 对应每个destination注册锁
+                BooleanMutex mutex = new BooleanMutex(true);
+                initMutex(curator, destination, mutex);
+                DISTRIBUTED_LOCK.put(destination, mutex);
+                startListen(destination, mutex);
+            }
+        } else {
+            mode = Mode.LOCAL;
+            LOCAL_LOCK.clear();
+            for (String destination : adapterCanalConfig.DESTINATIONS) {
+                // 对应每个destination注册锁
+                LOCAL_LOCK.put(destination, new BooleanMutex(true));
+            }
+        }
+    }
+
+    private synchronized void startListen(String destination, BooleanMutex mutex) {
+        try {
+            String path = SYN_SWITCH_ZK_NODE + destination;
+            CuratorFramework curator = curatorClient.getCurator();
+            final NodeCache nodeCache = new NodeCache(curator, path);
+            nodeCache.start();
+            nodeCache.getListenable().addListener(() -> initMutex(curator, destination, mutex));
+        } catch (Exception e) {
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    private synchronized void initMutex(CuratorFramework curator, String destination, BooleanMutex mutex) {
+        try {
+            String path = SYN_SWITCH_ZK_NODE + destination;
+            Stat stat = curator.checkExists().forPath(path);
+            if (stat == null) {
+                if (!mutex.state()) {
+                    mutex.set(true);
+                }
+            } else {
+                String data = new String(curator.getData().forPath(path), StandardCharsets.UTF_8);
+                if ("on".equals(data)) {
+                    if (!mutex.state()) {
+                        mutex.set(true);
+                    }
+                } else {
+                    if (mutex.state()) {
+                        mutex.set(false);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    public synchronized void off(String destination) {
+        if (mode == Mode.LOCAL) {
+            BooleanMutex mutex = LOCAL_LOCK.get(destination);
+            if (mutex != null && mutex.state()) {
+                mutex.set(false);
+            }
+        } else {
+            try {
+                String path = SYN_SWITCH_ZK_NODE + destination;
+                try {
+                    curatorClient.getCurator()
+                        .create()
+                        .creatingParentContainersIfNeeded()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, "off".getBytes(StandardCharsets.UTF_8));
+                } catch (Exception e) {
+                    curatorClient.getCurator().setData().forPath(path, "off".getBytes(StandardCharsets.UTF_8));
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public synchronized void on(String destination) {
+        if (mode == Mode.LOCAL) {
+            BooleanMutex mutex = LOCAL_LOCK.get(destination);
+            if (mutex != null && !mutex.state()) {
+                mutex.set(true);
+            }
+        } else {
+            try {
+                String path = SYN_SWITCH_ZK_NODE + destination;
+                try {
+                    curatorClient.getCurator()
+                        .create()
+                        .creatingParentContainersIfNeeded()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, "on".getBytes(StandardCharsets.UTF_8));
+                } catch (Exception e) {
+                    curatorClient.getCurator().setData().forPath(path, "on".getBytes(StandardCharsets.UTF_8));
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public synchronized void release(String destination) {
+        if (mode == Mode.LOCAL) {
+            BooleanMutex mutex = LOCAL_LOCK.get(destination);
+            if (mutex != null && !mutex.state()) {
+                mutex.set(true);
+            }
+        }
+        if (mode == Mode.DISTRIBUTED) {
+            BooleanMutex mutex = DISTRIBUTED_LOCK.get(destination);
+            if (mutex != null && !mutex.state()) {
+                mutex.set(true);
+            }
+        }
+    }
+
+    public Boolean status(String destination) {
+        if (mode == Mode.LOCAL) {
+            BooleanMutex mutex = LOCAL_LOCK.get(destination);
+            if (mutex != null) {
+                return mutex.state();
+            } else {
+                return null;
+            }
+        } else {
+            BooleanMutex mutex = DISTRIBUTED_LOCK.get(destination);
+            if (mutex != null) {
+                return mutex.state();
+            } else {
+                return null;
+            }
+        }
+    }
+
+    public void get(String destination) throws InterruptedException {
+        if (mode == Mode.LOCAL) {
+            BooleanMutex mutex = LOCAL_LOCK.get(destination);
+            if (mutex != null) {
+                mutex.get();
+            }
+        } else {
+            BooleanMutex mutex = DISTRIBUTED_LOCK.get(destination);
+            if (mutex != null) {
+                mutex.get();
+            }
+        }
+    }
+
+    public void get(String destination, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
+        if (mode == Mode.LOCAL) {
+            BooleanMutex mutex = LOCAL_LOCK.get(destination);
+            if (mutex != null) {
+                mutex.get(timeout, unit);
+            }
+        } else {
+            BooleanMutex mutex = DISTRIBUTED_LOCK.get(destination);
+            if (mutex != null) {
+                mutex.get(timeout, unit);
+            }
+        }
+    }
+
+
+}

+ 37 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterCanalConfig.java

@@ -1,10 +1,45 @@
 package com.alibaba.otter.canal.adapter.launcher.config;
 
-import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
 
-@ConfigurationProperties(prefix = "canal.conf")
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
+
 @Component
+@ConfigurationProperties(prefix = "canal.conf")
 public class AdapterCanalConfig extends CanalClientConfig {
+
+    public final Set<String> DESTINATIONS = new LinkedHashSet<>();
+
+    @Override
+    public void setCanalInstances(List<CanalInstance> canalInstances) {
+        super.setCanalInstances(canalInstances);
+
+        if (canalInstances != null) {
+            synchronized (DESTINATIONS) {
+                DESTINATIONS.clear();
+                for (CanalInstance canalInstance : canalInstances) {
+                    DESTINATIONS.add(canalInstance.getInstance());
+                }
+            }
+        }
+    }
+
+    @Override
+    public void setMqTopics(List<MQTopic> mqTopics) {
+        super.setMqTopics(mqTopics);
+
+        if (mqTopics != null) {
+            synchronized (DESTINATIONS) {
+                DESTINATIONS.clear();
+                for (MQTopic mqTopic : mqTopics) {
+                    DESTINATIONS.add(mqTopic.getTopic());
+                }
+            }
+        }
+    }
 }

+ 80 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterConfig.java

@@ -0,0 +1,80 @@
+package com.alibaba.otter.canal.adapter.launcher.config;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+
+@Component
+@ConfigurationProperties(prefix = "adapter.conf")
+public class AdapterConfig {
+
+    private static Logger                 logger = LoggerFactory.getLogger(AdapterConfig.class);
+
+    private Map<String, DatasourceConfig> datasourceConfigs;
+
+    private List<String>                  adapterConfigs;
+
+    public List<String> getAdapterConfigs() {
+        return adapterConfigs;
+    }
+
+    public Map<String, DatasourceConfig> getDatasourceConfigs() {
+        return datasourceConfigs;
+    }
+
+    public void setDatasourceConfigs(Map<String, DatasourceConfig> datasourceConfigs) {
+        this.datasourceConfigs = datasourceConfigs;
+
+        if (datasourceConfigs != null) {
+            for (Map.Entry<String, DatasourceConfig> entry : datasourceConfigs.entrySet()) {
+                DatasourceConfig datasourceConfig = entry.getValue();
+                // 加载数据源连接池
+                DruidDataSource ds = new DruidDataSource();
+                ds.setDriverClassName(datasourceConfig.getDriver());
+                ds.setUrl(datasourceConfig.getUrl());
+                ds.setUsername(datasourceConfig.getUsername());
+                ds.setPassword(datasourceConfig.getPassword());
+                ds.setInitialSize(1);
+                ds.setMinIdle(1);
+                ds.setMaxActive(datasourceConfig.getMaxActive());
+                ds.setMaxWait(60000);
+                ds.setTimeBetweenEvictionRunsMillis(60000);
+                ds.setMinEvictableIdleTimeMillis(300000);
+                ds.setPoolPreparedStatements(false);
+                ds.setMaxPoolPreparedStatementPerConnectionSize(20);
+                ds.setValidationQuery("select 1");
+                try {
+                    ds.init();
+                } catch (SQLException e) {
+                    logger.error("#Failed to initial datasource: " + datasourceConfig.getUrl(), e);
+                }
+                DatasourceConfig.DATA_SOURCES.put(entry.getKey(), ds);
+            }
+        }
+    }
+
+    public void setAdapterConfigs(List<String> adapterConfigs) {
+        this.adapterConfigs = adapterConfigs;
+
+        if (adapterConfigs != null) {
+            AdapterConfigs.clear();
+            for (String adapterConfig : adapterConfigs) {
+                int idx = adapterConfig.indexOf("/");
+                if (idx > -1) {
+                    String type = adapterConfig.substring(0, idx);
+                    String ymlFile = adapterConfig.substring(idx + 1);
+                    AdapterConfigs.put(type, ymlFile);
+                }
+            }
+        }
+    }
+}

+ 0 - 35
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterConfigPath.java

@@ -1,35 +0,0 @@
-package com.alibaba.otter.canal.adapter.launcher.config;
-
-import java.util.List;
-
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.stereotype.Component;
-
-import com.alibaba.otter.canal.client.adapter.support.AdapterConfigs;
-
-@Component
-@ConfigurationProperties(prefix = "adapter.conf.path")
-public class AdapterConfigPath {
-
-    private List<String> adapterConfigs;
-
-    public List<String> getAdapterConfigs() {
-        return adapterConfigs;
-    }
-
-    public void setAdapterConfigs(List<String> adapterConfigs) {
-        this.adapterConfigs = adapterConfigs;
-
-        if (adapterConfigs != null) {
-            AdapterConfigs.configs.clear();
-            for (String adapterConfig : adapterConfigs) {
-                int idx = adapterConfig.indexOf("/");
-                if (idx > -1) {
-                    String type = adapterConfig.substring(0, idx);
-                    String ymlFile = adapterConfig.substring(idx + 1);
-                    AdapterConfigs.configs.put(type, ymlFile);
-                }
-            }
-        }
-    }
-}

+ 36 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/CuratorClient.java

@@ -0,0 +1,36 @@
+package com.alibaba.otter.canal.adapter.launcher.config;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.springframework.stereotype.Component;
+
+@Component
+public class CuratorClient {
+
+    @Resource
+    private AdapterCanalConfig adapterCanalConfig;
+
+    private CuratorFramework   curator = null;
+
+    @PostConstruct
+    public void init() {
+        if (adapterCanalConfig.getZookeeperHosts() != null) {
+            curator = CuratorFrameworkFactory.builder()
+                .connectString(adapterCanalConfig.getZookeeperHosts())
+                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+                .sessionTimeoutMs(6000)
+                .connectionTimeoutMs(3000)
+                .namespace("canal-adapter")
+                .build();
+            curator.start();
+        }
+    }
+
+    public CuratorFramework getCurator() {
+        return curator;
+    }
+}

+ 28 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/SpringContext.java

@@ -0,0 +1,28 @@
+package com.alibaba.otter.canal.adapter.launcher.config;
+
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+@Component
+public class SpringContext implements ApplicationContextAware {
+
+    private static ApplicationContext context;
+
+    /*
+     * 注入ApplicationContext
+     */
+    public void setApplicationContext(final ApplicationContext context) throws BeansException {
+        // 在加载Spring时自动获得context
+        SpringContext.context = context;
+    }
+
+    public static Object getBean(final String beanName) {
+        return SpringContext.context.getBean(beanName);
+    }
+
+    public static Object getBean(final Class<?> clz) {
+        return context.getBean(clz);
+    }
+}

+ 60 - 75
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -7,10 +7,14 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
+import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.protocol.FlatMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.support.MessageUtil;
 import com.alibaba.otter.canal.protocol.Message;
 
@@ -24,34 +28,72 @@ public abstract class AbstractCanalAdapterWorker {
 
     protected final Logger                    logger  = LoggerFactory.getLogger(this.getClass());
 
-    protected String                          canalDestination;                                                 // canal实例
-    protected List<List<CanalOuterAdapter>>   canalOuterAdapters;                                               // 外部适配器
-    protected ExecutorService                 groupInnerExecutorService;                                        // 组内工作线程池
-    protected volatile boolean                running = false;                                                  // 是否运行中
+    protected String                          canalDestination;                                                // canal实例
+    protected List<List<OuterAdapter>>        canalOuterAdapters;                                              // 外部适配器
+    protected ExecutorService                 groupInnerExecutorService;                                       // 组内工作线程池
+    protected volatile boolean                running = false;                                                 // 是否运行中
     protected Thread                          thread  = null;
-    protected Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+    protected Thread.UncaughtExceptionHandler handler = (t, e) -> logger.error("parse events has an error", e);
 
-                                                          @Override
-                                                          public void uncaughtException(Thread t, Throwable e) {
-                                                              logger.error("parse events has an error", e);
-                                                          }
-                                                      };
+    protected SyncSwitch                      syncSwitch;
+
+    public AbstractCanalAdapterWorker(){
+        syncSwitch = (SyncSwitch) SpringContext.getBean(SyncSwitch.class);
+    }
 
     protected void writeOut(final Message message) {
         List<Future<Boolean>> futures = new ArrayList<>();
         // 组间适配器并行运行
-        for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-            final List<CanalOuterAdapter> adapters = outerAdapters;
+        for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+            final List<OuterAdapter> adapters = outerAdapters;
+            futures.add(groupInnerExecutorService.submit(() -> {
+                try {
+                    // 组内适配器穿行运行,尽量不要配置组内适配器
+                    for (final OuterAdapter c : adapters) {
+                        long begin = System.currentTimeMillis();
+                        MessageUtil.parse4Dml(canalDestination, message, c::sync);
+
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("{} elapsed time: {}",
+                                c.getClass().getName(),
+                                (System.currentTimeMillis() - begin));
+                        }
+                    }
+                    return true;
+                } catch (Exception e) {
+                    return false;
+                }
+            }));
+
+            // 等待所有适配器写入完成
+            // 由于是组间并发操作,所以将阻塞直到耗时最久的工作组操作完成
+            for (Future<Boolean> f : futures) {
+                try {
+                    if (!f.get()) {
+                        logger.error("Outer adapter write failed");
+                    }
+                } catch (InterruptedException | ExecutionException e) {
+                    // ignore
+                }
+            }
+        }
+    }
+
+    protected void writeOut(final FlatMessage flatMessage) {
+        List<Future<Boolean>> futures = new ArrayList<>();
+        // 组间适配器并行运行
+        for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+            final List<OuterAdapter> adapters = outerAdapters;
             futures.add(groupInnerExecutorService.submit(new Callable<Boolean>() {
 
                 @Override
                 public Boolean call() {
                     try {
                         // 组内适配器穿行运行,尽量不要配置组内适配器
-                        for (final CanalOuterAdapter c : adapters) {
+                        for (OuterAdapter c : adapters) {
                             long begin = System.currentTimeMillis();
-                            MessageUtil.parse4Dml(message, c::writeOut);
-
+                            Dml dml = MessageUtil.flatMessage2Dml(canalDestination, flatMessage);
+                            c.sync(dml);
                             if (logger.isDebugEnabled()) {
                                 logger.debug("{} elapsed time: {}",
                                     c.getClass().getName(),
@@ -79,63 +121,6 @@ public abstract class AbstractCanalAdapterWorker {
         }
     }
 
-    // protected void writeOut(final FlatMessage flatMessage) {
-    // List<Future<Boolean>> futures = new ArrayList<>();
-    // // 组间适配器并行运行
-    // for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-    // final List<CanalOuterAdapter> adapters = outerAdapters;
-    // futures.add(groupInnerExecutorService.submit(new Callable<Boolean>() {
-    //
-    // @Override
-    // public Boolean call() {
-    // try {
-    // // 组内适配器穿行运行,尽量不要配置组内适配器
-    // for (CanalOuterAdapter c : adapters) {
-    // long begin = System.currentTimeMillis();
-    // Dml dml = MessageUtil.flatMessage2Dml(flatMessage);
-    // c.writeOut(dml);
-    // if (logger.isDebugEnabled()) {
-    // logger.debug("{} elapsed time: {}",
-    // c.getClass().getName(),
-    // (System.currentTimeMillis() - begin));
-    // }
-    // }
-    // return true;
-    // } catch (Exception e) {
-    // return false;
-    // }
-    // }
-    // }));
-    //
-    // // 等待所有适配器写入完成
-    // // 由于是组间并发操作,所以将阻塞直到耗时最久的工作组操作完成
-    // for (Future<Boolean> f : futures) {
-    // try {
-    // if (!f.get()) {
-    // logger.error("Outer adapter write failed");
-    // }
-    // } catch (InterruptedException | ExecutionException e) {
-    // // ignore
-    // }
-    // }
-    // }
-    // }
-
-    protected void writeOut(Message message, String topic) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("topic: {} batchId: {} batchSize: {} ", topic, message.getId(), message.getEntries().size());
-        }
-        long begin = System.currentTimeMillis();
-        writeOut(message);
-        long now = System.currentTimeMillis();
-        if ((System.currentTimeMillis() - begin) > 5 * 60 * 1000) {
-            logger.error("topic: {} batchId {} elapsed time: {} ms", topic, message.getId(), now - begin);
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug("topic: {} batchId {} elapsed time: {} ms", topic, message.getId(), now - begin);
-        }
-    }
-
     protected void stopOutAdapters() {
         if (thread != null) {
             try {
@@ -146,8 +131,8 @@ public abstract class AbstractCanalAdapterWorker {
         }
         groupInnerExecutorService.shutdown();
         logger.info("topic connectors' worker thread dead!");
-        for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-            for (CanalOuterAdapter adapter : outerAdapters) {
+        for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+            for (OuterAdapter adapter : outerAdapters) {
                 adapter.destroy();
             }
         }

+ 142 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -0,0 +1,142 @@
+package com.alibaba.otter.canal.adapter.launcher.loader;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.common.errors.WakeupException;
+
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
+import com.alibaba.otter.canal.client.kafka.KafkaCanalConnectors;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+
+/**
+ * kafka对应的client适配器工作线程
+ *
+ * @author machengyuan 2018-8-19 下午11:30:49
+ * @version 1.0.0
+ */
+public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
+
+    private KafkaCanalConnector connector;
+
+    private String              topic;
+
+    private boolean             flatMessage;
+
+    public CanalAdapterKafkaWorker(String bootstrapServers, String topic, String groupId,
+                                   List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
+        this.canalOuterAdapters = canalOuterAdapters;
+        this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
+        this.topic = topic;
+        this.canalDestination = topic;
+        this.flatMessage = flatMessage;
+        connector = KafkaCanalConnectors.newKafkaConnector(bootstrapServers, topic, null, groupId, flatMessage);
+        // connector.setSessionTimeout(1L, TimeUnit.MINUTES);
+
+        // super.initSwitcher(topic);
+    }
+
+    @Override
+    public void start() {
+        if (!running) {
+            thread = new Thread(() -> process());
+            thread.setUncaughtExceptionHandler(handler);
+            running = true;
+            thread.start();
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if (!running) {
+                return;
+            }
+
+            connector.stopRunning();
+            running = false;
+
+            // if (switcher != null && !switcher.state()) {
+            // switcher.set(true);
+            // }
+
+            if (thread != null) {
+                try {
+                    thread.join();
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+            groupInnerExecutorService.shutdown();
+            logger.info("topic {} connectors' worker thread dead!", this.topic);
+            for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+                for (OuterAdapter adapter : outerAdapters) {
+                    adapter.destroy();
+                }
+            }
+            logger.info("topic {} all connectors destroyed!", this.topic);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private void process() {
+        while (!running)
+            ;
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        // final AtomicBoolean executing = new AtomicBoolean(true);
+        while (running) {
+            try {
+                logger.info("=============> Start to connect topic: {} <=============", this.topic);
+                connector.connect();
+                logger.info("=============> Start to subscribe topic: {} <=============", this.topic);
+                connector.subscribe();
+                logger.info("=============> Subscribe topic: {} succeed <=============", this.topic);
+                while (running) {
+                    try {
+                        // switcher.get(); //等待开关开启
+
+                        List<?> messages;
+                        if (!flatMessage) {
+                            messages = connector.getWithoutAck();
+                        } else {
+                            messages = connector.getFlatMessageWithoutAck(100L, TimeUnit.MILLISECONDS);
+                        }
+                        if (messages != null) {
+                            for (final Object message : messages) {
+                                if (message instanceof FlatMessage) {
+                                    writeOut((FlatMessage) message);
+                                } else {
+                                    writeOut((Message) message);
+                                }
+                            }
+                        }
+                        connector.ack();
+                    } catch (CommitFailedException e) {
+                        logger.warn(e.getMessage());
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e);
+                        TimeUnit.SECONDS.sleep(1L);
+                    }
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        executor.shutdown();
+
+        try {
+            connector.unsubscribe();
+        } catch (WakeupException e) {
+            // No-op. Continue process
+        }
+        connector.disconnect();
+        logger.info("=============> Disconnect topic: {} <=============", this.topic);
+    }
+}

+ 65 - 22
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -9,13 +9,14 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
-import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
-import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
+import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 
 /**
  * 外部适配器的加载器
@@ -24,15 +25,17 @@ import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
  */
 public class CanalAdapterLoader {
 
-    private static final Logger logger = LoggerFactory.getLogger(CanalAdapterLoader.class);
+    private static final Logger                     logger        = LoggerFactory.getLogger(CanalAdapterLoader.class);
+
+    private CanalClientConfig                       canalClientConfig;
 
-    private CanalClientConfig canalClientConfig;
+    private Map<String, CanalAdapterWorker>         canalWorkers  = new HashMap<>();
 
-    private Map<String, CanalAdapterWorker> canalWorkers = new HashMap<>();
+    private Map<String, AbstractCanalAdapterWorker> canalMQWorker = new HashMap<>();
 
-    private ExtensionLoader<CanalOuterAdapter> loader;
+    private ExtensionLoader<OuterAdapter>           loader;
 
-    public CanalAdapterLoader(CanalClientConfig canalClientConfig) {
+    public CanalAdapterLoader(CanalClientConfig canalClientConfig){
         this.canalClientConfig = canalClientConfig;
     }
 
@@ -40,12 +43,7 @@ public class CanalAdapterLoader {
      * 初始化canal-client
      */
     public void init() {
-        // canal instances 和 mq topics 配置不能同时为空
-        // if (canalClientConfig.getCanalInstances() == null && canalClientConfig.getMqTopics() == null) {
-        //    throw new RuntimeException("Blank config property: canalInstances or canalMQTopics");
-        // }
-
-        loader = ExtensionLoader.getExtensionLoader(CanalOuterAdapter.class);
+        loader = ExtensionLoader.getExtensionLoader(OuterAdapter.class);
 
         String canalServerHost = this.canalClientConfig.getCanalServerHost();
         SocketAddress sa = null;
@@ -58,31 +56,67 @@ public class CanalAdapterLoader {
         // 初始化canal-client的适配器
         if (canalClientConfig.getCanalInstances() != null) {
             for (CanalClientConfig.CanalInstance instance : canalClientConfig.getCanalInstances()) {
-                List<List<CanalOuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
+                List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
 
                 for (CanalClientConfig.AdapterGroup connectorGroup : instance.getAdapterGroups()) {
-                    List<CanalOuterAdapter> canalOutConnectors = new ArrayList<>();
-                    for (CanalOuterAdapterConfiguration c : connectorGroup.getOutAdapters()) {
+                    List<OuterAdapter> canalOutConnectors = new ArrayList<>();
+                    for (OuterAdapterConfig c : connectorGroup.getOutAdapters()) {
                         loadConnector(c, canalOutConnectors);
                     }
                     canalOuterAdapterGroups.add(canalOutConnectors);
                 }
                 CanalAdapterWorker worker;
-                if (zkHosts != null) {
+                if (sa != null) {
+                    worker = new CanalAdapterWorker(instance.getInstance(), sa, canalOuterAdapterGroups);
+                } else if (zkHosts != null) {
                     worker = new CanalAdapterWorker(instance.getInstance(), zkHosts, canalOuterAdapterGroups);
                 } else {
-                    worker = new CanalAdapterWorker(instance.getInstance(), sa, canalOuterAdapterGroups);
+                    throw new RuntimeException("No canal server connector found");
                 }
                 canalWorkers.put(instance.getInstance(), worker);
                 worker.start();
                 logger.info("Start adapter for canal instance: {} succeed", instance.getInstance());
             }
         }
+
+        // 初始化canal-client-mq的适配器
+        if (canalClientConfig.getMqTopics() != null) {
+            for (CanalClientConfig.MQTopic topic : canalClientConfig.getMqTopics()) {
+                for (CanalClientConfig.Group group : topic.getGroups()) {
+                    List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
+
+                    List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
+
+                    for (OuterAdapterConfig config : group.getOutAdapters()) {
+                        loadConnector(config, canalOuterAdapters);
+                    }
+                    canalOuterAdapterGroups.add(canalOuterAdapters);
+                    if (StringUtils.isBlank(topic.getMqMode()) || "rocketmq".equalsIgnoreCase(topic.getMqMode())) {
+                        CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig
+                            .getBootstrapServers(), topic.getTopic(), group.getGroupId(), canalOuterAdapterGroups);
+                        canalMQWorker.put(topic.getTopic() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
+                        rocketMQWorker.start();
+                    } else if ("kafka".equalsIgnoreCase(topic.getMqMode())) {
+                        CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(
+                            canalClientConfig.getBootstrapServers(),
+                            topic.getTopic(),
+                            group.getGroupId(),
+                            canalOuterAdapterGroups,
+                            canalClientConfig.getFlatMessage());
+                        canalMQWorker.put(topic.getTopic() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
+                        canalKafkaWorker.start();
+                    }
+                    logger.info("Start adapter for canal-client rocketmq topic: {} succeed",
+                        topic.getTopic() + "-" + group.getGroupId());
+
+                }
+            }
+        }
     }
 
-    private void loadConnector(CanalOuterAdapterConfiguration config, List<CanalOuterAdapter> canalOutConnectors) {
+    private void loadConnector(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) {
         try {
-            CanalOuterAdapter adapter = loader.getExtension(config.getName());
+            OuterAdapter adapter = loader.getExtension(config.getName());
             ClassLoader cl = Thread.currentThread().getContextClassLoader();
             // 替换ClassLoader
             Thread.currentThread().setContextClassLoader(adapter.getClass().getClassLoader());
@@ -103,10 +137,19 @@ public class CanalAdapterLoader {
             ExecutorService stopExecutorService = Executors.newFixedThreadPool(canalWorkers.size());
             for (CanalAdapterWorker v : canalWorkers.values()) {
                 final CanalAdapterWorker caw = v;
-                stopExecutorService.submit(() -> caw.stop());
+                stopExecutorService.submit(caw::stop);
             }
             stopExecutorService.shutdown();
         }
+
+        if (canalMQWorker.size() > 0) {
+            ExecutorService stopMQWokerService = Executors.newFixedThreadPool(canalMQWorker.size());
+            for (AbstractCanalAdapterWorker tmp : canalMQWorker.values()) {
+                final AbstractCanalAdapterWorker worker = tmp;
+                stopMQWokerService.submit(worker::stop);
+            }
+            stopMQWokerService.shutdown();
+        }
         logger.info("All canal adapters destroyed");
     }
 }

+ 132 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -0,0 +1,132 @@
+package com.alibaba.otter.canal.adapter.launcher.loader;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.common.errors.WakeupException;
+
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
+import com.alibaba.otter.canal.protocol.Message;
+
+/**
+ * kafka对应的client适配器工作线程
+ *
+ * @author machengyuan 2018-8-19 下午11:30:49
+ * @version 1.0.0
+ */
+public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
+
+    private RocketMQCanalConnector connector;
+
+    private String                 topic;
+
+    public CanalAdapterRocketMQWorker(String nameServers, String topic, String groupId,
+                                      List<List<OuterAdapter>> canalOuterAdapters){
+        logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
+        this.canalOuterAdapters = canalOuterAdapters;
+        this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
+        this.topic = topic;
+        this.canalDestination = topic;
+        connector = RocketMQCanalConnectorProvider.newRocketMQConnector(nameServers, topic, groupId);
+    }
+
+    @Override
+    public void start() {
+        if (!running) {
+            thread = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    process();
+                }
+            });
+            thread.setUncaughtExceptionHandler(handler);
+            running = true;
+            thread.start();
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if (!running) {
+                return;
+            }
+            connector.stopRunning();
+            running = false;
+            logger.info("Stop topic {} out adapters begin", this.topic);
+            stopOutAdapters();
+            logger.info("Stop topic {} out adapters end", this.topic);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private void process() {
+        while (!running)
+            ;
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        while (running) {
+            try {
+                logger.info("=============> Start to connect topic: {} <=============", this.topic);
+                connector.connect();
+                logger.info("=============> Start to subscribe topic: {}<=============", this.topic);
+                connector.subscribe();
+                logger.info("=============> Subscribe topic: {} succeed<=============", this.topic);
+                while (running) {
+                    try {
+                        // switcher.get(); //等待开关开启
+
+                        final Message message = connector.getWithoutAck(1);
+                        if (message != null) {
+                            executor.submit(new Runnable() {
+
+                                @Override
+                                public void run() {
+                                    try {
+                                        if (logger.isDebugEnabled()) {
+                                            logger.debug("topic: {} batchId: {} batchSize: {} ", topic, message.getId(), message.getEntries().size());
+                                        }
+                                        long begin = System.currentTimeMillis();
+                                        writeOut(message);
+                                        long now = System.currentTimeMillis();
+                                        if ((System.currentTimeMillis() - begin) > 5 * 60 * 1000) {
+                                            logger.error("topic: {} batchId {} elapsed time: {} ms", topic, message.getId(), now - begin);
+                                        }
+                                    } catch (Exception e) {
+                                        logger.error(e.getMessage(), e);
+                                    }
+                                    connector.ack(message.getId());
+                                }
+                            });
+                        } else {
+                            logger.debug("Message is null");
+                        }
+                    } catch (CommitFailedException e) {
+                        logger.warn(e.getMessage());
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e);
+                        TimeUnit.SECONDS.sleep(1L);
+                    }
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        executor.shutdown();
+
+        try {
+            connector.unsubscribe();
+        } catch (WakeupException e) {
+            // No-op. Continue process
+        }
+        connector.stopRunning();
+        logger.info("=============> Disconnect topic: {} <=============", this.topic);
+    }
+}

+ 71 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java

@@ -0,0 +1,71 @@
+package com.alibaba.otter.canal.adapter.launcher.loader;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.annotation.Resource;
+
+import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
+import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.otter.canal.adapter.launcher.config.AdapterCanalConfig;
+import com.alibaba.otter.canal.adapter.launcher.config.AdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
+
+@Component
+public class CanalAdapterService {
+
+    private static final Logger       logger = LoggerFactory.getLogger(CanalAdapterService.class);
+
+    private static CanalAdapterLoader adapterLoader;
+
+    @Resource
+    private AdapterCanalConfig        adapterCanalConfig;
+
+    // 注入bean保证优先注册
+    @Resource
+    private AdapterConfig             adapterConfig;
+    @Resource
+    private SpringContext             springContext;
+    @Resource
+    private SyncSwitch                syncSwitch;
+
+    @PostConstruct
+    public void init() {
+        if (adapterLoader == null) {
+            try {
+                logger.info("## start the canal client adapters.");
+                adapterLoader = new CanalAdapterLoader(adapterCanalConfig);
+                adapterLoader.init();
+                logger.info("## the canal client adapters are running now ......");
+            } catch (Throwable e) {
+                logger.error("## something goes wrong when starting up the canal client adapters:", e);
+                System.exit(0);
+            }
+        }
+    }
+
+    @PreDestroy
+    public void destroy() {
+        try {
+            logger.info("## stop the canal client adapters");
+            if (adapterLoader != null) {
+                adapterLoader.destroy();
+            }
+            for (DruidDataSource druidDataSource : DatasourceConfig.DATA_SOURCES.values()) {
+                try {
+                    druidDataSource.close();
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        } catch (Throwable e) {
+            logger.warn("## something goes wrong when stopping canal client adapters:", e);
+        } finally {
+            logger.info("## canal client adapters are down.");
+        }
+    }
+}

+ 20 - 28
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

@@ -3,10 +3,12 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 import java.net.SocketAddress;
 import java.util.List;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import com.alibaba.otter.canal.client.CanalConnector;
 import com.alibaba.otter.canal.client.CanalConnectors;
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
 import com.alibaba.otter.canal.protocol.Message;
 
@@ -31,7 +33,7 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
      * @param canalOuterAdapters 外部适配器组
      */
     public CanalAdapterWorker(String canalDestination, SocketAddress address,
-                              List<List<CanalOuterAdapter>> canalOuterAdapters){
+                              List<List<OuterAdapter>> canalOuterAdapters){
         this.canalOuterAdapters = canalOuterAdapters;
         this.canalDestination = canalDestination;
         groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
@@ -46,26 +48,18 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
      * @param canalOuterAdapters 外部适配器组
      */
     public CanalAdapterWorker(String canalDestination, String zookeeperHosts,
-                              List<List<CanalOuterAdapter>> canalOuterAdapters){
+                              List<List<OuterAdapter>> canalOuterAdapters){
         this.canalOuterAdapters = canalOuterAdapters;
         this.canalDestination = canalDestination;
         groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         connector = CanalConnectors.newClusterConnector(zookeeperHosts, canalDestination, "", "");
         ((ClusterCanalConnector) connector).setSoTimeout(SO_TIMEOUT);
-
-        // super.initSwitcher(canalDestination);
     }
 
     @Override
     public void start() {
         if (!running) {
-            thread = new Thread(new Runnable() {
-
-                @Override
-                public void run() {
-                    process();
-                }
-            });
+            thread = new Thread(this::process);
             thread.setUncaughtExceptionHandler(handler);
             thread.start();
             running = true;
@@ -79,13 +73,11 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
                 return;
             }
 
-            // if (switcher != null && !switcher.state()) {
-            // switcher.set(true);
-            // }
-
             connector.stopRunning();
             running = false;
 
+            syncSwitch.release(canalDestination);
+
             logger.info("destination {} is waiting for adapters' worker thread die!", canalDestination);
             if (thread != null) {
                 try {
@@ -96,8 +88,8 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
             }
             groupInnerExecutorService.shutdown();
             logger.info("destination {} adapters' worker thread dead!", canalDestination);
-            for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-                for (CanalOuterAdapter adapter : outerAdapters) {
+            for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+                for (OuterAdapter adapter : outerAdapters) {
                     adapter.destroy();
                 }
             }
@@ -112,22 +104,22 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
             ; // waiting until running == true
         while (running) {
             try {
-                // if (switcher != null) {
-                // switcher.get();
-                // }
+                syncSwitch.get(canalDestination);
+
                 logger.info("=============> Start to connect destination: {} <=============", this.canalDestination);
                 connector.connect();
                 logger.info("=============> Start to subscribe destination: {} <=============", this.canalDestination);
                 connector.subscribe();
                 logger.info("=============> Subscribe destination: {} succeed <=============", this.canalDestination);
                 while (running) {
-                    // try {
-                    // if (switcher != null) {
-                    // switcher.get();
-                    // }
-                    // } catch (TimeoutException e) {
-                    // break;
-                    // }
+                    try {
+                        syncSwitch.get(canalDestination, 1L, TimeUnit.MINUTES);
+                    } catch (TimeoutException e) {
+                        break;
+                    }
+                    if (!running) {
+                        break;
+                    }
 
                     // server配置canal.instance.network.soTimeout(默认: 30s)
                     // 范围内未与server交互,server将关闭本次socket连接

+ 166 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java

@@ -0,0 +1,166 @@
+package com.alibaba.otter.canal.adapter.launcher.rest;
+
+import java.util.*;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.web.bind.annotation.*;
+
+import com.alibaba.otter.canal.adapter.launcher.common.EtlLock;
+import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
+import com.alibaba.otter.canal.adapter.launcher.config.AdapterCanalConfig;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
+import com.alibaba.otter.canal.client.adapter.support.Result;
+
+@RestController
+public class CommonRest {
+
+    private static Logger                 logger           = LoggerFactory.getLogger(CommonRest.class);
+
+    private static final String           ETL_LOCK_ZK_NODE = "/sync-etl/";
+
+    private ExtensionLoader<OuterAdapter> loader;
+
+    @Resource
+    private SyncSwitch                    syncSwitch;
+    @Resource
+    private EtlLock                       etlLock;
+
+    @Resource
+    private AdapterCanalConfig            adapterCanalConfig;
+
+    @PostConstruct
+    public void init() {
+        loader = ExtensionLoader.getExtensionLoader(OuterAdapter.class);
+    }
+
+    /**
+     * ETL curl http://127.0.0.1:8081/etl/hbase/mytest_person2.yml -X POST
+     * 
+     * @param type 类型 hbase, es
+     * @param task 任务名对应配置文件名 mytest_person2.yml
+     * @param params etl where条件参数, 为空全部导入
+     * @return
+     */
+    @PostMapping("/etl/{type}/{task}")
+    public EtlResult etl(@PathVariable String type, @PathVariable String task,
+                         @RequestParam(name = "params", required = false) String params) {
+
+        boolean locked = etlLock.tryLock(ETL_LOCK_ZK_NODE + type + "-" + task);
+        if (!locked) {
+            EtlResult result = new EtlResult();
+            result.setSucceeded(false);
+            result.setErrorMessage(task + " 有其他进程正在导入中, 请稍后再试");
+            return result;
+        }
+        try {
+            OuterAdapter adapter = loader.getExtension(type);
+            String destination = adapter.getDestination(task);
+            Boolean oriSwithcStatus = null;
+            if (destination != null) {
+                oriSwithcStatus = syncSwitch.status(destination);
+                syncSwitch.off(destination);
+            }
+            try {
+                List<String> paramArr = null;
+                if (params != null) {
+                    String[] parmaArray = params.trim().split(";");
+                    paramArr = Arrays.asList(parmaArray);
+                }
+                return adapter.etl(task, paramArr);
+            } finally {
+                if (destination != null && oriSwithcStatus != null && oriSwithcStatus) {
+                    syncSwitch.on(destination);
+                }
+            }
+        } finally {
+            etlLock.unlock(ETL_LOCK_ZK_NODE + type + "-" + task);
+        }
+    }
+
+    /**
+     * 统计总数 curl http://127.0.0.1:8081/count/hbase/mytest_person2.yml
+     * 
+     * @param type 类型 hbase, es
+     * @param task 任务名对应配置文件名 mytest_person2.yml
+     * @return
+     */
+    @GetMapping("/count/{type}/{task}")
+    public Map<String, Object> count(@PathVariable String type, @PathVariable String task) {
+        OuterAdapter adapter = loader.getExtension(type);
+        return adapter.count(task);
+    }
+
+    /**
+     * 返回所有实例 curl http://127.0.0.1:8081/destinations
+     */
+    @GetMapping("/destinations")
+    public List<Map<String, String>> destinations() {
+        List<Map<String, String>> result = new ArrayList<>();
+        Set<String> destinations = adapterCanalConfig.DESTINATIONS;
+        for (String destination : destinations) {
+            Map<String, String> resMap = new LinkedHashMap<>();
+            Boolean status = syncSwitch.status(destination);
+            String resStatus = "none";
+            if (status != null && status) {
+                resStatus = "on";
+            } else if (status != null && !status) {
+                resStatus = "off";
+            }
+            resMap.put("destination", destination);
+            resMap.put("status", resStatus);
+            result.add(resMap);
+        }
+        return result;
+    }
+
+    /**
+     * 实例同步开关 curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT
+     * 
+     * @param destination 实例名称
+     * @param status 开关状态: off on
+     * @return
+     */
+    @PutMapping("/syncSwitch/{destination}/{status}")
+    public Result etl(@PathVariable String destination, @PathVariable String status) {
+        if (status.equals("on")) {
+            syncSwitch.on(destination);
+            logger.info("#Destination: {} sync on", destination);
+            return Result.createSuccess("实例: " + destination + " 开启同步成功");
+        } else if (status.equals("off")) {
+            syncSwitch.off(destination);
+            logger.info("#Destination: {} sync off", destination);
+            return Result.createSuccess("实例: " + destination + " 关闭同步成功");
+        } else {
+            Result result = new Result();
+            result.setCode(50000);
+            result.setMessage("实例: " + destination + " 操作失败");
+            return result;
+        }
+    }
+
+    /**
+     * 获取实例开关状态 curl http://127.0.0.1:8081/syncSwitch/example
+     * 
+     * @param destination 实例名称
+     * @return
+     */
+    @GetMapping("/syncSwitch/{destination}")
+    public Map<String, String> etl(@PathVariable String destination) {
+        Boolean status = syncSwitch.status(destination);
+        String resStatus = "none";
+        if (status != null && status) {
+            resStatus = "on";
+        } else if (status != null && !status) {
+            resStatus = "off";
+        }
+        Map<String, String> res = new LinkedHashMap<>();
+        res.put("stauts", resStatus);
+        return res;
+    }
+}

+ 35 - 9
client-adapter/launcher/src/main/resources/application.yml

@@ -1,21 +1,47 @@
 server:
   port: 8081
+logging:
+  level:
+    com.alibaba.otter.canal.client.adapter.hbase: DEBUG
+spring:
+  jackson:
+    date-format: yyyy-MM-dd HH:mm:ss
+    time-zone: GMT+8
+    default-property-inclusion: non_null
+
+
+hbase.zookeeper.quorum: slave1
+hbase.zookeeper.property.clientPort: 2181
+hbase.zookeeper.znode.parent: /hbase-unsecure
 
 canal.conf:
   canalServerHost: 127.0.0.1:11111
-  #zookeeperHosts: slave1:2181
-  #bootstrapServers: localhost:9092 #or rocketmq nameservers:host1:9876;host2:9876
-  flatMessage: false
-
+#  zookeeperHosts: slave1:2181
+#  bootstrapServers: slave1:6667 #or rocketmq nameservers:host1:9876;host2:9876
+  flatMessage: true
   canalInstances:
   - instance: example
     adapterGroups:
     - outAdapters:
       - name: logger
 #      - name: hbase
-#        hosts: slave1:2181
-#        properties: {znodeParent: "/hbase-unsecure"}
+#        properties:
+#          hbase.zookeeper.quorum: ${hbase.zookeeper.quorum}
+#          hbase.zookeeper.property.clientPort: ${hbase.zookeeper.property.clientPort}
+#          zookeeper.znode.parent: ${hbase.zookeeper.znode.parent}
+#  mqTopics:
+#  - mqMode: kafka
+#    topic: example
+#    groups:
+#    - groupId: g2
+#      outAdapters:
+#      - name: logger
 
-adapter.conf.path:
-  adapterConfigs:
-  - hbase/mytest_person2.yml
+#adapter.conf:
+#  datasourceConfigs:
+#    defaultDS:
+#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
+#      username: root
+#      password: 121212
+#  adapterConfigs:
+#  - hbase/mytest_person2.yml

+ 5 - 5
client-adapter/logger/src/main/java/com/alibaba/otter/canal/client/adapter/logger/LoggerAdapterExample.java

@@ -3,8 +3,8 @@ package com.alibaba.otter.canal.client.adapter.logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.otter.canal.client.adapter.CanalOuterAdapter;
-import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.SPI;
 
@@ -16,17 +16,17 @@ import com.alibaba.otter.canal.client.adapter.support.SPI;
  */
 @SPI("logger")
 // logger参数对应CanalOuterAdapterConfiguration配置中的name
-public class LoggerAdapterExample implements CanalOuterAdapter {
+public class LoggerAdapterExample implements OuterAdapter {
 
     private Logger logger = LoggerFactory.getLogger(this.getClass());
 
     @Override
-    public void init(CanalOuterAdapterConfiguration configuration) {
+    public void init(OuterAdapterConfig configuration) {
 
     }
 
     @Override
-    public void writeOut(Dml dml) {
+    public void sync(Dml dml) {
         logger.info(dml.toString());
     }
 

+ 0 - 0
client-adapter/logger/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.CanalOuterAdapter → client-adapter/logger/src/main/resources/META-INF/canal/com.alibaba.otter.canal.client.adapter.OuterAdapter