Selaa lähdekoodia

修复开启gtid时,同时开启tsdb后gtid 属性丢失的问题

winger 6 vuotta sitten
vanhempi
commit
3465866278

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ErosaConnection.java

@@ -20,7 +20,7 @@ public interface ErosaConnection {
     /**
      * 用于快速数据查找,和dump的区别在于,seek会只给出部分的数据
      */
-    public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException;
+    public void seek(String binlogfilename, Long binlogPosition, String gtid, SinkFunction func) throws IOException;
 
     public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException;
 

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

@@ -75,7 +75,7 @@ public class LocalBinLogConnection implements ErosaConnection {
         return running;
     }
 
-    public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
+    public void seek(String binlogfilename, Long binlogPosition, String gtid, SinkFunction func) throws IOException {
     }
 
     public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {

+ 7 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -10,6 +10,7 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.math.NumberUtils;
 import org.slf4j.Logger;
@@ -118,7 +119,7 @@ public class MysqlConnection implements ErosaConnection {
     /**
      * 加速主备切换时的查找速度,做一些特殊优化,比如只解析事务头或者尾
      */
-    public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
+    public void seek(String binlogfilename, Long binlogPosition, String gtid, SinkFunction func) throws IOException {
         updateSettings();
         loadBinlogChecksum();
         sendBinlogDump(binlogfilename, binlogPosition);
@@ -130,6 +131,11 @@ public class MysqlConnection implements ErosaConnection {
         decoder.handle(LogEvent.QUERY_EVENT);
         decoder.handle(LogEvent.XID_EVENT);
         LogContext context = new LogContext();
+        // 若entry position存在gtid,则使用传入的gtid作为gtidSet 的拼接标准,否则同时开启gtid和tsdb时,会导致丢失初始化传入的gtid
+        if (StringUtils.isNotEmpty(gtid)) {
+            decoder.handle(LogEvent.GTID_LOG_EVENT);
+            context.setGtidSet(MysqlGTIDSet.parse(gtid));
+        }
         context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
         while (fetcher.fetch()) {
             accumulateReceivedBytes(fetcher.limit());

+ 4 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -527,7 +527,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         // 针对开始的第一条为非Begin记录,需要从该binlog扫描
         final java.util.concurrent.atomic.AtomicLong preTransactionStartPosition = new java.util.concurrent.atomic.AtomicLong(0L);
         mysqlConnection.reconnect();
-        mysqlConnection.seek(entryPosition.getJournalName(), 4L, new SinkFunction<LogEvent>() {
+        mysqlConnection.seek(entryPosition.getJournalName(), 4L, entryPosition.getGtid(), new SinkFunction<LogEvent>() {
 
             private LogPosition lastPosition;
 
@@ -740,7 +740,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         try {
             mysqlConnection.reconnect();
             // 开始遍历文件
-            mysqlConnection.seek(searchBinlogFile, 4L, new SinkFunction<LogEvent>() {
+            mysqlConnection.seek(searchBinlogFile, 4L, endPosition.getGtid(), new SinkFunction<LogEvent>() {
 
                 private LogPosition lastPosition;
 
@@ -754,6 +754,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                                 event.getLogPos() - event.getEventLen(),
                                 event.getWhen() * 1000,
                                 event.getServerId());
+                            entryPosition.setGtid(event.getHeader().getGtidSetStr());
                             logPosition.setPostion(entryPosition);
                         }
 
@@ -813,9 +814,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         }
 
         if (logPosition.getPostion() != null) {
-            EntryPosition position = logPosition.getPostion();
-            position.setGtid(endPosition.getGtid());
-            return position;
+            return logPosition.getPostion();
         } else {
             return null;
         }