浏览代码

Merge pull request #1590 from rewerma/master

 fix #1585
agapple 6 年之前
父节点
当前提交
5c96bf282e

+ 1 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Util.java

@@ -144,7 +144,7 @@ public class Util {
         return column;
     }
 
-    private static String timeZone; // 当前时区
+    public static String timeZone; // 当前时区
     private static DateTimeZone dateTimeZone;
 
     static {

+ 8 - 8
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESSyncUtil.java

@@ -113,9 +113,9 @@ public class ESSyncUtil {
             } else if (val instanceof java.sql.Timestamp) {
                 DateTime dateTime = new DateTime(((java.sql.Timestamp) val).getTime());
                 if (dateTime.getMillisOfSecond() != 0) {
-                    res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");
+                    res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS" + Util.timeZone);
                 } else {
-                    res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss+08:00");
+                    res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss" + Util.timeZone);
                 }
             } else if (val instanceof java.sql.Date || val instanceof Date) {
                 DateTime dateTime;
@@ -129,9 +129,9 @@ public class ESSyncUtil {
                     res = dateTime.toString("yyyy-MM-dd");
                 } else {
                     if (dateTime.getMillisOfSecond() != 0) {
-                        res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");
+                        res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS" + Util.timeZone);
                     } else {
-                        res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss+08:00");
+                        res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss" + Util.timeZone);
                     }
                 }
             } else if (val instanceof Long) {
@@ -140,9 +140,9 @@ public class ESSyncUtil {
                     && dateTime.getMillisOfSecond() == 0) {
                     res = dateTime.toString("yyyy-MM-dd");
                 } else if (dateTime.getMillisOfSecond() != 0) {
-                    res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");
+                    res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS" + Util.timeZone);
                 } else {
-                    res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss+08:00");
+                    res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss" + Util.timeZone);
                 }
             } else if (val instanceof String) {
                 String v = ((String) val).trim();
@@ -153,9 +153,9 @@ public class ESSyncUtil {
                     if (date != null) {
                         DateTime dateTime = new DateTime(date);
                         if (dateTime.getMillisOfSecond() != 0) {
-                            res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS+08:00");
+                            res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS" + Util.timeZone);
                         } else {
-                            res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss+08:00");
+                            res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss" + Util.timeZone);
                         }
                     }
                 } else if (v.length() == 10 && v.charAt(4) == '-' && v.charAt(7) == '-') {

+ 5 - 3
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java

@@ -202,7 +202,7 @@ public class RdbEtlService {
                     StringBuilder insertSql = new StringBuilder();
                     insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
                     columnsMap
-                            .forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
+                        .forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
 
                     int len = insertSql.length();
                     insertSql.delete(len - 1, len).append(") VALUES (");
@@ -213,16 +213,18 @@ public class RdbEtlService {
                     len = insertSql.length();
                     insertSql.delete(len - 1, len).append(")");
                     try (Connection connTarget = targetDS.getConnection();
-                         PreparedStatement pstmt = connTarget.prepareStatement(insertSql.toString())) {
+                            PreparedStatement pstmt = connTarget.prepareStatement(insertSql.toString())) {
                         connTarget.setAutoCommit(false);
 
                         while (rs.next()) {
+                            completed = false;
+
                             pstmt.clearParameters();
 
                             // 删除数据
                             Map<String, Object> values = new LinkedHashMap<>();
                             StringBuilder deleteSql = new StringBuilder(
-                                    "DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
+                                "DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
                             appendCondition(dbMapping, deleteSql, values, rs);
                             try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
                                 int k = 1;

+ 3 - 2
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -49,6 +49,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
         properties.put("max.request.size", kafkaProperties.getMaxRequestSize());
         properties.put("buffer.memory", kafkaProperties.getBufferMemory());
         properties.put("key.serializer", StringSerializer.class.getName());
+        properties.put("max.in.flight.requests.per.connection", 1);
 
         if (!kafkaProperties.getProperties().isEmpty()) {
             properties.putAll(kafkaProperties.getProperties());
@@ -160,7 +161,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
 
             if (record != null) {
                 if (kafkaProperties.getTransaction()) {
-                    producer.send(record);
+                    producer.send(record).get();
                 } else {
                     producer.send(record).get();
                 }
@@ -212,7 +213,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
         if (kafkaProperties.getTransaction()) {
             producer2.send(record);
         } else {
-            producer2.send(record).get();
+            producer2.send(record);
         }
     }