Browse Source

Support seamless docking with Alibaba Cloud RocketMQ commercial service (#1849)

Heng Du 6 years ago
parent
commit
d0a83aad27

+ 40 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -33,7 +33,14 @@ public class CanalClientConfig {
     // aliyun ak/sk
     private String             accessKey;
     private String             secretKey;
-
+    // 是否启用消息轨迹
+    private boolean            enableMessageTrace;
+    // 在使用阿里云商业化mq服务时,如果想使用云上消息轨迹功能,请设置此配置为true
+    private String             accessChannel;
+    // 用于使用开源RocketMQ时,设置自定义的消息轨迹topic
+    private String             customizedTraceTopic;
+    // 开源RocketMQ命名空间
+    private String             namespace;
     // canal adapters 配置
     private List<CanalAdapter> canalAdapters;
 
@@ -133,6 +140,38 @@ public class CanalClientConfig {
         this.canalAdapters = canalAdapters;
     }
 
+    public boolean isEnableMessageTrace() {
+        return enableMessageTrace;
+    }
+
+    public void setEnableMessageTrace(boolean enableMessageTrace) {
+        this.enableMessageTrace = enableMessageTrace;
+    }
+
+    public String getAccessChannel() {
+        return accessChannel;
+    }
+
+    public void setAccessChannel(String accessChannel) {
+        this.accessChannel = accessChannel;
+    }
+
+    public String getCustomizedTraceTopic() {
+        return customizedTraceTopic;
+    }
+
+    public void setCustomizedTraceTopic(String customizedTraceTopic) {
+        this.customizedTraceTopic = customizedTraceTopic;
+    }
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
     public static class CanalAdapter {
 
         private String      instance; // 实例名

+ 0 - 1
client-adapter/launcher/pom.xml

@@ -62,7 +62,6 @@
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-client</artifactId>
-            <version>4.3.0</version>
         </dependency>
         <!-- 单独引入kafka依赖 -->
         <dependency>

+ 5 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -130,7 +130,11 @@ public class CanalAdapterLoader {
                         canalOuterAdapterGroups,
                         canalClientConfig.getAccessKey(),
                         canalClientConfig.getSecretKey(),
-                        canalClientConfig.getFlatMessage());
+                        canalClientConfig.getFlatMessage(),
+                        canalClientConfig.isEnableMessageTrace(),
+                        canalClientConfig.getCustomizedTraceTopic(),
+                        canalClientConfig.getAccessChannel(),
+                        canalClientConfig.getNamespace());
                     canalMQWorker.put(canalAdapter.getInstance() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
                     rocketMQWorker.start();
 

+ 24 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -40,6 +40,30 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
         logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
     }
 
+    public CanalAdapterRocketMQWorker(CanalClientConfig canalClientConfig, String nameServers, String topic,
+        String groupId, List<List<OuterAdapter>> canalOuterAdapters, String accessKey,
+        String secretKey, boolean flatMessage, boolean enableMessageTrace,
+        String customizedTraceTopic, String accessChannel, String namespace) {
+        super(canalOuterAdapters);
+        this.canalClientConfig = canalClientConfig;
+        this.topic = topic;
+        this.flatMessage = flatMessage;
+        super.canalDestination = topic;
+        super.groupId = groupId;
+        this.connector = new RocketMQCanalConnector(nameServers,
+            topic,
+            groupId,
+            accessKey,
+            secretKey,
+            canalClientConfig.getBatchSize(),
+            flatMessage,
+            enableMessageTrace,
+            customizedTraceTopic,
+            accessChannel,
+            namespace);
+        logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
+    }
+
     @Override
     protected void process() {
         while (!running) {

+ 6 - 1
client-adapter/launcher/src/main/resources/application.yml

@@ -10,7 +10,7 @@ canal.conf:
   mode: tcp # kafka rocketMQ
   canalServerHost: 127.0.0.1:11111
 #  zookeeperHosts: slave1:2181
-#  mqServers: 127.0.0.1:9092 #or rocketmq
+#  mqServers: 127.0.0.1:9092 #or rocketmq nameservers
 #  flatMessage: true
   batchSize: 500
   syncBatchSize: 1000
@@ -18,6 +18,11 @@ canal.conf:
   timeout:
   accessKey:
   secretKey:
+# enableMessageTrace:
+# accessChannel:
+# customizedTraceTopic:
+# namespace:
+
 #  srcDataSources:
 #    defaultDS:
 #      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true

+ 2 - 2
client/pom.xml

@@ -106,8 +106,8 @@
 			<artifactId>rocketmq-client</artifactId>
 		</dependency>
 		<dependency>
-			<groupId>com.aliyun.openservices</groupId>
-			<artifactId>aliware-apache-rocketmq-cloud</artifactId>
+			<groupId>org.apache.rocketmq</groupId>
+			<artifactId>rocketmq-acl</artifactId>
 		</dependency>
 		<!-- 客户端要使用请单独引入kafka-clients依赖 -->
 		<dependency>

+ 39 - 7
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

@@ -6,6 +6,9 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
@@ -24,8 +27,6 @@ import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
-import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
-import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
 import com.google.common.collect.Lists;
 
 /**
@@ -40,7 +41,8 @@ import com.google.common.collect.Lists;
  */
 public class RocketMQCanalConnector implements CanalMQConnector {
 
-    private static final Logger                 logger              = LoggerFactory.getLogger(RocketMQCanalConnector.class);
+    private static final Logger                 logger               = LoggerFactory.getLogger(RocketMQCanalConnector.class);
+    private static final String                 CLOUD_ACCESS_CHANNEL = "cloud";
 
     private String                              nameServer;
     private String                              topic;
@@ -54,6 +56,26 @@ public class RocketMQCanalConnector implements CanalMQConnector {
     private volatile ConsumerBatchMessage       lastGetBatchMessage = null;
     private String                              accessKey;
     private String                              secretKey;
+    private String                              customizedTraceTopic;
+    private boolean                             enableMessageTrace = false;
+    private String                              accessChannel;
+    private String                              namespace;
+
+    public RocketMQCanalConnector(String nameServer, String topic, String groupName, String accessKey,
+        String secretKey, Integer batchSize, boolean flatMessage, boolean enableMessageTrace,
+        String customizedTraceTopic, String accessChannel, String namespace) {
+        this(nameServer, topic, groupName, accessKey, secretKey, batchSize, flatMessage, enableMessageTrace, customizedTraceTopic, accessChannel);
+        this.namespace = namespace;
+    }
+
+    public RocketMQCanalConnector(String nameServer, String topic, String groupName, String accessKey,
+        String secretKey, Integer batchSize, boolean flatMessage, boolean enableMessageTrace,
+        String customizedTraceTopic, String accessChannel) {
+        this(nameServer, topic, groupName, accessKey, secretKey, batchSize, flatMessage);
+        this.enableMessageTrace = enableMessageTrace;
+        this.customizedTraceTopic = customizedTraceTopic;
+        this.accessChannel = accessChannel;
+    }
 
     public RocketMQCanalConnector(String nameServer, String topic, String groupName, Integer batchSize,
                                   boolean flatMessage){
@@ -73,16 +95,24 @@ public class RocketMQCanalConnector implements CanalMQConnector {
     }
 
     public void connect() throws CanalClientException {
-
         RPCHook rpcHook = null;
         if (null != accessKey && accessKey.length() > 0 && null != secretKey && secretKey.length() > 0) {
             SessionCredentials sessionCredentials = new SessionCredentials();
             sessionCredentials.setAccessKey(accessKey);
             sessionCredentials.setSecretKey(secretKey);
-            rpcHook = new ClientRPCHook(sessionCredentials);
+            rpcHook = new AclClientRPCHook(sessionCredentials);
         }
-        rocketMQConsumer = new DefaultMQPushConsumer(groupName, rpcHook, new AllocateMessageQueueAveragely());
+
+        rocketMQConsumer = new DefaultMQPushConsumer(groupName, rpcHook, new AllocateMessageQueueAveragely(), enableMessageTrace, customizedTraceTopic);
         rocketMQConsumer.setVipChannelEnabled(false);
+        if (CLOUD_ACCESS_CHANNEL.equals(this.accessChannel)) {
+            rocketMQConsumer.setAccessChannel(AccessChannel.CLOUD);
+        }
+
+        if (!StringUtils.isEmpty(this.namespace)) {
+            rocketMQConsumer.setNamespace(this.namespace);
+        }
+
         if (!StringUtils.isBlank(nameServer)) {
             rocketMQConsumer.setNamesrvAddr(nameServer);
         }
@@ -131,7 +161,9 @@ public class RocketMQCanalConnector implements CanalMQConnector {
     }
 
     private boolean process(List<MessageExt> messageExts) {
-        logger.info("Get Message:{}", messageExts);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Get Message: {}", messageExts);
+        }
         List messageList = Lists.newArrayList();
         for (MessageExt messageExt : messageExts) {
             byte[] data = messageExt.getBody();

+ 6 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -53,6 +53,12 @@ public class CanalConstants {
     public static final String CANAL_ALIYUN_ACCESSKEY            = ROOT + "." + "aliyun.accessKey";
     public static final String CANAL_ALIYUN_SECRETKEY            = ROOT + "." + "aliyun.secretKey";
     public static final String CANAL_MQ_PROPERTIES               = ROOT + "." + "mq.properties";
+    public static final String CANAL_MQ_ENABLE_MESSAGE_TRACE     = ROOT + "." + "mq.enableMessageTrace";
+    public static final String CANAL_MQ_ACCESS_CHANNEL           = ROOT + "." + "mq.accessChannel";
+    public static final String CANAL_MQ_CUSTOMIZED_TRACE_TOPIC   = ROOT + "." + "mq.customizedTraceTopic";
+    public static final String CANAL_MQ_NAMESPACE                = ROOT + "." + "mq.namespace";
+
+
 
     public static String getInstanceModeKey(String destination) {
         return MessageFormat.format(INSTANCE_MODE_TEMPLATE, destination);

+ 20 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java

@@ -207,6 +207,26 @@ public class CanalStater {
             mqProperties.setProducerGroup(producerGroup);
         }
 
+        String enableMessageTrace = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_ENABLE_MESSAGE_TRACE);
+        if (!StringUtils.isEmpty(enableMessageTrace)) {
+            mqProperties.setEnableMessageTrace(Boolean.valueOf(enableMessageTrace));
+        }
+
+        String accessChannel = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_ACCESS_CHANNEL);
+        if (!StringUtils.isEmpty(accessChannel)) {
+            mqProperties.setAccessChannel(accessChannel);
+        }
+
+        String customizedTraceTopic = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CUSTOMIZED_TRACE_TOPIC);
+        if (!StringUtils.isEmpty(customizedTraceTopic)) {
+            mqProperties.setCustomizedTraceTopic(customizedTraceTopic);
+        }
+
+        String namespace = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_NAMESPACE);
+        if (!StringUtils.isEmpty(namespace)) {
+            mqProperties.setNamespace(namespace);
+        }
+
         for (Object key : properties.keySet()) {
             key = StringUtils.trim(key.toString());
             if (((String) key).startsWith(CanalConstants.CANAL_MQ_PROPERTIES)) {

+ 6 - 0
deployer/src/main/resources/canal.properties

@@ -118,3 +118,9 @@ canal.mq.acks = all
 # use transaction for kafka flatMessage batch produce
 canal.mq.transaction = true
 #canal.mq.properties. =
+canal.mq.producerGroup = test
+# Set this value to "cloud", if you want open message trace feature in aliyun.
+canal.mq.accessChannel = local
+# aliyun mq namespace
+#canal.mq.namespace =
+

+ 5 - 1
example/src/main/java/com/alibaba/otter/canal/example/rocketmq/AbstractRocektMQTest.java

@@ -7,5 +7,9 @@ public abstract class AbstractRocektMQTest extends BaseCanalClientTest {
     public static String topic       = "example";
     public static String groupId     = "group";
     public static String nameServers = "localhost:9876";
-
+    public static String accessKey   = "";
+    public static String secretKey   = "";
+    public static boolean enableMessageTrace = false;
+    public static String accessChannel = "local";
+    public static String namespace = "";
 }

+ 17 - 8
example/src/main/java/com/alibaba/otter/canal/example/rocketmq/CanalRocketMQClientExample.java

@@ -1,16 +1,13 @@
 package com.alibaba.otter.canal.example.rocketmq;
 
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
+import com.alibaba.otter.canal.protocol.Message;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
-import com.alibaba.otter.canal.example.kafka.AbstractKafkaTest;
-import com.alibaba.otter.canal.protocol.Message;
-
 /**
  * Kafka client example
  *
@@ -34,15 +31,27 @@ public class CanalRocketMQClientExample extends AbstractRocektMQTest {
                                                         }
                                                     };
 
-    public CanalRocketMQClientExample(String nameServers, String topic, String groupId){
+    public CanalRocketMQClientExample(String nameServers, String topic, String groupId) {
         connector = new RocketMQCanalConnector(nameServers, topic, groupId, 500, false);
     }
 
+    public CanalRocketMQClientExample(String nameServers, String topic, String groupId, boolean enableMessageTrace,
+        String accessKey, String secretKey, String accessChannel, String namespace) {
+        connector = new RocketMQCanalConnector(nameServers, topic, groupId, accessKey,
+            secretKey, -1, false, enableMessageTrace,
+            null, accessChannel, namespace);
+    }
+
     public static void main(String[] args) {
         try {
             final CanalRocketMQClientExample rocketMQClientExample = new CanalRocketMQClientExample(nameServers,
                 topic,
-                groupId);
+                groupId,
+                enableMessageTrace,
+                accessKey,
+                secretKey,
+                accessChannel,
+                namespace);
             logger.info("## Start the rocketmq consumer: {}-{}", topic, groupId);
             rocketMQClientExample.start();
             logger.info("## The canal rocketmq consumer is running now ......");
@@ -108,7 +117,7 @@ public class CanalRocketMQClientExample extends AbstractRocektMQTest {
                 connector.connect();
                 connector.subscribe();
                 while (running) {
-                    List<Message> messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS); // 获取message
+                    List<Message> messages = connector.getListWithoutAck(1000L, TimeUnit.MILLISECONDS); // 获取message
                     for (Message message : messages) {
                         long batchId = message.getId();
                         int size = message.getEntries().size();

+ 6 - 5
pom.xml

@@ -99,6 +99,7 @@
         <java_target_version>1.8</java_target_version>
         <file_encoding>UTF-8</file_encoding>
         <spring_version>3.2.18.RELEASE</spring_version>
+        <rocketmq_version>4.5.1</rocketmq_version>
         <maven-jacoco-plugin.version>0.8.3</maven-jacoco-plugin.version>
         <maven-surefire.version>2.22.1</maven-surefire.version>
         <argline>-server -Xms512m -Xmx1024m -Dfile.encoding=UTF-8
@@ -309,14 +310,14 @@
                 <version>3.0.2</version>
             </dependency>
             <dependency>
-                <groupId>com.aliyun.openservices</groupId>
-                <artifactId>aliware-apache-rocketmq-cloud</artifactId>
-                <version>1.0</version>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>rocketmq-client</artifactId>
+                <version>${rocketmq_version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
-                <artifactId>rocketmq-client</artifactId>
-                <version>4.3.0</version>
+                <artifactId>rocketmq-acl</artifactId>
+                <version>${rocketmq_version}</version>
             </dependency>
             <dependency>
                 <groupId>javax.annotation</groupId>

+ 4 - 4
server/pom.xml

@@ -41,6 +41,10 @@
 			<groupId>org.apache.rocketmq</groupId>
 			<artifactId>rocketmq-client</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.rocketmq</groupId>
+			<artifactId>rocketmq-acl</artifactId>
+		</dependency>
 		<!--kafka_2.11_1.1.1 exclusion掉了netty 的依赖,但CanalServerWithNetty 依赖 netty3,升级kafka至 1.1.1 需要显示加入,否则会启动失败 -->
 		<dependency>
 			<groupId>org.jboss.netty</groupId>
@@ -54,9 +58,5 @@
 			<artifactId>junit</artifactId>
 			<scope>test</scope>
 		</dependency>
-		<dependency>
-			<groupId>com.aliyun.openservices</groupId>
-			<artifactId>aliware-apache-rocketmq-cloud</artifactId>
-		</dependency>
 	</dependencies>
 </project>

+ 63 - 2
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -3,7 +3,7 @@ package com.alibaba.otter.canal.common;
 import java.util.Properties;
 
 /**
- * kafka 配置项
+ * MQ 配置项
  *
  * @author machengyuan 2018-6-11 下午05:30:49
  * @version 1.0.0
@@ -27,7 +27,10 @@ public class MQProperties {
     private String     aliyunSecretKey        = "";
     private boolean    transaction            = false;           // 是否开启事务
     private Properties properties             = new Properties();
-
+    private boolean    enableMessageTrace     = false;
+    private String     accessChannel          = null;
+    private String     customizedTraceTopic   = null;
+    private String     namespace              = "";
     public static class CanalDestination {
 
         private String  canalDestination;
@@ -221,4 +224,62 @@ public class MQProperties {
     public void setProperties(Properties properties) {
         this.properties = properties;
     }
+
+    public boolean isEnableMessageTrace() {
+        return enableMessageTrace;
+    }
+
+    public void setEnableMessageTrace(boolean enableMessageTrace) {
+        this.enableMessageTrace = enableMessageTrace;
+    }
+
+    public String getAccessChannel() {
+        return accessChannel;
+    }
+
+    public void setAccessChannel(String accessChannel) {
+        this.accessChannel = accessChannel;
+    }
+
+    public String getCustomizedTraceTopic() {
+        return customizedTraceTopic;
+    }
+
+    public void setCustomizedTraceTopic(String customizedTraceTopic) {
+        this.customizedTraceTopic = customizedTraceTopic;
+    }
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+    @Override public String toString() {
+        return "MQProperties{" +
+            "servers='" + servers + '\'' +
+            ", retries=" + retries +
+            ", batchSize=" + batchSize +
+            ", lingerMs=" + lingerMs +
+            ", maxRequestSize=" + maxRequestSize +
+            ", bufferMemory=" + bufferMemory +
+            ", filterTransactionEntry=" + filterTransactionEntry +
+            ", producerGroup='" + producerGroup + '\'' +
+            ", canalBatchSize=" + canalBatchSize +
+            ", canalGetTimeout=" + canalGetTimeout +
+            ", flatMessage=" + flatMessage +
+            ", compressionType='" + compressionType + '\'' +
+            ", acks='" + acks + '\'' +
+            ", aliyunAccessKey='" + aliyunAccessKey + '\'' +
+            ", aliyunSecretKey='" + aliyunSecretKey + '\'' +
+            ", transaction=" + transaction +
+            ", properties=" + properties +
+            ", enableMessageTrace=" + enableMessageTrace +
+            ", accessChannel='" + accessChannel + '\'' +
+            ", customizedTraceTopic='" + customizedTraceTopic + '\'' +
+            ", namespace='" + namespace + '\'' +
+            '}';
+    }
 }

+ 33 - 49
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -4,17 +4,20 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.common.CanalMessageSerializer;
@@ -23,14 +26,13 @@ import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.server.exception.CanalServerException;
 import com.alibaba.otter.canal.spi.CanalMQProducer;
-import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
-import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
 
 public class CanalRocketMQProducer implements CanalMQProducer {
 
     private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);
     private DefaultMQProducer   defaultMQProducer;
     private MQProperties        mqProperties;
+    private static final String CLOUD_ACCESS_CHANNEL = "cloud";
 
     @Override
     public void init(MQProperties rocketMQProperties) {
@@ -41,10 +43,16 @@ public class CanalRocketMQProducer implements CanalMQProducer {
             SessionCredentials sessionCredentials = new SessionCredentials();
             sessionCredentials.setAccessKey(rocketMQProperties.getAliyunAccessKey());
             sessionCredentials.setSecretKey(rocketMQProperties.getAliyunSecretKey());
-            rpcHook = new ClientRPCHook(sessionCredentials);
+            rpcHook = new AclClientRPCHook(sessionCredentials);
         }
 
-        defaultMQProducer = new DefaultMQProducer(rocketMQProperties.getProducerGroup(), rpcHook);
+        defaultMQProducer = new DefaultMQProducer(rocketMQProperties.getProducerGroup(), rpcHook, mqProperties.isEnableMessageTrace(), mqProperties.getCustomizedTraceTopic());
+        if (CLOUD_ACCESS_CHANNEL.equals(rocketMQProperties.getAccessChannel())){
+            defaultMQProducer.setAccessChannel(AccessChannel.CLOUD);
+        }
+        if (!StringUtils.isEmpty(mqProperties.getNamespace())){
+            defaultMQProducer.setNamespace(mqProperties.getNamespace());
+        }
         defaultMQProducer.setNamesrvAddr(rocketMQProperties.getServers());
         defaultMQProducer.setRetryTimesWhenSendFailed(rocketMQProperties.getRetries());
         defaultMQProducer.setVipChannelEnabled(false);
@@ -80,6 +88,22 @@ public class CanalRocketMQProducer implements CanalMQProducer {
         }
     }
 
+    private void sendMessage(Message message, int partition) throws Exception{
+        SendResult sendResult = this.defaultMQProducer.send(message, new MessageQueueSelector() {
+            @Override
+            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                if (partition > mqs.size()) {
+                    return mqs.get(partition % mqs.size());
+                } else {
+                    return mqs.get(partition);
+                }
+            }
+        }, null);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Send Message Result: {}", sendResult);
+        }
+    }
+
     public void send(final MQProperties.CanalDestination destination, String topicName,
                      com.alibaba.otter.canal.protocol.Message data) throws Exception {
         if (!mqProperties.getFlatMessage()) {
@@ -102,17 +126,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                 Message message = new Message(topicName,
                                     CanalMessageSerializer.serializer(dataPartition,
                                         mqProperties.isFilterTransactionEntry()));
-                                this.defaultMQProducer.send(message, new MessageQueueSelector() {
-
-                                    @Override
-                                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                                        if (index > mqs.size()) {
-                                            return mqs.get(index % mqs.size());
-                                        } else {
-                                            return mqs.get(index);
-                                        }
-                                    }
-                                }, null);
+                                sendMessage(message, index);
                             } catch (Exception e) {
                                 logger.error("send flat message to hashed partition error", e);
                                 throw e;
@@ -129,17 +143,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                             destination.getCanalDestination(),
                             partition);
                     }
-                    this.defaultMQProducer.send(message, new MessageQueueSelector() {
-
-                        @Override
-                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                            if (partition > mqs.size()) {
-                                return mqs.get(partition % mqs.size());
-                            } else {
-                                return mqs.get(partition);
-                            }
-                        }
-                    }, null);
+                    sendMessage(message, partition);
                 }
             } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                 logger.error("Send message error!", e);
@@ -166,17 +170,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                 try {
                                     Message message = new Message(topicName, JSON.toJSONString(flatMessagePart,
                                         SerializerFeature.WriteMapNullValue).getBytes());
-                                    this.defaultMQProducer.send(message, new MessageQueueSelector() {
-
-                                        @Override
-                                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                                            if (index > mqs.size()) {
-                                                return mqs.get(index % mqs.size());
-                                            } else {
-                                                return mqs.get(index);
-                                            }
-                                        }
-                                    }, null);
+                                    sendMessage(message, index);
                                 } catch (Exception e) {
                                     logger.error("send flat message to hashed partition error", e);
                                     throw e;
@@ -194,17 +188,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                             }
                             Message message = new Message(topicName, JSON.toJSONString(flatMessage,
                                 SerializerFeature.WriteMapNullValue).getBytes());
-                            this.defaultMQProducer.send(message, new MessageQueueSelector() {
-
-                                @Override
-                                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                                    if (partition > mqs.size()) {
-                                        return mqs.get(partition % mqs.size());
-                                    } else {
-                                        return mqs.get(partition);
-                                    }
-                                }
-                            }, null);
+                            sendMessage(message, partition);
                         } catch (Exception e) {
                             logger.error("send flat message to fixed partition error", e);
                             throw e;