Sfoglia il codice sorgente

Merge pull request #1639 from wangsaner/master

fixed issue #1636
agapple 6 anni fa
parent
commit
99b34ff632

+ 13 - 14
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.kafka;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -138,7 +139,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
     private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message)
                                                                                                         throws Exception {
         if (!kafkaProperties.getFlatMessage()) {
-            ProducerRecord<String, Message> record = null;
+        	List<ProducerRecord<String, Message>> records = new ArrayList<>();
             if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                 Message[] messages = MQMessageUtils.messagePartition(message,
                     canalDestination.getPartitionsNum(),
@@ -147,25 +148,23 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 for (int i = 0; i < length; i++) {
                     Message messagePartition = messages[i];
                     if (messagePartition != null) {
-                        record = new ProducerRecord<>(topicName, i, null, messagePartition);
+                    	records.add(new ProducerRecord<>(topicName, i, null, messagePartition));
                     }
                 }
             } else {
                 final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
-                record = new ProducerRecord<>(topicName, partition, null, message);
+                records.add(new ProducerRecord<>(topicName, partition, null, message));
             }
 
-            if (record != null) {
-                if (kafkaProperties.getTransaction()) {
-                    producer.send(record).get();
-                } else {
-                    producer.send(record).get();
-                }
-
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Send  message to kafka topic: [{}], packet: {}", topicName, message.toString());
-                }
-            }
+			if (!records.isEmpty()) {
+				for (ProducerRecord<String, Message> record : records) {
+					producer.send(record).get();
+				}
+				
+				if (logger.isDebugEnabled()) {
+					logger.debug("Send  message to kafka topic: [{}], packet: {}", topicName, message.toString());
+				}
+			}
         } else {
             // 发送扁平数据json
             List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);