浏览代码

修改 kafka 异步发送 flat message

mcy 6 年之前
父节点
当前提交
150a13824b
共有 1 个文件被更改,包括 27 次插入38 次删除
  1. 27 38
      server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

+ 27 - 38
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -124,6 +124,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
                 }
 
                 if (record != null) {
+                    // 同步发送原生message
                     producer.send(record).get();
 
                     if (logger.isDebugEnabled()) {
@@ -141,13 +142,17 @@ public class CanalKafkaProducer implements CanalMQProducer {
             List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
             if (flatMessages != null) {
                 for (FlatMessage flatMessage : flatMessages) {
-                    if (canalDestination.getPartition() != null) {
+                    if (StringUtils.isEmpty(canalDestination.getPartitionHash())) {
                         try {
+                            Integer partition = canalDestination.getPartition();
+                            if (partition == null) {
+                                partition = 0;
+                            }
                             ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,
-                                canalDestination.getPartition(),
+                                partition,
                                 null,
                                 JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
-                            producer2.send(record).get();
+                            producer2.send(record);
                         } catch (Exception e) {
                             logger.error(e.getMessage(), e);
                             // producer.abortTransaction();
@@ -155,43 +160,27 @@ public class CanalKafkaProducer implements CanalMQProducer {
                             return;
                         }
                     } else {
-                        if (canalDestination.getPartitionHash() != null
-                            && !canalDestination.getPartitionHash().isEmpty()) {
-                            FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
-                                canalDestination.getPartitionsNum(),
-                                canalDestination.getPartitionHash());
-                            int length = partitionFlatMessage.length;
-                            for (int i = 0; i < length; i++) {
-                                FlatMessage flatMessagePart = partitionFlatMessage[i];
-                                if (flatMessagePart != null) {
-                                    try {
-                                        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
-                                            topicName,
-                                            i,
-                                            null,
-                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
-                                        producer2.send(record).get();
-                                    } catch (Exception e) {
-                                        logger.error(e.getMessage(), e);
-                                        // producer.abortTransaction();
-                                        callback.rollback();
-                                        return;
-                                    }
+                        FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
+                            canalDestination.getPartitionsNum(),
+                            canalDestination.getPartitionHash());
+                        int length = partitionFlatMessage.length;
+                        for (int i = 0; i < length; i++) {
+                            FlatMessage flatMessagePart = partitionFlatMessage[i];
+                            if (flatMessagePart != null) {
+                                try {
+                                    ProducerRecord<String, String> record = new ProducerRecord<String, String>(
+                                        topicName,
+                                        i,
+                                        null,
+                                        JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
+                                    producer2.send(record);
+                                } catch (Exception e) {
+                                    logger.error(e.getMessage(), e);
+                                    // producer.abortTransaction();
+                                    callback.rollback();
+                                    return;
                                 }
                             }
-                        } else {
-                            try {
-                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,
-                                    0,
-                                    null,
-                                    JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
-                                producer2.send(record).get();
-                            } catch (Exception e) {
-                                logger.error(e.getMessage(), e);
-                                // producer.abortTransaction();
-                                callback.rollback();
-                                return;
-                            }
                         }
                     }
                     if (logger.isDebugEnabled()) {