浏览代码

fixed issue #2258 , optimizer kafka send perf

agapple 5 年之前
父节点
当前提交
0fb717a97f

+ 100 - 0
common/src/main/java/com/alibaba/otter/canal/common/utils/ExecutorTemplate.java

@@ -0,0 +1,100 @@
+package com.alibaba.otter.canal.common.utils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * 多线程执行器模板代码,otter中好多地方都写多线程,比较多的都是重复的逻辑代码,抽象一下做个模板把
+ *
+ * <pre>
+ * 示例代码:
+ * ExecutorTemplate template = new ExecutorTemplate(executor);
+ * ...
+ * try {
+ *    for ( ....) {
+ *       template.submit(new Runnable() {})
+ *    }
+ * 
+ *    List<?> result = template.waitForResult();
+ *    // do result
+ * } finally {
+ *    template.clear();
+ * }
+ * 
+ * 注意:该模板工程,不支持多业务并发调用,会出现数据混乱
+ * </pre>
+ */
+public class ExecutorTemplate {
+
+    private volatile ThreadPoolExecutor executor = null;
+    private volatile List<Future>       futures  = null;
+
+    public ExecutorTemplate(ThreadPoolExecutor executor){
+        this.futures = Collections.synchronizedList(new ArrayList<Future>());
+        this.executor = executor;
+    }
+
+    public void submit(Runnable task) {
+        Future future = executor.submit(task, null);
+        futures.add(future);
+        check(future);
+    }
+
+    public void submit(Callable<Exception> task) {
+        Future future = executor.submit(task);
+        futures.add(future);
+        check(future);
+    }
+
+    private void check(Future future) {
+        if (future.isDone()) {
+            // 立即判断一次,因为使用了CallerRun可能当场跑出结果,针对有异常时快速响应,而不是等跑完所有的才抛异常
+            try {
+                future.get();
+            } catch (Throwable e) {
+                // 取消完之后立马退出
+                cacelAllFutures();
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public synchronized List<?> waitForResult() {
+        List result = new ArrayList();
+        RuntimeException exception = null;
+
+        for (Future future : futures) {
+            try {
+                result.add(future.get());
+            } catch (Throwable e) {
+                exception = new RuntimeException(e);
+                // 如何一个future出现了异常,就退出
+                break;
+            }
+        }
+
+        if (exception != null) {
+            cacelAllFutures();
+            throw exception;
+        } else {
+            return result;
+        }
+    }
+
+    public void cacelAllFutures() {
+        for (Future future : futures) {
+            if (!future.isDone() && !future.isCancelled()) {
+                future.cancel(true);
+            }
+        }
+    }
+
+    public void clear() {
+        futures.clear();
+    }
+
+}

+ 7 - 4
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStarter.java

@@ -10,8 +10,8 @@ import com.alibaba.otter.canal.admin.netty.CanalAdminWithNetty;
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.deployer.admin.CanalAdminController;
 import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
-import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
 import com.alibaba.otter.canal.rabbitmq.CanalRabbitMQProducer;
+import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
 import com.alibaba.otter.canal.server.CanalMQStarter;
 import com.alibaba.otter.canal.spi.CanalMQProducer;
 
@@ -69,11 +69,15 @@ public class CanalStarter {
             canalMQProducer = new CanalRabbitMQProducer();
         }
 
+        MQProperties mqProperties = null;
         if (canalMQProducer != null) {
+            mqProperties = buildMQProperties(properties);
             // disable netty
             System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
-            // 设置为raw避免ByteString->Entry的二次解析
-            System.setProperty("canal.instance.memory.rawEntry", "false");
+            if (mqProperties.getFlatMessage()) {
+                // 设置为raw避免ByteString->Entry的二次解析
+                System.setProperty("canal.instance.memory.rawEntry", "false");
+            }
         }
 
         logger.info("## start the canal server.");
@@ -99,7 +103,6 @@ public class CanalStarter {
 
         if (canalMQProducer != null) {
             canalMQStarter = new CanalMQStarter(canalMQProducer);
-            MQProperties mqProperties = buildMQProperties(properties);
             String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
             canalMQStarter.start(mqProperties, destinations);
             controller.setCanalMQStarter(canalMQStarter);

+ 35 - 0
server/src/main/java/com/alibaba/otter/canal/common/AbstractMQProducer.java

@@ -0,0 +1,35 @@
+package com.alibaba.otter.canal.common;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.alibaba.otter.canal.spi.CanalMQProducer;
+
+/**
+ * @author agapple 2019年9月29日 上午11:17:11
+ * @since 1.1.5
+ */
+public abstract class AbstractMQProducer implements CanalMQProducer {
+
+    protected ThreadPoolExecutor executor;
+
+    @Override
+    public void init(MQProperties mqProperties) {
+        int parallelThreadSize = mqProperties.getParallelThreadSize();
+        executor = new ThreadPoolExecutor(parallelThreadSize,
+            parallelThreadSize,
+            0,
+            TimeUnit.SECONDS,
+            new ArrayBlockingQueue<Runnable>(parallelThreadSize * 2),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+    }
+
+    @Override
+    public void stop() {
+        executor.shutdownNow();
+        executor = null;
+    }
+
+}

+ 164 - 139
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -1,15 +1,18 @@
 package com.alibaba.otter.canal.common;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.lang.StringUtils;
 
+import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
 import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
@@ -28,6 +31,10 @@ import com.google.protobuf.InvalidProtocolBufferException;
  *
  * @author agapple 2018年12月11日 下午1:28:32
  */
+/**
+ * @author agapple 2019年9月29日 下午12:36:26
+ * @since 5.0.0
+ */
 public class MQMessageUtils {
 
     @SuppressWarnings("deprecation")
@@ -160,6 +167,67 @@ public class MQMessageUtils {
         return messages;
     }
 
+    /**
+     * 多线程构造message的rowChanged对象,比如为partition/flastMessage转化等处理 </br>
+     * 因为protobuf对象的序列化和反序列化是cpu密集型,串行执行会有代价
+     */
+    public static EntryRowData[] buildMessageData(Message message, ThreadPoolExecutor executor) {
+        ExecutorTemplate template = new ExecutorTemplate(executor);
+        if (message.isRaw()) {
+            List<ByteString> rawEntries = message.getRawEntries();
+            final EntryRowData[] datas = new EntryRowData[rawEntries.size()];
+            int i = 0;
+            for (ByteString byteString : rawEntries) {
+                final int index = i;
+                template.submit(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        try {
+                            Entry entry = Entry.parseFrom(byteString);
+                            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+                            datas[index] = new EntryRowData();
+                            datas[index].entry = entry;
+                            datas[index].rowChange = rowChange;
+                        } catch (InvalidProtocolBufferException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                });
+
+                i++;
+            }
+
+            template.waitForResult();
+            return datas;
+        } else {
+            final EntryRowData[] datas = new EntryRowData[message.getEntries().size()];
+            int i = 0;
+            for (Entry entry : message.getEntries()) {
+                final int index = i;
+                template.submit(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        try {
+                            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+                            datas[index] = new EntryRowData();
+                            datas[index].entry = entry;
+                            datas[index].rowChange = rowChange;
+                        } catch (InvalidProtocolBufferException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                });
+
+                i++;
+            }
+
+            template.waitForResult();
+            return datas;
+        }
+    }
+
     /**
      * 将 message 分区
      *
@@ -168,40 +236,20 @@ public class MQMessageUtils {
      * @return 分区message数组
      */
     @SuppressWarnings("unchecked")
-    public static Message[] messagePartition(Message message, Integer partitionsNum, String pkHashConfigs) {
+    public static Message[] messagePartition(EntryRowData[] datas, long id, Integer partitionsNum, String pkHashConfigs) {
         if (partitionsNum == null) {
             partitionsNum = 1;
         }
         Message[] partitionMessages = new Message[partitionsNum];
         List<Entry>[] partitionEntries = new List[partitionsNum];
         for (int i = 0; i < partitionsNum; i++) {
-            partitionEntries[i] = new ArrayList<>();
-        }
-
-        List<CanalEntry.Entry> entries;
-        if (message.isRaw()) {
-            List<ByteString> rawEntries = message.getRawEntries();
-            entries = new ArrayList<>(rawEntries.size());
-            for (ByteString byteString : rawEntries) {
-                Entry entry;
-                try {
-                    entry = Entry.parseFrom(byteString);
-                } catch (InvalidProtocolBufferException e) {
-                    throw new RuntimeException(e);
-                }
-                entries.add(entry);
-            }
-        } else {
-            entries = message.getEntries();
+            // 注意一下并发
+            partitionEntries[i] = Collections.synchronizedList(Lists.newArrayList());
         }
 
-        for (Entry entry : entries) {
-            CanalEntry.RowChange rowChange;
-            try {
-                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
-            } catch (Exception e) {
-                throw new RuntimeException(e.getMessage(), e);
-            }
+        for (EntryRowData data : datas) {
+            CanalEntry.Entry entry = data.entry;
+            CanalEntry.RowChange rowChange = data.rowChange;
 
             if (rowChange.getIsDdl()) {
                 partitionEntries[0].add(entry);
@@ -220,6 +268,10 @@ public class MQMessageUtils {
                         // tableHash not need split entry message
                         partitionEntries[pkHash].add(entry);
                     } else {
+                        // build new entry
+                        Entry.Builder builder = Entry.newBuilder(entry);
+                        RowChange.Builder rowChangeBuilder = RowChange.newBuilder(rowChange);
+
                         for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                             int hashCode = database.hashCode();
                             CanalEntry.EventType eventType = rowChange.getEventType();
@@ -247,9 +299,7 @@ public class MQMessageUtils {
 
                             int pkHash = Math.abs(hashCode) % partitionsNum;
                             pkHash = Math.abs(pkHash);
-                            // build new entry
-                            Entry.Builder builder = Entry.newBuilder(entry);
-                            RowChange.Builder rowChangeBuilder = RowChange.newBuilder(rowChange);
+                            // clear rowDatas
                             rowChangeBuilder.clearRowDatas();
                             rowChangeBuilder.addRowDatas(rowData);
                             builder.clearStoreValue();
@@ -267,7 +317,7 @@ public class MQMessageUtils {
         for (int i = 0; i < partitionsNum; i++) {
             List<Entry> entriesTmp = partitionEntries[i];
             if (!entriesTmp.isEmpty()) {
-                partitionMessages[i] = new Message(message.getId(), entriesTmp);
+                partitionMessages[i] = new Message(id, entriesTmp);
             }
         }
 
@@ -280,131 +330,100 @@ public class MQMessageUtils {
      * @param message 原生message
      * @return FlatMessage列表
      */
-    public static List<FlatMessage> messageConverter(Message message) {
-        try {
-            if (message == null) {
-                return null;
-            }
-
-            List<FlatMessage> flatMessages = new ArrayList<>();
-            List<CanalEntry.Entry> entrys = null;
-            if (message.isRaw()) {
-                List<ByteString> rawEntries = message.getRawEntries();
-                entrys = new ArrayList<CanalEntry.Entry>(rawEntries.size());
-                for (ByteString byteString : rawEntries) {
-                    CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
-                    entrys.add(entry);
-                }
-            } else {
-                entrys = message.getEntries();
-            }
+    public static List<FlatMessage> messageConverter(EntryRowData[] datas, long id) {
+        List<FlatMessage> flatMessages = new ArrayList<>();
+        for (EntryRowData entryRowData : datas) {
+            CanalEntry.Entry entry = entryRowData.entry;
+            CanalEntry.RowChange rowChange = entryRowData.rowChange;
+            // build flatMessage
+            CanalEntry.EventType eventType = rowChange.getEventType();
+            FlatMessage flatMessage = new FlatMessage(id);
+            flatMessages.add(flatMessage);
+            flatMessage.setDatabase(entry.getHeader().getSchemaName());
+            flatMessage.setTable(entry.getHeader().getTableName());
+            flatMessage.setIsDdl(rowChange.getIsDdl());
+            flatMessage.setType(eventType.toString());
+            flatMessage.setEs(entry.getHeader().getExecuteTime());
+            flatMessage.setTs(System.currentTimeMillis());
+            flatMessage.setSql(rowChange.getSql());
+
+            if (!rowChange.getIsDdl()) {
+                Map<String, Integer> sqlType = new LinkedHashMap<>();
+                Map<String, String> mysqlType = new LinkedHashMap<>();
+                List<Map<String, String>> data = new ArrayList<>();
+                List<Map<String, String>> old = new ArrayList<>();
+
+                Set<String> updateSet = new HashSet<>();
+                boolean hasInitPkNames = false;
+                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+                    if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
+                        && eventType != CanalEntry.EventType.DELETE) {
+                        continue;
+                    }
 
-            for (CanalEntry.Entry entry : entrys) {
-                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
-                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
-                    continue;
-                }
+                    Map<String, String> row = new LinkedHashMap<>();
+                    List<CanalEntry.Column> columns;
 
-                CanalEntry.RowChange rowChange;
-                try {
-                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
-                } catch (Exception e) {
-                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
-                                               + entry.toString(), e);
-                }
+                    if (eventType == CanalEntry.EventType.DELETE) {
+                        columns = rowData.getBeforeColumnsList();
+                    } else {
+                        columns = rowData.getAfterColumnsList();
+                    }
 
-                CanalEntry.EventType eventType = rowChange.getEventType();
-
-                FlatMessage flatMessage = new FlatMessage(message.getId());
-                flatMessages.add(flatMessage);
-                flatMessage.setDatabase(entry.getHeader().getSchemaName());
-                flatMessage.setTable(entry.getHeader().getTableName());
-                flatMessage.setIsDdl(rowChange.getIsDdl());
-                flatMessage.setType(eventType.toString());
-                flatMessage.setEs(entry.getHeader().getExecuteTime());
-                flatMessage.setTs(System.currentTimeMillis());
-                flatMessage.setSql(rowChange.getSql());
-
-                if (!rowChange.getIsDdl()) {
-                    Map<String, Integer> sqlType = new LinkedHashMap<>();
-                    Map<String, String> mysqlType = new LinkedHashMap<>();
-                    List<Map<String, String>> data = new ArrayList<>();
-                    List<Map<String, String>> old = new ArrayList<>();
-
-                    Set<String> updateSet = new HashSet<>();
-                    boolean hasInitPkNames = false;
-                    for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
-                        if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
-                            && eventType != CanalEntry.EventType.DELETE) {
-                            continue;
+                    for (CanalEntry.Column column : columns) {
+                        if (!hasInitPkNames && column.getIsKey()) {
+                            flatMessage.addPkName(column.getName());
                         }
-
-                        Map<String, String> row = new LinkedHashMap<>();
-                        List<CanalEntry.Column> columns;
-
-                        if (eventType == CanalEntry.EventType.DELETE) {
-                            columns = rowData.getBeforeColumnsList();
+                        sqlType.put(column.getName(), column.getSqlType());
+                        mysqlType.put(column.getName(), column.getMysqlType());
+                        if (column.getIsNull()) {
+                            row.put(column.getName(), null);
                         } else {
-                            columns = rowData.getAfterColumnsList();
+                            row.put(column.getName(), column.getValue());
                         }
-
-                        for (CanalEntry.Column column : columns) {
-                            if (!hasInitPkNames && column.getIsKey()) {
-                                flatMessage.addPkName(column.getName());
-                            }
-                            sqlType.put(column.getName(), column.getSqlType());
-                            mysqlType.put(column.getName(), column.getMysqlType());
-                            if (column.getIsNull()) {
-                                row.put(column.getName(), null);
-                            } else {
-                                row.put(column.getName(), column.getValue());
-                            }
-                            // 获取update为true的字段
-                            if (column.getUpdated()) {
-                                updateSet.add(column.getName());
-                            }
+                        // 获取update为true的字段
+                        if (column.getUpdated()) {
+                            updateSet.add(column.getName());
                         }
+                    }
 
-                        hasInitPkNames = true;
-                        if (!row.isEmpty()) {
-                            data.add(row);
-                        }
+                    hasInitPkNames = true;
+                    if (!row.isEmpty()) {
+                        data.add(row);
+                    }
 
-                        if (eventType == CanalEntry.EventType.UPDATE) {
-                            Map<String, String> rowOld = new LinkedHashMap<>();
-                            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
-                                if (updateSet.contains(column.getName())) {
-                                    if (column.getIsNull()) {
-                                        rowOld.put(column.getName(), null);
-                                    } else {
-                                        rowOld.put(column.getName(), column.getValue());
-                                    }
+                    if (eventType == CanalEntry.EventType.UPDATE) {
+                        Map<String, String> rowOld = new LinkedHashMap<>();
+                        for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
+                            if (updateSet.contains(column.getName())) {
+                                if (column.getIsNull()) {
+                                    rowOld.put(column.getName(), null);
+                                } else {
+                                    rowOld.put(column.getName(), column.getValue());
                                 }
                             }
-                            // update操作将记录修改前的值
-                            if (!rowOld.isEmpty()) {
-                                old.add(rowOld);
-                            }
+                        }
+                        // update操作将记录修改前的值
+                        if (!rowOld.isEmpty()) {
+                            old.add(rowOld);
                         }
                     }
-                    if (!sqlType.isEmpty()) {
-                        flatMessage.setSqlType(sqlType);
-                    }
-                    if (!mysqlType.isEmpty()) {
-                        flatMessage.setMysqlType(mysqlType);
-                    }
-                    if (!data.isEmpty()) {
-                        flatMessage.setData(data);
-                    }
-                    if (!old.isEmpty()) {
-                        flatMessage.setOld(old);
-                    }
+                }
+                if (!sqlType.isEmpty()) {
+                    flatMessage.setSqlType(sqlType);
+                }
+                if (!mysqlType.isEmpty()) {
+                    flatMessage.setMysqlType(mysqlType);
+                }
+                if (!data.isEmpty()) {
+                    flatMessage.setData(data);
+                }
+                if (!old.isEmpty()) {
+                    flatMessage.setOld(old);
                 }
             }
-            return flatMessages;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
         }
+        return flatMessages;
     }
 
     /**
@@ -616,4 +635,10 @@ public class MQMessageUtils {
         public AviaterRegexFilter schemaRegexFilter;
         public AviaterRegexFilter tableRegexFilter;
     }
+
+    public static class EntryRowData {
+
+        public Entry     entry;
+        public RowChange rowChange;
+    }
 }

+ 38 - 17
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -30,14 +30,24 @@ public class MQProperties {
     private String     accessChannel          = null;
     private String     customizedTraceTopic   = null;
     private String     namespace              = "";
-    private boolean    kerberosEnable         = false;           // kafka集群是否启动Kerberos认证
-    private String     kerberosKrb5FilePath   = "";              // 启动Kerberos认证时配置为krb5.conf文件的路径
-    private String     kerberosJaasFilePath   = "";              // 启动Kerberos认证时配置为jaas.conf文件的路径
-    private String     username               = "";              // rabbitmq 账号
-    private String     password               = "";              // rabbitmq 密码
-    private String     vhost                  = "";              // rabbitmq 密码
-    private long       aliyunUID              = 0;               // aliyun 用户ID rabbitmq 阿里云需要使用
-    private String     exchange               = "";       // rabbitmq 交换机
+    // kafka集群是否启动Kerberos认证
+    private boolean    kerberosEnable         = false;
+    // 启动Kerberos认证时配置为krb5.conf文件的路径
+    private String     kerberosKrb5FilePath   = "";
+    // 启动Kerberos认证时配置为jaas.conf文件的路径
+    private String     kerberosJaasFilePath   = "";
+    // rabbitmq 账号
+    private String     username               = "";
+    // rabbitmq 密码
+    private String     password               = "";
+    // rabbitmq 密码
+    private String     vhost                  = "";
+    // aliyun 用户ID rabbitmq 阿里云需要使用
+    private long       aliyunUID              = 0;
+    // rabbitmq 交换机
+    private String     exchange               = "";
+    // 消息发送的并行度
+    private int        parallelThreadSize     = 8;
 
     public static class CanalDestination {
 
@@ -321,17 +331,28 @@ public class MQProperties {
         this.exchange = exchange;
     }
 
+    public int getParallelThreadSize() {
+        return parallelThreadSize;
+    }
+
+    public void setParallelThreadSize(int parallelThreadSize) {
+        this.parallelThreadSize = parallelThreadSize;
+    }
+
     @Override
     public String toString() {
-        return "MQProperties{" + "servers='" + servers + '\'' + ", retries=" + retries + ", batchSize=" + batchSize
-               + ", lingerMs=" + lingerMs + ", maxRequestSize=" + maxRequestSize + ", bufferMemory=" + bufferMemory
-               + ", filterTransactionEntry=" + filterTransactionEntry + ", producerGroup='" + producerGroup + '\''
+        return "MQProperties [servers=" + servers + ", retries=" + retries + ", batchSize=" + batchSize + ", lingerMs="
+               + lingerMs + ", maxRequestSize=" + maxRequestSize + ", bufferMemory=" + bufferMemory
+               + ", filterTransactionEntry=" + filterTransactionEntry + ", producerGroup=" + producerGroup
                + ", canalBatchSize=" + canalBatchSize + ", canalGetTimeout=" + canalGetTimeout + ", flatMessage="
-               + flatMessage + ", compressionType='" + compressionType + '\'' + ", acks='" + acks + '\''
-               + ", aliyunAccessKey='" + aliyunAccessKey + '\'' + ", aliyunSecretKey='" + aliyunSecretKey + '\''
-               + ", properties=" + properties + ", enableMessageTrace=" + enableMessageTrace + ", accessChannel='"
-               + accessChannel + '\'' + ", customizedTraceTopic='" + customizedTraceTopic + '\'' + ", namespace='"
-               + namespace + '\'' + ", kerberosEnable='" + kerberosEnable + '\'' + ", kerberosKrb5FilePath='"
-               + kerberosKrb5FilePath + '\'' + ", kerberosJaasFilePath='" + kerberosJaasFilePath + '\'' + '}';
+               + flatMessage + ", compressionType=" + compressionType + ", acks=" + acks + ", aliyunAccessKey="
+               + aliyunAccessKey + ", aliyunSecretKey=" + aliyunSecretKey + ", properties=" + properties
+               + ", enableMessageTrace=" + enableMessageTrace + ", accessChannel=" + accessChannel
+               + ", customizedTraceTopic=" + customizedTraceTopic + ", namespace=" + namespace + ", kerberosEnable="
+               + kerberosEnable + ", kerberosKrb5FilePath=" + kerberosKrb5FilePath + ", kerberosJaasFilePath="
+               + kerberosJaasFilePath + ", username=" + username + ", password=" + password + ", vhost=" + vhost
+               + ", aliyunUID=" + aliyunUID + ", exchange=" + exchange + ", parallelThreadSize=" + parallelThreadSize
+               + "]";
     }
+
 }

+ 107 - 90
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -5,6 +5,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -18,8 +19,12 @@ import org.slf4j.LoggerFactory;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.AbstractMQProducer;
+import com.alibaba.otter.canal.common.CanalMessageSerializer;
 import com.alibaba.otter.canal.common.MQMessageUtils;
+import com.alibaba.otter.canal.common.MQMessageUtils.EntryRowData;
 import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.spi.CanalMQProducer;
@@ -30,16 +35,17 @@ import com.alibaba.otter.canal.spi.CanalMQProducer;
  * @author machengyuan 2018-6-11 下午05:30:49
  * @version 1.0.0
  */
-public class CanalKafkaProducer implements CanalMQProducer {
+public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQProducer {
 
-    private static final Logger       logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
+    private static final Logger      logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
 
-    private Producer<String, Message> producer;
-    private Producer<String, String>  producer2;                                                 // 用于扁平message的数据投递
-    private MQProperties              kafkaProperties;
+    private Producer<String, byte[]> producer;
+    private MQProperties             kafkaProperties;
 
     @Override
     public void init(MQProperties kafkaProperties) {
+        super.init(kafkaProperties);
+
         this.kafkaProperties = kafkaProperties;
         Properties properties = new Properties();
         properties.put("bootstrap.servers", kafkaProperties.getServers());
@@ -73,13 +79,8 @@ public class CanalKafkaProducer implements CanalMQProducer {
             }
         }
 
-        if (!kafkaProperties.getFlatMessage()) {
-            properties.put("value.serializer", MessageSerializer.class.getName());
-            producer = new KafkaProducer<String, Message>(properties);
-        } else {
-            properties.put("value.serializer", StringSerializer.class.getName());
-            producer2 = new KafkaProducer<String, String>(properties);
-        }
+        properties.put("value.serializer", KafkaMessageSerializer.class.getName());
+        producer = new KafkaProducer<String, byte[]>(properties);
     }
 
     @Override
@@ -89,133 +90,149 @@ public class CanalKafkaProducer implements CanalMQProducer {
             if (producer != null) {
                 producer.close();
             }
-            if (producer2 != null) {
-                producer2.close();
-            }
         } catch (Throwable e) {
             logger.warn("##something goes wrong when stopping kafka producer:", e);
         } finally {
             logger.info("## kafka producer is down.");
         }
+
+        super.stop();
     }
 
     @Override
     public void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback) {
+        ExecutorTemplate template = new ExecutorTemplate(executor);
+        boolean flat = kafkaProperties.getFlatMessage();
+
         try {
+            List result = null;
             if (!StringUtils.isEmpty(canalDestination.getDynamicTopic())) {
-                // 动态topic
+                // 动态topic路由计算,只是基于schema/table,不涉及proto数据反序列化
                 Map<String, Message> messageMap = MQMessageUtils.messageTopics(message,
                     canalDestination.getTopic(),
                     canalDestination.getDynamicTopic());
 
+                // 针对不同的topic,引入多线程提升效率
                 for (Map.Entry<String, Message> entry : messageMap.entrySet()) {
-                    String topicName = entry.getKey(); //.replace('.', '_');
-                    Message messageSub = entry.getValue();
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("## Send message to kafka topic: " + topicName);
-                    }
-                    send(canalDestination, topicName, messageSub);
+                    final String topicName = entry.getKey().replace('.', '_');
+                    final Message messageSub = entry.getValue();
+                    template.submit(new Callable() {
+
+                        @Override
+                        public List<Future> call() throws Exception {
+                            try {
+                                return send(canalDestination, topicName, messageSub, flat);
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                    });
                 }
+
+                result = template.waitForResult();
             } else {
-                send(canalDestination, canalDestination.getTopic(), message);
+                result = new ArrayList();
+                List<Future> futures = send(canalDestination, canalDestination.getTopic(), message, flat);
+                result.add(futures);
             }
+
+            // 一个批次的所有topic和分区的队列,都采用异步的模式进行多线程批量发送
+            // 最后在集结点进行flush等待,确保所有数据都写出成功
+            // 注意:kafka的异步模式如果要保证顺序性,需要设置max.in.flight.requests.per.connection=1,确保在网络异常重试时有排他性
+            producer.flush();
+            // flush操作也有可能是发送失败,这里需要异步关注一下发送结果,针对有异常的直接出发rollback
+            for (Object obj : result) {
+                List<Future> futures = (List<Future>) obj;
+                for (Future future : futures) {
+                    try {
+                        future.get();
+                    } catch (InterruptedException | ExecutionException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+
             callback.commit();
         } catch (Throwable e) {
             logger.error(e.getMessage(), e);
             callback.rollback();
+        } finally {
+            template.clear();
         }
     }
 
-    private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message)
-                                                                                                        throws Exception {
-        if (!kafkaProperties.getFlatMessage()) {
-            List<ProducerRecord> records = new ArrayList<ProducerRecord>();
+    private List<Future> send(MQProperties.CanalDestination canalDestination, String topicName, Message message,
+                              boolean flat) throws Exception {
+        List<ProducerRecord<String, byte[]>> records = new ArrayList<ProducerRecord<String, byte[]>>();
+        if (!flat) {
             if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
-                Message[] messages = MQMessageUtils.messagePartition(message,
+                // 并发构造
+                EntryRowData[] datas = MQMessageUtils.buildMessageData(message, executor);
+                // 串行分区
+                Message[] messages = MQMessageUtils.messagePartition(datas,
+                    message.getId(),
                     canalDestination.getPartitionsNum(),
                     canalDestination.getPartitionHash());
                 int length = messages.length;
                 for (int i = 0; i < length; i++) {
                     Message messagePartition = messages[i];
                     if (messagePartition != null) {
-                        records.add(new ProducerRecord<String, Message>(topicName, i, null, messagePartition));
+                        records.add(new ProducerRecord<String, byte[]>(topicName,
+                            i,
+                            null,
+                            CanalMessageSerializer.serializer(messagePartition,
+                                kafkaProperties.isFilterTransactionEntry())));
                     }
                 }
             } else {
                 final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
-                records.add(new ProducerRecord<String, Message>(topicName, partition, null, message));
+                records.add(new ProducerRecord<String, byte[]>(topicName,
+                    partition,
+                    null,
+                    CanalMessageSerializer.serializer(message, kafkaProperties.isFilterTransactionEntry())));
             }
-
-            produce(topicName, records, false);
         } else {
             // 发送扁平数据json
-            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
-            List<ProducerRecord> records = new ArrayList<ProducerRecord>();
-            if (flatMessages != null) {
-                for (FlatMessage flatMessage : flatMessages) {
-                    if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
-                        FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
-                            canalDestination.getPartitionsNum(),
-                            canalDestination.getPartitionHash());
-                        int length = partitionFlatMessage.length;
-                        for (int i = 0; i < length; i++) {
-                            FlatMessage flatMessagePart = partitionFlatMessage[i];
-                            if (flatMessagePart != null) {
-                                records.add(new ProducerRecord<String, String>(topicName,
-                                    i,
-                                    null,
-                                    JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue)));
-                            }
+            // 并发构造
+            EntryRowData[] datas = MQMessageUtils.buildMessageData(message, executor);
+            // 串行分区
+            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());
+            for (FlatMessage flatMessage : flatMessages) {
+                if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
+                    FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
+                        canalDestination.getPartitionsNum(),
+                        canalDestination.getPartitionHash());
+                    int length = partitionFlatMessage.length;
+                    for (int i = 0; i < length; i++) {
+                        FlatMessage flatMessagePart = partitionFlatMessage[i];
+                        if (flatMessagePart != null) {
+                            records.add(new ProducerRecord<String, byte[]>(topicName,
+                                i,
+                                null,
+                                JSON.toJSONBytes(flatMessagePart, SerializerFeature.WriteMapNullValue)));
                         }
-                    } else {
-                        final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
-                        records.add(new ProducerRecord<String, String>(topicName,
-                            partition,
-                            null,
-                            JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue)));
                     }
-
-                    // 每条记录需要flush
-                    produce(topicName, records, true);
-                    records.clear();
+                } else {
+                    final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
+                    records.add(new ProducerRecord<String, byte[]>(topicName,
+                        partition,
+                        null,
+                        JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)));
                 }
             }
         }
-    }
-
-    private void produce(String topicName, List<ProducerRecord> records, boolean flatMessage) {
 
-        Producer producerTmp = null;
-        if (flatMessage) {
-            producerTmp = producer2;
-        } else {
-            producerTmp = producer;
-        }
+        return produce(topicName, records, flat);
+    }
 
+    private List<Future> produce(String topicName, List<ProducerRecord<String, byte[]>> records, boolean flatMessage) {
         List<Future> futures = new ArrayList<Future>();
-        try {
-            // 异步发送,因为在partition hash的时候已经按照每个分区合并了消息,走到这一步不需要考虑单个分区内的顺序问题
-            for (ProducerRecord record : records) {
-                futures.add(producerTmp.send(record));
-            }
-        } finally {
-            if (logger.isDebugEnabled()) {
-                for (ProducerRecord record : records) {
-                    logger.debug("Send  message to kafka topic: [{}], packet: {}", topicName, record.toString());
-                }
-            }
-            // 批量刷出
-            producerTmp.flush();
-
-            // flush操作也有可能是发送失败,这里需要异步关注一下发送结果,针对有异常的直接出发rollback
-            for (Future future : futures) {
-                try {
-                    future.get();
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new RuntimeException(e);
-                }
-            }
+        // 异步发送,因为在partition hash的时候已经按照每个分区合并了消息,走到这一步不需要考虑单个分区内的顺序问题
+        for (ProducerRecord record : records) {
+            futures.add(producer.send(record));
         }
+
+        return futures;
     }
 
 }

+ 28 - 0
server/src/main/java/com/alibaba/otter/canal/kafka/KafkaMessageSerializer.java

@@ -0,0 +1,28 @@
+package com.alibaba.otter.canal.kafka;
+
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * Kafka Message类的序列化
+ *
+ * @author machengyuan 2018-6-11 下午05:30:49
+ * @version 1.0.0
+ */
+public class KafkaMessageSerializer implements Serializer<byte[]> {
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+    }
+
+    @Override
+    public byte[] serialize(String topic, byte[] data) {
+        return data;
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}

+ 0 - 39
server/src/main/java/com/alibaba/otter/canal/kafka/MessageSerializer.java

@@ -1,39 +0,0 @@
-package com.alibaba.otter.canal.kafka;
-
-import java.util.Map;
-
-import org.apache.commons.lang.BooleanUtils;
-import org.apache.kafka.common.serialization.Serializer;
-
-import com.alibaba.otter.canal.common.CanalMessageSerializer;
-import com.alibaba.otter.canal.protocol.Message;
-
-/**
- * Kafka Message类的序列化
- *
- * @author machengyuan 2018-6-11 下午05:30:49
- * @version 1.0.0
- */
-public class MessageSerializer implements Serializer<Message> {
-
-    private boolean filterTransactionEntry = false;
-
-    public MessageSerializer(){
-        this.filterTransactionEntry = BooleanUtils.toBoolean(System.getProperty("canal.instance.filter.transaction.entry",
-            "false"));
-    }
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-    }
-
-    @Override
-    public byte[] serialize(String topic, Message data) {
-        return CanalMessageSerializer.serializer(data, filterTransactionEntry);
-    }
-
-    @Override
-    public void close() {
-        // nothing to do
-    }
-}

+ 27 - 18
server/src/main/java/com/alibaba/otter/canal/rabbitmq/CanalRabbitMQProducer.java

@@ -1,9 +1,20 @@
 package com.alibaba.otter.canal.rabbitmq;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.AbstractMQProducer;
 import com.alibaba.otter.canal.common.CanalMessageSerializer;
 import com.alibaba.otter.canal.common.MQMessageUtils;
+import com.alibaba.otter.canal.common.MQMessageUtils.EntryRowData;
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
@@ -13,16 +24,7 @@ import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeoutException;
-
-public class CanalRabbitMQProducer implements CanalMQProducer {
+public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQProducer {
 
     private static final Logger logger = LoggerFactory.getLogger(CanalRabbitMQProducer.class);
     private MQProperties        mqProperties;
@@ -31,6 +33,7 @@ public class CanalRabbitMQProducer implements CanalMQProducer {
 
     @Override
     public void init(MQProperties mqProperties) {
+        super.init(mqProperties);
         this.mqProperties = mqProperties;
         ConnectionFactory factory = new ConnectionFactory();
         factory.setHost(mqProperties.getServers());
@@ -54,13 +57,14 @@ public class CanalRabbitMQProducer implements CanalMQProducer {
     }
 
     @Override
-    public void send(MQProperties.CanalDestination canalDestination, Message message,
-                     Callback callback) throws IOException {
+    public void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback)
+                                                                                                        throws IOException {
         try {
             if (!StringUtils.isEmpty(canalDestination.getDynamicTopic())) {
                 // 动态topic
-                Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils
-                    .messageTopics(message, canalDestination.getTopic(), canalDestination.getDynamicTopic());
+                Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils.messageTopics(message,
+                    canalDestination.getTopic(),
+                    canalDestination.getDynamicTopic());
 
                 for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
                     String topicName = entry.getKey().replace('.', '_');
@@ -76,8 +80,8 @@ public class CanalRabbitMQProducer implements CanalMQProducer {
         }
     }
 
-    private void send(MQProperties.CanalDestination canalDestination, String topicName,
-                      Message messageSub) throws Exception {
+    private void send(MQProperties.CanalDestination canalDestination, String topicName, Message messageSub)
+                                                                                                           throws Exception {
         if (!mqProperties.getFlatMessage()) {
             byte[] message = CanalMessageSerializer.serializer(messageSub, mqProperties.isFilterTransactionEntry());
             if (logger.isDebugEnabled()) {
@@ -85,10 +89,13 @@ public class CanalRabbitMQProducer implements CanalMQProducer {
             }
             sendMessage(topicName, message);
         } else {
-            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(messageSub);
+            // 并发构造
+            EntryRowData[] datas = MQMessageUtils.buildMessageData(messageSub, executor);
+            // 串行分区
+            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, messageSub.getId());
             if (flatMessages != null) {
                 for (FlatMessage flatMessage : flatMessages) {
-                    byte[] message = JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue).getBytes();
+                    byte[] message = JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue);
                     if (logger.isDebugEnabled()) {
                         logger.debug("send message:{} to destination:{}",
                             message,
@@ -115,5 +122,7 @@ public class CanalRabbitMQProducer implements CanalMQProducer {
         } catch (IOException | TimeoutException ex) {
             throw new CanalServerException("Stop RabbitMQ producer error", ex);
         }
+
+        super.stop();
     }
 }

+ 96 - 57
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -7,7 +7,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.AccessChannel;
-import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.MessageQueueSelector;
@@ -15,20 +14,22 @@ import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.AbstractMQProducer;
 import com.alibaba.otter.canal.common.CanalMessageSerializer;
 import com.alibaba.otter.canal.common.MQMessageUtils;
+import com.alibaba.otter.canal.common.MQMessageUtils.EntryRowData;
 import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.server.exception.CanalServerException;
 import com.alibaba.otter.canal.spi.CanalMQProducer;
 
-public class CanalRocketMQProducer implements CanalMQProducer {
+public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQProducer {
 
     private static final Logger logger               = LoggerFactory.getLogger(CanalRocketMQProducer.class);
     private DefaultMQProducer   defaultMQProducer;
@@ -37,6 +38,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
 
     @Override
     public void init(MQProperties rocketMQProperties) {
+        super.init(rocketMQProperties);
         this.mqProperties = rocketMQProperties;
         RPCHook rpcHook = null;
         if (rocketMQProperties.getAliyunAccessKey().length() > 0
@@ -71,6 +73,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
     @Override
     public void send(final MQProperties.CanalDestination destination, com.alibaba.otter.canal.protocol.Message data,
                      Callback callback) {
+        ExecutorTemplate template = new ExecutorTemplate(executor);
         try {
             if (!StringUtils.isEmpty(destination.getDynamicTopic())) {
                 // 动态topic
@@ -81,66 +84,94 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                 for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
                     String topicName = entry.getKey().replace('.', '_');
                     com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue();
-                    send(destination, topicName, messageSub);
+                    template.submit(new Runnable() {
+
+                        @Override
+                        public void run() {
+                            try {
+                                send(destination, topicName, messageSub);
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                    });
                 }
+
+                template.waitForResult();
             } else {
                 send(destination, destination.getTopic(), data);
             }
+
             callback.commit();
         } catch (Throwable e) {
+            logger.error(e.getMessage(), e);
             callback.rollback();
+        } finally {
+            template.clear();
         }
     }
 
     public void send(final MQProperties.CanalDestination destination, String topicName,
-                     com.alibaba.otter.canal.protocol.Message data) throws Exception {
+                     com.alibaba.otter.canal.protocol.Message message) throws Exception {
         if (!mqProperties.getFlatMessage()) {
-            try {
-                if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
-                    com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(data,
-                        destination.getPartitionsNum(),
-                        destination.getPartitionHash());
-                    int length = messages.length;
-                    for (int i = 0; i < length; i++) {
-                        com.alibaba.otter.canal.protocol.Message dataPartition = messages[i];
-                        if (dataPartition != null) {
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("flatMessagePart: {}, partition: {}",
-                                    JSON.toJSONString(dataPartition, SerializerFeature.WriteMapNullValue),
-                                    i);
-                            }
-                            final int index = i;
-                            try {
-                                Message message = new Message(topicName,
-                                    CanalMessageSerializer.serializer(dataPartition,
-                                        mqProperties.isFilterTransactionEntry()));
-                                sendMessage(message, index);
-                            } catch (Exception e) {
-                                logger.error("send flat message to hashed partition error", e);
-                                throw e;
+            if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
+                // 并发构造
+                EntryRowData[] datas = MQMessageUtils.buildMessageData(message, executor);
+                // 串行分区
+                com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(datas,
+                    message.getId(),
+                    destination.getPartitionsNum(),
+                    destination.getPartitionHash());
+                int length = messages.length;
+
+                ExecutorTemplate template = new ExecutorTemplate(executor);
+                for (int i = 0; i < length; i++) {
+                    com.alibaba.otter.canal.protocol.Message dataPartition = messages[i];
+                    if (dataPartition != null) {
+                        final int index = i;
+                        template.submit(new Runnable() {
+
+                            @Override
+                            public void run() {
+                                try {
+                                    if (logger.isDebugEnabled()) {
+                                        logger.debug("flatMessagePart: {}, partition: {}",
+                                            JSON.toJSONString(dataPartition, SerializerFeature.WriteMapNullValue),
+                                            index);
+                                    }
+                                    Message data = new Message(topicName,
+                                        CanalMessageSerializer.serializer(dataPartition,
+                                            mqProperties.isFilterTransactionEntry()));
+                                    sendMessage(data, index);
+                                } catch (Exception e) {
+                                    throw new RuntimeException(e);
+                                }
                             }
-                        }
+                        });
                     }
-                } else {
-                    final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
-                    Message message = new Message(topicName, CanalMessageSerializer.serializer(data,
-                        mqProperties.isFilterTransactionEntry()));
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("send message:{} to destination:{}, partition: {}",
-                            message,
-                            destination.getCanalDestination(),
-                            partition);
-                    }
-                    sendMessage(message, partition);
                 }
-            } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
-                logger.error("Send message error!", e);
-                throw e;
+                // 等所有分片发送完毕
+                template.waitForResult();
+            } else {
+                final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
+                Message data = new Message(topicName, CanalMessageSerializer.serializer(message,
+                    mqProperties.isFilterTransactionEntry()));
+                if (logger.isDebugEnabled()) {
+                    logger.debug("send message:{} to destination:{}, partition: {}",
+                        message,
+                        destination.getCanalDestination(),
+                        partition);
+                }
+                sendMessage(data, partition);
             }
         } else {
-            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(data);
+            // 并发构造
+            EntryRowData[] datas = MQMessageUtils.buildMessageData(message, executor);
+            // 串行分区
+            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());
             if (flatMessages != null) {
                 for (FlatMessage flatMessage : flatMessages) {
+                    ExecutorTemplate template = new ExecutorTemplate(executor);
                     if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
                         FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                             destination.getPartitionsNum(),
@@ -155,16 +186,24 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                         i);
                                 }
                                 final int index = i;
-                                try {
-                                    Message message = new Message(topicName, JSON.toJSONString(flatMessagePart,
-                                        SerializerFeature.WriteMapNullValue).getBytes());
-                                    sendMessage(message, index);
-                                } catch (Exception e) {
-                                    logger.error("send flat message to hashed partition error", e);
-                                    throw e;
-                                }
+                                template.submit(new Runnable() {
+
+                                    @Override
+                                    public void run() {
+                                        try {
+                                            Message data = new Message(topicName, JSON.toJSONBytes(flatMessagePart,
+                                                SerializerFeature.WriteMapNullValue));
+                                            sendMessage(data, index);
+                                        } catch (Exception e) {
+                                            throw new RuntimeException(e);
+                                        }
+                                    }
+                                });
                             }
                         }
+
+                        // 批量等所有分区的结果
+                        template.waitForResult();
                     } else {
                         try {
                             final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
@@ -174,12 +213,11 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                     topicName,
                                     partition);
                             }
-                            Message message = new Message(topicName, JSON.toJSONString(flatMessage,
-                                SerializerFeature.WriteMapNullValue).getBytes());
-                            sendMessage(message, partition);
+                            Message data = new Message(topicName, JSON.toJSONBytes(flatMessage,
+                                SerializerFeature.WriteMapNullValue));
+                            sendMessage(data, partition);
                         } catch (Exception e) {
-                            logger.error("send flat message to fixed partition error", e);
-                            throw e;
+                            throw new RuntimeException(e);
                         }
                     }
                 }
@@ -212,5 +250,6 @@ public class CanalRocketMQProducer implements CanalMQProducer {
     public void stop() {
         logger.info("## Stop RocketMQ producer##");
         this.defaultMQProducer.shutdown();
+        super.stop();
     }
 }