Explorar o código

修复:当mysql 发生过purged gtid时,同时开启tsdb与gtid后,gtid属性丢失问题

winger %!s(int64=6) %!d(string=hai) anos
pai
achega
19c35c0b08

+ 3 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -131,7 +131,9 @@ public class MysqlConnection implements ErosaConnection {
         decoder.handle(LogEvent.QUERY_EVENT);
         decoder.handle(LogEvent.XID_EVENT);
         LogContext context = new LogContext();
-        // 若entry position存在gtid,则使用传入的gtid作为gtidSet 的拼接标准,否则同时开启gtid和tsdb时,会导致丢失初始化传入的gtid
+        // 若entry position存在gtid,则使用传入的gtid作为gtidSet 拼接的标准,否则同时开启gtid和tsdb时,会导致丢失gtid
+        // 而当源端数据库gtid 有purged时会有如下类似报错
+        // 'errno = 1236, sqlstate = HY000 errmsg = The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1 ...
         if (StringUtils.isNotEmpty(gtid)) {
             decoder.handle(LogEvent.GTID_LOG_EVENT);
             context.setGtidSet(MysqlGTIDSet.parse(gtid));

+ 2 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -790,6 +790,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                                     entryPosition);
                             }
                             logPosition.setPostion(entryPosition);
+                            entryPosition.setGtid(entry.getHeader().getGtid());
                         } else if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())) {
                             // 当前事务开始位点
                             entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp, serverId);
@@ -797,6 +798,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                                 logger.debug("set {} to be pending start position before finding another proper one...",
                                     entryPosition);
                             }
+                            entryPosition.setGtid(entry.getHeader().getGtid());
                             logPosition.setPostion(entryPosition);
                         }