Pārlūkot izejas kodu

Merge pull request #1169 from ymwneu/supportRocketMQ

[ISSUE#1109]Support aliyun RocketMQ
agapple 6 gadi atpakaļ
vecāks
revīzija
c0bbe530f1

+ 4 - 3
client/pom.xml

@@ -101,12 +101,9 @@
 			<version>${spring_version}</version>
 			<scope>test</scope>
 		</dependency>
-        <!-- 客户端要使用请单独引入rocketmq-client依赖 -->
 		<dependency>
 			<groupId>org.apache.rocketmq</groupId>
 			<artifactId>rocketmq-client</artifactId>
-			<version>4.3.0</version>
-            <scope>provided</scope>
 		</dependency>
 		<!-- 客户端要使用请单独引入kafka-clients依赖 -->
 		<dependency>
@@ -122,6 +119,10 @@
 			<artifactId>junit</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>com.aliyun.openservices</groupId>
+			<artifactId>aliware-apache-rocketmq-cloud</artifactId>
+		</dependency>
 	</dependencies>
 
 	<build>

+ 25 - 1
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

@@ -1,5 +1,7 @@
 package com.alibaba.otter.canal.client.rocketmq;
 
+import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
+import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -10,8 +12,10 @@ import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +51,9 @@ public class RocketMQCanalConnector implements CanalMQConnector {
     private long                                batchProcessTimeout = 60 * 1000;
     private boolean                             flatMessage;
     private volatile ConsumerBatchMessage       lastGetBatchMessage = null;
+    private String                              accessKey;
+    private String                              secretKey;
+
 
     public RocketMQCanalConnector(String nameServer, String topic, String groupName, boolean flatMessage){
         this.nameServer = nameServer;
@@ -56,8 +63,25 @@ public class RocketMQCanalConnector implements CanalMQConnector {
         this.messageBlockingQueue = new LinkedBlockingQueue<>(1024);
     }
 
+    public RocketMQCanalConnector(String nameServer, String topic, String groupName,
+        String accessKey, String secretKey, boolean flatMessage){
+        this(nameServer, topic, groupName, flatMessage);
+        this.accessKey = accessKey;
+        this.secretKey = secretKey;
+    }
+
     public void connect() throws CanalClientException {
-        rocketMQConsumer = new DefaultMQPushConsumer(groupName);
+
+        RPCHook rpcHook = null;
+        if(null != accessKey && accessKey.length() > 0
+            && null != secretKey && secretKey.length() > 0){
+            SessionCredentials sessionCredentials = new SessionCredentials();
+            sessionCredentials.setAccessKey(accessKey);
+            sessionCredentials.setSecretKey(secretKey);
+            rpcHook = new ClientRPCHook(sessionCredentials);
+        }
+        rocketMQConsumer = new DefaultMQPushConsumer(groupName, rpcHook, new AllocateMessageQueueAveragely());
+        rocketMQConsumer.setVipChannelEnabled(false);
         if (!StringUtils.isBlank(nameServer)) {
             rocketMQConsumer.setNamesrvAddr(nameServer);
         }

+ 10 - 0
pom.xml

@@ -302,6 +302,16 @@
                 <artifactId>jsr305</artifactId>
                 <version>3.0.2</version>
             </dependency>
+            <dependency>
+                <groupId>com.aliyun.openservices</groupId>
+                <artifactId>aliware-apache-rocketmq-cloud</artifactId>
+                <version>1.0</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>rocketmq-client</artifactId>
+                <version>4.3.0</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 

+ 4 - 1
server/pom.xml

@@ -40,7 +40,6 @@
 		<dependency>
 			<groupId>org.apache.rocketmq</groupId>
 			<artifactId>rocketmq-client</artifactId>
-			<version>4.3.0</version>
 		</dependency>
 		<!--kafka_2.11_1.1.1 exclusion掉了netty 的依赖,但CanalServerWithNetty 依赖 netty3,升级kafka至 1.1.1 需要显示加入,否则会启动失败 -->
 		<dependency>
@@ -55,5 +54,9 @@
 			<artifactId>junit</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>com.aliyun.openservices</groupId>
+			<artifactId>aliware-apache-rocketmq-cloud</artifactId>
+		</dependency>
 	</dependencies>
 </project>

+ 17 - 0
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -22,6 +22,8 @@ public class MQProperties {
     private boolean flatMessage            = true;
     private String compressionType         = "none";
     private String acks                    = "all";
+    private String aliyunAccessKey         = "";
+    private String aliyunSecretKey         = "";
 
     public static class CanalDestination {
 
@@ -168,4 +170,19 @@ public class MQProperties {
         this.acks = acks;
     }
 
+    public String getAliyunAccessKey() {
+        return aliyunAccessKey;
+    }
+
+    public void setAliyunAccessKey(String aliyunAccessKey) {
+        this.aliyunAccessKey = aliyunAccessKey;
+    }
+
+    public String getAliyunSecretKey() {
+        return aliyunSecretKey;
+    }
+
+    public void setAliyunSecretKey(String aliyunSecretKey) {
+        this.aliyunSecretKey = aliyunSecretKey;
+    }
 }

+ 14 - 2
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -7,12 +7,15 @@ import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.server.exception.CanalServerException;
 import com.alibaba.otter.canal.spi.CanalMQProducer;
+import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
+import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.MessageQueueSelector;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,10 +33,19 @@ public class CanalRocketMQProducer implements CanalMQProducer {
     @Override
     public void init(MQProperties rocketMQProperties) {
         this.mqProperties = rocketMQProperties;
-        defaultMQProducer = new DefaultMQProducer();
+        RPCHook rpcHook = null;
+        if(rocketMQProperties.getAliyunAccessKey().length() > 0
+            && rocketMQProperties.getAliyunSecretKey().length() > 0){
+            SessionCredentials sessionCredentials = new SessionCredentials();
+            sessionCredentials.setAccessKey(rocketMQProperties.getAliyunAccessKey());
+            sessionCredentials.setSecretKey(rocketMQProperties.getAliyunSecretKey());
+            rpcHook = new ClientRPCHook(sessionCredentials);
+        }
+
+        defaultMQProducer = new DefaultMQProducer(rocketMQProperties.getProducerGroup(), rpcHook);
         defaultMQProducer.setNamesrvAddr(rocketMQProperties.getServers());
-        defaultMQProducer.setProducerGroup(rocketMQProperties.getProducerGroup());
         defaultMQProducer.setRetryTimesWhenSendFailed(rocketMQProperties.getRetries());
+        defaultMQProducer.setVipChannelEnabled(false);
         logger.info("##Start RocketMQ producer##");
         try {
             defaultMQProducer.start();