Parcourir la source

Merge branch 'branch-rds-support'

charles.lin il y a 6 ans
Parent
commit
ea6391d1c3
34 fichiers modifiés avec 1988 ajouts et 51 suppressions
  1. 30 0
      parse/src/main/java/com/alibaba/otter/canal/parse/exception/PositionNotFoundException.java
  2. 30 0
      parse/src/main/java/com/alibaba/otter/canal/parse/exception/ServerIdNotMatchException.java
  3. 29 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java
  4. 2 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ErosaConnection.java
  5. 9 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ParserExceptionHandler.java
  6. 48 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java
  7. 12 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  8. 4 4
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  9. 8 3
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java
  10. 10 3
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  11. 6 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java
  12. 260 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/BinlogDownloadQueue.java
  13. 149 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogEventParserProxy.java
  14. 53 8
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogOpenApi.java
  15. 130 29
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsLocalBinlogEventParser.java
  16. 72 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/BinlogFile.java
  17. 62 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/DescribeBinlogFileResult.java
  18. 69 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/RdsBackupPolicy.java
  19. 19 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/RdsItem.java
  20. 250 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/AbstractRequest.java
  21. 41 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/DescribeBackupPolicyRequest.java
  22. 56 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/DescribeBinlogFilesRequest.java
  23. 191 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/HistoryTableMetaCache.java
  24. 20 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaCacheInterface.java
  25. 105 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaCacheWithStorage.java
  26. 55 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaEntry.java
  27. 18 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaStorage.java
  28. 9 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaStorageFactory.java
  29. 9 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/exception/CacheConnectionNull.java
  30. 21 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/exception/NoHistoryException.java
  31. 14 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaCallback.java
  32. 48 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaStorage.java
  33. 26 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaStorageFactory.java
  34. 123 0
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/RdsBinlogEventParserProxyTest.java

+ 30 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/exception/PositionNotFoundException.java

@@ -0,0 +1,30 @@
+package com.alibaba.otter.canal.parse.exception;
+
+/**
+ * @author chengjin.lyf on 2018/7/20 下午2:54
+ * @since 1.0.25
+ */
+public class PositionNotFoundException extends CanalParseException {
+
+    private static final long serialVersionUID = -7382448928116244017L;
+
+    public PositionNotFoundException(String errorCode) {
+        super(errorCode);
+    }
+
+    public PositionNotFoundException(String errorCode, Throwable cause) {
+        super(errorCode, cause);
+    }
+
+    public PositionNotFoundException(String errorCode, String errorDesc) {
+        super(errorCode, errorDesc);
+    }
+
+    public PositionNotFoundException(String errorCode, String errorDesc, Throwable cause) {
+        super(errorCode, errorDesc, cause);
+    }
+
+    public PositionNotFoundException(Throwable cause) {
+        super(cause);
+    }
+}

+ 30 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/exception/ServerIdNotMatchException.java

@@ -0,0 +1,30 @@
+package com.alibaba.otter.canal.parse.exception;
+
+import com.alibaba.otter.canal.common.CanalException;
+
+/**
+ * @author chengjin.lyf on 2018/8/8 下午1:07
+ * @since 1.0.25
+ */
+public class ServerIdNotMatchException extends CanalException{
+
+    public ServerIdNotMatchException(String errorCode) {
+        super(errorCode);
+    }
+
+    public ServerIdNotMatchException(String errorCode, Throwable cause) {
+        super(errorCode, cause);
+    }
+
+    public ServerIdNotMatchException(String errorCode, String errorDesc) {
+        super(errorCode, errorDesc);
+    }
+
+    public ServerIdNotMatchException(String errorCode, String errorDesc, Throwable cause) {
+        super(errorCode, errorDesc, cause);
+    }
+
+    public ServerIdNotMatchException(Throwable cause) {
+        super(cause);
+    }
+}

+ 29 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -8,6 +8,7 @@ import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang.math.RandomUtils;
@@ -94,6 +95,10 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                                                                                     .availableProcessors() * 60 / 100;     // 60%的能力跑解析,剩余部分处理网络
     protected int                                    parallelBufferSize         = 256;                                     // 必须为2的幂
     protected MultiStageCoprocessor                  multiStageCoprocessor;
+    protected ParserExceptionHandler                 parserExceptionHandler;
+    protected long serverId;
+
+
 
     protected abstract BinlogParser buildParser();
 
@@ -170,11 +175,16 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                         preDump(erosaConnection);
 
                         erosaConnection.connect();// 链接
+
+                        long queryServerId = erosaConnection.queryServerId();
+                        if (queryServerId != 0){
+                            serverId = queryServerId;
+                        }
                         // 4. 获取最后的位置信息
                         EntryPosition position = findStartPosition(erosaConnection);
                         final EntryPosition startPosition = position;
                         if (startPosition == null) {
-                            throw new CanalParseException("can't find start position for " + destination);
+                            throw new PositionNotFoundException("can't find start position for " + destination);
                         }
 
                         if (!processTableMeta(startPosition)) {
@@ -277,6 +287,9 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                                 runningInfo.getAddress().toString()), e);
                             sendAlarm(destination, ExceptionUtils.getFullStackTrace(e));
                         }
+                        if (parserExceptionHandler!=null){
+                            parserExceptionHandler.handle(e);
+                        }
                     } finally {
                         // 重新置为中断状态
                         Thread.interrupted();
@@ -615,4 +628,19 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         this.parallelBufferSize = parallelBufferSize;
     }
 
+    public ParserExceptionHandler getParserExceptionHandler() {
+        return parserExceptionHandler;
+    }
+
+    public void setParserExceptionHandler(ParserExceptionHandler parserExceptionHandler) {
+        this.parserExceptionHandler = parserExceptionHandler;
+    }
+
+    public long getServerId() {
+        return serverId;
+    }
+
+    public void setServerId(long serverId) {
+        this.serverId = serverId;
+    }
 }

+ 2 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ErosaConnection.java

@@ -40,4 +40,6 @@ public interface ErosaConnection {
     public void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOException;
 
     ErosaConnection fork();
+
+    public long queryServerId() throws IOException;
 }

+ 9 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ParserExceptionHandler.java

@@ -0,0 +1,9 @@
+package com.alibaba.otter.canal.parse.inbound;
+
+/**
+ * @author chengjin.lyf on 2018/7/20 下午3:55
+ * @since 1.0.25
+ */
+public interface ParserExceptionHandler {
+    void handle(Throwable e);
+}

+ 48 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

@@ -4,6 +4,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
+import com.alibaba.otter.canal.parse.exception.ServerIdNotMatchException;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -36,6 +37,9 @@ public class LocalBinLogConnection implements ErosaConnection {
     private String              directory;
     private int                 bufferSize = 16 * 1024;
     private boolean             running    = false;
+    private long                serverId;
+    private FileParserListener  parserListener;
+
 
     public LocalBinLogConnection(){
     }
@@ -96,6 +100,9 @@ public class LocalBinLogConnection implements ErosaConnection {
                     if (event == null) {
                         continue;
                     }
+                    if (serverId != 0 && event.getServerId() != serverId){
+                        throw new ServerIdNotMatchException("unexpected serverId "+serverId + " in binlog file !");
+                    }
 
                     if (!func.sink(event)) {
                         needContinue = false;
@@ -103,8 +110,9 @@ public class LocalBinLogConnection implements ErosaConnection {
                     }
                 }
 
+                fetcher.close(); // 关闭上一个文件
+                parserFinish(current.getName());
                 if (needContinue) {// 读取下一个
-                    fetcher.close(); // 关闭上一个文件
 
                     File nextFile;
                     if (needWait) {
@@ -160,6 +168,11 @@ public class LocalBinLogConnection implements ErosaConnection {
                 while (fetcher.fetch()) {
                     LogEvent event = decoder.decode(fetcher, context);
                     if (event != null) {
+
+                        if (serverId != 0 && event.getServerId() != serverId){
+                            throw new ServerIdNotMatchException("unexpected serverId "+serverId + " in binlog file !");
+                        }
+
                         if (event.getWhen() > timestampSeconds) {
                             break;
                         }
@@ -228,8 +241,9 @@ public class LocalBinLogConnection implements ErosaConnection {
                     }
                 }
 
+                fetcher.close(); // 关闭上一个文件
+                parserFinish(binlogfilename);
                 if (needContinue) {// 读取下一个
-                    fetcher.close(); // 关闭上一个文件
 
                     File nextFile;
                     if (needWait) {
@@ -258,6 +272,12 @@ public class LocalBinLogConnection implements ErosaConnection {
         }
     }
 
+    private void parserFinish(String fileName){
+        if (parserListener != null){
+            parserListener.onFinish(fileName);
+        }
+    }
+
     @Override
     public void dump(long timestampMills, MultiStageCoprocessor coprocessor) throws IOException {
         List<File> currentBinlogs = binlogs.currentBinlogs();
@@ -286,6 +306,11 @@ public class LocalBinLogConnection implements ErosaConnection {
                 while (fetcher.fetch()) {
                     LogEvent event = decoder.decode(fetcher, context);
                     if (event != null) {
+
+                        if (serverId != 0 && event.getServerId() != serverId){
+                            throw new ServerIdNotMatchException("unexpected serverId "+serverId + " in binlog file !");
+                        }
+
                         if (event.getWhen() > timestampSeconds) {
                             break;
                         }
@@ -344,6 +369,11 @@ public class LocalBinLogConnection implements ErosaConnection {
         return connection;
     }
 
+    @Override
+    public long queryServerId() {
+        return 0;
+    }
+
     public boolean isNeedWait() {
         return needWait;
     }
@@ -368,4 +398,20 @@ public class LocalBinLogConnection implements ErosaConnection {
         this.bufferSize = bufferSize;
     }
 
+    public long getServerId() {
+        return serverId;
+    }
+
+    public void setServerId(long serverId) {
+        this.serverId = serverId;
+    }
+
+    public void setParserListener(FileParserListener parserListener) {
+        this.parserListener = parserListener;
+    }
+
+    public interface FileParserListener{
+        void onFinish(String fileName);
+    }
+
 }

+ 12 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -9,8 +9,10 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.math.NumberUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
 
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector;
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlQueryExecutor;
@@ -324,6 +326,16 @@ public class MysqlConnection implements ErosaConnection {
         return connection;
     }
 
+    @Override
+    public long queryServerId() throws IOException {
+        ResultSetPacket resultSetPacket = query("show variables like 'server_id'");
+        List<String> fieldValues = resultSetPacket.getFieldValues();
+        if (fieldValues == null || fieldValues.size() != 2){
+            return 0;
+        }
+        return NumberUtils.toLong(fieldValues.get(1));
+    }
+
     // ====================== help method ====================
 
     /**

+ 4 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -53,11 +53,11 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     private int                receiveBufferSize                 = 64 * 1024;
     private int                sendBufferSize                    = 64 * 1024;
     // 数据库信息
-    private AuthenticationInfo masterInfo;                                   // 主库
-    private AuthenticationInfo standbyInfo;                                  // 备库
+    protected AuthenticationInfo masterInfo;                                   // 主库
+    protected AuthenticationInfo standbyInfo;                                  // 备库
     // binlog信息
-    private EntryPosition      masterPosition;
-    private EntryPosition      standbyPosition;
+    protected EntryPosition      masterPosition;
+    protected EntryPosition      standbyPosition;
     private long               slaveId;                                      // 链接到mysql的slave
     // 心跳检查信息
     private String             detectingSQL;                                 // 心跳sql

+ 8 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -203,10 +203,15 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
             try {
                 LogBuffer buffer = event.getBuffer();
                 if (StringUtils.isNotEmpty(event.getBinlogFileName())
-                    && !context.getLogPosition().getFileName().equals(event.getBinlogFileName())) {
+                    && (context.getLogPosition() == null
+                    || !context.getLogPosition().getFileName().equals(event.getBinlogFileName()))) {
                     // set roate binlog file name
-                    context.setLogPosition(new LogPosition(event.getBinlogFileName(), context.getLogPosition()
-                        .getPosition()));
+                    if (context.getLogPosition() == null){
+                        context.setLogPosition(new LogPosition(event.getBinlogFileName(), 0));
+                    }else{
+                        context.setLogPosition(new LogPosition(event.getBinlogFileName(), context.getLogPosition()
+                                .getPosition()));
+                    }
                 }
 
                 LogEvent logEvent = decoder.decode(buffer, context);

+ 10 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -10,6 +10,9 @@ import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaCacheInterface;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorage;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception.NoHistoryException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
@@ -87,7 +90,10 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     private volatile AviaterRegexFilter nameFilter;                                                          // 运行时引用可能会有变化,比如规则发生变化时
     private volatile AviaterRegexFilter nameBlackFilter;
 
-    private TableMetaCache              tableMetaCache;
+
+    private TableMetaCacheInterface tableMetaCache;
+    private String                      binlogFileName      = "mysql-bin.000001";
+
     private Charset                     charset             = Charset.defaultCharset();
     private boolean                     filterQueryDcl      = false;
     private boolean                     filterQueryDml      = false;
@@ -262,7 +268,8 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             if (!isSeek) {
                 // 使用新的表结构元数据管理方式
                 EntryPosition position = createPosition(event.getHeader());
-                tableMetaCache.apply(position, event.getDbName(), queryString, null);
+                String fulltbName = schemaName+"."+tableName;
+                tableMetaCache.apply(position, fulltbName, queryString, null);
             }
 
             Header header = createHeader(event.getHeader(), schemaName, tableName, type);
@@ -937,7 +944,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         this.nameBlackFilter = nameBlackFilter;
     }
 
-    public void setTableMetaCache(TableMetaCache tableMetaCache) {
+    public void setTableMetaCache(TableMetaCacheInterface tableMetaCache) {
         this.tableMetaCache = tableMetaCache;
     }
 

+ 6 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java

@@ -6,6 +6,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaCacheInterface;
 import org.apache.commons.lang.StringUtils;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
@@ -29,7 +30,7 @@ import com.google.common.cache.LoadingCache;
  * @author jianghang 2013-1-17 下午10:15:16
  * @version 1.0.0
  */
-public class TableMetaCache {
+public class TableMetaCache implements TableMetaCacheInterface {
 
     public static final String              COLUMN_NAME    = "COLUMN_NAME";
     public static final String              COLUMN_TYPE    = "COLUMN_TYPE";
@@ -99,6 +100,10 @@ public class TableMetaCache {
             String createDDL = packet.getFieldValues().get(1);
             MemoryTableMeta memoryTableMeta = new MemoryTableMeta();
             memoryTableMeta.apply(DatabaseTableMeta.INIT_POSITION, schema, createDDL, null);
+            String[] strings = table.split("\\.");
+            if (strings.length > 1) {
+                table = strings[1];
+            }
             TableMeta tableMeta = memoryTableMeta.find(schema, table);
             return tableMeta.getFields();
         } else {

+ 260 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/BinlogDownloadQueue.java

@@ -0,0 +1,260 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.rds;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.BinlogFile;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * @author chengjin.lyf on 2018/8/7 下午3:10
+ * @since 1.0.25
+ */
+public class BinlogDownloadQueue {
+
+    private static final Logger logger = LoggerFactory.getLogger(BinlogDownloadQueue.class);
+    private static final int      TIMEOUT             = 10000;
+
+    private LinkedBlockingQueue<BinlogFile> downloadQueue = new LinkedBlockingQueue<BinlogFile>();
+    private LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
+    private LinkedList<BinlogFile> binlogList;
+    private final int batchSize;
+    private Thread downloadThread;
+    public boolean running = true;
+    private final String destDir;
+    private String hostId;
+    private int currentSize;
+    private String lastDownload;
+
+    public BinlogDownloadQueue(List<BinlogFile> downloadQueue, int batchSize, String destDir) throws IOException {
+        this.binlogList = new LinkedList(downloadQueue);
+        this.batchSize = batchSize;
+        this.destDir = destDir;
+        this.currentSize = 0;
+        prepareBinlogList();
+        cleanDir();
+    }
+
+    private void prepareBinlogList(){
+        for (BinlogFile binlog : this.binlogList) {
+            String fileName = StringUtils.substringBetween(binlog.getDownloadLink(), "mysql-bin.", "?");
+            binlog.setFileName(fileName);
+        }
+        Collections.sort(this.binlogList, new Comparator<BinlogFile>() {
+            @Override
+            public int compare(BinlogFile o1, BinlogFile o2) {
+                return o1.getFileName().compareTo(o2.getFileName());
+            }
+        });
+    }
+
+    public void cleanDir() throws IOException {
+        File destDirFile = new File(destDir);
+        FileUtils.forceMkdir(destDirFile);
+        FileUtils.cleanDirectory(destDirFile);
+    }
+
+    public void silenceDownload() {
+        if (downloadThread != null) {
+            return;
+        }
+        downloadThread = new Thread(new DownloadThread());
+        downloadThread.start();
+    }
+
+
+    public BinlogFile tryOne() throws IOException {
+        BinlogFile binlogFile = binlogList.poll();
+        download(binlogFile);
+        hostId = binlogFile.getHostInstanceID();
+        this.currentSize ++;
+        return binlogFile;
+    }
+
+    public void notifyNotMatch(){
+        this.currentSize --;
+        filter(hostId);
+    }
+
+    private void filter(String hostInstanceId){
+        Iterator<BinlogFile> it = binlogList.iterator();
+        while (it.hasNext()){
+            BinlogFile bf = it.next();
+            if(bf.getHostInstanceID().equalsIgnoreCase(hostInstanceId)){
+                it.remove();
+            }else{
+                hostId = bf.getHostInstanceID();
+            }
+        }
+    }
+
+    public boolean isLastFile(String fileName){
+        String needCompareName = lastDownload;
+        if (StringUtils.isNotEmpty(needCompareName) && StringUtils.endsWith(needCompareName, "tar")){
+            needCompareName = needCompareName.substring(0, needCompareName.indexOf("."));
+        }
+        return fileName.equalsIgnoreCase(needCompareName) && binlogList.isEmpty();
+    }
+
+    public void prepare() throws InterruptedException {
+        for (int i = this.currentSize; i < batchSize && !binlogList.isEmpty(); i++) {
+            BinlogFile binlogFile = null;
+            while (!binlogList.isEmpty()){
+                binlogFile = binlogList.poll();
+                if (!binlogFile.getHostInstanceID().equalsIgnoreCase(hostId)){
+                    continue;
+                }
+                break;
+            }
+            if (binlogFile == null){
+                break;
+            }
+            this.downloadQueue.put(binlogFile);
+            this.lastDownload = "mysql-bin." + binlogFile.getFileName();
+            this.currentSize ++;
+        }
+    }
+
+    public void downOne(){
+        this.currentSize --;
+    }
+
+    public void release(){
+        running = false;
+        this.currentSize = 0;
+        binlogList.clear();
+        downloadQueue.clear();
+    }
+
+    private void download(BinlogFile binlogFile) throws IOException {
+        String downloadLink = binlogFile.getDownloadLink();
+        String fileName = binlogFile.getFileName();
+        HttpGet httpGet = new HttpGet(downloadLink);
+        CloseableHttpClient httpClient = HttpClientBuilder.create()
+                .setMaxConnPerRoute(50)
+                .setMaxConnTotal(100)
+                .build();
+        RequestConfig requestConfig = RequestConfig.custom()
+                .setConnectTimeout(TIMEOUT)
+                .setConnectionRequestTimeout(TIMEOUT)
+                .setSocketTimeout(TIMEOUT)
+                .build();
+        httpGet.setConfig(requestConfig);
+        HttpResponse response = httpClient.execute(httpGet);
+        int statusCode = response.getStatusLine().getStatusCode();
+        if (statusCode != HttpResponseStatus.OK.code()) {
+            throw new RuntimeException("download failed , url:" + downloadLink + " , statusCode:"
+                                       + statusCode);
+        }
+        saveFile(new File(destDir), "mysql-bin." + fileName, response);
+    }
+
+    private static void saveFile(File parentFile, String fileName, HttpResponse response) throws IOException {
+        InputStream is = response.getEntity().getContent();
+        long totalSize = Long.parseLong(response.getFirstHeader("Content-Length").getValue());
+        if(response.getFirstHeader("Content-Disposition")!=null){
+            fileName = response.getFirstHeader("Content-Disposition").getValue();
+            fileName = StringUtils.substringAfter(fileName, "filename=");
+        }
+        boolean isTar = StringUtils.endsWith(fileName, ".tar");
+        FileUtils.forceMkdir(parentFile);
+        FileOutputStream fos = null;
+        try {
+            if (isTar) {
+                TarArchiveInputStream tais = new TarArchiveInputStream(is);
+                TarArchiveEntry tarArchiveEntry = null;
+                while ((tarArchiveEntry = tais.getNextTarEntry()) != null) {
+                    String name = tarArchiveEntry.getName();
+                    File tarFile = new File(parentFile, name + ".tmp");
+                    logger.info("start to download file " + tarFile.getName());
+                    BufferedOutputStream bos = null;
+                    try {
+                        bos = new BufferedOutputStream(new FileOutputStream(tarFile));
+                        int read = -1;
+                        byte[] buffer = new byte[1024];
+                        while ((read = tais.read(buffer)) != -1) {
+                            bos.write(buffer, 0, read);
+                        }
+                        logger.info("download file " + tarFile.getName() + " end!");
+                        tarFile.renameTo(new File(parentFile, name));
+                    } finally {
+                        IOUtils.closeQuietly(bos);
+                    }
+                }
+                tais.close();
+            } else {
+                File file = new File(parentFile, fileName + ".tmp");
+                if (!file.isFile()) {
+                    file.createNewFile();
+                }
+                try {
+                    fos = new FileOutputStream(file);
+                    byte[] buffer = new byte[1024];
+                    int len;
+                    long copySize = 0;
+                    long nextPrintProgress = 0;
+                    logger.info("start to download file " + file.getName());
+                    while ((len = is.read(buffer)) != -1) {
+                        fos.write(buffer, 0, len);
+                        copySize += len;
+                        long progress = copySize * 100 / totalSize;
+                        if (progress >= nextPrintProgress) {
+                            logger.info("download " + file.getName() + " progress : " + progress
+                                        + "% , download size : " + copySize + ", total size : " + totalSize);
+                            nextPrintProgress += 10;
+                        }
+                    }
+                    logger.info("download file " + file.getName() + " end!");
+                    fos.flush();
+                } finally {
+                    IOUtils.closeQuietly(fos);
+                }
+                file.renameTo(new File(parentFile, fileName));
+            }
+        } finally {
+            IOUtils.closeQuietly(fos);
+        }
+    }
+
+    public void execute(Runnable runnable) throws InterruptedException {
+        taskQueue.put(runnable);
+    }
+
+    private class DownloadThread implements Runnable {
+
+        @Override
+        public void run() {
+            while (running) {
+                try {
+                    BinlogFile binlogFile = downloadQueue.poll(5000, TimeUnit.MILLISECONDS);
+                    if (binlogFile != null){
+                        download(binlogFile);
+                    }
+                    Runnable runnable = taskQueue.poll(5000, TimeUnit.MILLISECONDS);
+                    if (runnable != null){
+                        runnable.run();
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+
+        }
+    }
+}

+ 149 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogEventParserProxy.java

@@ -0,0 +1,149 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.rds;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;
+import com.alibaba.otter.canal.parse.inbound.ParserExceptionHandler;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
+
+/**
+ * @author chengjin.lyf on 2018/7/20 上午10:52
+ * @since 1.0.25
+ */
+public class RdsBinlogEventParserProxy extends MysqlEventParser {
+
+    private String rdsOpenApiUrl = "https://rds.aliyuncs.com/"; // openapi地址
+    private String accesskey; // 云账号的ak
+    private String secretkey; // 云账号sk
+    private String instanceId; // rds实例id
+    private Long startTime;
+    private Long endTime;
+    private String directory; //binlog 目录
+    private int batchSize = 4; //最多下载的binlog文件数量
+
+    private RdsLocalBinlogEventParser rdsBinlogEventParser = new RdsLocalBinlogEventParser();
+    private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
+
+        @Override
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(r, "rds-binlog-daemon-thread");
+            t.setDaemon(true);
+            return t;
+        }
+    });
+
+    @Override
+    public void start() {
+        final ParserExceptionHandler targetHandler = this.getParserExceptionHandler();
+        rdsBinlogEventParser.setLogPositionManager(this.getLogPositionManager());
+        rdsBinlogEventParser.setDestination(destination);
+        rdsBinlogEventParser.setAlarmHandler(this.getAlarmHandler());
+        rdsBinlogEventParser.setConnectionCharset(this.connectionCharset);
+        rdsBinlogEventParser.setConnectionCharsetNumber(this.connectionCharsetNumber);
+        rdsBinlogEventParser.setEnableTsdb(this.enableTsdb);
+        rdsBinlogEventParser.setEventBlackFilter(this.eventBlackFilter);
+        rdsBinlogEventParser.setFilterQueryDcl(this.filterQueryDcl);
+        rdsBinlogEventParser.setFilterQueryDdl(this.filterQueryDdl);
+        rdsBinlogEventParser.setFilterQueryDml(this.filterQueryDml);
+        rdsBinlogEventParser.setFilterRows(this.filterRows);
+        rdsBinlogEventParser.setFilterTableError(this.filterTableError);
+        rdsBinlogEventParser.setIsGTIDMode(this.isGTIDMode);
+        rdsBinlogEventParser.setMasterInfo(this.masterInfo);
+        rdsBinlogEventParser.setEventFilter(this.eventFilter);
+        rdsBinlogEventParser.setMasterPosition(this.masterPosition);
+        rdsBinlogEventParser.setTransactionSize(this.transactionSize);
+        rdsBinlogEventParser.setUrl(this.rdsOpenApiUrl);
+        rdsBinlogEventParser.setAccesskey(this.accesskey);
+        rdsBinlogEventParser.setSecretkey(this.secretkey);
+        rdsBinlogEventParser.setInstanceId(this.instanceId);
+        rdsBinlogEventParser.setEventSink(eventSink);
+        rdsBinlogEventParser.setDirectory(directory);
+        rdsBinlogEventParser.setBatchSize(batchSize);
+        rdsBinlogEventParser.setFinishListener(new RdsLocalBinlogEventParser.ParseFinishListener() {
+            @Override
+            public void onFinish() {
+                executorService.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        rdsBinlogEventParser.stop();
+                        RdsBinlogEventParserProxy.this.start();
+                    }
+                });
+
+            }
+        });
+        this.setParserExceptionHandler(new ParserExceptionHandler() {
+
+            @Override
+            public void handle(Throwable e) {
+                handleMysqlParserException(e);
+                if (targetHandler != null) {
+                    targetHandler.handle(e);
+                }
+            }
+        });
+        super.start();
+    }
+
+    public void handleMysqlParserException(Throwable throwable) {
+        if (throwable instanceof PositionNotFoundException) {
+            logger.info("remove rds not found position, try download rds binlog!");
+            executorService.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        logger.info("stop mysql parser!");
+                        RdsBinlogEventParserProxy rdsBinlogEventParserProxy = RdsBinlogEventParserProxy.this;
+                        long serverId = rdsBinlogEventParserProxy.getServerId();
+                        rdsBinlogEventParser.setServerId(serverId);
+                        rdsBinlogEventParserProxy.stop();
+                        logger.info("start rds mysql binlog parser!");
+                        rdsBinlogEventParser.start();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+        }
+    }
+
+    @Override
+    public void stop() {
+        super.stop();
+    }
+
+    @Override
+    public boolean isStart() {
+        return super.isStart();
+    }
+
+    public void setRdsOpenApiUrl(String rdsOpenApiUrl) {
+        this.rdsOpenApiUrl = rdsOpenApiUrl;
+    }
+
+
+    public void setAccesskey(String accesskey) {
+        this.accesskey = accesskey;
+    }
+
+
+    public void setSecretkey(String secretkey) {
+        this.secretkey = secretkey;
+    }
+
+
+    public void setInstanceId(String instanceId) {
+        this.instanceId = instanceId;
+    }
+
+    public void setDirectory(String directory) {
+        this.directory = directory;
+    }
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+}

+ 53 - 8
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogOpenApi.java

@@ -1,5 +1,9 @@
 package com.alibaba.otter.canal.parse.inbound.mysql.rds;
 
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.BinlogFile;
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.DescribeBinlogFileResult;
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.RdsItem;
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.request.DescribeBinlogFilesRequest;
 import io.netty.handler.codec.http.HttpResponseStatus;
 
 import java.io.BufferedOutputStream;
@@ -9,16 +13,11 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.net.URLEncoder;
 import java.text.SimpleDateFormat;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.UUID;
+import java.util.*;
 
 import javax.crypto.Mac;
 import javax.crypto.SecretKey;
@@ -56,6 +55,52 @@ public class RdsBinlogOpenApi {
     private static final String   API_VERSION         = "2014-08-15";
     private static final String   SIGNATURE_VERSION   = "1.0";
 
+
+    public static List<BinlogFile> listBinlogFiles(String url, String ak, String sk, String dbInstanceId, Date startTime,
+                                                   Date endTime) {
+        DescribeBinlogFilesRequest request = new DescribeBinlogFilesRequest();
+        if (StringUtils.isNotEmpty(url)){
+            try {
+                URI uri = new URI(url);
+                request.setEndPoint(uri.getHost());
+            } catch (URISyntaxException e) {
+                logger.error("resolve url host failed, will use default rds endpoint!");
+            }
+        }
+        request.setStartDate(startTime);
+        request.setEndDate(endTime);
+        request.setPageNumber(1);
+        request.setPageSize(100);
+        request.setRdsInstanceId(dbInstanceId);
+        request.setAccessKeyId(ak);
+        request.setAccessKeySecret(sk);
+        DescribeBinlogFileResult result = null;
+        int retryTime = 3;
+        while (true){
+            try{
+                result = request.doAction();
+                break;
+            }catch (Exception e){
+                if(retryTime-- <= 0){
+                    throw new RuntimeException(e);
+                }
+                try {
+                    Thread.sleep(100L);
+                } catch (InterruptedException e1) {
+                }
+            }
+        }
+        if (result == null){
+            return Collections.EMPTY_LIST;
+        }
+        RdsItem rdsItem = result.getItems();
+        if (rdsItem != null){
+            return rdsItem.getBinLogFile();
+        }
+        return Collections.EMPTY_LIST;
+    }
+
+
     public static void downloadBinlogFiles(String url, String ak, String sk, String dbInstanceId, Date startTime,
                                            Date endTime, File destDir) throws Throwable {
         int pageSize = 100;

+ 130 - 29
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsLocalBinlogEventParser.java

@@ -2,14 +2,23 @@ package com.alibaba.otter.canal.parse.inbound.mysql.rds;
 
 import java.io.File;
 import java.util.Date;
+import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.math.NumberUtils;
 import org.springframework.util.Assert;
 
 import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
+import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;
+import com.alibaba.otter.canal.parse.exception.ServerIdNotMatchException;
+import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
+import com.alibaba.otter.canal.parse.inbound.ParserExceptionHandler;
+import com.alibaba.otter.canal.parse.inbound.mysql.LocalBinLogConnection;
 import com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser;
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.BinlogFile;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
+import com.alibaba.otter.canal.protocol.position.LogPosition;
 
 /**
  * 基于rds binlog备份文件的复制
@@ -17,47 +26,102 @@ import com.alibaba.otter.canal.protocol.position.EntryPosition;
  * @author agapple 2017年10月15日 下午1:27:36
  * @since 1.0.25
  */
-public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements CanalEventParser {
+public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements CanalEventParser, LocalBinLogConnection.FileParserListener {
 
     private String url = "https://rds.aliyuncs.com/"; // openapi地址
-    private String accesskey;                        // 云账号的ak
-    private String secretkey;                        // 云账号sk
-    private String instanceId;                       // rds实例id
-    private Long   startTime;
-    private Long   endTime;
+    private String accesskey; // 云账号的ak
+    private String secretkey; // 云账号sk
+    private String instanceId; // rds实例id
+    private Long startTime;
+    private Long endTime;
+    private BinlogDownloadQueue binlogDownloadQueue;
+    private ParseFinishListener finishListener;
+    private int batchSize;
 
     public RdsLocalBinlogEventParser(){
     }
 
     public void start() throws CanalParseException {
         try {
-            Assert.notNull(startTime);
             Assert.notNull(accesskey);
             Assert.notNull(secretkey);
             Assert.notNull(instanceId);
             Assert.notNull(url);
+            Assert.notNull(directory);
+
             if (endTime == null) {
                 endTime = System.currentTimeMillis();
             }
 
-            RdsBinlogOpenApi.downloadBinlogFiles(url,
-                accesskey,
+            EntryPosition entryPosition = findStartPosition(null);
+            if (entryPosition == null) {
+                throw new PositionNotFoundException("position not found!");
+            }
+            long startTimeInMill = entryPosition.getTimestamp();
+            startTime = startTimeInMill;
+            List<BinlogFile> binlogFiles = RdsBinlogOpenApi.listBinlogFiles(url, accesskey,
                 secretkey,
                 instanceId,
                 new Date(startTime),
-                new Date(endTime),
-                new File(directory));
-
-            // 更新一下时间戳
-            masterPosition = new EntryPosition(startTime);
+                new Date(endTime));
+            binlogDownloadQueue = new BinlogDownloadQueue(binlogFiles, batchSize, directory);
+            binlogDownloadQueue.silenceDownload();
+            needWait = true;
+            parallel = false;
+            // try to download one file,use to test server id
+            binlogDownloadQueue.tryOne();
         } catch (Throwable e) {
             logger.error("download binlog failed", e);
             throw new CanalParseException(e);
         }
+        setParserExceptionHandler(new ParserExceptionHandler() {
 
+            @Override
+            public void handle(Throwable e) {
+                handleMysqlParserException(e);
+            }
+        });
         super.start();
     }
 
+    private void handleMysqlParserException(Throwable throwable) {
+        if (throwable instanceof ServerIdNotMatchException) {
+            logger.error("server id not match, try download another rds binlog!");
+            binlogDownloadQueue.notifyNotMatch();
+            try {
+                binlogDownloadQueue.cleanDir();
+                binlogDownloadQueue.prepare();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            try {
+                binlogDownloadQueue.execute(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        RdsLocalBinlogEventParser.super.stop();
+                        RdsLocalBinlogEventParser.super.start();
+                    }
+                });
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+
+        }
+    }
+
+    @Override
+    protected ErosaConnection buildErosaConnection() {
+        ErosaConnection connection = super.buildErosaConnection();
+        if (connection instanceof LocalBinLogConnection) {
+            LocalBinLogConnection localBinLogConnection = (LocalBinLogConnection) connection;
+            localBinLogConnection.setNeedWait(true);
+            localBinLogConnection.setServerId(serverId);
+            localBinLogConnection.setParserListener(this);
+        }
+        return connection;
+    }
+
     public String getUrl() {
         return url;
     }
@@ -68,44 +132,81 @@ public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements
         }
     }
 
-    public String getAccesskey() {
-        return accesskey;
-    }
 
     public void setAccesskey(String accesskey) {
         this.accesskey = accesskey;
     }
 
-    public String getSecretkey() {
-        return secretkey;
-    }
 
     public void setSecretkey(String secretkey) {
         this.secretkey = secretkey;
     }
 
-    public String getInstanceId() {
-        return instanceId;
-    }
 
     public void setInstanceId(String instanceId) {
         this.instanceId = instanceId;
     }
 
-    public Long getStartTime() {
-        return startTime;
-    }
 
     public void setStartTime(Long startTime) {
         this.startTime = startTime;
     }
 
-    public Long getEndTime() {
-        return endTime;
-    }
 
     public void setEndTime(Long endTime) {
         this.endTime = endTime;
     }
 
+    @Override
+    public void onFinish(String fileName) {
+        try {
+            binlogDownloadQueue.downOne();
+            File needDeleteFile = new File(directory + File.separator + fileName);
+            if (needDeleteFile.exists()){
+                needDeleteFile.delete();
+            }
+            // 处理下logManager位点问题
+            LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);
+            EntryPosition position = logPosition.getPostion();
+            if (position != null){
+                LogPosition newLogPosition = new LogPosition();
+                String journalName = position.getJournalName();
+                int sepIdx = journalName.indexOf(".");
+                String fileIndex = journalName.substring(sepIdx+1);
+                int index = NumberUtils.toInt(fileIndex) + 1;
+                String newJournalName = journalName.substring(0, sepIdx) + "." + StringUtils.leftPad(String.valueOf(index), fileIndex.length(), "0");
+                newLogPosition.setPostion(new EntryPosition(newJournalName, 4L, position.getTimestamp(), position.getServerId()));
+                newLogPosition.setIdentity(logPosition.getIdentity());
+                logPositionManager.persistLogPosition(destination, newLogPosition);
+            }
+
+            if (binlogDownloadQueue.isLastFile(fileName)) {
+                logger.info("all file parse complete, switch to mysql parser!");
+                finishListener.onFinish();
+                return;
+            }
+            binlogDownloadQueue.prepare();
+        } catch (Exception e) {
+            logger.error("prepare download binlog file failed!", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        this.binlogDownloadQueue.release();
+        super.stop();
+    }
+
+    public void setFinishListener(ParseFinishListener finishListener) {
+        this.finishListener = finishListener;
+    }
+
+    public interface ParseFinishListener{
+        void onFinish();
+    }
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
 }

+ 72 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/BinlogFile.java

@@ -0,0 +1,72 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.rds.data;
+
+/**
+ * @author chengjin.lyf on 2018/8/7 下午2:26
+ * @since 1.0.25
+ */
+public class BinlogFile {
+
+    private Long FileSize;
+    private String LogBeginTime;
+    private String LogEndTime;
+    private String DownloadLink;
+    private String HostInstanceID;
+    private String LinkExpiredTime;
+    private String fileName;
+
+    public Long getFileSize() {
+        return FileSize;
+    }
+
+    public void setFileSize(Long fileSize) {
+        FileSize = fileSize;
+    }
+
+    public String getLogBeginTime() {
+        return LogBeginTime;
+    }
+
+    public void setLogBeginTime(String logBeginTime) {
+        LogBeginTime = logBeginTime;
+    }
+
+    public String getLogEndTime() {
+        return LogEndTime;
+    }
+
+    public void setLogEndTime(String logEndTime) {
+        LogEndTime = logEndTime;
+    }
+
+    public String getDownloadLink() {
+        return DownloadLink;
+    }
+
+    public void setDownloadLink(String downloadLink) {
+        DownloadLink = downloadLink;
+    }
+
+    public String getHostInstanceID() {
+        return HostInstanceID;
+    }
+
+    public void setHostInstanceID(String hostInstanceID) {
+        HostInstanceID = hostInstanceID;
+    }
+
+    public String getLinkExpiredTime() {
+        return LinkExpiredTime;
+    }
+
+    public void setLinkExpiredTime(String linkExpiredTime) {
+        LinkExpiredTime = linkExpiredTime;
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public void setFileName(String fileName) {
+        this.fileName = fileName;
+    }
+}

+ 62 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/DescribeBinlogFileResult.java

@@ -0,0 +1,62 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.rds.data;
+
+/**
+ * @author chengjin.lyf on 2018/8/7 下午2:26
+ * @since 1.0.25
+ */
+public class DescribeBinlogFileResult {
+    private RdsItem Items;
+    private long PageNumber;
+    private long TotalRecordCount;
+    private long TotalFileSize;
+    private String RequestId;
+    private long PageRecordCount;
+
+    public RdsItem getItems() {
+        return Items;
+    }
+
+    public void setItems(RdsItem items) {
+        Items = items;
+    }
+
+    public long getPageNumber() {
+        return PageNumber;
+    }
+
+    public void setPageNumber(long pageNumber) {
+        PageNumber = pageNumber;
+    }
+
+    public long getTotalRecordCount() {
+        return TotalRecordCount;
+    }
+
+    public void setTotalRecordCount(long totalRecordCount) {
+        TotalRecordCount = totalRecordCount;
+    }
+
+    public long getTotalFileSize() {
+        return TotalFileSize;
+    }
+
+    public void setTotalFileSize(long totalFileSize) {
+        TotalFileSize = totalFileSize;
+    }
+
+    public String getRequestId() {
+        return RequestId;
+    }
+
+    public void setRequestId(String requestId) {
+        RequestId = requestId;
+    }
+
+    public long getPageRecordCount() {
+        return PageRecordCount;
+    }
+
+    public void setPageRecordCount(long pageRecordCount) {
+        PageRecordCount = pageRecordCount;
+    }
+}

+ 69 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/RdsBackupPolicy.java

@@ -0,0 +1,69 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.rds.data;
+
+/**
+ * @author chengjin.lyf on 2018/8/7 下午2:26
+ * @since 1.0.25
+ */
+public class RdsBackupPolicy {
+
+    /**
+     * 数据备份保留天数(7到730天)。
+     */
+    private String BackupRetentionPeriod;
+    /**
+     * 数据备份时间,格式:HH:mmZ- HH:mm Z。
+     */
+    private String PreferredBackupTime;
+    /**
+     * 数据备份周期。Monday:周一;Tuesday:周二;Wednesday:周三;Thursday:周四;Friday:周五;Saturday:周六;Sunday:周日。
+     */
+    private String PreferredBackupPeriod;
+    /**
+     * 日志备份状态。Enable:开启;Disabled:关闭。
+     */
+    private boolean BackupLog;
+    /**
+     * 日志备份保留天数(7到730天)。
+     */
+    private int LogBackupRetentionPeriod;
+
+    public String getBackupRetentionPeriod() {
+        return BackupRetentionPeriod;
+    }
+
+    public void setBackupRetentionPeriod(String backupRetentionPeriod) {
+        BackupRetentionPeriod = backupRetentionPeriod;
+    }
+
+    public String getPreferredBackupTime() {
+        return PreferredBackupTime;
+    }
+
+    public void setPreferredBackupTime(String preferredBackupTime) {
+        PreferredBackupTime = preferredBackupTime;
+    }
+
+    public String getPreferredBackupPeriod() {
+        return PreferredBackupPeriod;
+    }
+
+    public void setPreferredBackupPeriod(String preferredBackupPeriod) {
+        PreferredBackupPeriod = preferredBackupPeriod;
+    }
+
+    public boolean isBackupLog() {
+        return BackupLog;
+    }
+
+    public void setBackupLog(boolean backupLog) {
+        BackupLog = backupLog;
+    }
+
+    public int getLogBackupRetentionPeriod() {
+        return LogBackupRetentionPeriod;
+    }
+
+    public void setLogBackupRetentionPeriod(int logBackupRetentionPeriod) {
+        LogBackupRetentionPeriod = logBackupRetentionPeriod;
+    }
+}

+ 19 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/RdsItem.java

@@ -0,0 +1,19 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.rds.data;
+
+import java.util.List;
+
+/**
+ * @author chengjin.lyf on 2018/8/7 下午2:26
+ * @since 1.0.25
+ */
+public class RdsItem {
+    private List<BinlogFile> BinLogFile;
+
+    public List<BinlogFile> getBinLogFile() {
+        return BinLogFile;
+    }
+
+    public void setBinLogFile(List<BinlogFile> binLogFile) {
+        BinLogFile = binLogFile;
+    }
+}

+ 250 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/AbstractRequest.java

@@ -0,0 +1,250 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.rds.request;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import javax.crypto.Mac;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+import javax.net.ssl.SSLContext;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.TrustStrategy;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.ssl.SSLContexts;
+import org.apache.http.util.EntityUtils;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * @author chengjin.lyf on 2018/8/7 下午2:26
+ * @since 1.0.25
+ */
+public abstract class AbstractRequest<T> {
+
+    /**
+     * 要求的编码格式
+     */
+    private static final String ENCODING = "UTF-8";
+    /**
+     * 要求的sign签名算法
+     */
+    private static final String MAC_NAME = "HmacSHA1";
+
+    private String accessKeyId;
+
+    private String accessKeySecret;
+
+    /**
+     *  api 版本
+     *
+     */
+    private String version;
+
+    private String endPoint = "rds.aliyuncs.com";
+
+    private String protocol = "http";
+
+    public void setProtocol(String protocol) {
+        this.protocol = protocol;
+    }
+
+    private int timeout = (int) TimeUnit.MINUTES.toMillis(1);
+
+
+    private Map<String, String> treeMap = new TreeMap();
+
+    public void putQueryString(String name, String value){
+        if (StringUtils.isBlank(name) || StringUtils.isBlank(value)){
+            return;
+        }
+        treeMap.put(name, value);
+    }
+
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+
+    public void setEndPoint(String endPoint) {
+        this.endPoint = endPoint;
+    }
+
+    public void setAccessKeyId(String accessKeyId) {
+        this.accessKeyId = accessKeyId;
+    }
+
+    public void setAccessKeySecret(String accessKeySecret) {
+        this.accessKeySecret = accessKeySecret;
+    }
+
+    /**
+     * 使用 HMAC-SHA1 签名方法对对encryptText进行签名
+     *
+     * @param encryptText 被签名的字符串
+     * @param encryptKey 密钥
+     * @return
+     * @throws Exception
+     */
+    private byte[] HmacSHA1Encrypt(String encryptText, String encryptKey) throws Exception {
+        byte[] data = encryptKey.getBytes(ENCODING);
+        // 根据给定的字节数组构造一个密钥,第二参数指定一个密钥算法的名称
+        SecretKey secretKey = new SecretKeySpec(data, MAC_NAME);
+        // 生成一个指定 Mac 算法 的 Mac 对象
+        Mac mac = Mac.getInstance(MAC_NAME);
+        // 用给定密钥初始化 Mac 对象
+        mac.init(secretKey);
+
+        byte[] text = encryptText.getBytes(ENCODING);
+        // 完成 Mac 操作
+        return mac.doFinal(text);
+    }
+
+    private String base64(byte input[]) throws UnsupportedEncodingException {
+        return new String(Base64.encodeBase64(input), ENCODING);
+    }
+
+    private String concatQueryString(Map<String, String> parameters) throws UnsupportedEncodingException {
+        if (null == parameters) {
+            return null;
+        }
+        StringBuilder urlBuilder = new StringBuilder("");
+        for (Map.Entry<String, String> entry : parameters.entrySet()) {
+            String key = entry.getKey();
+            String val = entry.getValue();
+            urlBuilder.append(encode(key));
+            if (val != null) {
+                urlBuilder.append("=").append(encode(val));
+            }
+            urlBuilder.append("&");
+        }
+        int strIndex = urlBuilder.length();
+        if (parameters.size() > 0) {
+            urlBuilder.deleteCharAt(strIndex - 1);
+        }
+        return urlBuilder.toString();
+    }
+
+    private String encode(String value) throws UnsupportedEncodingException {
+        return URLEncoder.encode(value, "UTF-8");
+    }
+
+    private String makeSignature(TreeMap<String, String> paramMap) throws Exception {
+        String cqs = concatQueryString(paramMap);
+        cqs = encode(cqs);
+        cqs = cqs.replaceAll("\\+", "%20");
+        cqs = cqs.replaceAll("\\*", "%2A");
+        cqs = cqs.replaceAll("%7E", "~");
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append("GET").append("&").append(encode("/")).append("&").append(cqs);
+        return base64(HmacSHA1Encrypt(stringBuilder.toString(), accessKeySecret + "&"));
+    }
+
+    public final String formatUTCTZ(Date date) {
+        SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM-dd'T'HH:mm:ss'Z'");
+        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+        return sdf.format(date);
+    }
+
+    private void fillCommonParam(Map<String, String> p) {
+        p.put("Format", "JSON");
+        p.put("Version", version);
+        p.put("AccessKeyId", accessKeyId);
+        p.put("SignatureMethod", "HMAC-SHA1"); //此处不能用变量 MAC_NAME
+        p.put("Timestamp", formatUTCTZ(new Date()));
+        p.put("SignatureVersion", "1.0");
+        p.put("SignatureNonce", UUID.randomUUID().toString());
+    }
+
+    private String makeRequestString(Map<String, String> param) throws Exception {
+        fillCommonParam(param);
+        String sign = makeSignature(new TreeMap<String, String>(param));
+        StringBuilder builder = new StringBuilder();
+        for (Map.Entry<String, String> entry : param.entrySet()) {
+            builder.append(encode(entry.getKey())).append("=").append(encode(entry.getValue())).append("&");
+        }
+        builder.append("Signature").append("=").append(sign);
+        return builder.toString();
+    }
+
+    /**
+     * 执行http请求
+     *
+     * @param getMethod
+     * @return
+     * @throws IOException
+     */
+    private final HttpResponse executeHttpRequest(HttpGet getMethod, String host) throws Exception {
+        SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(null, new TrustStrategy() {
+
+            @Override
+            public boolean isTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
+                return true;
+            }
+        }).build();
+        SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext,
+                new String[] { "TLSv1" },
+                null,
+                SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
+        Registry registry = RegistryBuilder.create()
+                .register("http", PlainConnectionSocketFactory.INSTANCE)
+                .register("https", sslsf)
+                .build();
+        HttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager(registry);
+        CloseableHttpClient httpClient = HttpClientBuilder.create()
+                .setMaxConnPerRoute(50)
+                .setMaxConnTotal(100)
+                .setConnectionManager(httpClientConnectionManager)
+                .build();
+        RequestConfig requestConfig = RequestConfig.custom()
+                .setConnectTimeout(timeout)
+                .setConnectionRequestTimeout(timeout)
+                .setSocketTimeout(timeout)
+                .build();
+        getMethod.setConfig(requestConfig);
+        HttpResponse response = httpClient.execute(getMethod);
+        int statusCode = response.getStatusLine().getStatusCode();
+        if (statusCode != HttpResponseStatus.OK.code() && statusCode != HttpResponseStatus.PARTIAL_CONTENT.code()) {
+            String result = EntityUtils.toString(response.getEntity());
+            throw new RuntimeException("return error !" + response.getStatusLine().getReasonPhrase() + ", " + result);
+        }
+        return response;
+    }
+
+    protected abstract T processResult(HttpResponse response) throws Exception;
+
+    protected void processBefore(){
+
+    }
+
+    public final T  doAction() throws Exception {
+        processBefore();
+        String requestStr = makeRequestString(treeMap);
+        HttpGet httpGet = new HttpGet(protocol + "://" +endPoint + "?" + requestStr);
+        HttpResponse response = executeHttpRequest(httpGet, endPoint);
+        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+            String result = EntityUtils.toString(response.getEntity());
+            throw new RuntimeException("http request failed! " + result);
+        }
+        return processResult(response);
+    }
+}

+ 41 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/DescribeBackupPolicyRequest.java

@@ -0,0 +1,41 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.rds.request;
+
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.RdsBackupPolicy;
+import org.apache.http.HttpResponse;
+import org.apache.http.util.EntityUtils;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+
+/**
+ * rds 备份策略查询
+ * @author chengjin.lyf on 2018/8/7 下午3:41
+ * @since 1.0.25
+ */
+public class DescribeBackupPolicyRequest extends AbstractRequest<RdsBackupPolicy> {
+
+
+    public DescribeBackupPolicyRequest() {
+        setVersion("2014-08-15");
+        putQueryString("Action", "DescribeBackupPolicy");
+
+    }
+
+
+    public void setRdsInstanceId(String rdsInstanceId) {
+        putQueryString("DBInstanceId", rdsInstanceId);
+    }
+
+    @Override
+    protected RdsBackupPolicy processResult(HttpResponse response) throws Exception {
+        String result = EntityUtils.toString(response.getEntity());
+        JSONObject jsonObj = JSON.parseObject(result);
+        RdsBackupPolicy policy = new RdsBackupPolicy();
+        policy.setBackupRetentionPeriod(jsonObj.getString("BackupRetentionPeriod"));
+        policy.setBackupLog(jsonObj.getString("BackupLog").equalsIgnoreCase("Enable"));
+        policy.setLogBackupRetentionPeriod(jsonObj.getIntValue("LogBackupRetentionPeriod"));
+        policy.setPreferredBackupPeriod(jsonObj.getString("PreferredBackupPeriod"));
+        policy.setPreferredBackupTime(jsonObj.getString("PreferredBackupTime"));
+        return policy;
+    }
+}

+ 56 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/DescribeBinlogFilesRequest.java

@@ -0,0 +1,56 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.rds.request;
+
+import java.util.Date;
+
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.DescribeBinlogFileResult;
+import org.apache.http.HttpResponse;
+import org.apache.http.util.EntityUtils;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.TypeReference;
+
+/**
+ * @author chengjin.lyf on 2018/8/7 下午3:41
+ * @since 1.0.25
+ */
+public class DescribeBinlogFilesRequest extends AbstractRequest<DescribeBinlogFileResult> {
+
+
+    public DescribeBinlogFilesRequest() {
+        setVersion("2014-08-15");
+        putQueryString("Action", "DescribeBinlogFiles");
+
+    }
+
+    public void setRdsInstanceId(String rdsInstanceId) {
+        putQueryString("DBInstanceId", rdsInstanceId);
+    }
+
+    public void setPageSize(int pageSize) {
+        putQueryString("PageSize", String.valueOf(pageSize));
+    }
+
+    public void setPageNumber(int pageNumber) {
+        putQueryString("PageNumber", String.valueOf(pageNumber));
+    }
+
+    public void setStartDate(Date startDate) {
+        putQueryString("StartTime" , formatUTCTZ(startDate));
+    }
+
+    public void setEndDate(Date endDate) {
+        putQueryString("EndTime" , formatUTCTZ(endDate));
+    }
+
+    public void setResourceOwnerId(Long resourceOwnerId) {
+        putQueryString("ResourceOwnerId", String.valueOf(resourceOwnerId));
+    }
+
+    @Override
+    protected DescribeBinlogFileResult processResult(HttpResponse response) throws Exception {
+        String result = EntityUtils.toString(response.getEntity());
+        DescribeBinlogFileResult describeBinlogFileResult = JSONObject.parseObject(result, new TypeReference<DescribeBinlogFileResult>() {
+        });
+        return describeBinlogFileResult;
+    }
+}

+ 191 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/HistoryTableMetaCache.java

@@ -0,0 +1,191 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
+import com.alibaba.otter.canal.parse.inbound.TableMeta;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
+import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception.CacheConnectionNull;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception.NoHistoryException;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+import java.io.IOException;
+import java.util.*;
+
+public class HistoryTableMetaCache {
+    private TableMetaStorage tableMetaStorage;
+    private MysqlConnection metaConnection;
+    private LoadingCache<String, Map<Long, TableMeta>> cache; // 第一层:数据库名.表名,第二层时间戳,TableMeta
+
+    public HistoryTableMetaCache() {
+        cache = CacheBuilder.newBuilder().build(new CacheLoader<String, Map<Long, TableMeta>>() {
+            @Override
+            public Map<Long, TableMeta> load(String tableName) throws Exception {
+                Long timestamp = new Date().getTime();
+                String[] strs = tableName.split("\\.");
+                String schema = strs[0];
+                if (tableMetaStorage != null) {
+                    init(tableMetaStorage.fetchByTableName(tableName)); // 从存储中读取表的历史ddl
+                }
+                ResultSetPacket resultSetPacket = connectionQuery("show create table " + tableName); // 获取当前ddl
+                String currentDdl = resultSetPacket.getFieldValues().get(1);
+                if (cache.asMap().containsKey(tableName)) {
+                    Map<Long, TableMeta> tableMetaMap = cache.getUnchecked(tableName);
+                    if (tableMetaMap.isEmpty()) {
+                        put(schema, tableName, currentDdl, timestamp - 1000L); // 放入当前schema,取时间为当前时间-1s
+                    } else {                                               // 如果table存在历史
+                        Iterator<Long> iterator = tableMetaMap.keySet().iterator();
+                        Long firstTimestamp = iterator.next();
+                        TableMeta first = tableMetaMap.get(firstTimestamp); // 拿第一条ddl
+                        if (!first.getDdl().equalsIgnoreCase(currentDdl)) { // 当前ddl与历史第一条不一致,放入当前ddl
+                            put(schema, tableName, currentDdl, calculateNewTimestamp(firstTimestamp)); // 计算放入的timestamp,设为第一条时间+1s
+                        }
+                    }
+                } else {
+                    put(schema, tableName, currentDdl, timestamp - 1000L); // 放入当前schema
+                }
+                return cache.get(tableName);
+            }
+        });
+    }
+
+    public void init(List<TableMetaEntry> entries) throws IOException {
+        if (entries == null) {
+            return;
+        }
+        for (TableMetaEntry entry : entries) {
+            try {
+                put(entry.getSchema(), entry.getTable(), entry.getDdl(), entry.getTimestamp());
+            } catch (CacheConnectionNull cacheConnectionNull) {
+                cacheConnectionNull.printStackTrace();
+            }
+        }
+    }
+
+    public TableMeta put(String schema, String table, String ddl, Long timestamp) throws CacheConnectionNull, IOException {
+        ResultSetPacket resultSetPacket;
+        if (!(ddl.contains("CREATE TABLE") || ddl.contains("create table"))) { // 尝试直接从数据库拉取CREATE TABLE的DDL
+            resultSetPacket = connectionQuery("show create table " + table);
+            ddl = resultSetPacket.getFieldValues().get(1);
+        } else { // CREATE TABLE 的 DDL
+            resultSetPacket = new ResultSetPacket();
+            List<String> fields = new ArrayList<String>();
+            String[] strings = table.split("\\.");
+            String shortTable = table;
+            if (strings.length > 1) {
+                shortTable = strings[1];
+            }
+            fields.add(0, shortTable);
+            fields.add(1, ddl);
+            resultSetPacket.setFieldValues(fields);
+            if (metaConnection != null) {
+                resultSetPacket.setSourceAddress(metaConnection.getAddress());
+            }
+        }
+        Map<Long, TableMeta> tableMetaMap;
+        if (!cache.asMap().containsKey(table)) {
+            tableMetaMap = new TreeMap<Long, TableMeta>(new Comparator<Long>() {
+                @Override
+                public int compare(Long o1, Long o2) {
+                    return o2.compareTo(o1);
+                }
+            });
+            cache.put(table, tableMetaMap);
+        } else {
+            tableMetaMap = cache.getUnchecked(table);
+        }
+        eliminate(tableMetaMap); // 淘汰旧的TableMeta
+        TableMeta tableMeta = new TableMeta(schema, table, TableMetaCache.parseTableMeta(schema, table, resultSetPacket));
+        if (tableMeta.getDdl() == null) { // 生成的TableMeta有时DDL为null
+            tableMeta.setDdl(ddl);
+        }
+        tableMetaMap.put(timestamp, tableMeta);
+        return tableMeta;
+    }
+
+    public TableMeta get(String schema, String table, Long timestamp) throws NoHistoryException, CacheConnectionNull {
+        Map<Long, TableMeta> tableMetaMap = cache.getUnchecked(table);
+        Iterator<Long> iterator = tableMetaMap.keySet().iterator();
+        Long selected = null;
+        while(iterator.hasNext()) {
+            Long temp = iterator.next();
+            if (timestamp > temp) {
+                selected = temp;
+                break;
+            }
+        }
+
+        if (selected == null) {
+            iterator = tableMetaMap.keySet().iterator();
+            if (iterator.hasNext()) {
+                selected = iterator.next();
+            } else {
+                throw new NoHistoryException(schema, table);
+            }
+        }
+
+        return tableMetaMap.get(selected);
+    }
+
+    public void clearTableMeta() {
+        cache.invalidateAll();
+    }
+
+    public void clearTableMetaWithSchemaName(String schema) {
+        for (String tableName : cache.asMap().keySet()) {
+            String[] strs = tableName.split("\\.");
+            if (schema.equalsIgnoreCase(strs[0])) {
+                cache.invalidate(tableName);
+            }
+        }
+    }
+
+    public void clearTableMeta(String schema, String table) {
+        if (!table.contains(".")) {
+            table = schema+"."+table;
+        }
+        cache.invalidate(table);
+    }
+
+    // eliminate older table meta in cache
+    private void eliminate(Map<Long, TableMeta> tableMetaMap) {
+        int MAX_CAPABILITY = 20;
+        if (tableMetaMap.keySet().size() < MAX_CAPABILITY) {
+            return;
+        }
+        Iterator<Long> iterator = tableMetaMap.keySet().iterator();
+        while(iterator.hasNext()) {
+            iterator.next();
+        }
+        iterator.remove();
+    }
+
+    private Long calculateNewTimestamp(Long oldTimestamp) {
+        return oldTimestamp + 1000;
+    }
+
+    private ResultSetPacket connectionQuery(String query) throws CacheConnectionNull, IOException {
+        if (metaConnection == null) {
+            throw new CacheConnectionNull();
+        }
+        try {
+            return metaConnection.query(query);
+        } catch (IOException e) {
+            try {
+                metaConnection.reconnect();
+                return metaConnection.query(query);
+            } catch (IOException e1) {
+                throw e1;
+            }
+        }
+    }
+
+    public void setMetaConnection(MysqlConnection metaConnection) {
+        this.metaConnection = metaConnection;
+    }
+
+    public void setTableMetaStorage(TableMetaStorage tableMetaStorage) {
+        this.tableMetaStorage = tableMetaStorage;
+    }
+}

+ 20 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaCacheInterface.java

@@ -0,0 +1,20 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+import com.alibaba.otter.canal.parse.inbound.TableMeta;
+import com.alibaba.otter.canal.protocol.position.EntryPosition;
+
+public interface TableMetaCacheInterface {
+
+    TableMeta getTableMeta(String schema, String table, boolean useCache, EntryPosition position);
+
+    void clearTableMeta();
+
+    void clearTableMetaWithSchemaName(String schema);
+
+    void clearTableMeta(String schema, String table);
+
+    boolean apply(EntryPosition position, String schema, String ddl, String extra);
+
+    boolean isOnRDS();
+
+}

+ 105 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaCacheWithStorage.java

@@ -0,0 +1,105 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+import com.alibaba.otter.canal.parse.inbound.TableMeta;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
+import com.alibaba.otter.canal.protocol.position.EntryPosition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+public class TableMetaCacheWithStorage implements TableMetaCacheInterface {
+
+    private static Logger logger = LoggerFactory.getLogger(TableMetaCacheWithStorage.class);
+    private TableMetaStorage tableMetaStorage; // TableMeta存储
+    private HistoryTableMetaCache cache = new HistoryTableMetaCache(); // cache
+
+    public TableMetaCacheWithStorage(MysqlConnection con, TableMetaStorage tableMetaStorage) {
+        this.tableMetaStorage = tableMetaStorage;
+        InetSocketAddress address = con.getAddress();
+        this.tableMetaStorage.setDbAddress(address.getHostName()+":"+address.getPort());
+        cache.setMetaConnection(con);
+        cache.setTableMetaStorage(tableMetaStorage);
+        if (tableMetaStorage != null) {
+            try {
+                cache.init(tableMetaStorage.fetch()); // 初始化,从存储拉取TableMeta
+            } catch (IOException e) {
+                logger.error(e.getMessage());
+            }
+        }
+    }
+
+    @Override
+    public boolean apply(EntryPosition position, String fullTableName, String ddl, String extra) {
+        String[] strs = fullTableName.split("\\.");
+        String schema = strs[0];
+        if (schema.equalsIgnoreCase("null")) { // ddl schema为null,放弃处理
+            return false;
+        }
+        try {
+            TableMeta tableMeta = cache.get(schema, fullTableName, position.getTimestamp());
+            if (!compare(tableMeta, ddl)) { // 获取最近的TableMeta,进行比对
+                TableMeta result = cache.put(schema, fullTableName, ddl, calTimestamp(position.getTimestamp()));
+                if (tableMetaStorage != null && result != null) { // 储存
+                    tableMetaStorage.store(schema, fullTableName, result.getDdl(), calTimestamp(position.getTimestamp()));
+                }
+            }
+            return true;
+        } catch (Exception e) {
+            logger.error(e.toString());
+        }
+
+        return false;
+    }
+
+    @Override
+    public boolean isOnRDS() {
+        return false;
+    }
+
+    /***
+     *
+     * @param schema dbname
+     * @param table tablename
+     * @param useCache unused
+     * @param position timestamp
+     * @return
+     */
+    @Override
+    public TableMeta getTableMeta(String schema, String table, boolean useCache, EntryPosition position) {
+        String fulltbName = schema + "." + table;
+        try {
+            return cache.get(schema, fulltbName, position.getTimestamp());
+        } catch (Exception e) {
+            logger.error(e.toString());
+        }
+        return null;
+    }
+
+    @Override
+    public void clearTableMeta() {
+        cache.clearTableMeta();
+    }
+
+    @Override
+    public void clearTableMetaWithSchemaName(String schema) {
+        cache.clearTableMetaWithSchemaName(schema);
+    }
+
+    @Override
+    public void clearTableMeta(String schema, String table) {
+        cache.clearTableMeta(schema, table);
+    }
+
+    private boolean compare(TableMeta tableMeta, String ddl) {
+        if (tableMeta == null) {
+            return false;
+        }
+        return tableMeta.getDdl().equalsIgnoreCase(ddl);
+    }
+
+    private Long calTimestamp(Long timestamp) {
+        return timestamp;
+    }
+}

+ 55 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaEntry.java

@@ -0,0 +1,55 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+import java.io.Serializable;
+
+public class TableMetaEntry implements Serializable {
+
+    private static final long serialVersionUID = -1350200637109107904L;
+
+    private String dbAddress;
+    private String schema;
+    private String table;
+    private String ddl;
+    private Long timestamp;
+
+
+    public String getSchema() {
+        return schema;
+    }
+
+    public void setSchema(String schema) {
+        this.schema = schema;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public String getDdl() {
+        return ddl;
+    }
+
+    public void setDdl(String ddl) {
+        this.ddl = ddl;
+    }
+
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(Long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public String getDbAddress() {
+        return dbAddress;
+    }
+
+    public void setDbAddress(String dbAddress) {
+        this.dbAddress = dbAddress;
+    }
+}

+ 18 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaStorage.java

@@ -0,0 +1,18 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+import java.util.List;
+
+public interface TableMetaStorage {
+
+    void store(String schema, String table, String ddl, Long timestamp);
+
+    List<TableMetaEntry> fetch();
+
+    List<TableMetaEntry> fetchByTableName(String tableName);
+
+    String getDbName();
+
+    String getDbAddress();
+
+    void setDbAddress(String address);
+}

+ 9 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaStorageFactory.java

@@ -0,0 +1,9 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+public interface TableMetaStorageFactory {
+
+    TableMetaStorage getTableMetaStorage();
+
+    String getDbName();
+
+}

+ 9 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/exception/CacheConnectionNull.java

@@ -0,0 +1,9 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception;
+
+public class CacheConnectionNull extends Exception{
+
+    @Override
+    public String toString() {
+        return "CacheConnectionNull";
+    }
+}

+ 21 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/exception/NoHistoryException.java

@@ -0,0 +1,21 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception;
+
+public class NoHistoryException extends Exception{
+
+    private String dbName;
+    private String tbName;
+
+    public NoHistoryException(String dbName, String tbName) {
+        this.dbName = dbName;
+        this.tbName = tbName;
+    }
+
+    public void printTableName() {
+        System.out.println(dbName+"."+tbName);
+    }
+
+    @Override
+    public String toString() {
+        return "NioHistoryException: " + dbName + " " + tbName;
+    }
+}

+ 14 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaCallback.java

@@ -0,0 +1,14 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql;
+
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaEntry;
+
+import java.util.List;
+
+public interface MySqlTableMetaCallback {
+
+    void save(String dbAddress, String schema, String table,String ddl, Long timestamp);
+
+    List<TableMetaEntry> fetch(String dbAddress, String dbName);
+
+    List<TableMetaEntry> fetch(String dbAddress, String dbName, String tableName);
+}

+ 48 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaStorage.java

@@ -0,0 +1,48 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql;
+
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaEntry;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorage;
+
+import java.util.List;
+
+public class MySqlTableMetaStorage implements TableMetaStorage {
+    private MySqlTableMetaCallback mySqlTableMetaCallback;
+    private String dbName;
+    private String dbAddress;
+
+    MySqlTableMetaStorage(MySqlTableMetaCallback callback, String dbName) {
+        mySqlTableMetaCallback = callback;
+        this.dbName = dbName;
+    }
+
+
+    @Override
+    public void store(String schema, String table, String ddl, Long timestamp) {
+        mySqlTableMetaCallback.save(dbAddress, schema, table, ddl, timestamp);
+    }
+
+    @Override
+    public List<TableMetaEntry> fetch() {
+        return mySqlTableMetaCallback.fetch(dbAddress, dbName);
+    }
+
+    @Override
+    public List<TableMetaEntry> fetchByTableName(String tableName) {
+        return mySqlTableMetaCallback.fetch(dbAddress, dbName, tableName);
+    }
+
+    @Override
+    public String getDbName() {
+        return dbName;
+    }
+
+    @Override
+    public String getDbAddress() {
+        return dbAddress;
+    }
+
+    @Override
+    public void setDbAddress(String address) {
+        this.dbAddress = address;
+    }
+}

+ 26 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaStorageFactory.java

@@ -0,0 +1,26 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql;
+
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorage;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorageFactory;
+
+public class MySqlTableMetaStorageFactory implements TableMetaStorageFactory {
+
+    private MySqlTableMetaCallback mySQLTableMetaCallback;
+    private String dbName;
+
+    public MySqlTableMetaStorageFactory(MySqlTableMetaCallback callback, String dbName) {
+        mySQLTableMetaCallback = callback;
+        this.dbName = dbName;
+    }
+
+    @Override
+    public TableMetaStorage getTableMetaStorage() {
+        return new MySqlTableMetaStorage(mySQLTableMetaCallback, dbName);
+    }
+
+    @Override
+    public String getDbName() {
+        return dbName;
+    }
+
+}

+ 123 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/RdsBinlogEventParserProxyTest.java

@@ -0,0 +1,123 @@
+package com.alibaba.otter.canal.parse.inbound.mysql;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.alibaba.otter.canal.parse.helper.TimeoutChecker;
+import com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsBinlogEventParserProxy;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaEntry;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql.MySqlTableMetaCallback;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql.MySqlTableMetaStorageFactory;
+import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
+import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
+import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.position.EntryPosition;
+import com.alibaba.otter.canal.protocol.position.LogPosition;
+import com.alibaba.otter.canal.sink.exception.CanalSinkException;
+
+/**
+ * @author chengjin.lyf on 2018/7/21 下午5:24
+ * @since 1.0.25
+ */
+public class RdsBinlogEventParserProxyTest {
+
+    private static final String DETECTING_SQL = "insert into retl.xdual values(1,now()) on duplicate key update x=now()";
+    private static final String MYSQL_ADDRESS = "";
+    private static final String USERNAME      = "";
+    private static final String PASSWORD      = "";
+    public static final String DBNAME = "";
+    public static final String TBNAME = "";
+    public static final String DDL = "";
+
+
+    @Test
+    public void test_timestamp() throws InterruptedException {
+        final TimeoutChecker timeoutChecker = new TimeoutChecker(3000 * 1000);
+        final AtomicLong entryCount = new AtomicLong(0);
+        final EntryPosition entryPosition = new EntryPosition();
+
+        final RdsBinlogEventParserProxy controller = new RdsBinlogEventParserProxy();
+        Calendar calendar = Calendar.getInstance();
+        calendar.add(Calendar.DAY_OF_YEAR, -1);
+        final EntryPosition defaultPosition = buildPosition(null, null, calendar.getTimeInMillis());
+        controller.setSlaveId(3344L);
+        controller.setDetectingEnable(false);
+        controller.setDetectingSQL(DETECTING_SQL);
+        controller.setMasterInfo(buildAuthentication());
+        controller.setMasterPosition(defaultPosition);
+        controller.setInstanceId("");
+        controller.setAccesskey("");
+        controller.setSecretkey("");
+        controller.setBatchSize(4);
+//        controller.setRdsOpenApiUrl("https://rds.aliyuncs.com/");
+        controller.setEventSink(new AbstractCanalEventSinkTest<List<CanalEntry.Entry>>() {
+
+            @Override
+            public boolean sink(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress, String destination)
+                    throws CanalSinkException {
+                for (CanalEntry.Entry entry : entrys) {
+                    if (entry.getEntryType() != CanalEntry.EntryType.HEARTBEAT) {
+                        entryCount.incrementAndGet();
+
+                        String logfilename = entry.getHeader().getLogfileName();
+                        long logfileoffset = entry.getHeader().getLogfileOffset();
+                        long executeTime = entry.getHeader().getExecuteTime();
+
+                        entryPosition.setJournalName(logfilename);
+                        entryPosition.setPosition(logfileoffset);
+                        entryPosition.setTimestamp(executeTime);
+                        break;
+                    }
+                }
+                return true;
+            }
+        });
+
+        controller.setLogPositionManager(new AbstractLogPositionManager() {
+
+            private LogPosition logPosition;
+            public void persistLogPosition(String destination, LogPosition logPosition) {
+                System.out.println(logPosition);
+                this.logPosition = logPosition;
+            }
+
+            public LogPosition getLatestIndexBy(String destination) {
+                return logPosition;
+            }
+        });
+
+        controller.start();
+        timeoutChecker.waitForIdle();
+
+        if (controller.isStart()) {
+            controller.stop();
+        }
+
+        // check
+        Assert.assertTrue(entryCount.get() > 0);
+
+        // 对比第一条数据和起始的position相同
+        Assert.assertEquals(entryPosition.getJournalName(), "mysql-bin.000001");
+        Assert.assertTrue(entryPosition.getPosition() <= 6163L);
+        Assert.assertTrue(entryPosition.getTimestamp() <= defaultPosition.getTimestamp());
+    }
+
+
+    // ======================== helper method =======================
+
+    private EntryPosition buildPosition(String binlogFile, Long offest, Long timestamp) {
+        return new EntryPosition(binlogFile, offest, timestamp);
+    }
+
+    private AuthenticationInfo buildAuthentication() {
+        return new AuthenticationInfo(new InetSocketAddress(MYSQL_ADDRESS, 3306), USERNAME, PASSWORD);
+    }
+}