Browse Source

fixed search by time

agapple 10 years ago
parent
commit
13bf7c85a0

+ 9 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -615,10 +615,11 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                         Long logfileoffset = entry.getHeader().getLogfileOffset();
                         Long logposTimestamp = entry.getHeader().getExecuteTime();
 
-                        if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())) {
+                        if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())
+                            || CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
                             logger.debug("compare exit condition:{},{},{}, startTimestamp={}...", new Object[] {
                                     logfilename, logfileoffset, logposTimestamp, startTimestamp });
-                            // 寻找第一条记录时间戳,如果最小的一条记录都不满足条件,可直接退出
+                            // 事务头和尾寻找第一条记录时间戳,如果最小的一条记录都不满足条件,可直接退出
                             if (logposTimestamp >= startTimestamp) {
                                 return false;
                             }
@@ -639,6 +640,12 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                             logger.debug("set {} to be pending start position before finding another proper one...",
                                 entryPosition);
                             logPosition.setPostion(entryPosition);
+                        } else if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())) {
+                            // 当前事务开始位点
+                            entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp);
+                            logger.debug("set {} to be pending start position before finding another proper one...",
+                                entryPosition);
+                            logPosition.setPostion(entryPosition);
                         }
 
                         lastPosition = buildLastPosition(entry);