瀏覽代碼

fixed mq partition num priority

agapple 6 年之前
父節點
當前提交
024ffc2aa2

+ 22 - 29
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -32,9 +32,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
     private static final Logger       logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
 
     private Producer<String, Message> producer;
-
     private Producer<String, String>  producer2;                                                 // 用于扁平message的数据投递
-
     private MQProperties              kafkaProperties;
 
     @Override
@@ -109,8 +107,9 @@ public class CanalKafkaProducer implements CanalMQProducer {
         try {
             if (!StringUtils.isEmpty(canalDestination.getDynamicTopic())) {
                 // 动态topic
-                Map<String, Message> messageMap = MQMessageUtils
-                    .messageTopics(message, canalDestination.getTopic(), canalDestination.getDynamicTopic());
+                Map<String, Message> messageMap = MQMessageUtils.messageTopics(message,
+                    canalDestination.getTopic(),
+                    canalDestination.getDynamicTopic());
 
                 for (Map.Entry<String, Message> entry : messageMap.entrySet()) {
                     String topicName = entry.getKey().replace('.', '_');
@@ -136,27 +135,24 @@ public class CanalKafkaProducer implements CanalMQProducer {
         }
     }
 
-    private void send(MQProperties.CanalDestination canalDestination, String topicName,
-                      Message message) throws Exception {
+    private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message)
+                                                                                                        throws Exception {
         if (!kafkaProperties.getFlatMessage()) {
             ProducerRecord<String, Message> record = null;
-            if (canalDestination.getPartition() != null) {
-                record = new ProducerRecord<>(topicName, canalDestination.getPartition(), null, message);
-            } else {
-                if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
-                    Message[] messages = MQMessageUtils.messagePartition(message,
-                        canalDestination.getPartitionsNum(),
-                        canalDestination.getPartitionHash());
-                    int length = messages.length;
-                    for (int i = 0; i < length; i++) {
-                        Message messagePartition = messages[i];
-                        if (messagePartition != null) {
-                            record = new ProducerRecord<>(topicName, i, null, messagePartition);
-                        }
+            if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
+                Message[] messages = MQMessageUtils.messagePartition(message,
+                    canalDestination.getPartitionsNum(),
+                    canalDestination.getPartitionHash());
+                int length = messages.length;
+                for (int i = 0; i < length; i++) {
+                    Message messagePartition = messages[i];
+                    if (messagePartition != null) {
+                        record = new ProducerRecord<>(topicName, i, null, messagePartition);
                     }
-                } else {
-                    record = new ProducerRecord<>(topicName, 0, null, message);
                 }
+            } else {
+                final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
+                record = new ProducerRecord<>(topicName, partition, null, message);
             }
 
             if (record != null) {
@@ -175,13 +171,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
             List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
             if (flatMessages != null) {
                 for (FlatMessage flatMessage : flatMessages) {
-                    if (StringUtils.isEmpty(canalDestination.getPartitionHash())) {
-                        Integer partition = canalDestination.getPartition();
-                        if (partition == null) {
-                            partition = 0;
-                        }
-                        produce(topicName, partition, flatMessage);
-                    } else {
+                    if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                         FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                             canalDestination.getPartitionsNum(),
                             canalDestination.getPartitionHash());
@@ -192,6 +182,9 @@ public class CanalKafkaProducer implements CanalMQProducer {
                                 produce(topicName, i, flatMessagePart);
                             }
                         }
+                    } else {
+                        final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
+                        produce(topicName, partition, flatMessage);
                     }
 
                     if (logger.isDebugEnabled()) {
@@ -205,7 +198,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
     }
 
     private void produce(String topicName, int partition, FlatMessage flatMessage) throws ExecutionException,
-                                                                                   InterruptedException {
+                                                                                  InterruptedException {
         ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,
             partition,
             null,

+ 68 - 77
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -29,9 +29,7 @@ import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
 public class CanalRocketMQProducer implements CanalMQProducer {
 
     private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);
-
     private DefaultMQProducer   defaultMQProducer;
-
     private MQProperties        mqProperties;
 
     @Override
@@ -64,8 +62,9 @@ public class CanalRocketMQProducer implements CanalMQProducer {
         try {
             if (!StringUtils.isEmpty(destination.getDynamicTopic())) {
                 // 动态topic
-                Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils
-                    .messageTopics(data, destination.getTopic(), destination.getDynamicTopic());
+                Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils.messageTopics(data,
+                    destination.getTopic(),
+                    destination.getDynamicTopic());
 
                 for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
                     String topicName = entry.getKey().replace('.', '_');
@@ -85,44 +84,85 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                      com.alibaba.otter.canal.protocol.Message data) throws Exception {
         if (!mqProperties.getFlatMessage()) {
             try {
-                if (destination.getPartition() != null) {
-                    Message message = new Message(topicName,
-                        CanalMessageSerializer.serializer(data, mqProperties.isFilterTransactionEntry()));
+                if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
+                    com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(data,
+                        destination.getPartitionsNum(),
+                        destination.getPartitionHash());
+                    int length = messages.length;
+                    for (int i = 0; i < length; i++) {
+                        com.alibaba.otter.canal.protocol.Message dataPartition = messages[i];
+                        if (dataPartition != null) {
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("flatMessagePart: {}, partition: {}",
+                                    JSON.toJSONString(dataPartition, SerializerFeature.WriteMapNullValue),
+                                    i);
+                            }
+                            final int index = i;
+                            try {
+                                Message message = new Message(topicName,
+                                    CanalMessageSerializer.serializer(dataPartition,
+                                        mqProperties.isFilterTransactionEntry()));
+                                this.defaultMQProducer.send(message, new MessageQueueSelector() {
+
+                                    @Override
+                                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                                        if (index > mqs.size()) {
+                                            throw new CanalServerException("partition number is error,config num:"
+                                                                           + destination.getPartitionsNum()
+                                                                           + ", mq num: " + mqs.size());
+                                        }
+                                        return mqs.get(index);
+                                    }
+                                }, null);
+                            } catch (Exception e) {
+                                logger.error("send flat message to hashed partition error", e);
+                                throw e;
+                            }
+                        }
+                    }
+                } else {
+                    final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
+                    Message message = new Message(topicName, CanalMessageSerializer.serializer(data,
+                        mqProperties.isFilterTransactionEntry()));
                     if (logger.isDebugEnabled()) {
                         logger.debug("send message:{} to destination:{}, partition: {}",
                             message,
                             destination.getCanalDestination(),
-                            destination.getPartition());
+                            partition);
                     }
                     this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                         @Override
                         public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                            int partition = 0;
-                            if (destination.getPartition() != null) {
-                                partition = destination.getPartition();
-                            }
                             return mqs.get(partition);
                         }
                     }, null);
-                } else {
+                }
+            } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
+                logger.error("Send message error!", e);
+                throw e;
+            }
+        } else {
+            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(data);
+            if (flatMessages != null) {
+                for (FlatMessage flatMessage : flatMessages) {
                     if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
-                        com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils
-                            .messagePartition(data, destination.getPartitionsNum(), destination.getPartitionHash());
-                        int length = messages.length;
+                        FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
+                            destination.getPartitionsNum(),
+                            destination.getPartitionHash());
+                        int length = partitionFlatMessage.length;
                         for (int i = 0; i < length; i++) {
-                            com.alibaba.otter.canal.protocol.Message dataPartition = messages[i];
-                            if (dataPartition != null) {
+                            FlatMessage flatMessagePart = partitionFlatMessage[i];
+                            if (flatMessagePart != null) {
                                 if (logger.isDebugEnabled()) {
                                     logger.debug("flatMessagePart: {}, partition: {}",
-                                        JSON.toJSONString(dataPartition, SerializerFeature.WriteMapNullValue),
+                                        JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue),
                                         i);
                                 }
                                 final int index = i;
                                 try {
-                                    Message message = new Message(topicName,
-                                        CanalMessageSerializer.serializer(dataPartition,
-                                            mqProperties.isFilterTransactionEntry()));
+                                    Message message = new Message(topicName, JSON.toJSONString(flatMessagePart,
+                                        SerializerFeature.WriteMapNullValue).getBytes());
                                     this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                         @Override
@@ -141,77 +181,28 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                 }
                             }
                         }
-                    }
-                }
-            } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
-                logger.error("Send message error!", e);
-                throw e;
-            }
-        } else {
-            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(data);
-            if (flatMessages != null) {
-                for (FlatMessage flatMessage : flatMessages) {
-                    if (destination.getPartition() != null) {
+                    } else {
                         try {
+                            final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
                             if (logger.isDebugEnabled()) {
                                 logger.debug("send message: {} to topic: {} fixed partition: {}",
                                     JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue),
                                     topicName,
-                                    destination.getPartition());
+                                    partition);
                             }
-                            Message message = new Message(topicName,
-                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue).getBytes());
+                            Message message = new Message(topicName, JSON.toJSONString(flatMessage,
+                                SerializerFeature.WriteMapNullValue).getBytes());
                             this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                 @Override
                                 public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                                    return mqs.get(destination.getPartition());
+                                    return mqs.get(partition);
                                 }
                             }, null);
                         } catch (Exception e) {
                             logger.error("send flat message to fixed partition error", e);
                             throw e;
                         }
-                    } else {
-                        if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
-                            FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
-                                destination.getPartitionsNum(),
-                                destination.getPartitionHash());
-                            int length = partitionFlatMessage.length;
-                            for (int i = 0; i < length; i++) {
-                                FlatMessage flatMessagePart = partitionFlatMessage[i];
-                                if (flatMessagePart != null) {
-                                    if (logger.isDebugEnabled()) {
-                                        logger.debug("flatMessagePart: {}, partition: {}",
-                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue),
-                                            i);
-                                    }
-                                    final int index = i;
-                                    try {
-                                        Message message = new Message(topicName,
-                                            JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue)
-                                                .getBytes());
-                                        this.defaultMQProducer.send(message, new MessageQueueSelector() {
-
-                                            @Override
-                                            public MessageQueue select(List<MessageQueue> mqs, Message msg,
-                                                                       Object arg) {
-                                                if (index > mqs.size()) {
-                                                    throw new CanalServerException(
-                                                        "partition number is error,config num:"
-                                                                                   + destination.getPartitionsNum()
-                                                                                   + ", mq num: " + mqs.size());
-                                                }
-                                                return mqs.get(index);
-                                            }
-                                        }, null);
-                                    } catch (Exception e) {
-                                        logger.error("send flat message to hashed partition error", e);
-                                        throw e;
-                                    }
-                                }
-                            }
-                        }
                     }
                 }
             }