|
@@ -527,7 +527,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
|
|
|
// 针对开始的第一条为非Begin记录,需要从该binlog扫描
|
|
|
final java.util.concurrent.atomic.AtomicLong preTransactionStartPosition = new java.util.concurrent.atomic.AtomicLong(0L);
|
|
|
mysqlConnection.reconnect();
|
|
|
- mysqlConnection.seek(entryPosition.getJournalName(), 4L, new SinkFunction<LogEvent>() {
|
|
|
+ mysqlConnection.seek(entryPosition.getJournalName(), 4L, entryPosition.getGtid(), new SinkFunction<LogEvent>() {
|
|
|
|
|
|
private LogPosition lastPosition;
|
|
|
|
|
@@ -740,7 +740,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
|
|
|
try {
|
|
|
mysqlConnection.reconnect();
|
|
|
// 开始遍历文件
|
|
|
- mysqlConnection.seek(searchBinlogFile, 4L, new SinkFunction<LogEvent>() {
|
|
|
+ mysqlConnection.seek(searchBinlogFile, 4L, endPosition.getGtid(), new SinkFunction<LogEvent>() {
|
|
|
|
|
|
private LogPosition lastPosition;
|
|
|
|
|
@@ -754,6 +754,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
|
|
|
event.getLogPos() - event.getEventLen(),
|
|
|
event.getWhen() * 1000,
|
|
|
event.getServerId());
|
|
|
+ entryPosition.setGtid(event.getHeader().getGtidSetStr());
|
|
|
logPosition.setPostion(entryPosition);
|
|
|
}
|
|
|
|
|
@@ -789,6 +790,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
|
|
|
entryPosition);
|
|
|
}
|
|
|
logPosition.setPostion(entryPosition);
|
|
|
+ entryPosition.setGtid(entry.getHeader().getGtid());
|
|
|
} else if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())) {
|
|
|
// 当前事务开始位点
|
|
|
entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp, serverId);
|
|
@@ -796,6 +798,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
|
|
|
logger.debug("set {} to be pending start position before finding another proper one...",
|
|
|
entryPosition);
|
|
|
}
|
|
|
+ entryPosition.setGtid(entry.getHeader().getGtid());
|
|
|
logPosition.setPostion(entryPosition);
|
|
|
}
|
|
|
|