瀏覽代碼

修复rds高可用主从切换和oss相关问题 (#3480)

* 修复rds oos 拉取的binlog 的消费逻辑以及本地消费到直连消费的逻辑衔接

* 去除无用代码
ruanjl 3 年之前
父節點
當前提交
911711248c

+ 49 - 13
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

@@ -2,7 +2,9 @@ package com.alibaba.otter.canal.parse.inbound.mysql;
 
 
 import java.io.File;
 import java.io.File;
 import java.io.IOException;
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.List;
+import java.util.Set;
 
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
@@ -25,7 +27,7 @@ import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
 
 
 /**
 /**
  * local bin log connection (not real connection)
  * local bin log connection (not real connection)
- * 
+ *
  * @author yuanzu Date: 12-9-27 Time: 下午6:14
  * @author yuanzu Date: 12-9-27 Time: 下午6:14
  */
  */
 public class LocalBinLogConnection implements ErosaConnection {
 public class LocalBinLogConnection implements ErosaConnection {
@@ -37,11 +39,28 @@ public class LocalBinLogConnection implements ErosaConnection {
     private int                 bufferSize = 16 * 1024;
     private int                 bufferSize = 16 * 1024;
     private boolean             running    = false;
     private boolean             running    = false;
     private long                serverId;
     private long                serverId;
+
+    /** rdsOosMode binlog 的 serverId 是两个 */
+    private boolean             isRdsOssMode = false;
+
+    /** rdsOosMode 主从信息 */
+    private final Set<Long> rdsOssMasterSlaveInfo = new HashSet<>(4);
+
+    private boolean firstUpdateRdsOssMasterSlave = true;
+
     private FileParserListener  parserListener;
     private FileParserListener  parserListener;
 
 
     public LocalBinLogConnection(){
     public LocalBinLogConnection(){
     }
     }
 
 
+    public boolean isRdsOssMode() {
+        return isRdsOssMode;
+    }
+
+    public void setRdsOssMode(boolean rdsOssMode) {
+        isRdsOssMode = rdsOssMode;
+    }
+
     public LocalBinLogConnection(String directory, boolean needWait){
     public LocalBinLogConnection(String directory, boolean needWait){
         this.needWait = needWait;
         this.needWait = needWait;
         this.directory = directory;
         this.directory = directory;
@@ -94,9 +113,7 @@ public class LocalBinLogConnection implements ErosaConnection {
                     if (event == null) {
                     if (event == null) {
                         continue;
                         continue;
                     }
                     }
-                    if (serverId != 0 && event.getServerId() != serverId) {
-                        throw new ServerIdNotMatchException("unexpected serverId " + serverId + " in binlog file !");
-                    }
+                    checkServerId(event);
 
 
                     if (!func.sink(event)) {
                     if (!func.sink(event)) {
                         needContinue = false;
                         needContinue = false;
@@ -131,6 +148,30 @@ public class LocalBinLogConnection implements ErosaConnection {
         }
         }
     }
     }
 
 
+    /**
+     * 1. 非 rdsOos 模式下需要要校验 serverId 是否一致 防止解析其他实例的 binlog
+     * 2. rdsOos 高可用模式下解析 binlog 会有两个 serverId,分别对应着主从节点 binlog解析出来的 serverId
+     * 主从的关系可能会变, 但是 serverId一直都会是这两个 serverId
+     *
+     * @param event
+     */
+    private void checkServerId(LogEvent event) {
+        if (serverId != 0 && event.getServerId() != serverId) {
+            if (isRdsOssMode()) {
+                // 第一次添加主从信息
+                if (firstUpdateRdsOssMasterSlave) {
+                    firstUpdateRdsOssMasterSlave = false;
+                    rdsOssMasterSlaveInfo.add(event.getServerId());
+                } else if (!rdsOssMasterSlaveInfo.contains(event.getServerId())) {
+                    // 主从节点信息之外的节点信息
+                    throw new ServerIdNotMatchException("unexpected rds serverId " + serverId + " in binlog file !");
+                }
+            } else {
+                throw new ServerIdNotMatchException("unexpected serverId " + serverId + " in binlog file !");
+            }
+        }
+    }
+
     public void dump(long timestampMills, SinkFunction func) throws IOException {
     public void dump(long timestampMills, SinkFunction func) throws IOException {
         List<File> currentBinlogs = binlogs.currentBinlogs();
         List<File> currentBinlogs = binlogs.currentBinlogs();
         File current = currentBinlogs.get(currentBinlogs.size() - 1);
         File current = currentBinlogs.get(currentBinlogs.size() - 1);
@@ -158,9 +199,7 @@ public class LocalBinLogConnection implements ErosaConnection {
                 while (fetcher.fetch()) {
                 while (fetcher.fetch()) {
                     LogEvent event = decoder.decode(fetcher, context);
                     LogEvent event = decoder.decode(fetcher, context);
                     if (event != null) {
                     if (event != null) {
-                        if (serverId != 0 && event.getServerId() != serverId) {
-                            throw new ServerIdNotMatchException("unexpected serverId " + serverId + " in binlog file !");
-                        }
+                        checkServerId(event);
 
 
                         if (event.getWhen() > timestampSeconds) {
                         if (event.getWhen() > timestampSeconds) {
                             break;
                             break;
@@ -234,9 +273,7 @@ public class LocalBinLogConnection implements ErosaConnection {
                     if (event == null) {
                     if (event == null) {
                         continue;
                         continue;
                     }
                     }
-                    if (serverId != 0 && event.getServerId() != serverId) {
-                        throw new ServerIdNotMatchException("unexpected serverId " + serverId + " in binlog file !");
-                    }
+                    checkServerId(event);
 
 
                     if (!coprocessor.publish(event)) {
                     if (!coprocessor.publish(event)) {
                         needContinue = false;
                         needContinue = false;
@@ -304,9 +341,7 @@ public class LocalBinLogConnection implements ErosaConnection {
                 while (fetcher.fetch()) {
                 while (fetcher.fetch()) {
                     LogEvent event = decoder.decode(fetcher, context);
                     LogEvent event = decoder.decode(fetcher, context);
                     if (event != null) {
                     if (event != null) {
-                        if (serverId != 0 && event.getServerId() != serverId) {
-                            throw new ServerIdNotMatchException("unexpected serverId " + serverId + " in binlog file !");
-                        }
+                        checkServerId(event);
 
 
                         if (event.getWhen() > timestampSeconds) {
                         if (event.getWhen() > timestampSeconds) {
                             break;
                             break;
@@ -404,6 +439,7 @@ public class LocalBinLogConnection implements ErosaConnection {
 
 
     public void setServerId(long serverId) {
     public void setServerId(long serverId) {
         this.serverId = serverId;
         this.serverId = serverId;
+        rdsOssMasterSlaveInfo.add(serverId);
     }
     }
 
 
     public void setParserListener(FileParserListener parserListener) {
     public void setParserListener(FileParserListener parserListener) {

+ 4 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -464,6 +464,10 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                     }
                     }
 
 
                     if (specificLogFilePosition == null) {
                     if (specificLogFilePosition == null) {
+                        if (isRdsOssMode()) {
+                            // 如果binlog位点不存在,并且属于timestamp不为空,可以返回null走到oss binlog处理
+                            return null;
+                        }
                         // position不存在,从文件头开始
                         // position不存在,从文件头开始
                         entryPosition.setPosition(BINLOG_START_OFFEST);
                         entryPosition.setPosition(BINLOG_START_OFFEST);
                         return entryPosition;
                         return entryPosition;

+ 6 - 6
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/local/BinLogFileQueue.java

@@ -18,7 +18,7 @@ import com.alibaba.otter.canal.parse.exception.CanalParseException;
 
 
 /**
 /**
  * 维护binlog文件列表
  * 维护binlog文件列表
- * 
+ *
  * @author jianghang 2012-7-7 下午03:48:05
  * @author jianghang 2012-7-7 下午03:48:05
  * @version 1.0.0
  * @version 1.0.0
  */
  */
@@ -26,6 +26,7 @@ public class BinLogFileQueue {
 
 
     private String              baseName       = "mysql-bin.";
     private String              baseName       = "mysql-bin.";
     private List<File>          binlogs        = new ArrayList<>();
     private List<File>          binlogs        = new ArrayList<>();
+    private Pattern binLogPattern = Pattern.compile(baseName + "\\d+$");
     private File                directory;
     private File                directory;
     private ReentrantLock       lock           = new ReentrantLock();
     private ReentrantLock       lock           = new ReentrantLock();
     private Condition           nextCondition  = lock.newCondition();
     private Condition           nextCondition  = lock.newCondition();
@@ -77,7 +78,7 @@ public class BinLogFileQueue {
 
 
     /**
     /**
      * 根据前一个文件,获取符合条件的下一个binlog文件
      * 根据前一个文件,获取符合条件的下一个binlog文件
-     * 
+     *
      * @param pre
      * @param pre
      * @return
      * @return
      */
      */
@@ -141,7 +142,7 @@ public class BinLogFileQueue {
 
 
     /**
     /**
      * 根据前一个文件,获取符合条件的下一个binlog文件
      * 根据前一个文件,获取符合条件的下一个binlog文件
-     * 
+     *
      * @param pre
      * @param pre
      * @return
      * @return
      * @throws InterruptedException
      * @throws InterruptedException
@@ -219,9 +220,8 @@ public class BinLogFileQueue {
         files.addAll(FileUtils.listFiles(directory, new IOFileFilter() {
         files.addAll(FileUtils.listFiles(directory, new IOFileFilter() {
 
 
             public boolean accept(File file) {
             public boolean accept(File file) {
-                Pattern pattern = Pattern.compile("\\d+$");
-                Matcher matcher = pattern.matcher(file.getName());
-                return file.getName().startsWith(baseName) && matcher.find();
+                Matcher matcher = binLogPattern.matcher(file.getName());
+                return matcher.find();
             }
             }
 
 
             public boolean accept(File dir, String name) {
             public boolean accept(File dir, String name) {

+ 3 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/BinlogDownloadQueue.java

@@ -123,7 +123,7 @@ public class BinlogDownloadQueue {
     public boolean isLastFile(String fileName) {
     public boolean isLastFile(String fileName) {
         String needCompareName = lastDownload;
         String needCompareName = lastDownload;
         if (StringUtils.isNotEmpty(needCompareName) && StringUtils.endsWith(needCompareName, "tar")) {
         if (StringUtils.isNotEmpty(needCompareName) && StringUtils.endsWith(needCompareName, "tar")) {
-            needCompareName = needCompareName.substring(0, needCompareName.indexOf("."));
+            needCompareName = needCompareName.substring(0, needCompareName.lastIndexOf("."));
         }
         }
         return (needCompareName == null || fileName.equalsIgnoreCase(needCompareName)) && binlogList.isEmpty();
         return (needCompareName == null || fileName.equalsIgnoreCase(needCompareName)) && binlogList.isEmpty();
     }
     }
@@ -223,7 +223,7 @@ public class BinlogDownloadQueue {
                 TarArchiveEntry tarArchiveEntry = null;
                 TarArchiveEntry tarArchiveEntry = null;
                 while ((tarArchiveEntry = tais.getNextTarEntry()) != null) {
                 while ((tarArchiveEntry = tais.getNextTarEntry()) != null) {
                     String name = tarArchiveEntry.getName();
                     String name = tarArchiveEntry.getName();
-                    File tarFile = new File(parentFile, name + ".tmp");
+                    File tarFile = new File(parentFile, name);
                     logger.info("start to download file " + tarFile.getName());
                     logger.info("start to download file " + tarFile.getName());
                     if (tarFile.exists()) {
                     if (tarFile.exists()) {
                         tarFile.delete();
                         tarFile.delete();
@@ -244,7 +244,7 @@ public class BinlogDownloadQueue {
                 }
                 }
                 tais.close();
                 tais.close();
             } else {
             } else {
-                File file = new File(parentFile, fileName + ".tmp");
+                File file = new File(parentFile, fileName);
                 if (file.exists()) {
                 if (file.exists()) {
                     file.delete();
                     file.delete();
                 }
                 }

+ 2 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsLocalBinlogEventParser.java

@@ -21,7 +21,7 @@ import com.alibaba.otter.canal.protocol.position.LogPosition;
 
 
 /**
 /**
  * 基于rds binlog备份文件的复制
  * 基于rds binlog备份文件的复制
- * 
+ *
  * @author agapple 2017年10月15日 下午1:27:36
  * @author agapple 2017年10月15日 下午1:27:36
  * @since 1.0.25
  * @since 1.0.25
  */
  */
@@ -117,6 +117,7 @@ public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements
             localBinLogConnection.setNeedWait(true);
             localBinLogConnection.setNeedWait(true);
             localBinLogConnection.setServerId(serverId);
             localBinLogConnection.setServerId(serverId);
             localBinLogConnection.setParserListener(this);
             localBinLogConnection.setParserListener(this);
+            localBinLogConnection.setRdsOssMode(true);
         }
         }
         return connection;
         return connection;
     }
     }