Browse Source

fixed merge and blob encoding example

七锋 6 years ago
parent
commit
b57ac0bcdf
37 changed files with 181 additions and 1990 deletions
  1. 13 2
      example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java
  2. 0 30
      parse/src/main/java/com/alibaba/otter/canal/parse/exception/PositionNotFoundException.java
  3. 0 30
      parse/src/main/java/com/alibaba/otter/canal/parse/exception/ServerIdNotMatchException.java
  4. 1 29
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java
  5. 0 2
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ErosaConnection.java
  6. 0 9
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ParserExceptionHandler.java
  7. 2 48
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java
  8. 0 12
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java
  9. 4 4
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  10. 3 8
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java
  11. 5 10
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  12. 1 6
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java
  13. 0 260
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/BinlogDownloadQueue.java
  14. 0 149
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogEventParserProxy.java
  15. 8 53
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogOpenApi.java
  16. 29 130
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsLocalBinlogEventParser.java
  17. 0 72
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/BinlogFile.java
  18. 0 62
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/DescribeBinlogFileResult.java
  19. 0 69
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/RdsBackupPolicy.java
  20. 0 19
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/RdsItem.java
  21. 0 250
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/AbstractRequest.java
  22. 0 41
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/DescribeBackupPolicyRequest.java
  23. 0 56
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/DescribeBinlogFilesRequest.java
  24. 0 191
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/HistoryTableMetaCache.java
  25. 0 20
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaCacheInterface.java
  26. 0 105
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaCacheWithStorage.java
  27. 0 55
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaEntry.java
  28. 0 18
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaStorage.java
  29. 0 9
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaStorageFactory.java
  30. 0 9
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/exception/CacheConnectionNull.java
  31. 0 21
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/exception/NoHistoryException.java
  32. 0 14
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaCallback.java
  33. 0 48
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaStorage.java
  34. 0 26
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaStorageFactory.java
  35. 0 123
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/RdsBinlogEventParserProxyTest.java
  36. 38 0
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/NoStorageTest.java
  37. 77 0
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/StorageTest.java

+ 13 - 2
example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java

@@ -1,9 +1,9 @@
 package com.alibaba.otter.canal.example;
 
+import java.io.UnsupportedEncodingException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.SystemUtils;
@@ -254,7 +254,18 @@ public class AbstractCanalClientTest {
     protected void printColumn(List<Column> columns) {
         for (Column column : columns) {
             StringBuilder builder = new StringBuilder();
-            builder.append(column.getName() + " : " + column.getValue());
+            try {
+                if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")
+                    || StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {
+                    // get value bytes
+                    builder.append(column.getName() + " : "
+                                   + new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));
+                } else {
+                    builder.append(column.getName() + " : " + column.getValue());
+                }
+            } catch (UnsupportedEncodingException e) {
+            }
+
             builder.append("    type=" + column.getMysqlType());
             if (column.getUpdated()) {
                 builder.append("    update=" + column.getUpdated());

+ 0 - 30
parse/src/main/java/com/alibaba/otter/canal/parse/exception/PositionNotFoundException.java

@@ -1,30 +0,0 @@
-package com.alibaba.otter.canal.parse.exception;
-
-/**
- * @author chengjin.lyf on 2018/7/20 下午2:54
- * @since 1.0.25
- */
-public class PositionNotFoundException extends CanalParseException {
-
-    private static final long serialVersionUID = -7382448928116244017L;
-
-    public PositionNotFoundException(String errorCode) {
-        super(errorCode);
-    }
-
-    public PositionNotFoundException(String errorCode, Throwable cause) {
-        super(errorCode, cause);
-    }
-
-    public PositionNotFoundException(String errorCode, String errorDesc) {
-        super(errorCode, errorDesc);
-    }
-
-    public PositionNotFoundException(String errorCode, String errorDesc, Throwable cause) {
-        super(errorCode, errorDesc, cause);
-    }
-
-    public PositionNotFoundException(Throwable cause) {
-        super(cause);
-    }
-}

+ 0 - 30
parse/src/main/java/com/alibaba/otter/canal/parse/exception/ServerIdNotMatchException.java

@@ -1,30 +0,0 @@
-package com.alibaba.otter.canal.parse.exception;
-
-import com.alibaba.otter.canal.common.CanalException;
-
-/**
- * @author chengjin.lyf on 2018/8/8 下午1:07
- * @since 1.0.25
- */
-public class ServerIdNotMatchException extends CanalException{
-
-    public ServerIdNotMatchException(String errorCode) {
-        super(errorCode);
-    }
-
-    public ServerIdNotMatchException(String errorCode, Throwable cause) {
-        super(errorCode, cause);
-    }
-
-    public ServerIdNotMatchException(String errorCode, String errorDesc) {
-        super(errorCode, errorDesc);
-    }
-
-    public ServerIdNotMatchException(String errorCode, String errorDesc, Throwable cause) {
-        super(errorCode, errorDesc, cause);
-    }
-
-    public ServerIdNotMatchException(Throwable cause) {
-        super(cause);
-    }
-}

+ 1 - 29
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -8,7 +8,6 @@ import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang.math.RandomUtils;
@@ -95,10 +94,6 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                                                                                     .availableProcessors() * 60 / 100;     // 60%的能力跑解析,剩余部分处理网络
     protected int                                    parallelBufferSize         = 256;                                     // 必须为2的幂
     protected MultiStageCoprocessor                  multiStageCoprocessor;
-    protected ParserExceptionHandler                 parserExceptionHandler;
-    protected long serverId;
-
-
 
     protected abstract BinlogParser buildParser();
 
@@ -175,16 +170,11 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                         preDump(erosaConnection);
 
                         erosaConnection.connect();// 链接
-
-                        long queryServerId = erosaConnection.queryServerId();
-                        if (queryServerId != 0){
-                            serverId = queryServerId;
-                        }
                         // 4. 获取最后的位置信息
                         EntryPosition position = findStartPosition(erosaConnection);
                         final EntryPosition startPosition = position;
                         if (startPosition == null) {
-                            throw new PositionNotFoundException("can't find start position for " + destination);
+                            throw new CanalParseException("can't find start position for " + destination);
                         }
 
                         if (!processTableMeta(startPosition)) {
@@ -287,9 +277,6 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
                                 runningInfo.getAddress().toString()), e);
                             sendAlarm(destination, ExceptionUtils.getFullStackTrace(e));
                         }
-                        if (parserExceptionHandler!=null){
-                            parserExceptionHandler.handle(e);
-                        }
                     } finally {
                         // 重新置为中断状态
                         Thread.interrupted();
@@ -628,19 +615,4 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         this.parallelBufferSize = parallelBufferSize;
     }
 
-    public ParserExceptionHandler getParserExceptionHandler() {
-        return parserExceptionHandler;
-    }
-
-    public void setParserExceptionHandler(ParserExceptionHandler parserExceptionHandler) {
-        this.parserExceptionHandler = parserExceptionHandler;
-    }
-
-    public long getServerId() {
-        return serverId;
-    }
-
-    public void setServerId(long serverId) {
-        this.serverId = serverId;
-    }
 }

+ 0 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ErosaConnection.java

@@ -40,6 +40,4 @@ public interface ErosaConnection {
     public void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOException;
 
     ErosaConnection fork();
-
-    public long queryServerId() throws IOException;
 }

+ 0 - 9
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ParserExceptionHandler.java

@@ -1,9 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound;
-
-/**
- * @author chengjin.lyf on 2018/7/20 下午3:55
- * @since 1.0.25
- */
-public interface ParserExceptionHandler {
-    void handle(Throwable e);
-}

+ 2 - 48
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

@@ -4,7 +4,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
-import com.alibaba.otter.canal.parse.exception.ServerIdNotMatchException;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -37,9 +36,6 @@ public class LocalBinLogConnection implements ErosaConnection {
     private String              directory;
     private int                 bufferSize = 16 * 1024;
     private boolean             running    = false;
-    private long                serverId;
-    private FileParserListener  parserListener;
-
 
     public LocalBinLogConnection(){
     }
@@ -100,9 +96,6 @@ public class LocalBinLogConnection implements ErosaConnection {
                     if (event == null) {
                         continue;
                     }
-                    if (serverId != 0 && event.getServerId() != serverId){
-                        throw new ServerIdNotMatchException("unexpected serverId "+serverId + " in binlog file !");
-                    }
 
                     if (!func.sink(event)) {
                         needContinue = false;
@@ -110,9 +103,8 @@ public class LocalBinLogConnection implements ErosaConnection {
                     }
                 }
 
-                fetcher.close(); // 关闭上一个文件
-                parserFinish(current.getName());
                 if (needContinue) {// 读取下一个
+                    fetcher.close(); // 关闭上一个文件
 
                     File nextFile;
                     if (needWait) {
@@ -168,11 +160,6 @@ public class LocalBinLogConnection implements ErosaConnection {
                 while (fetcher.fetch()) {
                     LogEvent event = decoder.decode(fetcher, context);
                     if (event != null) {
-
-                        if (serverId != 0 && event.getServerId() != serverId){
-                            throw new ServerIdNotMatchException("unexpected serverId "+serverId + " in binlog file !");
-                        }
-
                         if (event.getWhen() > timestampSeconds) {
                             break;
                         }
@@ -241,9 +228,8 @@ public class LocalBinLogConnection implements ErosaConnection {
                     }
                 }
 
-                fetcher.close(); // 关闭上一个文件
-                parserFinish(binlogfilename);
                 if (needContinue) {// 读取下一个
+                    fetcher.close(); // 关闭上一个文件
 
                     File nextFile;
                     if (needWait) {
@@ -272,12 +258,6 @@ public class LocalBinLogConnection implements ErosaConnection {
         }
     }
 
-    private void parserFinish(String fileName){
-        if (parserListener != null){
-            parserListener.onFinish(fileName);
-        }
-    }
-
     @Override
     public void dump(long timestampMills, MultiStageCoprocessor coprocessor) throws IOException {
         List<File> currentBinlogs = binlogs.currentBinlogs();
@@ -306,11 +286,6 @@ public class LocalBinLogConnection implements ErosaConnection {
                 while (fetcher.fetch()) {
                     LogEvent event = decoder.decode(fetcher, context);
                     if (event != null) {
-
-                        if (serverId != 0 && event.getServerId() != serverId){
-                            throw new ServerIdNotMatchException("unexpected serverId "+serverId + " in binlog file !");
-                        }
-
                         if (event.getWhen() > timestampSeconds) {
                             break;
                         }
@@ -369,11 +344,6 @@ public class LocalBinLogConnection implements ErosaConnection {
         return connection;
     }
 
-    @Override
-    public long queryServerId() {
-        return 0;
-    }
-
     public boolean isNeedWait() {
         return needWait;
     }
@@ -398,20 +368,4 @@ public class LocalBinLogConnection implements ErosaConnection {
         this.bufferSize = bufferSize;
     }
 
-    public long getServerId() {
-        return serverId;
-    }
-
-    public void setServerId(long serverId) {
-        this.serverId = serverId;
-    }
-
-    public void setParserListener(FileParserListener parserListener) {
-        this.parserListener = parserListener;
-    }
-
-    public interface FileParserListener{
-        void onFinish(String fileName);
-    }
-
 }

+ 0 - 12
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

@@ -9,10 +9,8 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.math.NumberUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.util.CollectionUtils;
 
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector;
 import com.alibaba.otter.canal.parse.driver.mysql.MysqlQueryExecutor;
@@ -326,16 +324,6 @@ public class MysqlConnection implements ErosaConnection {
         return connection;
     }
 
-    @Override
-    public long queryServerId() throws IOException {
-        ResultSetPacket resultSetPacket = query("show variables like 'server_id'");
-        List<String> fieldValues = resultSetPacket.getFieldValues();
-        if (fieldValues == null || fieldValues.size() != 2){
-            return 0;
-        }
-        return NumberUtils.toLong(fieldValues.get(1));
-    }
-
     // ====================== help method ====================
 
     /**

+ 4 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -53,11 +53,11 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     private int                receiveBufferSize                 = 64 * 1024;
     private int                sendBufferSize                    = 64 * 1024;
     // 数据库信息
-    protected AuthenticationInfo masterInfo;                                   // 主库
-    protected AuthenticationInfo standbyInfo;                                  // 备库
+    private AuthenticationInfo masterInfo;                                   // 主库
+    private AuthenticationInfo standbyInfo;                                  // 备库
     // binlog信息
-    protected EntryPosition      masterPosition;
-    protected EntryPosition      standbyPosition;
+    private EntryPosition      masterPosition;
+    private EntryPosition      standbyPosition;
     private long               slaveId;                                      // 链接到mysql的slave
     // 心跳检查信息
     private String             detectingSQL;                                 // 心跳sql

+ 3 - 8
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -203,15 +203,10 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
             try {
                 LogBuffer buffer = event.getBuffer();
                 if (StringUtils.isNotEmpty(event.getBinlogFileName())
-                    && (context.getLogPosition() == null
-                    || !context.getLogPosition().getFileName().equals(event.getBinlogFileName()))) {
+                    && !context.getLogPosition().getFileName().equals(event.getBinlogFileName())) {
                     // set roate binlog file name
-                    if (context.getLogPosition() == null){
-                        context.setLogPosition(new LogPosition(event.getBinlogFileName(), 0));
-                    }else{
-                        context.setLogPosition(new LogPosition(event.getBinlogFileName(), context.getLogPosition()
-                                .getPosition()));
-                    }
+                    context.setLogPosition(new LogPosition(event.getBinlogFileName(), context.getLogPosition()
+                        .getPosition()));
                 }
 
                 LogEvent logEvent = decoder.decode(buffer, context);

+ 5 - 10
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -10,9 +10,6 @@ import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaCacheInterface;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorage;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception.NoHistoryException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
@@ -90,10 +87,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     private volatile AviaterRegexFilter nameFilter;                                                          // 运行时引用可能会有变化,比如规则发生变化时
     private volatile AviaterRegexFilter nameBlackFilter;
 
-
-    private TableMetaCacheInterface tableMetaCache;
-    private String                      binlogFileName      = "mysql-bin.000001";
-
+    private TableMetaCache              tableMetaCache;
     private Charset                     charset             = Charset.defaultCharset();
     private boolean                     filterQueryDcl      = false;
     private boolean                     filterQueryDml      = false;
@@ -268,8 +262,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             if (!isSeek) {
                 // 使用新的表结构元数据管理方式
                 EntryPosition position = createPosition(event.getHeader());
-                String fulltbName = schemaName+"."+tableName;
-                tableMetaCache.apply(position, fulltbName, queryString, null);
+                tableMetaCache.apply(position, event.getDbName(), queryString, null);
             }
 
             Header header = createHeader(event.getHeader(), schemaName, tableName, type);
@@ -766,6 +759,8 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                         } else {
                             // byte数组,直接使用iso-8859-1保留对应编码,浪费内存
                             columnBuilder.setValue(new String((byte[]) value, ISO_8859_1));
+                            // columnBuilder.setValueBytes(ByteString.copyFrom((byte[])
+                            // value));
                             javaType = Types.BLOB;
                         }
                         break;
@@ -944,7 +939,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         this.nameBlackFilter = nameBlackFilter;
     }
 
-    public void setTableMetaCache(TableMetaCacheInterface tableMetaCache) {
+    public void setTableMetaCache(TableMetaCache tableMetaCache) {
         this.tableMetaCache = tableMetaCache;
     }
 

+ 1 - 6
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java

@@ -6,7 +6,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaCacheInterface;
 import org.apache.commons.lang.StringUtils;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
@@ -30,7 +29,7 @@ import com.google.common.cache.LoadingCache;
  * @author jianghang 2013-1-17 下午10:15:16
  * @version 1.0.0
  */
-public class TableMetaCache implements TableMetaCacheInterface {
+public class TableMetaCache {
 
     public static final String              COLUMN_NAME    = "COLUMN_NAME";
     public static final String              COLUMN_TYPE    = "COLUMN_TYPE";
@@ -100,10 +99,6 @@ public class TableMetaCache implements TableMetaCacheInterface {
             String createDDL = packet.getFieldValues().get(1);
             MemoryTableMeta memoryTableMeta = new MemoryTableMeta();
             memoryTableMeta.apply(DatabaseTableMeta.INIT_POSITION, schema, createDDL, null);
-            String[] strings = table.split("\\.");
-            if (strings.length > 1) {
-                table = strings[1];
-            }
             TableMeta tableMeta = memoryTableMeta.find(schema, table);
             return tableMeta.getFields();
         } else {

+ 0 - 260
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/BinlogDownloadQueue.java

@@ -1,260 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.rds;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.BinlogFile;
-
-import io.netty.handler.codec.http.HttpResponseStatus;
-
-/**
- * @author chengjin.lyf on 2018/8/7 下午3:10
- * @since 1.0.25
- */
-public class BinlogDownloadQueue {
-
-    private static final Logger logger = LoggerFactory.getLogger(BinlogDownloadQueue.class);
-    private static final int      TIMEOUT             = 10000;
-
-    private LinkedBlockingQueue<BinlogFile> downloadQueue = new LinkedBlockingQueue<BinlogFile>();
-    private LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
-    private LinkedList<BinlogFile> binlogList;
-    private final int batchSize;
-    private Thread downloadThread;
-    public boolean running = true;
-    private final String destDir;
-    private String hostId;
-    private int currentSize;
-    private String lastDownload;
-
-    public BinlogDownloadQueue(List<BinlogFile> downloadQueue, int batchSize, String destDir) throws IOException {
-        this.binlogList = new LinkedList(downloadQueue);
-        this.batchSize = batchSize;
-        this.destDir = destDir;
-        this.currentSize = 0;
-        prepareBinlogList();
-        cleanDir();
-    }
-
-    private void prepareBinlogList(){
-        for (BinlogFile binlog : this.binlogList) {
-            String fileName = StringUtils.substringBetween(binlog.getDownloadLink(), "mysql-bin.", "?");
-            binlog.setFileName(fileName);
-        }
-        Collections.sort(this.binlogList, new Comparator<BinlogFile>() {
-            @Override
-            public int compare(BinlogFile o1, BinlogFile o2) {
-                return o1.getFileName().compareTo(o2.getFileName());
-            }
-        });
-    }
-
-    public void cleanDir() throws IOException {
-        File destDirFile = new File(destDir);
-        FileUtils.forceMkdir(destDirFile);
-        FileUtils.cleanDirectory(destDirFile);
-    }
-
-    public void silenceDownload() {
-        if (downloadThread != null) {
-            return;
-        }
-        downloadThread = new Thread(new DownloadThread());
-        downloadThread.start();
-    }
-
-
-    public BinlogFile tryOne() throws IOException {
-        BinlogFile binlogFile = binlogList.poll();
-        download(binlogFile);
-        hostId = binlogFile.getHostInstanceID();
-        this.currentSize ++;
-        return binlogFile;
-    }
-
-    public void notifyNotMatch(){
-        this.currentSize --;
-        filter(hostId);
-    }
-
-    private void filter(String hostInstanceId){
-        Iterator<BinlogFile> it = binlogList.iterator();
-        while (it.hasNext()){
-            BinlogFile bf = it.next();
-            if(bf.getHostInstanceID().equalsIgnoreCase(hostInstanceId)){
-                it.remove();
-            }else{
-                hostId = bf.getHostInstanceID();
-            }
-        }
-    }
-
-    public boolean isLastFile(String fileName){
-        String needCompareName = lastDownload;
-        if (StringUtils.isNotEmpty(needCompareName) && StringUtils.endsWith(needCompareName, "tar")){
-            needCompareName = needCompareName.substring(0, needCompareName.indexOf("."));
-        }
-        return fileName.equalsIgnoreCase(needCompareName) && binlogList.isEmpty();
-    }
-
-    public void prepare() throws InterruptedException {
-        for (int i = this.currentSize; i < batchSize && !binlogList.isEmpty(); i++) {
-            BinlogFile binlogFile = null;
-            while (!binlogList.isEmpty()){
-                binlogFile = binlogList.poll();
-                if (!binlogFile.getHostInstanceID().equalsIgnoreCase(hostId)){
-                    continue;
-                }
-                break;
-            }
-            if (binlogFile == null){
-                break;
-            }
-            this.downloadQueue.put(binlogFile);
-            this.lastDownload = "mysql-bin." + binlogFile.getFileName();
-            this.currentSize ++;
-        }
-    }
-
-    public void downOne(){
-        this.currentSize --;
-    }
-
-    public void release(){
-        running = false;
-        this.currentSize = 0;
-        binlogList.clear();
-        downloadQueue.clear();
-    }
-
-    private void download(BinlogFile binlogFile) throws IOException {
-        String downloadLink = binlogFile.getDownloadLink();
-        String fileName = binlogFile.getFileName();
-        HttpGet httpGet = new HttpGet(downloadLink);
-        CloseableHttpClient httpClient = HttpClientBuilder.create()
-                .setMaxConnPerRoute(50)
-                .setMaxConnTotal(100)
-                .build();
-        RequestConfig requestConfig = RequestConfig.custom()
-                .setConnectTimeout(TIMEOUT)
-                .setConnectionRequestTimeout(TIMEOUT)
-                .setSocketTimeout(TIMEOUT)
-                .build();
-        httpGet.setConfig(requestConfig);
-        HttpResponse response = httpClient.execute(httpGet);
-        int statusCode = response.getStatusLine().getStatusCode();
-        if (statusCode != HttpResponseStatus.OK.code()) {
-            throw new RuntimeException("download failed , url:" + downloadLink + " , statusCode:"
-                                       + statusCode);
-        }
-        saveFile(new File(destDir), "mysql-bin." + fileName, response);
-    }
-
-    private static void saveFile(File parentFile, String fileName, HttpResponse response) throws IOException {
-        InputStream is = response.getEntity().getContent();
-        long totalSize = Long.parseLong(response.getFirstHeader("Content-Length").getValue());
-        if(response.getFirstHeader("Content-Disposition")!=null){
-            fileName = response.getFirstHeader("Content-Disposition").getValue();
-            fileName = StringUtils.substringAfter(fileName, "filename=");
-        }
-        boolean isTar = StringUtils.endsWith(fileName, ".tar");
-        FileUtils.forceMkdir(parentFile);
-        FileOutputStream fos = null;
-        try {
-            if (isTar) {
-                TarArchiveInputStream tais = new TarArchiveInputStream(is);
-                TarArchiveEntry tarArchiveEntry = null;
-                while ((tarArchiveEntry = tais.getNextTarEntry()) != null) {
-                    String name = tarArchiveEntry.getName();
-                    File tarFile = new File(parentFile, name + ".tmp");
-                    logger.info("start to download file " + tarFile.getName());
-                    BufferedOutputStream bos = null;
-                    try {
-                        bos = new BufferedOutputStream(new FileOutputStream(tarFile));
-                        int read = -1;
-                        byte[] buffer = new byte[1024];
-                        while ((read = tais.read(buffer)) != -1) {
-                            bos.write(buffer, 0, read);
-                        }
-                        logger.info("download file " + tarFile.getName() + " end!");
-                        tarFile.renameTo(new File(parentFile, name));
-                    } finally {
-                        IOUtils.closeQuietly(bos);
-                    }
-                }
-                tais.close();
-            } else {
-                File file = new File(parentFile, fileName + ".tmp");
-                if (!file.isFile()) {
-                    file.createNewFile();
-                }
-                try {
-                    fos = new FileOutputStream(file);
-                    byte[] buffer = new byte[1024];
-                    int len;
-                    long copySize = 0;
-                    long nextPrintProgress = 0;
-                    logger.info("start to download file " + file.getName());
-                    while ((len = is.read(buffer)) != -1) {
-                        fos.write(buffer, 0, len);
-                        copySize += len;
-                        long progress = copySize * 100 / totalSize;
-                        if (progress >= nextPrintProgress) {
-                            logger.info("download " + file.getName() + " progress : " + progress
-                                        + "% , download size : " + copySize + ", total size : " + totalSize);
-                            nextPrintProgress += 10;
-                        }
-                    }
-                    logger.info("download file " + file.getName() + " end!");
-                    fos.flush();
-                } finally {
-                    IOUtils.closeQuietly(fos);
-                }
-                file.renameTo(new File(parentFile, fileName));
-            }
-        } finally {
-            IOUtils.closeQuietly(fos);
-        }
-    }
-
-    public void execute(Runnable runnable) throws InterruptedException {
-        taskQueue.put(runnable);
-    }
-
-    private class DownloadThread implements Runnable {
-
-        @Override
-        public void run() {
-            while (running) {
-                try {
-                    BinlogFile binlogFile = downloadQueue.poll(5000, TimeUnit.MILLISECONDS);
-                    if (binlogFile != null){
-                        download(binlogFile);
-                    }
-                    Runnable runnable = taskQueue.poll(5000, TimeUnit.MILLISECONDS);
-                    if (runnable != null){
-                        runnable.run();
-                    }
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-
-        }
-    }
-}

+ 0 - 149
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogEventParserProxy.java

@@ -1,149 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.rds;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;
-import com.alibaba.otter.canal.parse.inbound.ParserExceptionHandler;
-import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
-
-/**
- * @author chengjin.lyf on 2018/7/20 上午10:52
- * @since 1.0.25
- */
-public class RdsBinlogEventParserProxy extends MysqlEventParser {
-
-    private String rdsOpenApiUrl = "https://rds.aliyuncs.com/"; // openapi地址
-    private String accesskey; // 云账号的ak
-    private String secretkey; // 云账号sk
-    private String instanceId; // rds实例id
-    private Long startTime;
-    private Long endTime;
-    private String directory; //binlog 目录
-    private int batchSize = 4; //最多下载的binlog文件数量
-
-    private RdsLocalBinlogEventParser rdsBinlogEventParser = new RdsLocalBinlogEventParser();
-    private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
-
-        @Override
-        public Thread newThread(Runnable r) {
-            Thread t = new Thread(r, "rds-binlog-daemon-thread");
-            t.setDaemon(true);
-            return t;
-        }
-    });
-
-    @Override
-    public void start() {
-        final ParserExceptionHandler targetHandler = this.getParserExceptionHandler();
-        rdsBinlogEventParser.setLogPositionManager(this.getLogPositionManager());
-        rdsBinlogEventParser.setDestination(destination);
-        rdsBinlogEventParser.setAlarmHandler(this.getAlarmHandler());
-        rdsBinlogEventParser.setConnectionCharset(this.connectionCharset);
-        rdsBinlogEventParser.setConnectionCharsetNumber(this.connectionCharsetNumber);
-        rdsBinlogEventParser.setEnableTsdb(this.enableTsdb);
-        rdsBinlogEventParser.setEventBlackFilter(this.eventBlackFilter);
-        rdsBinlogEventParser.setFilterQueryDcl(this.filterQueryDcl);
-        rdsBinlogEventParser.setFilterQueryDdl(this.filterQueryDdl);
-        rdsBinlogEventParser.setFilterQueryDml(this.filterQueryDml);
-        rdsBinlogEventParser.setFilterRows(this.filterRows);
-        rdsBinlogEventParser.setFilterTableError(this.filterTableError);
-        rdsBinlogEventParser.setIsGTIDMode(this.isGTIDMode);
-        rdsBinlogEventParser.setMasterInfo(this.masterInfo);
-        rdsBinlogEventParser.setEventFilter(this.eventFilter);
-        rdsBinlogEventParser.setMasterPosition(this.masterPosition);
-        rdsBinlogEventParser.setTransactionSize(this.transactionSize);
-        rdsBinlogEventParser.setUrl(this.rdsOpenApiUrl);
-        rdsBinlogEventParser.setAccesskey(this.accesskey);
-        rdsBinlogEventParser.setSecretkey(this.secretkey);
-        rdsBinlogEventParser.setInstanceId(this.instanceId);
-        rdsBinlogEventParser.setEventSink(eventSink);
-        rdsBinlogEventParser.setDirectory(directory);
-        rdsBinlogEventParser.setBatchSize(batchSize);
-        rdsBinlogEventParser.setFinishListener(new RdsLocalBinlogEventParser.ParseFinishListener() {
-            @Override
-            public void onFinish() {
-                executorService.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        rdsBinlogEventParser.stop();
-                        RdsBinlogEventParserProxy.this.start();
-                    }
-                });
-
-            }
-        });
-        this.setParserExceptionHandler(new ParserExceptionHandler() {
-
-            @Override
-            public void handle(Throwable e) {
-                handleMysqlParserException(e);
-                if (targetHandler != null) {
-                    targetHandler.handle(e);
-                }
-            }
-        });
-        super.start();
-    }
-
-    public void handleMysqlParserException(Throwable throwable) {
-        if (throwable instanceof PositionNotFoundException) {
-            logger.info("remove rds not found position, try download rds binlog!");
-            executorService.execute(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        logger.info("stop mysql parser!");
-                        RdsBinlogEventParserProxy rdsBinlogEventParserProxy = RdsBinlogEventParserProxy.this;
-                        long serverId = rdsBinlogEventParserProxy.getServerId();
-                        rdsBinlogEventParser.setServerId(serverId);
-                        rdsBinlogEventParserProxy.stop();
-                        logger.info("start rds mysql binlog parser!");
-                        rdsBinlogEventParser.start();
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            });
-        }
-    }
-
-    @Override
-    public void stop() {
-        super.stop();
-    }
-
-    @Override
-    public boolean isStart() {
-        return super.isStart();
-    }
-
-    public void setRdsOpenApiUrl(String rdsOpenApiUrl) {
-        this.rdsOpenApiUrl = rdsOpenApiUrl;
-    }
-
-
-    public void setAccesskey(String accesskey) {
-        this.accesskey = accesskey;
-    }
-
-
-    public void setSecretkey(String secretkey) {
-        this.secretkey = secretkey;
-    }
-
-
-    public void setInstanceId(String instanceId) {
-        this.instanceId = instanceId;
-    }
-
-    public void setDirectory(String directory) {
-        this.directory = directory;
-    }
-
-    public void setBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-    }
-}

+ 8 - 53
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsBinlogOpenApi.java

@@ -1,9 +1,5 @@
 package com.alibaba.otter.canal.parse.inbound.mysql.rds;
 
-import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.BinlogFile;
-import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.DescribeBinlogFileResult;
-import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.RdsItem;
-import com.alibaba.otter.canal.parse.inbound.mysql.rds.request.DescribeBinlogFilesRequest;
 import io.netty.handler.codec.http.HttpResponseStatus;
 
 import java.io.BufferedOutputStream;
@@ -13,11 +9,16 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URLEncoder;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.UUID;
 
 import javax.crypto.Mac;
 import javax.crypto.SecretKey;
@@ -55,52 +56,6 @@ public class RdsBinlogOpenApi {
     private static final String   API_VERSION         = "2014-08-15";
     private static final String   SIGNATURE_VERSION   = "1.0";
 
-
-    public static List<BinlogFile> listBinlogFiles(String url, String ak, String sk, String dbInstanceId, Date startTime,
-                                                   Date endTime) {
-        DescribeBinlogFilesRequest request = new DescribeBinlogFilesRequest();
-        if (StringUtils.isNotEmpty(url)){
-            try {
-                URI uri = new URI(url);
-                request.setEndPoint(uri.getHost());
-            } catch (URISyntaxException e) {
-                logger.error("resolve url host failed, will use default rds endpoint!");
-            }
-        }
-        request.setStartDate(startTime);
-        request.setEndDate(endTime);
-        request.setPageNumber(1);
-        request.setPageSize(100);
-        request.setRdsInstanceId(dbInstanceId);
-        request.setAccessKeyId(ak);
-        request.setAccessKeySecret(sk);
-        DescribeBinlogFileResult result = null;
-        int retryTime = 3;
-        while (true){
-            try{
-                result = request.doAction();
-                break;
-            }catch (Exception e){
-                if(retryTime-- <= 0){
-                    throw new RuntimeException(e);
-                }
-                try {
-                    Thread.sleep(100L);
-                } catch (InterruptedException e1) {
-                }
-            }
-        }
-        if (result == null){
-            return Collections.EMPTY_LIST;
-        }
-        RdsItem rdsItem = result.getItems();
-        if (rdsItem != null){
-            return rdsItem.getBinLogFile();
-        }
-        return Collections.EMPTY_LIST;
-    }
-
-
     public static void downloadBinlogFiles(String url, String ak, String sk, String dbInstanceId, Date startTime,
                                            Date endTime, File destDir) throws Throwable {
         int pageSize = 100;

+ 29 - 130
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/RdsLocalBinlogEventParser.java

@@ -2,23 +2,14 @@ package com.alibaba.otter.canal.parse.inbound.mysql.rds;
 
 import java.io.File;
 import java.util.Date;
-import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.math.NumberUtils;
 import org.springframework.util.Assert;
 
 import com.alibaba.otter.canal.parse.CanalEventParser;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
-import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;
-import com.alibaba.otter.canal.parse.exception.ServerIdNotMatchException;
-import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
-import com.alibaba.otter.canal.parse.inbound.ParserExceptionHandler;
-import com.alibaba.otter.canal.parse.inbound.mysql.LocalBinLogConnection;
 import com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser;
-import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.BinlogFile;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
-import com.alibaba.otter.canal.protocol.position.LogPosition;
 
 /**
  * 基于rds binlog备份文件的复制
@@ -26,102 +17,47 @@ import com.alibaba.otter.canal.protocol.position.LogPosition;
  * @author agapple 2017年10月15日 下午1:27:36
  * @since 1.0.25
  */
-public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements CanalEventParser, LocalBinLogConnection.FileParserListener {
+public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements CanalEventParser {
 
     private String url = "https://rds.aliyuncs.com/"; // openapi地址
-    private String accesskey; // 云账号的ak
-    private String secretkey; // 云账号sk
-    private String instanceId; // rds实例id
-    private Long startTime;
-    private Long endTime;
-    private BinlogDownloadQueue binlogDownloadQueue;
-    private ParseFinishListener finishListener;
-    private int batchSize;
+    private String accesskey;                        // 云账号的ak
+    private String secretkey;                        // 云账号sk
+    private String instanceId;                       // rds实例id
+    private Long   startTime;
+    private Long   endTime;
 
     public RdsLocalBinlogEventParser(){
     }
 
     public void start() throws CanalParseException {
         try {
+            Assert.notNull(startTime);
             Assert.notNull(accesskey);
             Assert.notNull(secretkey);
             Assert.notNull(instanceId);
             Assert.notNull(url);
-            Assert.notNull(directory);
-
             if (endTime == null) {
                 endTime = System.currentTimeMillis();
             }
 
-            EntryPosition entryPosition = findStartPosition(null);
-            if (entryPosition == null) {
-                throw new PositionNotFoundException("position not found!");
-            }
-            long startTimeInMill = entryPosition.getTimestamp();
-            startTime = startTimeInMill;
-            List<BinlogFile> binlogFiles = RdsBinlogOpenApi.listBinlogFiles(url, accesskey,
+            RdsBinlogOpenApi.downloadBinlogFiles(url,
+                accesskey,
                 secretkey,
                 instanceId,
                 new Date(startTime),
-                new Date(endTime));
-            binlogDownloadQueue = new BinlogDownloadQueue(binlogFiles, batchSize, directory);
-            binlogDownloadQueue.silenceDownload();
-            needWait = true;
-            parallel = false;
-            // try to download one file,use to test server id
-            binlogDownloadQueue.tryOne();
+                new Date(endTime),
+                new File(directory));
+
+            // 更新一下时间戳
+            masterPosition = new EntryPosition(startTime);
         } catch (Throwable e) {
             logger.error("download binlog failed", e);
             throw new CanalParseException(e);
         }
-        setParserExceptionHandler(new ParserExceptionHandler() {
 
-            @Override
-            public void handle(Throwable e) {
-                handleMysqlParserException(e);
-            }
-        });
         super.start();
     }
 
-    private void handleMysqlParserException(Throwable throwable) {
-        if (throwable instanceof ServerIdNotMatchException) {
-            logger.error("server id not match, try download another rds binlog!");
-            binlogDownloadQueue.notifyNotMatch();
-            try {
-                binlogDownloadQueue.cleanDir();
-                binlogDownloadQueue.prepare();
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-            try {
-                binlogDownloadQueue.execute(new Runnable() {
-
-                    @Override
-                    public void run() {
-                        RdsLocalBinlogEventParser.super.stop();
-                        RdsLocalBinlogEventParser.super.start();
-                    }
-                });
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-
-        }
-    }
-
-    @Override
-    protected ErosaConnection buildErosaConnection() {
-        ErosaConnection connection = super.buildErosaConnection();
-        if (connection instanceof LocalBinLogConnection) {
-            LocalBinLogConnection localBinLogConnection = (LocalBinLogConnection) connection;
-            localBinLogConnection.setNeedWait(true);
-            localBinLogConnection.setServerId(serverId);
-            localBinLogConnection.setParserListener(this);
-        }
-        return connection;
-    }
-
     public String getUrl() {
         return url;
     }
@@ -132,81 +68,44 @@ public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements
         }
     }
 
+    public String getAccesskey() {
+        return accesskey;
+    }
 
     public void setAccesskey(String accesskey) {
         this.accesskey = accesskey;
     }
 
+    public String getSecretkey() {
+        return secretkey;
+    }
 
     public void setSecretkey(String secretkey) {
         this.secretkey = secretkey;
     }
 
+    public String getInstanceId() {
+        return instanceId;
+    }
 
     public void setInstanceId(String instanceId) {
         this.instanceId = instanceId;
     }
 
+    public Long getStartTime() {
+        return startTime;
+    }
 
     public void setStartTime(Long startTime) {
         this.startTime = startTime;
     }
 
+    public Long getEndTime() {
+        return endTime;
+    }
 
     public void setEndTime(Long endTime) {
         this.endTime = endTime;
     }
 
-    @Override
-    public void onFinish(String fileName) {
-        try {
-            binlogDownloadQueue.downOne();
-            File needDeleteFile = new File(directory + File.separator + fileName);
-            if (needDeleteFile.exists()){
-                needDeleteFile.delete();
-            }
-            // 处理下logManager位点问题
-            LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);
-            EntryPosition position = logPosition.getPostion();
-            if (position != null){
-                LogPosition newLogPosition = new LogPosition();
-                String journalName = position.getJournalName();
-                int sepIdx = journalName.indexOf(".");
-                String fileIndex = journalName.substring(sepIdx+1);
-                int index = NumberUtils.toInt(fileIndex) + 1;
-                String newJournalName = journalName.substring(0, sepIdx) + "." + StringUtils.leftPad(String.valueOf(index), fileIndex.length(), "0");
-                newLogPosition.setPostion(new EntryPosition(newJournalName, 4L, position.getTimestamp(), position.getServerId()));
-                newLogPosition.setIdentity(logPosition.getIdentity());
-                logPositionManager.persistLogPosition(destination, newLogPosition);
-            }
-
-            if (binlogDownloadQueue.isLastFile(fileName)) {
-                logger.info("all file parse complete, switch to mysql parser!");
-                finishListener.onFinish();
-                return;
-            }
-            binlogDownloadQueue.prepare();
-        } catch (Exception e) {
-            logger.error("prepare download binlog file failed!", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void stop() {
-        this.binlogDownloadQueue.release();
-        super.stop();
-    }
-
-    public void setFinishListener(ParseFinishListener finishListener) {
-        this.finishListener = finishListener;
-    }
-
-    public interface ParseFinishListener{
-        void onFinish();
-    }
-
-    public void setBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-    }
 }

+ 0 - 72
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/BinlogFile.java

@@ -1,72 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.rds.data;
-
-/**
- * @author chengjin.lyf on 2018/8/7 下午2:26
- * @since 1.0.25
- */
-public class BinlogFile {
-
-    private Long FileSize;
-    private String LogBeginTime;
-    private String LogEndTime;
-    private String DownloadLink;
-    private String HostInstanceID;
-    private String LinkExpiredTime;
-    private String fileName;
-
-    public Long getFileSize() {
-        return FileSize;
-    }
-
-    public void setFileSize(Long fileSize) {
-        FileSize = fileSize;
-    }
-
-    public String getLogBeginTime() {
-        return LogBeginTime;
-    }
-
-    public void setLogBeginTime(String logBeginTime) {
-        LogBeginTime = logBeginTime;
-    }
-
-    public String getLogEndTime() {
-        return LogEndTime;
-    }
-
-    public void setLogEndTime(String logEndTime) {
-        LogEndTime = logEndTime;
-    }
-
-    public String getDownloadLink() {
-        return DownloadLink;
-    }
-
-    public void setDownloadLink(String downloadLink) {
-        DownloadLink = downloadLink;
-    }
-
-    public String getHostInstanceID() {
-        return HostInstanceID;
-    }
-
-    public void setHostInstanceID(String hostInstanceID) {
-        HostInstanceID = hostInstanceID;
-    }
-
-    public String getLinkExpiredTime() {
-        return LinkExpiredTime;
-    }
-
-    public void setLinkExpiredTime(String linkExpiredTime) {
-        LinkExpiredTime = linkExpiredTime;
-    }
-
-    public String getFileName() {
-        return fileName;
-    }
-
-    public void setFileName(String fileName) {
-        this.fileName = fileName;
-    }
-}

+ 0 - 62
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/DescribeBinlogFileResult.java

@@ -1,62 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.rds.data;
-
-/**
- * @author chengjin.lyf on 2018/8/7 下午2:26
- * @since 1.0.25
- */
-public class DescribeBinlogFileResult {
-    private RdsItem Items;
-    private long PageNumber;
-    private long TotalRecordCount;
-    private long TotalFileSize;
-    private String RequestId;
-    private long PageRecordCount;
-
-    public RdsItem getItems() {
-        return Items;
-    }
-
-    public void setItems(RdsItem items) {
-        Items = items;
-    }
-
-    public long getPageNumber() {
-        return PageNumber;
-    }
-
-    public void setPageNumber(long pageNumber) {
-        PageNumber = pageNumber;
-    }
-
-    public long getTotalRecordCount() {
-        return TotalRecordCount;
-    }
-
-    public void setTotalRecordCount(long totalRecordCount) {
-        TotalRecordCount = totalRecordCount;
-    }
-
-    public long getTotalFileSize() {
-        return TotalFileSize;
-    }
-
-    public void setTotalFileSize(long totalFileSize) {
-        TotalFileSize = totalFileSize;
-    }
-
-    public String getRequestId() {
-        return RequestId;
-    }
-
-    public void setRequestId(String requestId) {
-        RequestId = requestId;
-    }
-
-    public long getPageRecordCount() {
-        return PageRecordCount;
-    }
-
-    public void setPageRecordCount(long pageRecordCount) {
-        PageRecordCount = pageRecordCount;
-    }
-}

+ 0 - 69
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/RdsBackupPolicy.java

@@ -1,69 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.rds.data;
-
-/**
- * @author chengjin.lyf on 2018/8/7 下午2:26
- * @since 1.0.25
- */
-public class RdsBackupPolicy {
-
-    /**
-     * 数据备份保留天数(7到730天)。
-     */
-    private String BackupRetentionPeriod;
-    /**
-     * 数据备份时间,格式:HH:mmZ- HH:mm Z。
-     */
-    private String PreferredBackupTime;
-    /**
-     * 数据备份周期。Monday:周一;Tuesday:周二;Wednesday:周三;Thursday:周四;Friday:周五;Saturday:周六;Sunday:周日。
-     */
-    private String PreferredBackupPeriod;
-    /**
-     * 日志备份状态。Enable:开启;Disabled:关闭。
-     */
-    private boolean BackupLog;
-    /**
-     * 日志备份保留天数(7到730天)。
-     */
-    private int LogBackupRetentionPeriod;
-
-    public String getBackupRetentionPeriod() {
-        return BackupRetentionPeriod;
-    }
-
-    public void setBackupRetentionPeriod(String backupRetentionPeriod) {
-        BackupRetentionPeriod = backupRetentionPeriod;
-    }
-
-    public String getPreferredBackupTime() {
-        return PreferredBackupTime;
-    }
-
-    public void setPreferredBackupTime(String preferredBackupTime) {
-        PreferredBackupTime = preferredBackupTime;
-    }
-
-    public String getPreferredBackupPeriod() {
-        return PreferredBackupPeriod;
-    }
-
-    public void setPreferredBackupPeriod(String preferredBackupPeriod) {
-        PreferredBackupPeriod = preferredBackupPeriod;
-    }
-
-    public boolean isBackupLog() {
-        return BackupLog;
-    }
-
-    public void setBackupLog(boolean backupLog) {
-        BackupLog = backupLog;
-    }
-
-    public int getLogBackupRetentionPeriod() {
-        return LogBackupRetentionPeriod;
-    }
-
-    public void setLogBackupRetentionPeriod(int logBackupRetentionPeriod) {
-        LogBackupRetentionPeriod = logBackupRetentionPeriod;
-    }
-}

+ 0 - 19
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/data/RdsItem.java

@@ -1,19 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.rds.data;
-
-import java.util.List;
-
-/**
- * @author chengjin.lyf on 2018/8/7 下午2:26
- * @since 1.0.25
- */
-public class RdsItem {
-    private List<BinlogFile> BinLogFile;
-
-    public List<BinlogFile> getBinLogFile() {
-        return BinLogFile;
-    }
-
-    public void setBinLogFile(List<BinlogFile> binLogFile) {
-        BinLogFile = binLogFile;
-    }
-}

+ 0 - 250
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/AbstractRequest.java

@@ -1,250 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.rds.request;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
-import javax.crypto.Mac;
-import javax.crypto.SecretKey;
-import javax.crypto.spec.SecretKeySpec;
-import javax.net.ssl.SSLContext;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.StringUtils;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.config.Registry;
-import org.apache.http.config.RegistryBuilder;
-import org.apache.http.conn.HttpClientConnectionManager;
-import org.apache.http.conn.socket.PlainConnectionSocketFactory;
-import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
-import org.apache.http.conn.ssl.TrustStrategy;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
-import org.apache.http.ssl.SSLContexts;
-import org.apache.http.util.EntityUtils;
-
-import io.netty.handler.codec.http.HttpResponseStatus;
-
-/**
- * @author chengjin.lyf on 2018/8/7 下午2:26
- * @since 1.0.25
- */
-public abstract class AbstractRequest<T> {
-
-    /**
-     * 要求的编码格式
-     */
-    private static final String ENCODING = "UTF-8";
-    /**
-     * 要求的sign签名算法
-     */
-    private static final String MAC_NAME = "HmacSHA1";
-
-    private String accessKeyId;
-
-    private String accessKeySecret;
-
-    /**
-     *  api 版本
-     *
-     */
-    private String version;
-
-    private String endPoint = "rds.aliyuncs.com";
-
-    private String protocol = "http";
-
-    public void setProtocol(String protocol) {
-        this.protocol = protocol;
-    }
-
-    private int timeout = (int) TimeUnit.MINUTES.toMillis(1);
-
-
-    private Map<String, String> treeMap = new TreeMap();
-
-    public void putQueryString(String name, String value){
-        if (StringUtils.isBlank(name) || StringUtils.isBlank(value)){
-            return;
-        }
-        treeMap.put(name, value);
-    }
-
-
-    public void setVersion(String version) {
-        this.version = version;
-    }
-
-
-    public void setEndPoint(String endPoint) {
-        this.endPoint = endPoint;
-    }
-
-    public void setAccessKeyId(String accessKeyId) {
-        this.accessKeyId = accessKeyId;
-    }
-
-    public void setAccessKeySecret(String accessKeySecret) {
-        this.accessKeySecret = accessKeySecret;
-    }
-
-    /**
-     * 使用 HMAC-SHA1 签名方法对对encryptText进行签名
-     *
-     * @param encryptText 被签名的字符串
-     * @param encryptKey 密钥
-     * @return
-     * @throws Exception
-     */
-    private byte[] HmacSHA1Encrypt(String encryptText, String encryptKey) throws Exception {
-        byte[] data = encryptKey.getBytes(ENCODING);
-        // 根据给定的字节数组构造一个密钥,第二参数指定一个密钥算法的名称
-        SecretKey secretKey = new SecretKeySpec(data, MAC_NAME);
-        // 生成一个指定 Mac 算法 的 Mac 对象
-        Mac mac = Mac.getInstance(MAC_NAME);
-        // 用给定密钥初始化 Mac 对象
-        mac.init(secretKey);
-
-        byte[] text = encryptText.getBytes(ENCODING);
-        // 完成 Mac 操作
-        return mac.doFinal(text);
-    }
-
-    private String base64(byte input[]) throws UnsupportedEncodingException {
-        return new String(Base64.encodeBase64(input), ENCODING);
-    }
-
-    private String concatQueryString(Map<String, String> parameters) throws UnsupportedEncodingException {
-        if (null == parameters) {
-            return null;
-        }
-        StringBuilder urlBuilder = new StringBuilder("");
-        for (Map.Entry<String, String> entry : parameters.entrySet()) {
-            String key = entry.getKey();
-            String val = entry.getValue();
-            urlBuilder.append(encode(key));
-            if (val != null) {
-                urlBuilder.append("=").append(encode(val));
-            }
-            urlBuilder.append("&");
-        }
-        int strIndex = urlBuilder.length();
-        if (parameters.size() > 0) {
-            urlBuilder.deleteCharAt(strIndex - 1);
-        }
-        return urlBuilder.toString();
-    }
-
-    private String encode(String value) throws UnsupportedEncodingException {
-        return URLEncoder.encode(value, "UTF-8");
-    }
-
-    private String makeSignature(TreeMap<String, String> paramMap) throws Exception {
-        String cqs = concatQueryString(paramMap);
-        cqs = encode(cqs);
-        cqs = cqs.replaceAll("\\+", "%20");
-        cqs = cqs.replaceAll("\\*", "%2A");
-        cqs = cqs.replaceAll("%7E", "~");
-        StringBuilder stringBuilder = new StringBuilder();
-        stringBuilder.append("GET").append("&").append(encode("/")).append("&").append(cqs);
-        return base64(HmacSHA1Encrypt(stringBuilder.toString(), accessKeySecret + "&"));
-    }
-
-    public final String formatUTCTZ(Date date) {
-        SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM-dd'T'HH:mm:ss'Z'");
-        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
-        return sdf.format(date);
-    }
-
-    private void fillCommonParam(Map<String, String> p) {
-        p.put("Format", "JSON");
-        p.put("Version", version);
-        p.put("AccessKeyId", accessKeyId);
-        p.put("SignatureMethod", "HMAC-SHA1"); //此处不能用变量 MAC_NAME
-        p.put("Timestamp", formatUTCTZ(new Date()));
-        p.put("SignatureVersion", "1.0");
-        p.put("SignatureNonce", UUID.randomUUID().toString());
-    }
-
-    private String makeRequestString(Map<String, String> param) throws Exception {
-        fillCommonParam(param);
-        String sign = makeSignature(new TreeMap<String, String>(param));
-        StringBuilder builder = new StringBuilder();
-        for (Map.Entry<String, String> entry : param.entrySet()) {
-            builder.append(encode(entry.getKey())).append("=").append(encode(entry.getValue())).append("&");
-        }
-        builder.append("Signature").append("=").append(sign);
-        return builder.toString();
-    }
-
-    /**
-     * 执行http请求
-     *
-     * @param getMethod
-     * @return
-     * @throws IOException
-     */
-    private final HttpResponse executeHttpRequest(HttpGet getMethod, String host) throws Exception {
-        SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(null, new TrustStrategy() {
-
-            @Override
-            public boolean isTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
-                return true;
-            }
-        }).build();
-        SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext,
-                new String[] { "TLSv1" },
-                null,
-                SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
-        Registry registry = RegistryBuilder.create()
-                .register("http", PlainConnectionSocketFactory.INSTANCE)
-                .register("https", sslsf)
-                .build();
-        HttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager(registry);
-        CloseableHttpClient httpClient = HttpClientBuilder.create()
-                .setMaxConnPerRoute(50)
-                .setMaxConnTotal(100)
-                .setConnectionManager(httpClientConnectionManager)
-                .build();
-        RequestConfig requestConfig = RequestConfig.custom()
-                .setConnectTimeout(timeout)
-                .setConnectionRequestTimeout(timeout)
-                .setSocketTimeout(timeout)
-                .build();
-        getMethod.setConfig(requestConfig);
-        HttpResponse response = httpClient.execute(getMethod);
-        int statusCode = response.getStatusLine().getStatusCode();
-        if (statusCode != HttpResponseStatus.OK.code() && statusCode != HttpResponseStatus.PARTIAL_CONTENT.code()) {
-            String result = EntityUtils.toString(response.getEntity());
-            throw new RuntimeException("return error !" + response.getStatusLine().getReasonPhrase() + ", " + result);
-        }
-        return response;
-    }
-
-    protected abstract T processResult(HttpResponse response) throws Exception;
-
-    protected void processBefore(){
-
-    }
-
-    public final T  doAction() throws Exception {
-        processBefore();
-        String requestStr = makeRequestString(treeMap);
-        HttpGet httpGet = new HttpGet(protocol + "://" +endPoint + "?" + requestStr);
-        HttpResponse response = executeHttpRequest(httpGet, endPoint);
-        if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
-            String result = EntityUtils.toString(response.getEntity());
-            throw new RuntimeException("http request failed! " + result);
-        }
-        return processResult(response);
-    }
-}

+ 0 - 41
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/DescribeBackupPolicyRequest.java

@@ -1,41 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.rds.request;
-
-import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.RdsBackupPolicy;
-import org.apache.http.HttpResponse;
-import org.apache.http.util.EntityUtils;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-
-/**
- * rds 备份策略查询
- * @author chengjin.lyf on 2018/8/7 下午3:41
- * @since 1.0.25
- */
-public class DescribeBackupPolicyRequest extends AbstractRequest<RdsBackupPolicy> {
-
-
-    public DescribeBackupPolicyRequest() {
-        setVersion("2014-08-15");
-        putQueryString("Action", "DescribeBackupPolicy");
-
-    }
-
-
-    public void setRdsInstanceId(String rdsInstanceId) {
-        putQueryString("DBInstanceId", rdsInstanceId);
-    }
-
-    @Override
-    protected RdsBackupPolicy processResult(HttpResponse response) throws Exception {
-        String result = EntityUtils.toString(response.getEntity());
-        JSONObject jsonObj = JSON.parseObject(result);
-        RdsBackupPolicy policy = new RdsBackupPolicy();
-        policy.setBackupRetentionPeriod(jsonObj.getString("BackupRetentionPeriod"));
-        policy.setBackupLog(jsonObj.getString("BackupLog").equalsIgnoreCase("Enable"));
-        policy.setLogBackupRetentionPeriod(jsonObj.getIntValue("LogBackupRetentionPeriod"));
-        policy.setPreferredBackupPeriod(jsonObj.getString("PreferredBackupPeriod"));
-        policy.setPreferredBackupTime(jsonObj.getString("PreferredBackupTime"));
-        return policy;
-    }
-}

+ 0 - 56
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/rds/request/DescribeBinlogFilesRequest.java

@@ -1,56 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.rds.request;
-
-import java.util.Date;
-
-import com.alibaba.otter.canal.parse.inbound.mysql.rds.data.DescribeBinlogFileResult;
-import org.apache.http.HttpResponse;
-import org.apache.http.util.EntityUtils;
-
-import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.TypeReference;
-
-/**
- * @author chengjin.lyf on 2018/8/7 下午3:41
- * @since 1.0.25
- */
-public class DescribeBinlogFilesRequest extends AbstractRequest<DescribeBinlogFileResult> {
-
-
-    public DescribeBinlogFilesRequest() {
-        setVersion("2014-08-15");
-        putQueryString("Action", "DescribeBinlogFiles");
-
-    }
-
-    public void setRdsInstanceId(String rdsInstanceId) {
-        putQueryString("DBInstanceId", rdsInstanceId);
-    }
-
-    public void setPageSize(int pageSize) {
-        putQueryString("PageSize", String.valueOf(pageSize));
-    }
-
-    public void setPageNumber(int pageNumber) {
-        putQueryString("PageNumber", String.valueOf(pageNumber));
-    }
-
-    public void setStartDate(Date startDate) {
-        putQueryString("StartTime" , formatUTCTZ(startDate));
-    }
-
-    public void setEndDate(Date endDate) {
-        putQueryString("EndTime" , formatUTCTZ(endDate));
-    }
-
-    public void setResourceOwnerId(Long resourceOwnerId) {
-        putQueryString("ResourceOwnerId", String.valueOf(resourceOwnerId));
-    }
-
-    @Override
-    protected DescribeBinlogFileResult processResult(HttpResponse response) throws Exception {
-        String result = EntityUtils.toString(response.getEntity());
-        DescribeBinlogFileResult describeBinlogFileResult = JSONObject.parseObject(result, new TypeReference<DescribeBinlogFileResult>() {
-        });
-        return describeBinlogFileResult;
-    }
-}

+ 0 - 191
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/HistoryTableMetaCache.java

@@ -1,191 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
-
-import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
-import com.alibaba.otter.canal.parse.inbound.TableMeta;
-import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
-import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception.CacheConnectionNull;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception.NoHistoryException;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
-import java.io.IOException;
-import java.util.*;
-
-public class HistoryTableMetaCache {
-    private TableMetaStorage tableMetaStorage;
-    private MysqlConnection metaConnection;
-    private LoadingCache<String, Map<Long, TableMeta>> cache; // 第一层:数据库名.表名,第二层时间戳,TableMeta
-
-    public HistoryTableMetaCache() {
-        cache = CacheBuilder.newBuilder().build(new CacheLoader<String, Map<Long, TableMeta>>() {
-            @Override
-            public Map<Long, TableMeta> load(String tableName) throws Exception {
-                Long timestamp = new Date().getTime();
-                String[] strs = tableName.split("\\.");
-                String schema = strs[0];
-                if (tableMetaStorage != null) {
-                    init(tableMetaStorage.fetchByTableName(tableName)); // 从存储中读取表的历史ddl
-                }
-                ResultSetPacket resultSetPacket = connectionQuery("show create table " + tableName); // 获取当前ddl
-                String currentDdl = resultSetPacket.getFieldValues().get(1);
-                if (cache.asMap().containsKey(tableName)) {
-                    Map<Long, TableMeta> tableMetaMap = cache.getUnchecked(tableName);
-                    if (tableMetaMap.isEmpty()) {
-                        put(schema, tableName, currentDdl, timestamp - 1000L); // 放入当前schema,取时间为当前时间-1s
-                    } else {                                               // 如果table存在历史
-                        Iterator<Long> iterator = tableMetaMap.keySet().iterator();
-                        Long firstTimestamp = iterator.next();
-                        TableMeta first = tableMetaMap.get(firstTimestamp); // 拿第一条ddl
-                        if (!first.getDdl().equalsIgnoreCase(currentDdl)) { // 当前ddl与历史第一条不一致,放入当前ddl
-                            put(schema, tableName, currentDdl, calculateNewTimestamp(firstTimestamp)); // 计算放入的timestamp,设为第一条时间+1s
-                        }
-                    }
-                } else {
-                    put(schema, tableName, currentDdl, timestamp - 1000L); // 放入当前schema
-                }
-                return cache.get(tableName);
-            }
-        });
-    }
-
-    public void init(List<TableMetaEntry> entries) throws IOException {
-        if (entries == null) {
-            return;
-        }
-        for (TableMetaEntry entry : entries) {
-            try {
-                put(entry.getSchema(), entry.getTable(), entry.getDdl(), entry.getTimestamp());
-            } catch (CacheConnectionNull cacheConnectionNull) {
-                cacheConnectionNull.printStackTrace();
-            }
-        }
-    }
-
-    public TableMeta put(String schema, String table, String ddl, Long timestamp) throws CacheConnectionNull, IOException {
-        ResultSetPacket resultSetPacket;
-        if (!(ddl.contains("CREATE TABLE") || ddl.contains("create table"))) { // 尝试直接从数据库拉取CREATE TABLE的DDL
-            resultSetPacket = connectionQuery("show create table " + table);
-            ddl = resultSetPacket.getFieldValues().get(1);
-        } else { // CREATE TABLE 的 DDL
-            resultSetPacket = new ResultSetPacket();
-            List<String> fields = new ArrayList<String>();
-            String[] strings = table.split("\\.");
-            String shortTable = table;
-            if (strings.length > 1) {
-                shortTable = strings[1];
-            }
-            fields.add(0, shortTable);
-            fields.add(1, ddl);
-            resultSetPacket.setFieldValues(fields);
-            if (metaConnection != null) {
-                resultSetPacket.setSourceAddress(metaConnection.getAddress());
-            }
-        }
-        Map<Long, TableMeta> tableMetaMap;
-        if (!cache.asMap().containsKey(table)) {
-            tableMetaMap = new TreeMap<Long, TableMeta>(new Comparator<Long>() {
-                @Override
-                public int compare(Long o1, Long o2) {
-                    return o2.compareTo(o1);
-                }
-            });
-            cache.put(table, tableMetaMap);
-        } else {
-            tableMetaMap = cache.getUnchecked(table);
-        }
-        eliminate(tableMetaMap); // 淘汰旧的TableMeta
-        TableMeta tableMeta = new TableMeta(schema, table, TableMetaCache.parseTableMeta(schema, table, resultSetPacket));
-        if (tableMeta.getDdl() == null) { // 生成的TableMeta有时DDL为null
-            tableMeta.setDdl(ddl);
-        }
-        tableMetaMap.put(timestamp, tableMeta);
-        return tableMeta;
-    }
-
-    public TableMeta get(String schema, String table, Long timestamp) throws NoHistoryException, CacheConnectionNull {
-        Map<Long, TableMeta> tableMetaMap = cache.getUnchecked(table);
-        Iterator<Long> iterator = tableMetaMap.keySet().iterator();
-        Long selected = null;
-        while(iterator.hasNext()) {
-            Long temp = iterator.next();
-            if (timestamp > temp) {
-                selected = temp;
-                break;
-            }
-        }
-
-        if (selected == null) {
-            iterator = tableMetaMap.keySet().iterator();
-            if (iterator.hasNext()) {
-                selected = iterator.next();
-            } else {
-                throw new NoHistoryException(schema, table);
-            }
-        }
-
-        return tableMetaMap.get(selected);
-    }
-
-    public void clearTableMeta() {
-        cache.invalidateAll();
-    }
-
-    public void clearTableMetaWithSchemaName(String schema) {
-        for (String tableName : cache.asMap().keySet()) {
-            String[] strs = tableName.split("\\.");
-            if (schema.equalsIgnoreCase(strs[0])) {
-                cache.invalidate(tableName);
-            }
-        }
-    }
-
-    public void clearTableMeta(String schema, String table) {
-        if (!table.contains(".")) {
-            table = schema+"."+table;
-        }
-        cache.invalidate(table);
-    }
-
-    // eliminate older table meta in cache
-    private void eliminate(Map<Long, TableMeta> tableMetaMap) {
-        int MAX_CAPABILITY = 20;
-        if (tableMetaMap.keySet().size() < MAX_CAPABILITY) {
-            return;
-        }
-        Iterator<Long> iterator = tableMetaMap.keySet().iterator();
-        while(iterator.hasNext()) {
-            iterator.next();
-        }
-        iterator.remove();
-    }
-
-    private Long calculateNewTimestamp(Long oldTimestamp) {
-        return oldTimestamp + 1000;
-    }
-
-    private ResultSetPacket connectionQuery(String query) throws CacheConnectionNull, IOException {
-        if (metaConnection == null) {
-            throw new CacheConnectionNull();
-        }
-        try {
-            return metaConnection.query(query);
-        } catch (IOException e) {
-            try {
-                metaConnection.reconnect();
-                return metaConnection.query(query);
-            } catch (IOException e1) {
-                throw e1;
-            }
-        }
-    }
-
-    public void setMetaConnection(MysqlConnection metaConnection) {
-        this.metaConnection = metaConnection;
-    }
-
-    public void setTableMetaStorage(TableMetaStorage tableMetaStorage) {
-        this.tableMetaStorage = tableMetaStorage;
-    }
-}

+ 0 - 20
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaCacheInterface.java

@@ -1,20 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
-
-import com.alibaba.otter.canal.parse.inbound.TableMeta;
-import com.alibaba.otter.canal.protocol.position.EntryPosition;
-
-public interface TableMetaCacheInterface {
-
-    TableMeta getTableMeta(String schema, String table, boolean useCache, EntryPosition position);
-
-    void clearTableMeta();
-
-    void clearTableMetaWithSchemaName(String schema);
-
-    void clearTableMeta(String schema, String table);
-
-    boolean apply(EntryPosition position, String schema, String ddl, String extra);
-
-    boolean isOnRDS();
-
-}

+ 0 - 105
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaCacheWithStorage.java

@@ -1,105 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
-
-import com.alibaba.otter.canal.parse.inbound.TableMeta;
-import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
-import com.alibaba.otter.canal.protocol.position.EntryPosition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-public class TableMetaCacheWithStorage implements TableMetaCacheInterface {
-
-    private static Logger logger = LoggerFactory.getLogger(TableMetaCacheWithStorage.class);
-    private TableMetaStorage tableMetaStorage; // TableMeta存储
-    private HistoryTableMetaCache cache = new HistoryTableMetaCache(); // cache
-
-    public TableMetaCacheWithStorage(MysqlConnection con, TableMetaStorage tableMetaStorage) {
-        this.tableMetaStorage = tableMetaStorage;
-        InetSocketAddress address = con.getAddress();
-        this.tableMetaStorage.setDbAddress(address.getHostName()+":"+address.getPort());
-        cache.setMetaConnection(con);
-        cache.setTableMetaStorage(tableMetaStorage);
-        if (tableMetaStorage != null) {
-            try {
-                cache.init(tableMetaStorage.fetch()); // 初始化,从存储拉取TableMeta
-            } catch (IOException e) {
-                logger.error(e.getMessage());
-            }
-        }
-    }
-
-    @Override
-    public boolean apply(EntryPosition position, String fullTableName, String ddl, String extra) {
-        String[] strs = fullTableName.split("\\.");
-        String schema = strs[0];
-        if (schema.equalsIgnoreCase("null")) { // ddl schema为null,放弃处理
-            return false;
-        }
-        try {
-            TableMeta tableMeta = cache.get(schema, fullTableName, position.getTimestamp());
-            if (!compare(tableMeta, ddl)) { // 获取最近的TableMeta,进行比对
-                TableMeta result = cache.put(schema, fullTableName, ddl, calTimestamp(position.getTimestamp()));
-                if (tableMetaStorage != null && result != null) { // 储存
-                    tableMetaStorage.store(schema, fullTableName, result.getDdl(), calTimestamp(position.getTimestamp()));
-                }
-            }
-            return true;
-        } catch (Exception e) {
-            logger.error(e.toString());
-        }
-
-        return false;
-    }
-
-    @Override
-    public boolean isOnRDS() {
-        return false;
-    }
-
-    /***
-     *
-     * @param schema dbname
-     * @param table tablename
-     * @param useCache unused
-     * @param position timestamp
-     * @return
-     */
-    @Override
-    public TableMeta getTableMeta(String schema, String table, boolean useCache, EntryPosition position) {
-        String fulltbName = schema + "." + table;
-        try {
-            return cache.get(schema, fulltbName, position.getTimestamp());
-        } catch (Exception e) {
-            logger.error(e.toString());
-        }
-        return null;
-    }
-
-    @Override
-    public void clearTableMeta() {
-        cache.clearTableMeta();
-    }
-
-    @Override
-    public void clearTableMetaWithSchemaName(String schema) {
-        cache.clearTableMetaWithSchemaName(schema);
-    }
-
-    @Override
-    public void clearTableMeta(String schema, String table) {
-        cache.clearTableMeta(schema, table);
-    }
-
-    private boolean compare(TableMeta tableMeta, String ddl) {
-        if (tableMeta == null) {
-            return false;
-        }
-        return tableMeta.getDdl().equalsIgnoreCase(ddl);
-    }
-
-    private Long calTimestamp(Long timestamp) {
-        return timestamp;
-    }
-}

+ 0 - 55
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaEntry.java

@@ -1,55 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
-
-import java.io.Serializable;
-
-public class TableMetaEntry implements Serializable {
-
-    private static final long serialVersionUID = -1350200637109107904L;
-
-    private String dbAddress;
-    private String schema;
-    private String table;
-    private String ddl;
-    private Long timestamp;
-
-
-    public String getSchema() {
-        return schema;
-    }
-
-    public void setSchema(String schema) {
-        this.schema = schema;
-    }
-
-    public String getTable() {
-        return table;
-    }
-
-    public void setTable(String table) {
-        this.table = table;
-    }
-
-    public String getDdl() {
-        return ddl;
-    }
-
-    public void setDdl(String ddl) {
-        this.ddl = ddl;
-    }
-
-    public Long getTimestamp() {
-        return timestamp;
-    }
-
-    public void setTimestamp(Long timestamp) {
-        this.timestamp = timestamp;
-    }
-
-    public String getDbAddress() {
-        return dbAddress;
-    }
-
-    public void setDbAddress(String dbAddress) {
-        this.dbAddress = dbAddress;
-    }
-}

+ 0 - 18
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaStorage.java

@@ -1,18 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
-
-import java.util.List;
-
-public interface TableMetaStorage {
-
-    void store(String schema, String table, String ddl, Long timestamp);
-
-    List<TableMetaEntry> fetch();
-
-    List<TableMetaEntry> fetchByTableName(String tableName);
-
-    String getDbName();
-
-    String getDbAddress();
-
-    void setDbAddress(String address);
-}

+ 0 - 9
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaStorageFactory.java

@@ -1,9 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
-
-public interface TableMetaStorageFactory {
-
-    TableMetaStorage getTableMetaStorage();
-
-    String getDbName();
-
-}

+ 0 - 9
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/exception/CacheConnectionNull.java

@@ -1,9 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception;
-
-public class CacheConnectionNull extends Exception{
-
-    @Override
-    public String toString() {
-        return "CacheConnectionNull";
-    }
-}

+ 0 - 21
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/exception/NoHistoryException.java

@@ -1,21 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception;
-
-public class NoHistoryException extends Exception{
-
-    private String dbName;
-    private String tbName;
-
-    public NoHistoryException(String dbName, String tbName) {
-        this.dbName = dbName;
-        this.tbName = tbName;
-    }
-
-    public void printTableName() {
-        System.out.println(dbName+"."+tbName);
-    }
-
-    @Override
-    public String toString() {
-        return "NioHistoryException: " + dbName + " " + tbName;
-    }
-}

+ 0 - 14
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaCallback.java

@@ -1,14 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql;
-
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaEntry;
-
-import java.util.List;
-
-public interface MySqlTableMetaCallback {
-
-    void save(String dbAddress, String schema, String table,String ddl, Long timestamp);
-
-    List<TableMetaEntry> fetch(String dbAddress, String dbName);
-
-    List<TableMetaEntry> fetch(String dbAddress, String dbName, String tableName);
-}

+ 0 - 48
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaStorage.java

@@ -1,48 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql;
-
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaEntry;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorage;
-
-import java.util.List;
-
-public class MySqlTableMetaStorage implements TableMetaStorage {
-    private MySqlTableMetaCallback mySqlTableMetaCallback;
-    private String dbName;
-    private String dbAddress;
-
-    MySqlTableMetaStorage(MySqlTableMetaCallback callback, String dbName) {
-        mySqlTableMetaCallback = callback;
-        this.dbName = dbName;
-    }
-
-
-    @Override
-    public void store(String schema, String table, String ddl, Long timestamp) {
-        mySqlTableMetaCallback.save(dbAddress, schema, table, ddl, timestamp);
-    }
-
-    @Override
-    public List<TableMetaEntry> fetch() {
-        return mySqlTableMetaCallback.fetch(dbAddress, dbName);
-    }
-
-    @Override
-    public List<TableMetaEntry> fetchByTableName(String tableName) {
-        return mySqlTableMetaCallback.fetch(dbAddress, dbName, tableName);
-    }
-
-    @Override
-    public String getDbName() {
-        return dbName;
-    }
-
-    @Override
-    public String getDbAddress() {
-        return dbAddress;
-    }
-
-    @Override
-    public void setDbAddress(String address) {
-        this.dbAddress = address;
-    }
-}

+ 0 - 26
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaStorageFactory.java

@@ -1,26 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql;
-
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorage;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorageFactory;
-
-public class MySqlTableMetaStorageFactory implements TableMetaStorageFactory {
-
-    private MySqlTableMetaCallback mySQLTableMetaCallback;
-    private String dbName;
-
-    public MySqlTableMetaStorageFactory(MySqlTableMetaCallback callback, String dbName) {
-        mySQLTableMetaCallback = callback;
-        this.dbName = dbName;
-    }
-
-    @Override
-    public TableMetaStorage getTableMetaStorage() {
-        return new MySqlTableMetaStorage(mySQLTableMetaCallback, dbName);
-    }
-
-    @Override
-    public String getDbName() {
-        return dbName;
-    }
-
-}

+ 0 - 123
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/RdsBinlogEventParserProxyTest.java

@@ -1,123 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.alibaba.otter.canal.parse.helper.TimeoutChecker;
-import com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsBinlogEventParserProxy;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaEntry;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql.MySqlTableMetaCallback;
-import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql.MySqlTableMetaStorageFactory;
-import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
-import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
-import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
-import com.alibaba.otter.canal.protocol.CanalEntry;
-import com.alibaba.otter.canal.protocol.position.EntryPosition;
-import com.alibaba.otter.canal.protocol.position.LogPosition;
-import com.alibaba.otter.canal.sink.exception.CanalSinkException;
-
-/**
- * @author chengjin.lyf on 2018/7/21 下午5:24
- * @since 1.0.25
- */
-public class RdsBinlogEventParserProxyTest {
-
-    private static final String DETECTING_SQL = "insert into retl.xdual values(1,now()) on duplicate key update x=now()";
-    private static final String MYSQL_ADDRESS = "";
-    private static final String USERNAME      = "";
-    private static final String PASSWORD      = "";
-    public static final String DBNAME = "";
-    public static final String TBNAME = "";
-    public static final String DDL = "";
-
-
-    @Test
-    public void test_timestamp() throws InterruptedException {
-        final TimeoutChecker timeoutChecker = new TimeoutChecker(3000 * 1000);
-        final AtomicLong entryCount = new AtomicLong(0);
-        final EntryPosition entryPosition = new EntryPosition();
-
-        final RdsBinlogEventParserProxy controller = new RdsBinlogEventParserProxy();
-        Calendar calendar = Calendar.getInstance();
-        calendar.add(Calendar.DAY_OF_YEAR, -1);
-        final EntryPosition defaultPosition = buildPosition(null, null, calendar.getTimeInMillis());
-        controller.setSlaveId(3344L);
-        controller.setDetectingEnable(false);
-        controller.setDetectingSQL(DETECTING_SQL);
-        controller.setMasterInfo(buildAuthentication());
-        controller.setMasterPosition(defaultPosition);
-        controller.setInstanceId("");
-        controller.setAccesskey("");
-        controller.setSecretkey("");
-        controller.setBatchSize(4);
-//        controller.setRdsOpenApiUrl("https://rds.aliyuncs.com/");
-        controller.setEventSink(new AbstractCanalEventSinkTest<List<CanalEntry.Entry>>() {
-
-            @Override
-            public boolean sink(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress, String destination)
-                    throws CanalSinkException {
-                for (CanalEntry.Entry entry : entrys) {
-                    if (entry.getEntryType() != CanalEntry.EntryType.HEARTBEAT) {
-                        entryCount.incrementAndGet();
-
-                        String logfilename = entry.getHeader().getLogfileName();
-                        long logfileoffset = entry.getHeader().getLogfileOffset();
-                        long executeTime = entry.getHeader().getExecuteTime();
-
-                        entryPosition.setJournalName(logfilename);
-                        entryPosition.setPosition(logfileoffset);
-                        entryPosition.setTimestamp(executeTime);
-                        break;
-                    }
-                }
-                return true;
-            }
-        });
-
-        controller.setLogPositionManager(new AbstractLogPositionManager() {
-
-            private LogPosition logPosition;
-            public void persistLogPosition(String destination, LogPosition logPosition) {
-                System.out.println(logPosition);
-                this.logPosition = logPosition;
-            }
-
-            public LogPosition getLatestIndexBy(String destination) {
-                return logPosition;
-            }
-        });
-
-        controller.start();
-        timeoutChecker.waitForIdle();
-
-        if (controller.isStart()) {
-            controller.stop();
-        }
-
-        // check
-        Assert.assertTrue(entryCount.get() > 0);
-
-        // 对比第一条数据和起始的position相同
-        Assert.assertEquals(entryPosition.getJournalName(), "mysql-bin.000001");
-        Assert.assertTrue(entryPosition.getPosition() <= 6163L);
-        Assert.assertTrue(entryPosition.getTimestamp() <= defaultPosition.getTimestamp());
-    }
-
-
-    // ======================== helper method =======================
-
-    private EntryPosition buildPosition(String binlogFile, Long offest, Long timestamp) {
-        return new EntryPosition(binlogFile, offest, timestamp);
-    }
-
-    private AuthenticationInfo buildAuthentication() {
-        return new AuthenticationInfo(new InetSocketAddress(MYSQL_ADDRESS, 3306), USERNAME, PASSWORD);
-    }
-}

+ 38 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/NoStorageTest.java

@@ -0,0 +1,38 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+import com.alibaba.otter.canal.parse.inbound.TableMeta;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
+import com.alibaba.otter.canal.protocol.position.EntryPosition;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.Date;
+
+public class NoStorageTest {
+    final String DBNAME = "testdb";
+    final String TBNAME = "testtb";
+    final String DDL = "CREATE TABLE `testtb` (\n" +
+            "   `id` int(11) NOT NULL AUTO_INCREMENT,\n" +
+            "   `name` varchar(2048) DEFAULT NULL,\n" +
+            "   `datachange_lasttime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最晚更新时间',\n" +
+            "   `otter_testcol` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol1` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol2` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol3` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol4` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol5` varchar(45) DEFAULT NULL,\n" +
+            "   PRIMARY KEY (`id`)\n" +
+            " ) ENGINE=InnoDB AUTO_INCREMENT=58333898 DEFAULT CHARSET=utf8mb4";
+    @Test
+    public void nostorage() {
+        MysqlConnection connection = new MysqlConnection(new InetSocketAddress("127.0.0.1", 3306), "root", "hello");
+        TableMetaCacheWithStorage tableMetaCacheWithStorage = new TableMetaCacheWithStorage(connection, null);
+        EntryPosition entryPosition = new EntryPosition();
+        entryPosition.setTimestamp(new Date().getTime());
+        String fullTableName = DBNAME + "." + TBNAME;
+        tableMetaCacheWithStorage.apply(entryPosition, fullTableName, DDL, null);
+        entryPosition.setTimestamp(new Date().getTime() + 1000L);
+        TableMeta result = tableMetaCacheWithStorage.getTableMeta(DBNAME, TBNAME, false, entryPosition);
+        assert result.getDdl().equalsIgnoreCase(DDL);
+    }
+}

+ 77 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/StorageTest.java

@@ -0,0 +1,77 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+import com.alibaba.otter.canal.parse.inbound.TableMeta;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql.MySqlTableMetaCallback;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql.MySqlTableMetaStorageFactory;
+import com.alibaba.otter.canal.protocol.position.EntryPosition;
+import com.alibaba.otter.canal.protocol.position.Position;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+public class StorageTest {
+
+    final String DBNAME = "testdb";
+    final String TBNAME = "testtb";
+    final String DDL = "CREATE TABLE `testtb` (\n" +
+            "   `id` int(11) NOT NULL AUTO_INCREMENT,\n" +
+            "   `name` varchar(2048) DEFAULT NULL,\n" +
+            "   `datachange_lasttime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最晚更新时间',\n" +
+            "   `otter_testcol` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol1` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol2` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol3` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol4` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol5` varchar(45) DEFAULT NULL,\n" +
+            "   PRIMARY KEY (`id`)\n" +
+            " ) ENGINE=InnoDB AUTO_INCREMENT=58333898 DEFAULT CHARSET=utf8mb4";
+
+    @Test
+    public void storage() {
+
+        MySqlTableMetaStorageFactory factory = new MySqlTableMetaStorageFactory(new MySqlTableMetaCallback() {
+            @Override
+            public void save(String dbAddress, String schema, String table, String ddl, Long timestamp) {
+
+            }
+
+            @Override
+            public List<TableMetaEntry> fetch(String dbAddress, String dbName) {
+                TableMetaEntry tableMeta = new TableMetaEntry();
+                tableMeta.setSchema(DBNAME);
+                tableMeta.setTable(TBNAME);
+                tableMeta.setDdl(DDL);
+                tableMeta.setTimestamp(new Date().getTime());
+                List<TableMetaEntry> entries = new ArrayList<TableMetaEntry>();
+                entries.add(tableMeta);
+                return entries;
+            }
+
+            @Override
+            public List<TableMetaEntry> fetch(String dbAddress, String dbName, String tableName) {
+                TableMetaEntry tableMeta = new TableMetaEntry();
+                tableMeta.setSchema(DBNAME);
+                tableMeta.setTable(TBNAME);
+                tableMeta.setDdl(DDL);
+                tableMeta.setTimestamp(new Date().getTime());
+                List<TableMetaEntry> entries = new ArrayList<TableMetaEntry>();
+                entries.add(tableMeta);
+                return entries;
+            }
+        }, DBNAME);
+        MysqlConnection connection = new MysqlConnection(new InetSocketAddress("127.0.0.1", 3306), "root", "hello");
+        TableMetaCacheWithStorage tableMetaCacheWithStorage = new TableMetaCacheWithStorage(connection, factory.getTableMetaStorage());
+        EntryPosition entryPosition = new EntryPosition();
+        entryPosition.setTimestamp(new Date().getTime());
+        String fullTableName = DBNAME + "." + TBNAME;
+        tableMetaCacheWithStorage.apply(entryPosition, fullTableName, DDL, null);
+
+        entryPosition.setTimestamp(new Date().getTime() + 1000L);
+        TableMeta result = tableMetaCacheWithStorage.getTableMeta(DBNAME, TBNAME, false, entryPosition);
+        assert result.getDdl().equalsIgnoreCase(DDL);
+    }
+}