Browse Source

rabbitmq(amqp)MQ支持 (#2156)

耐小心 5 years ago
parent
commit
79552e0a37
21 changed files with 817 additions and 8 deletions
  1. 40 1
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  2. 10 0
      client-adapter/launcher/pom.xml
  3. 23 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java
  4. 92 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRabbitMQWorker.java
  5. 3 0
      client-adapter/launcher/src/main/resources/application.yml
  6. 8 0
      client/pom.xml
  7. 1 0
      client/src/main/java/com/alibaba/otter/canal/client/CanalConnector.java
  8. 1 1
      client/src/main/java/com/alibaba/otter/canal/client/ConsumerBatchMessage.java
  9. 54 0
      client/src/main/java/com/alibaba/otter/canal/client/rabbitmq/AliyunCredentialsProvider.java
  10. 299 0
      client/src/main/java/com/alibaba/otter/canal/client/rabbitmq/RabbitMQCanalConnector.java
  11. 1 0
      client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java
  12. 5 0
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java
  13. 28 0
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStarter.java
  14. 5 0
      deployer/src/main/resources/canal.properties
  15. 4 1
      deployer/src/main/resources/logback.xml
  16. 5 5
      example/src/main/java/com/alibaba/otter/canal/example/SimpleCanalClientTest.java
  17. 11 0
      pom.xml
  18. 8 0
      server/pom.xml
  19. 45 0
      server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java
  20. 55 0
      server/src/main/java/com/alibaba/otter/canal/rabbitmq/AliyunCredentialsProvider.java
  21. 119 0
      server/src/main/java/com/alibaba/otter/canal/rabbitmq/CanalRabbitMQProducer.java

+ 40 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -33,6 +33,13 @@ public class CanalClientConfig {
     // aliyun ak/sk
     // aliyun ak/sk
     private String             accessKey;
     private String             accessKey;
     private String             secretKey;
     private String             secretKey;
+    // rabbitmq 账号密码
+    private String             username;
+    private String             password;
+    // rabbitmq vhost
+    private String             vhost         = "/";
+
+    private Long               resourceOwnerId;
     // 是否启用消息轨迹
     // 是否启用消息轨迹
     private boolean            enableMessageTrace;
     private boolean            enableMessageTrace;
     // 在使用阿里云商业化mq服务时,如果想使用云上消息轨迹功能,请设置此配置为true
     // 在使用阿里云商业化mq服务时,如果想使用云上消息轨迹功能,请设置此配置为true
@@ -132,6 +139,38 @@ public class CanalClientConfig {
         this.secretKey = secretKey;
         this.secretKey = secretKey;
     }
     }
 
 
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getVhost() {
+        return vhost;
+    }
+
+    public void setVhost(String vhost) {
+        this.vhost = vhost;
+    }
+
+    public Long getResourceOwnerId() {
+        return resourceOwnerId;
+    }
+
+    public void setResourceOwnerId(Long resourceOwnerId) {
+        this.resourceOwnerId = resourceOwnerId;
+    }
+
     public List<CanalAdapter> getCanalAdapters() {
     public List<CanalAdapter> getCanalAdapters() {
         return canalAdapters;
         return canalAdapters;
     }
     }
@@ -176,7 +215,7 @@ public class CanalClientConfig {
 
 
         private String      instance; // 实例名
         private String      instance; // 实例名
 
 
-        private List<Group> groups;  // 适配器分组列表
+        private List<Group> groups;   // 适配器分组列表
 
 
         public String getInstance() {
         public String getInstance() {
             return instance;
             return instance;

+ 10 - 0
client-adapter/launcher/pom.xml

@@ -71,6 +71,16 @@
             <version>1.1.1</version>
             <version>1.1.1</version>
         </dependency>
         </dependency>
 
 
+        <dependency>
+            <groupId>com.rabbitmq</groupId>
+            <artifactId>amqp-client</artifactId>
+            <version>5.5.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.mq-amqp</groupId>
+            <artifactId>mq-amqp-client</artifactId>
+            <version>1.0.3</version>
+        </dependency>
         <!-- jdbc -->
         <!-- jdbc -->
         <dependency>
         <dependency>
             <groupId>mysql</groupId>
             <groupId>mysql</groupId>

+ 23 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -142,6 +142,29 @@ public class CanalAdapterLoader {
                         canalAdapter.getInstance() + "-" + group.getGroupId());
                         canalAdapter.getInstance() + "-" + group.getGroupId());
                 }
                 }
             }
             }
+        } else if ("rabbitMQ".equalsIgnoreCase(canalClientConfig.getMode())) {
+            // 初始化canal-client-rabbitMQ的适配器
+            for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
+                for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
+                    List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
+                    List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
+                    for (OuterAdapterConfig config : group.getOuterAdapters()) {
+                        loadAdapter(config, canalOuterAdapters);
+                    }
+                    canalOuterAdapterGroups.add(canalOuterAdapters);
+                    CanalAdapterRabbitMQWorker rabbitMQWork = new CanalAdapterRabbitMQWorker(canalClientConfig,
+                        canalOuterAdapterGroups,
+                        canalAdapter.getInstance(),
+                        group.getGroupId(),
+                        canalClientConfig.getFlatMessage());
+                    canalMQWorker.put(canalAdapter.getInstance() + "-rabbitmq-" + group.getGroupId(), rabbitMQWork);
+                    rabbitMQWork.start();
+
+                    logger.info("Start adapter for canal-client mq topic: {} succeed",
+                        canalAdapter.getInstance() + "-" + group.getGroupId());
+                }
+            }
+            // CanalAdapterRabbitMQWork
         }
         }
     }
     }
 
 

+ 92 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRabbitMQWorker.java

@@ -0,0 +1,92 @@
+package com.alibaba.otter.canal.adapter.launcher.loader;
+
+import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
+import com.alibaba.otter.canal.client.adapter.support.Util;
+import com.alibaba.otter.canal.client.rabbitmq.RabbitMQCanalConnector;
+import org.apache.kafka.common.errors.WakeupException;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+public class CanalAdapterRabbitMQWorker extends AbstractCanalAdapterWorker {
+
+    private RabbitMQCanalConnector connector;
+    private String                 topic;
+    private boolean                flatMessage;
+
+    public CanalAdapterRabbitMQWorker(CanalClientConfig canalClientConfig, List<List<OuterAdapter>> canalOuterAdapters,
+                                      String topic, String groupId, boolean flatMessage){
+        super(canalOuterAdapters);
+        this.canalClientConfig = canalClientConfig;
+        this.topic = topic;
+        this.flatMessage = flatMessage;
+        this.canalDestination = topic;
+        this.groupId = groupId;
+        this.connector = new RabbitMQCanalConnector(canalClientConfig.getMqServers(),
+            canalClientConfig.getVhost(),
+            topic,
+            canalClientConfig.getAccessKey(),
+            canalClientConfig.getSecretKey(),
+            canalClientConfig.getUsername(),
+            canalClientConfig.getPassword(),
+            canalClientConfig.getResourceOwnerId(),
+            flatMessage);
+    }
+
+    @Override
+    protected void process() {
+        while (!running) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignored) {
+            }
+        }
+
+        ExecutorService workerExecutor = Util.newSingleThreadExecutor(5000L);
+        int retry = canalClientConfig.getRetries() == null
+                    || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
+        long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
+        while (running) {
+            try {
+                syncSwitch.get(canalDestination);
+                logger.info("=============> Start to connect topic: {} <=============", this.topic);
+                connector.connect();
+                logger.info("=============> Start to subscribe topic: {}<=============", this.topic);
+                connector.subscribe();
+                logger.info("=============> Subscribe topic: {} succeed<=============", this.topic);
+                while (running) {
+                    boolean status = syncSwitch.status(canalDestination);
+                    if (!status) {
+                        connector.disconnect();
+                        break;
+                    }
+                    if (retry == -1) {
+                        retry = Integer.MAX_VALUE;
+                    }
+                    for (int i = 0; i < retry; i++) {
+                        if (!running) {
+                            break;
+                        }
+                        if (mqWriteOutData(retry, timeout, i, flatMessage, connector, workerExecutor)) {
+                            break;
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        workerExecutor.shutdown();
+
+        try {
+            connector.unsubscribe();
+        } catch (WakeupException e) {
+            // No-op. Continue process
+        }
+        connector.disconnect();
+        logger.info("=============> Disconnect topic: {} <=============", this.topic);
+
+    }
+}

+ 3 - 0
client-adapter/launcher/src/main/resources/application.yml

@@ -18,6 +18,9 @@ canal.conf:
   timeout:
   timeout:
   accessKey:
   accessKey:
   secretKey:
   secretKey:
+  username:
+  password:
+  vhost:
 #  srcDataSources:
 #  srcDataSources:
 #    defaultDS:
 #    defaultDS:
 #      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
 #      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true

+ 8 - 0
client/pom.xml

@@ -109,6 +109,14 @@
 			<groupId>org.apache.rocketmq</groupId>
 			<groupId>org.apache.rocketmq</groupId>
 			<artifactId>rocketmq-acl</artifactId>
 			<artifactId>rocketmq-acl</artifactId>
 		</dependency>
 		</dependency>
+		<dependency>
+			<groupId>com.rabbitmq</groupId>
+			<artifactId>amqp-client</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.alibaba.mq-amqp</groupId>
+			<artifactId>mq-amqp-client</artifactId>
+		</dependency>
 		<!-- 客户端要使用请单独引入kafka-clients依赖 -->
 		<!-- 客户端要使用请单独引入kafka-clients依赖 -->
 		<dependency>
 		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<groupId>org.apache.kafka</groupId>

+ 1 - 0
client/src/main/java/com/alibaba/otter/canal/client/CanalConnector.java

@@ -1,5 +1,6 @@
 package com.alibaba.otter.canal.client;
 package com.alibaba.otter.canal.client;
 
 
+import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 
 
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.Message;

+ 1 - 1
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/ConsumerBatchMessage.java → client/src/main/java/com/alibaba/otter/canal/client/ConsumerBatchMessage.java

@@ -1,4 +1,4 @@
-package com.alibaba.otter.canal.client.rocketmq;
+package com.alibaba.otter.canal.client;
 
 
 import java.util.List;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CountDownLatch;

+ 54 - 0
client/src/main/java/com/alibaba/otter/canal/client/rabbitmq/AliyunCredentialsProvider.java

@@ -0,0 +1,54 @@
+/**
+ * aliyun amqp协议 账号类
+ * 暂不支持STS授权情况
+ */
+package com.alibaba.otter.canal.client.rabbitmq;
+
+import com.alibaba.mq.amqp.utils.UserUtils;
+import com.rabbitmq.client.impl.CredentialsProvider;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+
+
+public class AliyunCredentialsProvider implements CredentialsProvider {
+
+
+    /**
+     * Access Key ID
+     */
+    private final String AliyunAccessKey;
+
+    /**
+     * Access Key Secret
+     */
+    private final String AliyunAccessSecret;
+
+    /**
+     * 资源主账号ID
+     */
+    private final long resourceOwnerId;
+
+
+    public AliyunCredentialsProvider(final String accessKey, final String accessSecret, final long resourceOwnerId) {
+        this.AliyunAccessKey = accessKey;
+        this.AliyunAccessSecret = accessSecret;
+        this.resourceOwnerId = resourceOwnerId;
+    }
+
+
+    @Override
+    public String getUsername() {
+        return UserUtils.getUserName(AliyunAccessKey, resourceOwnerId);
+    }
+
+    @Override
+    public String getPassword() {
+        try {
+            return UserUtils.getPassord(AliyunAccessSecret);
+        } catch (InvalidKeyException | NoSuchAlgorithmException ignored) {
+        }
+        return null;
+    }
+
+
+}

+ 299 - 0
client/src/main/java/com/alibaba/otter/canal/client/rabbitmq/RabbitMQCanalConnector.java

@@ -0,0 +1,299 @@
+package com.alibaba.otter.canal.client.rabbitmq;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.otter.canal.client.CanalMQConnector;
+import com.alibaba.otter.canal.client.CanalMessageDeserializer;
+import com.alibaba.otter.canal.client.ConsumerBatchMessage;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.google.common.collect.Lists;
+import com.rabbitmq.client.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class RabbitMQCanalConnector implements CanalMQConnector {
+
+    private static final Logger                 logger              = LoggerFactory
+        .getLogger(RabbitMQCanalConnector.class);
+
+    // 链接地址
+    private String                              nameServer;
+
+    // 主机名
+    private String                              vhost;
+
+    private String                              queueName;
+
+    // 一些鉴权信息
+    private String                              accessKey;
+    private String                              secretKey;
+    private Long                                resourceOwnerId;
+    private String                              username;
+    private String                              password;
+
+    private boolean                             flatMessage;
+
+    private Connection                          connect;
+    private Channel                             channel;
+
+
+    private long                                batchProcessTimeout = 60 * 1000;
+    private BlockingQueue<ConsumerBatchMessage> messageBlockingQueue;
+    private volatile ConsumerBatchMessage       lastGetBatchMessage = null;
+
+    public RabbitMQCanalConnector(String nameServer, String vhost, String queueName, String accessKey, String secretKey,
+                                  String username, String password, Long resourceOwnerId, boolean flatMessage){
+        this.nameServer = nameServer;
+        this.vhost = vhost;
+        this.queueName = queueName;
+        this.accessKey = accessKey;
+        this.secretKey = secretKey;
+        this.username = username;
+        this.password = password;
+        this.resourceOwnerId = resourceOwnerId;
+        this.flatMessage = flatMessage;
+        this.messageBlockingQueue = new LinkedBlockingQueue<>(1024);
+    }
+
+    public void connect() throws CanalClientException {
+        ConnectionFactory factory = new ConnectionFactory();
+        if (accessKey.length() > 0 && secretKey.length() > 0) {
+            factory.setCredentialsProvider(new AliyunCredentialsProvider(accessKey, secretKey, resourceOwnerId));
+        } else {
+            factory.setUsername(username);
+            factory.setPassword(password);
+        }
+        factory.setHost(nameServer);
+        factory.setAutomaticRecoveryEnabled(true);
+        factory.setNetworkRecoveryInterval(5000);
+        factory.setVirtualHost(vhost);
+        try {
+            connect = factory.newConnection();
+            channel = connect.createChannel();
+        } catch (IOException | TimeoutException e) {
+            throw new CanalClientException("Start RabbitMQ producer error", e);
+        }
+    }
+
+    @Override
+    public void disconnect() throws CanalClientException {
+        if (connect != null) {
+            try {
+                connect.close();
+            } catch (IOException e) {
+                throw new CanalClientException("stop connect error", e);
+            }
+        }
+        if (channel != null) {
+            try {
+                channel.close();
+            } catch (IOException | TimeoutException e) {
+                throw new CanalClientException("stop channel error", e);
+            }
+        }
+    }
+
+    @Override
+    public boolean checkValid() throws CanalClientException {
+        return true; // 永远true
+    }
+
+    /**
+     * RabbitMQ支持拉取 不需要订阅
+     *
+     * @param filter
+     * @throws CanalClientException
+     */
+    @Override
+    public void subscribe(String filter) throws CanalClientException {
+        // 不存在连接 则重新连接
+        if (connect == null) {
+            this.connect();
+        }
+
+        Consumer consumer = new DefaultConsumer(channel) {
+
+            @Override
+            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
+                                       byte[] body) throws IOException {
+
+                if (body != null) {
+                    channel.basicAck(envelope.getDeliveryTag(), process(body));
+                }
+            }
+        };
+        try {
+            channel.basicConsume(queueName, false, consumer);
+        } catch (IOException e) {
+            throw new CanalClientException("error", e);
+        }
+    }
+
+    @Override
+    public void subscribe() throws CanalClientException {
+        this.subscribe(null);
+    }
+
+    @Override
+    public void unsubscribe() throws CanalClientException {
+        // 取消订阅 直接强行断开吧
+        this.disconnect();
+    }
+
+    @Override
+    public Message get(int batchSize) throws CanalClientException {
+        return null;
+    }
+
+    @Override
+    public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+        return null;
+    }
+
+    @Override
+    public Message getWithoutAck(int batchSize) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public void ack(long batchId) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+    }
+
+    @Override
+    public void rollback(long batchId) throws CanalClientException {
+        throw new CanalClientException("mq not support this method");
+
+    }
+
+    @Override
+    public List<Message> getList(Long timeout, TimeUnit unit) throws CanalClientException {
+        List<Message> messages = getListWithoutAck(timeout, unit);
+        if (messages != null && !messages.isEmpty()) {
+            ack();
+        }
+        return messages;
+    }
+
+    @Override
+    public List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
+        try {
+            if (this.lastGetBatchMessage != null) {
+                throw new CanalClientException("mq get/ack not support concurrent & async ack");
+            }
+
+            ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
+            if (batchMessage != null) {
+                this.lastGetBatchMessage = batchMessage;
+                return batchMessage.getData();
+            }
+        } catch (InterruptedException ex) {
+            logger.warn("Get message timeout", ex);
+            throw new CanalClientException("Failed to fetch the data after: " + timeout);
+        }
+        return Lists.newArrayList();
+    }
+
+    @Override
+    public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
+        List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);
+        if (messages != null && !messages.isEmpty()) {
+            ack();
+        }
+        return messages;
+    }
+
+    @Override
+    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
+        try {
+            if (this.lastGetBatchMessage != null) {
+                throw new CanalClientException("mq get/ack not support concurrent & async ack");
+            }
+
+            ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
+            if (batchMessage != null) {
+                this.lastGetBatchMessage = batchMessage;
+                return batchMessage.getData();
+            }
+        } catch (InterruptedException ex) {
+            logger.warn("Get message timeout", ex);
+            throw new CanalClientException("Failed to fetch the data after: " + timeout);
+        }
+        return Lists.newArrayList();
+    }
+
+    private boolean process(byte[] messageData) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Get Message: {}", new String(messageData));
+        }
+        List messageList = Lists.newArrayList();
+        if (!flatMessage) {
+            Message message = CanalMessageDeserializer.deserializer(messageData);
+            messageList.add(message);
+        } else {
+            FlatMessage flatMessage = JSON.parseObject(messageData, FlatMessage.class);
+            messageList.add(flatMessage);
+        }
+        ConsumerBatchMessage batchMessage;
+        if (!flatMessage) {
+            batchMessage = new ConsumerBatchMessage<Message>(messageList);
+        } else {
+            batchMessage = new ConsumerBatchMessage<FlatMessage>(messageList);
+        }
+        try {
+            messageBlockingQueue.put(batchMessage);
+        } catch (InterruptedException e) {
+            logger.error("Put message to queue error", e);
+            throw new RuntimeException(e);
+        }
+        boolean isCompleted;
+        try {
+            isCompleted = batchMessage.waitFinish(batchProcessTimeout);
+        } catch (InterruptedException e) {
+            logger.error("Interrupted when waiting messages to be finished.", e);
+            throw new RuntimeException(e);
+        }
+        boolean isSuccess = batchMessage.isSuccess();
+        return isCompleted && isSuccess;
+    }
+
+    @Override
+    public void ack() throws CanalClientException {
+        try {
+            if (this.lastGetBatchMessage != null) {
+                this.lastGetBatchMessage.ack();
+            }
+        } catch (Throwable e) {
+            if (this.lastGetBatchMessage != null) {
+                this.lastGetBatchMessage.fail();
+            }
+        } finally {
+            this.lastGetBatchMessage = null;
+        }
+    }
+
+    @Override
+    public void rollback() throws CanalClientException {
+        try {
+            if (this.lastGetBatchMessage != null) {
+                this.lastGetBatchMessage.fail();
+            }
+        } finally {
+            this.lastGetBatchMessage = null;
+        }
+    }
+
+}

+ 1 - 0
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

@@ -5,6 +5,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 
 
+import com.alibaba.otter.canal.client.ConsumerBatchMessage;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.acl.common.SessionCredentials;

+ 5 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -69,6 +69,11 @@ public class CanalConstants {
     public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE       = ROOT + "." + "mq.kafka.kerberos.enable";
     public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE       = ROOT + "." + "mq.kafka.kerberos.enable";
     public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH = ROOT + "." + "mq.kafka.kerberos.krb5FilePath";
     public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH = ROOT + "." + "mq.kafka.kerberos.krb5FilePath";
     public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH = ROOT + "." + "mq.kafka.kerberos.jaasFilePath";
     public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH = ROOT + "." + "mq.kafka.kerberos.jaasFilePath";
+    public static final String CANAL_MQ_USERNAME                    = ROOT + "." + "mq.username";
+    public static final String CANAL_MQ_PASSWORD                    = ROOT + "." + "mq.password";
+    public static final String CANAL_MQ_VHOST                       = ROOT + "." + "mq.vhost";
+    public static final String CANAL_MQ_ALIYUN_UID                  = ROOT + "." + "mq.aliyunuid";
+    public static final String CANAL_MQ_EXCHANGE                    = ROOT + "." + "mq.exchange";
 
 
     public static String getInstanceModeKey(String destination) {
     public static String getInstanceModeKey(String destination) {
         return MessageFormat.format(INSTANCE_MODE_TEMPLATE, destination);
         return MessageFormat.format(INSTANCE_MODE_TEMPLATE, destination);

+ 28 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStarter.java

@@ -11,6 +11,7 @@ import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.deployer.admin.CanalAdminController;
 import com.alibaba.otter.canal.deployer.admin.CanalAdminController;
 import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
 import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
 import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
 import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
+import com.alibaba.otter.canal.rabbitmq.CanalRabbitMQProducer;
 import com.alibaba.otter.canal.server.CanalMQStarter;
 import com.alibaba.otter.canal.server.CanalMQStarter;
 import com.alibaba.otter.canal.spi.CanalMQProducer;
 import com.alibaba.otter.canal.spi.CanalMQProducer;
 
 
@@ -64,6 +65,8 @@ public class CanalStarter {
             canalMQProducer = new CanalKafkaProducer();
             canalMQProducer = new CanalKafkaProducer();
         } else if (serverMode.equalsIgnoreCase("rocketmq")) {
         } else if (serverMode.equalsIgnoreCase("rocketmq")) {
             canalMQProducer = new CanalRocketMQProducer();
             canalMQProducer = new CanalRocketMQProducer();
+        } else if (serverMode.equalsIgnoreCase("rabbitmq")) {
+            canalMQProducer = new CanalRabbitMQProducer();
         }
         }
 
 
         if (canalMQProducer != null) {
         if (canalMQProducer != null) {
@@ -260,6 +263,31 @@ public class CanalStarter {
             mqProperties.setKerberosJaasFilePath(kafkaKerberosJaasFilepath);
             mqProperties.setKerberosJaasFilePath(kafkaKerberosJaasFilepath);
         }
         }
 
 
+        String vhost = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_VHOST);
+        if (!StringUtils.isEmpty(vhost)) {
+            mqProperties.setVhost(vhost);
+        }
+
+        String username = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_USERNAME);
+        if (!StringUtils.isEmpty(username)) {
+            mqProperties.setUsername(username);
+        }
+
+        String password = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_PASSWORD);
+        if (!StringUtils.isEmpty(password)) {
+            mqProperties.setPassword(password);
+        }
+
+        String aliyunUID = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_ALIYUN_UID);
+        if (!StringUtils.isEmpty(aliyunUID)) {
+            mqProperties.setAliyunUID(Long.valueOf(aliyunUID));
+        }
+
+        String exchange = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_EXCHANGE);
+        if (!StringUtils.isEmpty(exchange)) {
+            mqProperties.setExchange(exchange);
+        }
+
         for (Object key : properties.keySet()) {
         for (Object key : properties.keySet()) {
             key = StringUtils.trim(key.toString());
             key = StringUtils.trim(key.toString());
             if (((String) key).startsWith(CanalConstants.CANAL_MQ_PROPERTIES)) {
             if (((String) key).startsWith(CanalConstants.CANAL_MQ_PROPERTIES)) {

+ 5 - 0
deployer/src/main/resources/canal.properties

@@ -130,6 +130,11 @@ canal.mq.producerGroup = test
 canal.mq.accessChannel = local
 canal.mq.accessChannel = local
 # aliyun mq namespace
 # aliyun mq namespace
 #canal.mq.namespace =
 #canal.mq.namespace =
+canal.mq.vhost=
+canal.mq.exchange=
+canal.mq.username=
+canal.mq.password=
+canal.mq.aliyunuid=
 
 
 ##################################################
 ##################################################
 #########     Kafka Kerberos Info    #############
 #########     Kafka Kerberos Info    #############

+ 4 - 1
deployer/src/main/resources/logback.xml

@@ -77,7 +77,10 @@
 		<level value="INFO" />
 		<level value="INFO" />
 		<appender-ref ref="CANAL-ROOT" />
 		<appender-ref ref="CANAL-ROOT" />
 	</logger>
 	</logger>
-
+	<logger name="com.alibaba.otter.canal.rabbitmq" additivity="false">
+		<level value="INFO" />
+		<appender-ref ref="CANAL-ROOT" />
+	</logger>
 	<root level="WARN">
 	<root level="WARN">
 		<!-- <appender-ref ref="STDOUT"/>  -->
 		<!-- <appender-ref ref="STDOUT"/>  -->
 		<appender-ref ref="CANAL-ROOT" />
 		<appender-ref ref="CANAL-ROOT" />

+ 5 - 5
example/src/main/java/com/alibaba/otter/canal/example/SimpleCanalClientTest.java

@@ -15,17 +15,17 @@ import com.alibaba.otter.canal.common.utils.AddressUtils;
 public class SimpleCanalClientTest extends AbstractCanalClientTest {
 public class SimpleCanalClientTest extends AbstractCanalClientTest {
 
 
     public SimpleCanalClientTest(String destination){
     public SimpleCanalClientTest(String destination){
-        super(destination);
-    }
+                super(destination);
+            }
 
 
     public static void main(String args[]) {
     public static void main(String args[]) {
         // 根据ip,直接创建链接,无HA的功能
         // 根据ip,直接创建链接,无HA的功能
         String destination = "example";
         String destination = "example";
         String ip = AddressUtils.getHostIp();
         String ip = AddressUtils.getHostIp();
         CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 11111),
         CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 11111),
-            destination,
-            "canal",
-            "canal");
+                destination,
+                "canal",
+                "canal");
 
 
         final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination);
         final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination);
         clientTest.setConnector(connector);
         clientTest.setConnector(connector);

+ 11 - 0
pom.xml

@@ -101,6 +101,7 @@
         <javadoc_skip>true</javadoc_skip>
         <javadoc_skip>true</javadoc_skip>
         <spring_version>3.2.18.RELEASE</spring_version>
         <spring_version>3.2.18.RELEASE</spring_version>
         <rocketmq_version>4.5.2</rocketmq_version>
         <rocketmq_version>4.5.2</rocketmq_version>
+        <rabbitmq_version>5.5.0</rabbitmq_version>
         <maven-jacoco-plugin.version>0.8.3</maven-jacoco-plugin.version>
         <maven-jacoco-plugin.version>0.8.3</maven-jacoco-plugin.version>
         <maven-surefire.version>2.22.1</maven-surefire.version>
         <maven-surefire.version>2.22.1</maven-surefire.version>
         <argline>-server -Xms512m -Xmx1024m -Dfile.encoding=UTF-8
         <argline>-server -Xms512m -Xmx1024m -Dfile.encoding=UTF-8
@@ -321,6 +322,16 @@
                 <artifactId>rocketmq-acl</artifactId>
                 <artifactId>rocketmq-acl</artifactId>
                 <version>${rocketmq_version}</version>
                 <version>${rocketmq_version}</version>
             </dependency>
             </dependency>
+            <dependency>
+                <groupId>com.rabbitmq</groupId>
+                <artifactId>amqp-client</artifactId>
+                <version>${rabbitmq_version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.alibaba.mq-amqp</groupId>
+                <artifactId>mq-amqp-client</artifactId>
+                <version>1.0.3</version>
+            </dependency>
             <dependency>
             <dependency>
                 <groupId>javax.annotation</groupId>
                 <groupId>javax.annotation</groupId>
                 <artifactId>javax.annotation-api</artifactId>
                 <artifactId>javax.annotation-api</artifactId>

+ 8 - 0
server/pom.xml

@@ -45,6 +45,14 @@
 			<groupId>org.apache.rocketmq</groupId>
 			<groupId>org.apache.rocketmq</groupId>
 			<artifactId>rocketmq-acl</artifactId>
 			<artifactId>rocketmq-acl</artifactId>
 		</dependency>
 		</dependency>
+		<dependency>
+			<groupId>com.rabbitmq</groupId>
+			<artifactId>amqp-client</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.alibaba.mq-amqp</groupId>
+			<artifactId>mq-amqp-client</artifactId>
+		</dependency>
 		<!--kafka_2.11_1.1.1 exclusion掉了netty 的依赖,但CanalServerWithNetty 依赖 netty3,升级kafka至 1.1.1 需要显示加入,否则会启动失败 -->
 		<!--kafka_2.11_1.1.1 exclusion掉了netty 的依赖,但CanalServerWithNetty 依赖 netty3,升级kafka至 1.1.1 需要显示加入,否则会启动失败 -->
 		<dependency>
 		<dependency>
 			<groupId>org.jboss.netty</groupId>
 			<groupId>org.jboss.netty</groupId>

+ 45 - 0
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -33,6 +33,11 @@ public class MQProperties {
     private boolean    kerberosEnable         = false;           // kafka集群是否启动Kerberos认证
     private boolean    kerberosEnable         = false;           // kafka集群是否启动Kerberos认证
     private String     kerberosKrb5FilePath   = "";              // 启动Kerberos认证时配置为krb5.conf文件的路径
     private String     kerberosKrb5FilePath   = "";              // 启动Kerberos认证时配置为krb5.conf文件的路径
     private String     kerberosJaasFilePath   = "";              // 启动Kerberos认证时配置为jaas.conf文件的路径
     private String     kerberosJaasFilePath   = "";              // 启动Kerberos认证时配置为jaas.conf文件的路径
+    private String     username               = "";              // rabbitmq 账号
+    private String     password               = "";              // rabbitmq 密码
+    private String     vhost                  = "";              // rabbitmq 密码
+    private long       aliyunUID              = 0;               // aliyun 用户ID rabbitmq 阿里云需要使用
+    private String     exchange               = "";       // rabbitmq 交换机
 
 
     public static class CanalDestination {
     public static class CanalDestination {
 
 
@@ -276,6 +281,46 @@ public class MQProperties {
         this.kerberosJaasFilePath = kerberosJaasFilePath;
         this.kerberosJaasFilePath = kerberosJaasFilePath;
     }
     }
 
 
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setVhost(String vhost) {
+        this.vhost = vhost;
+    }
+
+    public String getVhost() {
+        return vhost;
+    }
+
+    public long getAliyunUID() {
+        return aliyunUID;
+    }
+
+    public void setAliyunUID(long aliyunUID) {
+        this.aliyunUID = aliyunUID;
+    }
+
+    public String getExchange() {
+        return exchange;
+    }
+
+    public void setExchange(String exchange) {
+        this.exchange = exchange;
+    }
+
     @Override
     @Override
     public String toString() {
     public String toString() {
         return "MQProperties{" + "servers='" + servers + '\'' + ", retries=" + retries + ", batchSize=" + batchSize
         return "MQProperties{" + "servers='" + servers + '\'' + ", retries=" + retries + ", batchSize=" + batchSize

+ 55 - 0
server/src/main/java/com/alibaba/otter/canal/rabbitmq/AliyunCredentialsProvider.java

@@ -0,0 +1,55 @@
+/**
+ * aliyun amqp协议 账号类
+ * 暂不支持STS授权情况
+ */
+package com.alibaba.otter.canal.rabbitmq;
+
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+
+import com.alibaba.mq.amqp.utils.UserUtils;
+import com.rabbitmq.client.impl.CredentialsProvider;
+
+
+public class AliyunCredentialsProvider implements CredentialsProvider {
+
+
+    /**
+     * Access Key ID
+     */
+    private final String AliyunAccessKey;
+
+    /**
+     * Access Key Secret
+     */
+    private final String AliyunAccessSecret;
+
+    /**
+     * 资源主账号ID
+     */
+    private final long resourceOwnerId;
+
+
+    public AliyunCredentialsProvider(final String accessKey, final String accessSecret, final long resourceOwnerId) {
+        this.AliyunAccessKey = accessKey;
+        this.AliyunAccessSecret = accessSecret;
+        this.resourceOwnerId = resourceOwnerId;
+    }
+
+
+    @Override
+    public String getUsername() {
+        return UserUtils.getUserName(AliyunAccessKey, resourceOwnerId);
+    }
+
+    @Override
+    public String getPassword() {
+        try {
+            return UserUtils.getPassord(AliyunAccessSecret);
+        } catch (InvalidKeyException | NoSuchAlgorithmException ignored) {
+        }
+        return null;
+    }
+
+
+}

+ 119 - 0
server/src/main/java/com/alibaba/otter/canal/rabbitmq/CanalRabbitMQProducer.java

@@ -0,0 +1,119 @@
+package com.alibaba.otter.canal.rabbitmq;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.CanalMessageSerializer;
+import com.alibaba.otter.canal.common.MQMessageUtils;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.server.exception.CanalServerException;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+public class CanalRabbitMQProducer implements CanalMQProducer {
+
+    private static final Logger logger = LoggerFactory.getLogger(CanalRabbitMQProducer.class);
+    private MQProperties        mqProperties;
+    private Connection          connect;
+    private Channel             channel;
+
+    @Override
+    public void init(MQProperties mqProperties) {
+        this.mqProperties = mqProperties;
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost(mqProperties.getServers());
+        if (mqProperties.getAliyunAccessKey().length() > 0 && mqProperties.getAliyunSecretKey().length() > 0
+            && mqProperties.getAliyunUID() > 0) {
+            factory.setCredentialsProvider(new AliyunCredentialsProvider(mqProperties.getAliyunAccessKey(),
+                mqProperties.getAliyunSecretKey(),
+                mqProperties.getAliyunUID()));
+        } else {
+            factory.setUsername(mqProperties.getUsername());
+            factory.setPassword(mqProperties.getPassword());
+        }
+        factory.setVirtualHost(mqProperties.getVhost());
+        try {
+            connect = factory.newConnection();
+            channel = connect.createChannel();
+            // channel.exchangeDeclare(mqProperties.getExchange(), "topic");
+        } catch (IOException | TimeoutException ex) {
+            throw new CanalServerException("Start RabbitMQ producer error", ex);
+        }
+    }
+
+    @Override
+    public void send(MQProperties.CanalDestination canalDestination, Message message,
+                     Callback callback) throws IOException {
+        try {
+            if (!StringUtils.isEmpty(canalDestination.getDynamicTopic())) {
+                // 动态topic
+                Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils
+                    .messageTopics(message, canalDestination.getTopic(), canalDestination.getDynamicTopic());
+
+                for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
+                    String topicName = entry.getKey().replace('.', '_');
+                    com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue();
+                    send(canalDestination, topicName, messageSub);
+                }
+            } else {
+                send(canalDestination, canalDestination.getTopic(), message);
+            }
+            callback.commit();
+        } catch (Throwable e) {
+            callback.rollback();
+        }
+    }
+
+    private void send(MQProperties.CanalDestination canalDestination, String topicName,
+                      Message messageSub) throws Exception {
+        if (!mqProperties.getFlatMessage()) {
+            byte[] message = CanalMessageSerializer.serializer(messageSub, mqProperties.isFilterTransactionEntry());
+            if (logger.isDebugEnabled()) {
+                logger.debug("send message:{} to destination:{}", message, canalDestination.getCanalDestination());
+            }
+            sendMessage(topicName, message);
+        } else {
+            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(messageSub);
+            if (flatMessages != null) {
+                for (FlatMessage flatMessage : flatMessages) {
+                    byte[] message = JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue).getBytes();
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("send message:{} to destination:{}",
+                            message,
+                            canalDestination.getCanalDestination());
+                    }
+                    sendMessage(topicName, message);
+                }
+            }
+        }
+
+    }
+
+    private void sendMessage(String queueName, byte[] message) throws Exception {
+        // tips: 目前逻辑中暂不处理对exchange处理,请在Console后台绑定 才可使用routekey
+        channel.basicPublish(mqProperties.getExchange(), queueName, null, message);
+    }
+
+    @Override
+    public void stop() {
+        logger.info("## Stop RabbitMQ producer##");
+        try {
+            this.connect.close();
+            this.channel.close();
+        } catch (IOException | TimeoutException ex) {
+            throw new CanalServerException("Stop RabbitMQ producer error", ex);
+        }
+    }
+}