Browse Source

Merge pull request #23 from alibaba/master

merge
rewerma 6 years ago
parent
commit
3b25b3b342
19 changed files with 253 additions and 173 deletions
  1. 23 33
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  2. 0 2
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterCanalConfig.java
  3. 20 12
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java
  4. 8 8
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java
  5. 2 3
      client-adapter/launcher/src/main/resources/application.yml
  6. 4 3
      client/pom.xml
  7. 25 1
      client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java
  8. 6 5
      dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java
  9. 2 0
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java
  10. 75 69
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java
  11. 9 0
      deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java
  12. 28 28
      deployer/src/main/resources/canal.properties
  13. 2 2
      deployer/src/main/resources/spring/base-instance.xml
  14. 1 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  15. 1 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  16. 10 0
      pom.xml
  17. 4 1
      server/pom.xml
  18. 19 2
      server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java
  19. 14 2
      server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

+ 23 - 33
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -12,25 +12,26 @@ import java.util.Map;
  */
 public class CanalClientConfig {
 
-    private String             canalServerHost;       // 单机模式下canal server的 ip:port
-
-    private String             zookeeperHosts;        // 集群模式下的zk地址, 如果配置了单机地址则以单机为准!!
-
-    private String             mqServers;             // kafka or rocket mq 地址
-
-    private Boolean            flatMessage   = true;  // 是否已flatMessage模式传输, 只适用于mq模式
-
-    private Integer            batchSize;             // 批大小
-
-    private Integer            syncBatchSize = 1000;  // 同步分批提交大小
-
-    private Integer            retries;               // 重试次数
-
-    private Long               timeout;               // 消费超时时间
-
-    private String             mode          = "tcp"; // 模式 tcp kafka rocketMQ
-
-    private List<CanalAdapter> canalAdapters;         // canal adapters 配置
+    // 单机模式下canal server的ip:port
+    private String             canalServerHost;
+    // 集群模式下的zk地址,如果配置了单机地址则以单机为准!!
+    private String             zookeeperHosts;
+    // kafka or rocket mq 地址
+    private String             mqServers;
+    // 是否已flatMessage模式传输,只适用于mq模式
+    private Boolean            flatMessage   = true;
+    // 批大小
+    private Integer            batchSize;
+    // 同步分批提交大小
+    private Integer            syncBatchSize = 1000;
+    // 重试次数
+    private Integer            retries;
+    // 消费超时时间
+    private Long               timeout;
+    // 模式 tcp kafka rocketMQ
+    private String             mode          = "tcp";
+    // canal adapters 配置
+    private List<CanalAdapter> canalAdapters;
 
     public String getCanalServerHost() {
         return canalServerHost;
@@ -116,9 +117,7 @@ public class CanalClientConfig {
 
         private String      instance; // 实例名
 
-        private String      topic;    // mq topic
-
-        private List<Group> groups;   // 适配器分组列表
+        private List<Group> groups;  // 适配器分组列表
 
         public String getInstance() {
             return instance;
@@ -137,22 +136,13 @@ public class CanalClientConfig {
         public void setGroups(List<Group> groups) {
             this.groups = groups;
         }
-
-        public String getTopic() {
-            return topic;
-        }
-
-        public void setTopic(String topic) {
-            this.topic = topic;
-        }
     }
 
     public static class Group {
 
-        private String                          groupId;                               // group id
-
+        // group id
+        private String                          groupId          = "default";
         private List<OuterAdapterConfig>        outerAdapters;                           // 适配器列表
-
         private Map<String, OuterAdapterConfig> outerAdaptersMap = new LinkedHashMap<>();
 
         public String getGroupId() {

+ 0 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/AdapterCanalConfig.java

@@ -37,8 +37,6 @@ public class AdapterCanalConfig extends CanalClientConfig {
                 for (CanalAdapter canalAdapter : canalAdapters) {
                     if (canalAdapter.getInstance() != null) {
                         DESTINATIONS.add(canalAdapter.getInstance());
-                    } else if (canalAdapter.getTopic() != null) {
-                        DESTINATIONS.add(canalAdapter.getInstance());
                     }
                 }
             }

+ 20 - 12
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -2,9 +2,12 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
-import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -12,6 +15,7 @@ import com.alibaba.otter.canal.adapter.launcher.common.SyncSwitch;
 import com.alibaba.otter.canal.adapter.launcher.config.SpringContext;
 import com.alibaba.otter.canal.client.CanalMQConnector;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.MessageUtil;
 import com.alibaba.otter.canal.protocol.FlatMessage;
@@ -180,18 +184,22 @@ public abstract class AbstractCanalAdapterWorker {
      */
     private void batchSync(List<Dml> dmls, OuterAdapter adapter) {
         // 分批同步
-        int len = 0;
-        List<Dml> dmlsBatch = new ArrayList<>();
-        for (Dml dml : dmls) {
-            dmlsBatch.add(dml);
-            len += dml.getData().size();
-            if (len >= canalClientConfig.getSyncBatchSize()) {
-                adapter.sync(dmlsBatch);
-                dmlsBatch.clear();
-                len = 0;
+        if (dmls.size() <= canalClientConfig.getSyncBatchSize()) {
+            adapter.sync(dmls);
+        } else {
+            int len = 0;
+            List<Dml> dmlsBatch = new ArrayList<>();
+            for (Dml dml : dmls) {
+                dmlsBatch.add(dml);
+                len += dml.getData().size();
+                if (len >= canalClientConfig.getSyncBatchSize()) {
+                    adapter.sync(dmlsBatch);
+                    dmlsBatch.clear();
+                    len = 0;
+                }
             }
+            adapter.sync(dmlsBatch);
         }
-        adapter.sync(dmlsBatch);
     }
 
     public void start() {

+ 8 - 8
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -97,14 +97,14 @@ public class CanalAdapterLoader {
 
                     CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(canalClientConfig,
                         canalClientConfig.getMqServers(),
-                        canalAdapter.getTopic(),
+                        canalAdapter.getInstance(),
                         group.getGroupId(),
                         canalOuterAdapterGroups,
                         canalClientConfig.getFlatMessage());
-                    canalMQWorker.put(canalAdapter.getTopic() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
+                    canalMQWorker.put(canalAdapter.getInstance() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
                     canalKafkaWorker.start();
-                    logger.info("Start adapter for canal-client mq topic: {} succeed",
-                        canalAdapter.getTopic() + "-" + group.getGroupId());
+                    logger.info("Start adapter for canal-client mq topic: {} succeed", canalAdapter.getInstance() + "-"
+                                                                                       + group.getGroupId());
                 }
             }
         } else if ("rocketMQ".equalsIgnoreCase(canalClientConfig.getMode())) {
@@ -119,15 +119,15 @@ public class CanalAdapterLoader {
                     canalOuterAdapterGroups.add(canalOuterAdapters);
                     CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig,
                         canalClientConfig.getMqServers(),
-                        canalAdapter.getTopic(),
+                        canalAdapter.getInstance(),
                         group.getGroupId(),
                         canalOuterAdapterGroups,
                         canalClientConfig.getFlatMessage());
-                    canalMQWorker.put(canalAdapter.getTopic() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
+                    canalMQWorker.put(canalAdapter.getInstance() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
                     rocketMQWorker.start();
 
-                    logger.info("Start adapter for canal-client mq topic: {} succeed",
-                        canalAdapter.getTopic() + "-" + group.getGroupId());
+                    logger.info("Start adapter for canal-client mq topic: {} succeed", canalAdapter.getInstance() + "-"
+                                                                                       + group.getGroupId());
                 }
             }
         }

+ 2 - 3
client-adapter/launcher/src/main/resources/application.yml

@@ -28,10 +28,9 @@ canal.conf:
 #      username: root
 #      password: 121212
   canalAdapters:
-  - instance: example
-#    topic: example
+  - instance: example # canal instance Name or mq topic name
     groups:
-    - #groupId: g1
+    - groupId: g1
       outerAdapters:
       - name: logger
 #      - name: rdb

+ 4 - 3
client/pom.xml

@@ -101,12 +101,9 @@
 			<version>${spring_version}</version>
 			<scope>test</scope>
 		</dependency>
-        <!-- 客户端要使用请单独引入rocketmq-client依赖 -->
 		<dependency>
 			<groupId>org.apache.rocketmq</groupId>
 			<artifactId>rocketmq-client</artifactId>
-			<version>4.3.0</version>
-            <scope>provided</scope>
 		</dependency>
 		<!-- 客户端要使用请单独引入kafka-clients依赖 -->
 		<dependency>
@@ -122,6 +119,10 @@
 			<artifactId>junit</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>com.aliyun.openservices</groupId>
+			<artifactId>aliware-apache-rocketmq-cloud</artifactId>
+		</dependency>
 	</dependencies>
 
 	<build>

+ 25 - 1
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

@@ -1,5 +1,7 @@
 package com.alibaba.otter.canal.client.rocketmq;
 
+import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
+import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -10,8 +12,10 @@ import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +51,9 @@ public class RocketMQCanalConnector implements CanalMQConnector {
     private long                                batchProcessTimeout = 60 * 1000;
     private boolean                             flatMessage;
     private volatile ConsumerBatchMessage       lastGetBatchMessage = null;
+    private String                              accessKey;
+    private String                              secretKey;
+
 
     public RocketMQCanalConnector(String nameServer, String topic, String groupName, boolean flatMessage){
         this.nameServer = nameServer;
@@ -56,8 +63,25 @@ public class RocketMQCanalConnector implements CanalMQConnector {
         this.messageBlockingQueue = new LinkedBlockingQueue<>(1024);
     }
 
+    public RocketMQCanalConnector(String nameServer, String topic, String groupName,
+        String accessKey, String secretKey, boolean flatMessage){
+        this(nameServer, topic, groupName, flatMessage);
+        this.accessKey = accessKey;
+        this.secretKey = secretKey;
+    }
+
     public void connect() throws CanalClientException {
-        rocketMQConsumer = new DefaultMQPushConsumer(groupName);
+
+        RPCHook rpcHook = null;
+        if(null != accessKey && accessKey.length() > 0
+            && null != secretKey && secretKey.length() > 0){
+            SessionCredentials sessionCredentials = new SessionCredentials();
+            sessionCredentials.setAccessKey(accessKey);
+            sessionCredentials.setSecretKey(secretKey);
+            rpcHook = new ClientRPCHook(sessionCredentials);
+        }
+        rocketMQConsumer = new DefaultMQPushConsumer(groupName, rpcHook, new AllocateMessageQueueAveragely());
+        rocketMQConsumer.setVipChannelEnabled(false);
         if (!StringUtils.isBlank(nameServer)) {
             rocketMQConsumer.setNamesrvAddr(nameServer);
         }

+ 6 - 5
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -109,11 +109,6 @@ public final class LogDecoder {
                     try {
                         /* Decoding binary-log to event */
                         event = decode(buffer, header, context);
-                        if (event != null) {
-                            // set logFileName
-                            event.getHeader().setLogFileName(context.getLogPosition().getFileName());
-                            event.setSemival(buffer.semival);
-                        }
                     } catch (IOException e) {
                         if (logger.isWarnEnabled()) {
                             logger.warn("Decoding " + LogEvent.getTypeName(header.getType()) + " failed from: "
@@ -128,6 +123,12 @@ public final class LogDecoder {
                     event = new UnknownLogEvent(header);
                 }
 
+                if (event != null) {
+                    // set logFileName
+                    event.getHeader().setLogFileName(context.getLogPosition().getFileName());
+                    event.setSemival(buffer.semival);
+                }
+
                 /* consume this binary-log. */
                 buffer.consume(len);
                 return event;

+ 2 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -47,6 +47,8 @@ public class CanalConstants {
     public static final String CANAL_MQ_FLATMESSAGE              = ROOT + "." + "mq.flatMessage";
     public static final String CANAL_MQ_COMPRESSION_TYPE         = ROOT + "." + "mq.compressionType";
     public static final String CANAL_MQ_ACKS                     = ROOT + "." + "mq.acks";
+    public static final String CANAL_ALIYUN_ACCESSKEY            = ROOT + "." + "aliyun.accessKey";
+    public static final String CANAL_ALIYUN_SECRETKEY            = ROOT + "." + "aliyun.secretKey";
 
     public static String getInstanceModeKey(String destination) {
         return MessageFormat.format(INSTANCE_MODE_TEMPLATE, destination);

+ 75 - 69
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -17,7 +17,6 @@ import org.springframework.context.ApplicationContext;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
 
 import com.alibaba.otter.canal.common.CanalException;
-import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.common.utils.AddressUtils;
 import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
 import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
@@ -96,6 +95,16 @@ public class CanalController {
             System.setProperty(CanalConstants.CANAL_SOCKETCHANNEL, socketChannel);
         }
 
+        // 兼容1.1.0版本的ak/sk参数名
+        String accesskey = getProperty(properties, "canal.instance.rds.accesskey");
+        String secretkey = getProperty(properties, "canal.instance.rds.secretkey");
+        if (StringUtils.isNotEmpty(accesskey)) {
+            System.setProperty(CanalConstants.CANAL_ALIYUN_ACCESSKEY, accesskey);
+        }
+        if (StringUtils.isNotEmpty(secretkey)) {
+            System.setProperty(CanalConstants.CANAL_ALIYUN_SECRETKEY, secretkey);
+        }
+
         // 准备canal server
         cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
         ip = getProperty(properties, CanalConstants.CANAL_IP);
@@ -131,81 +140,80 @@ public class CanalController {
 
         final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
         ServerRunningMonitors.setServerData(serverData);
-        ServerRunningMonitors
-            .setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
-
-                public ServerRunningMonitor apply(final String destination) {
-                    ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
-                    runningMonitor.setDestination(destination);
-                    runningMonitor.setListener(new ServerRunningListener() {
-
-                        public void processActiveEnter() {
-                            try {
-                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                                embededCanalServer.start(destination);
-                            } finally {
-                                MDC.remove(CanalConstants.MDC_DESTINATION);
-                            }
+        ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
+
+            public ServerRunningMonitor apply(final String destination) {
+                ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
+                runningMonitor.setDestination(destination);
+                runningMonitor.setListener(new ServerRunningListener() {
+
+                    public void processActiveEnter() {
+                        try {
+                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                            embededCanalServer.start(destination);
+                        } finally {
+                            MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
+                    }
 
-                        public void processActiveExit() {
-                            try {
-                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                                embededCanalServer.stop(destination);
-                            } finally {
-                                MDC.remove(CanalConstants.MDC_DESTINATION);
-                            }
+                    public void processActiveExit() {
+                        try {
+                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                            embededCanalServer.stop(destination);
+                        } finally {
+                            MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
+                    }
 
-                        public void processStart() {
-                            try {
-                                if (zkclientx != null) {
-                                    final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
-                                        ip + ":" + port);
-                                    initCid(path);
-                                    zkclientx.subscribeStateChanges(new IZkStateListener() {
-
-                                        public void handleStateChanged(KeeperState state) throws Exception {
-
-                                        }
-
-                                        public void handleNewSession() throws Exception {
-                                            initCid(path);
-                                        }
-
-                                        @Override
-                                        public void handleSessionEstablishmentError(Throwable error) throws Exception {
-                                            logger.error("failed to connect to zookeeper", error);
-                                        }
-                                    });
-                                }
-                            } finally {
-                                MDC.remove(CanalConstants.MDC_DESTINATION);
+                    public void processStart() {
+                        try {
+                            if (zkclientx != null) {
+                                final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":"
+                                                                                                              + port);
+                                initCid(path);
+                                zkclientx.subscribeStateChanges(new IZkStateListener() {
+
+                                    public void handleStateChanged(KeeperState state) throws Exception {
+
+                                    }
+
+                                    public void handleNewSession() throws Exception {
+                                        initCid(path);
+                                    }
+
+                                    @Override
+                                    public void handleSessionEstablishmentError(Throwable error) throws Exception {
+                                        logger.error("failed to connect to zookeeper", error);
+                                    }
+                                });
                             }
+                        } finally {
+                            MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
+                    }
 
-                        public void processStop() {
-                            try {
-                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                                if (zkclientx != null) {
-                                    final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
-                                        ip + ":" + port);
-                                    releaseCid(path);
-                                }
-                            } finally {
-                                MDC.remove(CanalConstants.MDC_DESTINATION);
+                    public void processStop() {
+                        try {
+                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                            if (zkclientx != null) {
+                                final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":"
+                                                                                                              + port);
+                                releaseCid(path);
                             }
+                        } finally {
+                            MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
-
-                    });
-                    if (zkclientx != null) {
-                        runningMonitor.setZkClient(zkclientx);
                     }
-                    // 触发创建一下cid节点
-                    runningMonitor.init();
-                    return runningMonitor;
+
+                });
+                if (zkclientx != null) {
+                    runningMonitor.setZkClient(zkclientx);
                 }
-            }));
+                // 触发创建一下cid节点
+                runningMonitor.init();
+                return runningMonitor;
+            }
+        }));
 
         // 初始化monitor机制
         autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
@@ -257,8 +265,7 @@ public class CanalController {
             instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {
 
                 public InstanceConfigMonitor apply(InstanceMode mode) {
-                    int scanInterval = Integer
-                        .valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
+                    int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
 
                     if (mode.isSpring()) {
                         SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
@@ -366,8 +373,7 @@ public class CanalController {
             InstanceConfig oldConfig = instanceConfigs.put(destination, config);
 
             if (oldConfig != null) {
-                logger
-                    .warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);
+                logger.warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);
             }
         }
     }

+ 9 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -125,6 +125,15 @@ public class CanalLauncher {
         if (!StringUtils.isEmpty(acks)) {
             mqProperties.setAcks(acks);
         }
+
+        String aliyunAccessKey = CanalController.getProperty(properties, CanalConstants.CANAL_ALIYUN_ACCESSKEY);
+        if (!StringUtils.isEmpty(aliyunAccessKey)) {
+            mqProperties.setAliyunAccessKey(aliyunAccessKey);
+        }
+        String aliyunSecretKey = CanalController.getProperty(properties, CanalConstants.CANAL_ALIYUN_SECRETKEY);
+        if (!StringUtils.isEmpty(aliyunSecretKey)) {
+            mqProperties.setAliyunSecretKey(aliyunSecretKey);
+        }
         return mqProperties;
     }
 

+ 28 - 28
deployer/src/main/resources/canal.properties

@@ -1,11 +1,11 @@
 #################################################
 ######### 		common argument		############# 
 #################################################
-canal.id= 1
-canal.ip=
-canal.port=11111
-canal.metrics.pull.port=11112
-canal.zkServers=
+canal.id = 1
+canal.ip =
+canal.port = 11111
+canal.metrics.pull.port = 11112
+canal.zkServers =
 # flush data to zk
 canal.zookeeper.flush.period = 1000
 canal.withoutNetty = false
@@ -64,32 +64,32 @@ canal.instance.parser.parallel = true
 canal.instance.parser.parallelBufferSize = 256
 
 # table meta tsdb info
-canal.instance.tsdb.enable=true
-canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
-canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
-canal.instance.tsdb.dbUsername=canal
-canal.instance.tsdb.dbPassword=canal
+canal.instance.tsdb.enable = true
+canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
+canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
+canal.instance.tsdb.dbUsername = canal
+canal.instance.tsdb.dbPassword = canal
 # dump snapshot interval, default 24 hour
-canal.instance.tsdb.snapshot.interval=24
+canal.instance.tsdb.snapshot.interval = 24
 # purge snapshot expire , default 360 hour(15 days)
-canal.instance.tsdb.snapshot.expire=360
+canal.instance.tsdb.snapshot.expire = 360
 
-# rds oss binlog account
-canal.instance.rds.accesskey =
-canal.instance.rds.secretkey =
+# aliyun ak/sk , support rds/mq
+canal.aliyun.accesskey =
+canal.aliyun.secretkey =
 
 #################################################
 ######### 		destinations		############# 
 #################################################
-canal.destinations= example
+canal.destinations = example
 # conf root dir
 canal.conf.dir = ../conf
 # auto scan instance dir add/remove and start/stop instance
 canal.auto.scan = true
 canal.auto.scan.interval = 5
 
-canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
-#canal.instance.tsdb.spring.xml=classpath:spring/tsdb/mysql-tsdb.xml
+canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
+#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
 
 canal.instance.global.mode = spring
 canal.instance.global.lazy = false
@@ -101,13 +101,13 @@ canal.instance.global.spring.xml = classpath:spring/file-instance.xml
 ##################################################
 ######### 		     MQ 		     #############
 ##################################################
-canal.mq.servers=127.0.0.1:6667
-canal.mq.retries=0
-canal.mq.batchSize=16384
-canal.mq.lingerMs=1
-canal.mq.bufferMemory=33554432
-canal.mq.canalBatchSize=50
-canal.mq.canalGetTimeout=100
-canal.mq.flatMessage=true
-canal.mq.compressionType=
-canal.mq.acks=
+canal.mq.servers = 127.0.0.1:6667
+canal.mq.retries = 0
+canal.mq.batchSize = 16384
+canal.mq.lingerMs = 1
+canal.mq.bufferMemory = 33554432
+canal.mq.canalBatchSize = 50
+canal.mq.canalGetTimeout = 100
+canal.mq.flatMessage = true
+canal.mq.compressionType = none
+canal.mq.acks = all

+ 2 - 2
deployer/src/main/resources/spring/base-instance.xml

@@ -32,8 +32,8 @@
 	</bean>
 	
 	<bean id="baseEventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsBinlogEventParserProxy" abstract="true">
-		<property name="accesskey" value="${canal.instance.rds.accesskey:}" />
-		<property name="secretkey" value="${canal.instance.rds.secretkey:}" />
+		<property name="accesskey" value="${canal.aliyun.accesskey:}" />
+		<property name="secretkey" value="${canal.aliyun.secretkey:}" />
 		<property name="instanceId" value="${canal.instance.rds.instanceId:}" />
 	</bean>
 </beans>

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -747,7 +747,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                         if (justForPositionTimestamp && logPosition.getPostion() == null && event.getWhen() > 0) {
                             // 初始位点
                             entryPosition = new EntryPosition(searchBinlogFile,
-                                event.getLogPos(),
+                                event.getLogPos() - event.getEventLen(),
                                 event.getWhen() * 1000,
                                 event.getServerId());
                             logPosition.setPostion(entryPosition);

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -657,7 +657,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                                                   + tableMeta.getFullName() + ", db fieldMeta : "
                                                   + fieldMeta.toString() + " , binlog fieldMeta : " + info.toString()
                                                   + " , on : " + event.getHeader().getLogFileName() + ":"
-                                                  + event.getHeader().getLogPos());
+                                                  + (event.getHeader().getLogPos() - event.getHeader().getEventLen()));
                 }
             }
 

+ 10 - 0
pom.xml

@@ -302,6 +302,16 @@
                 <artifactId>jsr305</artifactId>
                 <version>3.0.2</version>
             </dependency>
+            <dependency>
+                <groupId>com.aliyun.openservices</groupId>
+                <artifactId>aliware-apache-rocketmq-cloud</artifactId>
+                <version>1.0</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>rocketmq-client</artifactId>
+                <version>4.3.0</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 

+ 4 - 1
server/pom.xml

@@ -40,7 +40,6 @@
 		<dependency>
 			<groupId>org.apache.rocketmq</groupId>
 			<artifactId>rocketmq-client</artifactId>
-			<version>4.3.0</version>
 		</dependency>
 		<!--kafka_2.11_1.1.1 exclusion掉了netty 的依赖,但CanalServerWithNetty 依赖 netty3,升级kafka至 1.1.1 需要显示加入,否则会启动失败 -->
 		<dependency>
@@ -55,5 +54,9 @@
 			<artifactId>junit</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>com.aliyun.openservices</groupId>
+			<artifactId>aliware-apache-rocketmq-cloud</artifactId>
+		</dependency>
 	</dependencies>
 </project>

+ 19 - 2
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -20,8 +20,10 @@ public class MQProperties {
     private int     canalBatchSize         = 50;
     private Long    canalGetTimeout        = 100L;
     private boolean flatMessage            = true;
-    private String compressionType         = "none";
-    private String acks                    = "all";
+    private String  compressionType        = "none";
+    private String  acks                   = "all";
+    private String  aliyunAccessKey        = "";
+    private String  aliyunSecretKey        = "";
 
     public static class CanalDestination {
 
@@ -168,4 +170,19 @@ public class MQProperties {
         this.acks = acks;
     }
 
+    public String getAliyunAccessKey() {
+        return aliyunAccessKey;
+    }
+
+    public void setAliyunAccessKey(String aliyunAccessKey) {
+        this.aliyunAccessKey = aliyunAccessKey;
+    }
+
+    public String getAliyunSecretKey() {
+        return aliyunSecretKey;
+    }
+
+    public void setAliyunSecretKey(String aliyunSecretKey) {
+        this.aliyunSecretKey = aliyunSecretKey;
+    }
 }

+ 14 - 2
server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java

@@ -7,12 +7,15 @@ import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.server.exception.CanalServerException;
 import com.alibaba.otter.canal.spi.CanalMQProducer;
+import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
+import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.MessageQueueSelector;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,10 +33,19 @@ public class CanalRocketMQProducer implements CanalMQProducer {
     @Override
     public void init(MQProperties rocketMQProperties) {
         this.mqProperties = rocketMQProperties;
-        defaultMQProducer = new DefaultMQProducer();
+        RPCHook rpcHook = null;
+        if(rocketMQProperties.getAliyunAccessKey().length() > 0
+            && rocketMQProperties.getAliyunSecretKey().length() > 0){
+            SessionCredentials sessionCredentials = new SessionCredentials();
+            sessionCredentials.setAccessKey(rocketMQProperties.getAliyunAccessKey());
+            sessionCredentials.setSecretKey(rocketMQProperties.getAliyunSecretKey());
+            rpcHook = new ClientRPCHook(sessionCredentials);
+        }
+
+        defaultMQProducer = new DefaultMQProducer(rocketMQProperties.getProducerGroup(), rpcHook);
         defaultMQProducer.setNamesrvAddr(rocketMQProperties.getServers());
-        defaultMQProducer.setProducerGroup(rocketMQProperties.getProducerGroup());
         defaultMQProducer.setRetryTimesWhenSendFailed(rocketMQProperties.getRetries());
+        defaultMQProducer.setVipChannelEnabled(false);
         logger.info("##Start RocketMQ producer##");
         try {
             defaultMQProducer.start();