Browse Source

fixed rds binlog dump

七锋 6 years ago
parent
commit
ff3ccde62a

+ 5 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -353,7 +353,11 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         eventSink.interrupt();
 
         if (multiStageCoprocessor != null && multiStageCoprocessor.isStart()) {
-            multiStageCoprocessor.stop();
+            try {
+                multiStageCoprocessor.stop();
+            } catch (Throwable t) {
+                logger.debug("multi processor rejected:", t);
+            }
         }
 
         try {

+ 11 - 8
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -752,14 +752,13 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                             logPosition.setPostion(entryPosition);
                         }
 
-                        if (entry == null) {
-                            return true;
-                        }
-
-                        String logfilename = entry.getHeader().getLogfileName();
-                        Long logfileoffset = entry.getHeader().getLogfileOffset();
-                        Long logposTimestamp = entry.getHeader().getExecuteTime();
-                        Long serverId = entry.getHeader().getServerId();
+                        // 直接用event的位点来处理,解决一个binlog文件里没有任何事件导致死循环无法退出的问题
+                        String logfilename = event.getHeader().getLogFileName();
+                        // 记录的是binlog end offest,
+                        // 因为与其对比的offest是show master status里的end offest
+                        Long logfileoffset = event.getHeader().getLogPos();
+                        Long logposTimestamp = event.getHeader().getWhen() * 1000;
+                        Long serverId = event.getHeader().getServerId();
 
                         // 如果最小的一条记录都不满足条件,可直接退出
                         if (logposTimestamp >= startTimestamp) {
@@ -771,6 +770,10 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                             return false;
                         }
 
+                        if (entry == null) {
+                            return true;
+                        }
+
                         // 记录一下上一个事务结束的位置,即下一个事务的position
                         // position = current +
                         // data.length,代表该事务的下一条offest,避免多余的事务重复

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -567,8 +567,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     }
 
     private EntryPosition createPosition(LogHeader logHeader) {
-        return new EntryPosition(logHeader.getLogFileName(),
-            logHeader.getLogPos(),
+        return new EntryPosition(logHeader.getLogFileName(), logHeader.getLogPos() - logHeader.getEventLen(), // startPos
             logHeader.getWhen() * 1000L,
             logHeader.getServerId()); // 记录到秒
     }
@@ -818,6 +817,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         Header.Builder headerBuilder = Header.newBuilder();
         headerBuilder.setVersion(version);
         headerBuilder.setLogfileName(logHeader.getLogFileName());
+        // 记录的是该binlog的start offest
         headerBuilder.setLogfileOffset(logHeader.getLogPos() - logHeader.getEventLen());
         headerBuilder.setServerId(logHeader.getServerId());
         headerBuilder.setServerenCode(UTF_8);// 经过java输出后所有的编码为unicode

+ 9 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/BinlogDownloadQueue.java

@@ -135,7 +135,7 @@ public class BinlogDownloadQueue {
         if (StringUtils.isNotEmpty(needCompareName) && StringUtils.endsWith(needCompareName, "tar")) {
             needCompareName = needCompareName.substring(0, needCompareName.indexOf("."));
         }
-        return fileName.equalsIgnoreCase(needCompareName) && binlogList.isEmpty();
+        return (needCompareName == null || fileName.equalsIgnoreCase(needCompareName)) && binlogList.isEmpty();
     }
 
     public void prepare() throws InterruptedException {
@@ -166,7 +166,14 @@ public class BinlogDownloadQueue {
         this.currentSize = 0;
         binlogList.clear();
         downloadQueue.clear();
-        downloadThread = null;
+        try {
+            downloadThread.interrupt();
+            downloadThread.join();// 等待其结束
+        } catch (InterruptedException e) {
+            // ignore
+        } finally {
+            downloadThread = null;
+        }
     }
 
     private void download(BinlogFile binlogFile) throws Throwable {