Kaynağa Gözat

Merge remote-tracking branch 'origin/master'

mcy 5 yıl önce
ebeveyn
işleme
b43043f4e4
29 değiştirilmiş dosya ile 479 ekleme ve 180 silme
  1. 40 1
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  2. 1 1
      client-adapter/launcher/pom.xml
  3. 5 1
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java
  4. 24 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java
  5. 6 1
      client-adapter/launcher/src/main/resources/application.yml
  6. 1 1
      client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java
  7. 2 2
      client/pom.xml
  8. 39 7
      client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java
  9. 2 4
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/QueryLogEvent.java
  10. 44 37
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java
  11. 6 6
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java
  12. 44 10
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java
  13. 12 0
      deployer/src/main/resources/canal.properties
  14. 1 0
      deployer/src/main/resources/spring/tsdb/h2-tsdb.xml
  15. 5 1
      example/src/main/java/com/alibaba/otter/canal/example/rocketmq/AbstractRocektMQTest.java
  16. 17 8
      example/src/main/java/com/alibaba/otter/canal/example/rocketmq/CanalRocketMQClientExample.java
  17. 6 2
      instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.java
  18. 14 5
      instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java
  19. BIN
      logo.png
  20. 2 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java
  21. 8 5
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  22. 2 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java
  23. 2 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java
  24. 7 6
      pom.xml
  25. 4 4
      server/pom.xml
  26. 18 7
      server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java
  27. 93 1
      server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java
  28. 41 17
      server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java
  29. 33 49
      server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

+ 40 - 1
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -33,7 +33,14 @@ public class CanalClientConfig {
     // aliyun ak/sk
     private String             accessKey;
     private String             secretKey;
-
+    // 是否启用消息轨迹
+    private boolean            enableMessageTrace;
+    // 在使用阿里云商业化mq服务时,如果想使用云上消息轨迹功能,请设置此配置为true
+    private String             accessChannel;
+    // 用于使用开源RocketMQ时,设置自定义的消息轨迹topic
+    private String             customizedTraceTopic;
+    // 开源RocketMQ命名空间
+    private String             namespace;
     // canal adapters 配置
     private List<CanalAdapter> canalAdapters;
 
@@ -133,6 +140,38 @@ public class CanalClientConfig {
         this.canalAdapters = canalAdapters;
     }
 
+    public boolean isEnableMessageTrace() {
+        return enableMessageTrace;
+    }
+
+    public void setEnableMessageTrace(boolean enableMessageTrace) {
+        this.enableMessageTrace = enableMessageTrace;
+    }
+
+    public String getAccessChannel() {
+        return accessChannel;
+    }
+
+    public void setAccessChannel(String accessChannel) {
+        this.accessChannel = accessChannel;
+    }
+
+    public String getCustomizedTraceTopic() {
+        return customizedTraceTopic;
+    }
+
+    public void setCustomizedTraceTopic(String customizedTraceTopic) {
+        this.customizedTraceTopic = customizedTraceTopic;
+    }
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
     public static class CanalAdapter {
 
         private String      instance; // 实例名

+ 1 - 1
client-adapter/launcher/pom.xml

@@ -62,7 +62,7 @@
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-client</artifactId>
-            <version>4.3.0</version>
+	    <version>4.5.1</version>
         </dependency>
         <!-- 单独引入kafka依赖 -->
         <dependency>

+ 5 - 1
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -130,7 +130,11 @@ public class CanalAdapterLoader {
                         canalOuterAdapterGroups,
                         canalClientConfig.getAccessKey(),
                         canalClientConfig.getSecretKey(),
-                        canalClientConfig.getFlatMessage());
+                        canalClientConfig.getFlatMessage(),
+                        canalClientConfig.isEnableMessageTrace(),
+                        canalClientConfig.getCustomizedTraceTopic(),
+                        canalClientConfig.getAccessChannel(),
+                        canalClientConfig.getNamespace());
                     canalMQWorker.put(canalAdapter.getInstance() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
                     rocketMQWorker.start();
 

+ 24 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -40,6 +40,30 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
         logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
     }
 
+    public CanalAdapterRocketMQWorker(CanalClientConfig canalClientConfig, String nameServers, String topic,
+        String groupId, List<List<OuterAdapter>> canalOuterAdapters, String accessKey,
+        String secretKey, boolean flatMessage, boolean enableMessageTrace,
+        String customizedTraceTopic, String accessChannel, String namespace) {
+        super(canalOuterAdapters);
+        this.canalClientConfig = canalClientConfig;
+        this.topic = topic;
+        this.flatMessage = flatMessage;
+        super.canalDestination = topic;
+        super.groupId = groupId;
+        this.connector = new RocketMQCanalConnector(nameServers,
+            topic,
+            groupId,
+            accessKey,
+            secretKey,
+            canalClientConfig.getBatchSize(),
+            flatMessage,
+            enableMessageTrace,
+            customizedTraceTopic,
+            accessChannel,
+            namespace);
+        logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
+    }
+
     @Override
     protected void process() {
         while (!running) {

+ 6 - 1
client-adapter/launcher/src/main/resources/application.yml

@@ -10,7 +10,7 @@ canal.conf:
   mode: tcp # kafka rocketMQ
   canalServerHost: 127.0.0.1:11111
 #  zookeeperHosts: slave1:2181
-#  mqServers: 127.0.0.1:9092 #or rocketmq
+#  mqServers: 127.0.0.1:9092 #or rocketmq nameservers
 #  flatMessage: true
   batchSize: 500
   syncBatchSize: 1000
@@ -18,6 +18,11 @@ canal.conf:
   timeout:
   accessKey:
   secretKey:
+# enableMessageTrace:
+# accessChannel:
+# customizedTraceTopic:
+# namespace:
+
 #  srcDataSources:
 #    defaultDS:
 #      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true

+ 1 - 1
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -272,7 +272,7 @@ public class RdbSyncService {
             batchExecutor.execute(insertSql.toString(), values);
         } catch (SQLException e) {
             if (skipDupException
-                && (e.getMessage().contains("Duplicate entry") || e.getMessage().startsWith("ORA-00001: 违反唯一约束条件"))) {
+                && (e.getMessage().contains("Duplicate entry") || e.getMessage().startsWith("ORA-00001:"))) {
                 // ignore
                 // TODO 增加更多关系数据库的主键冲突的错误码
             } else {

+ 2 - 2
client/pom.xml

@@ -106,8 +106,8 @@
 			<artifactId>rocketmq-client</artifactId>
 		</dependency>
 		<dependency>
-			<groupId>com.aliyun.openservices</groupId>
-			<artifactId>aliware-apache-rocketmq-cloud</artifactId>
+			<groupId>org.apache.rocketmq</groupId>
+			<artifactId>rocketmq-acl</artifactId>
 		</dependency>
 		<!-- 客户端要使用请单独引入kafka-clients依赖 -->
 		<dependency>

+ 39 - 7
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

@@ -6,6 +6,9 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
@@ -24,8 +27,6 @@ import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
-import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
-import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
 import com.google.common.collect.Lists;
 
 /**
@@ -40,7 +41,8 @@ import com.google.common.collect.Lists;
  */
 public class RocketMQCanalConnector implements CanalMQConnector {
 
-    private static final Logger                 logger              = LoggerFactory.getLogger(RocketMQCanalConnector.class);
+    private static final Logger                 logger               = LoggerFactory.getLogger(RocketMQCanalConnector.class);
+    private static final String                 CLOUD_ACCESS_CHANNEL = "cloud";
 
     private String                              nameServer;
     private String                              topic;
@@ -54,6 +56,26 @@ public class RocketMQCanalConnector implements CanalMQConnector {
     private volatile ConsumerBatchMessage       lastGetBatchMessage = null;
     private String                              accessKey;
     private String                              secretKey;
+    private String                              customizedTraceTopic;
+    private boolean                             enableMessageTrace = false;
+    private String                              accessChannel;
+    private String                              namespace;
+
+    public RocketMQCanalConnector(String nameServer, String topic, String groupName, String accessKey,
+        String secretKey, Integer batchSize, boolean flatMessage, boolean enableMessageTrace,
+        String customizedTraceTopic, String accessChannel, String namespace) {
+        this(nameServer, topic, groupName, accessKey, secretKey, batchSize, flatMessage, enableMessageTrace, customizedTraceTopic, accessChannel);
+        this.namespace = namespace;
+    }
+
+    public RocketMQCanalConnector(String nameServer, String topic, String groupName, String accessKey,
+        String secretKey, Integer batchSize, boolean flatMessage, boolean enableMessageTrace,
+        String customizedTraceTopic, String accessChannel) {
+        this(nameServer, topic, groupName, accessKey, secretKey, batchSize, flatMessage);
+        this.enableMessageTrace = enableMessageTrace;
+        this.customizedTraceTopic = customizedTraceTopic;
+        this.accessChannel = accessChannel;
+    }
 
     public RocketMQCanalConnector(String nameServer, String topic, String groupName, Integer batchSize,
                                   boolean flatMessage){
@@ -73,16 +95,24 @@ public class RocketMQCanalConnector implements CanalMQConnector {
     }
 
     public void connect() throws CanalClientException {
-
         RPCHook rpcHook = null;
         if (null != accessKey && accessKey.length() > 0 && null != secretKey && secretKey.length() > 0) {
             SessionCredentials sessionCredentials = new SessionCredentials();
             sessionCredentials.setAccessKey(accessKey);
             sessionCredentials.setSecretKey(secretKey);
-            rpcHook = new ClientRPCHook(sessionCredentials);
+            rpcHook = new AclClientRPCHook(sessionCredentials);
         }
-        rocketMQConsumer = new DefaultMQPushConsumer(groupName, rpcHook, new AllocateMessageQueueAveragely());
+
+        rocketMQConsumer = new DefaultMQPushConsumer(groupName, rpcHook, new AllocateMessageQueueAveragely(), enableMessageTrace, customizedTraceTopic);
         rocketMQConsumer.setVipChannelEnabled(false);
+        if (CLOUD_ACCESS_CHANNEL.equals(this.accessChannel)) {
+            rocketMQConsumer.setAccessChannel(AccessChannel.CLOUD);
+        }
+
+        if (!StringUtils.isEmpty(this.namespace)) {
+            rocketMQConsumer.setNamespace(this.namespace);
+        }
+
         if (!StringUtils.isBlank(nameServer)) {
             rocketMQConsumer.setNamesrvAddr(nameServer);
         }
@@ -131,7 +161,9 @@ public class RocketMQCanalConnector implements CanalMQConnector {
     }
 
     private boolean process(List<MessageExt> messageExts) {
-        logger.info("Get Message:{}", messageExts);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Get Message: {}", messageExts);
+        }
         List messageList = Lists.newArrayList();
         for (MessageExt messageExt : messageExts) {
             byte[] data = messageExt.getBody();

+ 2 - 4
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/QueryLogEvent.java

@@ -706,10 +706,8 @@ public class QueryLogEvent extends LogEvent {
                          * That's why you must write status vars in growing
                          * order of code
                          */
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Query_log_event has unknown status vars (first has code: " + code
-                                         + "), skipping the rest of them");
-                        }
+                        logger.error("Query_log_event has unknown status vars (first has code: " + code
+                                     + "), skipping the rest of them");
                         break; // Break loop
                 }
             }

+ 44 - 37
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -10,49 +10,56 @@ import java.text.MessageFormat;
  */
 public class CanalConstants {
 
-    public static final String MDC_DESTINATION                   = "destination";
-    public static final String ROOT                              = "canal";
-    public static final String CANAL_ID                          = ROOT + "." + "id";
-    public static final String CANAL_IP                          = ROOT + "." + "ip";
-    public static final String CANAL_PORT                        = ROOT + "." + "port";
-    public static final String CANAL_METRICS_PULL_PORT           = ROOT + "." + "metrics.pull.port";
-    public static final String CANAL_ZKSERVERS                   = ROOT + "." + "zkServers";
-    public static final String CANAL_WITHOUT_NETTY               = ROOT + "." + "withoutNetty";
+    public static final String MDC_DESTINATION                       = "destination";
+    public static final String ROOT                                  = "canal";
+    public static final String CANAL_ID                              = ROOT + "." + "id";
+    public static final String CANAL_IP                              = ROOT + "." + "ip";
+    public static final String CANAL_PORT                            = ROOT + "." + "port";
+    public static final String CANAL_METRICS_PULL_PORT               = ROOT + "." + "metrics.pull.port";
+    public static final String CANAL_ZKSERVERS                       = ROOT + "." + "zkServers";
+    public static final String CANAL_WITHOUT_NETTY                   = ROOT + "." + "withoutNetty";
 
-    public static final String CANAL_DESTINATIONS                = ROOT + "." + "destinations";
-    public static final String CANAL_AUTO_SCAN                   = ROOT + "." + "auto.scan";
-    public static final String CANAL_AUTO_SCAN_INTERVAL          = ROOT + "." + "auto.scan.interval";
-    public static final String CANAL_CONF_DIR                    = ROOT + "." + "conf.dir";
-    public static final String CANAL_SERVER_MODE                 = ROOT + "." + "serverMode";
+    public static final String CANAL_DESTINATIONS                    = ROOT + "." + "destinations";
+    public static final String CANAL_AUTO_SCAN                       = ROOT + "." + "auto.scan";
+    public static final String CANAL_AUTO_SCAN_INTERVAL              = ROOT + "." + "auto.scan.interval";
+    public static final String CANAL_CONF_DIR                        = ROOT + "." + "conf.dir";
+    public static final String CANAL_SERVER_MODE                     = ROOT + "." + "serverMode";
 
-    public static final String CANAL_DESTINATION_SPLIT           = ",";
-    public static final String GLOBAL_NAME                       = "global";
+    public static final String CANAL_DESTINATION_SPLIT               = ",";
+    public static final String GLOBAL_NAME                           = "global";
 
-    public static final String INSTANCE_MODE_TEMPLATE            = ROOT + "." + "instance.{0}.mode";
-    public static final String INSTANCE_LAZY_TEMPLATE            = ROOT + "." + "instance.{0}.lazy";
-    public static final String INSTANCE_MANAGER_ADDRESS_TEMPLATE = ROOT + "." + "instance.{0}.manager.address";
-    public static final String INSTANCE_SPRING_XML_TEMPLATE      = ROOT + "." + "instance.{0}.spring.xml";
+    public static final String INSTANCE_MODE_TEMPLATE                = ROOT + "." + "instance.{0}.mode";
+    public static final String INSTANCE_LAZY_TEMPLATE                = ROOT + "." + "instance.{0}.lazy";
+    public static final String INSTANCE_MANAGER_ADDRESS_TEMPLATE     = ROOT + "." + "instance.{0}.manager.address";
+    public static final String INSTANCE_SPRING_XML_TEMPLATE          = ROOT + "." + "instance.{0}.spring.xml";
 
-    public static final String CANAL_DESTINATION_PROPERTY        = ROOT + ".instance.destination";
+    public static final String CANAL_DESTINATION_PROPERTY            = ROOT + ".instance.destination";
 
-    public static final String CANAL_SOCKETCHANNEL               = ROOT + "." + "socketChannel";
+    public static final String CANAL_SOCKETCHANNEL                   = ROOT + "." + "socketChannel";
 
-    public static final String CANAL_MQ_SERVERS                  = ROOT + "." + "mq.servers";
-    public static final String CANAL_MQ_RETRIES                  = ROOT + "." + "mq.retries";
-    public static final String CANAL_MQ_BATCHSIZE                = ROOT + "." + "mq.batchSize";
-    public static final String CANAL_MQ_LINGERMS                 = ROOT + "." + "mq.lingerMs";
-    public static final String CANAL_MQ_MAXREQUESTSIZE           = ROOT + "." + "mq.maxRequestSize";
-    public static final String CANAL_MQ_BUFFERMEMORY             = ROOT + "." + "mq.bufferMemory";
-    public static final String CANAL_MQ_CANALBATCHSIZE           = ROOT + "." + "mq.canalBatchSize";
-    public static final String CANAL_MQ_CANALGETTIMEOUT          = ROOT + "." + "mq.canalGetTimeout";
-    public static final String CANAL_MQ_FLATMESSAGE              = ROOT + "." + "mq.flatMessage";
-    public static final String CANAL_MQ_COMPRESSION_TYPE         = ROOT + "." + "mq.compressionType";
-    public static final String CANAL_MQ_ACKS                     = ROOT + "." + "mq.acks";
-    public static final String CANAL_MQ_TRANSACTION              = ROOT + "." + "mq.transaction";
-    public static final String CANAL_MQ_PRODUCERGROUP            = ROOT + "." + "mq.producerGroup";
-    public static final String CANAL_ALIYUN_ACCESSKEY            = ROOT + "." + "aliyun.accessKey";
-    public static final String CANAL_ALIYUN_SECRETKEY            = ROOT + "." + "aliyun.secretKey";
-    public static final String CANAL_MQ_PROPERTIES               = ROOT + "." + "mq.properties";
+    public static final String CANAL_MQ_SERVERS                      = ROOT + "." + "mq.servers";
+    public static final String CANAL_MQ_RETRIES                      = ROOT + "." + "mq.retries";
+    public static final String CANAL_MQ_BATCHSIZE                    = ROOT + "." + "mq.batchSize";
+    public static final String CANAL_MQ_LINGERMS                     = ROOT + "." + "mq.lingerMs";
+    public static final String CANAL_MQ_MAXREQUESTSIZE               = ROOT + "." + "mq.maxRequestSize";
+    public static final String CANAL_MQ_BUFFERMEMORY                 = ROOT + "." + "mq.bufferMemory";
+    public static final String CANAL_MQ_CANALBATCHSIZE               = ROOT + "." + "mq.canalBatchSize";
+    public static final String CANAL_MQ_CANALGETTIMEOUT              = ROOT + "." + "mq.canalGetTimeout";
+    public static final String CANAL_MQ_FLATMESSAGE                  = ROOT + "." + "mq.flatMessage";
+    public static final String CANAL_MQ_COMPRESSION_TYPE             = ROOT + "." + "mq.compressionType";
+    public static final String CANAL_MQ_ACKS                         = ROOT + "." + "mq.acks";
+    public static final String CANAL_MQ_TRANSACTION                  = ROOT + "." + "mq.transaction";
+    public static final String CANAL_MQ_PRODUCERGROUP                = ROOT + "." + "mq.producerGroup";
+    public static final String CANAL_ALIYUN_ACCESSKEY                = ROOT + "." + "aliyun.accessKey";
+    public static final String CANAL_ALIYUN_SECRETKEY                = ROOT + "." + "aliyun.secretKey";
+    public static final String CANAL_MQ_PROPERTIES                   = ROOT + "." + "mq.properties";
+    public static final String CANAL_MQ_ENABLE_MESSAGE_TRACE         = ROOT + "." + "mq.enableMessageTrace";
+    public static final String CANAL_MQ_ACCESS_CHANNEL               = ROOT + "." + "mq.accessChannel";
+    public static final String CANAL_MQ_CUSTOMIZED_TRACE_TOPIC       = ROOT + "." + "mq.customizedTraceTopic";
+    public static final String CANAL_MQ_NAMESPACE                    = ROOT + "." + "mq.namespace";
+    public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE        = ROOT + "." + "mq.kafka.kerberos.enable";
+    public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH  = ROOT + "." + "mq.kafka.kerberos.krb5FilePath";
+    public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH  = ROOT + "." + "mq.kafka.kerberos.jaasFilePath";
 
     public static String getInstanceModeKey(String destination) {
         return MessageFormat.format(INSTANCE_MODE_TEMPLATE, destination);

+ 6 - 6
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -152,6 +152,9 @@ public class CanalController {
                             try {
                                 MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
                                 embededCanalServer.start(destination);
+                                if (canalMQStarter != null) {
+                                    canalMQStarter.startDestination(destination);
+                                }
                             } finally {
                                 MDC.remove(CanalConstants.MDC_DESTINATION);
                             }
@@ -160,6 +163,9 @@ public class CanalController {
                         public void processActiveExit() {
                             try {
                                 MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                                if (canalMQStarter != null) {
+                                    canalMQStarter.stopDestination(destination);
+                                }
                                 embededCanalServer.stop(destination);
                             } finally {
                                 MDC.remove(CanalConstants.MDC_DESTINATION);
@@ -234,9 +240,6 @@ public class CanalController {
                         ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                         if (!config.getLazy() && !runningMonitor.isStart()) {
                             runningMonitor.start();
-                            if (canalMQStarter != null) {
-                                canalMQStarter.startDestination(destination);
-                            }
                         }
                     }
                 }
@@ -245,9 +248,6 @@ public class CanalController {
                     // 此处的stop,代表强制退出,非HA机制,所以需要退出HA的monitor和配置信息
                     InstanceConfig config = instanceConfigs.remove(destination);
                     if (config != null) {
-                        if (canalMQStarter != null) {
-                            canalMQStarter.stopDestination(destination);
-                        }
                         embededCanalServer.stop(destination);
                         ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                         if (runningMonitor.isStart()) {

+ 44 - 10
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java

@@ -1,15 +1,5 @@
 package com.alibaba.otter.canal.deployer;
 
-import java.io.File;
-import java.io.FileFilter;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
 import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
@@ -18,6 +8,15 @@ import com.alibaba.otter.canal.spi.CanalMQProducer;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
 
 /**
  * Canal server 启动类
@@ -207,6 +206,41 @@ public class CanalStater {
             mqProperties.setProducerGroup(producerGroup);
         }
 
+        String enableMessageTrace = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_ENABLE_MESSAGE_TRACE);
+        if (!StringUtils.isEmpty(enableMessageTrace)) {
+            mqProperties.setEnableMessageTrace(Boolean.valueOf(enableMessageTrace));
+        }
+
+        String accessChannel = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_ACCESS_CHANNEL);
+        if (!StringUtils.isEmpty(accessChannel)) {
+            mqProperties.setAccessChannel(accessChannel);
+        }
+
+        String customizedTraceTopic = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CUSTOMIZED_TRACE_TOPIC);
+        if (!StringUtils.isEmpty(customizedTraceTopic)) {
+            mqProperties.setCustomizedTraceTopic(customizedTraceTopic);
+        }
+
+        String namespace = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_NAMESPACE);
+        if (!StringUtils.isEmpty(namespace)) {
+            mqProperties.setNamespace(namespace);
+        }
+
+        String kafkaKerberosEnable = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);
+        if (!StringUtils.isEmpty(kafkaKerberosEnable)) {
+            mqProperties.setKerberosEnable(Boolean.valueOf(kafkaKerberosEnable));
+        }
+
+        String kafkaKerberosKrb5Filepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH);
+        if (!StringUtils.isEmpty(kafkaKerberosKrb5Filepath)) {
+            mqProperties.setKerberosKrb5FilePath(kafkaKerberosKrb5Filepath);
+        }
+
+        String kafkaKerberosJaasFilepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH);
+        if (!StringUtils.isEmpty(kafkaKerberosJaasFilepath)) {
+            mqProperties.setKerberosJaasFilePath(kafkaKerberosJaasFilepath);
+        }
+
         for (Object key : properties.keySet()) {
             key = StringUtils.trim(key.toString());
             if (((String) key).startsWith(CanalConstants.CANAL_MQ_PROPERTIES)) {

+ 12 - 0
deployer/src/main/resources/canal.properties

@@ -118,3 +118,15 @@ canal.mq.acks = all
 # use transaction for kafka flatMessage batch produce
 canal.mq.transaction = true
 #canal.mq.properties. =
+canal.mq.producerGroup = test
+# Set this value to "cloud", if you want open message trace feature in aliyun.
+canal.mq.accessChannel = local
+# aliyun mq namespace
+#canal.mq.namespace =
+
+##################################################
+#########     Kafka Kerberos Info    #############
+##################################################
+canal.mq.kafka.kerberos.enable = false
+canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
+canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

+ 1 - 0
deployer/src/main/resources/spring/tsdb/h2-tsdb.xml

@@ -43,6 +43,7 @@
         <property name="testOnBorrow" value="false" />
         <property name="testOnReturn" value="false" />
         <property name="useUnfairLock" value="true" />
+        <property name="validationQuery" value="SELECT 1" />
 	</bean>
 
     <bean id="sqlMapClient" class="org.springframework.orm.ibatis.SqlMapClientFactoryBean">

+ 5 - 1
example/src/main/java/com/alibaba/otter/canal/example/rocketmq/AbstractRocektMQTest.java

@@ -7,5 +7,9 @@ public abstract class AbstractRocektMQTest extends BaseCanalClientTest {
     public static String topic       = "example";
     public static String groupId     = "group";
     public static String nameServers = "localhost:9876";
-
+    public static String accessKey   = "";
+    public static String secretKey   = "";
+    public static boolean enableMessageTrace = false;
+    public static String accessChannel = "local";
+    public static String namespace = "";
 }

+ 17 - 8
example/src/main/java/com/alibaba/otter/canal/example/rocketmq/CanalRocketMQClientExample.java

@@ -1,16 +1,13 @@
 package com.alibaba.otter.canal.example.rocketmq;
 
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
+import com.alibaba.otter.canal.protocol.Message;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
-import com.alibaba.otter.canal.example.kafka.AbstractKafkaTest;
-import com.alibaba.otter.canal.protocol.Message;
-
 /**
  * Kafka client example
  *
@@ -34,15 +31,27 @@ public class CanalRocketMQClientExample extends AbstractRocektMQTest {
                                                         }
                                                     };
 
-    public CanalRocketMQClientExample(String nameServers, String topic, String groupId){
+    public CanalRocketMQClientExample(String nameServers, String topic, String groupId) {
         connector = new RocketMQCanalConnector(nameServers, topic, groupId, 500, false);
     }
 
+    public CanalRocketMQClientExample(String nameServers, String topic, String groupId, boolean enableMessageTrace,
+        String accessKey, String secretKey, String accessChannel, String namespace) {
+        connector = new RocketMQCanalConnector(nameServers, topic, groupId, accessKey,
+            secretKey, -1, false, enableMessageTrace,
+            null, accessChannel, namespace);
+    }
+
     public static void main(String[] args) {
         try {
             final CanalRocketMQClientExample rocketMQClientExample = new CanalRocketMQClientExample(nameServers,
                 topic,
-                groupId);
+                groupId,
+                enableMessageTrace,
+                accessKey,
+                secretKey,
+                accessChannel,
+                namespace);
             logger.info("## Start the rocketmq consumer: {}-{}", topic, groupId);
             rocketMQClientExample.start();
             logger.info("## The canal rocketmq consumer is running now ......");
@@ -108,7 +117,7 @@ public class CanalRocketMQClientExample extends AbstractRocektMQTest {
                 connector.connect();
                 connector.subscribe();
                 while (running) {
-                    List<Message> messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS); // 获取message
+                    List<Message> messages = connector.getListWithoutAck(1000L, TimeUnit.MILLISECONDS); // 获取message
                     for (Message message : messages) {
                         long batchId = message.getId();
                         int size = message.getEntries().size();

+ 6 - 2
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.java

@@ -53,10 +53,14 @@ public class AbstractCanalInstance extends AbstractCanalLifeCycle implements Can
                 // 处理group的模式
                 List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
                 for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动
-                    ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
+                    if(singleEventParser instanceof AbstractEventParser) {
+                        ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
+                    }
                 }
             } else {
-                ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
+                if(eventParser instanceof AbstractEventParser) {
+                    ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
+                }
             }
 
         }

+ 14 - 5
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -1,6 +1,7 @@
 package com.alibaba.otter.canal.instance.manager;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -117,7 +118,13 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
         } else {
             try {
                 File externalLibDir = new File(alarmHandlerPluginDir);
-                File[] jarFiles = externalLibDir.listFiles((dir1, name) -> name.endsWith(".jar"));
+                File[] jarFiles = externalLibDir.listFiles(new FilenameFilter() {
+
+                    @Override
+                    public boolean accept(File dir, String name) {
+                        return name.endsWith(".jar");
+                    }
+                });
                 if (jarFiles == null || jarFiles.length == 0) {
                     throw new IllegalStateException(String.format("alarmHandlerPluginDir [%s] can't find any name endswith \".jar\" file.",
                         alarmHandlerPluginDir));
@@ -126,14 +133,16 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
                 for (int i = 0; i < jarFiles.length; i++) {
                     urls[i] = jarFiles[i].toURI().toURL();
                 }
-                ClassLoader currentClassLoader = new URLClassLoader(urls, CanalInstanceWithManager.class.getClassLoader());
-                Class<CanalAlarmHandler> _alarmClass =
-                    (Class<CanalAlarmHandler>)currentClassLoader.loadClass(alarmHandlerClass);
+                ClassLoader currentClassLoader = new URLClassLoader(urls,
+                    CanalInstanceWithManager.class.getClassLoader());
+                Class<CanalAlarmHandler> _alarmClass = (Class<CanalAlarmHandler>) currentClassLoader.loadClass(alarmHandlerClass);
                 alarmHandler = _alarmClass.newInstance();
                 logger.info("init [{}] alarm handler success.", alarmHandlerClass);
             } catch (Throwable e) {
                 String errorMsg = String.format("init alarmHandlerPluginDir [%s] alarm handler [%s] error: %s",
-                    alarmHandlerPluginDir, alarmHandlerClass, ExceptionUtils.getFullStackTrace(e));
+                    alarmHandlerPluginDir,
+                    alarmHandlerClass,
+                    ExceptionUtils.getFullStackTrace(e));
                 logger.error(errorMsg);
                 throw new CanalException(errorMsg, e);
             }

BIN
logo.png


+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -242,7 +242,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                         if (parallel) {
                             // build stage processor
                             multiStageCoprocessor = buildMultiStageCoprocessor();
-                            if (isGTIDMode()) {
+                            if (isGTIDMode() && StringUtils.isNotEmpty(startPosition.getGtid())) {
                                 // 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
                                 GTIDSet gtidSet = MysqlGTIDSet.parse(startPosition.getGtid());
                                 ((MysqlMultiStageCoprocessor) multiStageCoprocessor).setGtidSet(gtidSet);
@@ -260,7 +260,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                                 }
                             }
                         } else {
-                            if (isGTIDMode()) {
+                            if (isGTIDMode() && StringUtils.isNotEmpty(startPosition.getGtid())) {
                                 // 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
                                 erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), sinkHandler);
                             } else {

+ 8 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -355,11 +355,14 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
             // GTID模式下,CanalLogPositionManager里取最后的gtid,没有则取instanc配置中的
             LogPosition logPosition = getLogPositionManager().getLatestIndexBy(destination);
             if (logPosition != null) {
-                return logPosition.getPostion();
-            }
-
-            if (masterPosition != null && StringUtils.isNotEmpty(masterPosition.getGtid())) {
-                return masterPosition;
+                // 如果以前是非GTID模式,后来调整为了GTID模式,那么为了保持兼容,需要判断gtid是否为空
+                if (StringUtils.isNotEmpty(logPosition.getPostion().getGtid())) {
+                    return logPosition.getPostion();
+                }
+            }else {
+                if (masterPosition != null && StringUtils.isNotEmpty(masterPosition.getGtid())) {
+                    return masterPosition;
+                }
             }
         }
 

+ 2 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -441,6 +441,8 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
         @Override
         public void handleEventException(final Throwable ex, final long sequence, final Object event) {
+            //异常上抛,否则processEvents的逻辑会默认会mark为成功执行,有丢数据风险
+            throw new CanalParseException(ex);
         }
 
         @Override

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java

@@ -81,7 +81,7 @@ public class TableMetaCache {
         }
     }
 
-    private TableMeta getTableMetaByDB(String fullname) throws IOException {
+    private synchronized TableMeta getTableMetaByDB(String fullname) throws IOException {
         try {
             ResultSetPacket packet = connection.query("show create table " + fullname);
             String[] names = StringUtils.split(fullname, "`.`");
@@ -159,7 +159,7 @@ public class TableMetaCache {
         return getTableMeta(schema, table, true, position);
     }
 
-    public TableMeta getTableMeta(String schema, String table, boolean useCache, EntryPosition position) {
+    public synchronized TableMeta getTableMeta(String schema, String table, boolean useCache, EntryPosition position) {
         TableMeta tableMeta = null;
         if (tableMetaTSDB != null) {
             tableMeta = tableMetaTSDB.find(schema, table);

+ 7 - 6
pom.xml

@@ -99,6 +99,7 @@
         <java_target_version>1.8</java_target_version>
         <file_encoding>UTF-8</file_encoding>
         <spring_version>3.2.18.RELEASE</spring_version>
+        <rocketmq_version>4.5.1</rocketmq_version>
         <maven-jacoco-plugin.version>0.8.3</maven-jacoco-plugin.version>
         <maven-surefire.version>2.22.1</maven-surefire.version>
         <argline>-server -Xms512m -Xmx1024m -Dfile.encoding=UTF-8
@@ -252,7 +253,7 @@
             <dependency>
                 <groupId>com.alibaba.fastsql</groupId>
                 <artifactId>fastsql</artifactId>
-                <version>2.0.0_preview_855</version>
+                <version>2.0.0_preview_914</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>
@@ -309,14 +310,14 @@
                 <version>3.0.2</version>
             </dependency>
             <dependency>
-                <groupId>com.aliyun.openservices</groupId>
-                <artifactId>aliware-apache-rocketmq-cloud</artifactId>
-                <version>1.0</version>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>rocketmq-client</artifactId>
+                <version>${rocketmq_version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
-                <artifactId>rocketmq-client</artifactId>
-                <version>4.3.0</version>
+                <artifactId>rocketmq-acl</artifactId>
+                <version>${rocketmq_version}</version>
             </dependency>
             <dependency>
                 <groupId>javax.annotation</groupId>

+ 4 - 4
server/pom.xml

@@ -41,6 +41,10 @@
 			<groupId>org.apache.rocketmq</groupId>
 			<artifactId>rocketmq-client</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.rocketmq</groupId>
+			<artifactId>rocketmq-acl</artifactId>
+		</dependency>
 		<!--kafka_2.11_1.1.1 exclusion掉了netty 的依赖,但CanalServerWithNetty 依赖 netty3,升级kafka至 1.1.1 需要显示加入,否则会启动失败 -->
 		<dependency>
 			<groupId>org.jboss.netty</groupId>
@@ -54,9 +58,5 @@
 			<artifactId>junit</artifactId>
 			<scope>test</scope>
 		</dependency>
-		<dependency>
-			<groupId>com.aliyun.openservices</groupId>
-			<artifactId>aliware-apache-rocketmq-cloud</artifactId>
-		</dependency>
 	</dependencies>
 </project>

+ 18 - 7
server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java

@@ -222,15 +222,23 @@ public class MQMessageUtils {
                     } else {
                         for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                             int hashCode = database.hashCode();
+                            CanalEntry.EventType eventType = rowChange.getEventType();
+                            List<CanalEntry.Column> columns = null;
+                            if (eventType == CanalEntry.EventType.DELETE) {
+                                columns = rowData.getBeforeColumnsList();
+                            } else {
+                                columns = rowData.getAfterColumnsList();
+                            }
+
                             if (hashMode.autoPkHash) {
                                 // isEmpty use default pkNames
-                                for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
+                                for (CanalEntry.Column column : columns) {
                                     if (column.getIsKey()) {
                                         hashCode = hashCode ^ column.getValue().hashCode();
                                     }
                                 }
                             } else {
-                                for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
+                                for (CanalEntry.Column column : columns) {
                                     if (checkPkNamesHasContain(hashMode.pkNames, column.getName())) {
                                         hashCode = hashCode ^ column.getValue().hashCode();
                                     }
@@ -438,12 +446,14 @@ public class MQMessageUtils {
                     int idx = 0;
                     for (Map<String, String> row : flatMessage.getData()) {
                         int hashCode = database.hashCode();
-                        for (String pkName : pkNames) {
-                            String value = row.get(pkName);
-                            if (value == null) {
-                                value = "";
+                        if (pkNames != null) {
+                            for (String pkName : pkNames) {
+                                String value = row.get(pkName);
+                                if (value == null) {
+                                    value = "";
+                                }
+                                hashCode = hashCode ^ value.hashCode();
                             }
-                            hashCode = hashCode ^ value.hashCode();
                         }
 
                         int pkHash = Math.abs(hashCode) % partitionsNum;
@@ -463,6 +473,7 @@ public class MQMessageUtils {
                             flatMessageTmp.setMysqlType(flatMessage.getMysqlType());
                             flatMessageTmp.setEs(flatMessage.getEs());
                             flatMessageTmp.setTs(flatMessage.getTs());
+                            flatMessageTmp.setPkNames(flatMessage.getPkNames());
                         }
                         List<Map<String, String>> data = flatMessageTmp.getData();
                         if (data == null) {

+ 93 - 1
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -3,7 +3,7 @@ package com.alibaba.otter.canal.common;
 import java.util.Properties;
 
 /**
- * kafka 配置项
+ * MQ 配置项
  *
  * @author machengyuan 2018-6-11 下午05:30:49
  * @version 1.0.0
@@ -27,6 +27,13 @@ public class MQProperties {
     private String     aliyunSecretKey        = "";
     private boolean    transaction            = false;           // 是否开启事务
     private Properties properties             = new Properties();
+    private boolean    enableMessageTrace     = false;
+    private String     accessChannel          = null;
+    private String     customizedTraceTopic   = null;
+    private String     namespace              = "";
+    private boolean    kerberosEnable         = false;           //kafka集群是否启动Kerberos认证
+    private String     kerberosKrb5FilePath   = "";              //启动Kerberos认证时配置为krb5.conf文件的路径
+    private String     kerberosJaasFilePath   = "";              //启动Kerberos认证时配置为jaas.conf文件的路径
 
     public static class CanalDestination {
 
@@ -221,4 +228,89 @@ public class MQProperties {
     public void setProperties(Properties properties) {
         this.properties = properties;
     }
+
+    public boolean isEnableMessageTrace() {
+        return enableMessageTrace;
+    }
+
+    public void setEnableMessageTrace(boolean enableMessageTrace) {
+        this.enableMessageTrace = enableMessageTrace;
+    }
+
+    public String getAccessChannel() {
+        return accessChannel;
+    }
+
+    public void setAccessChannel(String accessChannel) {
+        this.accessChannel = accessChannel;
+    }
+
+    public String getCustomizedTraceTopic() {
+        return customizedTraceTopic;
+    }
+
+    public void setCustomizedTraceTopic(String customizedTraceTopic) {
+        this.customizedTraceTopic = customizedTraceTopic;
+    }
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+    public boolean isKerberosEnable() {
+        return kerberosEnable;
+    }
+
+    public void setKerberosEnable(boolean kerberosEnable) {
+        this.kerberosEnable = kerberosEnable;
+    }
+
+    public String getKerberosKrb5FilePath() {
+        return kerberosKrb5FilePath;
+    }
+
+    public void setKerberosKrb5FilePath(String kerberosKrb5FilePath) {
+        this.kerberosKrb5FilePath = kerberosKrb5FilePath;
+    }
+
+    public String getKerberosJaasFilePath() {
+        return kerberosJaasFilePath;
+    }
+
+    public void setKerberosJaasFilePath(String kerberosJaasFilePath) {
+        this.kerberosJaasFilePath = kerberosJaasFilePath;
+    }
+
+    @Override public String toString() {
+        return "MQProperties{" +
+            "servers='" + servers + '\'' +
+            ", retries=" + retries +
+            ", batchSize=" + batchSize +
+            ", lingerMs=" + lingerMs +
+            ", maxRequestSize=" + maxRequestSize +
+            ", bufferMemory=" + bufferMemory +
+            ", filterTransactionEntry=" + filterTransactionEntry +
+            ", producerGroup='" + producerGroup + '\'' +
+            ", canalBatchSize=" + canalBatchSize +
+            ", canalGetTimeout=" + canalGetTimeout +
+            ", flatMessage=" + flatMessage +
+            ", compressionType='" + compressionType + '\'' +
+            ", acks='" + acks + '\'' +
+            ", aliyunAccessKey='" + aliyunAccessKey + '\'' +
+            ", aliyunSecretKey='" + aliyunSecretKey + '\'' +
+            ", transaction=" + transaction +
+            ", properties=" + properties +
+            ", enableMessageTrace=" + enableMessageTrace +
+            ", accessChannel='" + accessChannel + '\'' +
+            ", customizedTraceTopic='" + customizedTraceTopic + '\'' +
+            ", namespace='" + namespace + '\'' +
+            ", kerberosEnable='" + kerberosEnable + '\'' +
+            ", kerberosKrb5FilePath='" + kerberosKrb5FilePath + '\'' +
+            ", kerberosJaasFilePath='" + kerberosJaasFilePath + '\'' +
+            '}';
+    }
 }

+ 41 - 17
server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java

@@ -1,11 +1,12 @@
 package com.alibaba.otter.canal.kafka;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.common.MQMessageUtils;
+import com.alibaba.otter.canal.common.MQProperties;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.spi.CanalMQProducer;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -14,13 +15,12 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-import com.alibaba.otter.canal.common.MQMessageUtils;
-import com.alibaba.otter.canal.common.MQProperties;
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
-import com.alibaba.otter.canal.spi.CanalMQProducer;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 
 /**
  * kafka producer 主操作类
@@ -59,6 +59,26 @@ public class CanalKafkaProducer implements CanalMQProducer {
         } else {
             properties.put("retries", kafkaProperties.getRetries());
         }
+
+        if (kafkaProperties.isKerberosEnable()){
+            File krb5File = new File(kafkaProperties.getKerberosKrb5FilePath());
+            File jaasFile = new File(kafkaProperties.getKerberosJaasFilePath());
+            if(krb5File.exists() && jaasFile.exists()){
+                //配置kerberos认证,需要使用绝对路径
+                System.setProperty("java.security.krb5.conf",
+                        krb5File.getAbsolutePath());
+                System.setProperty("java.security.auth.login.config",
+                        jaasFile.getAbsolutePath());
+                System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
+                properties.put("security.protocol", "SASL_PLAINTEXT");
+                properties.put("sasl.kerberos.service.name", "kafka");
+            }else{
+                String errorMsg = "ERROR # The kafka kerberos configuration file does not exist! please check it";
+                logger.error(errorMsg);
+                throw new RuntimeException(errorMsg);
+            }
+        }
+
         if (!kafkaProperties.getFlatMessage()) {
             properties.put("value.serializer", MessageSerializer.class.getName());
             producer = new KafkaProducer<String, Message>(properties);
@@ -102,10 +122,10 @@ public class CanalKafkaProducer implements CanalMQProducer {
             producerTmp = producer2;
         }
 
-        if (kafkaProperties.getTransaction()) {
-            producerTmp.beginTransaction();
-        }
         try {
+            if (kafkaProperties.getTransaction()) {
+                producerTmp.beginTransaction();
+            }
             if (!StringUtils.isEmpty(canalDestination.getDynamicTopic())) {
                 // 动态topic
                 Map<String, Message> messageMap = MQMessageUtils.messageTopics(message,
@@ -130,7 +150,11 @@ public class CanalKafkaProducer implements CanalMQProducer {
         } catch (Throwable e) {
             logger.error(e.getMessage(), e);
             if (kafkaProperties.getTransaction()) {
-                producerTmp.abortTransaction();
+                try {
+                    producerTmp.abortTransaction();
+                } catch (Exception e1) {
+                    logger.error(e1.getMessage(), e1);
+                }
             }
             callback.rollback();
         }

+ 33 - 49
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -4,17 +4,20 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.common.CanalMessageSerializer;
@@ -23,14 +26,13 @@ import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.server.exception.CanalServerException;
 import com.alibaba.otter.canal.spi.CanalMQProducer;
-import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
-import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
 
 public class CanalRocketMQProducer implements CanalMQProducer {
 
     private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);
     private DefaultMQProducer   defaultMQProducer;
     private MQProperties        mqProperties;
+    private static final String CLOUD_ACCESS_CHANNEL = "cloud";
 
     @Override
     public void init(MQProperties rocketMQProperties) {
@@ -41,10 +43,16 @@ public class CanalRocketMQProducer implements CanalMQProducer {
             SessionCredentials sessionCredentials = new SessionCredentials();
             sessionCredentials.setAccessKey(rocketMQProperties.getAliyunAccessKey());
             sessionCredentials.setSecretKey(rocketMQProperties.getAliyunSecretKey());
-            rpcHook = new ClientRPCHook(sessionCredentials);
+            rpcHook = new AclClientRPCHook(sessionCredentials);
         }
 
-        defaultMQProducer = new DefaultMQProducer(rocketMQProperties.getProducerGroup(), rpcHook);
+        defaultMQProducer = new DefaultMQProducer(rocketMQProperties.getProducerGroup(), rpcHook, mqProperties.isEnableMessageTrace(), mqProperties.getCustomizedTraceTopic());
+        if (CLOUD_ACCESS_CHANNEL.equals(rocketMQProperties.getAccessChannel())){
+            defaultMQProducer.setAccessChannel(AccessChannel.CLOUD);
+        }
+        if (!StringUtils.isEmpty(mqProperties.getNamespace())){
+            defaultMQProducer.setNamespace(mqProperties.getNamespace());
+        }
         defaultMQProducer.setNamesrvAddr(rocketMQProperties.getServers());
         defaultMQProducer.setRetryTimesWhenSendFailed(rocketMQProperties.getRetries());
         defaultMQProducer.setVipChannelEnabled(false);
@@ -80,6 +88,22 @@ public class CanalRocketMQProducer implements CanalMQProducer {
         }
     }
 
+    private void sendMessage(Message message, int partition) throws Exception{
+        SendResult sendResult = this.defaultMQProducer.send(message, new MessageQueueSelector() {
+            @Override
+            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                if (partition > mqs.size()) {
+                    return mqs.get(partition % mqs.size());
+                } else {
+                    return mqs.get(partition);
+                }
+            }
+        }, null);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Send Message Result: {}", sendResult);
+        }
+    }
+
     public void send(final MQProperties.CanalDestination destination, String topicName,
                      com.alibaba.otter.canal.protocol.Message data) throws Exception {
         if (!mqProperties.getFlatMessage()) {
@@ -102,17 +126,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                 Message message = new Message(topicName,
                                     CanalMessageSerializer.serializer(dataPartition,
                                         mqProperties.isFilterTransactionEntry()));
-                                this.defaultMQProducer.send(message, new MessageQueueSelector() {
-
-                                    @Override
-                                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                                        if (index > mqs.size()) {
-                                            return mqs.get(index % mqs.size());
-                                        } else {
-                                            return mqs.get(index);
-                                        }
-                                    }
-                                }, null);
+                                sendMessage(message, index);
                             } catch (Exception e) {
                                 logger.error("send flat message to hashed partition error", e);
                                 throw e;
@@ -129,17 +143,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                             destination.getCanalDestination(),
                             partition);
                     }
-                    this.defaultMQProducer.send(message, new MessageQueueSelector() {
-
-                        @Override
-                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                            if (partition > mqs.size()) {
-                                return mqs.get(partition % mqs.size());
-                            } else {
-                                return mqs.get(partition);
-                            }
-                        }
-                    }, null);
+                    sendMessage(message, partition);
                 }
             } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                 logger.error("Send message error!", e);
@@ -166,17 +170,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                                 try {
                                     Message message = new Message(topicName, JSON.toJSONString(flatMessagePart,
                                         SerializerFeature.WriteMapNullValue).getBytes());
-                                    this.defaultMQProducer.send(message, new MessageQueueSelector() {
-
-                                        @Override
-                                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                                            if (index > mqs.size()) {
-                                                return mqs.get(index % mqs.size());
-                                            } else {
-                                                return mqs.get(index);
-                                            }
-                                        }
-                                    }, null);
+                                    sendMessage(message, index);
                                 } catch (Exception e) {
                                     logger.error("send flat message to hashed partition error", e);
                                     throw e;
@@ -194,17 +188,7 @@ public class CanalRocketMQProducer implements CanalMQProducer {
                             }
                             Message message = new Message(topicName, JSON.toJSONString(flatMessage,
                                 SerializerFeature.WriteMapNullValue).getBytes());
-                            this.defaultMQProducer.send(message, new MessageQueueSelector() {
-
-                                @Override
-                                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-                                    if (partition > mqs.size()) {
-                                        return mqs.get(partition % mqs.size());
-                                    } else {
-                                        return mqs.get(partition);
-                                    }
-                                }
-                            }, null);
+                            sendMessage(message, partition);
                         } catch (Exception e) {
                             logger.error("send flat message to fixed partition error", e);
                             throw e;