1
0
Эх сурвалжийг харах

增加rabbitmq配置,以持久化方式投递消息 (#4644)

Co-authored-by: wangheng <760261296@qq.com>
Smile-cmd 2 жил өмнө
parent
commit
5d7d19fdb1

+ 3 - 0
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/config/RabbitMQConstants.java

@@ -15,6 +15,9 @@ public class RabbitMQConstants {
     public static final String RABBITMQ_VIRTUAL_HOST     = ROOT + "." + "virtual.host";
     public static final String RABBITMQ_USERNAME         = ROOT + "." + "username";
     public static final String RABBITMQ_PASSWORD         = ROOT + "." + "password";
+    public static final String RABBITMQ_QUEUE         = ROOT + "." + "queue";
+    public static final String RABBITMQ_ROUTING_KEY         = ROOT + "." + "routingKey";
+    public static final String RABBITMQ_DELIVERY_MODE         = ROOT + "." + "deliveryMode";
 
     public static final String RABBITMQ_RESOURCE_OWNERID = ROOT + "." + "rabbitmq.resource.ownerId";
 }

+ 27 - 0
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/config/RabbitMQProducerConfig.java

@@ -15,6 +15,9 @@ public class RabbitMQProducerConfig extends MQProperties {
     private String exchange;
     private String username;
     private String password;
+    private String queue;
+    private String routingKey;
+    private String deliveryMode;
 
     public String getHost() {
         return host;
@@ -55,4 +58,28 @@ public class RabbitMQProducerConfig extends MQProperties {
     public void setPassword(String password) {
         this.password = password;
     }
+
+    public String getQueue() {
+        return queue;
+    }
+
+    public void setQueue(String queue) {
+        this.queue = queue;
+    }
+
+    public String getRoutingKey() {
+        return routingKey;
+    }
+
+    public void setRoutingKey(String routingKey) {
+        this.routingKey = routingKey;
+    }
+
+    public String getDeliveryMode() {
+        return deliveryMode;
+    }
+
+    public void setDeliveryMode(String deliveryMode) {
+        this.deliveryMode = deliveryMode;
+    }
 }

+ 18 - 6
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java

@@ -6,6 +6,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 
+import com.rabbitmq.client.*;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,10 +27,6 @@ import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQConstants;
 import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQProducerConfig;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
-import com.rabbitmq.client.AlreadyClosedException;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
 
 /**
  * RabbitMQ Producer SPI 实现
@@ -75,7 +72,10 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
         try {
             connect = factory.newConnection();
             channel = connect.createChannel();
-            // channel.exchangeDeclare(mqProperties.getExchange(), "topic");
+            channel.queueDeclare(rabbitMQProperties.getQueue(), true, false, false, null);
+            channel.exchangeDeclare(rabbitMQProperties.getExchange(), rabbitMQProperties.getDeliveryMode(), true, false, false, null);
+            channel.queueBind(rabbitMQProperties.getQueue(), rabbitMQProperties.getExchange(), rabbitMQProperties.getRoutingKey());
+
         } catch (IOException | TimeoutException ex) {
             throw new CanalException("Start RabbitMQ producer error", ex);
         }
@@ -106,6 +106,18 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
         if (!StringUtils.isEmpty(password)) {
             rabbitMQProperties.setPassword(password);
         }
+        String queue = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_QUEUE);
+        if (!StringUtils.isEmpty(queue)) {
+            rabbitMQProperties.setQueue(queue);
+        }
+        String routingKey = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_ROUTING_KEY);
+        if (!StringUtils.isEmpty(routingKey)) {
+            rabbitMQProperties.setRoutingKey(routingKey);
+        }
+        String deliveryMode = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_DELIVERY_MODE);
+        if (!StringUtils.isEmpty(deliveryMode)) {
+            rabbitMQProperties.setDeliveryMode(deliveryMode);
+        }
     }
 
     @Override
@@ -165,7 +177,7 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
         // tips: 目前逻辑中暂不处理对exchange处理,请在Console后台绑定 才可使用routekey
         try {
             RabbitMQProducerConfig rabbitMQProperties = (RabbitMQProducerConfig) this.mqProperties;
-            channel.basicPublish(rabbitMQProperties.getExchange(), queueName, null, message);
+            channel.basicPublish(rabbitMQProperties.getExchange(), queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message);
         } catch (Throwable e) {
             throw new RuntimeException(e);
         }