Просмотр исходного кода

merge issue #5496 , 修复mysql rds oss订阅本地binlog过期需要再次订阅oss

jianghang.loujh 1 неделя назад
Родитель
Сommit
9f2fc7409f

+ 1 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/exception/ServerLogPurgedException.java

@@ -0,0 +1 @@
+package com.alibaba.otter.canal.parse.exception;

import com.alibaba.otter.canal.common.CanalException;

public class ServerLogPurgedException extends CanalException {

    public ServerLogPurgedException(String errorCode){
        super("ServerLogPurged by " + errorCode);
    }
}

+ 9 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java

@@ -5,10 +5,12 @@ import java.io.InterruptedIOException;
 import java.net.SocketTimeoutException;
 import java.nio.channels.ClosedByInterruptException;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.parse.driver.mysql.socket.SocketChannel;
+import com.alibaba.otter.canal.parse.exception.ServerLogPurgedException;
 import com.taobao.tddl.dbsync.binlog.LogFetcher;
 
 /**
@@ -99,6 +101,13 @@ public class DirectLogFetcher extends LogFetcher {
                     final int errno = getInt16();
                     String sqlstate = forward(1).getFixString(SQLSTATE_LENGTH);
                     String errmsg = getFixString(limit - position);
+                    if (StringUtils.containsIgnoreCase(errmsg, "not find first log file name")
+                        || StringUtils.containsIgnoreCase(errmsg, "purged binary logs")) {
+                        // 开始 dump 后,server 位点过期,DUMP 和 DUMP_GTID 两种错误信息
+                        throw new ServerLogPurgedException(
+                            " errno = " + errno + ", sqlstate = " + sqlstate + " errmsg = " + errmsg);
+                    }
+
                     throw new IOException("Received error packet:" + " errno = " + errno + ", sqlstate = " + sqlstate
                                           + " errmsg = " + errmsg);
                 } else if (mark == 254) {

+ 2 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogEventParserProxy.java

@@ -6,6 +6,7 @@ import java.util.concurrent.Executors;
 import org.apache.commons.lang.StringUtils;
 
 import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;
+import com.alibaba.otter.canal.parse.exception.ServerLogPurgedException;
 import com.alibaba.otter.canal.parse.inbound.ParserExceptionHandler;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
 
@@ -92,7 +93,7 @@ public class RdsBinlogEventParserProxy extends MysqlEventParser {
     }
 
     private void handleMysqlParserException(Throwable throwable) {
-        if (throwable instanceof PositionNotFoundException) {
+        if (throwable instanceof PositionNotFoundException || throwable instanceof ServerLogPurgedException) {
             logger.info("remove rds not found position, try download rds binlog!");
             executorService.execute(() -> {
                 try {