Jelajahi Sumber

Add flat message support to rocketmq

duhengforever 6 tahun lalu
induk
melakukan
b484593869

+ 1 - 1
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java

@@ -105,7 +105,7 @@ public class CanalAdapterLoader {
                         CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig.getBootstrapServers(),
                         CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig.getBootstrapServers(),
                             topic.getTopic(),
                             topic.getTopic(),
                             group.getGroupId(),
                             group.getGroupId(),
-                            canalOuterAdapterGroups);
+                            canalOuterAdapterGroups, canalClientConfig.getFlatMessage());
                         canalMQWorker.put(topic.getTopic() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
                         canalMQWorker.put(topic.getTopic() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
                         rocketMQWorker.start();
                         rocketMQWorker.start();
                     } else if ("kafka".equalsIgnoreCase(topic.getMqMode())) {
                     } else if ("kafka".equalsIgnoreCase(topic.getMqMode())) {

+ 25 - 9
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterRocketMQWorker.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.client.adapter.loader;
 package com.alibaba.otter.canal.client.adapter.loader;
 
 
+import com.alibaba.otter.canal.protocol.FlatMessage;
 import java.util.List;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Executors;
@@ -23,16 +24,19 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
 
 
     private RocketMQCanalConnector connector;
     private RocketMQCanalConnector connector;
 
 
-    private String                 topic;
+    private String topic;
+
+    private boolean flatMessage;
 
 
     public CanalAdapterRocketMQWorker(String nameServers, String topic, String groupId,
     public CanalAdapterRocketMQWorker(String nameServers, String topic, String groupId,
-                                      List<List<CanalOuterAdapter>> canalOuterAdapters){
+        List<List<CanalOuterAdapter>> canalOuterAdapters, boolean flatMessage) {
         logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
         logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
         this.canalOuterAdapters = canalOuterAdapters;
         this.canalOuterAdapters = canalOuterAdapters;
         this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
         this.topic = topic;
         this.topic = topic;
+        this.flatMessage = flatMessage;
         this.canalDestination = topic;
         this.canalDestination = topic;
-        connector = RocketMQCanalConnectorProvider.newRocketMQConnector(nameServers, topic, groupId);
+        connector = RocketMQCanalConnectorProvider.newRocketMQConnector(nameServers, topic, groupId, flatMessage);
     }
     }
 
 
     @Override
     @Override
@@ -80,20 +84,32 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
                 logger.info("=============> Subscribe topic: {} succeed<=============", this.topic);
                 logger.info("=============> Subscribe topic: {} succeed<=============", this.topic);
                 while (running) {
                 while (running) {
                     try {
                     try {
-                        // switcher.get(); //等待开关开启
-
-                        final Message message = connector.getWithoutAck(1);
+                        Object message = null;
+                        if (!flatMessage) {
+                            message = connector.getWithoutAck(1);
+                        } else {
+                            message = connector.getFlatMessageWithoutAck();
+                        }
                         if (message != null) {
                         if (message != null) {
+                            final Object msg = message;
                             executor.submit(new Runnable() {
                             executor.submit(new Runnable() {
-
                                 @Override
                                 @Override
                                 public void run() {
                                 public void run() {
                                     try {
                                     try {
-                                        writeOut(message, topic);
+                                        if (msg != null) {
+                                            if (msg instanceof Message) {
+                                                Message receive = (Message) msg;
+                                                writeOut(receive, topic);
+                                                connector.ack(receive.getId());
+                                            } else {
+                                                FlatMessage receive = (FlatMessage) msg;
+                                                writeOut(receive);
+                                                connector.ack(receive.getId());
+                                            }
+                                        }
                                     } catch (Exception e) {
                                     } catch (Exception e) {
                                         logger.error(e.getMessage(), e);
                                         logger.error(e.getMessage(), e);
                                     }
                                     }
-                                    connector.ack(message.getId());
                                 }
                                 }
                             });
                             });
                         } else {
                         } else {

+ 1 - 1
client-launcher/src/main/resources/canal-client.yml

@@ -1,7 +1,7 @@
 #canalServerHost: 127.0.0.1:11111
 #canalServerHost: 127.0.0.1:11111
 #zookeeperHosts: slave1:2181
 #zookeeperHosts: slave1:2181
 bootstrapServers: slave1:6667,slave2:6667 #or rocketmq nameservers:host1:9876;host2:9876
 bootstrapServers: slave1:6667,slave2:6667 #or rocketmq nameservers:host1:9876;host2:9876
-flatMessage: true
+flatMessage: false
 
 
 #canalInstances:
 #canalInstances:
 #- instance: example
 #- instance: example

+ 112 - 22
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

@@ -1,5 +1,7 @@
 package com.alibaba.otter.canal.client.rocketmq;
 package com.alibaba.otter.canal.client.rocketmq;
 
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.protocol.FlatMessage;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -24,21 +26,23 @@ import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 
 
 public class RocketMQCanalConnector implements CanalConnector {
 public class RocketMQCanalConnector implements CanalConnector {
 
 
-    private static final Logger                          logger              = LoggerFactory.getLogger(RocketMQCanalConnector.class);
+    private static final Logger logger = LoggerFactory.getLogger(RocketMQCanalConnector.class);
 
 
-    private String                                       nameServer;
-    private String                                       topic;
-    private String                                       groupName;
-    private volatile boolean                             connected           = false;
-    private DefaultMQPushConsumer                        rocketMQConsumer;
-    private BlockingQueue<ConsumerBatchMessage<Message>> messageBlockingQueue;
-    Map<Long, ConsumerBatchMessage<Message>>             messageCache;
-    private long                                         batchProcessTimeout = 10000;
+    private String nameServer;
+    private String topic;
+    private String groupName;
+    private volatile boolean connected = false;
+    private DefaultMQPushConsumer rocketMQConsumer;
+    private BlockingQueue<ConsumerBatchMessage> messageBlockingQueue;
+    Map<Long, ConsumerBatchMessage> messageCache;
+    private long batchProcessTimeout = 3000;
+    private boolean flatMessage;
 
 
-    public RocketMQCanalConnector(String nameServer, String topic, String groupName){
+    public RocketMQCanalConnector(String nameServer, String topic, String groupName, boolean flatMessage) {
         this.nameServer = nameServer;
         this.nameServer = nameServer;
         this.topic = topic;
         this.topic = topic;
         this.groupName = groupName;
         this.groupName = groupName;
+        this.flatMessage = flatMessage;
         messageBlockingQueue = new LinkedBlockingQueue<>();
         messageBlockingQueue = new LinkedBlockingQueue<>();
         messageCache = new ConcurrentHashMap<>();
         messageCache = new ConcurrentHashMap<>();
     }
     }
@@ -70,11 +74,12 @@ public class RocketMQCanalConnector implements CanalConnector {
             if (rocketMQConsumer == null) {
             if (rocketMQConsumer == null) {
                 this.connect();
                 this.connect();
             }
             }
-            rocketMQConsumer.subscribe(topic, "*");
+            rocketMQConsumer.subscribe(this.topic, "*");
             rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {
             rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {
 
 
                 @Override
                 @Override
-                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {
+                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts,
+                    ConsumeOrderlyContext context) {
                     context.setAutoCommit(true);
                     context.setAutoCommit(true);
                     boolean isSuccess = process(messageExts);
                     boolean isSuccess = process(messageExts);
                     if (isSuccess) {
                     if (isSuccess) {
@@ -93,17 +98,33 @@ public class RocketMQCanalConnector implements CanalConnector {
     }
     }
 
 
     private boolean process(List<MessageExt> messageExts) {
     private boolean process(List<MessageExt> messageExts) {
-        BlockingQueue<Message> messageList = new LinkedBlockingQueue<>();
+        logger.info("Get Message:{}", messageExts);
+        BlockingQueue messageList = new LinkedBlockingQueue<>();
         for (MessageExt messageExt : messageExts) {
         for (MessageExt messageExt : messageExts) {
             byte[] data = messageExt.getBody();
             byte[] data = messageExt.getBody();
-            Message message = CanalMessageDeserializer.deserializer(data);
-            try {
-                messageList.put(message);
-            } catch (InterruptedException ex) {
-                logger.error("Add message error");
+            if (data != null){
+                try {
+                    if (!flatMessage) {
+                        Message message = CanalMessageDeserializer.deserializer(data);
+                        messageList.put(message);
+                    } else {
+                        FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);
+                        messageList.put(flatMessage);
+                    }
+                } catch (Exception ex) {
+                    logger.error("Add message error", ex);
+                    throw new CanalClientException(ex);
+                }
+            }else{
+                logger.warn("Received message data is null");
             }
             }
         }
         }
-        ConsumerBatchMessage<Message> batchMessage = new ConsumerBatchMessage<>(messageList);
+        ConsumerBatchMessage batchMessage;
+        if (!flatMessage) {
+            batchMessage = new ConsumerBatchMessage<Message>(messageList);
+        } else {
+            batchMessage = new ConsumerBatchMessage<FlatMessage>(messageList);
+        }
         try {
         try {
             messageBlockingQueue.put(batchMessage);
             messageBlockingQueue.put(batchMessage);
         } catch (InterruptedException e) {
         } catch (InterruptedException e) {
@@ -131,17 +152,28 @@ public class RocketMQCanalConnector implements CanalConnector {
         this.rocketMQConsumer.unsubscribe(this.topic);
         this.rocketMQConsumer.unsubscribe(this.topic);
     }
     }
 
 
+    /**
+     * 暂时不支持batchSize 参数
+     *
+     * @param batchSize 暂时不支持
+     * @return
+     * @throws CanalClientException
+     */
     @Override
     @Override
     public Message get(int batchSize) throws CanalClientException {
     public Message get(int batchSize) throws CanalClientException {
         Message message = getWithoutAck(batchSize);
         Message message = getWithoutAck(batchSize);
-        ack(message.getId());
+        if (message != null) {
+            ack(message.getId());
+        }
         return message;
         return message;
     }
     }
 
 
     @Override
     @Override
     public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
     public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
         Message message = getWithoutAck(batchSize, timeout, unit);
         Message message = getWithoutAck(batchSize, timeout, unit);
-        ack(message.getId());
+        if (message != null) {
+            ack(message.getId());
+        }
         return message;
         return message;
     }
     }
 
 
@@ -155,6 +187,23 @@ public class RocketMQCanalConnector implements CanalConnector {
         return null;
         return null;
     }
     }
 
 
+    private FlatMessage getFlatMessage(ConsumerBatchMessage consumerBatchMessage) {
+        BlockingQueue<FlatMessage> messageList = consumerBatchMessage.getData();
+        if (messageList != null & messageList.size() > 0) {
+            FlatMessage message = messageList.poll();
+            messageCache.put(message.getId(), consumerBatchMessage);
+            return message;
+        }
+        return null;
+    }
+
+    /**
+     * 暂时不支持该参数设置
+     *
+     * @param batchSize
+     * @return
+     * @throws CanalClientException
+     */
     @Override
     @Override
     public Message getWithoutAck(int batchSize) throws CanalClientException {
     public Message getWithoutAck(int batchSize) throws CanalClientException {
         ConsumerBatchMessage batchMessage = messageBlockingQueue.poll();
         ConsumerBatchMessage batchMessage = messageBlockingQueue.poll();
@@ -168,11 +217,51 @@ public class RocketMQCanalConnector implements CanalConnector {
     public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
     public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
         try {
         try {
             ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
             ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
-            return getMessage(batchMessage);
+            if (batchMessage != null) {
+                return getMessage(batchMessage);
+            }
         } catch (InterruptedException ex) {
         } catch (InterruptedException ex) {
             logger.warn("Get message timeout", ex);
             logger.warn("Get message timeout", ex);
             throw new CanalClientException("Failed to fetch the data after: " + timeout);
             throw new CanalClientException("Failed to fetch the data after: " + timeout);
         }
         }
+        return null;
+    }
+
+    public FlatMessage getFlatMessageWithoutAck() {
+        return getFlatMessageWithoutAck(null, null);
+    }
+
+    public FlatMessage getFlatMessageWithoutAck(Long timeout,
+        TimeUnit unit) throws CanalClientException {
+        try {
+            ConsumerBatchMessage batchMessage = null;
+            if (timeout == null || timeout == 0) {
+                batchMessage = messageBlockingQueue.poll();
+            } else {
+                batchMessage = messageBlockingQueue.poll(timeout, unit);
+            }
+            if (batchMessage != null) {
+                return getFlatMessage(batchMessage);
+            }
+        } catch (InterruptedException ex) {
+            logger.warn("Get flat message timeout", ex);
+            throw new CanalClientException("Failed to fetch the flat message data after: " + timeout);
+        }
+        return null;
+    }
+
+    public FlatMessage getFlatMessage() throws CanalClientException {
+        FlatMessage message = getFlatMessageWithoutAck(null, null);
+        if (message != null) {
+            ack(message.getId());
+        }
+        return message;
+    }
+
+    public FlatMessage getFlatMessage(Long timeout, TimeUnit unit) throws CanalClientException {
+        FlatMessage message = getFlatMessageWithoutAck(timeout, unit);
+        ack(message.getId());
+        return message;
     }
     }
 
 
     @Override
     @Override
@@ -180,6 +269,7 @@ public class RocketMQCanalConnector implements CanalConnector {
         ConsumerBatchMessage batchMessage = messageCache.get(batchId);
         ConsumerBatchMessage batchMessage = messageCache.get(batchId);
         if (batchMessage != null) {
         if (batchMessage != null) {
             batchMessage.ack();
             batchMessage.ack();
+            messageCache.remove(batchId);
         }
         }
     }
     }
 
 

+ 5 - 1
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnectorProvider.java

@@ -14,7 +14,11 @@ public class RocketMQCanalConnectorProvider {
      * @return {@link RocketMQCanalConnector}
      * @return {@link RocketMQCanalConnector}
      */
      */
     public static RocketMQCanalConnector newRocketMQConnector(String nameServers, String topic, String groupId) {
     public static RocketMQCanalConnector newRocketMQConnector(String nameServers, String topic, String groupId) {
-        return new RocketMQCanalConnector(nameServers, topic, groupId);
+        return new RocketMQCanalConnector(nameServers, topic, groupId, false);
     }
     }
 
 
+    public static RocketMQCanalConnector newRocketMQConnector(String nameServers, String topic, String groupId,
+        boolean flatMessage) {
+        return new RocketMQCanalConnector(nameServers, topic, groupId, flatMessage);
+    }
 }
 }

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -57,6 +57,7 @@ public class CanalLauncher {
 
 
             CanalMQProducer canalMQProducer = null;
             CanalMQProducer canalMQProducer = null;
             String serverMode = controller.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
             String serverMode = controller.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
+            serverMode = "rocketmq";
             if (serverMode.equalsIgnoreCase("kafka")) {
             if (serverMode.equalsIgnoreCase("kafka")) {
                 canalMQProducer = new CanalKafkaProducer();
                 canalMQProducer = new CanalKafkaProducer();
             } else if (serverMode.equalsIgnoreCase("rocketmq")) {
             } else if (serverMode.equalsIgnoreCase("rocketmq")) {

+ 1 - 1
deployer/src/main/resources/mq.yml

@@ -8,7 +8,7 @@ bufferMemory: 33554432
 canalBatchSize: 50
 canalBatchSize: 50
 # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
 # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
 canalGetTimeout: 100
 canalGetTimeout: 100
-flatMessage: true
+flatMessage: false
 
 
 canalDestinations:
 canalDestinations:
   - canalDestination: example
   - canalDestination: example

+ 75 - 16
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -1,10 +1,13 @@
 package com.alibaba.otter.canal.rocketmq;
 package com.alibaba.otter.canal.rocketmq;
 
 
+import com.alibaba.fastjson.JSON;
 import com.alibaba.otter.canal.common.CanalMessageSerializer;
 import com.alibaba.otter.canal.common.CanalMessageSerializer;
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.server.exception.CanalServerException;
 import com.alibaba.otter.canal.server.exception.CanalServerException;
 import com.alibaba.otter.canal.spi.CanalMQProducer;
 import com.alibaba.otter.canal.spi.CanalMQProducer;
 import java.util.List;
 import java.util.List;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -19,10 +22,13 @@ public class CanalRocketMQProducer implements CanalMQProducer {
 
 
     private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);
     private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);
 
 
-    private DefaultMQProducer   defaultMQProducer;
+    private DefaultMQProducer defaultMQProducer;
+
+    private MQProperties mqProperties;
 
 
     @Override
     @Override
     public void init(MQProperties rocketMQProperties) {
     public void init(MQProperties rocketMQProperties) {
+        this.mqProperties = rocketMQProperties;
         defaultMQProducer = new DefaultMQProducer();
         defaultMQProducer = new DefaultMQProducer();
         defaultMQProducer.setNamesrvAddr(rocketMQProperties.getServers());
         defaultMQProducer.setNamesrvAddr(rocketMQProperties.getServers());
         defaultMQProducer.setProducerGroup(rocketMQProperties.getProducerGroup());
         defaultMQProducer.setProducerGroup(rocketMQProperties.getProducerGroup());
@@ -37,25 +43,78 @@ public class CanalRocketMQProducer implements CanalMQProducer {
 
 
     @Override
     @Override
     public void send(final MQProperties.CanalDestination destination, com.alibaba.otter.canal.protocol.Message data,
     public void send(final MQProperties.CanalDestination destination, com.alibaba.otter.canal.protocol.Message data,
-                     Callback callback) {
-        try {
-            Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data));
-            this.defaultMQProducer.send(message, new MessageQueueSelector() {
-
-                @Override
-                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                    int partition = 0;
+        Callback callback) {
+        if (!mqProperties.getFlatMessage()) {
+            try {
+                Message message = new Message(destination.getTopic(), CanalMessageSerializer.serializer(data));
+                logger.debug("send message:{} to destination:{}, partition: {}", message, destination.getCanalDestination(), destination.getPartition());
+                this.defaultMQProducer.send(message, new MessageQueueSelector() {
+                    @Override
+                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                        int partition = 0;
+                        if (destination.getPartition() != null) {
+                            partition = destination.getPartition();
+                        }
+                        return mqs.get(partition);
+                    }
+                }, null);
+                callback.commit();
+            } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
+                logger.error("Send message error!", e);
+                callback.rollback();
+            }
+        } else {
+            List<FlatMessage> flatMessages = FlatMessage.messageConverter(data);
+            if (flatMessages != null) {
+                for (FlatMessage flatMessage : flatMessages) {
                     if (destination.getPartition() != null) {
                     if (destination.getPartition() != null) {
-                        partition = destination.getPartition();
+                        try {
+                            logger.info("send flat message: {} to topic: {} fixed partition: {}", JSON.toJSONString(flatMessage),destination.getTopic(), destination.getPartition());
+                            Message message = new Message(destination.getTopic(), JSON.toJSONString(flatMessage).getBytes());
+                            this.defaultMQProducer.send(message, new MessageQueueSelector() {
+                                @Override
+                                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                                    return mqs.get(destination.getPartition());
+                                }
+                            }, null);
+                        } catch (Exception e) {
+                            logger.error("send flat message to fixed partition error", e);
+                            callback.rollback();
+                        }
+                    } else {
+                        if (destination.getPartitionHash() != null
+                            && !destination.getPartitionHash().isEmpty()) {
+                            FlatMessage[] partitionFlatMessage = FlatMessage.messagePartition(flatMessage,
+                                destination.getPartitionsNum(),
+                                destination.getPartitionHash());
+                            int length = partitionFlatMessage.length;
+                            for (int i = 0; i < length; i++) {
+                                FlatMessage flatMessagePart = partitionFlatMessage[i];
+                                logger.debug("flatMessagePart: {}, partition: {}", JSON.toJSONString(flatMessagePart), i);
+                                final int index = i;
+                                try {
+                                    Message message = new Message(destination.getTopic(), JSON.toJSONString(flatMessagePart).getBytes());
+                                    this.defaultMQProducer.send(message, new MessageQueueSelector() {
+                                        @Override
+                                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                                            if (index > mqs.size()) {
+                                                throw new CanalServerException("partition number is error,config num:" + destination.getPartitionsNum() + ", mq num: " + mqs.size());
+                                            }
+                                            return mqs.get(index);
+                                        }
+                                    }, null);
+                                } catch (Exception e) {
+                                    logger.error("send flat message to hashed partition error", e);
+                                    callback.rollback();
+                                }
+
+                            }
+                        }
                     }
                     }
-                    return mqs.get(partition);
                 }
                 }
-            }, null);
-            callback.commit();
-        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
-            logger.error("Send message error!", e);
-            callback.rollback();
+            }
         }
         }
+
     }
     }
 
 
     @Override
     @Override