Browse Source

fixed issue #449 , optimize findTransactionBeginPosition

agapple 7 years ago
parent
commit
3104b4cc24

+ 21 - 58
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -8,7 +8,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.TimerTask;
 import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLong;
 
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
@@ -500,10 +499,10 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     // 主要考虑一个事务执行时间可能会几秒种,如果仅仅按照timestamp相同,则可能会丢失事务的前半部分数据
     // 主要考虑一个事务执行时间可能会几秒种,如果仅仅按照timestamp相同,则可能会丢失事务的前半部分数据
     private Long findTransactionBeginPosition(ErosaConnection mysqlConnection, final EntryPosition entryPosition)
     private Long findTransactionBeginPosition(ErosaConnection mysqlConnection, final EntryPosition entryPosition)
                                                                                                                  throws IOException {
                                                                                                                  throws IOException {
-        // 尝试找到一个合适的位置
-        final AtomicBoolean reDump = new AtomicBoolean(false);
+        // 针对开始的第一条为非Begin记录,需要从该binlog扫描
+        final AtomicLong preTransactionStartPosition = new AtomicLong(0L);
         mysqlConnection.reconnect();
         mysqlConnection.reconnect();
-        mysqlConnection.seek(entryPosition.getJournalName(), entryPosition.getPosition(), new SinkFunction<LogEvent>() {
+        mysqlConnection.seek(entryPosition.getJournalName(), 4L, new SinkFunction<LogEvent>() {
 
 
             private LogPosition lastPosition;
             private LogPosition lastPosition;
 
 
@@ -514,69 +513,33 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                         return true;
                         return true;
                     }
                     }
 
 
-                    // 直接查询第一条业务数据,确认是否为事务Begin/End
-                    if (CanalEntry.EntryType.TRANSACTIONBEGIN == entry.getEntryType()
-                        || CanalEntry.EntryType.TRANSACTIONEND == entry.getEntryType()) {
-                        lastPosition = buildLastPosition(entry);
-                        return false;
-                    } else {
-                        reDump.set(true);
-                        lastPosition = buildLastPosition(entry);
-                        return false;
+                    // 直接查询第一条业务数据,确认是否为事务Begin
+                    // 记录一下transaction begin position
+                    if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
+                        && entry.getHeader().getLogfileOffset() < entryPosition.getPosition()) {
+                        preTransactionStartPosition.set(entry.getHeader().getLogfileOffset());
                     }
                     }
+
+                    if (entry.getHeader().getLogfileOffset() >= entryPosition.getPosition()) {
+                        return false;// 退出
+                    }
+
+                    lastPosition = buildLastPosition(entry);
                 } catch (Exception e) {
                 } catch (Exception e) {
-                    // 上一次记录的poistion可能为一条update/insert/delete变更事件,直接进行dump的话,会缺少tableMap事件,导致tableId未进行解析
                     processSinkError(e, lastPosition, entryPosition.getJournalName(), entryPosition.getPosition());
                     processSinkError(e, lastPosition, entryPosition.getJournalName(), entryPosition.getPosition());
-                    reDump.set(true);
                     return false;
                     return false;
                 }
                 }
+
+                return running;
             }
             }
         });
         });
-        // 针对开始的第一条为非Begin记录,需要从该binlog扫描
-        if (reDump.get()) {
-            final AtomicLong preTransactionStartPosition = new AtomicLong(0L);
-            mysqlConnection.reconnect();
-            mysqlConnection.seek(entryPosition.getJournalName(), 4L, new SinkFunction<LogEvent>() {
-
-                private LogPosition lastPosition;
-
-                public boolean sink(LogEvent event) {
-                    try {
-                        CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, true);
-                        if (entry == null) {
-                            return true;
-                        }
-
-                        // 直接查询第一条业务数据,确认是否为事务Begin
-                        // 记录一下transaction begin position
-                        if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
-                            && entry.getHeader().getLogfileOffset() < entryPosition.getPosition()) {
-                            preTransactionStartPosition.set(entry.getHeader().getLogfileOffset());
-                        }
-
-                        if (entry.getHeader().getLogfileOffset() >= entryPosition.getPosition()) {
-                            return false;// 退出
-                        }
-
-                        lastPosition = buildLastPosition(entry);
-                    } catch (Exception e) {
-                        processSinkError(e, lastPosition, entryPosition.getJournalName(), entryPosition.getPosition());
-                        return false;
-                    }
-
-                    return running;
-                }
-            });
 
 
-            // 判断一下找到的最接近position的事务头的位置
-            if (preTransactionStartPosition.get() > entryPosition.getPosition()) {
-                logger.error("preTransactionEndPosition greater than startPosition from zk or localconf, maybe lost data");
-                throw new CanalParseException("preTransactionStartPosition greater than startPosition from zk or localconf, maybe lost data");
-            }
-            return preTransactionStartPosition.get();
-        } else {
-            return entryPosition.getPosition();
+        // 判断一下找到的最接近position的事务头的位置
+        if (preTransactionStartPosition.get() > entryPosition.getPosition()) {
+            logger.error("preTransactionEndPosition greater than startPosition from zk or localconf, maybe lost data");
+            throw new CanalParseException("preTransactionStartPosition greater than startPosition from zk or localconf, maybe lost data");
         }
         }
+        return preTransactionStartPosition.get();
     }
     }
 
 
     // 根据时间查找binlog位置
     // 根据时间查找binlog位置