Quellcode durchsuchen

HBase的etl初步完成

mcy vor 6 Jahren
Ursprung
Commit
49caff5759

+ 3 - 2
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/OuterAdapter.java

@@ -2,8 +2,9 @@ package com.alibaba.otter.canal.client.adapter;
 
 import java.util.List;
 
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
 import com.alibaba.otter.canal.client.adapter.support.SPI;
 
 /**
@@ -40,7 +41,7 @@ public interface OuterAdapter {
      * @param task 任务名, 对应配置名
      * @param params etl筛选条件
      */
-    default void etl(String task, List<String> params) {
+    default EtlResult etl(String task, List<String> params) {
         throw new UnsupportedOperationException();
     }
 

+ 37 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/EtlResult.java

@@ -0,0 +1,37 @@
+package com.alibaba.otter.canal.client.adapter.support;
+
+import java.io.Serializable;
+
+public class EtlResult implements Serializable {
+    private static final long serialVersionUID = 4250522736289866505L;
+
+    private boolean succeeded = false;
+
+    private String resultMessage;
+
+    private String errorMessage;
+
+    public boolean getSucceeded() {
+        return succeeded;
+    }
+
+    public void setSucceeded(boolean succeeded) {
+        this.succeeded = succeeded;
+    }
+
+    public String getResultMessage() {
+        return resultMessage;
+    }
+
+    public void setResultMessage(String resultMessage) {
+        this.resultMessage = resultMessage;
+    }
+
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    public void setErrorMessage(String errorMessage) {
+        this.errorMessage = errorMessage;
+    }
+}

+ 8 - 6
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -18,10 +18,7 @@ import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfigLoader;
 import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseEtlService;
 import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseSyncService;
 import com.alibaba.otter.canal.client.adapter.hbase.support.HbaseTemplate;
-import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
-import com.alibaba.otter.canal.client.adapter.support.SPI;
+import com.alibaba.otter.canal.client.adapter.support.*;
 
 /**
  * HBase外部适配器
@@ -80,11 +77,16 @@ public class HbaseAdapter implements OuterAdapter {
     }
 
     @Override
-    public void etl(String task, List<String> params) {
+    public EtlResult etl(String task, List<String> params) {
         MappingConfig config = hbaseMapping.get(task);
         DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
         if (dataSource != null) {
-            HbaseEtlService.importData(dataSource, hbaseTemplate, config, params);
+            return HbaseEtlService.importData(dataSource, hbaseTemplate, config, params);
+        } else {
+            EtlResult etlResult = new EtlResult();
+            etlResult.setSucceeded(false);
+            etlResult.setErrorMessage("DataSource not found");
+            return etlResult;
         }
     }
 

+ 18 - 18
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java

@@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.hbase.support.*;
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
 import com.alibaba.otter.canal.client.adapter.support.JdbcTypeUtil;
 import com.google.common.base.Joiner;
 
@@ -75,19 +76,19 @@ public class HbaseEtlService {
         }
     }
 
-    public static void importData(DataSource ds, HbaseTemplate hbaseTemplate, MappingConfig config,
-                                  List<String> params) {
-        // EtlResult etlResult = new EtlResult();
+    public static EtlResult importData(DataSource ds, HbaseTemplate hbaseTemplate, MappingConfig config,
+                                       List<String> params) {
+        EtlResult etlResult = new EtlResult();
         AtomicLong successCount = new AtomicLong();
         List<String> errMsg = new ArrayList<>();
         String hbaseTable = "";
         try {
-            // if (config == null) {
-            // logger.error("Config is null!");
-            // etlResult.setSucceeded(false);
-            // etlResult.setErrorMessage("Config is null!");
-            // return etlResult;
-            // }
+            if (config == null) {
+                logger.error("Config is null!");
+                etlResult.setSucceeded(false);
+                etlResult.setErrorMessage("Config is null!");
+                return etlResult;
+            }
             MappingConfig.HbaseOrm hbaseOrm = config.getHbaseOrm();
             hbaseTable = hbaseOrm.getHbaseTable();
 
@@ -158,7 +159,7 @@ public class HbaseEtlService {
 
             // 当大于1万条记录时开启多线程
             if (cnt >= 10000) {
-                int threadCount = 3; // TODO 从配置读取默认为3
+                int threadCount = 3;
                 long perThreadCnt = cnt / threadCount;
                 ExecutorService executor = Executors.newFixedThreadPool(threadCount);
                 List<Future<Boolean>> futures = new ArrayList<>(threadCount);
@@ -191,19 +192,18 @@ public class HbaseEtlService {
             logger.info(
                 hbaseOrm.getHbaseTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000 + "s!");
 
-            // etlResult.setResultMessage("导入HBase表 " + hbaseOrm.getHbaseTable() + " 数据:" +
-            // successCount.get() + " 条");
+            etlResult.setResultMessage("导入HBase表 " + hbaseOrm.getHbaseTable() + " 数据:" + successCount.get() + " 条");
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             errMsg.add(hbaseTable + " etl failed! ==>" + e.getMessage());
         }
 
-        // if (errMsg.isEmpty()) {
-        // etlResult.setSucceeded(true);
-        // } else {
-        // etlResult.setErrorMessage(Joiner.on("\n").join(errMsg));
-        // }
-        // return etlResult;
+        if (errMsg.isEmpty()) {
+            etlResult.setSucceeded(true);
+        } else {
+            etlResult.setErrorMessage(Joiner.on("\n").join(errMsg));
+        }
+        return etlResult;
     }
 
     private static boolean executeSqlImport(DataSource ds, String sql, MappingConfig.HbaseOrm hbaseOrm,

+ 42 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/EtlRest.java

@@ -0,0 +1,42 @@
+package com.alibaba.otter.canal.adapter.launcher.rest;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.annotation.PostConstruct;
+
+import com.alibaba.otter.canal.client.adapter.support.EtlResult;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.ExtensionLoader;
+
+@RestController
+public class EtlRest {
+
+    private ExtensionLoader<OuterAdapter> loader;
+
+    @PostConstruct
+    public void init() {
+        loader = ExtensionLoader.getExtensionLoader(OuterAdapter.class);
+    }
+
+    /**
+     * Demo: http://127.0.0.1:8081/etl/hbase/mytest_person2.yml
+     */
+    @GetMapping("/etl/{type}/{task}")
+    public EtlResult etl(@PathVariable String type, @PathVariable String task,
+                         @RequestParam(required = false) String params) {
+        OuterAdapter adapter = loader.getExtension(type);
+        List<String> paramArr = null;
+        if (params != null) {
+            String[] parmaArray = params.trim().split(";");
+            paramArr = Arrays.asList(parmaArray);
+        }
+
+        return  adapter.etl(task, paramArr);
+    }
+}

+ 5 - 1
client-adapter/launcher/src/main/resources/application.yml

@@ -1,9 +1,13 @@
 server:
   port: 8081
-
 logging:
   level:
     com.alibaba.otter.canal.client.adapter.hbase: DEBUG
+spring:
+  jackson:
+    date-format: yyyy-MM-dd HH:mm:ss
+    time-zone: GMT+8
+    default-property-inclusion: non_null
 
 canal.conf:
   canalServerHost: 127.0.0.1:11111