Просмотр исходного кода

Merge pull request #1084 from rewerma/master

fix #1083
rewerma 6 лет назад
Родитель
Сommit
d3e1ddd234

+ 26 - 22
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -78,8 +78,8 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                 JSON.toJSONString(flatMessage),
                                 destination.getTopic(),
                                 destination.getPartition());
-                            Message message = new Message(destination.getTopic(), JSON.toJSONString(flatMessage)
-                                .getBytes());
+                            Message message = new Message(destination.getTopic(),
+                                JSON.toJSONString(flatMessage).getBytes());
                             this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
                                 @Override
@@ -99,28 +99,32 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                             int length = partitionFlatMessage.length;
                             for (int i = 0; i < length; i++) {
                                 FlatMessage flatMessagePart = partitionFlatMessage[i];
-                                logger.debug("flatMessagePart: {}, partition: {}",
-                                    JSON.toJSONString(flatMessagePart),
-                                    i);
-                                final int index = i;
-                                try {
-                                    Message message = new Message(destination.getTopic(),
-                                        JSON.toJSONString(flatMessagePart).getBytes());
-                                    this.defaultMQProducer.send(message, new MessageQueueSelector() {
+                                if (flatMessagePart != null) {
+                                    logger.debug("flatMessagePart: {}, partition: {}",
+                                        JSON.toJSONString(flatMessagePart),
+                                        i);
+                                    final int index = i;
+                                    try {
+                                        Message message = new Message(destination.getTopic(),
+                                            JSON.toJSONString(flatMessagePart).getBytes());
+                                        this.defaultMQProducer.send(message, new MessageQueueSelector() {
 
-                                        @Override
-                                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                                            if (index > mqs.size()) {
-                                                throw new CanalServerException("partition number is error,config num:"
-                                                                               + destination.getPartitionsNum()
-                                                                               + ", mq num: " + mqs.size());
+                                            @Override
+                                            public MessageQueue select(List<MessageQueue> mqs, Message msg,
+                                                                       Object arg) {
+                                                if (index > mqs.size()) {
+                                                    throw new CanalServerException(
+                                                        "partition number is error,config num:"
+                                                                                   + destination.getPartitionsNum()
+                                                                                   + ", mq num: " + mqs.size());
+                                                }
+                                                return mqs.get(index);
                                             }
-                                            return mqs.get(index);
-                                        }
-                                    }, null);
-                                } catch (Exception e) {
-                                    logger.error("send flat message to hashed partition error", e);
-                                    callback.rollback();
+                                        }, null);
+                                    } catch (Exception e) {
+                                        logger.error("send flat message to hashed partition error", e);
+                                        callback.rollback();
+                                    }
                                 }
                             }
                         }