فهرست منبع

feature(canal):support start with timestamp position(empty journal_file,set position=0),#2594

bucketli 5 سال پیش
والد
کامیت
3249d3c50d
1فایلهای تغییر یافته به همراه20 افزوده شده و 6 حذف شده
  1. 20 6
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

+ 20 - 6
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -493,12 +493,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                                     && logPosition.getPostion().getServerId() != null
                                     && !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));
                     if (case2) {
-                        long timestamp = logPosition.getPostion().getTimestamp();
-                        long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000;
-                        logger.warn("prepare to find start position by last position {}:{}:{}", new Object[] { "", "",
-                                logPosition.getPostion().getTimestamp() });
-                        EntryPosition findPosition = findByStartTimeStamp(mysqlConnection, newStartTimestamp);
-                        // 重新置为一下
+                        EntryPosition findPosition = fallbackFindByStartTimestamp(logPosition, mysqlConnection);
                         dumpErrorCount = 0;
                         return findPosition;
                     }
@@ -508,6 +503,10 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                         // 如果binlog位点不存在,并且属于timestamp不为空,可以返回null走到oss binlog处理
                         return null;
                     }
+                } else if (StringUtils.isBlank(logPosition.getPostion().getJournalName())
+                        && logPosition.getPostion().getPosition() <= 0
+                        && logPosition.getPostion().getTimestamp() > 0) {
+                    return fallbackFindByStartTimestamp(logPosition,mysqlConnection);
                 }
                 // 其余情况
                 logger.warn("prepare to find start position just last position\n {}",
@@ -523,6 +522,21 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         }
     }
 
+    /**
+     * find position by timestamp with a fallback interval seconds.
+     *
+     * @param logPosition
+     * @param mysqlConnection
+     * @return
+     */
+    protected EntryPosition fallbackFindByStartTimestamp(LogPosition logPosition,MysqlConnection mysqlConnection){
+        long timestamp = logPosition.getPostion().getTimestamp();
+        long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000;
+        logger.warn("prepare to find start position by last position {}:{}:{}", new Object[] { "", "",
+                logPosition.getPostion().getTimestamp() });
+        return findByStartTimeStamp(mysqlConnection, newStartTimestamp);
+    }
+
     // 根据想要的position,可能这个position对应的记录为rowdata,需要找到事务头,避免丢数据
     // 主要考虑一个事务执行时间可能会几秒种,如果仅仅按照timestamp相同,则可能会丢失事务的前半部分数据
     private Long findTransactionBeginPosition(ErosaConnection mysqlConnection, final EntryPosition entryPosition)