Forráskód Böngészése

HBase etl锁定同步等待

mcy 6 éve
szülő
commit
42a4abe4c7

+ 10 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/OuterAdapter.java

@@ -55,4 +55,14 @@ public interface OuterAdapter {
     default Map<String, Object> count(String task) {
         throw new UnsupportedOperationException("unsupported operation");
     }
+
+    /**
+     * 通过task获取对应的destination
+     * 
+     * @param task 任务名, 对应配置名
+     * @return destination
+     */
+    default String getDestination(String task) {
+        return null;
+    }
 }

+ 9 - 0
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -162,4 +162,13 @@ public class HbaseAdapter implements OuterAdapter {
             }
         }
     }
+
+    @Override
+    public String getDestination(String task) {
+        MappingConfig config = hbaseMapping.get(task);
+        if (config != null && config.getHbaseOrm() != null) {
+            return config.getHbaseOrm().getDestination();
+        }
+        return null;
+    }
 }

+ 9 - 0
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java

@@ -146,6 +146,7 @@ public class MappingConfig {
     public static class HbaseOrm {
 
         private Mode                    mode               = Mode.STRING;
+        private String                  destination;
         private String                  database;
         private String                  table;
         private String                  hbaseTable;
@@ -170,6 +171,14 @@ public class MappingConfig {
             this.mode = mode;
         }
 
+        public String getDestination() {
+            return destination;
+        }
+
+        public void setDestination(String destination) {
+            this.destination = destination;
+        }
+
         public String getDatabase() {
             return database;
         }

+ 1 - 0
client-adapter/hbase/src/main/resources/hbase/mytest_person2.yml

@@ -1,6 +1,7 @@
 dataSourceKey: defaultDS
 hbaseOrm:
   mode: PHOENIX  #NATIVE   #STRING
+  destination: example
   database: mytest  # 数据库名
   table: person2     # 数据库表名
   hbaseTable: MYTEST.PERSON2   # HBase表名

+ 44 - 9
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/rest/CommonRest.java

@@ -32,24 +32,43 @@ public class CommonRest {
     }
 
     /**
-     * Demo: curl http://127.0.0.1:8081/etl/hbase/mytest_person2.yml -X POST -d
-     * "params=0,1,2"
+     * ETL curl http://127.0.0.1:8081/etl/hbase/mytest_person2.yml -X POST
+     * 
+     * @param type 类型 hbase, es
+     * @param task 任务名对应配置文件名 mytest_person2.yml
+     * @param params etl where条件参数, 为空全部导入
+     * @return
      */
     @PostMapping("/etl/{type}/{task}")
     public EtlResult etl(@PathVariable String type, @PathVariable String task,
                          @RequestParam(name = "params", required = false) String params) {
         OuterAdapter adapter = loader.getExtension(type);
-        List<String> paramArr = null;
-        if (params != null) {
-            String[] parmaArray = params.trim().split(";");
-            paramArr = Arrays.asList(parmaArray);
+        String destination = adapter.getDestination(task);
+        Boolean oriSwithcStatus = null;
+        if (destination != null) {
+            oriSwithcStatus = syncSwitch.status(destination);
+            syncSwitch.off(destination);
+        }
+        try {
+            List<String> paramArr = null;
+            if (params != null) {
+                String[] parmaArray = params.trim().split(";");
+                paramArr = Arrays.asList(parmaArray);
+            }
+            return adapter.etl(task, paramArr);
+        } finally {
+            if (destination != null && oriSwithcStatus != null && oriSwithcStatus) {
+                syncSwitch.on(destination);
+            }
         }
-
-        return adapter.etl(task, paramArr);
     }
 
     /**
-     * Demo: curl http://127.0.0.1:8081/count/hbase/mytest_person2.yml
+     * 统计总数 curl http://127.0.0.1:8081/count/hbase/mytest_person2.yml
+     * 
+     * @param type 类型 hbase, es
+     * @param task 任务名对应配置文件名 mytest_person2.yml
+     * @return
      */
     @GetMapping("/count/{type}/{task}")
     public Map<String, Object> count(@PathVariable String type, @PathVariable String task) {
@@ -57,6 +76,9 @@ public class CommonRest {
         return adapter.count(task);
     }
 
+    /**
+     * 返回所有实例 curl http://127.0.0.1:8081/destinations
+     */
     @GetMapping("/destinations")
     public List<Map<String, String>> destinations() {
         List<Map<String, String>> result = new ArrayList<>();
@@ -77,6 +99,13 @@ public class CommonRest {
         return result;
     }
 
+    /**
+     * 实例同步开关 curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT
+     * 
+     * @param destination 实例名称
+     * @param status 开关状态: off on
+     * @return
+     */
     @PutMapping("/syncSwitch/{destination}/{status}")
     public Result etl(@PathVariable String destination, @PathVariable String status) {
         if (status.equals("on")) {
@@ -95,6 +124,12 @@ public class CommonRest {
         }
     }
 
+    /**
+     * 获取实例开关状态 curl http://127.0.0.1:8081/syncSwitch/example
+     * 
+     * @param destination 实例名称
+     * @return
+     */
     @GetMapping("/syncSwitch/{destination}")
     public Map<String, String> etl(@PathVariable String destination) {
         Boolean status = syncSwitch.status(destination);