Browse Source

fixed binlog seek & aliyun ak/sk

agapple 6 years ago
parent
commit
638952fad1

+ 6 - 5
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogDecoder.java

@@ -109,11 +109,6 @@ public final class LogDecoder {
                     try {
                         /* Decoding binary-log to event */
                         event = decode(buffer, header, context);
-                        if (event != null) {
-                            // set logFileName
-                            event.getHeader().setLogFileName(context.getLogPosition().getFileName());
-                            event.setSemival(buffer.semival);
-                        }
                     } catch (IOException e) {
                         if (logger.isWarnEnabled()) {
                             logger.warn("Decoding " + LogEvent.getTypeName(header.getType()) + " failed from: "
@@ -128,6 +123,12 @@ public final class LogDecoder {
                     event = new UnknownLogEvent(header);
                 }
 
+                if (event != null) {
+                    // set logFileName
+                    event.getHeader().setLogFileName(context.getLogPosition().getFileName());
+                    event.setSemival(buffer.semival);
+                }
+
                 /* consume this binary-log. */
                 buffer.consume(len);
                 return event;

+ 2 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -47,6 +47,8 @@ public class CanalConstants {
     public static final String CANAL_MQ_FLATMESSAGE              = ROOT + "." + "mq.flatMessage";
     public static final String CANAL_MQ_COMPRESSION_TYPE         = ROOT + "." + "mq.compressionType";
     public static final String CANAL_MQ_ACKS                     = ROOT + "." + "mq.acks";
+    public static final String CANAL_ALIYUN_ACCESSKEY            = ROOT + "." + "aliyun.accessKey";
+    public static final String CANAL_ALIYUN_SECRETKEY            = ROOT + "." + "aliyun.secretKey";
 
     public static String getInstanceModeKey(String destination) {
         return MessageFormat.format(INSTANCE_MODE_TEMPLATE, destination);

+ 75 - 69
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -17,7 +17,6 @@ import org.springframework.context.ApplicationContext;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
 
 import com.alibaba.otter.canal.common.CanalException;
-import com.alibaba.otter.canal.common.MQProperties;
 import com.alibaba.otter.canal.common.utils.AddressUtils;
 import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
 import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
@@ -96,6 +95,16 @@ public class CanalController {
             System.setProperty(CanalConstants.CANAL_SOCKETCHANNEL, socketChannel);
         }
 
+        // 兼容1.1.0版本的ak/sk参数名
+        String accesskey = getProperty(properties, "canal.instance.rds.accesskey");
+        String secretkey = getProperty(properties, "canal.instance.rds.secretkey");
+        if (StringUtils.isNotEmpty(accesskey)) {
+            System.setProperty(CanalConstants.CANAL_ALIYUN_ACCESSKEY, accesskey);
+        }
+        if (StringUtils.isNotEmpty(secretkey)) {
+            System.setProperty(CanalConstants.CANAL_ALIYUN_SECRETKEY, secretkey);
+        }
+
         // 准备canal server
         cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
         ip = getProperty(properties, CanalConstants.CANAL_IP);
@@ -131,81 +140,80 @@ public class CanalController {
 
         final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
         ServerRunningMonitors.setServerData(serverData);
-        ServerRunningMonitors
-            .setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
-
-                public ServerRunningMonitor apply(final String destination) {
-                    ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
-                    runningMonitor.setDestination(destination);
-                    runningMonitor.setListener(new ServerRunningListener() {
-
-                        public void processActiveEnter() {
-                            try {
-                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                                embededCanalServer.start(destination);
-                            } finally {
-                                MDC.remove(CanalConstants.MDC_DESTINATION);
-                            }
+        ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
+
+            public ServerRunningMonitor apply(final String destination) {
+                ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
+                runningMonitor.setDestination(destination);
+                runningMonitor.setListener(new ServerRunningListener() {
+
+                    public void processActiveEnter() {
+                        try {
+                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                            embededCanalServer.start(destination);
+                        } finally {
+                            MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
+                    }
 
-                        public void processActiveExit() {
-                            try {
-                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                                embededCanalServer.stop(destination);
-                            } finally {
-                                MDC.remove(CanalConstants.MDC_DESTINATION);
-                            }
+                    public void processActiveExit() {
+                        try {
+                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                            embededCanalServer.stop(destination);
+                        } finally {
+                            MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
+                    }
 
-                        public void processStart() {
-                            try {
-                                if (zkclientx != null) {
-                                    final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
-                                        ip + ":" + port);
-                                    initCid(path);
-                                    zkclientx.subscribeStateChanges(new IZkStateListener() {
-
-                                        public void handleStateChanged(KeeperState state) throws Exception {
-
-                                        }
-
-                                        public void handleNewSession() throws Exception {
-                                            initCid(path);
-                                        }
-
-                                        @Override
-                                        public void handleSessionEstablishmentError(Throwable error) throws Exception {
-                                            logger.error("failed to connect to zookeeper", error);
-                                        }
-                                    });
-                                }
-                            } finally {
-                                MDC.remove(CanalConstants.MDC_DESTINATION);
+                    public void processStart() {
+                        try {
+                            if (zkclientx != null) {
+                                final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":"
+                                                                                                              + port);
+                                initCid(path);
+                                zkclientx.subscribeStateChanges(new IZkStateListener() {
+
+                                    public void handleStateChanged(KeeperState state) throws Exception {
+
+                                    }
+
+                                    public void handleNewSession() throws Exception {
+                                        initCid(path);
+                                    }
+
+                                    @Override
+                                    public void handleSessionEstablishmentError(Throwable error) throws Exception {
+                                        logger.error("failed to connect to zookeeper", error);
+                                    }
+                                });
                             }
+                        } finally {
+                            MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
+                    }
 
-                        public void processStop() {
-                            try {
-                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
-                                if (zkclientx != null) {
-                                    final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
-                                        ip + ":" + port);
-                                    releaseCid(path);
-                                }
-                            } finally {
-                                MDC.remove(CanalConstants.MDC_DESTINATION);
+                    public void processStop() {
+                        try {
+                            MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
+                            if (zkclientx != null) {
+                                final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":"
+                                                                                                              + port);
+                                releaseCid(path);
                             }
+                        } finally {
+                            MDC.remove(CanalConstants.MDC_DESTINATION);
                         }
-
-                    });
-                    if (zkclientx != null) {
-                        runningMonitor.setZkClient(zkclientx);
                     }
-                    // 触发创建一下cid节点
-                    runningMonitor.init();
-                    return runningMonitor;
+
+                });
+                if (zkclientx != null) {
+                    runningMonitor.setZkClient(zkclientx);
                 }
-            }));
+                // 触发创建一下cid节点
+                runningMonitor.init();
+                return runningMonitor;
+            }
+        }));
 
         // 初始化monitor机制
         autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
@@ -257,8 +265,7 @@ public class CanalController {
             instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {
 
                 public InstanceConfigMonitor apply(InstanceMode mode) {
-                    int scanInterval = Integer
-                        .valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
+                    int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
 
                     if (mode.isSpring()) {
                         SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
@@ -366,8 +373,7 @@ public class CanalController {
             InstanceConfig oldConfig = instanceConfigs.put(destination, config);
 
             if (oldConfig != null) {
-                logger
-                    .warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);
+                logger.warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);
             }
         }
     }

+ 9 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -125,6 +125,15 @@ public class CanalLauncher {
         if (!StringUtils.isEmpty(acks)) {
             mqProperties.setAcks(acks);
         }
+
+        String aliyunAccessKey = CanalController.getProperty(properties, CanalConstants.CANAL_ALIYUN_ACCESSKEY);
+        if (!StringUtils.isEmpty(aliyunAccessKey)) {
+            mqProperties.setAliyunAccessKey(aliyunAccessKey);
+        }
+        String aliyunSecretKey = CanalController.getProperty(properties, CanalConstants.CANAL_ALIYUN_SECRETKEY);
+        if (!StringUtils.isEmpty(aliyunSecretKey)) {
+            mqProperties.setAliyunSecretKey(aliyunSecretKey);
+        }
         return mqProperties;
     }
 

+ 5 - 5
deployer/src/main/resources/canal.properties

@@ -74,9 +74,9 @@ canal.instance.tsdb.snapshot.interval=24
 # purge snapshot expire , default 360 hour(15 days)
 canal.instance.tsdb.snapshot.expire=360
 
-# rds oss binlog account
-canal.instance.rds.accesskey =
-canal.instance.rds.secretkey =
+# aliyun ak/sk , support rds/mq
+canal.aliyun.accesskey=
+canal.aliyun.secretkey=
 
 #################################################
 ######### 		destinations		############# 
@@ -109,5 +109,5 @@ canal.mq.bufferMemory=33554432
 canal.mq.canalBatchSize=50
 canal.mq.canalGetTimeout=100
 canal.mq.flatMessage=true
-canal.mq.compressionType=
-canal.mq.acks=
+canal.mq.compressionType=none
+canal.mq.acks=all

+ 2 - 2
deployer/src/main/resources/spring/base-instance.xml

@@ -32,8 +32,8 @@
 	</bean>
 	
 	<bean id="baseEventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsBinlogEventParserProxy" abstract="true">
-		<property name="accesskey" value="${canal.instance.rds.accesskey:}" />
-		<property name="secretkey" value="${canal.instance.rds.secretkey:}" />
+		<property name="accesskey" value="${canal.aliyun.accesskey:}" />
+		<property name="secretkey" value="${canal.aliyun.secretkey:}" />
 		<property name="instanceId" value="${canal.instance.rds.instanceId:}" />
 	</bean>
 </beans>

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -747,7 +747,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                         if (justForPositionTimestamp && logPosition.getPostion() == null && event.getWhen() > 0) {
                             // 初始位点
                             entryPosition = new EntryPosition(searchBinlogFile,
-                                event.getLogPos(),
+                                event.getLogPos() - event.getEventLen(),
                                 event.getWhen() * 1000,
                                 event.getServerId());
                             logPosition.setPostion(entryPosition);

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -657,7 +657,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                                                   + tableMeta.getFullName() + ", db fieldMeta : "
                                                   + fieldMeta.toString() + " , binlog fieldMeta : " + info.toString()
                                                   + " , on : " + event.getHeader().getLogFileName() + ":"
-                                                  + event.getHeader().getLogPos());
+                                                  + (event.getHeader().getLogPos() - event.getHeader().getEventLen()));
                 }
             }
 

+ 4 - 4
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -20,10 +20,10 @@ public class MQProperties {
     private int     canalBatchSize         = 50;
     private Long    canalGetTimeout        = 100L;
     private boolean flatMessage            = true;
-    private String compressionType         = "none";
-    private String acks                    = "all";
-    private String aliyunAccessKey         = "";
-    private String aliyunSecretKey         = "";
+    private String  compressionType        = "none";
+    private String  acks                   = "all";
+    private String  aliyunAccessKey        = "";
+    private String  aliyunSecretKey        = "";
 
     public static class CanalDestination {