|
@@ -16,6 +16,7 @@ import com.alibaba.otter.canal.common.CanalMessageSerializer;
|
|
|
import com.alibaba.otter.canal.common.MQMessageUtils;
|
|
|
import com.alibaba.otter.canal.common.MQMessageUtils.EntryRowData;
|
|
|
import com.alibaba.otter.canal.common.MQProperties;
|
|
|
+import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
|
|
|
import com.alibaba.otter.canal.protocol.FlatMessage;
|
|
|
import com.alibaba.otter.canal.protocol.Message;
|
|
|
import com.alibaba.otter.canal.server.exception.CanalServerException;
|
|
@@ -57,8 +58,9 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback)
|
|
|
- throws IOException {
|
|
|
+ public void send(final MQProperties.CanalDestination canalDestination, Message message, Callback callback)
|
|
|
+ throws IOException {
|
|
|
+ ExecutorTemplate template = new ExecutorTemplate(executor);
|
|
|
try {
|
|
|
if (!StringUtils.isEmpty(canalDestination.getDynamicTopic())) {
|
|
|
// 动态topic
|
|
@@ -67,21 +69,32 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
canalDestination.getDynamicTopic());
|
|
|
|
|
|
for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
|
|
|
- String topicName = entry.getKey().replace('.', '_');
|
|
|
- com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue();
|
|
|
- send(canalDestination, topicName, messageSub);
|
|
|
+ final String topicName = entry.getKey().replace('.', '_');
|
|
|
+ final com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue();
|
|
|
+
|
|
|
+ template.submit(new Runnable() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ send(canalDestination, topicName, messageSub);
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
+
|
|
|
+ template.waitForResult();
|
|
|
} else {
|
|
|
send(canalDestination, canalDestination.getTopic(), message);
|
|
|
}
|
|
|
callback.commit();
|
|
|
} catch (Throwable e) {
|
|
|
+ logger.error(e.getMessage(), e);
|
|
|
callback.rollback();
|
|
|
+ } finally {
|
|
|
+ template.clear();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void send(MQProperties.CanalDestination canalDestination, String topicName, Message messageSub)
|
|
|
- throws Exception {
|
|
|
+ private void send(MQProperties.CanalDestination canalDestination, String topicName, Message messageSub) {
|
|
|
if (!mqProperties.getFlatMessage()) {
|
|
|
byte[] message = CanalMessageSerializer.serializer(messageSub, mqProperties.isFilterTransactionEntry());
|
|
|
if (logger.isDebugEnabled()) {
|
|
@@ -108,9 +121,13 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
|
|
|
|
|
|
}
|
|
|
|
|
|
- private void sendMessage(String queueName, byte[] message) throws Exception {
|
|
|
+ private void sendMessage(String queueName, byte[] message) {
|
|
|
// tips: 目前逻辑中暂不处理对exchange处理,请在Console后台绑定 才可使用routekey
|
|
|
- channel.basicPublish(mqProperties.getExchange(), queueName, null, message);
|
|
|
+ try {
|
|
|
+ channel.basicPublish(mqProperties.getExchange(), queueName, null, message);
|
|
|
+ } catch (Throwable e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|