Explorar o código

Merge pull request #1055 from rewerma/master

代码整理
rewerma %!s(int64=6) %!d(string=hai) anos
pai
achega
423392a18a

+ 1 - 1
client-adapter/README.md

@@ -164,7 +164,7 @@ adapter.conf:
 修改 config/hbase/mytest_person.yml文件:
 ```
 dataSourceKey: defaultDS            # 对应application.yml中的datasourceConfigs下的配置
-hbaseOrm:                           # mysql--HBase的单表映射配置
+hbaseMapping:                       # mysql--HBase的单表映射配置
   mode: STRING                      # HBase中的存储类型, 默认统一存为String, 可选: #PHOENIX  #NATIVE   #STRING 
                                     # NATIVE: 以java类型为主, PHOENIX: 将类型转换为Phoenix对应的类型
   destination: example              # 对应 canal destination/MQ topic 名称

+ 27 - 88
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/PhType.java

@@ -1,5 +1,8 @@
 package com.alibaba.otter.canal.client.adapter.hbase.support;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.sql.Time;
@@ -13,65 +16,44 @@ import java.util.Date;
  * @version 1.0.0
  */
 public enum PhType {
-    DEFAULT(-1, "VARCHAR"), UNSIGNED_INT(4, "UNSIGNED_INT"), UNSIGNED_LONG(8, "UNSIGNED_LONG"),
-    UNSIGNED_TINYINT(1, "UNSIGNED_TINYINT"), UNSIGNED_SMALLINT(2, "UNSIGNED_SMALLINT"),
-    UNSIGNED_FLOAT(4, "UNSIGNED_FLOAT"), UNSIGNED_DOUBLE(8, "UNSIGNED_DOUBLE"), INTEGER(4, "INTEGER"),
-    BIGINT(8, "BIGINT"), TINYINT(1, "TINYINT"), SMALLINT(2, "SMALLINT"), FLOAT(4, "FLOAT"), DOUBLE(8, "DOUBLE"),
-    DECIMAL(-1, "DECIMAL"), BOOLEAN(1, "BOOLEAN"), UNSIGNED_TIME(8, "UNSIGNED_TIME"),
-    UNSIGNED_DATE(8, "UNSIGNED_DATE"), UNSIGNED_TIMESTAMP(12, "UNSIGNED_TIMESTAMP"), TIME(8, "TIME"), DATE(8, "DATE"),
-    TIMESTAMP(12, "TIMESTAMP"), VARCHAR(-1, "VARCHAR"), VARBINARY(-1, "VARBINARY");
-
-    /**
-     * -1:长度可变
-     */
-    private int    len;
-    private String type;
-
-    PhType(int len, String type){
-        this.len = len;
-        this.type = type;
-    }
-
-    public int getLen() {
-        return len;
-    }
+                    DEFAULT, UNSIGNED_INT, UNSIGNED_LONG, UNSIGNED_TINYINT, UNSIGNED_SMALLINT, UNSIGNED_FLOAT,
+                    UNSIGNED_DOUBLE, INTEGER, BIGINT, TINYINT, SMALLINT, FLOAT, DOUBLE, DECIMAL, BOOLEAN, UNSIGNED_TIME,
+                    UNSIGNED_DATE, UNSIGNED_TIMESTAMP, TIME, DATE, TIMESTAMP, VARCHAR, VARBINARY;
 
-    public String getType() {
-        return this.type;
-    }
+    private static Logger logger = LoggerFactory.getLogger(PhType.class);
 
     public static PhType getType(Class<?> javaType) {
         if (javaType == null) return DEFAULT;
         PhType phType;
-        if (Integer.class.isAssignableFrom(javaType) || int.class.isAssignableFrom(javaType)) {
+        if (Integer.class == javaType || int.class == javaType) {
             phType = INTEGER;
-        } else if (Long.class.isAssignableFrom(javaType) || long.class.isAssignableFrom(javaType)) {
+        } else if (Long.class == javaType || long.class == javaType) {
             phType = BIGINT;
-        } else if (Byte.class.isAssignableFrom(javaType) || byte.class.isAssignableFrom(javaType)) {
+        } else if (Byte.class == javaType || byte.class == javaType) {
             phType = TINYINT;
-        } else if (Short.class.isAssignableFrom(javaType) || short.class.isAssignableFrom(javaType)) {
+        } else if (Short.class == javaType || short.class == javaType) {
             phType = SMALLINT;
-        } else if (Float.class.isAssignableFrom(javaType) || float.class.isAssignableFrom(javaType)) {
+        } else if (Float.class == javaType || float.class == javaType) {
             phType = FLOAT;
-        } else if (Double.class.isAssignableFrom(javaType) || double.class.isAssignableFrom(javaType)) {
+        } else if (Double.class == javaType || double.class == javaType) {
             phType = DOUBLE;
-        } else if (Boolean.class.isAssignableFrom(javaType) || boolean.class.isAssignableFrom(javaType)) {
+        } else if (Boolean.class == javaType || boolean.class == javaType) {
             phType = BOOLEAN;
-        } else if (java.sql.Date.class.isAssignableFrom(javaType)) {
+        } else if (java.sql.Date.class == javaType) {
             phType = DATE;
-        } else if (Time.class.isAssignableFrom(javaType)) {
+        } else if (Time.class == javaType) {
             phType = DATE;
-        } else if (Timestamp.class.isAssignableFrom(javaType)) {
+        } else if (Timestamp.class == javaType) {
             phType = TIMESTAMP;
-        } else if (Date.class.isAssignableFrom(javaType)) {
+        } else if (Date.class == javaType) {
             phType = DATE;
-        } else if (byte[].class.isAssignableFrom(javaType)) {
+        } else if (byte[].class == javaType) {
             phType = VARBINARY;
-        } else if (String.class.isAssignableFrom(javaType)) {
+        } else if (String.class == javaType) {
             phType = VARCHAR;
-        } else if (BigDecimal.class.isAssignableFrom(javaType)) {
+        } else if (BigDecimal.class == javaType) {
             phType = DECIMAL;
-        } else if (BigInteger.class.isAssignableFrom(javaType)) {
+        } else if (BigInteger.class == javaType) {
             phType = UNSIGNED_LONG;
         } else {
             phType = DEFAULT;
@@ -81,54 +63,11 @@ public enum PhType {
 
     public static PhType getType(String type) {
         if (type == null) return DEFAULT;
-        PhType phType;
-        if (type.equalsIgnoreCase(UNSIGNED_INT.type)) {
-            phType = UNSIGNED_INT;
-        } else if (type.equalsIgnoreCase(UNSIGNED_LONG.type)) {
-            phType = UNSIGNED_LONG;
-        } else if (type.equalsIgnoreCase(UNSIGNED_TINYINT.type)) {
-            phType = UNSIGNED_TINYINT;
-        } else if (type.equalsIgnoreCase(UNSIGNED_SMALLINT.type)) {
-            phType = UNSIGNED_SMALLINT;
-        } else if (type.equalsIgnoreCase(UNSIGNED_FLOAT.type)) {
-            phType = UNSIGNED_FLOAT;
-        } else if (type.equalsIgnoreCase(UNSIGNED_DOUBLE.type)) {
-            phType = UNSIGNED_DOUBLE;
-        } else if (type.equalsIgnoreCase(INTEGER.type)) {
-            phType = INTEGER;
-        } else if (type.equalsIgnoreCase(BIGINT.type)) {
-            phType = BIGINT;
-        } else if (type.equalsIgnoreCase(TINYINT.type)) {
-            phType = TINYINT;
-        } else if (type.equalsIgnoreCase(SMALLINT.type)) {
-            phType = SMALLINT;
-        } else if (type.equalsIgnoreCase(FLOAT.type)) {
-            phType = FLOAT;
-        } else if (type.equalsIgnoreCase(DOUBLE.type)) {
-            phType = DOUBLE;
-        } else if (type.equalsIgnoreCase(BOOLEAN.type)) {
-            phType = BOOLEAN;
-        } else if (type.equalsIgnoreCase(UNSIGNED_TIME.type)) {
-            phType = UNSIGNED_TIME;
-        } else if (type.equalsIgnoreCase(UNSIGNED_DATE.type)) {
-            phType = UNSIGNED_DATE;
-        } else if (type.equalsIgnoreCase(UNSIGNED_TIMESTAMP.type)) {
-            phType = UNSIGNED_TIMESTAMP;
-        } else if (type.equalsIgnoreCase(TIME.type)) {
-            phType = TIME;
-        } else if (type.equalsIgnoreCase(DATE.type)) {
-            phType = DATE;
-        } else if (type.equalsIgnoreCase(TIMESTAMP.type)) {
-            phType = TIMESTAMP;
-        } else if (type.equalsIgnoreCase(VARCHAR.type)) {
-            phType = VARCHAR;
-        } else if (type.equalsIgnoreCase(VARBINARY.type)) {
-            phType = VARBINARY;
-        } else if (type.equalsIgnoreCase(DECIMAL.type)) {
-            phType = DECIMAL;
-        } else {
-            phType = DEFAULT;
+        try {
+            return PhType.valueOf(type.toUpperCase());
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return DEFAULT;
         }
-        return phType;
     }
 }

+ 22 - 49
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/Type.java

@@ -1,5 +1,8 @@
 package com.alibaba.otter.canal.client.adapter.hbase.support;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Date;
@@ -11,50 +14,20 @@ import java.util.Date;
  * @version 1.0.0
  */
 public enum Type {
-    DEFAULT("STRING"), STRING("STRING"), INTEGER("INTEGER"), LONG("LONG"), SHORT("SHORT"), BOOLEAN("BOOLEAN"),
-    FLOAT("FLOAT"), DOUBLE("DOUBLE"), BIGDECIMAL("BIGDECIMAL"), DATE("DATE"), BYTE("BYTE"), BYTES("BYTES");
-
-    private String type;
-
-    Type(String type){
-        this.type = type;
-    }
+                  DEFAULT, STRING, INTEGER, LONG, SHORT, BOOLEAN, FLOAT, DOUBLE, BIGDECIMAL, DATE, BYTE, BYTES;
 
-    public String getType() {
-        return type;
-    }
+    private static Logger logger = LoggerFactory.getLogger(Type.class);
 
     public static Type getType(String type) {
         if (type == null) {
             return DEFAULT;
         }
-        Type res;
-        if (type.equalsIgnoreCase("STRING")) {
-            res = STRING;
-        } else if (type.equalsIgnoreCase("INTEGER")) {
-            res = INTEGER;
-        } else if (type.equalsIgnoreCase("LONG")) {
-            res = LONG;
-        } else if (type.equalsIgnoreCase("SHORT")) {
-            res = SHORT;
-        } else if (type.equalsIgnoreCase("BOOLEAN")) {
-            res = BOOLEAN;
-        } else if (type.equalsIgnoreCase("FLOAT")) {
-            res = FLOAT;
-        } else if (type.equalsIgnoreCase("DOUBLE")) {
-            res = DOUBLE;
-        } else if (type.equalsIgnoreCase("BIGDECIMAL")) {
-            res = BIGDECIMAL;
-        } else if (type.equalsIgnoreCase("DATE")) {
-            res = DATE;
-        } else if (type.equalsIgnoreCase("BYTE")) {
-            res = BYTE;
-        } else if (type.equalsIgnoreCase("BYTES")) {
-            res = BYTES;
-        } else {
-            res = DEFAULT;
+        try {
+            return Type.valueOf(type);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return DEFAULT;
         }
-        return res;
     }
 
     public static Type getType(Class<?> javaType) {
@@ -62,29 +35,29 @@ public enum Type {
             return DEFAULT;
         }
         Type type;
-        if (Integer.class.isAssignableFrom(javaType) || int.class.isAssignableFrom(javaType)) {
+        if (Integer.class == javaType || int.class == javaType) {
             type = INTEGER;
-        } else if (Long.class.isAssignableFrom(javaType) || long.class.isAssignableFrom(javaType)) {
+        } else if (Long.class == javaType || long.class == javaType) {
             type = LONG;
-        } else if (Byte.class.isAssignableFrom(javaType) || byte.class.isAssignableFrom(javaType)) {
+        } else if (Byte.class == javaType || byte.class == javaType) {
             type = BYTE;
-        } else if (Short.class.isAssignableFrom(javaType) || short.class.isAssignableFrom(javaType)) {
+        } else if (Short.class == javaType || short.class == javaType) {
             type = SHORT;
-        } else if (Float.class.isAssignableFrom(javaType) || float.class.isAssignableFrom(javaType)) {
+        } else if (Float.class == javaType || float.class == javaType) {
             type = FLOAT;
-        } else if (Double.class.isAssignableFrom(javaType) || double.class.isAssignableFrom(javaType)) {
+        } else if (Double.class == javaType || double.class == javaType) {
             type = DOUBLE;
-        } else if (Boolean.class.isAssignableFrom(javaType) || boolean.class.isAssignableFrom(javaType)) {
+        } else if (Boolean.class == javaType || boolean.class == javaType) {
             type = BOOLEAN;
-        } else if (Date.class.isAssignableFrom(javaType)) {
+        } else if (Date.class == javaType) {
             type = DATE;
-        } else if (byte[].class.isAssignableFrom(javaType)) {
+        } else if (byte[].class == javaType) {
             type = BYTES;
-        } else if (String.class.isAssignableFrom(javaType)) {
+        } else if (String.class == javaType) {
             type = STRING;
-        } else if (BigDecimal.class.isAssignableFrom(javaType)) {
+        } else if (BigDecimal.class == javaType) {
             type = BIGDECIMAL;
-        } else if (BigInteger.class.isAssignableFrom(javaType)) {
+        } else if (BigInteger.class == javaType) {
             type = LONG;
         } else {
             type = DEFAULT;

+ 29 - 28
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/TypeUtil.java

@@ -21,29 +21,30 @@ public class TypeUtil {
             return null;
         }
         byte[] bytes;
-        if (obj instanceof String) {
+        Class<?> clazz = obj.getClass();
+        if (clazz == String.class) {
             bytes = Bytes.toBytes((String) obj);
-        } else if (obj instanceof Integer) {
+        } else if (clazz == Integer.class || clazz == int.class) {
             bytes = Bytes.toBytes((Integer) obj);
-        } else if (obj instanceof Long) {
+        } else if (clazz == Long.class || clazz == long.class) {
             bytes = Bytes.toBytes((Long) obj);
-        } else if (obj instanceof Short) {
+        } else if (clazz == Short.class || clazz == short.class) {
             bytes = Bytes.toBytes((Short) obj);
-        } else if (obj instanceof Boolean) {
+        } else if (clazz == Boolean.class || clazz == boolean.class) {
             bytes = Bytes.toBytes((Boolean) obj);
-        } else if (obj instanceof Float) {
+        } else if (clazz == Float.class || clazz == float.class) {
             bytes = Bytes.toBytes((Float) obj);
-        } else if (obj instanceof Double) {
+        } else if (clazz == Double.class || clazz == double.class) {
             bytes = Bytes.toBytes((Double) obj);
-        } else if (obj instanceof BigDecimal) {
+        } else if (clazz == Byte.class || clazz == byte.class) {
+            bytes = new byte[] { (byte) obj };
+        } else if (clazz == BigDecimal.class) {
             bytes = Bytes.toBytes((BigDecimal) obj);
-        } else if (obj instanceof BigInteger) {
+        } else if (clazz == BigInteger.class) {
             bytes = Bytes.toBytes(((BigInteger) obj).longValue());
-        } else if (obj instanceof Date) {
+        } else if (clazz == Date.class) {
             bytes = Bytes.toBytes(((Date) obj).getTime());
-        } else if (obj instanceof Byte) {
-            bytes = new byte[] { (byte) obj };
-        } else if (obj instanceof byte[]) {
+        } else if (clazz == byte[].class) {
             bytes = (byte[]) obj;
         } else {
             // 其余类型统一转换为string
@@ -98,38 +99,38 @@ public class TypeUtil {
             return null;
         }
         Object res;
-        if (String.class.isAssignableFrom(clazz)) {
+        if (String.class == clazz) {
             res = Bytes.toString(bytes);
-        } else if (Integer.class.isAssignableFrom(clazz)) {
+        } else if (Integer.class == clazz || int.class == clazz) {
             res = Bytes.toInt(bytes);
-        } else if (Long.class.isAssignableFrom(clazz)) {
+        } else if (Long.class == clazz || long.class == clazz) {
             res = Bytes.toLong(bytes);
-        } else if (Short.class.isAssignableFrom(clazz)) {
+        } else if (Short.class == clazz || short.class == clazz) {
             res = Bytes.toShort(bytes);
-        } else if (Boolean.class.isAssignableFrom(clazz)) {
+        } else if (Boolean.class == clazz || boolean.class == clazz) {
             res = Bytes.toBoolean(bytes);
-        } else if (Float.class.isAssignableFrom(clazz)) {
+        } else if (Float.class == clazz || float.class == clazz) {
             res = Bytes.toFloat(bytes);
-        } else if (Double.class.isAssignableFrom(clazz)) {
+        } else if (Double.class == clazz || double.class == clazz) {
             res = Bytes.toDouble(bytes);
-        } else if (BigDecimal.class.isAssignableFrom(clazz)) {
+        } else if (Byte.class == clazz || byte.class == clazz) {
+            res = bytes[0];
+        } else if (BigDecimal.class == clazz) {
             res = Bytes.toBigDecimal(bytes);
-        } else if (BigInteger.class.isAssignableFrom(clazz)) {
+        } else if (BigInteger.class == clazz) {
             res = Bytes.toLong(bytes);
-        } else if (java.sql.Date.class.isAssignableFrom(clazz)) {
+        } else if (java.sql.Date.class == clazz) {
             long ts = Bytes.toLong(bytes);
             res = new java.sql.Date(ts);
-        } else if (Time.class.isAssignableFrom(clazz)) {
+        } else if (Time.class == clazz) {
             long ts = Bytes.toLong(bytes);
             res = new Time(ts);
-        } else if (Timestamp.class.isAssignableFrom(clazz)) {
+        } else if (Timestamp.class == clazz) {
             long ts = Bytes.toLong(bytes);
             res = new Timestamp(ts);
-        } else if (Date.class.isAssignableFrom(clazz)) {
+        } else if (Date.class == clazz) {
             long ts = Bytes.toLong(bytes);
             res = new Date(ts);
-        } else if (Byte.class.isAssignableFrom(clazz)) {
-            res = bytes[0];
         } else {
             throw new IllegalArgumentException("mismatch class type");
         }

+ 1 - 1
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/KafkaClientRunningTest.java

@@ -40,7 +40,7 @@ public class KafkaClientRunningTest extends AbstractKafkaTest {
                 connector.subscribe();
                 while (running) {
                     try {
-                        List<Message> messages = connector.getWithoutAck(3L, TimeUnit.SECONDS);
+                        List<Message> messages = connector.getList(3L, TimeUnit.SECONDS);
                         if (messages != null) {
                             System.out.println(messages);
                         }