Просмотр исходного кода

HBase根据datasourceKey导入

mcy 6 лет назад
Родитель
Сommit
27dba9ec1b

+ 40 - 7
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -85,16 +85,49 @@ public class HbaseAdapter implements OuterAdapter {
 
     @Override
     public EtlResult etl(String task, List<String> params) {
+        EtlResult etlResult = new EtlResult();
         MappingConfig config = hbaseMapping.get(task);
-        DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
-        if (dataSource != null) {
-            return HbaseEtlService.importData(dataSource, hbaseTemplate, config, params);
+        if (config != null) {
+            DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
+            if (dataSource != null) {
+                return HbaseEtlService.importData(dataSource, hbaseTemplate, config, params);
+            } else {
+                etlResult.setSucceeded(false);
+                etlResult.setErrorMessage("DataSource not found");
+                return etlResult;
+            }
         } else {
-            EtlResult etlResult = new EtlResult();
-            etlResult.setSucceeded(false);
-            etlResult.setErrorMessage("DataSource not found");
-            return etlResult;
+            DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(task);
+            if (dataSource != null) {
+                StringBuilder resultMsg = new StringBuilder();
+                boolean resSucc = true;
+                // ds不为空说明传入的是datasourceKey
+                for (MappingConfig configTmp : hbaseMapping.values()) {
+                    // 取所有的datasourceKey为task的配置
+                    if (configTmp.getDataSourceKey().equals(task)) {
+                        EtlResult etlRes = HbaseEtlService.importData(dataSource, hbaseTemplate, configTmp, params);
+                        if (!etlRes.getSucceeded()) {
+                            resSucc = false;
+                            resultMsg.append(etlRes.getErrorMessage()).append("\n");
+                        } else {
+                            resultMsg.append(etlRes.getResultMessage()).append("\n");
+                        }
+                    }
+                }
+                if (resultMsg.length() > 0) {
+                    etlResult.setSucceeded(resSucc);
+                    if (resSucc) {
+                        etlResult.setResultMessage(resultMsg.toString());
+                    } else {
+                        etlResult.setErrorMessage(resultMsg.toString());
+                    }
+                    return etlResult;
+                }
+            }
         }
+        etlResult.setSucceeded(false);
+        etlResult.setErrorMessage("Task not found");
+        return etlResult;
     }
 
     @Override

+ 4 - 5
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java

@@ -323,12 +323,11 @@ public class HbaseEtlService {
                             rows.clear();
                             complete = true;
                         }
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("import count:" + i);
-                        }
-                        // System.out.println(i);
                         i++;
                         successCount.incrementAndGet();
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("successful import count:" + successCount.get());
+                        }
                     }
 
                     if (!complete && !rows.isEmpty()) {
@@ -338,7 +337,7 @@ public class HbaseEtlService {
                 } catch (Exception e) {
                     logger.error(hbaseOrm.getHbaseTable() + " etl failed! ==>" + e.getMessage(), e);
                     errMsg.add(hbaseOrm.getHbaseTable() + " etl failed! ==>" + e.getMessage());
-                    throw new RuntimeException(e);
+                    // throw new RuntimeException(e);
                 }
                 return i;
             });

+ 3 - 3
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java

@@ -23,11 +23,11 @@ public class CommonRest {
     }
 
     /**
-     * Demo: POST: http://127.0.0.1:8081/etl/hbase/mytest_person2.yml
+     * Demo: curl http://127.0.0.1:8081/etl/hbase/mytest_person2.yml -X POST -d "params=0,1,2"
      */
     @PostMapping("/etl/{type}/{task}")
     public EtlResult etl(@PathVariable String type, @PathVariable String task,
-                         @RequestParam(required = false) String params) {
+                         @RequestParam(name = "params", required = false) String params) {
         OuterAdapter adapter = loader.getExtension(type);
         List<String> paramArr = null;
         if (params != null) {
@@ -39,7 +39,7 @@ public class CommonRest {
     }
 
     /**
-     * Demo: GET: http://127.0.0.1:8081/count/hbase/mytest_person2.yml
+     * Demo: curl http://127.0.0.1:8081/count/hbase/mytest_person2.yml
      */
     @GetMapping("/count/{type}/{task}")
     public Map<String, Object> count(@PathVariable String type, @PathVariable String task) {