Parcourir la source

fixed issue #3267 , fixed rockemq send null message

agapple il y a 4 ans
Parent
commit
64ea4ff222

+ 37 - 12
connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java

@@ -4,9 +4,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import com.alibaba.otter.canal.common.utils.PropertiesUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
@@ -26,6 +28,8 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.common.CanalException;
 import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
+import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
+import com.alibaba.otter.canal.common.utils.PropertiesUtils;
 import com.alibaba.otter.canal.connector.core.producer.AbstractMQProducer;
 import com.alibaba.otter.canal.connector.core.producer.MQDestination;
 import com.alibaba.otter.canal.connector.core.producer.MQMessageUtils;
@@ -46,10 +50,11 @@ import com.alibaba.otter.canal.protocol.FlatMessage;
 @SPI("rocketmq")
 public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQProducer {
 
-    private static final Logger logger               = LoggerFactory.getLogger(CanalRocketMQProducer.class);
+    private static final Logger  logger               = LoggerFactory.getLogger(CanalRocketMQProducer.class);
 
-    private DefaultMQProducer   defaultMQProducer;
-    private static final String CLOUD_ACCESS_CHANNEL = "cloud";
+    private DefaultMQProducer    defaultMQProducer;
+    private static final String  CLOUD_ACCESS_CHANNEL = "cloud";
+    protected ThreadPoolExecutor sendPartitionExecutor;
 
     @Override
     public void init(Properties properties) {
@@ -85,6 +90,15 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
         } catch (MQClientException ex) {
             throw new CanalException("Start RocketMQ producer error", ex);
         }
+
+        int parallelPartitionSendThreadSize = mqProperties.getParallelSendThreadSize();
+        sendPartitionExecutor = new ThreadPoolExecutor(parallelPartitionSendThreadSize,
+            parallelPartitionSendThreadSize,
+            0,
+            TimeUnit.SECONDS,
+            new ArrayBlockingQueue<>(parallelPartitionSendThreadSize * 2),
+            new NamedThreadFactory("MQ-Parallel-Sender-Partition"),
+            new ThreadPoolExecutor.CallerRunsPolicy());
     }
 
     private void loadRocketMQProperties(Properties properties) {
@@ -99,11 +113,13 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
         if (!StringUtils.isEmpty(producerGroup)) {
             rocketMQProperties.setProducerGroup(producerGroup);
         }
-        String enableMessageTrace = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_ENABLE_MESSAGE_TRACE);
+        String enableMessageTrace = PropertiesUtils.getProperty(properties,
+            RocketMQConstants.ROCKETMQ_ENABLE_MESSAGE_TRACE);
         if (!StringUtils.isEmpty(enableMessageTrace)) {
             rocketMQProperties.setEnableMessageTrace(Boolean.parseBoolean(enableMessageTrace));
         }
-        String customizedTraceTopic = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_CUSTOMIZED_TRACE_TOPIC);
+        String customizedTraceTopic = PropertiesUtils.getProperty(properties,
+            RocketMQConstants.ROCKETMQ_CUSTOMIZED_TRACE_TOPIC);
         if (!StringUtils.isEmpty(customizedTraceTopic)) {
             rocketMQProperties.setCustomizedTraceTopic(customizedTraceTopic);
         }
@@ -119,7 +135,8 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
         if (!StringUtils.isEmpty(retry)) {
             rocketMQProperties.setRetryTimesWhenSendFailed(Integer.parseInt(retry));
         }
-        String vipChannelEnabled = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_VIP_CHANNEL_ENABLED);
+        String vipChannelEnabled = PropertiesUtils.getProperty(properties,
+            RocketMQConstants.ROCKETMQ_VIP_CHANNEL_ENABLED);
         if (!StringUtils.isEmpty(vipChannelEnabled)) {
             rocketMQProperties.setVipChannelEnabled(Boolean.parseBoolean(vipChannelEnabled));
         }
@@ -163,7 +180,8 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
 
     public void send(final MQDestination destination, String topicName, com.alibaba.otter.canal.protocol.Message message) {
         // 获取当前topic的分区数
-        Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName, destination.getDynamicTopicPartitionNum());
+        Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
+            destination.getDynamicTopicPartitionNum());
         if (partitionNum == null) {
             partitionNum = destination.getPartitionsNum();
         }
@@ -179,7 +197,7 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
                     mqProperties.isDatabaseHash());
                 int length = messages.length;
 
-                ExecutorTemplate template = new ExecutorTemplate(sendExecutor);
+                ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
                 for (int i = 0; i < length; i++) {
                     com.alibaba.otter.canal.protocol.Message dataPartition = messages[i];
                     if (dataPartition != null) {
@@ -218,14 +236,17 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
                         mqProperties.isDatabaseHash());
                     int length = partitionFlatMessage.length;
                     for (int i = 0; i < length; i++) {
-                        partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
+                        // 增加null判断,issue #3267
+                        if (partitionFlatMessage[i] != null) {
+                            partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
+                        }
                     }
                 }
 
-                ExecutorTemplate template = new ExecutorTemplate(sendExecutor);
+                ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
                 for (int i = 0; i < partitionFlatMessages.size(); i++) {
                     final List<FlatMessage> flatMessagePart = partitionFlatMessages.get(i);
-                    if (flatMessagePart != null) {
+                    if (flatMessagePart != null && flatMessagePart.size() > 0) {
                         final int index = i;
                         template.submit(() -> {
                             List<Message> messages = flatMessagePart.stream()
@@ -316,6 +337,10 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
     public void stop() {
         logger.info("## Stop RocketMQ producer##");
         this.defaultMQProducer.shutdown();
+        if (sendPartitionExecutor != null) {
+            sendPartitionExecutor.shutdownNow();
+        }
+
         super.stop();
     }
 }