Browse Source

为了解决回追数据时Rowdata与当前schema不匹配
对TableMeta提供持久化接口,可以根据timestamp匹配历史schema

yfpeng 6 years ago
parent
commit
02868d8fec
19 changed files with 687 additions and 11 deletions
  1. 8 0
      instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java
  2. 12 0
      instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java
  3. 22 5
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
  4. 7 4
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java
  5. 6 1
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java
  6. 191 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/HistoryTableMetaCache.java
  7. 20 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaCacheInterface.java
  8. 105 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaCacheWithStorage.java
  9. 55 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaEntry.java
  10. 18 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaStorage.java
  11. 9 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaStorageFactory.java
  12. 9 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/exception/CacheConnectionNull.java
  13. 21 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/exception/NoHistoryException.java
  14. 14 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaCallback.java
  15. 48 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaStorage.java
  16. 26 0
      parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaStorageFactory.java
  17. 38 0
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/NoStorageTest.java
  18. 77 0
      parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/StorageTest.java
  19. 1 1
      pom.xml

+ 8 - 0
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -6,6 +6,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.HistoryTableMetaCache;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -242,6 +243,13 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             mysqlEventParser.setDetectingIntervalInSeconds(parameters.getDetectingIntervalInSeconds());
             // 数据库信息参数
             mysqlEventParser.setSlaveId(parameters.getSlaveId());
+            mysqlEventParser.setTableMetaStorageFactory(parameters.getTableMetaStorageFactory());
+            // Ctrip callback
+//            mysqlEventParser.setCallback(parameters.getCallback());
+//            HistoryTableMetaCache cache = new HistoryTableMetaCache();
+//            cache.init(parameters.getEntries());
+//            mysqlEventParser.setHistoryTableMetaCache(cache);
+
             if (!CollectionUtils.isEmpty(dbAddresses)) {
                 mysqlEventParser.setMasterInfo(new AuthenticationInfo(dbAddresses.get(0),
                     parameters.getDbUsername(),

+ 12 - 0
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java

@@ -5,6 +5,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorageFactory;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.builder.ToStringBuilder;
 
@@ -106,6 +107,9 @@ public class CanalParameter implements Serializable {
     private Long                     standbyLogfileOffest               = null;
     private Long                     standbyTimestamp                   = null;
 
+    // Ctrip Table Meta
+    TableMetaStorageFactory tableMetaStorageFactory;
+
     public static enum RunMode {
 
         /** 嵌入式 */
@@ -859,6 +863,14 @@ public class CanalParameter implements Serializable {
         this.blackFilter = blackFilter;
     }
 
+    public TableMetaStorageFactory getTableMetaStorageFactory() {
+        return tableMetaStorageFactory;
+    }
+
+    public void setTableMetaStorageFactory(TableMetaStorageFactory tableMetaStorageFactory) {
+        this.tableMetaStorageFactory = tableMetaStorageFactory;
+    }
+
     public String toString() {
         return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
     }

+ 22 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -10,6 +10,11 @@ import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.alibaba.otter.canal.parse.inbound.*;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaCacheInterface;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaCacheWithStorage;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorage;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorageFactory;
 import org.apache.commons.lang.StringUtils;
 import org.springframework.util.CollectionUtils;
 
@@ -20,9 +25,6 @@ import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.ha.CanalHAController;
-import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
-import com.alibaba.otter.canal.parse.inbound.HeartBeatCallback;
-import com.alibaba.otter.canal.parse.inbound.SinkFunction;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.BinlogFormat;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.BinlogImage;
 import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
@@ -62,7 +64,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     // 心跳检查信息
     private String             detectingSQL;                                 // 心跳sql
     private MysqlConnection    metaConnection;                               // 查询meta信息的链接
-    private TableMetaCache     tableMetaCache;                               // 对应meta
+    private TableMetaCacheInterface tableMetaCache;                               // 对应meta
                                                                               // cache
     private int                fallbackIntervalInSeconds         = 60;       // 切换回退时间
     private BinlogFormat[]     supportBinlogFormats;                         // 支持的binlogFormat,如果设置会执行强校验
@@ -72,6 +74,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
     private int                dumpErrorCount                    = 0;        // binlogDump失败异常计数
     private int                dumpErrorCountThreshold           = 2;        // binlogDump失败异常计数阀值
 
+    private TableMetaStorageFactory tableMetaStorageFactory;
+
     protected ErosaConnection buildErosaConnection() {
         return buildMysqlConnection(this.runningInfo);
     }
@@ -125,7 +129,13 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);
             }
 
-            tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB);
+
+            TableMetaStorage storage = null;
+            if (tableMetaStorageFactory != null) {
+                storage = tableMetaStorageFactory.getTableMetaStorage();
+            }
+
+            tableMetaCache = new TableMetaCacheWithStorage(metaConnection, storage);
             ((LogEventConvert) binlogParser).setTableMetaCache(tableMetaCache);
         }
     }
@@ -908,4 +918,11 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
         this.dumpErrorCountThreshold = dumpErrorCountThreshold;
     }
 
+    public TableMetaStorageFactory getTableMetaStorageFactory() {
+        return tableMetaStorageFactory;
+    }
+
+    public void setTableMetaStorageFactory(TableMetaStorageFactory tableMetaStorageFactory) {
+        this.tableMetaStorageFactory = tableMetaStorageFactory;
+    }
 }

+ 7 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -10,6 +10,9 @@ import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaCacheInterface;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorage;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception.NoHistoryException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
@@ -88,7 +91,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     private volatile AviaterRegexFilter nameFilter;                                                          // 运行时引用可能会有变化,比如规则发生变化时
     private volatile AviaterRegexFilter nameBlackFilter;
 
-    private TableMetaCache              tableMetaCache;
+    private TableMetaCacheInterface tableMetaCache;
     private String                      binlogFileName      = "mysql-bin.000001";
     private Charset                     charset             = Charset.defaultCharset();
     private boolean                     filterQueryDcl      = false;
@@ -262,7 +265,8 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             if (!isSeek) {
                 // 使用新的表结构元数据管理方式
                 EntryPosition position = createPosition(event.getHeader());
-                tableMetaCache.apply(position, event.getDbName(), queryString, null);
+                String fulltbName = schemaName+"."+tableName;
+                tableMetaCache.apply(position, fulltbName, queryString, null);
             }
 
             Header header = createHeader(binlogFileName, event.getHeader(), schemaName, tableName, type);
@@ -901,7 +905,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         this.nameBlackFilter = nameBlackFilter;
     }
 
-    public void setTableMetaCache(TableMetaCache tableMetaCache) {
+    public void setTableMetaCache(TableMetaCacheInterface tableMetaCache) {
         this.tableMetaCache = tableMetaCache;
     }
 
@@ -928,5 +932,4 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     public void setGtidSet(GTIDSet gtidSet) {
         this.gtidSet = gtidSet;
     }
-
 }

+ 6 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java

@@ -6,6 +6,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaCacheInterface;
 import org.apache.commons.lang.StringUtils;
 
 import com.alibaba.otter.canal.parse.driver.mysql.packets.server.FieldPacket;
@@ -29,7 +30,7 @@ import com.google.common.cache.LoadingCache;
  * @author jianghang 2013-1-17 下午10:15:16
  * @version 1.0.0
  */
-public class TableMetaCache {
+public class TableMetaCache implements TableMetaCacheInterface {
 
     public static final String              COLUMN_NAME    = "COLUMN_NAME";
     public static final String              COLUMN_TYPE    = "COLUMN_TYPE";
@@ -99,6 +100,10 @@ public class TableMetaCache {
             String createDDL = packet.getFieldValues().get(1);
             MemoryTableMeta memoryTableMeta = new MemoryTableMeta();
             memoryTableMeta.apply(DatabaseTableMeta.INIT_POSITION, schema, createDDL, null);
+            String[] strings = table.split("\\.");
+            if (strings.length > 1) {
+                table = strings[1];
+            }
             TableMeta tableMeta = memoryTableMeta.find(schema, table);
             return tableMeta.getFields();
         } else {

+ 191 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/HistoryTableMetaCache.java

@@ -0,0 +1,191 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+import com.alibaba.otter.canal.parse.driver.mysql.packets.server.ResultSetPacket;
+import com.alibaba.otter.canal.parse.inbound.TableMeta;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
+import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception.CacheConnectionNull;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception.NoHistoryException;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+import java.io.IOException;
+import java.util.*;
+
+public class HistoryTableMetaCache {
+    private TableMetaStorage tableMetaStorage;
+    private MysqlConnection metaConnection;
+    private LoadingCache<String, Map<Long, TableMeta>> cache; // 第一层:数据库名.表名,第二层时间戳,TableMeta
+
+    public HistoryTableMetaCache() {
+        cache = CacheBuilder.newBuilder().build(new CacheLoader<String, Map<Long, TableMeta>>() {
+            @Override
+            public Map<Long, TableMeta> load(String tableName) throws Exception {
+                Long timestamp = new Date().getTime();
+                String[] strs = tableName.split("\\.");
+                String schema = strs[0];
+                if (tableMetaStorage != null) {
+                    init(tableMetaStorage.fetchByTableName(tableName)); // 从存储中读取表的历史ddl
+                }
+                ResultSetPacket resultSetPacket = connectionQuery("show create table " + tableName); // 获取当前ddl
+                String currentDdl = resultSetPacket.getFieldValues().get(1);
+                if (cache.asMap().containsKey(tableName)) {
+                    Map<Long, TableMeta> tableMetaMap = cache.getUnchecked(tableName);
+                    if (tableMetaMap.isEmpty()) {
+                        put(schema, tableName, currentDdl, timestamp - 1000L); // 放入当前schema,取时间为当前时间-1s
+                    } else {                                               // 如果table存在历史
+                        Iterator<Long> iterator = tableMetaMap.keySet().iterator();
+                        Long firstTimestamp = iterator.next();
+                        TableMeta first = tableMetaMap.get(firstTimestamp); // 拿第一条ddl
+                        if (!first.getDdl().equalsIgnoreCase(currentDdl)) { // 当前ddl与历史第一条不一致,放入当前ddl
+                            put(schema, tableName, currentDdl, calculateNewTimestamp(firstTimestamp)); // 计算放入的timestamp,设为第一条时间+1s
+                        }
+                    }
+                } else {
+                    put(schema, tableName, currentDdl, timestamp - 1000L); // 放入当前schema
+                }
+                return cache.get(tableName);
+            }
+        });
+    }
+
+    public void init(List<TableMetaEntry> entries) throws IOException {
+        if (entries == null) {
+            return;
+        }
+        for (TableMetaEntry entry : entries) {
+            try {
+                put(entry.getSchema(), entry.getTable(), entry.getDdl(), entry.getTimestamp());
+            } catch (CacheConnectionNull cacheConnectionNull) {
+                cacheConnectionNull.printStackTrace();
+            }
+        }
+    }
+
+    public TableMeta put(String schema, String table, String ddl, Long timestamp) throws CacheConnectionNull, IOException {
+        ResultSetPacket resultSetPacket;
+        if (!(ddl.contains("CREATE TABLE") || ddl.contains("create table"))) { // 尝试直接从数据库拉取CREATE TABLE的DDL
+            resultSetPacket = connectionQuery("show create table " + table);
+            ddl = resultSetPacket.getFieldValues().get(1);
+        } else { // CREATE TABLE 的 DDL
+            resultSetPacket = new ResultSetPacket();
+            List<String> fields = new ArrayList<String>();
+            String[] strings = table.split("\\.");
+            String shortTable = table;
+            if (strings.length > 1) {
+                shortTable = strings[1];
+            }
+            fields.add(0, shortTable);
+            fields.add(1, ddl);
+            resultSetPacket.setFieldValues(fields);
+            if (metaConnection != null) {
+                resultSetPacket.setSourceAddress(metaConnection.getAddress());
+            }
+        }
+        Map<Long, TableMeta> tableMetaMap;
+        if (!cache.asMap().containsKey(table)) {
+            tableMetaMap = new TreeMap<Long, TableMeta>(new Comparator<Long>() {
+                @Override
+                public int compare(Long o1, Long o2) {
+                    return o2.compareTo(o1);
+                }
+            });
+            cache.put(table, tableMetaMap);
+        } else {
+            tableMetaMap = cache.getUnchecked(table);
+        }
+        eliminate(tableMetaMap); // 淘汰旧的TableMeta
+        TableMeta tableMeta = new TableMeta(schema, table, TableMetaCache.parseTableMeta(schema, table, resultSetPacket));
+        if (tableMeta.getDdl() == null) { // 生成的TableMeta有时DDL为null
+            tableMeta.setDdl(ddl);
+        }
+        tableMetaMap.put(timestamp, tableMeta);
+        return tableMeta;
+    }
+
+    public TableMeta get(String schema, String table, Long timestamp) throws NoHistoryException, CacheConnectionNull {
+        Map<Long, TableMeta> tableMetaMap = cache.getUnchecked(table);
+        Iterator<Long> iterator = tableMetaMap.keySet().iterator();
+        Long selected = null;
+        while(iterator.hasNext()) {
+            Long temp = iterator.next();
+            if (timestamp > temp) {
+                selected = temp;
+                break;
+            }
+        }
+
+        if (selected == null) {
+            iterator = tableMetaMap.keySet().iterator();
+            if (iterator.hasNext()) {
+                selected = iterator.next();
+            } else {
+                throw new NoHistoryException(schema, table);
+            }
+        }
+
+        return tableMetaMap.get(selected);
+    }
+
+    public void clearTableMeta() {
+        cache.invalidateAll();
+    }
+
+    public void clearTableMetaWithSchemaName(String schema) {
+        for (String tableName : cache.asMap().keySet()) {
+            String[] strs = tableName.split("\\.");
+            if (schema.equalsIgnoreCase(strs[0])) {
+                cache.invalidate(tableName);
+            }
+        }
+    }
+
+    public void clearTableMeta(String schema, String table) {
+        if (!table.contains(".")) {
+            table = schema+"."+table;
+        }
+        cache.invalidate(table);
+    }
+
+    // eliminate older table meta in cache
+    private void eliminate(Map<Long, TableMeta> tableMetaMap) {
+        int MAX_CAPABILITY = 20;
+        if (tableMetaMap.keySet().size() < MAX_CAPABILITY) {
+            return;
+        }
+        Iterator<Long> iterator = tableMetaMap.keySet().iterator();
+        while(iterator.hasNext()) {
+            iterator.next();
+        }
+        iterator.remove();
+    }
+
+    private Long calculateNewTimestamp(Long oldTimestamp) {
+        return oldTimestamp + 1000;
+    }
+
+    private ResultSetPacket connectionQuery(String query) throws CacheConnectionNull, IOException {
+        if (metaConnection == null) {
+            throw new CacheConnectionNull();
+        }
+        try {
+            return metaConnection.query(query);
+        } catch (IOException e) {
+            try {
+                metaConnection.reconnect();
+                return metaConnection.query(query);
+            } catch (IOException e1) {
+                throw e1;
+            }
+        }
+    }
+
+    public void setMetaConnection(MysqlConnection metaConnection) {
+        this.metaConnection = metaConnection;
+    }
+
+    public void setTableMetaStorage(TableMetaStorage tableMetaStorage) {
+        this.tableMetaStorage = tableMetaStorage;
+    }
+}

+ 20 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaCacheInterface.java

@@ -0,0 +1,20 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+import com.alibaba.otter.canal.parse.inbound.TableMeta;
+import com.alibaba.otter.canal.protocol.position.EntryPosition;
+
+public interface TableMetaCacheInterface {
+
+    TableMeta getTableMeta(String schema, String table, boolean useCache, EntryPosition position);
+
+    void clearTableMeta();
+
+    void clearTableMetaWithSchemaName(String schema);
+
+    void clearTableMeta(String schema, String table);
+
+    boolean apply(EntryPosition position, String schema, String ddl, String extra);
+
+    boolean isOnRDS();
+
+}

+ 105 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaCacheWithStorage.java

@@ -0,0 +1,105 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+import com.alibaba.otter.canal.parse.inbound.TableMeta;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
+import com.alibaba.otter.canal.protocol.position.EntryPosition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+public class TableMetaCacheWithStorage implements TableMetaCacheInterface {
+
+    private static Logger logger = LoggerFactory.getLogger(TableMetaCacheWithStorage.class);
+    private TableMetaStorage tableMetaStorage; // TableMeta存储
+    private HistoryTableMetaCache cache = new HistoryTableMetaCache(); // cache
+
+    public TableMetaCacheWithStorage(MysqlConnection con, TableMetaStorage tableMetaStorage) {
+        this.tableMetaStorage = tableMetaStorage;
+        InetSocketAddress address = con.getAddress();
+        this.tableMetaStorage.setDbAddress(address.getHostName()+":"+address.getPort());
+        cache.setMetaConnection(con);
+        cache.setTableMetaStorage(tableMetaStorage);
+        if (tableMetaStorage != null) {
+            try {
+                cache.init(tableMetaStorage.fetch()); // 初始化,从存储拉取TableMeta
+            } catch (IOException e) {
+                logger.error(e.getMessage());
+            }
+        }
+    }
+
+    @Override
+    public boolean apply(EntryPosition position, String fullTableName, String ddl, String extra) {
+        String[] strs = fullTableName.split("\\.");
+        String schema = strs[0];
+        if (schema.equalsIgnoreCase("null")) { // ddl schema为null,放弃处理
+            return false;
+        }
+        try {
+            TableMeta tableMeta = cache.get(schema, fullTableName, position.getTimestamp());
+            if (!compare(tableMeta, ddl)) { // 获取最近的TableMeta,进行比对
+                TableMeta result = cache.put(schema, fullTableName, ddl, calTimestamp(position.getTimestamp()));
+                if (tableMetaStorage != null && result != null) { // 储存
+                    tableMetaStorage.store(schema, fullTableName, result.getDdl(), calTimestamp(position.getTimestamp()));
+                }
+            }
+            return true;
+        } catch (Exception e) {
+            logger.error(e.toString());
+        }
+
+        return false;
+    }
+
+    @Override
+    public boolean isOnRDS() {
+        return false;
+    }
+
+    /***
+     *
+     * @param schema dbname
+     * @param table tablename
+     * @param useCache unused
+     * @param position timestamp
+     * @return
+     */
+    @Override
+    public TableMeta getTableMeta(String schema, String table, boolean useCache, EntryPosition position) {
+        String fulltbName = schema + "." + table;
+        try {
+            return cache.get(schema, fulltbName, position.getTimestamp());
+        } catch (Exception e) {
+            logger.error(e.toString());
+        }
+        return null;
+    }
+
+    @Override
+    public void clearTableMeta() {
+        cache.clearTableMeta();
+    }
+
+    @Override
+    public void clearTableMetaWithSchemaName(String schema) {
+        cache.clearTableMetaWithSchemaName(schema);
+    }
+
+    @Override
+    public void clearTableMeta(String schema, String table) {
+        cache.clearTableMeta(schema, table);
+    }
+
+    private boolean compare(TableMeta tableMeta, String ddl) {
+        if (tableMeta == null) {
+            return false;
+        }
+        return tableMeta.getDdl().equalsIgnoreCase(ddl);
+    }
+
+    private Long calTimestamp(Long timestamp) {
+        return timestamp;
+    }
+}

+ 55 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaEntry.java

@@ -0,0 +1,55 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+import java.io.Serializable;
+
+public class TableMetaEntry implements Serializable {
+
+    private static final long serialVersionUID = -1350200637109107904L;
+
+    private String dbAddress;
+    private String schema;
+    private String table;
+    private String ddl;
+    private Long timestamp;
+
+
+    public String getSchema() {
+        return schema;
+    }
+
+    public void setSchema(String schema) {
+        this.schema = schema;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public String getDdl() {
+        return ddl;
+    }
+
+    public void setDdl(String ddl) {
+        this.ddl = ddl;
+    }
+
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(Long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public String getDbAddress() {
+        return dbAddress;
+    }
+
+    public void setDbAddress(String dbAddress) {
+        this.dbAddress = dbAddress;
+    }
+}

+ 18 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaStorage.java

@@ -0,0 +1,18 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+import java.util.List;
+
+public interface TableMetaStorage {
+
+    void store(String schema, String table, String ddl, Long timestamp);
+
+    List<TableMetaEntry> fetch();
+
+    List<TableMetaEntry> fetchByTableName(String tableName);
+
+    String getDbName();
+
+    String getDbAddress();
+
+    void setDbAddress(String address);
+}

+ 9 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/TableMetaStorageFactory.java

@@ -0,0 +1,9 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+public interface TableMetaStorageFactory {
+
+    TableMetaStorage getTableMetaStorage();
+
+    String getDbName();
+
+}

+ 9 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/exception/CacheConnectionNull.java

@@ -0,0 +1,9 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception;
+
+public class CacheConnectionNull extends Exception{
+
+    @Override
+    public String toString() {
+        return "CacheConnectionNull";
+    }
+}

+ 21 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/exception/NoHistoryException.java

@@ -0,0 +1,21 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.exception;
+
+public class NoHistoryException extends Exception{
+
+    private String dbName;
+    private String tbName;
+
+    public NoHistoryException(String dbName, String tbName) {
+        this.dbName = dbName;
+        this.tbName = tbName;
+    }
+
+    public void printTableName() {
+        System.out.println(dbName+"."+tbName);
+    }
+
+    @Override
+    public String toString() {
+        return "NioHistoryException: " + dbName + " " + tbName;
+    }
+}

+ 14 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaCallback.java

@@ -0,0 +1,14 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql;
+
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaEntry;
+
+import java.util.List;
+
+public interface MySqlTableMetaCallback {
+
+    void save(String dbAddress, String schema, String table,String ddl, Long timestamp);
+
+    List<TableMetaEntry> fetch(String dbAddress, String dbName);
+
+    List<TableMetaEntry> fetch(String dbAddress, String dbName, String tableName);
+}

+ 48 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaStorage.java

@@ -0,0 +1,48 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql;
+
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaEntry;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorage;
+
+import java.util.List;
+
+public class MySqlTableMetaStorage implements TableMetaStorage {
+    private MySqlTableMetaCallback mySqlTableMetaCallback;
+    private String dbName;
+    private String dbAddress;
+
+    MySqlTableMetaStorage(MySqlTableMetaCallback callback, String dbName) {
+        mySqlTableMetaCallback = callback;
+        this.dbName = dbName;
+    }
+
+
+    @Override
+    public void store(String schema, String table, String ddl, Long timestamp) {
+        mySqlTableMetaCallback.save(dbAddress, schema, table, ddl, timestamp);
+    }
+
+    @Override
+    public List<TableMetaEntry> fetch() {
+        return mySqlTableMetaCallback.fetch(dbAddress, dbName);
+    }
+
+    @Override
+    public List<TableMetaEntry> fetchByTableName(String tableName) {
+        return mySqlTableMetaCallback.fetch(dbAddress, dbName, tableName);
+    }
+
+    @Override
+    public String getDbName() {
+        return dbName;
+    }
+
+    @Override
+    public String getDbAddress() {
+        return dbAddress;
+    }
+
+    @Override
+    public void setDbAddress(String address) {
+        this.dbAddress = address;
+    }
+}

+ 26 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/impl/mysql/MySqlTableMetaStorageFactory.java

@@ -0,0 +1,26 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql;
+
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorage;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.TableMetaStorageFactory;
+
+public class MySqlTableMetaStorageFactory implements TableMetaStorageFactory {
+
+    private MySqlTableMetaCallback mySQLTableMetaCallback;
+    private String dbName;
+
+    public MySqlTableMetaStorageFactory(MySqlTableMetaCallback callback, String dbName) {
+        mySQLTableMetaCallback = callback;
+        this.dbName = dbName;
+    }
+
+    @Override
+    public TableMetaStorage getTableMetaStorage() {
+        return new MySqlTableMetaStorage(mySQLTableMetaCallback, dbName);
+    }
+
+    @Override
+    public String getDbName() {
+        return dbName;
+    }
+
+}

+ 38 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/NoStorageTest.java

@@ -0,0 +1,38 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+import com.alibaba.otter.canal.parse.inbound.TableMeta;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
+import com.alibaba.otter.canal.protocol.position.EntryPosition;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.Date;
+
+public class NoStorageTest {
+    final String DBNAME = "testdb";
+    final String TBNAME = "testtb";
+    final String DDL = "CREATE TABLE `testtb` (\n" +
+            "   `id` int(11) NOT NULL AUTO_INCREMENT,\n" +
+            "   `name` varchar(2048) DEFAULT NULL,\n" +
+            "   `datachange_lasttime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最晚更新时间',\n" +
+            "   `otter_testcol` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol1` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol2` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol3` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol4` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol5` varchar(45) DEFAULT NULL,\n" +
+            "   PRIMARY KEY (`id`)\n" +
+            " ) ENGINE=InnoDB AUTO_INCREMENT=58333898 DEFAULT CHARSET=utf8mb4";
+    @Test
+    public void nostorage() {
+        MysqlConnection connection = new MysqlConnection(new InetSocketAddress("127.0.0.1", 3306), "root", "hello");
+        TableMetaCacheWithStorage tableMetaCacheWithStorage = new TableMetaCacheWithStorage(connection, null);
+        EntryPosition entryPosition = new EntryPosition();
+        entryPosition.setTimestamp(new Date().getTime());
+        String fullTableName = DBNAME + "." + TBNAME;
+        tableMetaCacheWithStorage.apply(entryPosition, fullTableName, DDL, null);
+        entryPosition.setTimestamp(new Date().getTime() + 1000L);
+        TableMeta result = tableMetaCacheWithStorage.getTableMeta(DBNAME, TBNAME, false, entryPosition);
+        assert result.getDdl().equalsIgnoreCase(DDL);
+    }
+}

+ 77 - 0
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/tablemeta/StorageTest.java

@@ -0,0 +1,77 @@
+package com.alibaba.otter.canal.parse.inbound.mysql.tablemeta;
+
+import com.alibaba.otter.canal.parse.inbound.TableMeta;
+import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql.MySqlTableMetaCallback;
+import com.alibaba.otter.canal.parse.inbound.mysql.tablemeta.impl.mysql.MySqlTableMetaStorageFactory;
+import com.alibaba.otter.canal.protocol.position.EntryPosition;
+import com.alibaba.otter.canal.protocol.position.Position;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+public class StorageTest {
+
+    final String DBNAME = "testdb";
+    final String TBNAME = "testtb";
+    final String DDL = "CREATE TABLE `testtb` (\n" +
+            "   `id` int(11) NOT NULL AUTO_INCREMENT,\n" +
+            "   `name` varchar(2048) DEFAULT NULL,\n" +
+            "   `datachange_lasttime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最晚更新时间',\n" +
+            "   `otter_testcol` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol1` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol2` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol3` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol4` varchar(45) DEFAULT NULL,\n" +
+            "   `otter_testcol5` varchar(45) DEFAULT NULL,\n" +
+            "   PRIMARY KEY (`id`)\n" +
+            " ) ENGINE=InnoDB AUTO_INCREMENT=58333898 DEFAULT CHARSET=utf8mb4";
+
+    @Test
+    public void storage() {
+
+        MySqlTableMetaStorageFactory factory = new MySqlTableMetaStorageFactory(new MySqlTableMetaCallback() {
+            @Override
+            public void save(String dbAddress, String schema, String table, String ddl, Long timestamp) {
+
+            }
+
+            @Override
+            public List<TableMetaEntry> fetch(String dbAddress, String dbName) {
+                TableMetaEntry tableMeta = new TableMetaEntry();
+                tableMeta.setSchema(DBNAME);
+                tableMeta.setTable(TBNAME);
+                tableMeta.setDdl(DDL);
+                tableMeta.setTimestamp(new Date().getTime());
+                List<TableMetaEntry> entries = new ArrayList<TableMetaEntry>();
+                entries.add(tableMeta);
+                return entries;
+            }
+
+            @Override
+            public List<TableMetaEntry> fetch(String dbAddress, String dbName, String tableName) {
+                TableMetaEntry tableMeta = new TableMetaEntry();
+                tableMeta.setSchema(DBNAME);
+                tableMeta.setTable(TBNAME);
+                tableMeta.setDdl(DDL);
+                tableMeta.setTimestamp(new Date().getTime());
+                List<TableMetaEntry> entries = new ArrayList<TableMetaEntry>();
+                entries.add(tableMeta);
+                return entries;
+            }
+        }, DBNAME);
+        MysqlConnection connection = new MysqlConnection(new InetSocketAddress("127.0.0.1", 3306), "root", "hello");
+        TableMetaCacheWithStorage tableMetaCacheWithStorage = new TableMetaCacheWithStorage(connection, factory.getTableMetaStorage());
+        EntryPosition entryPosition = new EntryPosition();
+        entryPosition.setTimestamp(new Date().getTime());
+        String fullTableName = DBNAME + "." + TBNAME;
+        tableMetaCacheWithStorage.apply(entryPosition, fullTableName, DDL, null);
+
+        entryPosition.setTimestamp(new Date().getTime() + 1000L);
+        TableMeta result = tableMetaCacheWithStorage.getTableMeta(DBNAME, TBNAME, false, entryPosition);
+        assert result.getDdl().equalsIgnoreCase(DDL);
+    }
+}

+ 1 - 1
pom.xml

@@ -254,7 +254,7 @@
             <dependency>
                 <groupId>com.alibaba.fastsql</groupId>
                 <artifactId>fastsql</artifactId>
-                <version>2.0.0_preview_366</version>
+                <version>2.0.0_preview_186</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>