Browse Source

fixed issue #440 , optimize find position

agapple 7 years ago
parent
commit
5e206e4f7e

+ 37 - 24
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -374,7 +374,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
             return findAsPerTimestampInSpecificLogFile(mysqlConnection,
                 startTimestamp,
                 endPosition,
-                endPosition.getJournalName());
+                endPosition.getJournalName(),
+                true);
         } else {
             return endPosition;
         }
@@ -388,7 +389,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
             EntryPosition entryPosition = findAsPerTimestampInSpecificLogFile(mysqlConnection,
                 startTimestamp,
                 fixedPosition,
-                fixedPosition.getJournalName());
+                fixedPosition.getJournalName(),
+                true);
             if (entryPosition == null) {
                 throw new CanalParseException("[fixed timestamp] can't found begin/commit position before with fixed position"
                                               + fixedPosition.getJournalName() + ":" + fixedPosition.getPosition());
@@ -446,7 +448,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                             specificLogFilePosition = findAsPerTimestampInSpecificLogFile(mysqlConnection,
                                 entryPosition.getTimestamp(),
                                 endPosition,
-                                entryPosition.getJournalName());
+                                entryPosition.getJournalName(),
+                                true);
                         }
                     }
 
@@ -590,7 +593,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 EntryPosition entryPosition = findAsPerTimestampInSpecificLogFile(mysqlConnection,
                     startTimestamp,
                     endPosition,
-                    startSearchBinlogFile);
+                    startSearchBinlogFile,
+                    false);
                 if (entryPosition == null) {
                     if (StringUtils.equalsIgnoreCase(minBinlogFileName, startSearchBinlogFile)) {
                         // 已经找到最早的一个binlog,没必要往前找了
@@ -738,7 +742,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     private EntryPosition findAsPerTimestampInSpecificLogFile(MysqlConnection mysqlConnection,
                                                               final Long startTimestamp,
                                                               final EntryPosition endPosition,
-                                                              final String searchBinlogFile) {
+                                                              final String searchBinlogFile,
+                                                              final Boolean justForPositionTimestamp) {
 
         final LogPosition logPosition = new LogPosition();
         try {
@@ -752,6 +757,15 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                     EntryPosition entryPosition = null;
                     try {
                         CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, true);
+                        if (justForPositionTimestamp && logPosition.getPostion() == null && event.getWhen() > 0) {
+                            // 初始位点
+                            entryPosition = new EntryPosition(searchBinlogFile,
+                                event.getLogPos(),
+                                event.getWhen() * 1000,
+                                event.getServerId());
+                            logPosition.setPostion(entryPosition);
+                        }
+
                         if (entry == null) {
                             return true;
                         }
@@ -761,21 +775,16 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                         Long logposTimestamp = entry.getHeader().getExecuteTime();
                         Long serverId = entry.getHeader().getServerId();
 
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("compare exit condition:{},{},{}, startTimestamp={}...", new Object[] {
-                                    logfilename, logfileoffset, logposTimestamp, startTimestamp });
-                        }
-                        // 事务头和尾寻找第一条记录时间戳,如果最小的一条记录都不满足条件,可直接退出
-                        if (logposTimestamp >= startTimestamp) {
-                            return false;
-                        }
-
-                        if (entryPosition == null) {
-                            // 如果啥都找不到,就返回第一条Format Event的位点,一般是116位点
-                            entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp, serverId);
-                            logger.debug("set {} to be pending start position before finding another proper one...",
-                                entryPosition);
-                            logPosition.setPostion(entryPosition);
+                        if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())
+                            || CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("compare exit condition:{},{},{}, startTimestamp={}...", new Object[] {
+                                        logfilename, logfileoffset, logposTimestamp, startTimestamp });
+                            }
+                            // 事务头和尾寻找第一条记录时间戳,如果最小的一条记录都不满足条件,可直接退出
+                            if (logposTimestamp >= startTimestamp) {
+                                return false;
+                            }
                         }
 
                         if (StringUtils.equals(endPosition.getJournalName(), logfilename)
@@ -788,14 +797,18 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                         // data.length,代表该事务的下一条offest,避免多余的事务重复
                         if (CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
                             entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp, serverId);
-                            logger.debug("set {} to be pending start position before finding another proper one...",
-                                entryPosition);
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("set {} to be pending start position before finding another proper one...",
+                                    entryPosition);
+                            }
                             logPosition.setPostion(entryPosition);
                         } else if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())) {
                             // 当前事务开始位点
                             entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp, serverId);
-                            logger.debug("set {} to be pending start position before finding another proper one...",
-                                entryPosition);
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("set {} to be pending start position before finding another proper one...",
+                                    entryPosition);
+                            }
                             logPosition.setPostion(entryPosition);
                         }