Bläddra i källkod

修改HBase适配器配置

mcy 6 år sedan
förälder
incheckning
0d0fcbbc0b

+ 9 - 0
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java

@@ -152,6 +152,7 @@ public class MappingConfig {
         private boolean                 autoCreateTable    = false;                 // 同步时HBase中表不存在的情况下自动建表
         private String                  rowKey;                                     // 指定复合主键为rowKey
         private Map<String, String>     columns;                                    // 字段映射
+        private List<String>            excludeColumns;                             // 不映射的字段
         private ColumnItem              rowKeyColumn;                               // rowKey字段
         private String                  etlCondition;                               // etl条件sql
 
@@ -292,6 +293,14 @@ public class MappingConfig {
             }
         }
 
+        public List<String> getExcludeColumns() {
+            return excludeColumns;
+        }
+
+        public void setExcludeColumns(List<String> excludeColumns) {
+            this.excludeColumns = excludeColumns;
+        }
+
         public String getFamily() {
             return family;
         }

+ 6 - 0
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java

@@ -110,6 +110,9 @@ public class HbaseSyncService {
         Map<String, MappingConfig.ColumnItem> columnItems = hbaseOrm.getColumnItems();
         int i = 0;
         for (Map.Entry<String, Object> entry : data.entrySet()) {
+            if (hbaseOrm.getExcludeColumns() != null && hbaseOrm.getExcludeColumns().contains(entry.getKey())) {
+                continue;
+            }
             if (entry.getValue() != null) {
                 MappingConfig.ColumnItem columnItem = columnItems.get(entry.getKey());
 
@@ -196,6 +199,9 @@ public class HbaseSyncService {
             Map<String, MappingConfig.ColumnItem> columnItems = hbaseOrm.getColumnItems();
             HRow hRow = new HRow(rowKeyBytes);
             for (String updateColumn : old.get(index).keySet()) {
+                if (hbaseOrm.getExcludeColumns() != null && hbaseOrm.getExcludeColumns().contains(updateColumn)) {
+                    continue;
+                }
                 MappingConfig.ColumnItem columnItem = columnItems.get(updateColumn);
                 if (columnItem == null) {
                     String family = hbaseOrm.getFamily();

+ 2 - 0
client-adapter/hbase/src/main/resources/hbase/mytest_person2.yml

@@ -17,6 +17,8 @@ hbaseOrm:
     type: $DECIMAL
     c_time: C_TIME$UNSIGNED_TIMESTAMP
     birthday: BIRTHDAY$DATE
+  excludeColumns:
+    - lat
 
 # -- NATIVE类型
 # $DEFAULT

+ 14 - 14
client-adapter/launcher/src/main/resources/application.yml

@@ -23,12 +23,12 @@ canal.conf:
   - instance: example
     adapterGroups:
     - outAdapters:
-      - name: logger
-#      - name: hbase
-#        properties:
-#          hbase.zookeeper.quorum: ${hbase.zookeeper.quorum}
-#          hbase.zookeeper.property.clientPort: ${hbase.zookeeper.property.clientPort}
-#          zookeeper.znode.parent: ${hbase.zookeeper.znode.parent}
+#      - name: logger
+      - name: hbase
+        properties:
+          hbase.zookeeper.quorum: ${hbase.zookeeper.quorum}
+          hbase.zookeeper.property.clientPort: ${hbase.zookeeper.property.clientPort}
+          zookeeper.znode.parent: ${hbase.zookeeper.znode.parent}
 #  mqTopics:
 #  - mqMode: kafka
 #    topic: example
@@ -37,11 +37,11 @@ canal.conf:
 #      outAdapters:
 #      - name: logger
 
-#adapter.conf:
-#  datasourceConfigs:
-#    defaultDS:
-#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
-#      username: root
-#      password: 121212
-#  adapterConfigs:
-#  - hbase/mytest_person2.yml
+adapter.conf:
+  datasourceConfigs:
+    defaultDS:
+      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
+      username: root
+      password: 121212
+  adapterConfigs:
+  - hbase/mytest_person2.yml