Explorar el Código

fixed issue #1076 , support rds binlog expired

七锋 hace 6 años
padre
commit
1d9e2ccedd

+ 16 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -69,6 +69,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     // update by yishun.chen,特殊异常处理参数
     private int                  dumpErrorCount                    = 0;        // binlogDump失败异常计数
     private int                  dumpErrorCountThreshold           = 2;        // binlogDump失败异常计数阀值
+    private boolean              rdsOssMode                        = false;
 
     protected ErosaConnection buildErosaConnection() {
         return buildMysqlConnection(this.runningInfo);
@@ -352,7 +353,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 return logPosition.getPostion();
             }
 
-            if (masterPosition!=null && StringUtils.isNotEmpty(masterPosition.getGtid())) {
+            if (masterPosition != null && StringUtils.isNotEmpty(masterPosition.getGtid())) {
                 return masterPosition;
             }
         }
@@ -493,6 +494,12 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                         dumpErrorCount = 0;
                         return findPosition;
                     }
+
+                    Long timestamp = logPosition.getPostion().getTimestamp();
+                    if (isRdsOssMode() && (timestamp != null && timestamp > 0)) {
+                        // 如果binlog位点不存在,并且属于timestamp不为空,可以返回null走到oss binlog处理
+                        return null;
+                    }
                 }
                 // 其余情况
                 logger.warn("prepare to find start position just last position\n {}",
@@ -905,4 +912,12 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         this.dumpErrorCountThreshold = dumpErrorCountThreshold;
     }
 
+    public boolean isRdsOssMode() {
+        return rdsOssMode;
+    }
+
+    public void setRdsOssMode(boolean rdsOssMode) {
+        this.rdsOssMode = rdsOssMode;
+    }
+
 }

+ 5 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/BinlogDownloadQueue.java

@@ -41,6 +41,7 @@ import org.apache.http.ssl.TrustStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.BinlogFile;
 
 /**
@@ -103,6 +104,9 @@ public class BinlogDownloadQueue {
 
     public BinlogFile tryOne() throws Throwable {
         BinlogFile binlogFile = binlogList.poll();
+        if (binlogFile == null) {
+            throw new CanalParseException("download binlog is null");
+        }
         download(binlogFile);
         hostId = binlogFile.getHostInstanceID();
         this.currentSize++;
@@ -162,6 +166,7 @@ public class BinlogDownloadQueue {
         this.currentSize = 0;
         binlogList.clear();
         downloadQueue.clear();
+        downloadThread = null;
     }
 
     private void download(BinlogFile binlogFile) throws Throwable {

+ 20 - 9
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogEventParserProxy.java

@@ -22,14 +22,14 @@ import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
  */
 public class RdsBinlogEventParserProxy extends MysqlEventParser {
 
-    private String                    rdsOpenApiUrl             = "https://rds.aliyuncs.com/";    // openapi地址
-    private String                    accesskey;                                                  // 云账号的ak
-    private String                    secretkey;                                                  // 云账号sk
-    private String                    instanceId;                                                 // rds实例id
-    private String                    directory;                                                  // binlog目录
-    private int                       batchFileSize             = 4;                              // 最多下载的binlog文件数量
-
-    private RdsLocalBinlogEventParser rdsLocalBinlogEventParser = new RdsLocalBinlogEventParser();
+    private String                    rdsOpenApiUrl             = "https://rds.aliyuncs.com/"; // openapi地址
+    private String                    accesskey;                                              // 云账号的ak
+    private String                    secretkey;                                              // 云账号sk
+    private String                    instanceId;                                             // rds实例id
+    private String                    directory;                                              // binlog目录
+    private int                       batchFileSize             = 4;                          // 最多下载的binlog文件数量
+
+    private RdsLocalBinlogEventParser rdsLocalBinlogEventParser = null;
     private ExecutorService           executorService           = Executors.newSingleThreadExecutor(new ThreadFactory() {
 
                                                                     @Override
@@ -43,8 +43,11 @@ public class RdsBinlogEventParserProxy extends MysqlEventParser {
 
     @Override
     public void start() {
-        if (StringUtils.isNotEmpty(accesskey) && StringUtils.isNotEmpty(secretkey)
+        if (rdsLocalBinlogEventParser == null && StringUtils.isNotEmpty(accesskey) && StringUtils.isNotEmpty(secretkey)
             && StringUtils.isNotEmpty(instanceId)) {
+            rdsLocalBinlogEventParser = new RdsLocalBinlogEventParser();
+            // rds oss mode
+            setRdsOssMode(true);
             final ParserExceptionHandler targetHandler = this.getParserExceptionHandler();
             if (directory == null) {
                 directory = System.getProperty("java.io.tmpdir", "/tmp") + "/" + destination;
@@ -119,10 +122,18 @@ public class RdsBinlogEventParserProxy extends MysqlEventParser {
                         long serverId = rdsBinlogEventParserProxy.getServerId();
                         rdsLocalBinlogEventParser.setServerId(serverId);
                         rdsBinlogEventParserProxy.stop();
+                    } catch (Throwable e) {
+                        logger.info("handle exception failed", e);
+                    }
+
+                    try {
                         logger.info("start rds mysql binlog parser!");
                         rdsLocalBinlogEventParser.start();
                     } catch (Throwable e) {
                         logger.info("handle exception failed", e);
+                        rdsLocalBinlogEventParser.stop();
+                        RdsBinlogEventParserProxy rdsBinlogEventParserProxy = RdsBinlogEventParserProxy.this;
+                        rdsBinlogEventParserProxy.start();// 继续重试
                     }
                 }
             });

+ 9 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsLocalBinlogEventParser.java

@@ -57,7 +57,11 @@ public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements
             if (entryPosition == null) {
                 throw new PositionNotFoundException("position not found!");
             }
-            long startTimeInMill = entryPosition.getTimestamp();
+            Long startTimeInMill = entryPosition.getTimestamp();
+            if (startTimeInMill == null || startTimeInMill <= 0) {
+                throw new PositionNotFoundException("position timestamp is empty!");
+            }
+
             startTime = startTimeInMill;
             List<BinlogFile> binlogFiles = RdsBinlogOpenApi.listBinlogFiles(url,
                 accesskey,
@@ -65,6 +69,10 @@ public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements
                 instanceId,
                 new Date(startTime),
                 new Date(endTime));
+            if (binlogFiles.isEmpty()) {
+                throw new CanalParseException("start timestamp : " + startTimeInMill + " binlog files is empty");
+            }
+
             binlogDownloadQueue = new BinlogDownloadQueue(binlogFiles, batchFileSize, directory);
             binlogDownloadQueue.silenceDownload();
             needWait = true;