Parcourir la source

增加rabbitmq消息持久化deliveryMode配置 (#3300)

995586041 il y a 4 ans
Parent
commit
75174a63af

+ 1 - 0
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/config/RabbitMQConstants.java

@@ -15,6 +15,7 @@ public class RabbitMQConstants {
     public static final String RABBITMQ_VIRTUAL_HOST     = ROOT + "." + "virtual.host";
     public static final String RABBITMQ_USERNAME         = ROOT + "." + "username";
     public static final String RABBITMQ_PASSWORD         = ROOT + "." + "password";
+    public static final String RABBITMQ_DELIVERY_NODE    = ROOT + "." + "deliveryMode";
 
     public static final String RABBITMQ_RESOURCE_OWNERID = ROOT + "." + "rabbitmq.resource.ownerId";
 }

+ 10 - 0
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/config/RabbitMQProducerConfig.java

@@ -15,6 +15,8 @@ public class RabbitMQProducerConfig extends MQProperties {
     private String exchange;
     private String username;
     private String password;
+    // 1:transient, 2:"persistent"
+    private String  deliveryMode;
 
     public String getHost() {
         return host;
@@ -55,4 +57,12 @@ public class RabbitMQProducerConfig extends MQProperties {
     public void setPassword(String password) {
         this.password = password;
     }
+
+    public String getDeliveryMode() {
+        return deliveryMode;
+    }
+
+    public void setDeliveryMode(String deliveryMode) {
+        this.deliveryMode = deliveryMode;
+    }
 }

+ 8 - 5
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java

@@ -6,6 +6,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 
+import com.rabbitmq.client.*;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,10 +27,6 @@ import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQConstants;
 import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQProducerConfig;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
-import com.rabbitmq.client.AlreadyClosedException;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
 
 /**
  * RabbitMQ Producer SPI 实现
@@ -106,6 +103,10 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
         if (!StringUtils.isEmpty(password)) {
             rabbitMQProperties.setPassword(password);
         }
+        String deliveryMode = properties.getProperty(RabbitMQConstants.RABBITMQ_DELIVERY_NODE);
+        if (!StringUtils.isEmpty(deliveryMode)) {
+            rabbitMQProperties.setDeliveryMode(deliveryMode);
+        }
     }
 
     @Override
@@ -165,7 +166,9 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
         // tips: 目前逻辑中暂不处理对exchange处理,请在Console后台绑定 才可使用routekey
         try {
             RabbitMQProducerConfig rabbitMQProperties = (RabbitMQProducerConfig) this.mqProperties;
-            channel.basicPublish(rabbitMQProperties.getExchange(), queueName, null, message);
+            AMQP.BasicProperties properties = rabbitMQProperties != null && rabbitMQProperties.getDeliveryMode() != null
+                    && "2".equals(rabbitMQProperties.getDeliveryMode()) ? MessageProperties.MINIMAL_PERSISTENT_BASIC : null;
+            channel.basicPublish(rabbitMQProperties.getExchange(), queueName, properties, message);
         } catch (Throwable e) {
             throw new RuntimeException(e);
         }

+ 3 - 2
deployer/src/main/resources/canal.properties

@@ -2,7 +2,7 @@
 ######### 		common argument		#############
 #################################################
 # tcp bind ip
-canal.ip =
+canal.ip = 
 # register ip to zookeeper
 canal.register.ip =
 canal.port = 11111
@@ -166,4 +166,5 @@ rabbitmq.host =
 rabbitmq.virtual.host =
 rabbitmq.exchange =
 rabbitmq.username =
-rabbitmq.password =
+rabbitmq.password =
+rabbitmq.deliveryMode =