jianghang.loujh 1 vuosi sitten
vanhempi
commit
fccc6bc816

+ 3 - 3
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/config/RabbitMQConstants.java

@@ -15,9 +15,9 @@ public class RabbitMQConstants {
     public static final String RABBITMQ_VIRTUAL_HOST     = ROOT + "." + "virtual.host";
     public static final String RABBITMQ_USERNAME         = ROOT + "." + "username";
     public static final String RABBITMQ_PASSWORD         = ROOT + "." + "password";
-    public static final String RABBITMQ_QUEUE         = ROOT + "." + "queue";
-    public static final String RABBITMQ_ROUTING_KEY         = ROOT + "." + "routingKey";
-    public static final String RABBITMQ_DELIVERY_MODE         = ROOT + "." + "deliveryMode";
+    public static final String RABBITMQ_QUEUE            = ROOT + "." + "queue";
+    public static final String RABBITMQ_ROUTING_KEY      = ROOT + "." + "routingKey";
+    public static final String RABBITMQ_DELIVERY_MODE    = ROOT + "." + "deliveryMode";
 
     public static final String RABBITMQ_RESOURCE_OWNERID = ROOT + "." + "rabbitmq.resource.ownerId";
 }

+ 5 - 10
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/consumer/CanalRabbitMQConsumer.java

@@ -9,11 +9,11 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import com.alibaba.otter.canal.common.utils.PropertiesUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.fastjson2.JSON;
+import com.alibaba.otter.canal.common.utils.PropertiesUtils;
 import com.alibaba.otter.canal.connector.core.config.CanalConstants;
 import com.alibaba.otter.canal.connector.core.consumer.CommonMessage;
 import com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer;
@@ -24,13 +24,7 @@ import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQConstants;
 import com.alibaba.otter.canal.connector.rabbitmq.producer.AliyunCredentialsProvider;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.Consumer;
-import com.rabbitmq.client.DefaultConsumer;
-import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.*;
 
 /**
  * RabbitMQ consumer SPI 实现
@@ -41,7 +35,8 @@ import com.rabbitmq.client.Envelope;
 @SPI("rabbitmq")
 public class CanalRabbitMQConsumer implements CanalMsgConsumer {
 
-    private static final Logger                                logger              = LoggerFactory.getLogger(CanalRabbitMQConsumer.class);
+    private static final Logger                                logger              = LoggerFactory
+        .getLogger(CanalRabbitMQConsumer.class);
 
     // 链接地址
     private String                                             nameServer;
@@ -91,7 +86,7 @@ public class CanalRabbitMQConsumer implements CanalMsgConsumer {
             factory.setUsername(username);
             factory.setPassword(password);
         }
-        //解析出端口 modified by 16075140
+        // 解析出端口 modified by 16075140
         if (nameServer != null && nameServer.contains(":")) {
             String[] serverHostAndPort = nameServer.split(":");
             factory.setHost(serverHostAndPort[0]);

+ 12 - 7
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java

@@ -6,7 +6,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 
-import com.rabbitmq.client.*;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,6 +26,7 @@ import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQConstants;
 import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQProducerConfig;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
+import com.rabbitmq.client.*;
 
 /**
  * RabbitMQ Producer SPI 实现
@@ -73,8 +73,11 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
             connect = factory.newConnection();
             channel = connect.createChannel();
             channel.queueDeclare(rabbitMQProperties.getQueue(), true, false, false, null);
-            channel.exchangeDeclare(rabbitMQProperties.getExchange(), rabbitMQProperties.getDeliveryMode(), true, false, false, null);
-            channel.queueBind(rabbitMQProperties.getQueue(), rabbitMQProperties.getExchange(), rabbitMQProperties.getRoutingKey());
+            channel.exchangeDeclare(rabbitMQProperties
+                .getExchange(), rabbitMQProperties.getDeliveryMode(), true, false, false, null);
+            channel.queueBind(rabbitMQProperties.getQueue(),
+                rabbitMQProperties.getExchange(),
+                rabbitMQProperties.getRoutingKey());
 
         } catch (IOException | TimeoutException ex) {
             throw new CanalException("Start RabbitMQ producer error", ex);
@@ -126,9 +129,8 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
         try {
             if (!StringUtils.isEmpty(destination.getDynamicTopic())) {
                 // 动态topic
-                Map<String, Message> messageMap = MQMessageUtils.messageTopics(message,
-                    destination.getTopic(),
-                    destination.getDynamicTopic());
+                Map<String, Message> messageMap = MQMessageUtils
+                    .messageTopics(message, destination.getTopic(), destination.getDynamicTopic());
 
                 for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
                     final String topicName = entry.getKey().replace('.', '_');
@@ -177,7 +179,10 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
         // tips: 目前逻辑中暂不处理对exchange处理,请在Console后台绑定 才可使用routekey
         try {
             RabbitMQProducerConfig rabbitMQProperties = (RabbitMQProducerConfig) this.mqProperties;
-            channel.basicPublish(rabbitMQProperties.getExchange(), queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message);
+            channel.basicPublish(rabbitMQProperties.getExchange(),
+                queueName,
+                MessageProperties.PERSISTENT_TEXT_PLAIN,
+                message);
         } catch (Throwable e) {
             throw new RuntimeException(e);
         }