Browse Source

修改hbase配置项名

mcy 6 years ago
parent
commit
87ac3daecc

+ 6 - 6
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -53,9 +53,9 @@ public class HbaseAdapter implements OuterAdapter {
                         hbaseMapping = MappingConfigLoader.load();
                         mappingConfigCache = new HashMap<>();
                         for (MappingConfig mappingConfig : hbaseMapping.values()) {
-                            mappingConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getHbaseOrm().getDestination())
-                                                   + "." + mappingConfig.getHbaseOrm().getDatabase() + "."
-                                                   + mappingConfig.getHbaseOrm().getTable(),
+                            mappingConfigCache.put(StringUtils.trimToEmpty(mappingConfig.getHbaseMapping().getDestination())
+                                                   + "." + mappingConfig.getHbaseMapping().getDatabase() + "."
+                                                   + mappingConfig.getHbaseMapping().getTable(),
                                 mappingConfig);
                         }
                     }
@@ -136,7 +136,7 @@ public class HbaseAdapter implements OuterAdapter {
     @Override
     public Map<String, Object> count(String task) {
         MappingConfig config = hbaseMapping.get(task);
-        String hbaseTable = config.getHbaseOrm().getHbaseTable();
+        String hbaseTable = config.getHbaseMapping().getHbaseTable();
         long rowCount = 0L;
         try {
             HTable table = (HTable) conn.getTable(TableName.valueOf(hbaseTable));
@@ -169,8 +169,8 @@ public class HbaseAdapter implements OuterAdapter {
     @Override
     public String getDestination(String task) {
         MappingConfig config = hbaseMapping.get(task);
-        if (config != null && config.getHbaseOrm() != null) {
-            return config.getHbaseOrm().getDestination();
+        if (config != null && config.getHbaseMapping() != null) {
+            return config.getHbaseMapping().getDestination();
         }
         return null;
     }

+ 21 - 21
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfig.java

@@ -10,9 +10,9 @@ import java.util.*;
  */
 public class MappingConfig {
 
-    private String   dataSourceKey; // 数据源key
+    private String       dataSourceKey; // 数据源key
 
-    private HbaseOrm hbaseOrm;      // hbase映射配置
+    private HbaseMapping hbaseMapping;  // hbase映射配置
 
     public String getDataSourceKey() {
         return dataSourceKey;
@@ -22,28 +22,28 @@ public class MappingConfig {
         this.dataSourceKey = dataSourceKey;
     }
 
-    public HbaseOrm getHbaseOrm() {
-        return hbaseOrm;
+    public HbaseMapping getHbaseMapping() {
+        return hbaseMapping;
     }
 
-    public void setHbaseOrm(HbaseOrm hbaseOrm) {
-        this.hbaseOrm = hbaseOrm;
+    public void setHbaseMapping(HbaseMapping hbaseMapping) {
+        this.hbaseMapping = hbaseMapping;
     }
 
     public void validate() {
-        if (hbaseOrm.database == null || hbaseOrm.database.isEmpty()) {
-            throw new NullPointerException("hbaseOrm.database");
+        if (hbaseMapping.database == null || hbaseMapping.database.isEmpty()) {
+            throw new NullPointerException("hbaseMapping.database");
         }
-        if (hbaseOrm.table == null || hbaseOrm.table.isEmpty()) {
-            throw new NullPointerException("hbaseOrm.table");
+        if (hbaseMapping.table == null || hbaseMapping.table.isEmpty()) {
+            throw new NullPointerException("hbaseMapping.table");
         }
-        if (hbaseOrm.hbaseTable == null || hbaseOrm.hbaseTable.isEmpty()) {
-            throw new NullPointerException("hbaseOrm.hbaseTable");
+        if (hbaseMapping.hbaseTable == null || hbaseMapping.hbaseTable.isEmpty()) {
+            throw new NullPointerException("hbaseMapping.hbaseTable");
         }
-        if (hbaseOrm.mode == null) {
-            throw new NullPointerException("hbaseOrm.mode");
+        if (hbaseMapping.mode == null) {
+            throw new NullPointerException("hbaseMapping.mode");
         }
-        if (hbaseOrm.rowKey != null && hbaseOrm.rowKeyColumn != null) {
+        if (hbaseMapping.rowKey != null && hbaseMapping.rowKeyColumn != null) {
             throw new RuntimeException("已配置了复合主键作为RowKey,无需再指定RowKey列");
         }
     }
@@ -55,12 +55,12 @@ public class MappingConfig {
 
         MappingConfig config = (MappingConfig) o;
 
-        return hbaseOrm != null ? hbaseOrm.equals(config.hbaseOrm) : config.hbaseOrm == null;
+        return hbaseMapping != null ? hbaseMapping.equals(config.hbaseMapping) : config.hbaseMapping == null;
     }
 
     @Override
     public int hashCode() {
-        return hbaseOrm != null ? hbaseOrm.hashCode() : 0;
+        return hbaseMapping != null ? hbaseMapping.hashCode() : 0;
     }
 
     public static class ColumnItem {
@@ -140,7 +140,7 @@ public class MappingConfig {
         }
     }
 
-    public static class HbaseOrm {
+    public static class HbaseMapping {
 
         private Mode                    mode               = Mode.STRING;           // hbase默认转换格式
         private String                  destination;                                // canal实例或MQ的topic
@@ -349,10 +349,10 @@ public class MappingConfig {
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
 
-            HbaseOrm hbaseOrm = (HbaseOrm) o;
+            HbaseMapping hbaseMapping = (HbaseMapping) o;
 
-            if (table != null ? !table.equals(hbaseOrm.table) : hbaseOrm.table != null) return false;
-            return hbaseTable != null ? hbaseTable.equals(hbaseOrm.hbaseTable) : hbaseOrm.hbaseTable == null;
+            if (table != null ? !table.equals(hbaseMapping.table) : hbaseMapping.table != null) return false;
+            return hbaseTable != null ? hbaseTable.equals(hbaseMapping.hbaseTable) : hbaseMapping.hbaseTable == null;
         }
 
         @Override

+ 9 - 9
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/config/MappingConfigLoader.java

@@ -80,21 +80,21 @@ public class MappingConfigLoader {
                 if (dbTable.length == 2) {
                     config = new MappingConfig();
 
-                    MappingConfig.HbaseOrm hbaseOrm = new MappingConfig.HbaseOrm();
-                    hbaseOrm.setHbaseTable(dbTable[0].toUpperCase() + "." + dbTable[1].toUpperCase());
-                    hbaseOrm.setAutoCreateTable(true);
-                    hbaseOrm.setDatabase(dbTable[0]);
-                    hbaseOrm.setTable(dbTable[1]);
-                    hbaseOrm.setMode(MappingConfig.Mode.PHOENIX);
-                    hbaseOrm.setRowKey(rowKey);
+                    MappingConfig.HbaseMapping hbaseMapping = new MappingConfig.HbaseMapping();
+                    hbaseMapping.setHbaseTable(dbTable[0].toUpperCase() + "." + dbTable[1].toUpperCase());
+                    hbaseMapping.setAutoCreateTable(true);
+                    hbaseMapping.setDatabase(dbTable[0]);
+                    hbaseMapping.setTable(dbTable[1]);
+                    hbaseMapping.setMode(MappingConfig.Mode.PHOENIX);
+                    hbaseMapping.setRowKey(rowKey);
                     // 有定义rowKey
                     if (rowKey != null) {
                         MappingConfig.ColumnItem columnItem = new MappingConfig.ColumnItem();
                         columnItem.setRowKey(true);
                         columnItem.setColumn(rowKey);
-                        hbaseOrm.setRowKeyColumn(columnItem);
+                        hbaseMapping.setRowKeyColumn(columnItem);
                     }
-                    config.setHbaseOrm(hbaseOrm);
+                    config.setHbaseMapping(hbaseMapping);
                     config.setDataSourceKey(dsKey);
 
                 } else {

+ 36 - 36
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseEtlService.java

@@ -81,9 +81,9 @@ public class HbaseEtlService {
     public static void createTable(HbaseTemplate hbaseTemplate, MappingConfig config) {
         try {
             // 判断hbase表是否存在,不存在则建表
-            MappingConfig.HbaseOrm hbaseOrm = config.getHbaseOrm();
-            if (!hbaseTemplate.tableExists(hbaseOrm.getHbaseTable())) {
-                hbaseTemplate.createTable(hbaseOrm.getHbaseTable(), hbaseOrm.getFamily());
+            MappingConfig.HbaseMapping hbaseMapping = config.getHbaseMapping();
+            if (!hbaseTemplate.tableExists(hbaseMapping.getHbaseTable())) {
+                hbaseTemplate.createTable(hbaseMapping.getHbaseTable(), hbaseMapping.getFamily());
             }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
@@ -113,29 +113,29 @@ public class HbaseEtlService {
                 etlResult.setErrorMessage("Config is null!");
                 return etlResult;
             }
-            MappingConfig.HbaseOrm hbaseOrm = config.getHbaseOrm();
-            hbaseTable = hbaseOrm.getHbaseTable();
+            MappingConfig.HbaseMapping hbaseMapping = config.getHbaseMapping();
+            hbaseTable = hbaseMapping.getHbaseTable();
 
             long start = System.currentTimeMillis();
 
             if (params != null && params.size() == 1 && "rebuild".equalsIgnoreCase(params.get(0))) {
-                logger.info(hbaseOrm.getHbaseTable() + " rebuild is starting!");
+                logger.info(hbaseMapping.getHbaseTable() + " rebuild is starting!");
                 // 如果表存在则删除
-                if (hbaseTemplate.tableExists(hbaseOrm.getHbaseTable())) {
-                    hbaseTemplate.disableTable(hbaseOrm.getHbaseTable());
-                    hbaseTemplate.deleteTable(hbaseOrm.getHbaseTable());
+                if (hbaseTemplate.tableExists(hbaseMapping.getHbaseTable())) {
+                    hbaseTemplate.disableTable(hbaseMapping.getHbaseTable());
+                    hbaseTemplate.deleteTable(hbaseMapping.getHbaseTable());
                 }
                 params = null;
             } else {
-                logger.info(hbaseOrm.getHbaseTable() + " etl is starting!");
+                logger.info(hbaseMapping.getHbaseTable() + " etl is starting!");
             }
             createTable(hbaseTemplate, config);
 
             // 拼接sql
-            String sql = "SELECT * FROM " + config.getHbaseOrm().getDatabase() + "." + hbaseOrm.getTable();
+            String sql = "SELECT * FROM " + config.getHbaseMapping().getDatabase() + "." + hbaseMapping.getTable();
 
             // 拼接条件
-            if (params != null && params.size() == 1 && hbaseOrm.getEtlCondition() == null) {
+            if (params != null && params.size() == 1 && hbaseMapping.getEtlCondition() == null) {
                 AtomicBoolean stExists = new AtomicBoolean(false);
                 // 验证是否有SYS_TIME字段
                 sqlRS(ds, sql, rs -> {
@@ -157,8 +157,8 @@ public class HbaseEtlService {
                 if (stExists.get()) {
                     sql += " WHERE SYS_TIME >= '" + params.get(0) + "' ";
                 }
-            } else if (hbaseOrm.getEtlCondition() != null && params != null) {
-                String etlCondition = hbaseOrm.getEtlCondition();
+            } else if (hbaseMapping.getEtlCondition() != null && params != null) {
+                String etlCondition = hbaseMapping.getEtlCondition();
                 int size = params.size();
                 for (int i = 0; i < size; i++) {
                     etlCondition = etlCondition.replace("{" + i + "}", params.get(i));
@@ -200,7 +200,7 @@ public class HbaseEtlService {
                         sqlFinal = sql + " LIMIT " + offset + "," + cnt;
                     }
                     Future<Boolean> future = executor
-                        .submit(() -> executeSqlImport(ds, sqlFinal, hbaseOrm, hbaseTemplate, successCount, errMsg));
+                        .submit(() -> executeSqlImport(ds, sqlFinal, hbaseMapping, hbaseTemplate, successCount, errMsg));
                     futures.add(future);
                 }
 
@@ -210,13 +210,13 @@ public class HbaseEtlService {
 
                 executor.shutdown();
             } else {
-                executeSqlImport(ds, sql, hbaseOrm, hbaseTemplate, successCount, errMsg);
+                executeSqlImport(ds, sql, hbaseMapping, hbaseTemplate, successCount, errMsg);
             }
 
             logger.info(
-                hbaseOrm.getHbaseTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000 + "s!");
+                hbaseMapping.getHbaseTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000 + "s!");
 
-            etlResult.setResultMessage("导入HBase表 " + hbaseOrm.getHbaseTable() + " 数据:" + successCount.get() + " 条");
+            etlResult.setResultMessage("导入HBase表 " + hbaseMapping.getHbaseTable() + " 数据:" + successCount.get() + " 条");
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             errMsg.add(hbaseTable + " etl failed! ==>" + e.getMessage());
@@ -235,13 +235,13 @@ public class HbaseEtlService {
      * 
      * @param ds
      * @param sql
-     * @param hbaseOrm
+     * @param hbaseMapping
      * @param hbaseTemplate
      * @param successCount
      * @param errMsg
      * @return
      */
-    private static boolean executeSqlImport(DataSource ds, String sql, MappingConfig.HbaseOrm hbaseOrm,
+    private static boolean executeSqlImport(DataSource ds, String sql, MappingConfig.HbaseMapping hbaseMapping,
                                             HbaseTemplate hbaseTemplate, AtomicLong successCount, List<String> errMsg) {
         try {
             sqlRS(ds, sql, rs -> {
@@ -251,8 +251,8 @@ public class HbaseEtlService {
                     boolean complete = false;
                     List<HRow> rows = new ArrayList<>();
                     String[] rowKeyColumns = null;
-                    if (hbaseOrm.getRowKey() != null) {
-                        rowKeyColumns = hbaseOrm.getRowKey().trim().split(",");
+                    if (hbaseMapping.getRowKey() != null) {
+                        rowKeyColumns = hbaseMapping.getRowKey().trim().split(",");
                     }
                     while (rs.next()) {
                         int cc = rs.getMetaData().getColumnCount();
@@ -290,30 +290,30 @@ public class HbaseEtlService {
                                 continue;
                             }
 
-                            MappingConfig.ColumnItem columnItem = hbaseOrm.getColumnItems().get(columnName);
+                            MappingConfig.ColumnItem columnItem = hbaseMapping.getColumnItems().get(columnName);
                             // 没有配置映射
                             if (columnItem == null) {
-                                String family = hbaseOrm.getFamily();
+                                String family = hbaseMapping.getFamily();
                                 String qualifile = columnName;
-                                if (hbaseOrm.isUppercaseQualifier()) {
+                                if (hbaseMapping.isUppercaseQualifier()) {
                                     qualifile = qualifile.toUpperCase();
                                 }
-                                if (MappingConfig.Mode.STRING == hbaseOrm.getMode()) {
-                                    if (hbaseOrm.getRowKey() == null && j == 1) {
+                                if (MappingConfig.Mode.STRING == hbaseMapping.getMode()) {
+                                    if (hbaseMapping.getRowKey() == null && j == 1) {
                                         row.setRowKey(Bytes.toBytes(val.toString()));
                                     } else {
                                         row.addCell(family, qualifile, Bytes.toBytes(val.toString()));
                                     }
-                                } else if (MappingConfig.Mode.NATIVE == hbaseOrm.getMode()) {
+                                } else if (MappingConfig.Mode.NATIVE == hbaseMapping.getMode()) {
                                     Type type = Type.getType(classes[j - 1]);
-                                    if (hbaseOrm.getRowKey() == null && j == 1) {
+                                    if (hbaseMapping.getRowKey() == null && j == 1) {
                                         row.setRowKey(TypeUtil.toBytes(val, type));
                                     } else {
                                         row.addCell(family, qualifile, TypeUtil.toBytes(val, type));
                                     }
-                                } else if (MappingConfig.Mode.PHOENIX == hbaseOrm.getMode()) {
+                                } else if (MappingConfig.Mode.PHOENIX == hbaseMapping.getMode()) {
                                     PhType phType = PhType.getType(classes[j - 1]);
-                                    if (hbaseOrm.getRowKey() == null && j == 1) {
+                                    if (hbaseMapping.getRowKey() == null && j == 1) {
                                         row.setRowKey(PhTypeUtil.toBytes(val, phType));
                                     } else {
                                         row.addCell(family, qualifile, PhTypeUtil.toBytes(val, phType));
@@ -353,8 +353,8 @@ public class HbaseEtlService {
 
                         rows.add(row);
                         complete = false;
-                        if (i % hbaseOrm.getCommitBatch() == 0 && !rows.isEmpty()) {
-                            hbaseTemplate.puts(hbaseOrm.getHbaseTable(), rows);
+                        if (i % hbaseMapping.getCommitBatch() == 0 && !rows.isEmpty()) {
+                            hbaseTemplate.puts(hbaseMapping.getHbaseTable(), rows);
                             rows.clear();
                             complete = true;
                         }
@@ -366,12 +366,12 @@ public class HbaseEtlService {
                     }
 
                     if (!complete && !rows.isEmpty()) {
-                        hbaseTemplate.puts(hbaseOrm.getHbaseTable(), rows);
+                        hbaseTemplate.puts(hbaseMapping.getHbaseTable(), rows);
                     }
 
                 } catch (Exception e) {
-                    logger.error(hbaseOrm.getHbaseTable() + " etl failed! ==>" + e.getMessage(), e);
-                    errMsg.add(hbaseOrm.getHbaseTable() + " etl failed! ==>" + e.getMessage());
+                    logger.error(hbaseMapping.getHbaseTable() + " etl failed! ==>" + e.getMessage(), e);
+                    errMsg.add(hbaseMapping.getHbaseTable() + " etl failed! ==>" + e.getMessage());
                     // throw new RuntimeException(e);
                 }
                 return i;

+ 58 - 58
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/service/HbaseSyncService.java

@@ -59,11 +59,11 @@ public class HbaseSyncService {
             return;
         }
 
-        MappingConfig.HbaseOrm hbaseOrm = config.getHbaseOrm();
+        MappingConfig.HbaseMapping hbaseMapping = config.getHbaseMapping();
 
         // if (!validHTable(config)) {
         // logger.error("HBase table '{}' not exists",
-        // hbaseOrm.getHbaseTable());
+        // hbaseMapping.getHbaseTable());
         // return;
         // }
         int i = 1;
@@ -73,28 +73,28 @@ public class HbaseSyncService {
             HRow hRow = new HRow();
 
             // 拼接复合rowKey
-            if (hbaseOrm.getRowKey() != null) {
-                String[] rowKeyColumns = hbaseOrm.getRowKey().trim().split(",");
+            if (hbaseMapping.getRowKey() != null) {
+                String[] rowKeyColumns = hbaseMapping.getRowKey().trim().split(",");
                 String rowKeyVale = getRowKeys(rowKeyColumns, r);
                 // params.put("rowKey", Bytes.toBytes(rowKeyVale));
                 hRow.setRowKey(Bytes.toBytes(rowKeyVale));
             }
 
-            convertData2Row(hbaseOrm, hRow, r);
+            convertData2Row(hbaseMapping, hRow, r);
             if (hRow.getRowKey() == null) {
                 throw new RuntimeException("empty rowKey");
             }
             rows.add(hRow);
             complete = false;
-            if (i % config.getHbaseOrm().getCommitBatch() == 0 && !rows.isEmpty()) {
-                hbaseTemplate.puts(hbaseOrm.getHbaseTable(), rows);
+            if (i % config.getHbaseMapping().getCommitBatch() == 0 && !rows.isEmpty()) {
+                hbaseTemplate.puts(hbaseMapping.getHbaseTable(), rows);
                 rows.clear();
                 complete = true;
             }
             i++;
         }
         if (!complete && !rows.isEmpty()) {
-            hbaseTemplate.puts(hbaseOrm.getHbaseTable(), rows);
+            hbaseTemplate.puts(hbaseMapping.getHbaseTable(), rows);
         }
 
     }
@@ -102,30 +102,30 @@ public class HbaseSyncService {
     /**
      * 将Map数据转换为HRow行数据
      * 
-     * @param hbaseOrm hbase映射配置
+     * @param hbaseMapping hbase映射配置
      * @param hRow 行对象
      * @param data Map数据
      */
-    private static void convertData2Row(MappingConfig.HbaseOrm hbaseOrm, HRow hRow, Map<String, Object> data) {
-        Map<String, MappingConfig.ColumnItem> columnItems = hbaseOrm.getColumnItems();
+    private static void convertData2Row(MappingConfig.HbaseMapping hbaseMapping, HRow hRow, Map<String, Object> data) {
+        Map<String, MappingConfig.ColumnItem> columnItems = hbaseMapping.getColumnItems();
         int i = 0;
         for (Map.Entry<String, Object> entry : data.entrySet()) {
-            if (hbaseOrm.getExcludeColumns() != null && hbaseOrm.getExcludeColumns().contains(entry.getKey())) {
+            if (hbaseMapping.getExcludeColumns() != null && hbaseMapping.getExcludeColumns().contains(entry.getKey())) {
                 continue;
             }
             if (entry.getValue() != null) {
                 MappingConfig.ColumnItem columnItem = columnItems.get(entry.getKey());
 
-                byte[] bytes = typeConvert(columnItem, hbaseOrm, entry.getValue());
+                byte[] bytes = typeConvert(columnItem, hbaseMapping, entry.getValue());
 
                 if (columnItem == null) {
-                    String familyName = hbaseOrm.getFamily();
+                    String familyName = hbaseMapping.getFamily();
                     String qualifier = entry.getKey();
-                    if (hbaseOrm.isUppercaseQualifier()) {
+                    if (hbaseMapping.isUppercaseQualifier()) {
                         qualifier = qualifier.toUpperCase();
                     }
 
-                    if (hbaseOrm.getRowKey() == null && i == 0) {
+                    if (hbaseMapping.getRowKey() == null && i == 0) {
                         hRow.setRowKey(bytes);
                     } else {
                         hRow.addCell(familyName, qualifier, bytes);
@@ -156,15 +156,15 @@ public class HbaseSyncService {
             return;
         }
 
-        MappingConfig.HbaseOrm hbaseOrm = config.getHbaseOrm();
+        MappingConfig.HbaseMapping hbaseMapping = config.getHbaseMapping();
 
         // if (!validHTable(config)) {
         // logger.error("HBase table '{}' not exists",
-        // hbaseOrm.getHbaseTable());
+        // hbaseMapping.getHbaseTable());
         // return;
         // }
 
-        MappingConfig.ColumnItem rowKeyColumn = hbaseOrm.getRowKeyColumn();
+        MappingConfig.ColumnItem rowKeyColumn = hbaseMapping.getRowKeyColumn();
         int index = 0;
         int i = 1;
         boolean complete = false;
@@ -172,8 +172,8 @@ public class HbaseSyncService {
         out: for (Map<String, Object> r : data) {
             byte[] rowKeyBytes;
 
-            if (hbaseOrm.getRowKey() != null) {
-                String[] rowKeyColumns = hbaseOrm.getRowKey().trim().split(",");
+            if (hbaseMapping.getRowKey() != null) {
+                String[] rowKeyColumns = hbaseMapping.getRowKey().trim().split(",");
 
                 // 判断是否有复合主键修改
                 for (String updateColumn : old.get(index).keySet()) {
@@ -190,23 +190,23 @@ public class HbaseSyncService {
                 rowKeyBytes = Bytes.toBytes(rowKeyVale);
             } else if (rowKeyColumn == null) {
                 Map<String, Object> rowKey = data.get(0);
-                rowKeyBytes = typeConvert(null, hbaseOrm, rowKey.values().iterator().next());
+                rowKeyBytes = typeConvert(null, hbaseMapping, rowKey.values().iterator().next());
             } else {
-                rowKeyBytes = typeConvert(rowKeyColumn, hbaseOrm, r.get(rowKeyColumn.getColumn()));
+                rowKeyBytes = typeConvert(rowKeyColumn, hbaseMapping, r.get(rowKeyColumn.getColumn()));
             }
             if (rowKeyBytes == null) throw new RuntimeException("rowKey值为空");
 
-            Map<String, MappingConfig.ColumnItem> columnItems = hbaseOrm.getColumnItems();
+            Map<String, MappingConfig.ColumnItem> columnItems = hbaseMapping.getColumnItems();
             HRow hRow = new HRow(rowKeyBytes);
             for (String updateColumn : old.get(index).keySet()) {
-                if (hbaseOrm.getExcludeColumns() != null && hbaseOrm.getExcludeColumns().contains(updateColumn)) {
+                if (hbaseMapping.getExcludeColumns() != null && hbaseMapping.getExcludeColumns().contains(updateColumn)) {
                     continue;
                 }
                 MappingConfig.ColumnItem columnItem = columnItems.get(updateColumn);
                 if (columnItem == null) {
-                    String family = hbaseOrm.getFamily();
+                    String family = hbaseMapping.getFamily();
                     String qualifier = updateColumn;
-                    if (hbaseOrm.isUppercaseQualifier()) {
+                    if (hbaseMapping.isUppercaseQualifier()) {
                         qualifier = qualifier.toUpperCase();
                     }
 
@@ -215,7 +215,7 @@ public class HbaseSyncService {
                     if (newVal == null) {
                         hRow.addCell(family, qualifier, null);
                     } else {
-                        hRow.addCell(family, qualifier, typeConvert(null, hbaseOrm, newVal));
+                        hRow.addCell(family, qualifier, typeConvert(null, hbaseMapping, newVal));
                     }
                 } else {
                     // 排除修改id的情况
@@ -227,14 +227,14 @@ public class HbaseSyncService {
                     } else {
                         hRow.addCell(columnItem.getFamily(),
                             columnItem.getQualifier(),
-                            typeConvert(columnItem, hbaseOrm, newVal));
+                            typeConvert(columnItem, hbaseMapping, newVal));
                     }
                 }
             }
             rows.add(hRow);
             complete = false;
-            if (i % config.getHbaseOrm().getCommitBatch() == 0 && !rows.isEmpty()) {
-                hbaseTemplate.puts(hbaseOrm.getHbaseTable(), rows);
+            if (i % config.getHbaseMapping().getCommitBatch() == 0 && !rows.isEmpty()) {
+                hbaseTemplate.puts(hbaseMapping.getHbaseTable(), rows);
                 rows.clear();
                 complete = true;
             }
@@ -242,7 +242,7 @@ public class HbaseSyncService {
             index++;
         }
         if (!complete && !rows.isEmpty()) {
-            hbaseTemplate.puts(hbaseOrm.getHbaseTable(), rows);
+            hbaseTemplate.puts(hbaseMapping.getHbaseTable(), rows);
         }
     }
 
@@ -252,45 +252,45 @@ public class HbaseSyncService {
             return;
         }
 
-        MappingConfig.HbaseOrm hbaseOrm = config.getHbaseOrm();
+        MappingConfig.HbaseMapping hbaseMapping = config.getHbaseMapping();
 
         // if (!validHTable(config)) {
         // logger.error("HBase table '{}' not exists",
-        // hbaseOrm.getHbaseTable());
+        // hbaseMapping.getHbaseTable());
         // return;
         // }
 
-        MappingConfig.ColumnItem rowKeyColumn = hbaseOrm.getRowKeyColumn();
+        MappingConfig.ColumnItem rowKeyColumn = hbaseMapping.getRowKeyColumn();
         boolean complete = false;
         int i = 1;
         Set<byte[]> rowKeys = new HashSet<>();
         for (Map<String, Object> r : data) {
             byte[] rowKeyBytes;
 
-            if (hbaseOrm.getRowKey() != null) {
-                String[] rowKeyColumns = hbaseOrm.getRowKey().trim().split(",");
+            if (hbaseMapping.getRowKey() != null) {
+                String[] rowKeyColumns = hbaseMapping.getRowKey().trim().split(",");
                 String rowKeyVale = getRowKeys(rowKeyColumns, r);
                 rowKeyBytes = Bytes.toBytes(rowKeyVale);
             } else if (rowKeyColumn == null) {
                 // 如果不需要类型转换
                 Map<String, Object> rowKey = data.get(0);
-                rowKeyBytes = typeConvert(null, hbaseOrm, rowKey.values().iterator().next());
+                rowKeyBytes = typeConvert(null, hbaseMapping, rowKey.values().iterator().next());
             } else {
                 Object val = r.get(rowKeyColumn.getColumn());
-                rowKeyBytes = typeConvert(rowKeyColumn, hbaseOrm, val);
+                rowKeyBytes = typeConvert(rowKeyColumn, hbaseMapping, val);
             }
             if (rowKeyBytes == null) throw new RuntimeException("rowKey值为空");
             rowKeys.add(rowKeyBytes);
             complete = false;
-            if (i % config.getHbaseOrm().getCommitBatch() == 0 && !rowKeys.isEmpty()) {
-                hbaseTemplate.deletes(hbaseOrm.getHbaseTable(), rowKeys);
+            if (i % config.getHbaseMapping().getCommitBatch() == 0 && !rowKeys.isEmpty()) {
+                hbaseTemplate.deletes(hbaseMapping.getHbaseTable(), rowKeys);
                 rowKeys.clear();
                 complete = true;
             }
             i++;
         }
         if (!complete && !rowKeys.isEmpty()) {
-            hbaseTemplate.deletes(hbaseOrm.getHbaseTable(), rowKeys);
+            hbaseTemplate.deletes(hbaseMapping.getHbaseTable(), rowKeys);
         }
     }
 
@@ -300,9 +300,9 @@ public class HbaseSyncService {
         if (old == null || old.isEmpty() || data == null || data.isEmpty()) {
             return;
         }
-        MappingConfig.HbaseOrm hbaseOrm = config.getHbaseOrm();
+        MappingConfig.HbaseMapping hbaseMapping = config.getHbaseMapping();
 
-        String[] rowKeyColumns = hbaseOrm.getRowKey().trim().split(",");
+        String[] rowKeyColumns = hbaseMapping.getRowKey().trim().split(",");
 
         int index = 0;
         int i = 1;
@@ -343,13 +343,13 @@ public class HbaseSyncService {
 
             rowKeys.add(oldRowKeyBytes);
             HRow row = new HRow(newRowKeyBytes);
-            convertData2Row(hbaseOrm, row, r);
+            convertData2Row(hbaseMapping, row, r);
             rows.add(row);
             complete = false;
-            if (i % config.getHbaseOrm().getCommitBatch() == 0 && !rows.isEmpty()) {
-                hbaseTemplate.deletes(hbaseOrm.getHbaseTable(), rowKeys);
+            if (i % config.getHbaseMapping().getCommitBatch() == 0 && !rows.isEmpty()) {
+                hbaseTemplate.deletes(hbaseMapping.getHbaseTable(), rowKeys);
 
-                hbaseTemplate.puts(hbaseOrm.getHbaseTable(), rows);
+                hbaseTemplate.puts(hbaseMapping.getHbaseTable(), rows);
                 rowKeys.clear();
                 rows.clear();
                 complete = true;
@@ -358,8 +358,8 @@ public class HbaseSyncService {
             index++;
         }
         if (!complete && !rows.isEmpty()) {
-            hbaseTemplate.deletes(hbaseOrm.getHbaseTable(), rowKeys);
-            hbaseTemplate.puts(hbaseOrm.getHbaseTable(), rows);
+            hbaseTemplate.deletes(hbaseMapping.getHbaseTable(), rowKeys);
+            hbaseTemplate.puts(hbaseMapping.getHbaseTable(), rows);
         }
     }
 
@@ -367,32 +367,32 @@ public class HbaseSyncService {
      * 根据对应的类型进行转换
      * 
      * @param columnItem 列项配置
-     * @param hbaseOrm hbase映射配置
+     * @param hbaseMapping hbase映射配置
      * @param value 值
      * @return 复合字段rowKey
      */
-    private static byte[] typeConvert(MappingConfig.ColumnItem columnItem, MappingConfig.HbaseOrm hbaseOrm,
+    private static byte[] typeConvert(MappingConfig.ColumnItem columnItem, MappingConfig.HbaseMapping hbaseMapping,
                                       Object value) {
         if (value == null) {
             return null;
         }
         byte[] bytes = null;
         if (columnItem == null || columnItem.getType() == null || "".equals(columnItem.getType())) {
-            if (MappingConfig.Mode.STRING == hbaseOrm.getMode()) {
+            if (MappingConfig.Mode.STRING == hbaseMapping.getMode()) {
                 bytes = Bytes.toBytes(value.toString());
-            } else if (MappingConfig.Mode.NATIVE == hbaseOrm.getMode()) {
+            } else if (MappingConfig.Mode.NATIVE == hbaseMapping.getMode()) {
                 bytes = TypeUtil.toBytes(value);
-            } else if (MappingConfig.Mode.PHOENIX == hbaseOrm.getMode()) {
+            } else if (MappingConfig.Mode.PHOENIX == hbaseMapping.getMode()) {
                 PhType phType = PhType.getType(value.getClass());
                 bytes = PhTypeUtil.toBytes(value, phType);
             }
         } else {
-            if (hbaseOrm.getMode() == MappingConfig.Mode.STRING) {
+            if (hbaseMapping.getMode() == MappingConfig.Mode.STRING) {
                 bytes = Bytes.toBytes(value.toString());
-            } else if (hbaseOrm.getMode() == MappingConfig.Mode.NATIVE) {
+            } else if (hbaseMapping.getMode() == MappingConfig.Mode.NATIVE) {
                 Type type = Type.getType(columnItem.getType());
                 bytes = TypeUtil.toBytes(value, type);
-            } else if (hbaseOrm.getMode() == MappingConfig.Mode.PHOENIX) {
+            } else if (hbaseMapping.getMode() == MappingConfig.Mode.PHOENIX) {
                 PhType phType = PhType.getType(columnItem.getType());
                 bytes = PhTypeUtil.toBytes(value, phType);
             }

+ 2 - 2
client-adapter/hbase/src/main/resources/hbase/mytest_person2.yml

@@ -1,5 +1,5 @@
 dataSourceKey: defaultDS
-hbaseOrm:
+hbaseMapping:
   mode: PHOENIX  #NATIVE   #STRING
   destination: example
   database: mytest  # 数据库名
@@ -18,7 +18,7 @@ hbaseOrm:
     c_time: C_TIME$UNSIGNED_TIMESTAMP
     birthday: BIRTHDAY$DATE
   excludeColumns:
-    - lat
+    - lat   # 忽略字段
 
 # -- NATIVE类型
 # $DEFAULT

+ 16 - 16
client-adapter/launcher/src/main/resources/application.yml

@@ -10,9 +10,9 @@ spring:
     default-property-inclusion: non_null
 
 
-hbase.zookeeper.quorum: slave1
+hbasezookeeper.quorum: 127.0.0.1
 hbase.zookeeper.property.clientPort: 2181
-hbase.zookeeper.znode.parent: /hbase-unsecure
+hbase.zookeeper.znode.parent: /hbase
 
 canal.conf:
   canalServerHost: 127.0.0.1:11111
@@ -23,12 +23,12 @@ canal.conf:
   - instance: example
     adapterGroups:
     - outAdapters:
-#      - name: logger
-      - name: hbase
-        properties:
-          hbase.zookeeper.quorum: ${hbase.zookeeper.quorum}
-          hbase.zookeeper.property.clientPort: ${hbase.zookeeper.property.clientPort}
-          zookeeper.znode.parent: ${hbase.zookeeper.znode.parent}
+      - name: logger
+#      - name: hbase
+#        properties:
+#          hbase.zookeeper.quorum: ${hbase.zookeeper.quorum}
+#          hbase.zookeeper.property.clientPort: ${hbase.zookeeper.property.clientPort}
+#          zookeeper.znode.parent: ${hbase.zookeeper.znode.parent}
 #  mqTopics:
 #  - mqMode: kafka
 #    topic: example
@@ -37,11 +37,11 @@ canal.conf:
 #      outAdapters:
 #      - name: logger
 
-adapter.conf:
-  datasourceConfigs:
-    defaultDS:
-      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
-      username: root
-      password: 121212
-  adapterConfigs:
-  - hbase/mytest_person2.yml
+#adapter.conf:
+#  datasourceConfigs:
+#    defaultDS:
+#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
+#      username: root
+#      password: 121212
+#  adapterConfigs:
+#  - hbase/mytest_person2.yml