浏览代码

support rabbitmq ssl (#4965)

Devin Zhang 1 年之前
父节点
当前提交
ca79258e4f

+ 10 - 1
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java

@@ -1,6 +1,9 @@
 package com.alibaba.otter.canal.connector.rabbitmq.producer;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -51,7 +54,13 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
 
         ConnectionFactory factory = new ConnectionFactory();
         String servers = rabbitMQProperties.getHost();
-        if (servers.contains(":")) {
+        if (servers.startsWith("amqp")) {
+            try {
+                factory.setUri(servers);
+            } catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException ex) {
+                throw new CanalException("failed to parse host", ex);
+            }
+        } else if (servers.contains(":")) {
             String[] serverHostAndPort = servers.split(":");
             factory.setHost(serverHostAndPort[0]);
             factory.setPort(Integer.parseInt(serverHostAndPort[1]));