Преглед изворни кода

Merge pull request #1171 from rewerma/master

adapter 分批同步
agapple пре 6 година
родитељ
комит
cc97f3b331
20 измењених фајлова са 605 додато и 630 уклоњено
  1. 1 1
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/OuterAdapter.java
  2. 50 83
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  3. 8 12
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java
  4. 11 21
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterCanalConfig.java
  5. 43 22
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java
  6. 0 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java
  7. 46 37
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java
  8. 0 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java
  9. 0 2
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java
  10. 8 12
      client-adapter/launcher/src/main/resources/application.yml
  11. 9 81
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java
  12. 1 1
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java
  13. 156 288
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java
  14. 12 19
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/BatchExecutor.java
  15. 16 30
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SingleDml.java
  16. 213 0
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java
  17. 1 1
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/DBTest.java
  18. 2 2
      client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/OracleSyncTest.java
  19. 1 1
      client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java
  20. 27 15
      server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

+ 1 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/OuterAdapter.java

@@ -27,7 +27,7 @@ public interface OuterAdapter {
     /**
      * 往适配器中同步数据
      *
-     * @param dml 数据包
+     * @param dmls 数据包
      */
     void sync(List<Dml> dmls);
 

+ 50 - 83
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -1,6 +1,5 @@
 package com.alibaba.otter.canal.client.adapter.support;
 
-import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -13,23 +12,25 @@ import java.util.Map;
  */
 public class CanalClientConfig {
 
-    private String              canalServerHost;    // 单机模式下canal server的 ip:port
+    private String             canalServerHost;       // 单机模式下canal server的 ip:port
 
-    private String              zookeeperHosts;     // 集群模式下的zk地址, 如果配置了单机地址则以单机为准!!
+    private String             zookeeperHosts;        // 集群模式下的zk地址, 如果配置了单机地址则以单机为准!!
 
-    private String              mqServers;          // kafka or rocket mq 地址
+    private String             mqServers;             // kafka or rocket mq 地址
 
-    private Boolean             flatMessage = true; // 是否已flatMessage模式传输, 只适用于mq模式
+    private Boolean            flatMessage   = true;  // 是否已flatMessage模式传输, 只适用于mq模式
 
-    private Integer             batchSize;          // 批大小
+    private Integer            batchSize;             // 批大小
 
-    private Integer             retries;            // 重试次数
+    private Integer            syncBatchSize = 1000;  // 同步分批提交大小
 
-    private Long                timeout;            // 消费超时时间
+    private Integer            retries;               // 重试次数
 
-    private List<MQTopic>       mqTopics;           // mq topic 列表
+    private Long               timeout;               // 消费超时时间
 
-    private List<CanalInstance> canalInstances;     // tcp 模式下 canal 实例列表, 与mq模式不能共存!!
+    private String             mode          = "tcp"; // 模式 tcp kafka rocketMQ
+
+    private List<CanalAdapter> canalAdapters;         // canal adapters 配置
 
     public String getCanalServerHost() {
         return canalServerHost;
@@ -55,14 +56,6 @@ public class CanalClientConfig {
         this.mqServers = mqServers;
     }
 
-    public List<MQTopic> getMqTopics() {
-        return mqTopics;
-    }
-
-    public void setMqTopics(List<MQTopic> mqTopics) {
-        this.mqTopics = mqTopics;
-    }
-
     public Boolean getFlatMessage() {
         return flatMessage;
     }
@@ -83,6 +76,14 @@ public class CanalClientConfig {
         return retries;
     }
 
+    public Integer getSyncBatchSize() {
+        return syncBatchSize;
+    }
+
+    public void setSyncBatchSize(Integer syncBatchSize) {
+        this.syncBatchSize = syncBatchSize;
+    }
+
     public void setRetries(Integer retries) {
         this.retries = retries;
     }
@@ -95,18 +96,28 @@ public class CanalClientConfig {
         this.timeout = timeout;
     }
 
-    public List<CanalInstance> getCanalInstances() {
-        return canalInstances;
+    public String getMode() {
+        return mode;
+    }
+
+    public void setMode(String mode) {
+        this.mode = mode;
+    }
+
+    public List<CanalAdapter> getCanalAdapters() {
+        return canalAdapters;
     }
 
-    public void setCanalInstances(List<CanalInstance> canalInstances) {
-        this.canalInstances = canalInstances;
+    public void setCanalAdapters(List<CanalAdapter> canalAdapters) {
+        this.canalAdapters = canalAdapters;
     }
 
-    public static class CanalInstance {
+    public static class CanalAdapter {
 
         private String      instance; // 实例名
 
+        private String      topic;    // mq topic
+
         private List<Group> groups;   // 适配器分组列表
 
         public String getInstance() {
@@ -127,50 +138,6 @@ public class CanalClientConfig {
             this.groups = groups;
         }
 
-    }
-
-    public static class Group {
-
-        private List<OuterAdapterConfig>        outAdapters;                           // 适配器列表
-
-        private Map<String, OuterAdapterConfig> outAdaptersMap = new LinkedHashMap<>();
-
-        public List<OuterAdapterConfig> getOutAdapters() {
-            return outAdapters;
-        }
-
-        public void setOutAdapters(List<OuterAdapterConfig> outAdapters) {
-            this.outAdapters = outAdapters;
-            if (outAdapters != null) {
-                outAdapters.forEach(outAdapter -> outAdaptersMap.put(outAdapter.getKey(), outAdapter));
-            }
-        }
-
-        public Map<String, OuterAdapterConfig> getOutAdaptersMap() {
-            return outAdaptersMap;
-        }
-
-        public void setOutAdaptersMap(Map<String, OuterAdapterConfig> outAdaptersMap) {
-            this.outAdaptersMap = outAdaptersMap;
-        }
-    }
-
-    public static class MQTopic {
-
-        private String        mqMode;                     // mq模式 kafka or rocketMQ
-
-        private String        topic;                      // topic名
-
-        private List<MQGroup> groups = new ArrayList<>(); // 分组列表
-
-        public String getMqMode() {
-            return mqMode;
-        }
-
-        public void setMqMode(String mqMode) {
-            this.mqMode = mqMode;
-        }
-
         public String getTopic() {
             return topic;
         }
@@ -178,21 +145,15 @@ public class CanalClientConfig {
         public void setTopic(String topic) {
             this.topic = topic;
         }
-
-        public List<MQGroup> getGroups() {
-            return groups;
-        }
-
-        public void setGroups(List<MQGroup> groups) {
-            this.groups = groups;
-        }
     }
 
-    public static class MQGroup {
+    public static class Group {
+
+        private String                          groupId;                               // group id
 
-        private String                   groupId;     // group id
+        private List<OuterAdapterConfig>        outerAdapters;                           // 适配器列表
 
-        private List<OuterAdapterConfig> outAdapters; // 适配器配置列表
+        private Map<String, OuterAdapterConfig> outerAdaptersMap = new LinkedHashMap<>();
 
         public String getGroupId() {
             return groupId;
@@ -202,14 +163,20 @@ public class CanalClientConfig {
             this.groupId = groupId;
         }
 
-        public List<OuterAdapterConfig> getOutAdapters() {
-            return outAdapters;
+        public List<OuterAdapterConfig> getOuterAdapters() {
+            return outerAdapters;
         }
 
-        public void setOutAdapters(List<OuterAdapterConfig> outAdapters) {
-            this.outAdapters = outAdapters;
+        public void setOuterAdapters(List<OuterAdapterConfig> outerAdapters) {
+            this.outerAdapters = outerAdapters;
         }
 
-    }
+        public Map<String, OuterAdapterConfig> getOuterAdaptersMap() {
+            return outerAdaptersMap;
+        }
 
+        public void setOuterAdaptersMap(Map<String, OuterAdapterConfig> outerAdaptersMap) {
+            this.outerAdaptersMap = outerAdaptersMap;
+        }
+    }
 }

+ 8 - 12
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -19,9 +19,9 @@ import com.alibaba.otter.canal.protocol.Message;
  */
 public class MessageUtil {
 
-    public static void parse4Dml(String destination, Message message, Consumer<List<Dml>> consumer) {
+    public static List<Dml> parse4Dml(String destination, Message message) {
         if (message == null) {
-            return;
+            return null;
         }
         List<CanalEntry.Entry> entries = message.getEntries();
         List<Dml> dmls = new ArrayList<Dml>(entries.size());
@@ -89,10 +89,11 @@ public class MessageUtil {
                         Map<String, Object> rowOld = new LinkedHashMap<>();
                         for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                             if (updateSet.contains(column.getName())) {
-                                rowOld.put(column.getName(), JdbcTypeUtil.typeConvert(column.getName(),
-                                    column.getValue(),
-                                    column.getSqlType(),
-                                    column.getMysqlType()));
+                                rowOld.put(column.getName(),
+                                    JdbcTypeUtil.typeConvert(column.getName(),
+                                        column.getValue(),
+                                        column.getSqlType(),
+                                        column.getMysqlType()));
                             }
                         }
                         // update操作将记录修改前的值
@@ -110,7 +111,7 @@ public class MessageUtil {
             }
         }
 
-        consumer.accept(dmls);
+        return dmls;
     }
 
     public static List<Dml> flatMessage2Dml(String destination, List<FlatMessage> flatMessages) {
@@ -177,9 +178,4 @@ public class MessageUtil {
         }
         return result;
     }
-
-    public interface Consumer<T> {
-
-        void accept(T t);
-    }
 }

+ 11 - 21
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterCanalConfig.java

@@ -6,12 +6,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.alibaba.druid.pool.DruidDataSource;
-import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
 
+import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
+import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
 
 /**
  * canal 的相关配置类
@@ -28,28 +28,18 @@ public class AdapterCanalConfig extends CanalClientConfig {
     private Map<String, DatasourceConfig> srcDataSources;
 
     @Override
-    public void setCanalInstances(List<CanalInstance> canalInstances) {
-        super.setCanalInstances(canalInstances);
-
-        if (canalInstances != null) {
-            synchronized (DESTINATIONS) {
-                DESTINATIONS.clear();
-                for (CanalInstance canalInstance : canalInstances) {
-                    DESTINATIONS.add(canalInstance.getInstance());
-                }
-            }
-        }
-    }
-
-    @Override
-    public void setMqTopics(List<MQTopic> mqTopics) {
-        super.setMqTopics(mqTopics);
+    public void setCanalAdapters(List<CanalAdapter> canalAdapters) {
+        super.setCanalAdapters(canalAdapters);
 
-        if (mqTopics != null) {
+        if (canalAdapters != null) {
             synchronized (DESTINATIONS) {
                 DESTINATIONS.clear();
-                for (MQTopic mqTopic : mqTopics) {
-                    DESTINATIONS.add(mqTopic.getTopic());
+                for (CanalAdapter canalAdapter : canalAdapters) {
+                    if (canalAdapter.getInstance() != null) {
+                        DESTINATIONS.add(canalAdapter.getInstance());
+                    } else if (canalAdapter.getTopic() != null) {
+                        DESTINATIONS.add(canalAdapter.getTopic());
+                    }
                 }
             }
         }

+ 43 - 22
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -2,12 +2,9 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,6 +29,7 @@ public abstract class AbstractCanalAdapterWorker {
 
     protected String                          canalDestination;                                                // canal实例
     protected List<List<OuterAdapter>>        canalOuterAdapters;                                              // 外部适配器
+    protected CanalClientConfig               canalClientConfig;                                               // 配置
     protected ExecutorService                 groupInnerExecutorService;                                       // 组内工作线程池
     protected volatile boolean                running = false;                                                 // 是否运行中
     protected Thread                          thread  = null;
@@ -55,11 +53,15 @@ public abstract class AbstractCanalAdapterWorker {
                     // 组内适配器穿行运行,尽量不要配置组内适配器
                     adapters.forEach(adapter -> {
                         long begin = System.currentTimeMillis();
-                        MessageUtil.parse4Dml(canalDestination, message, adapter::sync);
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("{} elapsed time: {}",
-                                adapter.getClass().getName(),
-                                (System.currentTimeMillis() - begin));
+                        List<Dml> dmls = MessageUtil.parse4Dml(canalDestination, message);
+                        if (dmls != null) {
+                            batchSync(dmls, adapter);
+
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("{} elapsed time: {}",
+                                    adapter.getClass().getName(),
+                                    (System.currentTimeMillis() - begin));
+                            }
                         }
                     });
                     return true;
@@ -82,7 +84,7 @@ public abstract class AbstractCanalAdapterWorker {
         });
     }
 
-    protected void writeOut(final List<FlatMessage> flatMessages) {
+    private void writeOut(final List<FlatMessage> flatMessages) {
         List<Future<Boolean>> futures = new ArrayList<>();
         // 组间适配器并行运行
         canalOuterAdapters.forEach(outerAdapters -> {
@@ -92,7 +94,8 @@ public abstract class AbstractCanalAdapterWorker {
                     outerAdapters.forEach(adapter -> {
                         long begin = System.currentTimeMillis();
                         List<Dml> dmls = MessageUtil.flatMessage2Dml(canalDestination, flatMessages);
-                        adapter.sync(dmls);
+                        batchSync(dmls, adapter);
+
                         if (logger.isDebugEnabled()) {
                             logger.debug("{} elapsed time: {}",
                                 adapter.getClass().getName(),
@@ -119,6 +122,7 @@ public abstract class AbstractCanalAdapterWorker {
         });
     }
 
+    @SuppressWarnings("unchecked")
     protected void mqWriteOutData(int retry, long timeout, final boolean flatMessage, CanalMQConnector connector,
                                   ExecutorService workerExecutor) {
         for (int i = 0; i < retry; i++) {
@@ -131,18 +135,13 @@ public abstract class AbstractCanalAdapterWorker {
                 }
                 if (messages != null) {
                     Future<Boolean> future = workerExecutor.submit(() -> {
-                        List<FlatMessage> flatMessages = new ArrayList<FlatMessage>(messages.size());
-                        for (final Object message : messages) {
-                            if (message instanceof FlatMessage) {
-                                flatMessages.add((FlatMessage) message);
-                            } else {
+                        if (flatMessage) {
+                            // batch write
+                            writeOut((List<FlatMessage>) messages);
+                        } else {
+                            for (final Object message : messages) {
                                 writeOut((Message) message);
                             }
-
-                            if (flatMessage) {
-                                // batch write
-                                writeOut(flatMessages);
-                            }
                         }
                         return true;
                     });
@@ -173,6 +172,28 @@ public abstract class AbstractCanalAdapterWorker {
         }
     }
 
+    /**
+     * 分批同步
+     * 
+     * @param dmls    full list of DML events to be synced in batches
+     * @param adapter target outer adapter that receives each batch
+     */
+    private void batchSync(List<Dml> dmls, OuterAdapter adapter) {
+        // 分批同步
+        int len = 0;
+        List<Dml> dmlsBatch = new ArrayList<>();
+        for (Dml dml : dmls) {
+            dmlsBatch.add(dml);
+            len += dml.getData().size();
+            if (len >= canalClientConfig.getSyncBatchSize()) {
+                adapter.sync(dmlsBatch);
+                dmlsBatch.clear();
+                len = 0;
+            }
+        }
+        if (!dmlsBatch.isEmpty()) adapter.sync(dmlsBatch);
+    }
+
     public void start() {
         if (!running) {
             thread = new Thread(this::process);

+ 0 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -19,7 +19,6 @@ import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
  */
 public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
 
-    private CanalClientConfig   canalClientConfig;
     private KafkaCanalConnector connector;
     private String              topic;
     private boolean             flatMessage;

+ 46 - 37
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -6,7 +6,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -55,14 +54,14 @@ public class CanalAdapterLoader {
         }
         String zkHosts = this.canalClientConfig.getZookeeperHosts();
 
-        if (canalClientConfig.getCanalInstances() != null) {
+        if ("tcp".equalsIgnoreCase(canalClientConfig.getMode())) {
             // 初始化canal-client的适配器
-            for (CanalClientConfig.CanalInstance instance : canalClientConfig.getCanalInstances()) {
+            for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
                 List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
 
-                for (CanalClientConfig.Group connectorGroup : instance.getGroups()) {
+                for (CanalClientConfig.Group connectorGroup : canalAdapter.getGroups()) {
                     List<OuterAdapter> canalOutConnectors = new ArrayList<>();
-                    for (OuterAdapterConfig c : connectorGroup.getOutAdapters()) {
+                    for (OuterAdapterConfig c : connectorGroup.getOuterAdapters()) {
                         loadConnector(c, canalOutConnectors);
                     }
                     canalOuterAdapterGroups.add(canalOutConnectors);
@@ -70,52 +69,65 @@ public class CanalAdapterLoader {
                 CanalAdapterWorker worker;
                 if (sa != null) {
                     worker = new CanalAdapterWorker(canalClientConfig,
-                        instance.getInstance(),
+                        canalAdapter.getInstance(),
                         sa,
                         canalOuterAdapterGroups);
                 } else if (zkHosts != null) {
                     worker = new CanalAdapterWorker(canalClientConfig,
-                        instance.getInstance(),
+                        canalAdapter.getInstance(),
                         zkHosts,
                         canalOuterAdapterGroups);
                 } else {
                     throw new RuntimeException("No canal server connector found");
                 }
-                canalWorkers.put(instance.getInstance(), worker);
+                canalWorkers.put(canalAdapter.getInstance(), worker);
                 worker.start();
-                logger.info("Start adapter for canal instance: {} succeed", instance.getInstance());
+                logger.info("Start adapter for canal instance: {} succeed", canalAdapter.getInstance());
             }
-        } else if (canalClientConfig.getMqTopics() != null) {
-            // 初始化canal-client-mq的适配器
-            for (CanalClientConfig.MQTopic topic : canalClientConfig.getMqTopics()) {
-                for (CanalClientConfig.MQGroup group : topic.getGroups()) {
+        } else if ("kafka".equalsIgnoreCase(canalClientConfig.getMode())) {
+            // 初始化canal-client-kafka的适配器
+            for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
+                for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
                     List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
                     List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
-                    for (OuterAdapterConfig config : group.getOutAdapters()) {
+                    for (OuterAdapterConfig config : group.getOuterAdapters()) {
                         loadConnector(config, canalOuterAdapters);
                     }
                     canalOuterAdapterGroups.add(canalOuterAdapters);
-                    if (StringUtils.isBlank(topic.getMqMode()) || "rocketmq".equalsIgnoreCase(topic.getMqMode())) {
-                        CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig,
-                            canalClientConfig.getMqServers(),
-                            topic.getTopic(),
-                            group.getGroupId(),
-                            canalOuterAdapterGroups,
-                            canalClientConfig.getFlatMessage());
-                        canalMQWorker.put(topic.getTopic() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
-                        rocketMQWorker.start();
-                    } else if ("kafka".equalsIgnoreCase(topic.getMqMode())) {
-                        CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(canalClientConfig,
-                            canalClientConfig.getMqServers(),
-                            topic.getTopic(),
-                            group.getGroupId(),
-                            canalOuterAdapterGroups,
-                            canalClientConfig.getFlatMessage());
-                        canalMQWorker.put(topic.getTopic() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
-                        canalKafkaWorker.start();
+
+                    CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(canalClientConfig,
+                        canalClientConfig.getMqServers(),
+                        canalAdapter.getTopic(),
+                        group.getGroupId(),
+                        canalOuterAdapterGroups,
+                        canalClientConfig.getFlatMessage());
+                    canalMQWorker.put(canalAdapter.getTopic() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
+                    canalKafkaWorker.start();
+                    logger.info("Start adapter for canal-client mq topic: {} succeed",
+                        canalAdapter.getTopic() + "-" + group.getGroupId());
+                }
+            }
+        } else if ("rocketMQ".equalsIgnoreCase(canalClientConfig.getMode())) {
+            // 初始化canal-client-rocketMQ的适配器
+            for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
+                for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
+                    List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
+                    List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
+                    for (OuterAdapterConfig config : group.getOuterAdapters()) {
+                        loadConnector(config, canalOuterAdapters);
                     }
+                    canalOuterAdapterGroups.add(canalOuterAdapters);
+                    CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig,
+                        canalClientConfig.getMqServers(),
+                        canalAdapter.getTopic(),
+                        group.getGroupId(),
+                        canalOuterAdapterGroups,
+                        canalClientConfig.getFlatMessage());
+                    canalMQWorker.put(canalAdapter.getTopic() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
+                    rocketMQWorker.start();
+
                     logger.info("Start adapter for canal-client mq topic: {} succeed",
-                        topic.getTopic() + "-" + group.getGroupId());
+                        canalAdapter.getTopic() + "-" + group.getGroupId());
                 }
             }
         }
@@ -124,11 +136,8 @@ public class CanalAdapterLoader {
     private void loadConnector(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) {
         try {
             OuterAdapter adapter;
-            // if ("rdb".equalsIgnoreCase(config.getName())) {
             adapter = loader.getExtension(config.getName(), StringUtils.trimToEmpty(config.getKey()));
-            // } else {
-            // adapter = loader.getExtension(config.getName());
-            // }
+
             ClassLoader cl = Thread.currentThread().getContextClassLoader();
             // 替换ClassLoader
             Thread.currentThread().setContextClassLoader(adapter.getClass().getClassLoader());

+ 0 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -21,7 +21,6 @@ import com.alibaba.otter.canal.protocol.Message;
  */
 public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
 
-    private CanalClientConfig      canalClientConfig;
     private RocketMQCanalConnector connector;
     private String                 topic;
     private boolean                flatMessage;

+ 0 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

@@ -25,8 +25,6 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
 
     private CanalConnector    connector;
 
-    private CanalClientConfig canalClientConfig;
-
     /**
      * 单台client适配器worker的构造方法
      *

+ 8 - 12
client-adapter/launcher/src/main/resources/application.yml

@@ -17,17 +17,22 @@ canal.conf:
 #  zookeeperHosts: slave1:2181
 #  mqServers: slave1:6667 #or rocketmq
 #  flatMessage: true
+  batchSize: 500
+  syncBatchSize: 1000
   retries: 0
   timeout:
+  mode: tcp # kafka rocketMQ
 #  srcDataSources:
 #    defaultDS:
 #      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
 #      username: root
 #      password: 121212
-  canalInstances:
+  canalAdapters:
   - instance: example
+#    topic: example
     groups:
-    - outAdapters:
+    - #groupId: g1
+      outerAdapters:
       - name: logger
 #      - name: rdb
 #        key: oracle1
@@ -36,8 +41,6 @@ canal.conf:
 #          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
 #          jdbc.username: mytest
 #          jdbc.password: m121212
-#          threads: 5
-#          commitSize: 5000
 #      - name: rdb
 #        key: postgres1
 #        properties:
@@ -55,11 +58,4 @@ canal.conf:
 #      - name: es
 #        hosts: 127.0.0.1:9300
 #        properties:
-#          cluster.name: elasticsearch
-#  mqTopics:
-#  - mqMode: kafka # or rocketmq
-#    topic: example
-#    groups:
-#    - groupId: g2
-#      outAdapters:
-#      - name: logger
+#          cluster.name: elasticsearch

+ 9 - 81
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java

@@ -2,16 +2,12 @@ package com.alibaba.otter.canal.client.adapter.rdb;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import javax.sql.DataSource;
 
@@ -20,21 +16,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.druid.pool.DruidDataSource;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.rdb.config.ConfigLoader;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.monitor.RdbConfigMonitor;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
 import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
-import com.alibaba.otter.canal.client.adapter.rdb.support.SimpleDml;
-import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-import com.alibaba.otter.canal.client.adapter.support.EtlResult;
-import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
-import com.alibaba.otter.canal.client.adapter.support.SPI;
-import com.alibaba.otter.canal.client.adapter.support.Util;
+import com.alibaba.otter.canal.client.adapter.support.*;
 
 @SPI("rdb")
 public class RdbAdapter implements OuterAdapter {
@@ -48,13 +36,6 @@ public class RdbAdapter implements OuterAdapter {
 
     private RdbSyncService                          rdbSyncService;
 
-    private int                                     commitSize         = 3000;
-
-    private volatile boolean                        running            = false;
-
-    private List<SimpleDml>                         dmlList            = Collections
-        .synchronizedList(new ArrayList<>());
-    private Lock                                    syncLock           = new ReentrantLock();
     private ExecutorService                         executor           = Executors.newFixedThreadPool(1);
 
     private RdbConfigMonitor                        rdbConfigMonitor;
@@ -109,71 +90,19 @@ public class RdbAdapter implements OuterAdapter {
         }
 
         String threads = properties.get("threads");
-        String commitSize = properties.get("commitSize");
-        if (commitSize != null) {
-            this.commitSize = Integer.valueOf(commitSize);
-        }
-        rdbSyncService = new RdbSyncService(threads != null ? Integer.valueOf(threads) : null, dataSource);
-
-        running = true;
+        // String commitSize = properties.get("commitSize");
 
-        executor.submit(() -> {
-            while (running) {
-                int beginSize = dmlList.size();
-                try {
-                    Thread.sleep(3000);
-                } catch (InterruptedException e) {
-                    // ignore
-                }
-                int endSize = dmlList.size();
-
-                if (endSize - beginSize < 300) {
-                    sync();
-                }
-            }
-        });
+        rdbSyncService = new RdbSyncService(mappingConfigCache,
+            dataSource,
+            threads != null ? Integer.valueOf(threads) : null);
 
         rdbConfigMonitor = new RdbConfigMonitor();
         rdbConfigMonitor.init(configuration.getKey(), this);
     }
 
+    @Override
     public void sync(List<Dml> dmls) {
-        for (Dml dml : dmls) {
-            sync(dml);
-        }
-    }
-
-    public void sync(Dml dml) {
-        String destination = StringUtils.trimToEmpty(dml.getDestination());
-        String database = dml.getDatabase();
-        String table = dml.getTable();
-        Map<String, MappingConfig> configMap = mappingConfigCache.get(destination + "." + database + "." + table);
-
-        if (configMap != null) {
-            configMap.values().forEach(config -> {
-                List<SimpleDml> simpleDmlList = SimpleDml.dml2SimpleDml(dml, config);
-                dmlList.addAll(simpleDmlList);
-
-                if (dmlList.size() >= commitSize) {
-                    sync();
-                }
-            });
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
-        }
-    }
-
-    private void sync() {
-        try {
-            syncLock.lock();
-            if (!dmlList.isEmpty()) {
-                rdbSyncService.sync(dmlList);
-                dmlList.clear();
-            }
-        } finally {
-            syncLock.unlock();
-        }
+        rdbSyncService.sync(dmls);
     }
 
     @Override
@@ -270,17 +199,16 @@ public class RdbAdapter implements OuterAdapter {
 
     @Override
     public void destroy() {
-        running = false;
         if (rdbConfigMonitor != null) {
             rdbConfigMonitor.destroy();
         }
 
-        executor.shutdown();
-
         if (rdbSyncService != null) {
             rdbSyncService.close();
         }
 
+        executor.shutdown();
+
         if (dataSource != null) {
             dataSource.close();
         }

+ 1 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbEtlService.java

@@ -231,7 +231,7 @@ public class RdbEtlService {
 
                                 Object value = rs.getObject(srcColumnName);
                                 if (value != null) {
-                                    RdbSyncService.setPStmt(type, pstmt, value, i);
+                                    SyncUtil.setPStmt(type, pstmt, value, i);
                                 } else {
                                     pstmt.setNull(i, type);
                                 }

+ 156 - 288
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -1,27 +1,31 @@
 package com.alibaba.otter.canal.client.adapter.rdb.service;
 
-import java.io.Reader;
-import java.io.StringReader;
-import java.math.BigDecimal;
-import java.nio.charset.StandardCharsets;
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import javax.sql.DataSource;
 
-import org.joda.time.DateTime;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
-import com.alibaba.otter.canal.client.adapter.rdb.support.SimpleDml;
+import com.alibaba.otter.canal.client.adapter.rdb.support.BatchExecutor;
+import com.alibaba.otter.canal.client.adapter.rdb.support.SingleDml;
 import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
+import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.Util;
 
 /**
@@ -36,87 +40,103 @@ public class RdbSyncService {
 
     private final Map<String, Map<String, Integer>> COLUMNS_TYPE_CACHE = new ConcurrentHashMap<>();
 
-    private BatchExecutor[]                         batchExecutors;
+    private Map<String, Map<String, MappingConfig>> mappingConfigCache;                                                // 库名-表名对应配置
 
-    private int                                     threads            = 1;
+    private int                                     threads            = 3;
 
-    private ExecutorService[]                       threadExecutors;
+    private List<SyncItem>[]                        dmlsPartition;
+    private BatchExecutor[]                         batchExecutors;
+    private ExecutorService[]                       executorThreads;
 
-    public RdbSyncService(Integer threads, DataSource dataSource){
+    @SuppressWarnings("unchecked")
+    public RdbSyncService(Map<String, Map<String, MappingConfig>> mappingConfigCache, DataSource dataSource,
+                          Integer threads){
         try {
-            if (threads != null && threads > 1 && threads <= 10) {
+            if (threads != null) {
                 this.threads = threads;
             }
-            batchExecutors = new BatchExecutor[this.threads];
+            this.mappingConfigCache = mappingConfigCache;
+            this.dmlsPartition = new List[this.threads];
+            this.batchExecutors = new BatchExecutor[this.threads];
+            this.executorThreads = new ExecutorService[this.threads];
             for (int i = 0; i < this.threads; i++) {
-                Connection conn = dataSource.getConnection();
-                conn.setAutoCommit(false);
-                this.batchExecutors[i] = new BatchExecutor(i, conn);
-            }
-            threadExecutors = new ExecutorService[this.threads];
-            for (int i = 0; i < this.threads; i++) {
-                threadExecutors[i] = Executors.newFixedThreadPool(1);
+                dmlsPartition[i] = new ArrayList<>();
+                batchExecutors[i] = new BatchExecutor(dataSource.getConnection());
+                executorThreads[i] = Executors.newSingleThreadExecutor();
             }
         } catch (SQLException e) {
             logger.error(e.getMessage(), e);
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private List<SimpleDml>[] simpleDmls2Partition(List<SimpleDml> simpleDmlList) {
-        List<SimpleDml>[] simpleDmlPartition = new ArrayList[threads];
-        for (int i = 0; i < threads; i++) {
-            simpleDmlPartition[i] = new ArrayList<>();
-        }
-        simpleDmlList.forEach(simpleDml -> {
-            int hash;
-            if (simpleDml.getConfig().getConcurrent()) {
-                hash = pkHash(simpleDml.getConfig().getDbMapping(), simpleDml.getData(), threads);
-            } else {
-                hash = Math.abs(Math.abs(simpleDml.getConfig().getDbMapping().getTargetTable().hashCode()) % threads);
-            }
-            simpleDmlPartition[hash].add(simpleDml);
-        });
-        return simpleDmlPartition;
-    }
-
-    public void sync(List<SimpleDml> simpleDmlList) {
+    public void sync(List<Dml> dmls) {
         try {
-            List<SimpleDml>[] simpleDmlsPartition = simpleDmls2Partition(simpleDmlList);
-
-            List<Future<Boolean>> futures = new ArrayList<>();
-            for (int i = 0; i < threads; i++) {
-                if (!simpleDmlsPartition[i].isEmpty()) {
-                    int j = i;
-                    futures.add(threadExecutors[i].submit(() -> {
-                        simpleDmlsPartition[j].forEach(simpleDml -> sync(simpleDml, batchExecutors[j]));
-                        batchExecutors[j].commit();
-                        return true;
-                    }));
+            for (Dml dml : dmls) {
+                String destination = StringUtils.trimToEmpty(dml.getDestination());
+                String database = dml.getDatabase();
+                String table = dml.getTable();
+                Map<String, MappingConfig> configMap = mappingConfigCache
+                    .get(destination + "." + database + "." + table);
+
+                for (MappingConfig config : configMap.values()) {
+
+                    if (config.getConcurrent()) {
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            int hash = pkHash(config.getDbMapping(), singleDml.getData());
+                            SyncItem syncItem = new SyncItem(config, singleDml);
+                            dmlsPartition[hash].add(syncItem);
+                        });
+                    } else {
+                        int hash = Math.abs(Math.abs(config.getDbMapping().getTargetTable().hashCode()) % threads);
+                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
+                        singleDmls.forEach(singleDml -> {
+                            SyncItem syncItem = new SyncItem(config, singleDml);
+                            dmlsPartition[hash].add(syncItem);
+                        });
+                    }
                 }
             }
+            List<Future> futures = new ArrayList<>();
+            for (int i = 0; i < threads; i++) {
+                int j = i;
+                futures.add(executorThreads[i].submit(() -> {
+                    dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
+                    batchExecutors[j].commit();
+                    return true;
+                }));
+            }
 
             futures.forEach(future -> {
                 try {
                     future.get();
                 } catch (Exception e) {
-                    // ignore
+                    logger.error(e.getMessage(), e);
                 }
             });
+
+            for (int i = 0; i < threads; i++) {
+                dmlsPartition[i].clear();
+            }
         } catch (Exception e) {
-            logger.error("Error rdb sync for batch", e);
+            logger.error(e.getMessage(), e);
         }
     }
 
-    public void sync(SimpleDml dml, BatchExecutor batchExecutor) {
+    private void sync(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
         try {
-            String type = dml.getType();
-            if (type != null && type.equalsIgnoreCase("INSERT")) {
-                insert(dml, batchExecutor);
-            } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
-                update(dml, batchExecutor);
-            } else if (type != null && type.equalsIgnoreCase("DELETE")) {
-                delete(dml, batchExecutor);
+            if (config != null) {
+                String type = dml.getType();
+                if (type != null && type.equalsIgnoreCase("INSERT")) {
+                    insert(batchExecutor, config, dml);
+                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
+                    update(batchExecutor, config, dml);
+                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
+                    delete(batchExecutor, config, dml);
+                }
+                if (logger.isDebugEnabled()) {
+                    logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
+                }
             }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
@@ -126,53 +146,55 @@ public class RdbSyncService {
     /**
      * 插入操作
      *
-     * @param simpleDml DML数据
+     * @param config 配置项
+     * @param dml DML数据
      */
-    private void insert(SimpleDml simpleDml, BatchExecutor batchExecutor) {
-        Map<String, Object> data = simpleDml.getData();
+    private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
+        Map<String, Object> data = dml.getData();
         if (data == null || data.isEmpty()) {
             return;
         }
 
-        DbMapping dbMapping = simpleDml.getConfig().getDbMapping();
-
-        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
+        DbMapping dbMapping = config.getDbMapping();
 
-        StringBuilder insertSql = new StringBuilder();
-        insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
+        try {
+            Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
 
-        columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
-        int len = insertSql.length();
-        insertSql.delete(len - 1, len).append(") VALUES (");
-        int mapLen = columnsMap.size();
-        for (int i = 0; i < mapLen; i++) {
-            insertSql.append("?,");
-        }
-        len = insertSql.length();
-        insertSql.delete(len - 1, len).append(")");
+            StringBuilder insertSql = new StringBuilder();
+            insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" (");
 
-        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), simpleDml.getConfig());
+            columnsMap.forEach((targetColumnName, srcColumnName) -> insertSql.append(targetColumnName).append(","));
+            int len = insertSql.length();
+            insertSql.delete(len - 1, len).append(") VALUES (");
+            int mapLen = columnsMap.size();
+            for (int i = 0; i < mapLen; i++) {
+                insertSql.append("?,");
+            }
+            len = insertSql.length();
+            insertSql.delete(len - 1, len).append(")");
 
-        String sql = insertSql.toString();
+            Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
 
-        try {
             List<Map<String, ?>> values = new ArrayList<>();
-
             for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
-                String targetColumnName = entry.getKey();
+                String targetClolumnName = entry.getKey();
                 String srcColumnName = entry.getValue();
                 if (srcColumnName == null) {
-                    srcColumnName = targetColumnName;
+                    srcColumnName = targetClolumnName;
                 }
 
-                Integer type = ctype.get(targetColumnName.toLowerCase());
-                if (type == null) {
-                    throw new RuntimeException("No column: " + targetColumnName + " found in target db");
-                }
+                Integer type = ctype.get(targetClolumnName.toLowerCase());
+
                 Object value = data.get(srcColumnName);
+
                 BatchExecutor.setValue(values, type, value);
             }
-            batchExecutor.execute(sql, values);
+
+            batchExecutor.execute(insertSql.toString(), values);
+            if (logger.isTraceEnabled()) {
+                logger.trace("Insert into target table, sql: {}", insertSql);
+            }
+
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
         }
@@ -180,30 +202,30 @@ public class RdbSyncService {
 
     /**
      * 更新操作
-     * 
-     * @param simpleDml DML数据
+     *
+     * @param config 配置项
+     * @param dml DML数据
      */
-    private void update(SimpleDml simpleDml, BatchExecutor batchExecutor) {
-        Map<String, Object> data = simpleDml.getData();
+    private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
+        Map<String, Object> data = dml.getData();
         if (data == null || data.isEmpty()) {
             return;
         }
 
-        Map<String, Object> old = simpleDml.getOld();
+        Map<String, Object> old = dml.getOld();
         if (old == null || old.isEmpty()) {
             return;
         }
 
-        DbMapping dbMapping = simpleDml.getConfig().getDbMapping();
+        DbMapping dbMapping = config.getDbMapping();
 
-        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
+        try {
+            Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
 
-        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), simpleDml.getConfig());
+            Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
 
-        try {
             StringBuilder updateSql = new StringBuilder();
             updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET ");
-
             List<Map<String, ?>> values = new ArrayList<>();
             for (String srcColumnName : old.keySet()) {
                 List<String> targetColumnNames = new ArrayList<>();
@@ -213,6 +235,7 @@ public class RdbSyncService {
                     }
                 });
                 if (!targetColumnNames.isEmpty()) {
+
                     for (String targetColumnName : targetColumnNames) {
                         updateSql.append(targetColumnName).append("=?, ");
                         Integer type = ctype.get(targetColumnName.toLowerCase());
@@ -229,7 +252,7 @@ public class RdbSyncService {
             batchExecutor.execute(updateSql.toString(), values);
 
             if (logger.isTraceEnabled()) {
-                logger.trace("Execute sql: {}", updateSql);
+                logger.trace("Update target table, sql: {}", updateSql);
             }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
@@ -238,31 +261,32 @@ public class RdbSyncService {
 
     /**
      * 删除操作
-     * 
-     * @param simpleDml
+     *
+     * @param config
+     * @param dml
      */
-    private void delete(SimpleDml simpleDml, BatchExecutor batchExecutor) {
-        Map<String, Object> data = simpleDml.getData();
+    private void delete(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) {
+        Map<String, Object> data = dml.getData();
         if (data == null || data.isEmpty()) {
             return;
         }
 
-        DbMapping dbMapping = simpleDml.getConfig().getDbMapping();
-
-        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), simpleDml.getConfig());
+        DbMapping dbMapping = config.getDbMapping();
 
         try {
+            Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
+
             StringBuilder sql = new StringBuilder();
             sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE ");
 
             List<Map<String, ?>> values = new ArrayList<>();
-
             // 拼接主键
             appendCondition(dbMapping, sql, ctype, values, data);
 
             batchExecutor.execute(sql.toString(), values);
+
             if (logger.isTraceEnabled()) {
-                logger.trace("Execute sql: {}", sql);
+                logger.trace("Delete from target table, sql: {}", sql);
             }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
@@ -271,7 +295,7 @@ public class RdbSyncService {
 
     /**
      * 获取目标字段类型
-     * 
+     *
      * @param conn sql connection
      * @param config 映射配置
      * @return 字段sqlType
@@ -305,181 +329,16 @@ public class RdbSyncService {
         return columnType;
     }
 
-    /**
-     * 设置 preparedStatement
-     * 
-     * @param type sqlType
-     * @param pstmt 需要设置的preparedStatement
-     * @param value 值
-     * @param i 索引号
-     */
-    public static void setPStmt(int type, PreparedStatement pstmt, Object value, int i) throws SQLException {
-        if (value == null) {
-            pstmt.setNull(i, type);
-            return;
-        }
-        switch (type) {
-            case Types.BIT:
-            case Types.BOOLEAN:
-                if (value instanceof Boolean) {
-                    pstmt.setBoolean(i, (Boolean) value);
-                } else if (value instanceof String) {
-                    boolean v = !value.equals("0");
-                    pstmt.setBoolean(i, v);
-                } else if (value instanceof Number) {
-                    boolean v = ((Number) value).intValue() != 0;
-                    pstmt.setBoolean(i, v);
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.CHAR:
-            case Types.NCHAR:
-            case Types.VARCHAR:
-            case Types.LONGVARCHAR:
-                pstmt.setString(i, value.toString());
-                break;
-            case Types.TINYINT:
-                if (value instanceof Number) {
-                    pstmt.setByte(i, ((Number) value).byteValue());
-                } else if (value instanceof String) {
-                    pstmt.setByte(i, Byte.parseByte((String) value));
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.SMALLINT:
-                if (value instanceof Number) {
-                    pstmt.setShort(i, ((Number) value).shortValue());
-                } else if (value instanceof String) {
-                    pstmt.setShort(i, Short.parseShort((String) value));
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.INTEGER:
-                if (value instanceof Number) {
-                    pstmt.setInt(i, ((Number) value).intValue());
-                } else if (value instanceof String) {
-                    pstmt.setInt(i, Integer.parseInt((String) value));
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.BIGINT:
-                if (value instanceof Number) {
-                    pstmt.setLong(i, ((Number) value).longValue());
-                } else if (value instanceof String) {
-                    pstmt.setLong(i, Long.parseLong((String) value));
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.DECIMAL:
-            case Types.NUMERIC:
-                pstmt.setBigDecimal(i, new BigDecimal(value.toString()));
-                break;
-            case Types.REAL:
-                if (value instanceof Number) {
-                    pstmt.setFloat(i, ((Number) value).floatValue());
-                } else if (value instanceof String) {
-                    pstmt.setFloat(i, Float.parseFloat((String) value));
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.FLOAT:
-            case Types.DOUBLE:
-                if (value instanceof Number) {
-                    pstmt.setDouble(i, ((Number) value).doubleValue());
-                } else if (value instanceof String) {
-                    pstmt.setDouble(i, Double.parseDouble((String) value));
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.BINARY:
-            case Types.VARBINARY:
-            case Types.LONGVARBINARY:
-            case Types.BLOB:
-
-                if (value instanceof byte[]) {
-                    pstmt.setBytes(i, (byte[]) value);
-                } else if (value instanceof String) {
-                    pstmt.setBytes(i, ((String) value).getBytes(StandardCharsets.ISO_8859_1));
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.CLOB:
-                if (value instanceof byte[]) {
-                    pstmt.setBytes(i, (byte[]) value);
-                } else if (value instanceof String) {
-                    Reader clobReader = new StringReader((String) value);
-                    pstmt.setCharacterStream(i, clobReader);
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.DATE:
-                if (value instanceof java.util.Date) {
-                    pstmt.setDate(i, new Date(((java.util.Date) value).getTime()));
-                } else if (value instanceof String) {
-                    String v = (String) value;
-                    if (!v.startsWith("0000-00-00")) {
-                        v = v.trim().replace(" ", "T");
-                        DateTime dt = new DateTime(v);
-                        pstmt.setDate(i, new Date(dt.toDate().getTime()));
-                    } else {
-                        pstmt.setNull(i, type);
-                    }
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.TIME:
-                if (value instanceof java.util.Date) {
-                    pstmt.setTime(i, new Time(((java.util.Date) value).getTime()));
-                } else if (value instanceof String) {
-                    String v = (String) value;
-                    v = "T" + v;
-                    DateTime dt = new DateTime(v);
-                    pstmt.setTime(i, new Time(dt.toDate().getTime()));
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            case Types.TIMESTAMP:
-                if (value instanceof java.util.Date) {
-                    pstmt.setTimestamp(i, new Timestamp(((java.util.Date) value).getTime()));
-                } else if (value instanceof String) {
-                    String v = (String) value;
-                    if (!v.startsWith("0000-00-00")) {
-                        v = v.trim().replace(" ", "T");
-                        DateTime dt = new DateTime(v);
-                        pstmt.setTimestamp(i, new Timestamp(dt.toDate().getTime()));
-                    } else {
-                        pstmt.setNull(i, type);
-                    }
-                } else {
-                    pstmt.setNull(i, type);
-                }
-                break;
-            default:
-                pstmt.setObject(i, value, type);
-        }
-    }
-
     /**
      * 拼接主键 where条件
      */
-    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
-                                        List<Map<String, ?>> values, Map<String, Object> d) {
+    private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
+                                 List<Map<String, ?>> values, Map<String, Object> d) {
         appendCondition(dbMapping, sql, ctype, values, d, null);
     }
 
-    private static void appendCondition(DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
-                                        List<Map<String, ?>> values, Map<String, Object> d, Map<String, Object> o) {
+    private void appendCondition(MappingConfig.DbMapping dbMapping, StringBuilder sql, Map<String, Integer> ctype,
+                                 List<Map<String, ?>> values, Map<String, Object> d, Map<String, Object> o) {
         // 拼接主键
         for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
             String targetColumnName = entry.getKey();
@@ -500,14 +359,25 @@ public class RdbSyncService {
         sql.delete(len - 4, len);
     }
 
+    private class SyncItem {
+
+        private MappingConfig config;
+        private SingleDml     singleDml;
+
+        private SyncItem(MappingConfig config, SingleDml singleDml){
+            this.config = config;
+            this.singleDml = singleDml;
+        }
+    }
+
     /**
      * 取主键hash
      */
-    private static int pkHash(DbMapping dbMapping, Map<String, Object> d, int threads) {
-        return pkHash(dbMapping, d, null, threads);
+    private int pkHash(DbMapping dbMapping, Map<String, Object> d) {
+        return pkHash(dbMapping, d, null);
     }
 
-    private static int pkHash(DbMapping dbMapping, Map<String, Object> d, Map<String, Object> o, int threads) {
+    private int pkHash(DbMapping dbMapping, Map<String, Object> d, Map<String, Object> o) {
         int hash = 0;
         // 取主键
         for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
@@ -531,11 +401,9 @@ public class RdbSyncService {
     }
 
     public void close() {
-        for (BatchExecutor batchExecutor : batchExecutors) {
-            batchExecutor.close();
-        }
-        for (ExecutorService executorService : threadExecutors) {
-            executorService.shutdown();
+        for (int i = 0; i < threads; i++) {
+            batchExecutors[i].close();
+            executorThreads[i].shutdown();
         }
     }
 }

+ 12 - 19
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/BatchExecutor.java → client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/BatchExecutor.java

@@ -1,32 +1,28 @@
-package com.alibaba.otter.canal.client.adapter.rdb.service;
+package com.alibaba.otter.canal.client.adapter.rdb.support;
 
+import java.io.Closeable;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BatchExecutor {
+public class BatchExecutor implements Closeable {
 
-    private static final Logger logger     = LoggerFactory.getLogger(BatchExecutor.class);
+    private static final Logger logger = LoggerFactory.getLogger(BatchExecutor.class);
 
     private Integer             key;
     private Connection          conn;
-    private AtomicInteger       idx        = new AtomicInteger(0);
-    private ExecutorService     executor   = Executors.newFixedThreadPool(1);
-    private Lock                commitLock = new ReentrantLock();
-    private Condition           condition  = commitLock.newCondition();
+    private AtomicInteger       idx    = new AtomicInteger(0);
+
+    public BatchExecutor(Connection conn){
+        this(1, conn);
+    }
 
     public BatchExecutor(Integer key, Connection conn){
         this.key = key;
@@ -60,10 +56,11 @@ public class BatchExecutor {
             for (int i = 0; i < len; i++) {
                 int type = (Integer) values.get(i).get("type");
                 Object value = values.get(i).get("value");
-                RdbSyncService.setPStmt(type, pstmt, value, i + 1);
+                SyncUtil.setPStmt(type, pstmt, value, i + 1);
             }
 
             pstmt.execute();
+            idx.incrementAndGet();
         } catch (SQLException e) {
             logger.error(e.getMessage(), e);
         }
@@ -71,20 +68,17 @@ public class BatchExecutor {
 
     public void commit() {
         try {
-            commitLock.lock();
             conn.commit();
             if (logger.isTraceEnabled()) {
                 logger.trace("Batch executor: " + key + " commit " + idx.get() + " rows");
             }
-            condition.signal();
             idx.set(0);
         } catch (SQLException e) {
             logger.error(e.getMessage(), e);
-        } finally {
-            commitLock.unlock();
         }
     }
 
+    @Override
     public void close() {
         if (conn != null) {
             try {
@@ -93,6 +87,5 @@ public class BatchExecutor {
                 logger.error(e.getMessage(), e);
             }
         }
-        executor.shutdown();
     }
 }

+ 16 - 30
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SimpleDml.java → client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SingleDml.java

@@ -1,13 +1,12 @@
 package com.alibaba.otter.canal.client.adapter.rdb.support;
 
+import com.alibaba.otter.canal.client.adapter.support.Dml;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
-import com.alibaba.otter.canal.client.adapter.support.Dml;
-
-public class SimpleDml {
+public class SingleDml {
 
     private String              destination;
     private String              database;
@@ -16,8 +15,6 @@ public class SimpleDml {
     private Map<String, Object> data;
     private Map<String, Object> old;
 
-    private MappingConfig       config;
-
     public String getDestination() {
         return destination;
     }
@@ -66,32 +63,21 @@ public class SimpleDml {
         this.old = old;
     }
 
-    public MappingConfig getConfig() {
-        return config;
-    }
-
-    public void setConfig(MappingConfig config) {
-        this.config = config;
-    }
-
-    public static List<SimpleDml> dml2SimpleDml(Dml dml, MappingConfig config) {
-        List<SimpleDml> simpleDmlList = new ArrayList<>();
-        int len = dml.getData().size();
-
-        for (int i = 0; i < len; i++) {
-            SimpleDml simpleDml = new SimpleDml();
-            simpleDml.setDestination(dml.getDestination());
-            simpleDml.setDatabase(dml.getDatabase());
-            simpleDml.setTable(dml.getTable());
-            simpleDml.setType(dml.getType());
-            simpleDml.setData(dml.getData().get(i));
+    public static List<SingleDml> dml2SingleDmls(Dml dml) {
+        int size = dml.getData().size();
+        List<SingleDml> singleDmls = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            SingleDml singleDml = new SingleDml();
+            singleDml.setDestination(dml.getDestination());
+            singleDml.setDatabase(dml.getDatabase());
+            singleDml.setTable(dml.getTable());
+            singleDml.setType(dml.getType());
+            singleDml.setData(dml.getData().get(i));
             if (dml.getOld() != null) {
-                simpleDml.setOld(dml.getOld().get(i));
+                singleDml.setOld(dml.getOld().get(i));
             }
-            simpleDml.setConfig(config);
-            simpleDmlList.add(simpleDml);
+            singleDmls.add(singleDml);
         }
-
-        return simpleDmlList;
+        return singleDmls;
     }
 }

+ 213 - 0
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SyncUtil.java

@@ -1,9 +1,16 @@
 package com.alibaba.otter.canal.client.adapter.rdb.support;
 
+import java.io.Reader;
+import java.io.StringReader;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.*;
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.joda.time.DateTime;
+
 import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
 
 public class SyncUtil {
@@ -36,4 +43,210 @@ public class SyncUtil {
         }
         return columnsMap;
     }
+
+    /**
+     * 设置 preparedStatement
+     *
+     * @param type sqlType
+     * @param pstmt 需要设置的preparedStatement
+     * @param value 值
+     * @param i 索引号
+     */
+    public static void setPStmt(int type, PreparedStatement pstmt, Object value, int i) throws SQLException {
+        switch (type) {
+            case Types.BIT:
+            case Types.BOOLEAN:
+                if (value instanceof Boolean) {
+                    pstmt.setBoolean(i, (Boolean) value);
+                } else if (value instanceof String) {
+                    boolean v = !value.equals("0");
+                    pstmt.setBoolean(i, v);
+                } else if (value instanceof Number) {
+                    boolean v = ((Number) value).intValue() != 0;
+                    pstmt.setBoolean(i, v);
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.CHAR:
+            case Types.NCHAR:
+            case Types.VARCHAR:
+            case Types.LONGVARCHAR:
+                if (value instanceof String) {
+                    pstmt.setString(i, (String) value);
+                } else {
+                    pstmt.setString(i, value.toString());
+                }
+                break;
+            case Types.TINYINT:
+                if (value instanceof Byte || value instanceof Short || value instanceof Integer) {
+                    pstmt.setByte(i, (byte) value);
+                } else if (value instanceof Number) {
+                    pstmt.setByte(i, ((Number) value).byteValue());
+                } else if (value instanceof String) {
+                    pstmt.setByte(i, Byte.parseByte((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.SMALLINT:
+                if (value instanceof Byte || value instanceof Short || value instanceof Integer) {
+                    pstmt.setShort(i, (short) value);
+                } else if (value instanceof Number) {
+                    pstmt.setShort(i, ((Number) value).shortValue());
+                } else if (value instanceof String) {
+                    pstmt.setShort(i, Short.parseShort((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.INTEGER:
+                if (value instanceof Byte || value instanceof Short || value instanceof Integer
+                    || value instanceof Long) {
+                    pstmt.setInt(i, (int) value);
+                } else if (value instanceof Number) {
+                    pstmt.setInt(i, ((Number) value).intValue());
+                } else if (value instanceof String) {
+                    pstmt.setInt(i, Integer.parseInt((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.BIGINT:
+                if (value instanceof Byte || value instanceof Short || value instanceof Integer
+                    || value instanceof Long) {
+                    pstmt.setLong(i, (long) value);
+                } else if (value instanceof Number) {
+                    pstmt.setLong(i, ((Number) value).longValue());
+                } else if (value instanceof String) {
+                    pstmt.setLong(i, Long.parseLong((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.DECIMAL:
+            case Types.NUMERIC:
+                if (value instanceof BigDecimal) {
+                    pstmt.setBigDecimal(i, (BigDecimal) value);
+                } else if (value instanceof Byte) {
+                    pstmt.setInt(i, (int) value);
+                } else if (value instanceof Short) {
+                    pstmt.setInt(i, (int) value);
+                } else if (value instanceof Integer) {
+                    pstmt.setInt(i, (int) value);
+                } else if (value instanceof Long) {
+                    pstmt.setLong(i, (long) value);
+                } else if (value instanceof Float) {
+                    pstmt.setBigDecimal(i, new BigDecimal((float) value));
+                } else if (value instanceof Double) {
+                    pstmt.setBigDecimal(i, new BigDecimal((double) value));
+                } else {
+                    pstmt.setBigDecimal(i, new BigDecimal(value.toString()));
+                }
+                break;
+            case Types.REAL:
+                if (value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long
+                    || value instanceof Float || value instanceof Double) {
+                    pstmt.setFloat(i, (float) value);
+                } else if (value instanceof Number) {
+                    pstmt.setFloat(i, ((Number) value).floatValue());
+                } else if (value instanceof String) {
+                    pstmt.setFloat(i, Float.parseFloat((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.FLOAT:
+            case Types.DOUBLE:
+                if (value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long
+                    || value instanceof Float || value instanceof Double) {
+                    pstmt.setDouble(i, (double) value);
+                } else if (value instanceof Number) {
+                    pstmt.setDouble(i, ((Number) value).doubleValue());
+                } else if (value instanceof String) {
+                    pstmt.setDouble(i, Double.parseDouble((String) value));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+            case Types.BLOB:
+                if (value instanceof Blob) {
+                    pstmt.setBlob(i, (Blob) value);
+                } else if (value instanceof byte[]) {
+                    pstmt.setBytes(i, (byte[]) value);
+                } else if (value instanceof String) {
+                    pstmt.setBytes(i, ((String) value).getBytes(StandardCharsets.ISO_8859_1));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.CLOB:
+                if (value instanceof Clob) {
+                    pstmt.setClob(i, (Clob) value);
+                } else if (value instanceof byte[]) {
+                    pstmt.setBytes(i, (byte[]) value);
+                } else if (value instanceof String) {
+                    Reader clobReader = new StringReader((String) value);
+                    pstmt.setCharacterStream(i, clobReader);
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.DATE:
+                if (value instanceof java.sql.Date) {
+                    pstmt.setDate(i, (java.sql.Date) value);
+                } else if (value instanceof java.util.Date) {
+                    pstmt.setDate(i, new java.sql.Date(((java.util.Date) value).getTime()));
+                } else if (value instanceof String) {
+                    String v = (String) value;
+                    if (!v.startsWith("0000-00-00")) {
+                        v = v.trim().replace(" ", "T");
+                        DateTime dt = new DateTime(v);
+                        pstmt.setDate(i, new Date(dt.toDate().getTime()));
+                    } else {
+                        pstmt.setNull(i, type);
+                    }
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.TIME:
+                if (value instanceof java.sql.Time) {
+                    pstmt.setTime(i, (java.sql.Time) value);
+                } else if (value instanceof java.util.Date) {
+                    pstmt.setTime(i, new java.sql.Time(((java.util.Date) value).getTime()));
+                } else if (value instanceof String) {
+                    String v = (String) value;
+                    v = "T" + v;
+                    DateTime dt = new DateTime(v);
+                    pstmt.setTime(i, new Time(dt.toDate().getTime()));
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            case Types.TIMESTAMP:
+                if (value instanceof java.sql.Timestamp) {
+                    pstmt.setTimestamp(i, (java.sql.Timestamp) value);
+                } else if (value instanceof java.util.Date) {
+                    pstmt.setTimestamp(i, new java.sql.Timestamp(((java.util.Date) value).getTime()));
+                } else if (value instanceof String) {
+                    String v = (String) value;
+                    if (!v.startsWith("0000-00-00")) {
+                        v = v.trim().replace(" ", "T");
+                        DateTime dt = new DateTime(v);
+                        pstmt.setTimestamp(i, new Timestamp(dt.toDate().getTime()));
+                    } else {
+                        pstmt.setNull(i, type);
+                    }
+                } else {
+                    pstmt.setNull(i, type);
+                }
+                break;
+            default:
+                pstmt.setObject(i, value, type);
+        }
+    }
 }

+ 1 - 1
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/DBTest.java

@@ -48,7 +48,7 @@ public class DBTest {
             pstmt.setString(2, "test_" + i);
             pstmt.setLong(3, (long) i % 4 + 1);
             pstmt.setDate(4, new java.sql.Date(now.getTime()));
-            pstmt.setString(5, "tttt");
+            pstmt.setString(5, null);
             pstmt.setBytes(6, null);
 
             pstmt.execute();

+ 2 - 2
client-adapter/rdb/src/test/java/com/alibaba/otter/canal/client/adapter/rdb/test/sync/OracleSyncTest.java

@@ -35,7 +35,7 @@ public class OracleSyncTest {
         data.put("test1", "sdfasdfawe中国asfwef");
         dml.setData(dataList);
 
-        rdbAdapter.sync(dml);
+        rdbAdapter.sync(Collections.singletonList(dml));
     }
 
     @Test
@@ -58,7 +58,7 @@ public class OracleSyncTest {
         old.put("name", "Eric");
         dml.setOld(oldList);
 
-        rdbAdapter.sync(dml);
+        rdbAdapter.sync(Collections.singletonList(dml));
     }
 
 }

+ 1 - 1
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -174,7 +174,7 @@ public class KafkaCanalConnector implements CanalMQConnector {
             return Lists.newArrayList();
         }
 
-        ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout)); // 基于配置,最多只能poll到一条数据
+        ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout));
 
         if (!records.isEmpty()) {
             List<Message> messages = new ArrayList<>();

+ 27 - 15
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,11 +1,8 @@
 package com.alibaba.otter.canal.kafka;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
+import java.util.List;
+import java.util.Properties;
+
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -13,8 +10,12 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.Properties;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
 
 /**
  * kafka producer 主操作类
@@ -38,7 +39,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
         Properties properties = new Properties();
         properties.put("bootstrap.servers", kafkaProperties.getServers());
         properties.put("acks", kafkaProperties.getAcks());
-        properties.put("compression.type",kafkaProperties.getCompressionType());
+        properties.put("compression.type", kafkaProperties.getCompressionType());
         properties.put("retries", kafkaProperties.getRetries());
         properties.put("batch.size", kafkaProperties.getBatchSize());
         properties.put("linger.ms", kafkaProperties.getLingerMs());
@@ -89,6 +90,12 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 }
 
                 producer.send(record).get();
+
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Send  message to kafka topic: [{}], packet: {}",
+                        canalDestination.getTopic(),
+                        message.toString());
+                }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
                 // producer.abortTransaction();
@@ -102,7 +109,8 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 for (FlatMessage flatMessage : flatMessages) {
                     if (canalDestination.getPartition() != null) {
                         try {
-                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
+                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
+                                canalDestination.getTopic(),
                                 canalDestination.getPartition(),
                                 null,
                                 JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
@@ -124,7 +132,8 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                 FlatMessage flatMessagePart = partitionFlatMessage[i];
                                 if (flatMessagePart != null) {
                                     try {
-                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
+                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
+                                            canalDestination.getTopic(),
                                             i,
                                             null,
                                             JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
@@ -139,7 +148,8 @@ public class CanalKafkaProducer implements CanalMQProducer {
                             }
                         } else {
                             try {
-                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(canalDestination.getTopic(),
+                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(
+                                    canalDestination.getTopic(),
                                     0,
                                     null,
                                     JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
@@ -152,15 +162,17 @@ public class CanalKafkaProducer implements CanalMQProducer {
                             }
                         }
                     }
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Send flat message to kafka topic: [{}], packet: {}",
+                            canalDestination.getTopic(),
+                            JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
+                    }
                 }
             }
         }
 
         // producer.commitTransaction();
         callback.commit();
-        if (logger.isDebugEnabled()) {
-            logger.debug("send message to kafka topic: {}", canalDestination.getTopic());
-        }
 
     }