Kaynağa Gözat

HBase的etl初步完成

mcy 6 yıl önce
ebeveyn
işleme
0a7251add0
17 değiştirilmiş dosya ile 869 ekleme ve 149 silme
  1. 6 7
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/OuterAdapter.java
  2. 6 6
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  3. 50 4
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/JdbcTypeUtil.java
  4. 8 8
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/OuterAdapterConfig.java
  5. 4 4
      client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java
  6. 27 32
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java
  7. 347 0
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java
  8. 6 15
      client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java
  9. 12 0
      client-adapter/launcher/pom.xml
  10. 5 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterConfig.java
  11. 47 45
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java
  12. 148 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java
  13. 56 10
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java
  14. 124 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java
  15. 1 7
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java
  16. 20 8
      client-adapter/launcher/src/main/resources/application.yml
  17. 2 2
      client-adapter/logger/src/main/java/com/alibaba/otter/canal/client/adapter/logger/LoggerAdapterExample.java

+ 6 - 7
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/OuterAdapter.java

@@ -1,8 +1,8 @@
 package com.alibaba.otter.canal.client.adapter;
 
-import java.util.Map;
+import java.util.List;
 
-import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.SPI;
 
@@ -20,7 +20,7 @@ public interface OuterAdapter {
      *
      * @param configuration 外部适配器配置信息
      */
-    void init(CanalOuterAdapterConfiguration configuration);
+    void init(OuterAdapterConfig configuration);
 
     /**
      * 往适配器中同步数据
@@ -29,8 +29,6 @@ public interface OuterAdapter {
      */
     void sync(Dml dml);
 
-    // void writeOut(FlatMessage flatMessage);
-
     /**
      * 外部适配器销毁接口
      */
@@ -39,9 +37,10 @@ public interface OuterAdapter {
     /**
      * Etl操作
      * 
-     * @param criteriaSql 条件拼接sql
+     * @param task 任务名, 对应配置名
+     * @param params etl筛选条件
      */
-    default void etl(String criteriaSql) {
+    default void etl(String task, List<String> params) {
         throw new UnsupportedOperationException();
     }
 

+ 6 - 6
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -109,13 +109,13 @@ public class CanalClientConfig {
 
     public static class AdapterGroup {
 
-        private List<CanalOuterAdapterConfiguration> outAdapters;
+        private List<OuterAdapterConfig> outAdapters;
 
-        public List<CanalOuterAdapterConfiguration> getOutAdapters() {
+        public List<OuterAdapterConfig> getOutAdapters() {
             return outAdapters;
         }
 
-        public void setOutAdapters(List<CanalOuterAdapterConfiguration> outAdapters) {
+        public void setOutAdapters(List<OuterAdapterConfig> outAdapters) {
             this.outAdapters = outAdapters;
         }
     }
@@ -159,7 +159,7 @@ public class CanalClientConfig {
 
         // private List<Adaptor> adapters = new ArrayList<>();
 
-        private List<CanalOuterAdapterConfiguration> outAdapters;
+        private List<OuterAdapterConfig> outAdapters;
 
         public String getGroupId() {
             return groupId;
@@ -169,11 +169,11 @@ public class CanalClientConfig {
             this.groupId = groupId;
         }
 
-        public List<CanalOuterAdapterConfiguration> getOutAdapters() {
+        public List<OuterAdapterConfig> getOutAdapters() {
             return outAdapters;
         }
 
-        public void setOutAdapters(List<CanalOuterAdapterConfiguration> outAdapters) {
+        public void setOutAdapters(List<OuterAdapterConfig> outAdapters) {
             this.outAdapters = outAdapters;
         }
 

+ 50 - 4
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/JdbcTypeUtil.java

@@ -2,10 +2,7 @@ package com.alibaba.otter.canal.client.adapter.support;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.sql.Types;
+import java.sql.*;
 
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -21,6 +18,55 @@ public class JdbcTypeUtil {
 
     private static Logger logger = LoggerFactory.getLogger(JdbcTypeUtil.class);
 
+    public static Object getRSData(ResultSet rs, String columnName, int jdbcType) throws SQLException {
+        if (jdbcType == Types.BIT || jdbcType == Types.BOOLEAN) {
+            return rs.getByte(columnName);
+        } else {
+            return rs.getObject(columnName);
+        }
+    }
+
+    public static Class<?> jdbcType2javaType(int jdbcType) {
+        switch (jdbcType) {
+            case Types.BIT:
+            case Types.BOOLEAN:
+//                return Boolean.class;
+            case Types.TINYINT:
+                return Byte.TYPE;
+            case Types.SMALLINT:
+                return Short.class;
+            case Types.INTEGER:
+                return Integer.class;
+            case Types.BIGINT:
+                return Long.class;
+            case Types.DECIMAL:
+            case Types.NUMERIC:
+                return BigDecimal.class;
+            case Types.REAL:
+                return Float.class;
+            case Types.FLOAT:
+            case Types.DOUBLE:
+                return Double.class;
+            case Types.CHAR:
+            case Types.VARCHAR:
+            case Types.LONGVARCHAR:
+                return String.class;
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+            case Types.BLOB:
+                return byte[].class;
+            case Types.DATE:
+                return java.sql.Date.class;
+            case Types.TIME:
+                return Time.class;
+            case Types.TIMESTAMP:
+                return Timestamp.class;
+            default:
+                return String.class;
+        }
+    }
+
     public static Object typeConvert(String columnName, String value, int sqlType, String mysqlType) {
         if (value == null || value.equals("")) {
             return null;

+ 8 - 8
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalOuterAdapterConfiguration.java → client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/OuterAdapterConfig.java

@@ -1,6 +1,6 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
-import java.util.Properties;
+import java.util.Map;
 
 /**
  * 外部适配器配置信息类
@@ -8,15 +8,15 @@ import java.util.Properties;
  * @author machengyuan 2018-8-18 下午10:15:12
  * @version 1.0.0
  */
-public class CanalOuterAdapterConfiguration {
+public class OuterAdapterConfig {
 
-    private String     name;      // 适配器名称, 如: logger, hbase, es
+    private String              name;       // 适配器名称, 如: logger, hbase, es
 
-    private String     hosts;     // 适配器内部的地址, 比如对应es该参数可以填写es的server地址
+    private String              hosts;      // 适配器内部的地址, 比如对应es该参数可以填写es的server地址
 
-    private String     zkHosts;   // 适配器内部的ZK地址, 比如对应HBase该参数可以填写HBase对应的ZK地址
+    private String              zkHosts;    // 适配器内部的ZK地址, 比如对应HBase该参数可以填写HBase对应的ZK地址
 
-    private Properties properties; // 其余参数, 可填写适配器中的所需的配置信息
+    private Map<String, String> properties; // 其余参数, 可填写适配器中的所需的配置信息
 
     public String getName() {
         return name;
@@ -34,11 +34,11 @@ public class CanalOuterAdapterConfiguration {
         this.hosts = hosts;
     }
 
-    public Properties getProperties() {
+    public Map<String, String> getProperties() {
         return properties;
     }
 
-    public void setProperties(Properties properties) {
+    public void setProperties(Map<String, String> properties) {
         this.properties = properties;
     }
 

+ 4 - 4
client-adapter/example/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java

@@ -15,7 +15,7 @@ import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
-import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
 
 /**
@@ -72,7 +72,7 @@ public class CanalAdapterLoader {
 
                 for (CanalClientConfig.AdapterGroup connectorGroup : instance.getAdapterGroups()) {
                     List<OuterAdapter> canalOutConnectors = new ArrayList<>();
-                    for (CanalOuterAdapterConfiguration c : connectorGroup.getOutAdapters()) {
+                    for (OuterAdapterConfig c : connectorGroup.getOutAdapters()) {
                         loadConnector(c, canalOutConnectors);
                     }
                     canalOuterAdapterGroups.add(canalOutConnectors);
@@ -97,7 +97,7 @@ public class CanalAdapterLoader {
 
                     List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
 
-                    for (CanalOuterAdapterConfiguration config : group.getOutAdapters()) {
+                    for (OuterAdapterConfig config : group.getOutAdapters()) {
                         loadConnector(config, canalOuterAdapters);
                     }
                     canalOuterAdapterGroups.add(canalOuterAdapters);
@@ -125,7 +125,7 @@ public class CanalAdapterLoader {
         }
     }
 
-    private void loadConnector(CanalOuterAdapterConfiguration config, List<OuterAdapter> canalOutConnectors) {
+    private void loadConnector(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) {
         try {
             OuterAdapter adapter = loader.getExtension(config.getName());
             ClassLoader cl = Thread.currentThread().getContextClassLoader();

+ 27 - 32
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -2,9 +2,11 @@ package com.alibaba.otter.canal.client.adapter.hbase;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang.StringUtils;
+import javax.sql.DataSource;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Connection;
@@ -13,9 +15,12 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfigLoader;
+import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseEtlService;
 import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseSyncService;
-import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
+import com.alibaba.otter.canal.client.adapter.hbase.support.HbaseTemplate;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.SPI;
 
 /**
@@ -27,56 +32,37 @@ import com.alibaba.otter.canal.client.adapter.support.SPI;
 @SPI("hbase")
 public class HbaseAdapter implements OuterAdapter {
 
-    private static volatile Map<String, MappingConfig> mappingConfigCache = null;
+    private static volatile Map<String, MappingConfig> hbaseMapping       = null; // 文件名对应配置
+    private static volatile Map<String, MappingConfig> mappingConfigCache = null; // 库名-表名对应配置
 
     private Connection                                 conn;
     private HbaseSyncService                           hbaseSyncService;
+    private HbaseTemplate                              hbaseTemplate;
 
     @Override
-    public void init(CanalOuterAdapterConfiguration configuration) {
+    public void init(OuterAdapterConfig configuration) {
         try {
             if (mappingConfigCache == null) {
                 synchronized (MappingConfig.class) {
                     if (mappingConfigCache == null) {
-                        Map<String, MappingConfig> hbaseMapping = MappingConfigLoader.load();
+                        hbaseMapping = MappingConfigLoader.load();
                         mappingConfigCache = new HashMap<>();
                         for (MappingConfig mappingConfig : hbaseMapping.values()) {
                             mappingConfigCache.put(mappingConfig.getHbaseOrm().getDatabase() + "-"
-                                                   + mappingConfig.getHbaseOrm().getTable(), mappingConfig);
+                                                   + mappingConfig.getHbaseOrm().getTable(),
+                                mappingConfig);
                         }
                     }
                 }
             }
 
-            String hosts = configuration.getZkHosts();
-            if (StringUtils.isEmpty(hosts)) {
-                hosts = configuration.getHosts();
-            }
-            if (StringUtils.isEmpty(hosts)) {
-                throw new RuntimeException("Empty zookeeper hosts");
-            }
-            String[] zkHosts = StringUtils.split(hosts, ",");
-            int zkPort = 0;
-            StringBuilder hostsWithoutPort = new StringBuilder();
-            for (String host : zkHosts) {
-                int i = host.indexOf(":");
-                hostsWithoutPort.append(host, 0, i);
-                hostsWithoutPort.append(",");
-                if (zkPort == 0) zkPort = Integer.parseInt(host.substring(i + 1));
-            }
-            hostsWithoutPort.deleteCharAt(hostsWithoutPort.length() - 1);
-
-            String znode = configuration.getProperties().getProperty("znodeParent");
-            if (StringUtils.isEmpty(znode)) {
-                znode = "/hbase";
-            }
+            Map<String, String> propertites = configuration.getProperties();
 
             Configuration hbaseConfig = HBaseConfiguration.create();
-            hbaseConfig.set("hbase.zookeeper.quorum", hostsWithoutPort.toString());
-            hbaseConfig.set("hbase.zookeeper.property.clientPort", Integer.toString(zkPort));
-            hbaseConfig.set("zookeeper.znode.parent", znode);
+            propertites.forEach(hbaseConfig::set);
             conn = ConnectionFactory.createConnection(hbaseConfig);
-            hbaseSyncService = new HbaseSyncService(conn);
+            hbaseTemplate = new HbaseTemplate(conn);
+            hbaseSyncService = new HbaseSyncService(hbaseTemplate);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -93,6 +79,15 @@ public class HbaseAdapter implements OuterAdapter {
         hbaseSyncService.sync(config, dml);
     }
 
+    @Override
+    public void etl(String task, List<String> params) {
+        MappingConfig config = hbaseMapping.get(task);
+        DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+        if (dataSource != null) {
+            HbaseEtlService.importData(dataSource, hbaseTemplate, config, params);
+        }
+    }
+
     @Override
     public void destroy() {
         if (conn != null) {

+ 347 - 0
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java

@@ -1,4 +1,351 @@
 package com.alibaba.otter.canal.client.adapter.hbase.service;
 
+import java.sql.*;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+import javax.sql.DataSource;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.hbase.support.*;
+import com.alibaba.otter.canal.client.adapter.support.JdbcTypeUtil;
+import com.google.common.base.Joiner;
+
 public class HbaseEtlService {
+
+    private static Logger logger = LoggerFactory.getLogger(HbaseEtlService.class);
+
+    public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) throws SQLException {
+        Connection conn = null;
+        Statement smt = null;
+        ResultSet rs = null;
+        try {
+            conn = ds.getConnection();
+            smt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            smt.setFetchSize(Integer.MIN_VALUE);
+            rs = smt.executeQuery(sql);
+            return fun.apply(rs);
+        } finally {
+            if (rs != null) {
+                try {
+                    rs.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+            if (smt != null) {
+                try {
+                    smt.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+            if (conn != null) {
+                try {
+                    conn.close();
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    public static void createTable(HbaseTemplate hbaseTemplate, MappingConfig config) {
+        try {
+            // 判断hbase表是否存在,不存在则建表
+            MappingConfig.HbaseOrm hbaseOrm = config.getHbaseOrm();
+            if (!hbaseTemplate.tableExists(hbaseOrm.getHbaseTable())) {
+                hbaseTemplate.createTable(hbaseOrm.getHbaseTable(), hbaseOrm.getFamilies().toArray(new String[0]));
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static void importData(DataSource ds, HbaseTemplate hbaseTemplate, MappingConfig config,
+                                  List<String> params) {
+        // EtlResult etlResult = new EtlResult();
+        AtomicLong successCount = new AtomicLong();
+        List<String> errMsg = new ArrayList<>();
+        String hbaseTable = "";
+        try {
+            // if (config == null) {
+            // logger.error("Config is null!");
+            // etlResult.setSucceeded(false);
+            // etlResult.setErrorMessage("Config is null!");
+            // return etlResult;
+            // }
+            MappingConfig.HbaseOrm hbaseOrm = config.getHbaseOrm();
+            hbaseTable = hbaseOrm.getHbaseTable();
+
+            long start = System.currentTimeMillis();
+
+            if (params != null && params.size() == 1 && "rebuild".equalsIgnoreCase(params.get(0))) {
+                logger.info(hbaseOrm.getHbaseTable() + " rebuild is starting!");
+                // 如果表存在则删除
+                if (hbaseTemplate.tableExists(hbaseOrm.getHbaseTable())) {
+                    hbaseTemplate.disableTable(hbaseOrm.getHbaseTable());
+                    hbaseTemplate.deleteTable(hbaseOrm.getHbaseTable());
+                }
+                params = null;
+            } else {
+                logger.info(hbaseOrm.getHbaseTable() + " etl is starting!");
+            }
+            createTable(hbaseTemplate, config);
+
+            // 拼接sql
+            String sql = "SELECT * FROM " + config.getHbaseOrm().getDatabase() + "." + hbaseOrm.getTable();
+
+            // 拼接条件
+            if (params != null && params.size() == 1 && hbaseOrm.getEtlCondition() == null) {
+                AtomicBoolean stExists = new AtomicBoolean(false);
+                // 验证是否有SYS_TIME字段
+                sqlRS(ds, sql, rs -> {
+                    try {
+                        ResultSetMetaData rsmd = rs.getMetaData();
+                        int cnt = rsmd.getColumnCount();
+                        for (int i = 1; i <= cnt; i++) {
+                            String columnName = rsmd.getColumnName(i);
+                            if ("SYS_TIME".equalsIgnoreCase(columnName)) {
+                                stExists.set(true);
+                                break;
+                            }
+                        }
+                    } catch (Exception e) {
+                        // ignore
+                    }
+                    return null;
+                });
+                if (stExists.get()) {
+                    sql += " WHERE SYS_TIME >= '" + params.get(0) + "' ";
+                }
+            } else if (hbaseOrm.getEtlCondition() != null && params != null) {
+                String etlCondition = hbaseOrm.getEtlCondition();
+                int size = params.size();
+                for (int i = 0; i < size; i++) {
+                    etlCondition = etlCondition.replace("{" + i + "}", params.get(i));
+                }
+
+                sql += " " + etlCondition;
+            }
+
+            // 获取总数
+            String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
+            long cnt = (Long) sqlRS(ds, countSql, rs -> {
+                Long count = null;
+                try {
+                    if (rs.next()) {
+                        count = ((Number) rs.getObject(1)).longValue();
+                    }
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+                return count == null ? 0 : count;
+            });
+
+            // 当大于1万条记录时开启多线程
+            if (cnt >= 10000) {
+                int threadCount = 3; // TODO 从配置读取默认为3
+                long perThreadCnt = cnt / threadCount;
+                ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+                List<Future<Boolean>> futures = new ArrayList<>(threadCount);
+                for (int i = 0; i < threadCount; i++) {
+                    long offset = i * perThreadCnt;
+                    Long size = null;
+                    if (i != threadCount - 1) {
+                        size = perThreadCnt;
+                    }
+                    String sqlFinal;
+                    if (size != null) {
+                        sqlFinal = sql + " LIMIT " + offset + "," + size;
+                    } else {
+                        sqlFinal = sql + " LIMIT " + offset + "," + cnt;
+                    }
+                    Future<Boolean> future = executor
+                        .submit(() -> executeSqlImport(ds, sqlFinal, hbaseOrm, hbaseTemplate, successCount, errMsg));
+                    futures.add(future);
+                }
+
+                for (Future<Boolean> future : futures) {
+                    future.get();
+                }
+
+                executor.shutdown();
+            } else {
+                executeSqlImport(ds, sql, hbaseOrm, hbaseTemplate, successCount, errMsg);
+            }
+
+            logger.info(
+                hbaseOrm.getHbaseTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000 + "s!");
+
+            // etlResult.setResultMessage("导入HBase表 " + hbaseOrm.getHbaseTable() + " 数据:" +
+            // successCount.get() + " 条");
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            errMsg.add(hbaseTable + " etl failed! ==>" + e.getMessage());
+        }
+
+        // if (errMsg.isEmpty()) {
+        // etlResult.setSucceeded(true);
+        // } else {
+        // etlResult.setErrorMessage(Joiner.on("\n").join(errMsg));
+        // }
+        // return etlResult;
+    }
+
+    private static boolean executeSqlImport(DataSource ds, String sql, MappingConfig.HbaseOrm hbaseOrm,
+                                            HbaseTemplate hbaseTemplate, AtomicLong successCount, List<String> errMsg) {
+        try {
+            sqlRS(ds, sql, rs -> {
+                int i = 1;
+
+                try {
+                    boolean complete = false;
+                    List<HRow> rows = new ArrayList<>();
+                    String[] rowKeyColumns = null;
+                    if (hbaseOrm.getRowKey() != null) {
+                        rowKeyColumns = hbaseOrm.getRowKey().trim().split(",");
+                    }
+                    while (rs.next()) {
+                        int cc = rs.getMetaData().getColumnCount();
+                        int[] jdbcTypes = new int[cc];
+                        Class<?>[] classes = new Class[cc];
+                        for (int j = 1; j <= cc; j++) {
+                            int jdbcType = rs.getMetaData().getColumnType(j);
+                            jdbcTypes[j - 1] = jdbcType;
+                            classes[j - 1] = JdbcTypeUtil.jdbcType2javaType(jdbcType);
+                        }
+                        HRow row = new HRow();
+
+                        if (rowKeyColumns != null) {
+                            // 取rowKey字段拼接
+                            StringBuilder rowKeyVale = new StringBuilder();
+                            for (String rowKeyColumnName : rowKeyColumns) {
+                                Object obj = rs.getObject(rowKeyColumnName);
+                                if (obj != null) {
+                                    rowKeyVale.append(obj.toString());
+                                }
+                                rowKeyVale.append("|");
+                            }
+                            int len = rowKeyVale.length();
+                            if (len > 0) {
+                                rowKeyVale.delete(len - 1, len);
+                            }
+                            row.setRowKey(Bytes.toBytes(rowKeyVale.toString()));
+                        }
+
+                        for (int j = 1; j <= cc; j++) {
+                            String columnName = rs.getMetaData().getColumnName(j);
+
+                            Object val = JdbcTypeUtil.getRSData(rs, columnName, jdbcTypes[j - 1]);
+                            if (val == null) {
+                                continue;
+                            }
+
+                            MappingConfig.ColumnItem columnItem = hbaseOrm.getColumnItems().get(columnName);
+                            // 没有配置映射
+                            if (columnItem == null) {
+                                String family = hbaseOrm.getFamily();
+                                String qualifile = columnName;
+                                if (hbaseOrm.isUppercaseQualifier()) {
+                                    qualifile = qualifile.toUpperCase();
+                                }
+                                if (MappingConfig.Mode.STRING == hbaseOrm.getMode()) {
+                                    if (hbaseOrm.getRowKey() == null && j == 1) {
+                                        row.setRowKey(Bytes.toBytes(val.toString()));
+                                    } else {
+                                        row.addCell(family, qualifile, Bytes.toBytes(val.toString()));
+                                    }
+                                } else if (MappingConfig.Mode.NATIVE == hbaseOrm.getMode()) {
+                                    Type type = Type.getType(classes[j - 1]);
+                                    if (hbaseOrm.getRowKey() == null && j == 1) {
+                                        row.setRowKey(TypeUtil.toBytes(val, type));
+                                    } else {
+                                        row.addCell(family, qualifile, TypeUtil.toBytes(val, type));
+                                    }
+                                } else if (MappingConfig.Mode.PHOENIX == hbaseOrm.getMode()) {
+                                    PhType phType = PhType.getType(classes[j - 1]);
+                                    if (hbaseOrm.getRowKey() == null && j == 1) {
+                                        row.setRowKey(PhTypeUtil.toBytes(val, phType));
+                                    } else {
+                                        row.addCell(family, qualifile, PhTypeUtil.toBytes(val, phType));
+                                    }
+                                }
+                            } else {
+                                // 如果不需要类型转换
+                                if (columnItem.getType() == null || "".equals(columnItem.getType())) {
+                                    if (val instanceof java.sql.Date) {
+                                        SimpleDateFormat dateFmt = new SimpleDateFormat("yyyy-MM-dd");
+                                        val = dateFmt.format((Date) val);
+                                    } else if (val instanceof Timestamp) {
+                                        SimpleDateFormat datetimeFmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+                                        val = datetimeFmt.format((Date) val);
+                                    }
+
+                                    byte[] valBytes = Bytes.toBytes(val.toString());
+                                    if (columnItem.isRowKey()) {
+                                        row.setRowKey(valBytes);
+                                    } else {
+                                        row.addCell(columnItem.getFamily(), columnItem.getQualifier(), valBytes);
+                                    }
+                                } else {
+                                    PhType phType = PhType.getType(columnItem.getType());
+                                    if (columnItem.isRowKey()) {
+                                        row.setRowKey(PhTypeUtil.toBytes(val, phType));
+                                    } else {
+                                        row.addCell(columnItem.getFamily(),
+                                            columnItem.getQualifier(),
+                                            PhTypeUtil.toBytes(val, phType));
+                                    }
+                                }
+                            }
+                        }
+
+                        if (row.getRowKey() == null) throw new RuntimeException("RowKey 值为空");
+
+                        rows.add(row);
+                        complete = false;
+                        if (i % hbaseOrm.getCommitBatch() == 0 && !rows.isEmpty()) {
+                            hbaseTemplate.puts(hbaseOrm.getHbaseTable(), rows);
+                            rows.clear();
+                            complete = true;
+                        }
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("import count:" + i);
+                        }
+                        // System.out.println(i);
+                        i++;
+                        successCount.incrementAndGet();
+                    }
+
+                    if (!complete && !rows.isEmpty()) {
+                        hbaseTemplate.puts(hbaseOrm.getHbaseTable(), rows);
+                    }
+
+                } catch (Exception e) {
+                    logger.error(hbaseOrm.getHbaseTable() + " etl failed! ==>" + e.getMessage(), e);
+                    errMsg.add(hbaseOrm.getHbaseTable() + " etl failed! ==>" + e.getMessage());
+                    throw new RuntimeException(e);
+                }
+                return i;
+            });
+            return true;
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return false;
+        }
+    }
 }

+ 6 - 15
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java

@@ -1,23 +1,13 @@
 package com.alibaba.otter.canal.client.adapter.hbase.service;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
-import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
-import com.alibaba.otter.canal.client.adapter.hbase.support.HRow;
-import com.alibaba.otter.canal.client.adapter.hbase.support.HbaseTemplate;
-import com.alibaba.otter.canal.client.adapter.hbase.support.PhType;
-import com.alibaba.otter.canal.client.adapter.hbase.support.PhTypeUtil;
-import com.alibaba.otter.canal.client.adapter.hbase.support.Type;
-import com.alibaba.otter.canal.client.adapter.hbase.support.TypeUtil;
+import com.alibaba.otter.canal.client.adapter.hbase.support.*;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 
 /**
@@ -32,8 +22,8 @@ public class HbaseSyncService {
 
     private HbaseTemplate hbaseTemplate;                                    // HBase操作模板
 
-    public HbaseSyncService(Connection conn){
-        hbaseTemplate = new HbaseTemplate(conn);
+    public HbaseSyncService(HbaseTemplate hbaseTemplate){
+        this.hbaseTemplate = hbaseTemplate;
     }
 
     public void sync(MappingConfig config, Dml dml) {
@@ -375,7 +365,8 @@ public class HbaseSyncService {
      * @param value 值
      * @return 复合字段rowKey
      */
-    private static byte[] typeConvert(MappingConfig.ColumnItem columnItem, MappingConfig.HbaseOrm hbaseOrm, Object value) {
+    private static byte[] typeConvert(MappingConfig.ColumnItem columnItem, MappingConfig.HbaseOrm hbaseOrm,
+                                      Object value) {
         if (value == null) {
             return null;
         }

+ 12 - 0
client-adapter/launcher/pom.xml

@@ -50,6 +50,18 @@
             <artifactId>spring-boot-configuration-processor</artifactId>
             <optional>true</optional>
         </dependency>
+        <!-- 单独引入rocketmq依赖 -->
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>4.3.0</version>
+        </dependency>
+        <!-- 单独引入kafka依赖 -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>1.1.1</version>
+        </dependency>
 
         <!-- outer adapter jar with dependencies-->
         <dependency>

+ 5 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterConfig.java

@@ -4,6 +4,8 @@ import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
 
@@ -15,6 +17,8 @@ import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 @ConfigurationProperties(prefix = "adapter.conf")
 public class AdapterConfig {
 
+    private static Logger                 logger = LoggerFactory.getLogger(AdapterConfig.class);
+
     private Map<String, DatasourceConfig> datasourceConfigs;
 
     private List<String>                  adapterConfigs;
@@ -51,7 +55,7 @@ public class AdapterConfig {
                 try {
                     ds.init();
                 } catch (SQLException e) {
-                    throw new RuntimeException(e);
+                    logger.error("#Failed to initial datasource: " + datasourceConfig.getUrl(), e);
                 }
                 DatasourceConfig.DATA_SOURCES.put(entry.getKey(), ds);
             }

+ 47 - 45
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -7,6 +7,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.protocol.FlatMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -24,10 +26,10 @@ public abstract class AbstractCanalAdapterWorker {
 
     protected final Logger                    logger  = LoggerFactory.getLogger(this.getClass());
 
-    protected String                          canalDestination;                                                 // canal实例
-    protected List<List<OuterAdapter>>        canalOuterAdapters;                                               // 外部适配器
-    protected ExecutorService                 groupInnerExecutorService;                                        // 组内工作线程池
-    protected volatile boolean                running = false;                                                  // 是否运行中
+    protected String                          canalDestination;                                                // canal实例
+    protected List<List<OuterAdapter>>        canalOuterAdapters;                                              // 外部适配器
+    protected ExecutorService                 groupInnerExecutorService;                                       // 组内工作线程池
+    protected volatile boolean                running = false;                                                 // 是否运行中
     protected Thread                          thread  = null;
     protected Thread.UncaughtExceptionHandler handler = (t, e) -> logger.error("parse events has an error", e);
 
@@ -69,47 +71,47 @@ public abstract class AbstractCanalAdapterWorker {
         }
     }
 
-    // protected void writeOut(final FlatMessage flatMessage) {
-    // List<Future<Boolean>> futures = new ArrayList<>();
-    // // 组间适配器并行运行
-    // for (List<CanalOuterAdapter> outerAdapters : canalOuterAdapters) {
-    // final List<CanalOuterAdapter> adapters = outerAdapters;
-    // futures.add(groupInnerExecutorService.submit(new Callable<Boolean>() {
-    //
-    // @Override
-    // public Boolean call() {
-    // try {
-    // // 组内适配器穿行运行,尽量不要配置组内适配器
-    // for (CanalOuterAdapter c : adapters) {
-    // long begin = System.currentTimeMillis();
-    // Dml dml = MessageUtil.flatMessage2Dml(flatMessage);
-    // c.writeOut(dml);
-    // if (logger.isDebugEnabled()) {
-    // logger.debug("{} elapsed time: {}",
-    // c.getClass().getName(),
-    // (System.currentTimeMillis() - begin));
-    // }
-    // }
-    // return true;
-    // } catch (Exception e) {
-    // return false;
-    // }
-    // }
-    // }));
-    //
-    // // 等待所有适配器写入完成
-    // // 由于是组间并发操作,所以将阻塞直到耗时最久的工作组操作完成
-    // for (Future<Boolean> f : futures) {
-    // try {
-    // if (!f.get()) {
-    // logger.error("Outer adapter write failed");
-    // }
-    // } catch (InterruptedException | ExecutionException e) {
-    // // ignore
-    // }
-    // }
-    // }
-    // }
+    protected void writeOut(final FlatMessage flatMessage) {
+        List<Future<Boolean>> futures = new ArrayList<>();
+        // 组间适配器并行运行
+        for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+            final List<OuterAdapter> adapters = outerAdapters;
+            futures.add(groupInnerExecutorService.submit(new Callable<Boolean>() {
+
+                @Override
+                public Boolean call() {
+                    try {
+                        // 组内适配器串行运行,尽量不要配置组内适配器
+                        for (OuterAdapter c : adapters) {
+                            long begin = System.currentTimeMillis();
+                            Dml dml = MessageUtil.flatMessage2Dml(flatMessage);
+                            c.sync(dml);
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("{} elapsed time: {}",
+                                    c.getClass().getName(),
+                                    (System.currentTimeMillis() - begin));
+                            }
+                        }
+                        return true;
+                    } catch (Exception e) {
+                        return false;
+                    }
+                }
+            }));
+
+            // 等待所有适配器写入完成
+            // 由于是组间并发操作,所以将阻塞直到耗时最久的工作组操作完成
+            for (Future<Boolean> f : futures) {
+                try {
+                    if (!f.get()) {
+                        logger.error("Outer adapter write failed");
+                    }
+                } catch (InterruptedException | ExecutionException e) {
+                    // ignore
+                }
+            }
+        }
+    }
 
     protected void writeOut(Message message, String topic) {
         if (logger.isDebugEnabled()) {

+ 148 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -0,0 +1,148 @@
+package com.alibaba.otter.canal.adapter.launcher.loader;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.common.errors.WakeupException;
+
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
+import com.alibaba.otter.canal.client.kafka.KafkaCanalConnectors;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+
+/**
+ * kafka对应的client适配器工作线程
+ *
+ * @author machengyuan 2018-8-19 下午11:30:49
+ * @version 1.0.0
+ */
+public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
+
+    private KafkaCanalConnector connector;
+
+    private String              topic;
+
+    private boolean             flatMessage;
+
+    public CanalAdapterKafkaWorker(String bootstrapServers, String topic, String groupId,
+                                   List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
+        this.canalOuterAdapters = canalOuterAdapters;
+        this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
+        this.topic = topic;
+        this.canalDestination = topic;
+        this.flatMessage = flatMessage;
+        connector = KafkaCanalConnectors.newKafkaConnector(bootstrapServers, topic, null, groupId, flatMessage);
+        // connector.setSessionTimeout(1L, TimeUnit.MINUTES);
+
+        // super.initSwitcher(topic);
+    }
+
+    @Override
+    public void start() {
+        if (!running) {
+            thread = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    process();
+                }
+            });
+            thread.setUncaughtExceptionHandler(handler);
+            running = true;
+            thread.start();
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if (!running) {
+                return;
+            }
+
+            connector.stopRunning();
+            running = false;
+
+            // if (switcher != null && !switcher.state()) {
+            // switcher.set(true);
+            // }
+
+            if (thread != null) {
+                try {
+                    thread.join();
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+            groupInnerExecutorService.shutdown();
+            logger.info("topic {} connectors' worker thread dead!", this.topic);
+            for (List<OuterAdapter> outerAdapters : canalOuterAdapters) {
+                for (OuterAdapter adapter : outerAdapters) {
+                    adapter.destroy();
+                }
+            }
+            logger.info("topic {} all connectors destroyed!", this.topic);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private void process() {
+        while (!running)
+            ;
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        // final AtomicBoolean executing = new AtomicBoolean(true);
+        while (running) {
+            try {
+                logger.info("=============> Start to connect topic: {} <=============", this.topic);
+                connector.connect();
+                logger.info("=============> Start to subscribe topic: {} <=============", this.topic);
+                connector.subscribe();
+                logger.info("=============> Subscribe topic: {} succeed <=============", this.topic);
+                while (running) {
+                    try {
+                        // switcher.get(); //等待开关开启
+
+                        List<?> messages;
+                        if (!flatMessage) {
+                            messages = connector.getWithoutAck();
+                        } else {
+                            messages = connector.getFlatMessageWithoutAck(100L, TimeUnit.MILLISECONDS);
+                        }
+                        if (messages != null) {
+                            for (final Object message : messages) {
+                                if (message instanceof FlatMessage) {
+                                    writeOut((FlatMessage) message);
+                                } else {
+                                    writeOut((Message) message);
+                                }
+                            }
+                        }
+                        connector.ack();
+                    } catch (CommitFailedException e) {
+                        logger.warn(e.getMessage());
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e);
+                        TimeUnit.SECONDS.sleep(1L);
+                    }
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        executor.shutdown();
+
+        try {
+            connector.unsubscribe();
+        } catch (WakeupException e) {
+            // No-op. Continue process
+        }
+        connector.disconnect();
+        logger.info("=============> Disconnect topic: {} <=============", this.topic);
+    }
+}

+ 56 - 10
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -9,13 +9,14 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import com.alibaba.otter.canal.client.adapter.OuterAdapter;
-import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
-import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
+import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
 
 /**
  * 外部适配器的加载器
@@ -24,13 +25,15 @@ import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
  */
 public class CanalAdapterLoader {
 
-    private static final Logger                logger       = LoggerFactory.getLogger(CanalAdapterLoader.class);
+    private static final Logger                     logger        = LoggerFactory.getLogger(CanalAdapterLoader.class);
+
+    private CanalClientConfig                       canalClientConfig;
 
-    private CanalClientConfig                  canalClientConfig;
+    private Map<String, CanalAdapterWorker>         canalWorkers  = new HashMap<>();
 
-    private Map<String, CanalAdapterWorker>    canalWorkers = new HashMap<>();
+    private Map<String, AbstractCanalAdapterWorker> canalMQWorker = new HashMap<>();
 
-    private ExtensionLoader<OuterAdapter> loader;
+    private ExtensionLoader<OuterAdapter>           loader;
 
     public CanalAdapterLoader(CanalClientConfig canalClientConfig){
         this.canalClientConfig = canalClientConfig;
@@ -57,7 +60,7 @@ public class CanalAdapterLoader {
 
                 for (CanalClientConfig.AdapterGroup connectorGroup : instance.getAdapterGroups()) {
                     List<OuterAdapter> canalOutConnectors = new ArrayList<>();
-                    for (CanalOuterAdapterConfiguration c : connectorGroup.getOutAdapters()) {
+                    for (OuterAdapterConfig c : connectorGroup.getOutAdapters()) {
                         loadConnector(c, canalOutConnectors);
                     }
                     canalOuterAdapterGroups.add(canalOutConnectors);
@@ -73,9 +76,43 @@ public class CanalAdapterLoader {
                 logger.info("Start adapter for canal instance: {} succeed", instance.getInstance());
             }
         }
+
+        // 初始化canal-client-mq的适配器
+        if (canalClientConfig.getMqTopics() != null) {
+            for (CanalClientConfig.MQTopic topic : canalClientConfig.getMqTopics()) {
+                for (CanalClientConfig.Group group : topic.getGroups()) {
+                    List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
+
+                    List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
+
+                    for (OuterAdapterConfig config : group.getOutAdapters()) {
+                        loadConnector(config, canalOuterAdapters);
+                    }
+                    canalOuterAdapterGroups.add(canalOuterAdapters);
+                    if (StringUtils.isBlank(topic.getMqMode()) || "rocketmq".equalsIgnoreCase(topic.getMqMode())) {
+                        CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig
+                            .getBootstrapServers(), topic.getTopic(), group.getGroupId(), canalOuterAdapterGroups);
+                        canalMQWorker.put(topic.getTopic() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
+                        rocketMQWorker.start();
+                    } else if ("kafka".equalsIgnoreCase(topic.getMqMode())) {
+                        CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(
+                            canalClientConfig.getBootstrapServers(),
+                            topic.getTopic(),
+                            group.getGroupId(),
+                            canalOuterAdapterGroups,
+                            canalClientConfig.getFlatMessage());
+                        canalMQWorker.put(topic.getTopic() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
+                        canalKafkaWorker.start();
+                    }
+                    logger.info("Start adapter for canal-client rocketmq topic: {} succeed",
+                        topic.getTopic() + "-" + group.getGroupId());
+
+                }
+            }
+        }
     }
 
-    private void loadConnector(CanalOuterAdapterConfiguration config, List<OuterAdapter> canalOutConnectors) {
+    private void loadConnector(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) {
         try {
             OuterAdapter adapter = loader.getExtension(config.getName());
             ClassLoader cl = Thread.currentThread().getContextClassLoader();
@@ -98,10 +135,19 @@ public class CanalAdapterLoader {
             ExecutorService stopExecutorService = Executors.newFixedThreadPool(canalWorkers.size());
             for (CanalAdapterWorker v : canalWorkers.values()) {
                 final CanalAdapterWorker caw = v;
-                stopExecutorService.submit(() -> caw.stop());
+                stopExecutorService.submit(caw::stop);
             }
             stopExecutorService.shutdown();
         }
+
+        if (canalMQWorker.size() > 0) {
+            ExecutorService stopMQWokerService = Executors.newFixedThreadPool(canalMQWorker.size());
+            for (AbstractCanalAdapterWorker tmp : canalMQWorker.values()) {
+                final AbstractCanalAdapterWorker worker = tmp;
+                stopMQWokerService.submit(worker::stop);
+            }
+            stopMQWokerService.shutdown();
+        }
         logger.info("All canal adapters destroyed");
     }
 }

+ 124 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -0,0 +1,124 @@
+package com.alibaba.otter.canal.adapter.launcher.loader;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.common.errors.WakeupException;
+
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
+import com.alibaba.otter.canal.protocol.Message;
+
+/**
+ * rocketmq对应的client适配器工作线程
+ *
+ * @author machengyuan 2018-8-19 下午11:30:49
+ * @version 1.0.0
+ */
+public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
+
+    private RocketMQCanalConnector connector;
+
+    private String                 topic;
+
+    public CanalAdapterRocketMQWorker(String nameServers, String topic, String groupId,
+                                      List<List<OuterAdapter>> canalOuterAdapters){
+        logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
+        this.canalOuterAdapters = canalOuterAdapters;
+        this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
+        this.topic = topic;
+        this.canalDestination = topic;
+        connector = RocketMQCanalConnectorProvider.newRocketMQConnector(nameServers, topic, groupId);
+    }
+
+    @Override
+    public void start() {
+        if (!running) {
+            thread = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    process();
+                }
+            });
+            thread.setUncaughtExceptionHandler(handler);
+            running = true;
+            thread.start();
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if (!running) {
+                return;
+            }
+            connector.stopRunning();
+            running = false;
+            logger.info("Stop topic {} out adapters begin", this.topic);
+            stopOutAdapters();
+            logger.info("Stop topic {} out adapters end", this.topic);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    private void process() {
+        while (!running)
+            ;
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        while (running) {
+            try {
+                logger.info("=============> Start to connect topic: {} <=============", this.topic);
+                connector.connect();
+                logger.info("=============> Start to subscribe topic: {}<=============", this.topic);
+                connector.subscribe();
+                logger.info("=============> Subscribe topic: {} succeed<=============", this.topic);
+                while (running) {
+                    try {
+                        // switcher.get(); //等待开关开启
+
+                        final Message message = connector.getWithoutAck(1);
+                        if (message != null) {
+                            executor.submit(new Runnable() {
+
+                                @Override
+                                public void run() {
+                                    try {
+                                        writeOut(message, topic);
+                                    } catch (Exception e) {
+                                        logger.error(e.getMessage(), e);
+                                    }
+                                    connector.ack(message.getId());
+                                }
+                            });
+                        } else {
+                            logger.debug("Message is null");
+                        }
+                    } catch (CommitFailedException e) {
+                        logger.warn(e.getMessage());
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e);
+                        TimeUnit.SECONDS.sleep(1L);
+                    }
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        executor.shutdown();
+
+        try {
+            connector.unsubscribe();
+        } catch (WakeupException e) {
+            // No-op. Continue process
+        }
+        connector.stopRunning();
+        logger.info("=============> Disconnect topic: {} <=============", this.topic);
+    }
+}

+ 1 - 7
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

@@ -59,13 +59,7 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
     @Override
     public void start() {
         if (!running) {
-            thread = new Thread(new Runnable() {
-
-                @Override
-                public void run() {
-                    process();
-                }
-            });
+            thread = new Thread(() -> process());
             thread.setUncaughtExceptionHandler(handler);
             thread.start();
             running = true;

+ 20 - 8
client-adapter/launcher/src/main/resources/application.yml

@@ -1,20 +1,32 @@
 server:
   port: 8081
 
+logging:
+  level:
+    com.alibaba.otter.canal.client.adapter.hbase: DEBUG
+
 canal.conf:
   canalServerHost: 127.0.0.1:11111
-  #zookeeperHosts: slave1:2181
-  #bootstrapServers: localhost:9092 #or rocketmq nameservers:host1:9876;host2:9876
-  flatMessage: false
-
+#  zookeeperHosts: slave1:2181
+#  bootstrapServers: slave1:6667 #or rocketmq nameservers:host1:9876;host2:9876
+  flatMessage: true
   canalInstances:
   - instance: example
     adapterGroups:
     - outAdapters:
-      - name: logger
-#      - name: hbase
-#        hosts: slave1:2181
-#        properties: {znodeParent: "/hbase-unsecure"}
+#      - name: logger
+      - name: hbase
+        properties:
+          hbase.zookeeper.quorum: slave1
+          hbase.zookeeper.property.clientPort: 2181
+          zookeeper.znode.parent: /hbase-unsecure
+#  mqTopics:
+#  - mqMode: kafka
+#    topic: example
+#    groups:
+#    - groupId: g2
+#      outAdapters:
+#      - name: logger
 
 adapter.conf:
   datasourceConfigs:

+ 2 - 2
client-adapter/logger/src/main/java/com/alibaba/otter/canal/client/adapter/logger/LoggerAdapterExample.java

@@ -4,7 +4,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
-import com.alibaba.otter.canal.client.adapter.support.CanalOuterAdapterConfiguration;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.SPI;
 
@@ -21,7 +21,7 @@ public class LoggerAdapterExample implements OuterAdapter {
     private Logger logger = LoggerFactory.getLogger(this.getClass());
 
     @Override
-    public void init(CanalOuterAdapterConfiguration configuration) {
+    public void init(OuterAdapterConfig configuration) {
 
     }