Browse Source

adapter 分批同步

mcy 6 years ago
parent
commit
56ec66f90e
17 changed files with 474 additions and 637 deletions
  1. 1 1
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/OuterAdapter.java
  2. 19 9
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  3. 8 12
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java
  4. 43 22
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java
  5. 0 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java
  6. 0 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java
  7. 0 2
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java
  8. 6 84
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java
  9. 1 1
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java
  10. 139 368
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java
  11. 12 19
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/BatchExecutor.java
  12. 0 97
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SimpleDml.java
  13. 213 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java
  14. 2 2
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/DBTest.java
  15. 2 2
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/OracleSyncTest.java
  16. 1 1
      client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java
  17. 27 15
      server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

+ 1 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/OuterAdapter.java

@@ -27,7 +27,7 @@ public interface OuterAdapter {
     /**
      * 往适配器中同步数据
      *
-     * @param dml 数据包
+     * @param dmls 数据包
      */
     void sync(List<Dml> dmls);
 

+ 19 - 9
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -13,23 +13,25 @@ import java.util.Map;
  */
 public class CanalClientConfig {
 
-    private String              canalServerHost;    // 单机模式下canal server的 ip:port
+    private String              canalServerHost;      // 单机模式下canal server的 ip:port
 
-    private String              zookeeperHosts;     // 集群模式下的zk地址, 如果配置了单机地址则以单机为准!!
+    private String              zookeeperHosts;       // 集群模式下的zk地址, 如果配置了单机地址则以单机为准!!
 
-    private String              mqServers;          // kafka or rocket mq 地址
+    private String              mqServers;            // kafka or rocket mq 地址
 
-    private Boolean             flatMessage = true; // 是否已flatMessage模式传输, 只适用于mq模式
+    private Boolean             flatMessage   = true; // 是否已flatMessage模式传输, 只适用于mq模式
 
-    private Integer             batchSize;          // 批大小
+    private Integer             batchSize;            // 批大小
 
-    private Integer             retries;            // 重试次数
+    private Integer             syncBatchSize = 1000; // 同步分批提交大小
 
-    private Long                timeout;            // 消费超时时间
+    private Integer             retries;              // 重试次数
 
-    private List<MQTopic>       mqTopics;           // mq topic 列表
+    private Long                timeout;              // 消费超时时间
 
-    private List<CanalInstance> canalInstances;     // tcp 模式下 canal 实例列表, 与mq模式不能共存!!
+    private List<MQTopic>       mqTopics;             // mq topic 列表
+
+    private List<CanalInstance> canalInstances;       // tcp 模式下 canal 实例列表, 与mq模式不能共存!!
 
     public String getCanalServerHost() {
         return canalServerHost;
@@ -83,6 +85,14 @@ public class CanalClientConfig {
         return retries;
     }
 
+    public Integer getSyncBatchSize() {
+        return syncBatchSize;
+    }
+
+    public void setSyncBatchSize(Integer syncBatchSize) {
+        this.syncBatchSize = syncBatchSize;
+    }
+
     public void setRetries(Integer retries) {
         this.retries = retries;
     }

+ 8 - 12
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -19,9 +19,9 @@ import com.alibaba.otter.canal.protocol.Message;
  */
 public class MessageUtil {
 
-    public static void parse4Dml(String destination, Message message, Consumer<List<Dml>> consumer) {
+    public static List<Dml> parse4Dml(String destination, Message message) {
         if (message == null) {
-            return;
+            return null;
         }
         List<CanalEntry.Entry> entries = message.getEntries();
         List<Dml> dmls = new ArrayList<Dml>(entries.size());
@@ -89,10 +89,11 @@ public class MessageUtil {
                         Map<String, Object> rowOld = new LinkedHashMap<>();
                         for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                             if (updateSet.contains(column.getName())) {
-                                rowOld.put(column.getName(), JdbcTypeUtil.typeConvert(column.getName(),
-                                    column.getValue(),
-                                    column.getSqlType(),
-                                    column.getMysqlType()));
+                                rowOld.put(column.getName(),
+                                    JdbcTypeUtil.typeConvert(column.getName(),
+                                        column.getValue(),
+                                        column.getSqlType(),
+                                        column.getMysqlType()));
                             }
                         }
                         // update操作将记录修改前的值
@@ -110,7 +111,7 @@ public class MessageUtil {
             }
         }
 
-        consumer.accept(dmls);
+        return dmls;
     }
 
     public static List<Dml> flatMessage2Dml(String destination, List<FlatMessage> flatMessages) {
@@ -177,9 +178,4 @@ public class MessageUtil {
         }
         return result;
     }
-
-    public interface Consumer<T> {
-
-        void accept(T t);
-    }
 }

+ 43 - 22
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -2,12 +2,9 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,6 +29,7 @@ public abstract class AbstractCanalAdapterWorker {
 
     protected String                          canalDestination;                                                // canal实例
     protected List<List<OuterAdapter>>        canalOuterAdapters;                                              // 外部适配器
+    protected CanalClientConfig               canalClientConfig;                                               // 配置
     protected ExecutorService                 groupInnerExecutorService;                                       // 组内工作线程池
     protected volatile boolean                running = false;                                                 // 是否运行中
     protected Thread                          thread  = null;
@@ -55,11 +53,15 @@ public abstract class AbstractCanalAdapterWorker {
                     // 组内适配器穿行运行,尽量不要配置组内适配器
                     adapters.forEach(adapter -> {
                         long begin = System.currentTimeMillis();
-                        MessageUtil.parse4Dml(canalDestination, message, adapter::sync);
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("{} elapsed time: {}",
-                                adapter.getClass().getName(),
-                                (System.currentTimeMillis() - begin));
+                        List<Dml> dmls = MessageUtil.parse4Dml(canalDestination, message);
+                        if (dmls != null) {
+                            batchSync(dmls, adapter);
+
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("{} elapsed time: {}",
+                                    adapter.getClass().getName(),
+                                    (System.currentTimeMillis() - begin));
+                            }
                         }
                     });
                     return true;
@@ -82,7 +84,7 @@ public abstract class AbstractCanalAdapterWorker {
         });
     }
 
-    protected void writeOut(final List<FlatMessage> flatMessages) {
+    private void writeOut(final List<FlatMessage> flatMessages) {
         List<Future<Boolean>> futures = new ArrayList<>();
         // 组间适配器并行运行
         canalOuterAdapters.forEach(outerAdapters -> {
@@ -92,7 +94,8 @@ public abstract class AbstractCanalAdapterWorker {
                     outerAdapters.forEach(adapter -> {
                         long begin = System.currentTimeMillis();
                         List<Dml> dmls = MessageUtil.flatMessage2Dml(canalDestination, flatMessages);
-                        adapter.sync(dmls);
+                        batchSync(dmls, adapter);
+
                         if (logger.isDebugEnabled()) {
                             logger.debug("{} elapsed time: {}",
                                 adapter.getClass().getName(),
@@ -119,6 +122,7 @@ public abstract class AbstractCanalAdapterWorker {
         });
     }
 
+    @SuppressWarnings("unchecked")
     protected void mqWriteOutData(int retry, long timeout, final boolean flatMessage, CanalMQConnector connector,
                                   ExecutorService workerExecutor) {
         for (int i = 0; i < retry; i++) {
@@ -131,18 +135,13 @@ public abstract class AbstractCanalAdapterWorker {
                 }
                 if (messages != null) {
                     Future<Boolean> future = workerExecutor.submit(() -> {
-                        List<FlatMessage> flatMessages = new ArrayList<FlatMessage>(messages.size());
-                        for (final Object message : messages) {
-                            if (message instanceof FlatMessage) {
-                                flatMessages.add((FlatMessage) message);
-                            } else {
+                        if (flatMessage) {
+                            // batch write
+                            writeOut((List<FlatMessage>) messages);
+                        } else {
+                            for (final Object message : messages) {
                                 writeOut((Message) message);
                             }
-
-                            if (flatMessage) {
-                                // batch write
-                                writeOut(flatMessages);
-                            }
                         }
                         return true;
                     });
@@ -173,6 +172,28 @@ public abstract class AbstractCanalAdapterWorker {
         }
     }
 
+    /**
+     * 分批同步
+     * 
+     * @param dmls
+     * @param adapter
+     */
+    private void batchSync(List<Dml> dmls, OuterAdapter adapter) {
+        // 分批同步
+        int len = 0;
+        List<Dml> dmlsBatch = new ArrayList<>();
+        for (Dml dml : dmls) {
+            dmlsBatch.add(dml);
+            len += dml.getData().size();
+            if (len >= canalClientConfig.getSyncBatchSize()) {
+                adapter.sync(dmlsBatch);
+                dmlsBatch.clear();
+                len = 0;
+            }
+        }
+        adapter.sync(dmlsBatch);
+    }
+
     public void start() {
         if (!running) {
             thread = new Thread(this::process);

+ 0 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -19,7 +19,6 @@ import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
  */
 public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
 
-    private CanalClientConfig   canalClientConfig;
     private KafkaCanalConnector connector;
     private String              topic;
     private boolean             flatMessage;

+ 0 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -21,7 +21,6 @@ import com.alibaba.otter.canal.protocol.Message;
  */
 public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
 
-    private CanalClientConfig      canalClientConfig;
     private RocketMQCanalConnector connector;
     private String                 topic;
     private boolean                flatMessage;

+ 0 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

@@ -25,8 +25,6 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
 
     private CanalConnector    connector;
 
-    private CanalClientConfig canalClientConfig;
-
     /**
      * 单台client适配器worker的构造方法
      *

+ 6 - 84
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -2,16 +2,12 @@ package com.alibaba.otter.canal.client.adapter.rdb;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import javax.sql.DataSource;
 
@@ -20,21 +16,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.druid.pool.DruidDataSource;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.rdb.config.ConfigLoader;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.monitor.RdbConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
-import com.alibaba.otter.canal.client.adapter.rdb.support.SimpleDml;
-import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-import com.alibaba.otter.canal.client.adapter.support.EtlResult;
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
-import com.alibaba.otter.canal.client.adapter.support.SPI;
-import com.alibaba.otter.canal.client.adapter.support.Util;
+import com.alibaba.otter.canal.client.adapter.support.*;
 
 @SPI("rdb")
 public class RdbAdapter implements OuterAdapter {
@@ -48,13 +36,6 @@ public class RdbAdapter implements OuterAdapter {
 
     private RdbSyncService                          rdbSyncService;
 
-    private int                                     commitSize         = 3000;
-
-    private volatile boolean                        running            = false;
-
-    private List<SimpleDml>                         dmlList            = Collections
-        .synchronizedList(new ArrayList<>());
-    private Lock                                    syncLock           = new ReentrantLock();
     private ExecutorService                         executor           = Executors.newFixedThreadPool(1);
 
     private RdbConfigMonitor                        rdbConfigMonitor;
@@ -108,72 +89,18 @@ public class RdbAdapter implements OuterAdapter {
             logger.error("ERROR ## failed to initial datasource: " + properties.get("jdbc.url"), e);
         }
 
-        String threads = properties.get("threads");
-        String commitSize = properties.get("commitSize");
-        if (commitSize != null) {
-            this.commitSize = Integer.valueOf(commitSize);
-        }
-        rdbSyncService = new RdbSyncService(threads != null ? Integer.valueOf(threads) : null, dataSource);
-
-        running = true;
-
-        executor.submit(() -> {
-            while (running) {
-                int beginSize = dmlList.size();
-                try {
-                    Thread.sleep(3000);
-                } catch (InterruptedException e) {
-                    // ignore
-                }
-                int endSize = dmlList.size();
+        // String threads = properties.get("threads");
+        // String commitSize = properties.get("commitSize");
 
-                if (endSize - beginSize < 300) {
-                    sync();
-                }
-            }
-        });
+        rdbSyncService = new RdbSyncService(mappingConfigCache, dataSource);
 
         rdbConfigMonitor = new RdbConfigMonitor();
         rdbConfigMonitor.init(configuration.getKey(), this);
     }
 
+    @Override
     public void sync(List<Dml> dmls) {
-        for (Dml dml : dmls) {
-            sync(dml);
-        }
-    }
-
-    public void sync(Dml dml) {
-        String destination = StringUtils.trimToEmpty(dml.getDestination());
-        String database = dml.getDatabase();
-        String table = dml.getTable();
-        Map<String, MappingConfig> configMap = mappingConfigCache.get(destination + "." + database + "." + table);
-
-        if (configMap != null) {
-            configMap.values().forEach(config -> {
-                List<SimpleDml> simpleDmlList = SimpleDml.dml2SimpleDml(dml, config);
-                dmlList.addAll(simpleDmlList);
-
-                if (dmlList.size() >= commitSize) {
-                    sync();
-                }
-            });
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
-        }
-    }
-
-    private void sync() {
-        try {
-            syncLock.lock();
-            if (!dmlList.isEmpty()) {
-                rdbSyncService.sync(dmlList);
-                dmlList.clear();
-            }
-        } finally {
-            syncLock.unlock();
-        }
+        rdbSyncService.sync(dmls);
     }
 
     @Override
@@ -270,17 +197,12 @@ public class RdbAdapter implements OuterAdapter {
 
     @Override
     public void destroy() {
-        running = false;
         if (rdbConfigMonitor != null) {
             rdbConfigMonitor.destroy();
         }
 
         executor.shutdown();
 
-        if (rdbSyncService != null) {
-            rdbSyncService.close();
-        }
-
         if (dataSource != null) {
             dataSource.close();
         }

+ 1 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java

@@ -231,7 +231,7 @@ public class RdbEtlService {
 
                                 Object value = rs.getObject(srcColumnName);
                                 if (value != null) {
-                                    RdbSyncService.setPStmt(type, pstmt, value, i);
+                                    SyncUtil.setPStmt(type, pstmt, value, i);
                                 } else {
                                     pstmt.setNull(i, type);
                                 }

+ 139 - 368
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -1,27 +1,27 @@
 package com.alibaba.otter.canal.client.adapter.rdb.service;
 
-import java.io.Reader;
-import java.io.StringReader;
-import java.math.BigDecimal;
-import java.nio.charset.StandardCharsets;
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.sql.DataSource;
 
-import org.joda.time.DateTime;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
-import com.alibaba.otter.canal.client.adapter.rdb.support.SimpleDml;
+import com.alibaba.otter.canal.client.adapter.rdb.support.BatchExecutor;
 import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.Util;
 
 /**
@@ -36,87 +36,48 @@ public class RdbSyncService {
 
     private final Map<String, Map<String, Integer>> COLUMNS_TYPE_CACHE = new ConcurrentHashMap<>();
 
-    private BatchExecutor[]                         batchExecutors;
+    private Map<String, Map<String, MappingConfig>> mappingConfigCache;                                                // 库名-表名对应配置
 
-    private int                                     threads            = 1;
+    private DataSource                              dataSource;
 
-    private ExecutorService[]                       threadExecutors;
+    public RdbSyncService(Map<String, Map<String, MappingConfig>> mappingConfigCache, DataSource dataSource){
+        this.mappingConfigCache = mappingConfigCache;
+        this.dataSource = dataSource;
+    }
 
-    public RdbSyncService(Integer threads, DataSource dataSource){
-        try {
-            if (threads != null && threads > 1 && threads <= 10) {
-                this.threads = threads;
-            }
-            batchExecutors = new BatchExecutor[this.threads];
-            for (int i = 0; i < this.threads; i++) {
-                Connection conn = dataSource.getConnection();
-                conn.setAutoCommit(false);
-                this.batchExecutors[i] = new BatchExecutor(i, conn);
-            }
-            threadExecutors = new ExecutorService[this.threads];
-            for (int i = 0; i < this.threads; i++) {
-                threadExecutors[i] = Executors.newFixedThreadPool(1);
+    public void sync(List<Dml> dmls) {
+        try (BatchExecutor batchExecutor = new BatchExecutor(dataSource.getConnection())) {
+            for (Dml dml : dmls) {
+                String destination = StringUtils.trimToEmpty(dml.getDestination());
+                String database = dml.getDatabase();
+                String table = dml.getTable();
+                Map<String, MappingConfig> configMap = mappingConfigCache
+                    .get(destination + "." + database + "." + table);
+
+                for (MappingConfig config : configMap.values()) {
+                    sync(batchExecutor, config, dml);
+                }
             }
+            batchExecutor.commit();
         } catch (SQLException e) {
-            logger.error(e.getMessage(), e);
+            throw new RuntimeException(e);
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private List<SimpleDml>[] simpleDmls2Partition(List<SimpleDml> simpleDmlList) {
-        List<SimpleDml>[] simpleDmlPartition = new ArrayList[threads];
-        for (int i = 0; i < threads; i++) {
-            simpleDmlPartition[i] = new ArrayList<>();
-        }
-        simpleDmlList.forEach(simpleDml -> {
-            int hash;
-            if (simpleDml.getConfig().getConcurrent()) {
-                hash = pkHash(simpleDml.getConfig().getDbMapping(), simpleDml.getData(), threads);
-            } else {
-                hash = Math.abs(Math.abs(simpleDml.getConfig().getDbMapping().getTargetTable().hashCode()) % threads);
-            }
-            simpleDmlPartition[hash].add(simpleDml);
-        });
-        return simpleDmlPartition;
-    }
-
-    public void sync(List<SimpleDml> simpleDmlList) {
+    private void sync(BatchExecutor batchExecutor, MappingConfig config, Dml dml) {
         try {
-            List<SimpleDml>[] simpleDmlsPartition = simpleDmls2Partition(simpleDmlList);
-
-            List<Future<Boolean>> futures = new ArrayList<>();
-            for (int i = 0; i < threads; i++) {
-                if (!simpleDmlsPartition[i].isEmpty()) {
-                    int j = i;
-                    futures.add(threadExecutors[i].submit(() -> {
-                        simpleDmlsPartition[j].forEach(simpleDml -> sync(simpleDml, batchExecutors[j]));
-                        batchExecutors[j].commit();
-                        return true;
-                    }));
+            if (config != null) {
+                String type = dml.getType();
+                if (type != null && type.equalsIgnoreCase("INSERT")) {
+                    insert(batchExecutor, config, dml);
+                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
+                    update(batchExecutor, config, dml);
+                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
+                    delete(batchExecutor, config, dml);
                 }
-            }
-
-            futures.forEach(future -> {
-                try {
-                    future.get();
-                } catch (Exception e) {
-                    // ignore
+                if (logger.isDebugEnabled()) {
+                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
                 }
-            });
-        } catch (Exception e) {
-            logger.error("Error rdb sync for batch", e);
-        }
-    }
-
-    public void sync(SimpleDml dml, BatchExecutor batchExecutor) {
-        try {
-            String type = dml.getType();
-            if (type != null && type.equalsIgnoreCase("INSERT")) {
-                insert(dml, batchExecutor);
-            } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
-                update(dml, batchExecutor);
-            } else if (type != null && type.equalsIgnoreCase("DELETE")) {
-                delete(dml, batchExecutor);
             }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
@@ -126,53 +87,57 @@ public class RdbSyncService {
     /**
      * 插入操作
      *
-     * @param simpleDml DML数据
+     * @param config 配置项
+     * @param dml DML数据
      */
-    private void insert(SimpleDml simpleDml, BatchExecutor batchExecutor) {
-        Map<String, Object> data = simpleDml.getData();
+    private void insert(BatchExecutor batchExecutor, MappingConfig config, Dml dml) {
+        List<Map<String, Object>> data = dml.getData();
         if (data == null || data.isEmpty()) {
             return;
         }
 
-        DbMapping dbMapping = simpleDml.getConfig().getDbMapping();
-
-        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
+        DbMapping dbMapping = config.getDbMapping();
 
-        StringBuilder insertSql = new StringBuilder();
-        insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
+        try {
+            Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data.get(0));
 
-        columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
-        int len = insertSql.length();
-        insertSql.delete(len - 1, len).append(") VALUES (");
-        int mapLen = columnsMap.size();
-        for (int i = 0; i < mapLen; i++) {
-            insertSql.append("?,");
-        }
-        len = insertSql.length();
-        insertSql.delete(len - 1, len).append(")");
+            StringBuilder insertSql = new StringBuilder();
+            insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
 
-        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), simpleDml.getConfig());
+            columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
+            int len = insertSql.length();
+            insertSql.delete(len - 1, len).append(") VALUES (");
+            int mapLen = columnsMap.size();
+            for (int i = 0; i < mapLen; i++) {
+                insertSql.append("?,");
+            }
+            len = insertSql.length();
+            insertSql.delete(len - 1, len).append(")");
+
+            Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
+
+            for (Map<String, Object> d : data) {
+                List<Map<String, ?>> values = new ArrayList<>();
+                for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
+                    String targetClolumnName = entry.getKey();
+                    String srcColumnName = entry.getValue();
+                    if (srcColumnName == null) {
+                        srcColumnName = targetClolumnName;
+                    }
 
-        String sql = insertSql.toString();
+                    Integer type = ctype.get(targetClolumnName.toLowerCase());
 
-        try {
-            List<Map<String, ?>> values = new ArrayList<>();
+                    Object value = d.get(srcColumnName);
 
-            for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
-                String targetColumnName = entry.getKey();
-                String srcColumnName = entry.getValue();
-                if (srcColumnName == null) {
-                    srcColumnName = targetColumnName;
+                    BatchExecutor.setValue(values, type, value);
                 }
 
-                Integer type = ctype.get(targetColumnName.toLowerCase());
-                if (type == null) {
-                    throw new RuntimeException("No column: " + targetColumnName + " found in target db");
+                batchExecutor.execute(insertSql.toString(), values);
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Insert into target table, sql: {}", insertSql);
                 }
-                Object value = data.get(srcColumnName);
-                BatchExecutor.setValue(values, type, value);
+
             }
-            batchExecutor.execute(sql, values);
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
         }
@@ -180,56 +145,63 @@ public class RdbSyncService {
 
     /**
      * 更新操作
-     * 
-     * @param simpleDml DML数据
+     *
+     * @param config 配置项
+     * @param dml DML数据
      */
-    private void update(SimpleDml simpleDml, BatchExecutor batchExecutor) {
-        Map<String, Object> data = simpleDml.getData();
+    private void update(BatchExecutor batchExecutor, MappingConfig config, Dml dml) {
+        List<Map<String, Object>> data = dml.getData();
         if (data == null || data.isEmpty()) {
             return;
         }
 
-        Map<String, Object> old = simpleDml.getOld();
+        List<Map<String, Object>> old = dml.getOld();
         if (old == null || old.isEmpty()) {
             return;
         }
 
-        DbMapping dbMapping = simpleDml.getConfig().getDbMapping();
-
-        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
+        DbMapping dbMapping = config.getDbMapping();
 
-        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), simpleDml.getConfig());
+        int idx = 1;
 
         try {
-            StringBuilder updateSql = new StringBuilder();
-            updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
-
-            List<Map<String, ?>> values = new ArrayList<>();
-            for (String srcColumnName : old.keySet()) {
-                List<String> targetColumnNames = new ArrayList<>();
-                columnsMap.forEach((targetColumn, srcColumn) -> {
-                    if (srcColumnName.toLowerCase().equals(srcColumn)) {
-                        targetColumnNames.add(targetColumn);
-                    }
-                });
-                if (!targetColumnNames.isEmpty()) {
-                    for (String targetColumnName : targetColumnNames) {
-                        updateSql.append(targetColumnName).append("=?, ");
-                        Integer type = ctype.get(targetColumnName.toLowerCase());
-                        BatchExecutor.setValue(values, type, data.get(srcColumnName));
+            Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data.get(0));
+
+            Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
+
+            for (Map<String, Object> o : old) {
+                Map<String, Object> d = data.get(idx - 1);
+                StringBuilder updateSql = new StringBuilder();
+                updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
+                List<Map<String, ?>> values = new ArrayList<>();
+                for (String srcColumnName : o.keySet()) {
+                    List<String> targetColumnNames = new ArrayList<>();
+                    columnsMap.forEach((targetColumn, srcColumn) -> {
+                        if (srcColumnName.toLowerCase().equals(srcColumn)) {
+                            targetColumnNames.add(targetColumn);
+                        }
+                    });
+                    if (!targetColumnNames.isEmpty()) {
+
+                        for (String targetColumnName : targetColumnNames) {
+                            updateSql.append(targetColumnName).append("=?, ");
+                            Integer type = ctype.get(targetColumnName.toLowerCase());
+                            BatchExecutor.setValue(values, type, d.get(srcColumnName));
+                        }
                     }
                 }
-            }
-            int len = updateSql.length();
-            updateSql.delete(len - 2, len).append(" WHERE ");
+                int len = updateSql.length();
+                updateSql.delete(len - 2, len).append(" WHERE ");
 
-            // 拼接主键
-            appendCondition(dbMapping, updateSql, ctype, values, data, old);
+                // 拼接主键
+                appendCondition(dbMapping, updateSql, ctype, values, d, o);
 
-            batchExecutor.execute(updateSql.toString(), values);
+                batchExecutor.execute(updateSql.toString(), values);
 
-            if (logger.isTraceEnabled()) {
-                logger.trace("Execute sql: {}", updateSql);
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Update target table, sql: {}", updateSql);
+                }
+                idx++;
             }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
@@ -238,31 +210,34 @@ public class RdbSyncService {
 
     /**
      * 删除操作
-     * 
-     * @param simpleDml
+     *
+     * @param config
+     * @param dml
      */
-    private void delete(SimpleDml simpleDml, BatchExecutor batchExecutor) {
-        Map<String, Object> data = simpleDml.getData();
+    private void delete(BatchExecutor batchExecutor, MappingConfig config, Dml dml) {
+        List<Map<String, Object>> data = dml.getData();
         if (data == null || data.isEmpty()) {
             return;
         }
 
-        DbMapping dbMapping = simpleDml.getConfig().getDbMapping();
-
-        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), simpleDml.getConfig());
+        DbMapping dbMapping = config.getDbMapping();
 
         try {
-            StringBuilder sql = new StringBuilder();
-            sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE ");
+            Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
+
+            for (Map<String, Object> d : data) {
+                StringBuilder sql = new StringBuilder();
+                sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE ");
 
-            List<Map<String, ?>> values = new ArrayList<>();
+                List<Map<String, ?>> values = new ArrayList<>();
+                // 拼接主键
+                appendCondition(dbMapping, sql, ctype, values, d);
 
-            // 拼接主键
-            appendCondition(dbMapping, sql, ctype, values, data);
+                batchExecutor.execute(sql.toString(), values);
 
-            batchExecutor.execute(sql.toString(), values);
-            if (logger.isTraceEnabled()) {
-                logger.trace("Execute sql: {}", sql);
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Delete from target table, sql: {}", sql);
+                }
             }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
@@ -271,7 +246,7 @@ public class RdbSyncService {
 
     /**
      * 获取目标字段类型
-     * 
+     *
      * @param conn sql connection
      * @param config 映射配置
      * @return 字段sqlType
@@ -305,181 +280,16 @@ public class RdbSyncService {
         return columnType;
     }
 
-    /**
-     * 设置 preparedStatement
-     * 
-     * @param type sqlType
-     * @param pstmt 需要设置的preparedStatement
-     * @param value 值
-     * @param i 索引号
-     */
-    public static void setPStmt(int type, PreparedStatement pstmt, Object value, int i) throws SQLException {
-        if (value == null) {
-            pstmt.setNull(i, type);
-            return;
-        }
-        switch (type) {
-            case Types.BIT:
-            case Types.BOOLEAN:
-                if (value instanceof Boolean) {
-                    pstmt.setBoolean(i, (Boolean) value);
-                } else if (value instanceof String) {
-                    boolean v = !value.equals("0");
-                    pstmt.setBoolean(i, v);
-                } else if (value instanceof Number) {
-                    boolean v = ((Number) value).intValue() != 0;
-                    pstmt.setBoolean(i, v);
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.CHAR:
-            case Types.NCHAR:
-            case Types.VARCHAR:
-            case Types.LONGVARCHAR:
-                pstmt.setString(i, value.toString());
-                break;
-            case Types.TINYINT:
-                if (value instanceof Number) {
-                    pstmt.setByte(i, ((Number) value).byteValue());
-                } else if (value instanceof String) {
-                    pstmt.setByte(i, Byte.parseByte((String) value));
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.SMALLINT:
-                if (value instanceof Number) {
-                    pstmt.setShort(i, ((Number) value).shortValue());
-                } else if (value instanceof String) {
-                    pstmt.setShort(i, Short.parseShort((String) value));
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.INTEGER:
-                if (value instanceof Number) {
-                    pstmt.setInt(i, ((Number) value).intValue());
-                } else if (value instanceof String) {
-                    pstmt.setInt(i, Integer.parseInt((String) value));
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.BIGINT:
-                if (value instanceof Number) {
-                    pstmt.setLong(i, ((Number) value).longValue());
-                } else if (value instanceof String) {
-                    pstmt.setLong(i, Long.parseLong((String) value));
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.DECIMAL:
-            case Types.NUMERIC:
-                pstmt.setBigDecimal(i, new BigDecimal(value.toString()));
-                break;
-            case Types.REAL:
-                if (value instanceof Number) {
-                    pstmt.setFloat(i, ((Number) value).floatValue());
-                } else if (value instanceof String) {
-                    pstmt.setFloat(i, Float.parseFloat((String) value));
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.FLOAT:
-            case Types.DOUBLE:
-                if (value instanceof Number) {
-                    pstmt.setDouble(i, ((Number) value).doubleValue());
-                } else if (value instanceof String) {
-                    pstmt.setDouble(i, Double.parseDouble((String) value));
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.BINARY:
-            case Types.VARBINARY:
-            case Types.LONGVARBINARY:
-            case Types.BLOB:
-
-                if (value instanceof byte[]) {
-                    pstmt.setBytes(i, (byte[]) value);
-                } else if (value instanceof String) {
-                    pstmt.setBytes(i, ((String) value).getBytes(StandardCharsets.ISO_8859_1));
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.CLOB:
-                if (value instanceof byte[]) {
-                    pstmt.setBytes(i, (byte[]) value);
-                } else if (value instanceof String) {
-                    Reader clobReader = new StringReader((String) value);
-                    pstmt.setCharacterStream(i, clobReader);
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.DATE:
-                if (value instanceof java.util.Date) {
-                    pstmt.setDate(i, new Date(((java.util.Date) value).getTime()));
-                } else if (value instanceof String) {
-                    String v = (String) value;
-                    if (!v.startsWith("0000-00-00")) {
-                        v = v.trim().replace(" ", "T");
-                        DateTime dt = new DateTime(v);
-                        pstmt.setDate(i, new Date(dt.toDate().getTime()));
-                    } else {
-                        pstmt.setNull(i, type);
-                    }
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.TIME:
-                if (value instanceof java.util.Date) {
-                    pstmt.setTime(i, new Time(((java.util.Date) value).getTime()));
-                } else if (value instanceof String) {
-                    String v = (String) value;
-                    v = "T" + v;
-                    DateTime dt = new DateTime(v);
-                    pstmt.setTime(i, new Time(dt.toDate().getTime()));
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.TIMESTAMP:
-                if (value instanceof java.util.Date) {
-                    pstmt.setTimestamp(i, new Timestamp(((java.util.Date) value).getTime()));
-                } else if (value instanceof String) {
-                    String v = (String) value;
-                    if (!v.startsWith("0000-00-00")) {
-                        v = v.trim().replace(" ", "T");
-                        DateTime dt = new DateTime(v);
-                        pstmt.setTimestamp(i, new Timestamp(dt.toDate().getTime()));
-                    } else {
-                        pstmt.setNull(i, type);
-                    }
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            default:
-                pstmt.setObject(i, value, type);
-        }
-    }
-
     /**
      * 拼接主键 where条件
      */
-    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
-                                        List<Map<String, ?>> values, Map<String, Object> d) {
+    private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
+                                 List<Map<String, ?>> values, Map<String, Object> d) {
         appendCondition(dbMapping, sql, ctype, values, d, null);
     }
 
-    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
-                                        List<Map<String, ?>> values, Map<String, Object> d, Map<String, Object> o) {
+    private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
+                                 List<Map<String, ?>> values, Map<String, Object> d, Map<String, Object> o) {
         // 拼接主键
         for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
             String targetColumnName = entry.getKey();
@@ -499,43 +309,4 @@ public class RdbSyncService {
         int len = sql.length();
         sql.delete(len - 4, len);
     }
-
-    /**
-     * 取主键hash
-     */
-    private static int pkHash(DbMapping dbMapping, Map<String, Object> d, int threads) {
-        return pkHash(dbMapping, d, null, threads);
-    }
-
-    private static int pkHash(DbMapping dbMapping, Map<String, Object> d, Map<String, Object> o, int threads) {
-        int hash = 0;
-        // 取主键
-        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
-            String targetColumnName = entry.getKey();
-            String srcColumnName = entry.getValue();
-            if (srcColumnName == null) {
-                srcColumnName = targetColumnName;
-            }
-            Object value;
-            if (o != null && o.containsKey(srcColumnName)) {
-                value = o.get(srcColumnName);
-            } else {
-                value = d.get(srcColumnName);
-            }
-            if (value != null) {
-                hash += value.hashCode();
-            }
-        }
-        hash = Math.abs(hash) % threads;
-        return Math.abs(hash);
-    }
-
-    public void close() {
-        for (BatchExecutor batchExecutor : batchExecutors) {
-            batchExecutor.close();
-        }
-        for (ExecutorService executorService : threadExecutors) {
-            executorService.shutdown();
-        }
-    }
 }

+ 12 - 19
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/BatchExecutor.java → client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/BatchExecutor.java

@@ -1,32 +1,28 @@
-package com.alibaba.otter.canal.client.adapter.rdb.service;
+package com.alibaba.otter.canal.client.adapter.rdb.support;
 
+import java.io.Closeable;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BatchExecutor {
+public class BatchExecutor implements Closeable {
 
-    private static final Logger logger     = LoggerFactory.getLogger(BatchExecutor.class);
+    private static final Logger logger = LoggerFactory.getLogger(BatchExecutor.class);
 
     private Integer             key;
     private Connection          conn;
-    private AtomicInteger       idx        = new AtomicInteger(0);
-    private ExecutorService     executor   = Executors.newFixedThreadPool(1);
-    private Lock                commitLock = new ReentrantLock();
-    private Condition           condition  = commitLock.newCondition();
+    private AtomicInteger       idx    = new AtomicInteger(0);
+
+    public BatchExecutor(Connection conn){
+        this(1, conn);
+    }
 
     public BatchExecutor(Integer key, Connection conn){
         this.key = key;
@@ -60,10 +56,11 @@ public class BatchExecutor {
             for (int i = 0; i < len; i++) {
                 int type = (Integer) values.get(i).get("type");
                 Object value = values.get(i).get("value");
-                RdbSyncService.setPStmt(type, pstmt, value, i + 1);
+                SyncUtil.setPStmt(type, pstmt, value, i + 1);
             }
 
             pstmt.execute();
+            idx.incrementAndGet();
         } catch (SQLException e) {
             logger.error(e.getMessage(), e);
         }
@@ -71,20 +68,17 @@ public class BatchExecutor {
 
     public void commit() {
         try {
-            commitLock.lock();
             conn.commit();
             if (logger.isTraceEnabled()) {
                 logger.trace("Batch executor: " + key + " commit " + idx.get() + " rows");
             }
-            condition.signal();
             idx.set(0);
         } catch (SQLException e) {
             logger.error(e.getMessage(), e);
-        } finally {
-            commitLock.unlock();
         }
     }
 
+    @Override
     public void close() {
         if (conn != null) {
             try {
@@ -93,6 +87,5 @@ public class BatchExecutor {
                 logger.error(e.getMessage(), e);
             }
         }
-        executor.shutdown();
     }
 }

+ 0 - 97
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SimpleDml.java

@@ -1,97 +0,0 @@
-package com.alibaba.otter.canal.client.adapter.rdb.support;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-
-public class SimpleDml {
-
-    private String              destination;
-    private String              database;
-    private String              table;
-    private String              type;
-    private Map<String, Object> data;
-    private Map<String, Object> old;
-
-    private MappingConfig       config;
-
-    public String getDestination() {
-        return destination;
-    }
-
-    public void setDestination(String destination) {
-        this.destination = destination;
-    }
-
-    public String getDatabase() {
-        return database;
-    }
-
-    public void setDatabase(String database) {
-        this.database = database;
-    }
-
-    public String getTable() {
-        return table;
-    }
-
-    public void setTable(String table) {
-        this.table = table;
-    }
-
-    public String getType() {
-        return type;
-    }
-
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    public Map<String, Object> getData() {
-        return data;
-    }
-
-    public void setData(Map<String, Object> data) {
-        this.data = data;
-    }
-
-    public Map<String, Object> getOld() {
-        return old;
-    }
-
-    public void setOld(Map<String, Object> old) {
-        this.old = old;
-    }
-
-    public MappingConfig getConfig() {
-        return config;
-    }
-
-    public void setConfig(MappingConfig config) {
-        this.config = config;
-    }
-
-    public static List<SimpleDml> dml2SimpleDml(Dml dml, MappingConfig config) {
-        List<SimpleDml> simpleDmlList = new ArrayList<>();
-        int len = dml.getData().size();
-
-        for (int i = 0; i < len; i++) {
-            SimpleDml simpleDml = new SimpleDml();
-            simpleDml.setDestination(dml.getDestination());
-            simpleDml.setDatabase(dml.getDatabase());
-            simpleDml.setTable(dml.getTable());
-            simpleDml.setType(dml.getType());
-            simpleDml.setData(dml.getData().get(i));
-            if (dml.getOld() != null) {
-                simpleDml.setOld(dml.getOld().get(i));
-            }
-            simpleDml.setConfig(config);
-            simpleDmlList.add(simpleDml);
-        }
-
-        return simpleDmlList;
-    }
-}

+ 213 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java

@@ -1,9 +1,16 @@
 package com.alibaba.otter.canal.client.adapter.rdb.support;
 
+import java.io.Reader;
+import java.io.StringReader;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.*;
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.joda.time.DateTime;
+
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 
 public class SyncUtil {
@@ -36,4 +43,210 @@ public class SyncUtil {
         }
         return columnsMap;
     }
+
+    /**
+     * 设置 preparedStatement
+     *
+     * @param type sqlType
+     * @param pstmt 需要设置的preparedStatement
+     * @param value 值
+     * @param i 索引号
+     */
+    public static void setPStmt(int type, PreparedStatement pstmt, Object value, int i) throws SQLException {
+        switch (type) {
+            case Types.BIT:
+            case Types.BOOLEAN:
+                if (value instanceof Boolean) {
+                    pstmt.setBoolean(i, (Boolean) value);
+                } else if (value instanceof String) {
+                    boolean v = !value.equals("0");
+                    pstmt.setBoolean(i, v);
+                } else if (value instanceof Number) {
+                    boolean v = ((Number) value).intValue() != 0;
+                    pstmt.setBoolean(i, v);
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.CHAR:
+            case Types.NCHAR:
+            case Types.VARCHAR:
+            case Types.LONGVARCHAR:
+                if (value instanceof String) {
+                    pstmt.setString(i, (String) value);
+                } else {
+                    pstmt.setString(i, value.toString());
+                }
+                break;
+            case Types.TINYINT:
+                if (value instanceof Byte || value instanceof Short || value instanceof Integer) {
+                    pstmt.setByte(i, (byte) value);
+                } else if (value instanceof Number) {
+                    pstmt.setByte(i, ((Number) value).byteValue());
+                } else if (value instanceof String) {
+                    pstmt.setByte(i, Byte.parseByte((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.SMALLINT:
+                if (value instanceof Byte || value instanceof Short || value instanceof Integer) {
+                    pstmt.setShort(i, (short) value);
+                } else if (value instanceof Number) {
+                    pstmt.setShort(i, ((Number) value).shortValue());
+                } else if (value instanceof String) {
+                    pstmt.setShort(i, Short.parseShort((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.INTEGER:
+                if (value instanceof Byte || value instanceof Short || value instanceof Integer
+                    || value instanceof Long) {
+                    pstmt.setInt(i, (int) value);
+                } else if (value instanceof Number) {
+                    pstmt.setInt(i, ((Number) value).intValue());
+                } else if (value instanceof String) {
+                    pstmt.setInt(i, Integer.parseInt((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.BIGINT:
+                if (value instanceof Byte || value instanceof Short || value instanceof Integer
+                    || value instanceof Long) {
+                    pstmt.setLong(i, (long) value);
+                } else if (value instanceof Number) {
+                    pstmt.setLong(i, ((Number) value).longValue());
+                } else if (value instanceof String) {
+                    pstmt.setLong(i, Long.parseLong((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.DECIMAL:
+            case Types.NUMERIC:
+                if (value instanceof BigDecimal) {
+                    pstmt.setBigDecimal(i, (BigDecimal) value);
+                } else if (value instanceof Byte) {
+                    pstmt.setInt(i, (int) value);
+                } else if (value instanceof Short) {
+                    pstmt.setInt(i, (int) value);
+                } else if (value instanceof Integer) {
+                    pstmt.setInt(i, (int) value);
+                } else if (value instanceof Long) {
+                    pstmt.setLong(i, (long) value);
+                } else if (value instanceof Float) {
+                    pstmt.setBigDecimal(i, new BigDecimal((float) value));
+                } else if (value instanceof Double) {
+                    pstmt.setBigDecimal(i, new BigDecimal((double) value));
+                } else {
+                    pstmt.setBigDecimal(i, new BigDecimal(value.toString()));
+                }
+                break;
+            case Types.REAL:
+                if (value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long
+                    || value instanceof Float || value instanceof Double) {
+                    pstmt.setFloat(i, (float) value);
+                } else if (value instanceof Number) {
+                    pstmt.setFloat(i, ((Number) value).floatValue());
+                } else if (value instanceof String) {
+                    pstmt.setFloat(i, Float.parseFloat((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.FLOAT:
+            case Types.DOUBLE:
+                if (value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long
+                    || value instanceof Float || value instanceof Double) {
+                    pstmt.setDouble(i, (double) value);
+                } else if (value instanceof Number) {
+                    pstmt.setDouble(i, ((Number) value).doubleValue());
+                } else if (value instanceof String) {
+                    pstmt.setDouble(i, Double.parseDouble((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+            case Types.BLOB:
+                if (value instanceof Blob) {
+                    pstmt.setBlob(i, (Blob) value);
+                } else if (value instanceof byte[]) {
+                    pstmt.setBytes(i, (byte[]) value);
+                } else if (value instanceof String) {
+                    pstmt.setBytes(i, ((String) value).getBytes(StandardCharsets.ISO_8859_1));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.CLOB:
+                if (value instanceof Clob) {
+                    pstmt.setClob(i, (Clob) value);
+                } else if (value instanceof byte[]) {
+                    pstmt.setBytes(i, (byte[]) value);
+                } else if (value instanceof String) {
+                    Reader clobReader = new StringReader((String) value);
+                    pstmt.setCharacterStream(i, clobReader);
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.DATE:
+                if (value instanceof java.sql.Date) {
+                    pstmt.setDate(i, (java.sql.Date) value);
+                } else if (value instanceof java.util.Date) {
+                    pstmt.setDate(i, new java.sql.Date(((java.util.Date) value).getTime()));
+                } else if (value instanceof String) {
+                    String v = (String) value;
+                    if (!v.startsWith("0000-00-00")) {
+                        v = v.trim().replace(" ", "T");
+                        DateTime dt = new DateTime(v);
+                        pstmt.setDate(i, new Date(dt.toDate().getTime()));
+                    } else {
+                        pstmt.setNull(i, type);
+                    }
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.TIME:
+                if (value instanceof java.sql.Time) {
+                    pstmt.setTime(i, (java.sql.Time) value);
+                } else if (value instanceof java.util.Date) {
+                    pstmt.setTime(i, new java.sql.Time(((java.util.Date) value).getTime()));
+                } else if (value instanceof String) {
+                    String v = (String) value;
+                    v = "T" + v;
+                    DateTime dt = new DateTime(v);
+                    pstmt.setTime(i, new Time(dt.toDate().getTime()));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.TIMESTAMP:
+                if (value instanceof java.sql.Timestamp) {
+                    pstmt.setTimestamp(i, (java.sql.Timestamp) value);
+                } else if (value instanceof java.util.Date) {
+                    pstmt.setTimestamp(i, new java.sql.Timestamp(((java.util.Date) value).getTime()));
+                } else if (value instanceof String) {
+                    String v = (String) value;
+                    if (!v.startsWith("0000-00-00")) {
+                        v = v.trim().replace(" ", "T");
+                        DateTime dt = new DateTime(v);
+                        pstmt.setTimestamp(i, new Timestamp(dt.toDate().getTime()));
+                    } else {
+                        pstmt.setNull(i, type);
+                    }
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            default:
+                pstmt.setObject(i, value, type);
+        }
+    }
 }

+ 2 - 2
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/DBTest.java

@@ -42,13 +42,13 @@ public class DBTest {
             .prepareStatement("insert into user (id,name,role_id,c_time,test1,test2) values (?,?,?,?,?,?)");
 
         java.util.Date now = new java.util.Date();
-        for (int i = 1; i <= 100000; i++) {
+        for (int i = 1; i <= 10000; i++) {
             pstmt.clearParameters();
             pstmt.setLong(1, (long) i);
             pstmt.setString(2, "test_" + i);
             pstmt.setLong(3, (long) i % 4 + 1);
             pstmt.setDate(4, new java.sql.Date(now.getTime()));
-            pstmt.setString(5, "tttt");
+            pstmt.setString(5, null);
             pstmt.setBytes(6, null);
 
             pstmt.execute();

+ 2 - 2
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/OracleSyncTest.java

@@ -35,7 +35,7 @@ public class OracleSyncTest {
         data.put("test1", "sdfasdfawe中国asfwef");
         dml.setData(dataList);
 
-        rdbAdapter.sync(dml);
+        rdbAdapter.sync(Collections.singletonList(dml));
     }
 
     @Test
@@ -58,7 +58,7 @@ public class OracleSyncTest {
         old.put("name", "Eric");
         dml.setOld(oldList);
 
-        rdbAdapter.sync(dml);
+        rdbAdapter.sync(Collections.singletonList(dml));
     }
 
 }

+ 1 - 1
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -174,7 +174,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
             return Lists.newArrayList();
         }
 
-        ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout)); // 基于配置,最多只能poll到一条数据
+        ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout));
 
         if (!records.isEmpty()) {
             List<Message> messages = new ArrayList<>();

+ 27 - 15
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,11 +1,8 @@
 package com.alibaba.otter.canal.kafka;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
+import java.util.List;
+import java.util.Properties;
+
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -13,8 +10,12 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.Properties;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
 
 /**
  * kafka producer 主操作类
@@ -38,7 +39,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
         Properties properties = new Properties();
         properties.put("bootstrap.servers", kafkaProperties.getServers());
         properties.put("acks", kafkaProperties.getAcks());
-        properties.put("compression.type",kafkaProperties.getCompressionType());
+        properties.put("compression.type", kafkaProperties.getCompressionType());
         properties.put("retries", kafkaProperties.getRetries());
         properties.put("batch.size", kafkaProperties.getBatchSize());
         properties.put("linger.ms", kafkaProperties.getLingerMs());
@@ -89,6 +90,12 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 }
 
                 producer.send(record).get();
+
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Send  message to kafka topic: [{}], packet: {}",
+                        canalDestination.getTopic(),
+                        message.toString());
+                }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
                 // producer.abortTransaction();
@@ -102,7 +109,8 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 for (FlatMessage flatMessage : flatMessages) {
                     if (canalDestination.getPartition() != null) {
                         try {
-                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
+                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
+                                canalDestination.getTopic(),
                                 canalDestination.getPartition(),
                                 null,
                                 JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
@@ -124,7 +132,8 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                 FlatMessage flatMessagePart = partitionFlatMessage[i];
                                 if (flatMessagePart != null) {
                                     try {
-                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
+                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
+                                            canalDestination.getTopic(),
                                             i,
                                             null,
                                             JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
@@ -139,7 +148,8 @@ public class CanalKafkaProducer implements CanalMQProducer {
                             }
                         } else {
                             try {
-                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
+                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(
+                                    canalDestination.getTopic(),
                                     0,
                                     null,
                                     JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
@@ -152,15 +162,17 @@ public class CanalKafkaProducer implements CanalMQProducer {
                             }
                         }
                     }
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Send flat message to kafka topic: [{}], packet: {}",
+                            canalDestination.getTopic(),
+                            JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
+                    }
                 }
             }
         }
 
         // producer.commitTransaction();
         callback.commit();
-        if (logger.isDebugEnabled()) {
-            logger.debug("send message to kafka topic: {}", canalDestination.getTopic());
-        }
 
     }