mcy 6 years ago
parent
commit
ad39559f8c

+ 5 - 3
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java

@@ -202,7 +202,7 @@ public class RdbEtlService {
                     StringBuilder insertSql = new StringBuilder();
                     insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
                     columnsMap
-                            .forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
+                        .forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
 
                     int len = insertSql.length();
                     insertSql.delete(len - 1, len).append(") VALUES (");
@@ -213,16 +213,18 @@ public class RdbEtlService {
                     len = insertSql.length();
                     insertSql.delete(len - 1, len).append(")");
                     try (Connection connTarget = targetDS.getConnection();
-                         PreparedStatement pstmt = connTarget.prepareStatement(insertSql.toString())) {
+                            PreparedStatement pstmt = connTarget.prepareStatement(insertSql.toString())) {
                         connTarget.setAutoCommit(false);
 
                         while (rs.next()) {
+                            completed = false;
+
                             pstmt.clearParameters();
 
                             // 删除数据
                             Map<String, Object> values = new LinkedHashMap<>();
                             StringBuilder deleteSql = new StringBuilder(
-                                    "DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
+                                "DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
                             appendCondition(dbMapping, deleteSql, values, rs);
                             try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
                                 int k = 1;

+ 1 - 0
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -49,6 +49,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
         properties.put("max.request.size", kafkaProperties.getMaxRequestSize());
         properties.put("buffer.memory", kafkaProperties.getBufferMemory());
         properties.put("key.serializer", StringSerializer.class.getName());
+        properties.put("max.in.flight.requests.per.connection", 1);
 
         if (!kafkaProperties.getProperties().isEmpty()) {
             properties.putAll(kafkaProperties.getProperties());