Quellcode durchsuchen

Merge branch 'master' into miss_producergroup_properties

dragonlong1986 vor 6 Jahren
Ursprung
Commit
a34f9ae97b

+ 25 - 13
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/JdbcTypeUtil.java

@@ -9,7 +9,6 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
 
-import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -126,22 +125,35 @@ public class JdbcTypeUtil {
                     break;
                 case Types.DATE:
                     if (!value.startsWith("0000-00-00")) {
-                        value = value.trim().replace(" ", "T");
-                        DateTime dt = new DateTime(value);
-                        res = new Date(dt.toDate().getTime());
-                        break;
+                        java.util.Date date = Util.parseDate(value);
+                        if (date != null) {
+                            res = new Date(date.getTime());
+                        } else {
+                            res = null;
+                        }
+                    } else {
+                        res = null;
                     }
-                case Types.TIME:
-                    value = "T" + value;
-                    DateTime dt = new DateTime(value);
-                    res = new Time(dt.toDate().getTime());
                     break;
+                case Types.TIME: {
+                    java.util.Date date = Util.parseDate(value);
+                    if (date != null) {
+                        res = new Time(date.getTime());
+                    } else {
+                        res = null;
+                    }
+                    break;
+                }
                 case Types.TIMESTAMP:
                     if (!value.startsWith("0000-00-00")) {
-                        value = value.trim().replace(" ", "T");
-                        dt = new DateTime(value);
-                        res = new Timestamp(dt.toDate().getTime());
-                        break;
+                        java.util.Date date = Util.parseDate(value);
+                        if (date != null) {
+                            res = new Timestamp(date.getTime());
+                        } else {
+                            res = null;
+                        }
+                    } else {
+                        res = null;
                     }
                 case Types.CLOB:
                 default:

+ 139 - 4
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Util.java

@@ -2,19 +2,26 @@ package com.alibaba.otter.canal.client.adapter.support;
 
 import java.io.File;
 import java.net.URL;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
+import java.time.*;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+import java.util.TimeZone;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
 import javax.sql.DataSource;
 
 import org.apache.commons.lang.StringUtils;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 public class Util {
 
     private static final Logger logger = LoggerFactory.getLogger(Util.class);
@@ -136,4 +143,132 @@ public class Util {
 
         return column;
     }
+
+    private static String timeZone; // 当前时区
+    private static DateTimeZone dateTimeZone;
+
+    static {
+        TimeZone localTimeZone = TimeZone.getDefault();
+        int rawOffset = localTimeZone.getRawOffset();
+        String symbol = "+";
+        if (rawOffset < 0) {
+            symbol = "-";
+        }
+        rawOffset = Math.abs(rawOffset);
+        int offsetHour = rawOffset / 3600000;
+        int offsetMinute = rawOffset % 3600000 / 60000;
+        String hour = String.format("%1$02d", offsetHour);
+        String minute = String.format("%1$02d", offsetMinute);
+        timeZone = symbol + hour + ":" + minute;
+        dateTimeZone = DateTimeZone.forID(timeZone);
+        TimeZone.setDefault(TimeZone.getTimeZone("GMT" + timeZone));
+    }
+
+    /**
+     * 通用日期时间字符解析
+     *
+     * @param datetimeStr 日期时间字符串
+     * @return Date
+     */
+    public static Date parseDate(String datetimeStr) {
+        if (StringUtils.isEmpty(datetimeStr)) {
+            return null;
+        }
+        datetimeStr = datetimeStr.trim();
+        if (datetimeStr.contains("-")) {
+            if (datetimeStr.contains(":")) {
+                datetimeStr = datetimeStr.replace(" ", "T");
+            }
+        } else if (datetimeStr.contains(":")) {
+            datetimeStr = "T" + datetimeStr;
+        }
+
+        DateTime dateTime = new DateTime(datetimeStr, dateTimeZone);
+
+        return dateTime.toDate();
+    }
+
+    private static LoadingCache<String, DateTimeFormatter> dateFormatterCache = CacheBuilder.newBuilder()
+        .build(new CacheLoader<String, DateTimeFormatter>() {
+
+            @Override
+            public DateTimeFormatter load(String key) {
+                return DateTimeFormatter.ofPattern(key);
+            }
+        });
+
+    public static Date parseDate2(String datetimeStr) {
+        if (StringUtils.isEmpty(datetimeStr)) {
+            return null;
+        }
+        try {
+            datetimeStr = datetimeStr.trim();
+            int len = datetimeStr.length();
+            if (datetimeStr.contains("-") && datetimeStr.contains(":") && datetimeStr.contains(".")) {
+                // 包含日期+时间+毫秒
+                // 取毫秒位数
+                int msLen = len - datetimeStr.indexOf(".") - 1;
+                StringBuilder ms = new StringBuilder();
+                for (int i = 0; i < msLen; i++) {
+                    ms.append("S");
+                }
+                String formatter = "yyyy-MM-dd HH:mm:ss." + ms;
+
+                DateTimeFormatter dateTimeFormatter = dateFormatterCache.get(formatter);
+                LocalDateTime dateTime = LocalDateTime.parse(datetimeStr, dateTimeFormatter);
+                return Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant());
+            } else if (datetimeStr.contains("-") && datetimeStr.contains(":")) {
+                // 包含日期+时间
+                // 判断包含时间位数
+                int i = datetimeStr.indexOf(":");
+                i = datetimeStr.indexOf(":", i + 1);
+                String formatter;
+                if (i > -1) {
+                    formatter = "yyyy-MM-dd HH:mm:ss";
+                } else {
+                    formatter = "yyyy-MM-dd HH:mm";
+                }
+
+                DateTimeFormatter dateTimeFormatter = dateFormatterCache.get(formatter);
+                LocalDateTime dateTime = LocalDateTime.parse(datetimeStr, dateTimeFormatter);
+                return Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant());
+            } else if (datetimeStr.contains("-")) {
+                // 只包含日期
+                String formatter = "yyyy-MM-dd";
+                DateTimeFormatter dateTimeFormatter = dateFormatterCache.get(formatter);
+                LocalDate localDate = LocalDate.parse(datetimeStr, dateTimeFormatter);
+                return Date.from(localDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant());
+            } else if (datetimeStr.contains(":")) {
+                // 只包含时间
+                String formatter;
+                if (datetimeStr.contains(".")) {
+                    // 包含毫秒
+                    int msLen = len - datetimeStr.indexOf(".") - 1;
+                    StringBuilder ms = new StringBuilder();
+                    for (int i = 0; i < msLen; i++) {
+                        ms.append("S");
+                    }
+                    formatter = "HH:mm:ss." + ms;
+                } else {
+                    // 判断包含时间位数
+                    int i = datetimeStr.indexOf(":");
+                    i = datetimeStr.indexOf(":", i + 1);
+                    if (i > -1) {
+                        formatter = "HH:mm:ss";
+                    } else {
+                        formatter = "HH:mm";
+                    }
+                }
+                DateTimeFormatter dateTimeFormatter = dateFormatterCache.get(formatter);
+                LocalTime localTime = LocalTime.parse(datetimeStr, dateTimeFormatter);
+                LocalDate localDate = LocalDate.of(1970, Month.JANUARY, 1);
+                LocalDateTime localDateTime = LocalDateTime.of(localDate, localTime);
+                return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+
+        return null;
+    }
 }

+ 15 - 8
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESSyncUtil.java

@@ -20,6 +20,7 @@ import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
 import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.TableItem;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 
 /**
  * ES 同步工具同类
@@ -148,15 +149,21 @@ public class ESSyncUtil {
                 if (v.length() > 18 && v.charAt(4) == '-' && v.charAt(7) == '-' && v.charAt(10) == ' '
                     && v.charAt(13) == ':' && v.charAt(16) == ':') {
                     String dt = v.substring(0, 10) + "T" + v.substring(11);
-                    DateTime dateTime = new DateTime(dt);
-                    if (dateTime.getMillisOfSecond() != 0) {
-                        res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");
-                    } else {
-                        res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss+08:00");
+                    Date date = Util.parseDate(dt);
+                    if (date != null) {
+                        DateTime dateTime = new DateTime(date);
+                        if (dateTime.getMillisOfSecond() != 0) {
+                            res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");
+                        } else {
+                            res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss+08:00");
+                        }
                     }
                 } else if (v.length() == 10 && v.charAt(4) == '-' && v.charAt(7) == '-') {
-                    DateTime dateTime = new DateTime(v);
-                    res = dateTime.toString("yyyy-MM-dd");
+                    Date date = Util.parseDate(v);
+                    if (date != null) {
+                        DateTime dateTime = new DateTime(date);
+                        res = dateTime.toString("yyyy-MM-dd");
+                    }
                 }
             }
         } else if ("binary".equals(esType)) {
@@ -237,7 +244,7 @@ public class ESSyncUtil {
 
     /**
      * 拼接主键条件
-     * 
+     *
      * @param mapping
      * @param data
      * @return

+ 3 - 3
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/sync/db_schema.sql

@@ -7,7 +7,7 @@ CREATE TABLE `label` (
   `user_id` bigint(20) NOT NULL,
   `label` varchar(30) NOT NULL,
   PRIMARY KEY (`id`)
-) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;
+) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4;
 
 -- ----------------------------
 -- Table structure for role
@@ -17,7 +17,7 @@ CREATE TABLE `role` (
   `id` bigint(20) NOT NULL AUTO_INCREMENT,
   `role_name` varchar(30) NOT NULL,
   PRIMARY KEY (`id`)
-) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;
+) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4;
 
 -- ----------------------------
 -- Table structure for user
@@ -29,7 +29,7 @@ CREATE TABLE `user` (
   `c_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
   `role_id` bigint(20) DEFAULT NULL,
   PRIMARY KEY (`id`)
-) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;
+) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4;
 
 insert into user (id,name,role_id) values (1,'Eric',1);
 insert into role (id,role_name) values (1,'admin');

+ 6 - 17
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/support/PhTypeUtil.java

@@ -7,10 +7,12 @@ import java.math.RoundingMode;
 import java.sql.Timestamp;
 import java.util.Date;
 
+import com.alibaba.otter.canal.client.adapter.support.Util;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.joda.time.DateTime;
 
 import com.google.common.math.LongMath;
+import org.joda.time.DateTimeZone;
 
 /**
  * Phoenix类型转换工具类
@@ -389,7 +391,7 @@ public class PhTypeUtil {
             String dateStr = (String) v;
             Date date;
             try {
-                date = parseDatetime(dateStr);
+                date = Util.parseDate(dateStr);
                 if (date != null) {
                     encodeLong(date.getTime(), b, 0);
                 }
@@ -418,7 +420,7 @@ public class PhTypeUtil {
             String dateStr = (String) v;
             Date date;
             try {
-                date = parseDatetime(dateStr);
+                date = Util.parseDate(dateStr);
                 if (date != null) {
                     encodeUnsignedLong(date.getTime(), b, 0);
                 }
@@ -599,22 +601,9 @@ public class PhTypeUtil {
 
     private static void checkForSufficientLength(byte[] b, int offset, int requiredLength) {
         if (b.length < offset + requiredLength) {
-            throw new RuntimeException("Expected length of at least " + requiredLength + " bytes, but had "
-                                       + (b.length - offset));
+            throw new RuntimeException(
+                "Expected length of at least " + requiredLength + " bytes, but had " + (b.length - offset));
         }
     }
 
-    private static Date parseDatetime(String dateStr) {
-        Date date = null;
-        int len = dateStr.length();
-        if (len == 10 && dateStr.charAt(4) == '-' && dateStr.charAt(7) == '-') {
-            date = new DateTime(dateStr).toDate();
-        } else if (len == 8 && dateStr.charAt(2) == ':' && dateStr.charAt(5) == ':') {
-            date = new DateTime("T" + dateStr).toDate();
-        } else if (len >= 19 && dateStr.charAt(4) == '-' && dateStr.charAt(7) == '-' && dateStr.charAt(13) == ':'
-                   && dateStr.charAt(16) == ':') {
-            date = new DateTime(dateStr.replace(" ", "T")).toDate();
-        }
-        return date;
-    }
 }

+ 20 - 11
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java

@@ -10,9 +10,9 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
-import org.joda.time.DateTime;
 
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
+import com.alibaba.otter.canal.client.adapter.support.Util;
 
 public class SyncUtil {
 
@@ -86,7 +86,7 @@ public class SyncUtil {
                 }
                 break;
             case Types.TINYINT:
-                 if (value instanceof Number) {
+                if (value instanceof Number) {
                     pstmt.setByte(i, ((Number) value).byteValue());
                 } else if (value instanceof String) {
                     pstmt.setByte(i, Byte.parseByte((String) value));
@@ -196,9 +196,12 @@ public class SyncUtil {
                 } else if (value instanceof String) {
                     String v = (String) value;
                     if (!v.startsWith("0000-00-00")) {
-                        v = v.trim().replace(" ", "T");
-                        DateTime dt = new DateTime(v);
-                        pstmt.setDate(i, new Date(dt.toDate().getTime()));
+                        java.util.Date date = Util.parseDate(v);
+                        if (date != null) {
+                            pstmt.setDate(i, new Date(date.getTime()));
+                        } else {
+                            pstmt.setNull(i, type);
+                        }
                     } else {
                         pstmt.setObject(i, value);
                     }
@@ -213,9 +216,12 @@ public class SyncUtil {
                     pstmt.setTime(i, new java.sql.Time(((java.util.Date) value).getTime()));
                 } else if (value instanceof String) {
                     String v = (String) value;
-                    v = "T" + v;
-                    DateTime dt = new DateTime(v);
-                    pstmt.setTime(i, new Time(dt.toDate().getTime()));
+                    java.util.Date date = Util.parseDate(v);
+                    if (date != null) {
+                        pstmt.setTime(i, new Time(date.getTime()));
+                    } else {
+                        pstmt.setNull(i, type);
+                    }
                 } else {
                     pstmt.setNull(i, type);
                 }
@@ -228,9 +234,12 @@ public class SyncUtil {
                 } else if (value instanceof String) {
                     String v = (String) value;
                     if (!v.startsWith("0000-00-00")) {
-                        v = v.trim().replace(" ", "T");
-                        DateTime dt = new DateTime(v);
-                        pstmt.setTimestamp(i, new Timestamp(dt.toDate().getTime()));
+                        java.util.Date date = Util.parseDate(v);
+                        if (date != null) {
+                            pstmt.setTimestamp(i, new Timestamp(date.getTime()));
+                        } else {
+                            pstmt.setNull(i, type);
+                        }
                     } else {
                         pstmt.setObject(i, value);
                     }

+ 3 - 3
deployer/src/main/resources/canal.properties

@@ -78,8 +78,8 @@ canal.instance.tsdb.snapshot.interval = 24
 canal.instance.tsdb.snapshot.expire = 360
 
 # aliyun ak/sk , support rds/mq
-canal.aliyun.accesskey =
-canal.aliyun.secretkey =
+canal.aliyun.accessKey =
+canal.aliyun.secretKey =
 
 #################################################
 ######### 		destinations		############# 
@@ -117,4 +117,4 @@ canal.mq.compressionType = none
 canal.mq.acks = all
 # use transaction for kafka flatMessage batch produce
 canal.mq.transaction = false
-#canal.mq.properties. =
+#canal.mq.properties. =

+ 2 - 2
deployer/src/main/resources/spring/tsdb/sql/create_table.sql

@@ -14,7 +14,7 @@ CREATE TABLE IF NOT EXISTS `meta_snapshot` (
   KEY `destination` (`destination`),
   KEY `destination_timestamp` (`destination`,`binlog_timestamp`),
   KEY `gmt_modified` (`gmt_modified`)
-) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='表结构记录表快照表';
+) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='表结构记录表快照表';
 
 CREATE TABLE IF NOT EXISTS `meta_history` (
   `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
@@ -36,4 +36,4 @@ CREATE TABLE IF NOT EXISTS `meta_history` (
   KEY `destination` (`destination`),
   KEY `destination_timestamp` (`destination`,`binlog_timestamp`),
   KEY `gmt_modified` (`gmt_modified`)
-) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='表结构变化明细表';
+) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='表结构变化明细表';