Quellcode durchsuchen

HBase的etl初步完成

mcy vor 6 Jahren
Ursprung
Commit
30a11c75cb

+ 11 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/OuterAdapter.java

@@ -1,6 +1,7 @@
 package com.alibaba.otter.canal.client.adapter;
 
 import java.util.List;
+import java.util.Map;
 
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.EtlResult;
@@ -42,7 +43,16 @@ public interface OuterAdapter {
      * @param params etl筛选条件
      */
     default EtlResult etl(String task, List<String> params) {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("unsupported operation");
     }
 
+    /**
+     * 计算总数
+     * 
+     * @param task 任务名, 对应配置名
+     * @return 总数
+     */
+    default Map<String, Object> count(String task) {
+        throw new UnsupportedOperationException("unsupported operation");
+    }
 }

+ 50 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Result.java

@@ -0,0 +1,50 @@
+package com.alibaba.otter.canal.client.adapter.support;
+
+import java.io.Serializable;
+import java.util.Date;
+
+public class Result implements Serializable {
+
+    public Integer code    = 20000;
+    public Object  data;
+    public String  message;
+    public Date    sysTime = new Date();
+
+    public static Result createSuccess(Object data) {
+        Result result = new Result();
+        result.setData(data);
+        return result;
+    }
+
+    public Integer getCode() {
+        return code;
+    }
+
+    public void setCode(Integer code) {
+        this.code = code;
+    }
+
+    public Object getData() {
+        return data;
+    }
+
+    public void setData(Object data) {
+        this.data = data;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public Date getSysTime() {
+        return sysTime;
+    }
+
+    public void setSysTime(Date sysTime) {
+        this.sysTime = sysTime;
+    }
+}

+ 1 - 1
client-adapter/example/pom.xml

@@ -9,7 +9,7 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.alibaba.otter</groupId>
-    <artifactId>example</artifactId>
+    <artifactId>client-adapter.example</artifactId>
     <packaging>jar</packaging>
     <name>canal client adapter example module for otter ${project.version}</name>
 

+ 34 - 5
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.client.adapter.hbase;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -9,8 +10,11 @@ import javax.sql.DataSource;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
@@ -29,12 +33,15 @@ import com.alibaba.otter.canal.client.adapter.support.*;
 @SPI("hbase")
 public class HbaseAdapter implements OuterAdapter {
 
-    private static volatile Map<String, MappingConfig> hbaseMapping       = null; // 文件名对应配置
-    private static volatile Map<String, MappingConfig> mappingConfigCache = null; // 库名-表名对应配置
+    private static Logger                              logger             = LoggerFactory.getLogger(HbaseAdapter.class);
+
+    private static volatile Map<String, MappingConfig> hbaseMapping       = null;                                       // 文件名对应配置
+    private static volatile Map<String, MappingConfig> mappingConfigCache = null;                                       // 库名-表名对应配置
 
     private Connection                                 conn;
     private HbaseSyncService                           hbaseSyncService;
     private HbaseTemplate                              hbaseTemplate;
+    private Configuration                              hbaseConfig;
 
     @Override
     public void init(OuterAdapterConfig configuration) {
@@ -55,7 +62,7 @@ public class HbaseAdapter implements OuterAdapter {
 
             Map<String, String> propertites = configuration.getProperties();
 
-            Configuration hbaseConfig = HBaseConfiguration.create();
+            hbaseConfig = HBaseConfiguration.create();
             propertites.forEach(hbaseConfig::set);
             conn = ConnectionFactory.createConnection(hbaseConfig);
             hbaseTemplate = new HbaseTemplate(conn);
@@ -90,6 +97,28 @@ public class HbaseAdapter implements OuterAdapter {
         }
     }
 
+    @Override
+    public Map<String, Object> count(String task) {
+        MappingConfig config = hbaseMapping.get(task);
+        String hbaseTable = config.getHbaseOrm().getHbaseTable();
+        long rowCount = 0L;
+        try {
+            HTable table = new HTable(hbaseConfig, hbaseTable);
+            Scan scan = new Scan();
+            scan.setFilter(new FirstKeyOnlyFilter());
+            ResultScanner resultScanner = table.getScanner(scan);
+            for (Result result : resultScanner) {
+                rowCount += result.size();
+            }
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        }
+        Map<String, Object> res = new LinkedHashMap<>();
+        res.put("hbaseTable", hbaseTable);
+        res.put("count", rowCount);
+        return res;
+    }
+
     @Override
     public void destroy() {
         if (conn != null) {

+ 3 - 1
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java

@@ -72,6 +72,7 @@ public class MappingConfigLoader {
                 String[] dbTable;
                 if (dsKey == null) {
                     dbTable = srcMeta.split("\\.");
+
                 } else {
                     dbTable = srcMeta.split("@")[0].split("\\.");
                 }
@@ -84,7 +85,7 @@ public class MappingConfigLoader {
                     hbaseOrm.setAutoCreateTable(true);
                     hbaseOrm.setDatabase(dbTable[0]);
                     hbaseOrm.setTable(dbTable[1]);
-                    hbaseOrm.setMode(MappingConfig.Mode.STRING);
+                    hbaseOrm.setMode(MappingConfig.Mode.PHOENIX);
                     hbaseOrm.setRowKey(rowKey);
                     // 有定义rowKey
                     if (rowKey != null) {
@@ -94,6 +95,7 @@ public class MappingConfigLoader {
                         hbaseOrm.setRowKeyColumn(columnItem);
                     }
                     config.setHbaseOrm(hbaseOrm);
+                    config.setDataSourceKey(dsKey);
 
                 } else {
                     throw new RuntimeException(String.format("配置项[%s]内容为空, 或格式不符合database.table", c));

+ 1 - 1
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java

@@ -68,7 +68,7 @@ public class HbaseEtlService {
             // 判断hbase表是否存在,不存在则建表
             MappingConfig.HbaseOrm hbaseOrm = config.getHbaseOrm();
             if (!hbaseTemplate.tableExists(hbaseOrm.getHbaseTable())) {
-                hbaseTemplate.createTable(hbaseOrm.getHbaseTable(), hbaseOrm.getFamilies().toArray(new String[0]));
+                hbaseTemplate.createTable(hbaseOrm.getHbaseTable(), hbaseOrm.getFamily());
             }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);

+ 1 - 1
client-adapter/hbase/src/main/resources/hbase/mytest_person2.yml

@@ -3,7 +3,7 @@ hbaseOrm:
   mode: PHOENIX  #NATIVE   #STRING
   database: mytest  # 数据库名
   table: person2     # 数据库表名
-  hbaseTable: MYTEST_PERSON2   # HBase表名
+  hbaseTable: MYTEST.PERSON2   # HBase表名
   family: CF  # 默认统一Family名称
   uppercaseQualifier: true  # 字段名转大写, 默认为true
   commitBatch: 3000 # 批量提交的大小

+ 1 - 1
client-adapter/launcher/pom.xml

@@ -9,7 +9,7 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.alibaba.otter</groupId>
-    <artifactId>launcher</artifactId>
+    <artifactId>client-adapter.launcher</artifactId>
     <packaging>jar</packaging>
     <name>canal client adapter launcher module for otter ${project.version}</name>
 

+ 16 - 9
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/EtlRest.java → client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java

@@ -2,20 +2,18 @@ package com.alibaba.otter.canal.adapter.launcher.rest;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 import javax.annotation.PostConstruct;
 
-import com.alibaba.otter.canal.client.adapter.support.EtlResult;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RequestParam;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.bind.annotation.*;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
 import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
 
 @RestController
-public class EtlRest {
+public class CommonRest {
 
     private ExtensionLoader<OuterAdapter> loader;
 
@@ -25,9 +23,9 @@ public class EtlRest {
     }
 
     /**
-     * Demo: http://127.0.0.1:8081/etl/hbase/mytest_person2.yml
+     * Demo: POST: http://127.0.0.1:8081/etl/hbase/mytest_person2.yml
      */
-    @GetMapping("/etl/{type}/{task}")
+    @PostMapping("/etl/{type}/{task}")
     public EtlResult etl(@PathVariable String type, @PathVariable String task,
                          @RequestParam(required = false) String params) {
         OuterAdapter adapter = loader.getExtension(type);
@@ -37,6 +35,15 @@ public class EtlRest {
             paramArr = Arrays.asList(parmaArray);
         }
 
-        return  adapter.etl(task, paramArr);
+        return adapter.etl(task, paramArr);
+    }
+
+    /**
+     * Demo: GET: http://127.0.0.1:8081/count/hbase/mytest_person2.yml
+     */
+    @GetMapping("/count/{type}/{task}")
+    public Map<String, Object> count(@PathVariable String type, @PathVariable String task) {
+        OuterAdapter adapter = loader.getExtension(type);
+        return adapter.count(task);
     }
 }

+ 3 - 2
client-adapter/launcher/src/main/resources/application.yml

@@ -18,7 +18,7 @@ canal.conf:
   - instance: example
     adapterGroups:
     - outAdapters:
-#      - name: logger
+      - name: logger
       - name: hbase
         properties:
           hbase.zookeeper.quorum: slave1
@@ -39,4 +39,5 @@ adapter.conf:
       username: root
       password: 121212
   adapterConfigs:
-  - hbase/mytest_person2.yml
+  - hbase/mytest_person2.yml
+#  - hbase/mytest.person2@defaultDS