Browse Source

fixed issue #2258 , optimizer rocketmq send perf

agapple 5 years ago
parent
commit
d505b5ab9d

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -55,6 +55,7 @@ public class CanalConstants {
     public static final String CANAL_MQ_CANALBATCHSIZE              = ROOT + "." + "mq.canalBatchSize";
     public static final String CANAL_MQ_CANALGETTIMEOUT             = ROOT + "." + "mq.canalGetTimeout";
     public static final String CANAL_MQ_FLATMESSAGE                 = ROOT + "." + "mq.flatMessage";
+    public static final String CANAL_MQ_PARALLELTHREADSIZE          = ROOT + "." + "mq.parallelThreadSize";
     public static final String CANAL_MQ_COMPRESSION_TYPE            = ROOT + "." + "mq.compressionType";
     public static final String CANAL_MQ_ACKS                        = ROOT + "." + "mq.acks";
     public static final String CANAL_MQ_TRANSACTION                 = ROOT + "." + "mq.transaction";

+ 4 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStarter.java

@@ -204,6 +204,10 @@ public class CanalStarter {
         if (!StringUtils.isEmpty(flatMessage)) {
             mqProperties.setFlatMessage(Boolean.valueOf(flatMessage));
         }
+        String parallelThreadSize = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_PARALLELTHREADSIZE);
+        if (!StringUtils.isEmpty(parallelThreadSize)) {
+            mqProperties.setParallelThreadSize(Integer.valueOf(parallelThreadSize));
+        }
         String compressionType = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_COMPRESSION_TYPE);
         if (!StringUtils.isEmpty(compressionType)) {
             mqProperties.setCompressionType(compressionType);

+ 1 - 0
deployer/src/main/resources/canal.properties

@@ -121,6 +121,7 @@ canal.mq.lingerMs = 100
 canal.mq.bufferMemory = 33554432
 canal.mq.canalBatchSize = 50
 canal.mq.canalGetTimeout = 100
+canal.mq.parallelThreadSize = 8
 canal.mq.flatMessage = true
 canal.mq.compressionType = none
 canal.mq.acks = all

+ 2 - 0
server/src/main/java/com/alibaba/otter/canal/common/AbstractMQProducer.java

@@ -4,6 +4,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
 import com.alibaba.otter.canal.spi.CanalMQProducer;
 
 /**
@@ -22,6 +23,7 @@ public abstract class AbstractMQProducer implements CanalMQProducer {
             0,
             TimeUnit.SECONDS,
             new ArrayBlockingQueue<Runnable>(parallelThreadSize * 2),
+            new NamedThreadFactory("MQParallel"),
             new ThreadPoolExecutor.CallerRunsPolicy());
 
     }

+ 12 - 0
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -136,6 +136,7 @@ public class MQMessageUtils {
         }
         Map<String, Message> messages = new HashMap<>();
         for (CanalEntry.Entry entry : entries) {
+            // 如果有topic路由,则忽略begin/end事件
             if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                 || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                 continue;
@@ -250,6 +251,11 @@ public class MQMessageUtils {
         for (EntryRowData data : datas) {
             CanalEntry.Entry entry = data.entry;
             CanalEntry.RowChange rowChange = data.rowChange;
+            // 如果有分区路由,则忽略begin/end事件
+            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
+                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
+                continue;
+            }
 
             if (rowChange.getIsDdl()) {
                 partitionEntries[0].add(entry);
@@ -335,6 +341,12 @@ public class MQMessageUtils {
         for (EntryRowData entryRowData : datas) {
             CanalEntry.Entry entry = entryRowData.entry;
             CanalEntry.RowChange rowChange = entryRowData.rowChange;
+            // 如果有分区路由,则忽略begin/end事件
+            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
+                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
+                continue;
+            }
+
             // build flatMessage
             CanalEntry.EventType eventType = rowChange.getEventType();
             FlatMessage flatMessage = new FlatMessage(id);

+ 94 - 73
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -1,13 +1,17 @@
 package com.alibaba.otter.canal.rocketmq;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.MessageQueueSelector;
 import org.apache.rocketmq.client.producer.SendResult;
@@ -133,19 +137,9 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
 
                             @Override
                             public void run() {
-                                try {
-                                    if (logger.isDebugEnabled()) {
-                                        logger.debug("flatMessagePart: {}, partition: {}",
-                                            JSON.toJSONString(dataPartition, SerializerFeature.WriteMapNullValue),
-                                            index);
-                                    }
-                                    Message data = new Message(topicName,
-                                        CanalMessageSerializer.serializer(dataPartition,
-                                            mqProperties.isFilterTransactionEntry()));
-                                    sendMessage(data, index);
-                                } catch (Exception e) {
-                                    throw new RuntimeException(e);
-                                }
+                                Message data = new Message(topicName, CanalMessageSerializer.serializer(dataPartition,
+                                    mqProperties.isFilterTransactionEntry()));
+                                sendMessage(data, index);
                             }
                         });
                     }
@@ -156,12 +150,6 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
                 final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
                 Message data = new Message(topicName, CanalMessageSerializer.serializer(message,
                     mqProperties.isFilterTransactionEntry()));
-                if (logger.isDebugEnabled()) {
-                    logger.debug("send message:{} to destination:{}, partition: {}",
-                        message,
-                        destination.getCanalDestination(),
-                        partition);
-                }
                 sendMessage(data, partition);
             }
         } else {
@@ -170,79 +158,112 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
             // 串行分区
             List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());
             if (flatMessages != null) {
-                for (FlatMessage flatMessage : flatMessages) {
-                    ExecutorTemplate template = new ExecutorTemplate(executor);
-                    if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
+                // 初始化分区合并队列
+                if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
+                    List<List<FlatMessage>> partitionFlatMessages = new ArrayList<List<FlatMessage>>();
+                    for (int i = 0; i < destination.getPartitionsNum(); i++) {
+                        partitionFlatMessages.add(new ArrayList<FlatMessage>());
+                    }
+
+                    for (FlatMessage flatMessage : flatMessages) {
                         FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                             destination.getPartitionsNum(),
                             destination.getPartitionHash());
                         int length = partitionFlatMessage.length;
                         for (int i = 0; i < length; i++) {
-                            FlatMessage flatMessagePart = partitionFlatMessage[i];
-                            if (flatMessagePart != null) {
-                                if (logger.isDebugEnabled()) {
-                                    logger.debug("flatMessagePart: {}, partition: {}",
-                                        JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue),
-                                        i);
-                                }
-                                final int index = i;
-                                template.submit(new Runnable() {
-
-                                    @Override
-                                    public void run() {
-                                        try {
-                                            Message data = new Message(topicName, JSON.toJSONBytes(flatMessagePart,
-                                                SerializerFeature.WriteMapNullValue));
-                                            sendMessage(data, index);
-                                        } catch (Exception e) {
-                                            throw new RuntimeException(e);
-                                        }
-                                    }
-                                });
-                            }
+                            partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
                         }
+                    }
 
-                        // 批量等所有分区的结果
-                        template.waitForResult();
-                    } else {
-                        try {
-                            final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("send message: {} to topic: {} fixed partition: {}",
-                                    JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue),
-                                    topicName,
-                                    partition);
-                            }
-                            Message data = new Message(topicName, JSON.toJSONBytes(flatMessage,
-                                SerializerFeature.WriteMapNullValue));
-                            sendMessage(data, partition);
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
+                    ExecutorTemplate template = new ExecutorTemplate(executor);
+                    for (int i = 0; i < partitionFlatMessages.size(); i++) {
+                        final List<FlatMessage> flatMessagePart = partitionFlatMessages.get(i);
+                        if (flatMessagePart != null) {
+                            final int index = i;
+                            template.submit(new Runnable() {
+
+                                @Override
+                                public void run() {
+                                    List<Message> messages = flatMessagePart.stream()
+                                        .map(flatMessage -> new Message(topicName, JSON.toJSONBytes(flatMessage,
+                                            SerializerFeature.WriteMapNullValue)))
+                                        .collect(Collectors.toList());
+                                    // 批量发送
+                                    sendMessage(messages, index);
+                                }
+                            });
                         }
                     }
+
+                    // 批量等所有分区的结果
+                    template.waitForResult();
+                } else {
+                    final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
+                    List<Message> messages = flatMessages.stream()
+                        .map(flatMessage -> new Message(topicName, JSON.toJSONBytes(flatMessage,
+                            SerializerFeature.WriteMapNullValue)))
+                        .collect(Collectors.toList());
+                    // 批量发送
+                    sendMessage(messages, partition);
                 }
             }
         }
+    }
 
-        if (logger.isDebugEnabled()) {
-            logger.debug("send message to rocket topic: {}", destination.getTopic());
+    private void sendMessage(Message message, int partition) {
+        try {
+            SendResult sendResult = this.defaultMQProducer.send(message, new MessageQueueSelector() {
+
+                @Override
+                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                    if (partition > mqs.size()) {
+                        return mqs.get(partition % mqs.size());
+                    } else {
+                        return mqs.get(partition);
+                    }
+                }
+            }, null);
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Send Message Result: {}", sendResult);
+            }
+        } catch (Throwable e) {
+            throw new RuntimeException(e);
         }
     }
 
-    private void sendMessage(Message message, int partition) throws Exception {
-        SendResult sendResult = this.defaultMQProducer.send(message, new MessageQueueSelector() {
+    @SuppressWarnings("deprecation")
+    private void sendMessage(List<Message> messages, int partition) {
+        if (messages.isEmpty()) {
+            return;
+        }
 
-            @Override
-            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                if (partition > mqs.size()) {
-                    return mqs.get(partition % mqs.size());
-                } else {
-                    return mqs.get(partition);
+        // 获取一下messageQueue
+        DefaultMQProducerImpl innerProducer = this.defaultMQProducer.getDefaultMQProducerImpl();
+        TopicPublishInfo topicInfo = innerProducer.getTopicPublishInfoTable().get(messages.get(0).getTopic());
+        if (topicInfo == null) {
+            for (Message message : messages) {
+                sendMessage(message, partition);
+            }
+        } else {
+            // 批量发送
+            List<MessageQueue> queues = topicInfo.getMessageQueueList();
+            int size = queues.size();
+            MessageQueue queue = null;
+            if (partition > queues.size()) {
+                queue = queues.get(partition % size);
+            } else {
+                queue = queues.get(partition);
+            }
+
+            try {
+                SendResult sendResult = this.defaultMQProducer.send(messages, queue);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Send Message Result: {}", sendResult);
                 }
+            } catch (Throwable e) {
+                throw new RuntimeException(e);
             }
-        }, null);
-        if (logger.isDebugEnabled()) {
-            logger.debug("Send Message Result: {}", sendResult);
         }
     }