Browse Source

支持hbase rowkey指定数字长度

mcy 6 years ago
parent
commit
bc298bedd5

+ 19 - 1
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java

@@ -76,6 +76,7 @@ public class MappingConfig {
     public static class ColumnItem {
 
         private boolean isRowKey = false;
+        private Integer rowKeyLen;
         private String  column;
         private String  family;
         private String  qualifier;
@@ -89,6 +90,14 @@ public class MappingConfig {
             isRowKey = rowKey;
         }
 
+        public Integer getRowKeyLen() {
+            return rowKeyLen;
+        }
+
+        public void setRowKeyLen(Integer rowKeyLen) {
+            this.rowKeyLen = rowKeyLen;
+        }
+
         public String getColumn() {
             return column;
         }
@@ -264,7 +273,16 @@ public class MappingConfig {
                     ColumnItem columnItem = new ColumnItem();
                     columnItem.setColumn(columnField.getKey());
                     columnItem.setType(type);
-                    if ("rowKey".equalsIgnoreCase(field)) {
+                    if (field != null && field.toUpperCase().startsWith("ROWKEY")) {
+                        int idx = field.toUpperCase().indexOf("LEN:");
+                        if (idx > -1) {
+                            String len = field.substring(idx + 4);
+                            try {
+                                columnItem.setRowKeyLen(Integer.parseInt(len));
+                            } catch (Exception e) {
+                                // ignore
+                            }
+                        }
                         columnItem.setRowKey(true);
                         rowKeyColumn = columnItem;
                     } else {

+ 55 - 12
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java

@@ -199,8 +199,8 @@ public class HbaseEtlService {
                     } else {
                         sqlFinal = sql + " LIMIT " + offset + "," + cnt;
                     }
-                    Future<Boolean> future = executor
-                        .submit(() -> executeSqlImport(ds, sqlFinal, hbaseMapping, hbaseTemplate, successCount, errMsg));
+                    Future<Boolean> future = executor.submit(
+                        () -> executeSqlImport(ds, sqlFinal, hbaseMapping, hbaseTemplate, successCount, errMsg));
                     futures.add(future);
                 }
 
@@ -213,8 +213,8 @@ public class HbaseEtlService {
                 executeSqlImport(ds, sql, hbaseMapping, hbaseTemplate, successCount, errMsg);
             }
 
-            logger.info(
-                hbaseMapping.getHbaseTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000 + "s!");
+            logger.info(hbaseMapping.getHbaseTable() + " etl completed in: "
+                        + (System.currentTimeMillis() - start) / 1000 + "s!");
 
             etlResult.setResultMessage("导入HBase表 " + hbaseMapping.getHbaseTable() + " 数据:" + successCount.get() + " 条");
         } catch (Exception e) {
@@ -332,18 +332,49 @@ public class HbaseEtlService {
 
                                     byte[] valBytes = Bytes.toBytes(val.toString());
                                     if (columnItem.isRowKey()) {
-                                        row.setRowKey(valBytes);
+                                        if (columnItem.getRowKeyLen() != null) {
+                                            valBytes = Bytes.toBytes(limitLenNum(columnItem.getRowKeyLen(), val));
+                                            row.setRowKey(valBytes);
+                                        } else {
+                                            row.setRowKey(valBytes);
+                                        }
                                     } else {
                                         row.addCell(columnItem.getFamily(), columnItem.getQualifier(), valBytes);
                                     }
                                 } else {
-                                    PhType phType = PhType.getType(columnItem.getType());
-                                    if (columnItem.isRowKey()) {
-                                        row.setRowKey(PhTypeUtil.toBytes(val, phType));
-                                    } else {
-                                        row.addCell(columnItem.getFamily(),
-                                            columnItem.getQualifier(),
-                                            PhTypeUtil.toBytes(val, phType));
+                                    if (MappingConfig.Mode.STRING == hbaseMapping.getMode()) {
+                                        byte[] valBytes = Bytes.toBytes(val.toString());
+                                        if (columnItem.isRowKey()) {
+                                            if (columnItem.getRowKeyLen() != null) {
+                                                valBytes = Bytes.toBytes(limitLenNum(columnItem.getRowKeyLen(), val));
+                                            }
+                                            row.setRowKey(valBytes);
+                                        } else {
+                                            row.addCell(columnItem.getFamily(), columnItem.getQualifier(), valBytes);
+                                        }
+                                    } else if (MappingConfig.Mode.NATIVE == hbaseMapping.getMode()) {
+                                        Type type = Type.getType(columnItem.getType());
+                                        if (columnItem.isRowKey()) {
+                                            if (columnItem.getRowKeyLen() != null) {
+                                                String v = limitLenNum(columnItem.getRowKeyLen(), val);
+                                                row.setRowKey(Bytes.toBytes(v));
+                                            } else {
+                                                row.setRowKey(TypeUtil.toBytes(val, type));
+                                            }
+                                        } else {
+                                            row.addCell(columnItem.getFamily(),
+                                                columnItem.getQualifier(),
+                                                TypeUtil.toBytes(val, type));
+                                        }
+                                    } else if (MappingConfig.Mode.PHOENIX == hbaseMapping.getMode()) {
+                                        PhType phType = PhType.getType(columnItem.getType());
+                                        if (columnItem.isRowKey()) {
+                                            row.setRowKey(PhTypeUtil.toBytes(val, phType));
+                                        } else {
+                                            row.addCell(columnItem.getFamily(),
+                                                columnItem.getQualifier(),
+                                                PhTypeUtil.toBytes(val, phType));
+                                        }
                                     }
                                 }
                             }
@@ -382,4 +413,16 @@ public class HbaseEtlService {
             return false;
         }
     }
+
+    private static String limitLenNum(int len, Object val) {
+        if (val == null) {
+            return null;
+        }
+        if (val instanceof Number) {
+            return String.format("%0" + len + "d", (Number) ((Number) val).longValue());
+        } else if (val instanceof String) {
+            return String.format("%0" + len + "d", Long.parseLong((String) val));
+        }
+        return null;
+    }
 }

+ 37 - 6
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java

@@ -132,7 +132,21 @@ public class HbaseSyncService {
                     }
                 } else {
                     if (columnItem.isRowKey()) {
-                        // row.put("rowKey", bytes);
+                        if (columnItem.getRowKeyLen() != null && entry.getValue() != null) {
+                            if (entry.getValue() instanceof Number) {
+                                String v = String.format("%0" + columnItem.getRowKeyLen() + "d",
+                                    ((Number) entry.getValue()).longValue());
+                                bytes = Bytes.toBytes(v);
+                            } else {
+                                try {
+                                    String v = String.format("%0" + columnItem.getRowKeyLen() + "d",
+                                        Integer.parseInt((String) entry.getValue()));
+                                    bytes = Bytes.toBytes(v);
+                                } catch (Exception e) {
+                                    // ignore
+                                }
+                            }
+                        }
                         hRow.setRowKey(bytes);
                     } else {
                         hRow.addCell(columnItem.getFamily(), columnItem.getQualifier(), bytes);
@@ -146,8 +160,8 @@ public class HbaseSyncService {
     /**
      * 更新操作
      * 
-     * @param config
-     * @param dml
+     * @param config 配置对象
+     * @param dml dml对象
      */
     private void update(MappingConfig config, Dml dml) {
         List<Map<String, Object>> data = dml.getData();
@@ -192,7 +206,7 @@ public class HbaseSyncService {
                 Map<String, Object> rowKey = data.get(0);
                 rowKeyBytes = typeConvert(null, hbaseMapping, rowKey.values().iterator().next());
             } else {
-                rowKeyBytes = typeConvert(rowKeyColumn, hbaseMapping, r.get(rowKeyColumn.getColumn()));
+                rowKeyBytes = getRowKeyBytes(hbaseMapping, rowKeyColumn, r);
             }
             if (rowKeyBytes == null) throw new RuntimeException("rowKey值为空");
 
@@ -277,8 +291,7 @@ public class HbaseSyncService {
                 Map<String, Object> rowKey = data.get(0);
                 rowKeyBytes = typeConvert(null, hbaseMapping, rowKey.values().iterator().next());
             } else {
-                Object val = r.get(rowKeyColumn.getColumn());
-                rowKeyBytes = typeConvert(rowKeyColumn, hbaseMapping, val);
+                rowKeyBytes = getRowKeyBytes(hbaseMapping, rowKeyColumn, r);
             }
             if (rowKeyBytes == null) throw new RuntimeException("rowKey值为空");
             rowKeys.add(rowKeyBytes);
@@ -424,4 +437,22 @@ public class HbaseSyncService {
         return rowKeyValue.toString();
     }
 
+    private static byte[] getRowKeyBytes(MappingConfig.HbaseMapping hbaseMapping, MappingConfig.ColumnItem rowKeyColumn,
+                                         Map<String, Object> rowData) {
+        Object val = rowData.get(rowKeyColumn.getColumn());
+        String v = null;
+        if (rowKeyColumn.getRowKeyLen() != null) {
+            if (val instanceof Number) {
+                v = String.format("%0" + rowKeyColumn.getRowKeyLen() + "d", (Number) ((Number) val).longValue());
+            } else if (val instanceof String) {
+                v = String.format("%0" + rowKeyColumn.getRowKeyLen() + "d", Long.parseLong((String) val));
+            }
+        }
+        if (v != null) {
+            return Bytes.toBytes(v);
+        } else {
+            return typeConvert(rowKeyColumn, hbaseMapping, val);
+        }
+    }
+
 }

+ 7 - 7
client-adapter/hbase/src/main/resources/hbase/mytest_person2.yml

@@ -1,7 +1,7 @@
 dataSourceKey: defaultDS
 destination: example
 hbaseMapping:
-  mode: PHOENIX  #NATIVE   #STRING
+  mode: STRING  #NATIVE   #PHOENIX
   database: mytest  # 数据库名
   table: person2     # 数据库表名
   hbaseTable: MYTEST.PERSON2   # HBase表名
@@ -11,14 +11,14 @@ hbaseMapping:
   #rowKey: id,type  # 复合字段rowKey不能和columns中的rowKey重复
   columns:
     # 数据库字段:HBase对应字段
-    id: ROWKEY$UNSIGNED_LONG
+    id: ROWKEY LEN:15
     name: NAME
     email: EMAIL
-    type: $DECIMAL
-    c_time: C_TIME$UNSIGNED_TIMESTAMP
-    birthday: BIRTHDAY$DATE
-  excludeColumns:
-    - lat   # 忽略字段
+    type:
+    c_time: C_TIME
+    birthday: BIRTHDAY
+#  excludeColumns:
+#    - lat   # 忽略字段
 
 # -- NATIVE类型
 # $DEFAULT